第三次OS笔记。
操作系统是一个状态机
程序运行时,每个处理器独立执行、独立响应中断;中断到来时,保存寄存器现场(的指针),切换到下一个task执行。
操作系统上的自旋锁
1 | while (1) { |
BUG1
如果把lock/unlock注释掉:原子性被破坏(执行printf
的过程随时可能被打断,从而出现“错位”现象);
把最后一行造成延迟的语句去掉,safety和liveness都无问题,但是公平性出现了问题(一段时间几乎总是同一个线程占有锁)
其原因是printf
指令执行时间相对较长,中断很有可能出现在其中,此时
locked == 1(执行printf
的线程持有锁)
切换到另一个线程自旋
由于locked == 1,处理器的其他线程都在自旋
最后还是回到了自己。。。
修复:获得自旋锁的线程(处理器)不应被中断;关闭中断后,处理器上的执行流完全确定
中断的原子性/顺序
关中断后,操作系统上的程序独占处理器、不会被打断
asm volatile 汇编保证编译器不会调换指令顺序
中断的可见性
硬件保证中断必定发生在指令边界
中断前的数据全部写入(缓存/内存)
关中断+自旋
1 | void spin_lock(spinlock_t &lk) { |
注意要先关中断再自旋;先释放锁再开中断!
BUG2
1 | void foo() { |
在[3]处开中断,导致在lock_a仍处于持有锁的状态下,中断打开。
所以在unlock时,要查看对应lock时的IF状态,如果lock时IF已经为0,对应的unlock就不要开中断;如果lock时IF为1,就要开中断。
在线程第一次lock时保存中断状态(把EFlags push到堆栈中)、关闭中断;最后一次unlock时恢复保存的中断状态。中间多次lock会多次push EFlags(其中的中断位为0),对应的unlock会弹出相应的栈帧,直到最后一次unlock才会把第一次push的EFlags pop到CPU上。
flags栈可以保存在线程结构体里,也可以保存在CPU结构体里,因为关中断后两者绑定,但由于CPU的数量一般少于线程数量,所以更好的选择是保存在CPU结构体里。
BUG3
1 | // It's not a bug, it's a feature! |
重入,连续对同一个对象lock两次,直接形成死锁。(第一次lock后locked == 1,第二次执行时就相当于另外一个线程得到了锁,所以会不停地自旋)。
小结
关中断+自旋用来保护一段较短的临界区:持有锁期间,线程(处理器)不能被中断;其他线程(处理器)在关中断的前提下自旋——“一核享用,九核围观”。
xv6自旋锁实现
acquire()
,psuhcli()
,popcli()
…随处可见的调试功能(getcallerpcs()
打印函数调用栈!!!!)
1 | void pushcli(void) { |
互斥锁
对于长临界区,获得锁的处理器中断丢失、使其他处理器空转/中断丢失……
例如读取磁盘信息,临界区执行时间可能很长、中断根本无法到达处理器(磁盘由中断驱动)。
希望
持有锁的进程
允许处理器响应中断
允许切换到其他线程执行
等待锁的线程
不要占用处理器资源自旋
1 | void mutex_lock(spinlock_t *lk) { |
如果等待锁的线程很多,不如干脆直接暂停这个线程执行,等有人解锁后再调度执行。
1 | void mutex_lock(mutexlock_t *lk) { |
可以看作游泳馆模型!操作系统相当于“管理员”,线程是游泳者,先到者与管理员交谈(自旋锁)获得手环,从而进入游泳馆;后到者同样与管理员交谈(自旋锁),没有手环,排队等待;先到者出来后归还手环。。。。。。
数据结构
一个较好的双向链表删除节点代码(优秀的assert机制)
1 | static inline void list_check(node_t *node) { |
List Head:嵌入式链表
嵌入在其他对象(数据结构)中的双向循环链表
1 | struct list_head { |
对象只存在于单一的列表中,用list <struct task*> task_list
较好,但操作系统中,通常一个对象会属于多个列表,分开实现多个列表会浪费空间
进程既在proc目录下的任务列表中(遍历proc获得所有进程),在获得锁时又在等待队列中。
1 | struct task { |
上面的task
里面嵌入了两个list,解决了一个对象在多个列表里的问题。
优点:
“跟着对象走”减少分配内存的次数和大小
局部性友好:访问对象里的链表→大概率访问对象里的数据
从list_head指针找到对象首地址:
1 |
|
上面的(&((type *)0)->member) )
先把0强制类型转化成对象类型指针,这样最后得到的就是member成员在对象结构体里的偏移量。
并发数据结构
自旋锁(互斥锁)帮助我们串行化对数据结构的并发访问。
但是容易犯错:重入、不同的锁
1 | void you_did_it_2() { lock(&a); lock(&b); ... }//T1 |
若T1 lock(&a)→T2 lock(&b),之后T1、T2再向下执行就会出现“互相等待”的情况。
1 | void you_did_it_4() { lock(&a); ... if (...) return; ... unlock(&a); } |
如果执行到if分支,导致死锁。
C RAII
这里不是很懂。。。。。。。
1 |
|
malloc/free
malloc本质:分配一段指定大小的额外内存。
古代方法:void *sbrk(intptr_t increment);
堆区的末尾就是break(相当于内存的边界),sbrk
做的就是改变break的指针。
现代方法:
1 | void *mmap(void *addr, // 建议的分配位置,可为 NULL |
我们需要维护一个被分配区间的集合
malloc:从已有的[L,R)切出一段[l,r)
free:给定l,删除[l,r)
怎么从左端点找到右端点(不知道大小):实际malloc(size+head),多分配的头部里面有大小等信息。
什么样的数据结构管理区间集合
现代malloc/free算法(Fast Path and Slow Path)
一些工具
strace:追踪系统调用;
ltrace:追踪库函数调用
mtrace:追踪调用的内存分配函数(这是jyy基于ltrace编写的一个shell脚本,这里先放一个Linux中编写Shell脚本,以后学)
Fast Path:小内存
key insight:对象越小,分配越频繁,越可能是性能瓶颈
策略:线程本地分配;使用简单的数据结构
Slab分配器(segregated list):对每个常见分配大小(8B,16B, 24B……)设置per-thread分配缓存。根据所需分配内存的大小决定属于哪个Slab(e.g. 20B的内存分配24B的Slab),管理一个Slab的对象可使用链表,或bitmap(未分配0,分配1)。
Slow Path:大内存
Slab分配失败(per-thread满了),此时执行一次大内存分配(从全局获得一个新的Slab,per-thread的Slab用链表连接)。
用线段树管理
任何一个区间都可以用若干个节点表示。实际情况下可以用radix tree。
同步
同步:两个或两个以上的量在变化过程中保持一定的相对关系(不同步即为异步)。
线程同步:线程在某个时间点达到一致状态(等待汇合点)。
并发程序中的同步:在未来某个约定的时刻,两个线程的执行点互相可知;通常先到的线程要等待。
一段错误的同步代码
1 | int done; |
然而执行时死循环(由于并发程序的不确定性,有时也会返回):while()
语句对done
的读取并未保护,数据竞争。gdb layout asm:done→eax,cmp eax, nworkers。如果done传给eax时的值不是nworkers,cmp时就会不断读取线程本地的eax寄存器。
解决方案:while
读取数据时上锁、volatile int done
进制编译器优化(这样while每次都会重新从共享内存中读取done到eax)。使用volatile关键字并没有消除数据竞争,所以不推荐。
生产者-消费者问题
1 | void consumer_thread() {//dequeue:取出对象 |
精简表达:
1 | void type1_thread() { |
需要适当的线程同步,使得括号序列
一定是某个合法括号序列的前缀
嵌套深度不超过n(对应缓冲区大小)
条件变量(万能同步算法)
某个条件满足,线程执行,否则等待(嵌套深度k小于n可打印左括号;k>0打印右括号。否则等待)。于是用一个变量表示满足/不满足。
wait(cv): 等待cv上的事件发生
signal/notify(cv): 报告cv上的事发生;如果有线程等待cv,则唤醒其中一个线程
broadcast/notifyAll(cv): 报告cv上的事发生;唤醒全部正在等待cv的线程
和互斥锁联合使用:wait时释放互斥锁,其他线程获得锁;signal后恢复执行,首先试图获得锁(如果获得失败,则等待该线程释放锁)。
如下代码有问题:
1 | void producer_thread() { |
可以使用两个信号量(empty作为count!=n
的信号量,fill则作为count!=0
)
1 | mutex_lock(); |
It’s better to modify ‘if’ to ‘while’! Why?
首先考虑顺序程序,有
1 | while(cond){...}; |
对于并发程序
1 | mutex_lock(&big_lock); |
如果cond不成立,则wait并释放锁。当signal到来时,并不会像if语句一样直接退出,而是仍会进入while判断一次(防止被错误signal唤醒),条件成立时,相当于执行了if(cond) break;
。尽管如此,assert()
不会被触发的前提是对cond的访问由同一把互斥锁保护——保证了条件访问的原子性!
信号量
更衣室管理:线程可以“变”一个手环。手环看作令牌;得到令牌可以进入执行;可以随时创建令牌。
“手环”=“令牌”=“一个资源”=“信号量”。
P操作:(想进入同步)等待一个手环后返回;如果管理员有手环,立即返回
V操作:“变”出一个手环给管理员
信号量=互斥锁和条件变量的结合体:
互斥锁:
相当于仅有一个手环(线程不能凭空创造手环);P=lock,V=unlock
条件变量:
手环数量作为等待条件;P=wait(等待空余的手环),V=signal(创造一个手环,没有线程等待时会给管理员,所以不会信号丢失)
1 | void producer() { |
但是设计一个优美的信号量往往是困难的!!!
哲学家吃饭问题:很容易出现锁成环的问题。
使用条件变量:
1 |
|
或者使用一个服务员统一管理叉子(从而避免了并发)
1 | void philosopher_thread(int id) { |