[OS]课堂整理(2)

第三次OS笔记。

操作系统是一个状态机

程序运行时,每个处理器独立执行、独立响应中断;中断到来时,保存寄存器现场(的指针),切换到下一个task执行。

操作系统上的自旋锁

1
2
3
4
5
6
while (1) {
lock(&biglock);
printf("Thread-%s on CPU #%d acquired the lock\n", arg, _cpu());
unlock(&biglock);
for (int volatile i = 0; i < 100000; i++) ;
}

BUG1

如果把lock/unlock注释掉:原子性被破坏(执行printf的过程随时可能被打断,从而出现“错位”现象);
把最后一行造成延迟的语句去掉,safety和liveness都无问题,但是公平性出现了问题(一段时间几乎总是同一个线程占有锁)

其原因是printf指令执行时间相对较长,中断很有可能出现在其中,此时
locked == 1(执行printf的线程持有锁)
切换到另一个线程自旋
由于locked == 1,处理器的其他线程都在自旋
最后还是回到了自己。。。

修复:获得自旋锁的线程(处理器)不应被中断;关闭中断后,处理器上的执行流完全确定

中断的原子性/顺序
关中断后,操作系统上的程序独占处理器、不会被打断
asm volatile 汇编保证编译器不会调换指令顺序
中断的可见性
硬件保证中断必定发生在指令边界
中断前的数据全部写入(缓存/内存)

关中断+自旋

1
2
3
4
5
6
7
8
9
void spin_lock(spinlock_t &lk) {
cli();//_intr_write(0)
while (atomic_xchg(&lk->locked, 1)) ;
}

void spin_unlock(spinlock_t &lk) {
atomic_xchg(&lk->locked, 0);
sti();//_intr_write(1)
}

注意要先关中断再自旋;先释放锁再开中断!

BUG2

1
2
3
4
5
6
7
8
9
10
void foo() {
//IF=1
[1]spin_lock(&lock_a);
//IF=0
[2]spin_lock(&lock_b);
//IF=0
[3]spin_unlock(&lock_b);
//IF=1
[4]spin_unlock(&lock_a);
}

在[3]处开中断,导致在lock_a仍处于持有锁的状态下,中断打开。
所以在unlock时,要查看对应lock时的IF状态,如果lock时IF已经为0,对应的unlock就不要开中断;如果lock时IF为1,就要开中断。

在线程第一次lock时保存中断状态(把EFlags push到堆栈中)、关闭中断;最后一次unlock时恢复保存的中断状态。中间多次lock会多次push EFlags(其中的中断位为0),对应的unlock会弹出相应的栈帧,直到最后一次unlock才会把第一次push的EFlags pop到CPU上。

flags栈可以保存在线程结构体里,也可以保存在CPU结构体里,因为关中断后两者绑定,但由于CPU的数量一般少于线程数量,所以更好的选择是保存在CPU结构体里。

BUG3

1
2
3
4
5
6
// It's not a bug, it's a feature!
lock(&lk);
lock(&lk);
printf(...);
unlock(&lk);
unlock(&lk);

重入,连续对同一个对象lock两次,直接形成死锁。(第一次lock后locked == 1,第二次执行时就相当于另外一个线程得到了锁,所以会不停地自旋)。

小结

关中断+自旋用来保护一段较短的临界区:持有锁期间,线程(处理器)不能被中断;其他线程(处理器)在关中断的前提下自旋——“一核享用,九核围观”。

xv6自旋锁实现

acquire(),psuhcli(),popcli()…随处可见的调试功能(getcallerpcs()打印函数调用栈!!!!)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void pushcli(void) {
int eflags;

eflags = readeflags();
cli();
if(mycpu()->ncli == 0)//第一次push,则保存eflags
mycpu()->intena = eflags & FL_IF;
my_cpu()->ncli += 1;
}

void popcli(void) {
if(readeflags()&FL_IF)
panic("popcli - interruptible");
if(--mycpu()->ncli < 0)//unlock的数量与lock不匹配
panic("popcli");
if(mycpu()->ncli == 0 && mycpu()->intena)//所有的锁都被unlock后才开中断(修复BUG2)
sti();
}

互斥锁

