顺序锁为写者赋予了较高的优先级,即使在读者正在读的时候,也允许写着继续运行。这种策略的好处是,写者永远不会等待,缺点是有时候读者不得不反复多次读相同的数据,直到它获得有效的副本。
在linux内核代码中,顺序锁被定义成seqlock_t结构体(代码位于include/linux/seqlock.h中):
typedef struct {
struct seqcount seqcount;
spinlock_t lock;
} seqlock_t;
所以,包含一个自旋锁lock和一个表示当前锁的顺序数的seqcount。seqcount结构体被定义为:
typedef struct seqcount {
unsigned sequence;
......
} seqcount_t;
就是包含了一个无符号整型的sequence变量。
注:需要C/C++ Linux高级服务器架构师学习资料私信“资料”(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
初始化
顺序锁在使用之前需要先初始化,一般有两种方法:
DEFINE_SEQLOCK(lock1);
seqlock_t lock2;
seqlock_init(&lock2);
可以看到,第一种方法是用宏直接定义并且初始化一个顺序锁变量:
#define SEQCNT_ZERO(lockname) { .sequence = 0, ......}
......
#define __SEQLOCK_UNLOCKED(lockname)
{
.seqcount = SEQCNT_ZERO(lockname),
.lock = __SPIN_LOCK_UNLOCKED(lockname)
}
......
#define DEFINE_SEQLOCK(x)
seqlock_t x = __SEQLOCK_UNLOCKED(x)
所以,直接通过宏定义初始化就是定义了一个seqlock_t结构体变量,将其内部的自旋锁lock初始化为未加锁,并将表示顺序数的seqcount变量初始化为0。
第二种方法是自己定义一个seqlock_t结构体变量,然后调用seqlock_init函数将其初始化:
static inline void __seqcount_init(seqcount_t *s, const char *name,
struct lock_class_key *key)
{
......
s->sequence = 0;
}
......
# define seqcount_init(s) __seqcount_init(s, NULL, NULL)
......
#define seqlock_init(x)
do {
seqcount_init(&(x)->seqcount);
spin_lock_init(&(x)->lock);
} while (0)
也是初始化内部的自旋锁变量lock,并将表示顺序数的seqcount变量初始化为0。
写操作
顺序锁区分写者与读者,对于写者来说,一般使用下面的用法:
write_seqlock(&seq_lock);
/* 修改数据 */
......
write_sequnlock(&seq_lock);
将对数据的修改代码夹在write_seqlock和write_sequnlock函数之间就行了。
获得写顺序锁的write_seqlock函数定义如下:
static inline void write_seqlock(seqlock_t *sl)
{
/* 获得自旋锁 */
spin_lock(&sl->lock);
write_seqcount_begin(&sl->seqcount);
}
首先要获得顺序锁内部的自旋锁,然后调用write_seqcount_begin函数:
static inline void write_seqcount_begin(seqcount_t *s)
{
write_seqcount_begin_nested(s, 0);
}
接着调用write_seqcount_begin_nested函数:
static inline void write_seqcount_begin_nested(seqcount_t *s, int subclass)
{
raw_write_seqcount_begin(s);
......
}
最终调用了raw_write_seqcount_begin函数:
static inline void raw_write_seqcount_begin(seqcount_t *s)
{
/* 累加顺序数 */
s->sequence++;
/* 写内存屏障 */
smp_wmb();
}
累加了顺序锁内的顺序数,这之后添加了一个写内存屏障。这是为了保证在还没有正式执行write_seqlock函数之后的修改数据代码之前,保证系统中的其它模块能感知到顺序数已经被累加了。也就是保证累加顺序数的指令不会被重排序到后面的修改数据代码中,否则,有可能修改数据代码的代码已经执行了一点了,别的CPU还没感知到顺序数被更改了,会造成读取数据不一致的情况。当然,应该也要在读取的时候对应的添加上读内存屏障。
释放写顺序锁的write_sequnlock函数的功能基本上就是把获得锁的过程倒过来,定义如下:
static inline void write_sequnlock(seqlock_t *sl)
{
write_seqcount_end(&sl->seqcount);
/* 释放自旋锁 */
spin_unlock(&sl->lock);
}
先调用write_seqcount_end函数,然后释放自旋锁:
static inline void write_seqcount_end(seqcount_t *s)
{
......
raw_write_seqcount_end(s);
}
接着调用raw_write_seqcount_end函数:
static inline void raw_write_seqcount_end(seqcount_t *s)
{
/* 写内存屏障 */
smp_wmb();
/* 累加顺序数 */
s->sequence++;
}
先加写内存屏障,再累加顺序数。这是为了保证修改数据的代码都执行完毕后才能将顺序数累加。所以,在前面write_seqlock的时候使用的写内存屏障和这里write_sequnlock使用的写内存屏障是成对的,组成了一个临界区,用来执行修改数据的操作。
同时,由于顺序锁的顺序数被初始化为1,当上写锁的时候会将顺序数加1,且开写锁的时候仍然会加1,所以当读到顺序数是奇数的时候就一定表示有一个写者获得了写顺序锁,而当读到顺序数是偶数的时候就一定表示当前没有任何写者获得了写顺序锁。
而且,对于不同写入者来说,序列锁是有自旋锁保护的,所以同一时间只能有一个写入者。
最后,非常关键的,写顺序锁不会造成当前进程休眠。
读操作
接下来,我们分析一下顺序锁的读者。对于读者来说,一般使用下面的用法:
unsigned int seq;
do {
seq = read_seqbegin(&seq_lock);
/* 读取数据 */
......
} while read_seqretry(&seq_lock, seq);
一般都是先使用read_seqbegin函数读取出顺序锁的顺序数,接着执行实际的读取数据操作,最后调用read_seqretry函数,看看当前顺序锁的顺序数是否和前面读到的顺序锁一致。如果一致,那证明读取的过程中没有写者在写入,可以直接退出了;如果不一致,说明在读取的过程中已经至少有一个写者修改了数据,那就循环重新执行上面的步骤,直到前后读到的顺序数一致为止。
读取当前顺序锁顺序数的read_seqbegin函数定义如下:
static inline unsigned read_seqbegin(const seqlock_t *sl)
{
return read_seqcount_begin(&sl->seqcount);
}
调用了raw_read_seqcount_begin函数:
static inline unsigned read_seqcount_begin(const seqcount_t *s)
{
......
return raw_read_seqcount_begin(s);
}
接着调用了raw_read_seqcount_begin函数:
static inline unsigned raw_read_seqcount_begin(const seqcount_t *s)
{
/* 读取顺序数 */
unsigned ret = __read_seqcount_begin(s);
/* 读内存屏障 */
smp_rmb();
return ret;
}
先调用了__read_seqcount_begin函数读取了当前顺序锁的顺序数,然后加上了一个读内存屏障。
static inline unsigned __read_seqcount_begin(const seqcount_t *s)
{
unsigned ret;
repeat:
/* 读取顺序锁的顺序数 */
ret = READ_ONCE(s->sequence);
/* 如果顺序数是奇数表明有写者正在写入 */
if (unlikely(ret & 1)) {
/* 循环等待 */
cpu_relax();
goto repeat;
}
/* 返回顺序数直到其为偶数 */
return ret;
}
先读取顺序锁的顺序数,加上READ_ONCE是为了防止编译器将其和后面的条件判断一起优化,打乱了执行次序。然后,判断顺序数是否是奇数,前面提过,如果是奇数的话说明有一个写者正在持有写顺序锁,这时候就调用cpu_relax函数,让出CPU的控制权,并且再次从头循环读取顺序数,直到其为偶数为止。
cpu_relax函数由各个架构自己实现,Arm64架构的实现如下(代码位于arch/arm64/include/asm/processor.h中):
static inline void cpu_relax(void)
{
asm volatile("yield" ::: "memory");
}
在大多数的Arm64的实现中,yield指令等同于nop空指令。它只是告诉当前CPU核,目前执行的线程无事可做,当前CPU核可以去做点别的。通常,这种指令只对支持超线程的CPU核有用,但是目前所有Arm64的实现都不支持超线程技术,所以只是作为空指令来处理。
接着我们来看看判断当前顺序锁的顺序数是否和前面读到的顺序锁一致的read_seqretry函数的实现:
static inline unsigned read_seqretry(const seqlock_t *sl, unsigned start)
{
return read_seqcount_retry(&sl->seqcount, start);
}
调用了read_seqcount_retry函数:
static inline int read_seqcount_retry(const seqcount_t *s, unsigned start)
{
/* 读内存屏障 */
smp_rmb();
return __read_seqcount_retry(s, start);
}
先加上了一个读内存屏障,和在前面read_seqbegin的时候使用的读内存屏障是成对的,组成了一个临界区,用来执行读取数据的操作。接着,调用了__read_seqcount_retry函数:
static inline int __read_seqcount_retry(const seqcount_t *s, unsigned start)
{
return unlikely(s->sequence != start);
}
就是简单判断了一下当前顺序锁的顺序数是否和传入的start参数的值一致。
读者是没有自旋锁保护的,所以可以多个读者同时读取数据,并且读顺序锁也不会造成当前进程休眠。
使用场景
顺序锁并不是万能的,适合它的使用场景要满足下面的条件:
- 比较适合读多写少的场景。前面分析代码的时候看到了,写者是有自旋锁保护的,因此一次只能有一个写者写入数据,而读者没有任何其它锁保护,是并发读取的。所以,本来写的性能就不高,而读者要保证在读数据的整个期间不会有写者写入,如果写者有很多的话,就会不停的重新尝试读取,也会严重影响性能。
- 被保护的数据一般不会太大太多,否则也会影响性能。
- 被保护的数据结构不包括被写者修改和被读者间接引用的指针。否则,写者可能会在读者正在读指针指向的数据的时候就将该指针变失效了。
- 读者的临界区代码除了读数据外没有别的会引起其它副作用的操作。否则,多个读者的操作会互相竞争。这是因为顺序锁的读者并没有任何其它锁来保护,大家是并发读取的,只是简单的用了一对读内存屏障来保护。
- 顺序锁不会造成读者和写者休眠。
最常见的,在Linux内核中,更新系统jiffies就是使用的顺序锁。