锁机制¶
本节导读¶
到目前为止,我们已经实现了进程和线程,也能够理解在一个时间段内,会有多个线程在执行,这就是并发。 而且,由于线程的引入,多个线程可以共享进程中的全局数据。如果多个线程都想读和更新全局数据, 那么谁先更新取决于操作系统内核的抢占式调度和分派策略。在一般情况下,每个线程都有可能先执行, 且可能由于中断等因素,随时被操作系统打断其执行,而切换到另外一个线程运行, 形成在一段时间内,多个线程交替执行的现象。如果没有一些保障机制(比如互斥、同步等), 那么这些对共享数据进行读写的交替执行的线程,其期望的共享数据的正确结果可能无法达到。
所以,我们需要研究一种保障机制 — 锁 ,确保无论操作系统如何抢占线程,调度和切换线程的执行, 都可以保证对拥有锁的线程,可以独占地对共享数据进行读写,从而能够得到正确的共享数据结果。 这种机制的能力来自于处理器的指令、操作系统系统调用的基本支持,从而能够保证线程间互斥地读写共享数据。 下面各个小节将从为什么需要锁、锁的基本思路、锁的不同实现方式等逐步展开讲解。
为什么需要锁¶
上一小节已经提到,没有保障机制的多个线程,在对共享数据进行读写的过程中,可能得不到预期的结果。 我们来看看这个简单的例子:
1// 线程的入口函数
2int a=0;
3void f() {
4 a = a + 1;
5}
对于上述函数中的第 4 行代码,一般人理解处理器会一次就执行完这条简单的语句,但实际情况并不是这样。 我们可以用 GCC 编译出上述函数的汇编码:
1$ riscv64-unknown-elf-gcc -o f.s -S f.c
可以看到生成的汇编代码如下:
1//f.s
2 .text
3 .globl a
4 .section .sbss,"aw",@nobits
5 .align 2
6 .type a, @object
7 .size a, 4
8a:
9 .zero 4
10 .text
11 .align 1
12 .globl f
13 .type f, @function
14f:
15 addi sp,sp,-16
16 sd s0,8(sp)
17 addi s0,sp,16
18 lui a5,%hi(a)
19 lw a5,%lo(a)(a5)
20 addiw a5,a5,1
21 sext.w a4,a5
22 lui a5,%hi(a)
23 sw a4,%lo(a)(a5)
24 nop
25 ld s0,8(sp)
26 addi sp,sp,16
27 jr ra
从中可以看出,对于高级语言的一条简单语句(C 代码的第 4 行,对全局变量进行读写),很可能是由多条汇编代码 (汇编代码的第 18~23 行)组成。如果这个函数是多个线程要执行的函数,那么在上述汇编代码第 18 行到第 23 行中的各行之间,可能会发生中断,从而导致操作系统执行抢占式的线程调度和切换, 就会得到不一样的结果。由于执行这段汇编代码(第 18~23 行))的多个线程在访问全局变量过程中可能导致竞争状态, 因此我们将此段代码称为临界区(critical section)。临界区是访问共享变量(或共享资源)的代码片段, 不能由多个线程同时执行,即需要保证互斥。
下面是有两个线程T0、T1在一个时间段内的一种可能的执行情况:
时间 |
T0 |
T1 |
OS |
共享变量a |
寄存器a5 |
---|---|---|---|---|---|
1 |
L18 |
– |
– |
0 |
a的高位地址 |
2 |
– |
– |
切换 |
0 |
0 |
3 |
– |
L18 |
– |
0 |
a的高位地址 |
4 |
L20 |
– |
– |
0 |
1 |
5 |
– |
– |
切换 |
0 |
a的高位地址 |
6 |
– |
L20 |
– |
0 |
1 |
7 |
– |
– |
切换 |
0 |
1 |
8 |
L23 |
– |
– |
1 |
1 |
9 |
– |
– |
切换 |
1 |
1 |
10 |
– |
L23 |
– |
1 |
1 |
一般情况下,线程 T0 执行完毕后,再执行线程 T1,那么共享全局变量 a
的值为 2 。但在上面的执行过程中,
可以看到在线程执行指令的过程中会发生线程切换,这样在时刻 10 的时候,共享全局变量 a
的值为 1 ,
这不是我们预期的结果。出现这种情况的原因是两个线程在操作系统的调度下(在哪个时刻调度具有不确定性),
交错执行 a = a + 1
的不同汇编指令序列,导致虽然增加全局变量 a
的代码被执行了两次,
但结果还是只增加了 1 。这种多线程的最终执行结果不确定(indeterminate),取决于由于调度导致的、
不确定指令执行序列的情况就是竞态条件(race condition)。
如果每个线程在执行 a = a + 1
这个 C 语句所对应多条汇编语句过程中,不会被操作系统切换,
那么就不会出现多个线程交叉读写全局变量的情况,也就不会出现结果不确定的问题了。
所以,访问(特指写操作)共享变量代码片段,不能由多个线程同时执行(即并行)或者在一个时间段内都去执行 (即并发)。要做到这一点,需要互斥机制的保障。从某种角度上看,这种互斥性也是一种原子性, 即线程在临界区的执行过程中,不会出现只执行了一部分,就被打断并切换到其他线程执行的情况。即, 要么线程执行的这一系列操作/指令都完成,要么这一系列操作/指令都不做,不会出现指令序列执行中被打断的情况。
锁的基本思路¶
要保证多线程并发执行中的临界区的代码具有互斥性或原子性,我们可以建立一种锁, 只有拿到锁的线程才能在临界区中执行。这里的锁与现实生活中的锁的含义很类似。比如,我们可以写出如下的伪代码:
1lock(mutex); // 尝试取锁
2a = a + 1; // 临界区,访问临界资源 a
3unlock(mutex); // 是否锁
4... // 剩余区
对于一个应用程序而言,它的执行是受到其执行环境的管理和限制的,而执行环境的主要组成就是用户态的系统库、
操作系统和更底层的处理器,这说明我们需要有硬件和操作系统来对互斥进行支持。一个自然的想法是,这个
lock/unlock
互斥操作就是CPU提供的机器指令,那上面这一段程序就很容易在计算机上执行了。
但需要注意,这里互斥的对象是线程的临界区代码,而临界区代码可以访问各种共享变量(简称临界资源)。
只靠两条机器指令,难以识别各种共享变量,不太可能约束可能在临界区的各种指令执行共享变量操作的互斥性。
所以,我们还是需要有一些相对更灵活和复杂一点的方法,能够设置一种所有线程能看到的标记,
在一个能进入临界区的线程设置好这个标记后,其他线程都不能再进入临界区了。总体上看,
对临界区的访问过程分为四个部分:
尝试取锁: 查看锁是否可用,即临界区是否可访问(看占用临界区标志是否被设置),如果可以访问, 则设置占用临界区标志(锁不可用)并转到步骤 2 ,否则线程忙等或被阻塞;
临界区: 访问临界资源的系列操作
释放锁: 清除占用临界区标志(锁可用),如果有线程被阻塞,会唤醒阻塞线程;
剩余区: 与临界区不相关部分的代码
根据上面的步骤,可以看到锁机制有两种:让线程忙等的忙等锁(spin lock),以及让线程阻塞的睡眠锁 (sleep lock)。锁的实现大体上基于三类机制:用户态软件、机器指令硬件、内核态操作系统。 下面我们介绍来 rCore 中基于内核态操作系统级方法实现的支持互斥的锁。
我们还需要知道如何评价各种锁实现的效果。一般我们需要关注锁的三种属性:
互斥性(mutual exclusion),即锁是否能够有效阻止多个线程进入临界区,这是最基本的属性。
公平性(fairness),当锁可用时,每个竞争线程是否有公平的机会抢到锁。
性能(performance),即使用锁的时间开销。
内核态操作系统级方法实现锁 — mutex 系统调用¶
使用 mutex 系统调用¶
如何能够实现轻量的可睡眠锁?一个自然的想法就是,让等待锁的线程睡眠,让释放锁的线程显式地唤醒等待锁的线程。 如果有多个等待锁的线程,可以全部释放,让大家再次竞争锁;也可以只释放最早等待的那个线程。 这就需要更多的操作系统支持,特别是需要一个等待队列来保存等待锁的线程。
我们先看看多线程应用程序如何使用mutex系统调用的:
1// user/src/ch8b_mut_race.c
2...
3int mutex_id;
4int a;
5int threads[thread_count];
6void fun(long i)
7{
8 int t = i + 1;
9 for (int i = 0; i < per_thread; i++) {
10 assert_eq(mutex_lock(mutex_id), 0);
11 int old_a = a;
12 for (int i = 0; i < 500; i++) {
13 t = t * t % 10007;
14 }
15 a = old_a + 1;
16 assert_eq(mutex_unlock(mutex_id), 0);
17 }
18 exit(t);
19}
20
21int main()
22{
23 int64 start = get_mtime();
24 assert((mutex_id = mutex_blocking_create()) >= 0);
25 for (int i = 0; i < thread_count; i++) {
26 threads[i] = thread_create(fun, (void *)i);
27 assert(threads[i] > 0);
28 }
29 ...
30}
31
32// usr/lib/syscall.c
33int mutex_create()
34{
35 return syscall(SYS_mutex_create, 0);
36}
37
38int mutex_blocking_create()
39{
40 return syscall(SYS_mutex_create, 1);
41}
42
43int mutex_lock(int mid)
44{
45 return syscall(SYS_mutex_lock, mid);
46}
47
48int mutex_unlock(int mid)
49{
50 return syscall(SYS_mutex_unlock, mid);
51}
第24行,创建了一个ID为
mutex_id
的互斥锁,对应的是第35行SYS_mutex_create
系统调用;第10行,尝试获取锁(对应的是第45行
SYS_mutex_lock
系统调用),如果取得锁, 将继续向下执行临界区代码;如果没有取得锁,将阻塞;第16行,释放锁(对应的是第50行
SYS_mutex_unlock
系统调用),如果有等待在该锁上的线程, 则唤醒这些等待线程。
mutex 系统调用的实现¶
操作系统如何实现这些系统调用呢?首先考虑一下与此相关的核心数据结构, 然后考虑与数据结构相关的相关函数/方法的实现。
注解
互斥锁根据 lock 一个已被占用的锁时的行为也分阻塞互斥锁(blocking mutex)与自旋互斥锁(spinning mutex),
下文我们主要聚焦阻塞互斥锁,但是 ucore 里两者都通过结构体 struct mutex
与几个函数的不同分支实现。
在线程的眼里, 互斥 是一种每个线程能看到的资源,且在一个进程中,可以存在多个不同互斥资源,
所以我们可以把所有的互斥资源放在一起让进程来管理,如下面代码第 4 行所示。这里需要注意的是:
struct mutex mutex_pool[LOCK_POOL_SIZE]
表示的是实现了 mutex
的一个“互斥资源”的内存池。而
struct mutex
是会实现 mutex
的内核数据结构,它就是我们提到的 互斥资源 即
互斥锁 。操作系统需要显式地施加某种控制,来确定当一个线程释放锁时,等待的线程谁将能抢到锁。
为了做到这一点,操作系统需要有一个等待队列来保存等待锁的线程,如下面代码的第 10 行所示。
1struct proc {
2 ...
3 uint next_mutex_id;
4 struct mutex mutex_pool[LOCK_POOL_SIZE];
5};
6
7struct mutex {
8 uint blocking;
9 uint locked;
10 struct queue wait_queue;
11 // "alloc" data for wait queue
12 int _wait_queue_data[WAIT_QUEUE_MAX_LENGTH];
13};
这样,在操作系统中,需要设计实现几个核心成员变量。互斥锁的成员变量有四个:表示是阻塞锁还是自旋锁的 blocking
,
是否锁上的 locked
,和(阻塞锁会用到的)管理等待线程的等待队列 wait_queue
及其内存单元 _wait_queue_data
;
进程的成员变量:锁内存池 mutex_pool
及记录互斥锁分配情况的 next_mutex_id
。
首先需要创建一个互斥锁,下面是应对 SYSCALL_MUTEX_CREATE
系统调用的创建互斥锁的函数:
1// os/syscall.c
2int sys_mutex_create(int blocking)
3{
4 struct mutex *m = mutex_create(blocking);
5 if (m == NULL) {
6 return -1;
7 }
8 int mutex_id = m - curr_proc()->mutex_pool;
9 return mutex_id;
10}
11
12// os/sync.c
13struct mutex *mutex_create(int blocking)
14{
15 struct proc *p = curr_proc();
16 if (p->next_mutex_id >= LOCK_POOL_SIZE) {
17 return NULL;
18 }
19 struct mutex *m = &p->mutex_pool[p->next_mutex_id];
20 p->next_mutex_id++;
21 m->blocking = blocking;
22 m->locked = 0;
23 if (blocking) {
24 // blocking mutex need wait queue but spinning mutex not
25 init_queue(&m->wait_queue, WAIT_QUEUE_MAX_LENGTH,
26 m->_wait_queue_data);
27 }
28 return m;
29}
第16~19行,互斥锁池还没用完,就找下一个可行的锁,否则返回NULL。
有了互斥锁,接下来就是实现 Mutex
的内核函数:对应 SYSCALL_MUTEX_LOCK
系统调用的
sys_mutex_lock
。操作系统主要工作是,在锁已被其他线程获取的情况下,把当前线程放到等待队列中,
并调度一个新线程执行。主要代码如下:
1// os/syscall.c
2int sys_mutex_lock(int mutex_id)
3{
4 if (mutex_id < 0 || mutex_id >= curr_proc()->next_mutex_id) {
5 return -1;
6 }
7 mutex_lock(&curr_proc()->mutex_pool[mutex_id]);
8 return 0;
9}
10
11// os/sync.c
12void mutex_lock(struct mutex *m)
13{
14 if (!m->locked) {
15 m->locked = 1;
16 return;
17 }
18 if (!m->blocking) {
19 ...
20 }
21 // blocking mutex will wait in the queue
22 struct thread *t = curr_thread();
23 push_queue(&m->wait_queue, task_to_id(t));
24 // don't forget to change thread state to SLEEPING
25 t->state = SLEEPING;
26 sched();
27 // here lock is released (with locked = 1) and passed to me, so just do nothing
28}
第 7 行,以 ID 为
mutex_id
的互斥锁指针m
为参数调用mutex_lock
方法,具体工作由该方法来完成。第 18 行,分类讨论了自旋互斥锁的实现细节,这里我们聚焦阻塞互斥锁,省略了这一分支的细节;
第 14 行,如果互斥锁
m
已经被其他线程获取了,那么在第 23 行,将把当前线程放入等待队列中; 在第 25~26 行,让当前线程处于等待状态,并调度其他线程执行。第 15~16 行,如果互斥锁
m
还没被获取,那么当前线程会获取给互斥锁,并返回系统调用。
最后是实现 Mutex
的内核函数:对应 SYSCALL_MUTEX_UNLOCK
系统调用的 sys_mutex_unlock
。
操作系统的主要工作是,如果有等待在这个互斥锁上的线程,需要唤醒最早等待的线程。主要代码如下:
1// os/syscall.c
2int sys_mutex_unlock(int mutex_id)
3{
4 if (mutex_id < 0 || mutex_id >= curr_proc()->next_mutex_id) {
5 return -1;
6 }
7 mutex_unlock(&curr_proc()->mutex_pool[mutex_id]);
8 return 0;
9}
10
11// os/sync.c
12void mutex_unlock(struct mutex *m)
13{
14 if (m->blocking) {
15 struct thread *t = id_to_task(pop_queue(&m->wait_queue));
16 if (t == NULL) {
17 // Without waiting thread, just release the lock
18 m->locked = 0;
19 } else {
20 // Or we should give lock to next thread
21 t->state = RUNNABLE;
22 add_task(t);
23 }
24 } else {
25 m->locked = 0;
26 }
27}
第 7 行,以 ID 为
mutex_id
的互斥锁m
为参数调用mutex_unlock
方法,具体工作由该方法来完成的。第 18 行,如果等待队列为空,直接释放锁。
第 21-22 行,如果等待队列非空,则唤醒等待最久的线程
t
,注意这里不改变m->locked
,可以想想为什么(实验问答题之一)。