对于长临界区,获得锁的处理器中断丢失、使其他处理器空转/中断丢失……
例如读取磁盘信息,临界区执行时间可能很长、中断根本无法到达处理器(磁盘由中断驱动)。

希望
持有锁的进程
允许处理器响应中断
允许切换到其他线程执行
等待锁的线程
不要占用处理器资源自旋

1
2
3
4
5
6
7
8
void mutex_lock(spinlock_t *lk) {
while (atomic_xchg(&lk->locked, 1)) {
// 在 atomic_xchg 执行的时候,已经有人得到了锁
// 因为是长临界区,所以预期要等很久
let_another_thread_to_execute(); // 不如让其他线程执行
// _yield() on AbstractMachine;
}
}

如果等待锁的线程很多,不如干脆直接暂停这个线程执行,等有人解锁后再调度执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void mutex_lock(mutexlock_t *lk) {
int acquired = 0;
spin_lock(&lk->spinlock);//用自旋锁保护
if (lk->locked != 0) {//锁已经被占有了
enqueue(lk->wait_list, current);//current放入等待队列
current->status = BLOCKED;//标记current为BLOCKED:在等待某个锁
} else {//否则占有锁
lk->locked = 1;
acquired = 1;
}
spin_unlock(&lk->spinlock);
if (!acquired) yield(); // 主动切换到其他线程执行
}

void mutex_unlock(mutexlock_t *lk) {
spin_lock(&lk->spinlock);
if (!empty(lk->wait_list)) {
struct task_t *task = dequeue(lk->wait_list);
task->status = RUNNABLE; // 唤醒之前睡眠的线程,RUNNABLE:可被调度执行
} else {
lk->locked = 0;
}
spin_unlock(&lk->spinlock);
}

可以看作游泳馆模型!操作系统相当于“管理员”,线程是游泳者,先到者与管理员交谈(自旋锁)获得手环,从而进入游泳馆;后到者同样与管理员交谈(自旋锁),没有手环,排队等待;先到者出来后归还手环。。。。。。

数据结构

一个较好的双向链表删除节点代码(优秀的assert机制)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static inline void list_check(node_t *node) {
assert(node == node->prev->next); // circular list's offer
assert(node == node->next->prev);
}

void list_remove(node_t *node) {
node_t *prev = node->prev;
node_t *next = node->next;
assert(node != prev // defensive
&& node != next); // more checks than necessary
// may find bugs in other parts of the system
prev->next = next;
next->prev = prev; // possibly prev == next
list_check(prev); list_check(next);
}

List Head:嵌入式链表

嵌入在其他对象(数据结构)中的双向循环链表

1
2
3
struct list_head {
struct list_head *next, *prev;
};

对象只存在于单一的列表中,用list <struct task*> task_list较好,但操作系统中,通常一个对象会属于多个列表,分开实现多个列表会浪费空间
进程既在proc目录下的任务列表中(遍历proc获得所有进程),在获得锁时又在等待队列中。

1
2
3
4
struct task {
struct list_head wait_queue; // for mutex
struct list_head task_list; // for procfs
};

上面的task里面嵌入了两个list,解决了一个对象在多个列表里的问题。

优点:
“跟着对象走”减少分配内存的次数和大小
局部性友好:访问对象里的链表→大概率访问对象里的数据

从list_head指针找到对象首地址:

1
2
3
4
5
6
7
#define list_entry(ptr, type, member) \
((type *) \
( (char *)(ptr) - (uintptr_t)(&((type *)0)->member) ) \
)

// ptr to struct list_head
struct task *task = list_entry(ptr, struct task, wait_queue);

上面的(&((type *)0)->member) )先把0强制类型转化成对象类型指针,这样最后得到的就是member成员在对象结构体里的偏移量。

并发数据结构

自旋锁(互斥锁)帮助我们串行化对数据结构的并发访问。
但是容易犯错:重入、不同的锁

1
2
void you_did_it_2() { lock(&a); lock(&b); ... }//T1
void you_did_it_3() { lock(&b); lock(&a); ... }//T2

若T1 lock(&a)→T2 lock(&b),之后T1、T2再向下执行就会出现“互相等待”的情况。

1
void you_did_it_4() { lock(&a); ... if (...) return; ... unlock(&a); }

如果执行到if分支,导致死锁。

C RAII

这里不是很懂。。。。。。。

1
2
3
4
5
6
7
8
9
10
11
#define HoldLock(lk) \
for (int __lock_var = (pthread_mutex_lock(lk), 0); \
__lock_var < 1; \
pthread_mutex_unlock(lk), __lock_var++)

void insert(node_t *node, node_t *cur) {
HoldLock(&list_lock) {
...
// 此处不能调用break/return等跳出此块
}
}

malloc/free

malloc本质:分配一段指定大小的额外内存。
古代方法:void *sbrk(intptr_t increment);堆区的末尾就是break(相当于内存的边界),sbrk做的就是改变break的指针。
现代方法:

1
2
3
4
5
6
void *mmap(void *addr,    // 建议的分配位置,可为 NULL
size_t length, // 大小
int port, // 访问权限 PROT_READ | PROT_WRITE
int flags, // MAP_ANONYMOUS,匿名映射 = 内存分配
int fd, // 不涉及任何文件描述符;-1
off_t offset); // 0

我们需要维护一个被分配区间的集合
malloc:从已有的[L,R)切出一段[l,r)
free:给定l,删除[l,r)
怎么从左端点找到右端点(不知道大小):实际malloc(size+head),多分配的头部里面有大小等信息。
什么样的数据结构管理区间集合

现代malloc/free算法(Fast Path and Slow Path)

一些工具
strace:追踪系统调用;
ltrace:追踪库函数调用
mtrace:追踪调用的内存分配函数(这是jyy基于ltrace编写的一个shell脚本,这里先放一个Linux中编写Shell脚本,以后学)

Fast Path:小内存
key insight:对象越小,分配越频繁,越可能是性能瓶颈
策略:线程本地分配;使用简单的数据结构
Slab分配器(segregated list):对每个常见分配大小(8B,16B, 24B……)设置per-thread分配缓存。根据所需分配内存的大小决定属于哪个Slab(e.g. 20B的内存分配24B的Slab),管理一个Slab的对象可使用链表,或bitmap(未分配0,分配1)。

Slow Path:大内存
Slab分配失败(per-thread满了),此时执行一次大内存分配(从全局获得一个新的Slab,per-thread的Slab用链表连接)。
用线段树管理

任何一个区间都可以用若干个节点表示。实际情况下可以用radix tree。

同步

同步:两个或两个以上的量在变化过程中保持一定的相对关系(不同步即为异步)。
线程同步:线程在某个时间点达到一致状态(等待汇合点)。

并发程序中的同步:在未来某个约定的时刻,两个线程的执行点互相可知;通常先到的线程要等待。
一段错误的同步代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int done;

void workload() {
// workload
asm volatile("lock addq $1, %0" :
"=m"(done)); // sync(): 顺序、原子、可见
}

int main() {
for (int i = 0; i < nworkers; i++)
create(workload);
while (done != nworkers); // sync(): 等到所有 workers 都完成
printf("All done\n");
}

然而执行时死循环(由于并发程序的不确定性,有时也会返回):while()语句对done的读取并未保护,数据竞争。gdb layout asm:done→eax,cmp eax, nworkers。如果done传给eax时的值不是nworkers,cmp时就会不断读取线程本地的eax寄存器。
解决方案:while读取数据时上锁、volatile int done进制编译器优化(这样while每次都会重新从共享内存中读取done到eax)。使用volatile关键字并没有消除数据竞争,所以不推荐。

生产者-消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
void consumer_thread() {//dequeue:取出对象  
while (1) {
object_t *obj = dequeue(); // spin:队列可能没有元素
if (obj) consume(obj);
}
}
void producer_thread() {//enqueue:生产对象放入队列
while (1) {
object_t *obj = produce();
while (enqueue(obj) != SUCC); // spin: 队列可能空间不足
}
}

精简表达:

1
2
3
4
5
6
void type1_thread() {
while (1) printf("("); // enqueue
}
void type2_thread() {
while (1) printf(")"); // dequeue
}

需要适当的线程同步,使得括号序列
一定是某个合法括号序列的前缀
嵌套深度不超过n(对应缓冲区大小)

条件变量(万能同步算法)

某个条件满足,线程执行,否则等待(嵌套深度k小于n可打印左括号;k>0打印右括号。否则等待)。于是用一个变量表示满足/不满足。
wait(cv): 等待cv上的事件发生
signal/notify(cv): 报告cv上的事发生;如果有线程等待cv,则唤醒其中一个线程
broadcast/notifyAll(cv): 报告cv上的事发生;唤醒全部正在等待cv的线程
和互斥锁联合使用:wait时释放互斥锁,其他线程获得锁;signal后恢复执行,首先试图获得锁(如果获得失败,则等待该线程释放锁)。

如下代码有问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void producer_thread() {
while (1) {
// produce
mutex_lock(&mutex);
if (count == n) wait(&cv, &mutex); // 等待“有空闲”
printf("("); // push
count++; signal(&cv);
mutex_unlock(&mutex);
}
}

void consumer_thread() {
while (1) {
mutex_lock(&mutex);
if (count == 0) wait(&cv, &mutex); // 等待“有数据”
printf(")"); // pop
count--; signal(&cv);
mutex_unlock(&mutex);
// consume
}
}

可以使用两个信号量(empty作为count!=n的信号量,fill则作为count!=0

1
2
3
4
5
6
7
8
9
10
11
12
13
mutex_lock();
if(count==n)//while(count==n)
wait(empty);
printf('('); count++;
signal(fill);
mutex_unlock();

mutex_lock();
if(count==0)//while(count==0)
wait(fill);
printf(')'); count--;
signal(empty);
mutex_unlock();

It’s better to modify ‘if’ to ‘while’! Why?

首先考虑顺序程序,有

1
2
while(cond){...};
assert(!cond);

对于并发程序

1
2
3
4
5
6
7
mutex_lock(&big_lock);
while (!(cond)) {
wait(&cv, &big_lock);
}
assert(cond); // 一定为真
// 其他需要原子性的操作
mutex_unlock(&big_lock);

如果cond不成立,则wait并释放锁。当signal到来时,并不会像if语句一样直接退出,而是仍会进入while判断一次(防止被错误signal唤醒),条件成立时,相当于执行了if(cond) break;。尽管如此,assert()不会被触发的前提是对cond的访问由同一把互斥锁保护——保证了条件访问的原子性

信号量

更衣室管理:线程可以“变”一个手环。手环看作令牌;得到令牌可以进入执行;可以随时创建令牌。
“手环”=“令牌”=“一个资源”=“信号量”。
P操作:(想进入同步)等待一个手环后返回;如果管理员有手环,立即返回
V操作:“变”出一个手环给管理员

信号量=互斥锁和条件变量的结合体
互斥锁:
相当于仅有一个手环(线程不能凭空创造手环);P=lock,V=unlock
条件变量:
手环数量作为等待条件;P=wait(等待空余的手环),V=signal(创造一个手环,没有线程等待时会给管理员,所以不会信号丢失)

1
2
3
4
5
6
7
8
9
10
11
void producer() {
P(&empty);//empty表示还可以打印(的个数,初始化为N
printf("(");
V(&fill);//fill表示已经打印(的个数,初始化为0
}

void consumer() {
P(&fill);
printf(")");
V(&empty);
}

但是设计一个优美的信号量往往是困难的!!!

哲学家吃饭问题:很容易出现锁成环的问题。
使用条件变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#define cond (empty[lhs] && empty[rhs])//左右两边叉子都为空

void philosopher_thread(int id) {
int lhs = (id - 1 + n) % n, rhs = (id + 1) % n;
mutex_lock(&mutex);
while(!cond) {
wait(&cv, &mutex);
}
assert(cond);
empty[lhs] = empty[rhs] = 0;
mutex_unlock(&mutex);

_philosopher_eat();

mutex_lock(&mutex);
empty[lhs] = empty[rhs] = 1;
broadcast(&cv);//告诉所有人:叉子放回去啦
mutex_unlock(&mutex);
}

或者使用一个服务员统一管理叉子(从而避免了并发)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void philosopher_thread(int id) {
send_request(id, EAT);
P(allow(id));//直到manager允许了才吃

__philosopher_eat();

send_request(id, DONE);
}

void waiter_thread() {
while(1) {
(id, status) = receive_request();
if(status == EAT) {...};
if(status == DONE) {...};
}
}
0%