OSTEP作业——第31章 信号量

发布时间 2023-11-04 23:33:28作者: Nachel

声明:作者为os初学者,水平有限,仅做作业记录,有的地方可能用词有问题欢迎指正。之间的章节有空的话再补上。

每章的作业:https://github.com/remzi-arpacidusseau/ostep-homework


1. fork-join.c

The first problem is just to implement and test a solution to the fork/join problem, as described in the text. Even though this solution is described in the text, the act of typing it in on your own is worthwhile; even Bach would rewrite Vivaldi, allowing one soon-to-be master to learn from an existing one. See fork-join.c for details. Add the call sleep(1) to the child to ensure it is working.

 初始的代码并没有使用任何条件变量或信号量,创建子线程后,子线程还没有运行完,父线程可能就已经结束运行。在这段代码里,还需要通过添加信号量来实现join的功能。这里信号量初始化为0做条件变量使用,在创建子线程之前就要初始化值为0,然后创建线程后父线程调用Sem_wait(&s)。

sem_wait的内部原理可知,会将信号量的值减1,然后如果变成负数就会挂起等待直到被sem_post唤醒,否则就立即返回。

如果子线程没有结束,父线程就不会被唤醒。执行完子线程的所有任务即将返回之前,调用Sem_post(&s)将父线程唤醒,最终实现join的目的。

为了测试父线程是因为信号量而等待,而不是因为恰好子线程比父线程返回,在子线程内添加sleep(1)。

 1 #include <stdio.h>
 2 #include <unistd.h>
 3 #include <pthread.h>
 4 #include "common_threads.h"
 5 
 6 sem_t s; 
 7 
 8 void *child(void *arg) {
 9     printf("child\n");
10     // use semaphore here 
11     sleep(1); // add sleep(1)
12     Sem_post(&s);    // post
13     return NULL;
14 }
15 
16 int main(int argc, char *argv[]) {
17     pthread_t p;
18     printf("parent: begin\n");
19     // init semaphore here
20     Sem_init(&s, 0);    // inint
21     Pthread_create(&p, NULL, child, NULL);
22     // use semaphore here
23     // if child not finish: wait;
24     // else: continue
25     Sem_wait(&s);    // wait
26     printf("parent: end\n");
27     return 0;
28 }

 

测试结果:

可以看到,父线程输出了“parent: begin”,然后创建子线程,等待约1s后,子线程输出“child”,最后输出“parent: end”。

2. rendezvous problem (rendezvous.c)

Let's now generalize this a bit by investigating the rendezvous problem. The problem is as follows: you have two threads, each of which are about to enter the rendezvous point in the code. Neither should exit this part of the code before the other enters it. Consider using two semaphores for this task, and see rendezvous.c for details.

两个线程都将进入代码的对接点,在另一方进入之前不得退出这部分代码。题目要求在两个线程都必须要双方都输出了“before”之后才可以输出“after”。

所以在每个线程输出“before”之后、输出“after”之前,需要获取一个来自另一个线程的信号,这样才能知道对方是否也完成了“before”的输出。如果收到,则可以输出“after”,如果没有则挂起等待。所以使用Sem_wait(&sx);

同样的,另一个线程也在等待当前线程发出的信号,所以在输出“before”之后要调用一个Sem_post(&sy)来告知另一个线程。

需要注意对某个线程来说,post和wait时不能使用同一个信号量,不然就是自己给自己发出信号。其次,两个线程不能都先wait再post,否则就会出现两个线程都相互等待,永远不会被唤醒的情况。为测试,在某个线程输出“before”前加一个sleep(1),看另一个线程是否“等待”。

 1 #include <stdio.h>
 2 #include <unistd.h>
 3 #include "common_threads.h"
 4 
 5 // If done correctly, each child should print their "before" message
 6 // before either prints their "after" message. Test by adding sleep(1)
 7 // calls in various locations.
 8 
 9 sem_t s1, s2;
10 
11 void *child_1(void *arg) {
12     sleep(1);    // test
13     printf("child 1: before\n");
14     // what goes here?
15     Sem_post(&s1);
16     Sem_wait(&s2);
17     printf("child 1: after\n");
18     return NULL;
19 }
20 
21 void *child_2(void *arg) {
22     printf("child 2: before\n");
23     // what goes here?
24     Sem_post(&s2);
25     Sem_wait(&s1);
26     printf("child 2: after\n");
28     return NULL;
29 }
30 
31 int main(int argc, char *argv[]) {
32     pthread_t p1, p2;
33     printf("parent: begin\n");
34     // init semaphores here
35     Sem_init(&s1, 0);    // init
36     Sem_init(&s2, 0);    // init
37     Pthread_create(&p1, NULL, child_1, NULL);
38     Pthread_create(&p2, NULL, child_2, NULL);
39     Pthread_join(p1, NULL);
40     Pthread_join(p2, NULL);
41     printf("parent: end\n");
42     return 0;
43 }

测试结果:

可以看到,先依次输出“parent: begin”和“child 2: before”后,等待了约1s,子线程1输出了“child 1: before”之后,子线程1和2才输出了“after”。

3. barrier synchronization (barrier.c)

 Now go one step further by implementing a general solution to barrier synchronization. Assume there are two points in a sequential piece of code, called P1 and P2. Putting a barrier between P1 and P2 guarantees that all threads will execute P1 before any one thread executes P2. Your task: write the code to implement a barrier() function that can be used in this manner. It is safe to assume you know N (the total number of threads in the running program) and that all N threads will try to enter the barrier. Again, you should likely use two semaphores to achieve the solution, and some other integers to count things. See barrier.c for details.

和第2题的要求一样,必须等所有线程都将before输出了之后才可以输出after,哪怕只有一个线程的before还没有输出,其他线程都必须等待。

由于线程的数量可以自己指定,所以在控制过程中,需要记录下来有多少个线程输出了“before”,且这个值必须全部线程共享,所以在barrier_t结构体中定义一个整型n,初始化为子线程的数量。同时定义一个整型count用来在barrier函数中计数,表示当前有多少个线程完成了“before”的输出,每完成一次,count加1。

接下来考虑如何计数,每当一个线程完成“before”输出,则进入barrier函数发出一个信号,唤醒用来承担计数任务的线程来把count++,然后继续挂起等待下一个线程完成“before”输出后发出信号。所以这个用来计数的线程需要一直循环,直到count==n才结束计数任务,跳出循环。为了达到上述目的,需要在barrier_t结构体中定义一个sem_t cv实现条件变量的功能,通过它来进行信号发送和唤醒,初始化值为0。

接下来考虑这样一个问题:承担计数任务的线程怎么选择。它一定是某个子线程,如果同时有多个线程进入计数循环,那么由于这个循环中存在对count的读和写,可能存在race condition问题。且对于所有的线程,sem_post(&cv)总共只有num_threads个,可能存在多个线程进入wait状态但永远无法被唤醒的情况。所以这个while循环的计数过程是个临界区,需要在前后加锁。所以barrier结构体内还需要一个信号量实现锁的功能,定义sem_t s1,初始化值为1。

 1 #include <assert.h>
 2 #include <stdio.h>
 3 #include <stdlib.h>
 4 #include <unistd.h>
 5 
 6 #include "common_threads.h"
 7 
 8 // If done correctly, each child should print their "before" message
 9 // before either prints their "after" message. Test by adding sleep(1)
10 // calls in various locations.
11 
12 // You likely need two semaphores to do this correctly, and some
13 // other integers to track things.
14 
15 typedef struct __barrier_t {
16     // add semaphores and other information here
17     sem_t s1;    // lock
18     sem_t cv;    // cv
19     int n;    // num_threads
20     int count;  // num_before
21 } barrier_t;
22 
23 
24 // the single barrier we are using for this program
25 barrier_t b;
26 
27 void barrier_init(barrier_t *b, int num_threads) {
28     // initialization code goes here
29     Sem_init(&(b->s1), 1);
30     Sem_init(&(b->cv), 0);
31     b->n = num_threads;
32     b->count = 0;
33     return;
34 }
35 
36 void barrier(barrier_t *b) {
37     // barrier code goes here
38     sem_post(&b->cv); // signal
39     sem_wait(&b->s1); // lock
40     while(b->count < b->n){
41       sem_wait(&b->cv);
42       b->count++;
43     }
44     sem_post(&b->s1); // unlock
45 }
46 
47 //
48 // XXX: don't change below here (just run it!)
49 //
50 typedef struct __tinfo_t {
51     int thread_id;
52 } tinfo_t;
53 
54 void *child(void *arg) {
55     tinfo_t *t = (tinfo_t *) arg;
56     //
57     sleep((t->thread_id)%3);
58     //
59     printf("child %d: before\n", t->thread_id);
60     barrier(&b);
61     printf("child %d: after\n", t->thread_id);
62     return NULL;
63 }
64 
65 
66 // run with a single argument indicating the number of 
67 // threads you wish to create (1 or more)
68 int main(int argc, char *argv[]) {
69     assert(argc == 2);
70     int num_threads = atoi(argv[1]);
71     assert(num_threads > 0);
72 
73     pthread_t p[num_threads];
74     tinfo_t t[num_threads];
75 
76     printf("parent: begin\n");
77     barrier_init(&b, num_threads);
78     
79     int i;
80     for (i = 0; i < num_threads; i++) {
81     t[i].thread_id = i;
82     Pthread_create(&p[i], NULL, child, &t[i]);
83     }
84 
85     for (i = 0; i < num_threads; i++) 
86     Pthread_join(p[i], NULL);
87 
88     printf("parent: end\n");
89     return 0;
90 }

 输出before之前睡眠0-2s来测试是否等所有线程(特别是睡眠3s的线程)输出before之后才输出after。

测试结果:

  结果先依次输出“parent: begin”、“child 3: before”、“child 6: before”、“child 0: before”后,等待了约1s,输出“child 1: before”、“child 4: before”、“child 7: before”之后又等了约1s输出剩下的所有内容。线程3/6/0/1/3/7输出before后都进行了等待而不是马上输出after。等所有线程的before都输出了之后才看到有after输出。

4. reader-writer problem

 Now let's solve the reader-writer problem, also as described in the text. In this first take, dont worry about starvation. See the code in reader-writer.c for details. Add sleep() calls to your code to demonstrate it works as you expect. Can you show the existence of the starvation problem?

多个读者和多个写者,都对value进行操作(读或写),其中,所有读者可以同时读value,但同一时间只有一个写者可以写value,且读者的“读”和写者的“写”不能同时发生。

总的来说,多个读者的“读”,和每个写者的“写”是互斥的。因此,每位写者写value都必须获取wlock这个锁,每个读者要读value也需要获得wlock这个锁,但这个锁可以看作所有读者共享,只要有一个读者获取了这个lock,其他读者都视为已持有这个锁。

对于写者,在写之前获取Sem_wait(&wlock),写完释放Sem_post(&wlock)。对于读者,如果它是第一个试图“读”,则先获取锁Sem_wait(&wlock),如果此时还有其他读者,那么说明读者全体已经持有了锁,因此它不必重新试图获取。当读者结束“读”后,如果别的读者还没结束,那这个锁依旧保持,如果它是最后结束的,也就是读者数为0,那么读者就释放该锁Sem_post(&wlock)。

由前面的分析可知还需要一个整型的数count来记录当前的读者数量,每次一个读者试图读value,之前先把这个整数加1,当结束“读”之后把这个数减1。

由于多个读者对count有读有写,对count的操作需要互斥,所以在读者写count和读count两侧另外加一个锁。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "common_threads.h"

//
// Your code goes in the structure and functions below
//

typedef struct __rwlock_t {
  sem_t lock;
  sem_t wlock;
  int count;
} rwlock_t;


void rwlock_init(rwlock_t *rw) {
  Sem_init(&rw->lock, 1);
  Sem_init(&rw->wlock, 1);
  rw->count = 0;
}

void rwlock_acquire_readlock(rwlock_t *rw) {
  Sem_wait(&rw->lock);
  rw->count++;
  if(rw->count == 1){
    Sem_wait(&rw->wlock);
  }
  Sem_post(&rw->lock);
}

void rwlock_release_readlock(rwlock_t *rw) {
  Sem_wait(&rw->lock);
  rw->count--;
  if(rw->count == 0){
    Sem_post(&rw->wlock);
  }
  Sem_post(&rw->lock);
}

void rwlock_acquire_writelock(rwlock_t *rw) {
  Sem_wait(&rw->wlock);
}

void rwlock_release_writelock(rwlock_t *rw) {
  Sem_post(&rw->wlock);
}

//
// Don't change the code below (just use it!)
// 

int loops;
int value = 0;

rwlock_t lock;

void *reader(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
    rwlock_acquire_readlock(&lock);
    printf("read %d\n", value);
    rwlock_release_readlock(&lock);
    }
    return NULL;
}

void *writer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
    rwlock_acquire_writelock(&lock);
    value++;
    printf("write %d\n", value);
    rwlock_release_writelock(&lock);
    }
    return NULL;
}

int main(int argc, char *argv[]) {
    assert(argc == 4);
    int num_readers = atoi(argv[1]);
    int num_writers = atoi(argv[2]);
    loops = atoi(argv[3]);

    pthread_t pr[num_readers], pw[num_writers];

    rwlock_init(&lock);

    printf("begin\n");

    int i;
    for (i = 0; i < num_readers; i++)
    Pthread_create(&pr[i], NULL, reader, NULL);
    for (i = 0; i < num_writers; i++)
    Pthread_create(&pw[i], NULL, writer, NULL);

    for (i = 0; i < num_readers; i++)
    Pthread_join(pr[i], NULL);
    for (i = 0; i < num_writers; i++)
    Pthread_join(pw[i], NULL);

    printf("end: value %d\n", value);

    return 0;
}

测试结果

 

 

 

存在一个问题是,如果和写者相比,读者非常多(例如很多客户端访问一个网页)。一但由一个读者获取到了wlock锁,那么写者就必须等待,而其他读者可以顺利进行读,这时由于读者很多,读者始终持有锁,写者必须等所有读者都结束。导致写者可能迟迟无法获取到锁。

./reader writer 100 2 4)输出结果如下:输出了400个read 0后才开始write。

begin

read 0  read 0 ... ... read 0  write1  write2  write3  write4  write5  write6  write7  write8  end value:8

5. Reader-writer problem about starvation

 Lets look at the reader-writer problem again, but this time, worry about starvation. How can you ensure that all readers and writers eventually make progress? See reader-writer-nostarve.c for details.

reader-writer-nostarve.c

数据结构及各个函数和reader-writer.c基本一样,但是为了解决starvation的问题,需要做出如下修改:

starvation问题源于后来源源不断的读者比写者优先占用了wlock。这是后来的读者和写者的竞争,由于“读者共享锁”所以,对于写者来说是不公平的。因此再添加一个waiting_lock锁,这个锁会被所有的读者和写者来使用。在读者获取读者锁和写者获取写者锁的开始和结束两侧加上Sem_wait(&waiting_lock)和Sem_post(&waiting_lock),一但写者得到这个锁,虽然仍有读者在使用wlock,但写者持有了waiting_lock后,不会再有新的读者加入(它们都在等着waiting_这个锁),写者只需等待当前正在执行的读者全部结束就能使用wlock了。

  1 #include <stdio.h>
  2 #include <stdlib.h>
  3 #include <unistd.h>
  4 #include "common_threads.h"
  5 
  6 //
  7 // Your code goes in the structure and functions below
  8 //
  9 
 10 typedef struct __rwlock_t {
 11   sem_t lock;
 12   sem_t wlock;
 13   sem_t waiting_lock;    // new lock
 14   int count;
 15 } rwlock_t;
 16 
 17 
 18 void rwlock_init(rwlock_t *rw) {
 19   Sem_init(&rw->lock, 1);
 20   Sem_init(&rw->wlock, 1);
 21   Sem_init(&rw->waiting_lock, 1);    // 
 22   rw->count = 0;
 23 }
 24 
 25 void rwlock_acquire_readlock(rwlock_t *rw) {
 26   sleep(1);
 27   Sem_wait(&rw->waiting_lock);    // acquire waiting_lock 
 28   Sem_wait(&rw->lock);
 29 
 30   rw->count++;
 31   if(rw->count == 1){
 32     Sem_wait(&rw->wlock);
 33   }
 34   Sem_post(&rw->lock);
 35   Sem_post(&rw->waiting_lock);    // release
 36 
 37 }
 38 
 39 void rwlock_release_readlock(rwlock_t *rw) {
 40   Sem_wait(&rw->lock);
 41   rw->count--;
 42   if(rw->count == 0){
 43     Sem_post(&rw->wlock);
 44   }
 45   Sem_post(&rw->lock);
 46 }
 47 
 48 void rwlock_acquire_writelock(rwlock_t *rw) {
 49   sleep(1);
 50   Sem_wait(&rw->waiting_lock);    // acquire
 51   Sem_wait(&rw->wlock);
 52   Sem_post(&rw->waiting_lock);    // release
 53 }
 54 
 55 void rwlock_release_writelock(rwlock_t *rw) {
 56   Sem_post(&rw->wlock);
 57 }
 58 
 59 //
 60 // Don't change the code below (just use it!)
 61 // 
 62 
 63 int loops;
 64 int value = 0;
 65 
 66 rwlock_t lock;
 67 
 68 void *reader(void *arg) {
 69     int i;
 70     for (i = 0; i < loops; i++) {
 71     rwlock_acquire_readlock(&lock);
 72     printf("read %d\n", value);
 73     rwlock_release_readlock(&lock);
 74     }
 75     return NULL;
 76 }
 77 
 78 void *writer(void *arg) {
 79     int i;
 80     for (i = 0; i < loops; i++) {
 81     rwlock_acquire_writelock(&lock);
 82     value++;
 83     printf("write %d\n", value);
 84     rwlock_release_writelock(&lock);
 85     }
 86     return NULL;
 87 }
 88 
 89 int main(int argc, char *argv[]) {
 90     assert(argc == 4);
 91     int num_readers = atoi(argv[1]);
 92     int num_writers = atoi(argv[2]);
 93     loops = atoi(argv[3]);
 94 
 95     pthread_t pr[num_readers], pw[num_writers];
 96 
 97     rwlock_init(&lock);
 98 
 99     printf("begin\n");
100 
101     int i;
102     for (i = 0; i < num_readers; i++)
103     Pthread_create(&pr[i], NULL, reader, NULL);
104     for (i = 0; i < num_writers; i++)
105     Pthread_create(&pw[i], NULL, writer, NULL);
106 
107     for (i = 0; i < num_readers; i++)
108     Pthread_join(pr[i], NULL);
109     for (i = 0; i < num_writers; i++)
110     Pthread_join(pw[i], NULL);
111 
112     printf("end: value %d\n", value);
113 
114     return 0;
115 }

测试结果:

 结果read和write相对分布均匀,说明reader和writer获取wlock的机会相对均衡。

6. no-starve mutex

 Use semaphores to build a no-starve mutex, in which any thread that tries to acquire the mutex will eventually obtain it. See the code in mutex-nostarve.c for more information.

要求让所有子线程都能获得锁,那么使用一个全局变量count,每当某个线程获得了锁ns_mutex_t就count++,最后判断count和loops*num_threads是否相等就行,期望的结果是相等。

先补全main函数和worker函数,main函数传入了一个参数表示创建子线程数num_threads,另外一个参数表示每个线程里期望获取锁的次数loops,如果没有则默认为1。

 【图片】【图片】【图片】

再考虑nostarve_mutex的结构体定义、初始化、获取和释放的具体实现。由于并不能产生starvation的情况,则需要保证所有的线程里只要进入Sem_wait就总会在将来的某个时刻被唤醒并成功获取到锁。

no_mutex_t里定义一个整型ticket,用count来担任“turn”的角色。每个线程在试图获取锁的时候,获取一个my_t=ticket,然后更新ticket(加1)。这样每个线程每次获取锁都会有一个不同的ticket(my_t),

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <pthread.h>
 5 #include "common_threads.h"
 6 
 7 //
 8 // Here, you have to write (almost) ALL the code. Oh no!
 9 // How can you show that a thread does not starve
10 // when attempting to acquire this mutex you build?
11 //
12 
13 typedef struct __ns_mutex_t{
14     sem_t s;
15     int ticket;
16 } ns_mutex_t;
17 
18 ns_mutex_t lock;
19 int count = 0;
20 int loops;
21 
22 void ns_mutex_init(ns_mutex_t *m) {
23     Sem_init(&m->s, 1);
24     m->ticket = 0;
25 }
26 
27 void ns_mutex_acquire(ns_mutex_t *m) {
28     int my_t = (m->ticket)++;    
29     do{
30       Sem_wait(&m->s);
31       if(my_t == count) break;
32       else{
33         printf("Not my turn!\n");
34         Sem_post(&m->s);
35       }
36     } while(my_t != count);
37 }
38 
39 void ns_mutex_release(ns_mutex_t *m) { 
40     Sem_post(&m->s);
41 }
42 
43 
44 void *worker(void *arg) {
45     for(int i = 0; i < loops; i++){
46       ns_mutex_acquire(&lock);
47       count++;
48       ns_mutex_release(&lock);
49     }
50     
51     return NULL;
52 }
53 
54 int main(int argc, char *argv[]) {
55     printf("parent: begin\n");
56     assert(argc == 3 || argc == 2);
57     int num_threads = atoi(argv[1]);
58     loops = (argc == 2) ? 1 : atoi(argv[2]);
59     pthread_t p[num_threads];
60     ns_mutex_init(&lock);
61 
62     int i;
63     for (i = 0; i < num_threads; i++)
64     Pthread_create(&p[i], NULL, worker, NULL);
65 
66     for (i = 0; i < num_threads; i++)
67     Pthread_join(p[i], NULL);
68     if(count == loops*num_threads){
69         printf("All threads acquire successfully!\n");
70         printf("count: %d\n", count);
71       }
72     else{
73         printf("Maybe some threads acquire failed.\n");
74         printf("count: %d\n", count);
75     }
76             
77     printf("parent: end\n");
78     return 0;
79 }

测试结果:

 


 1 #ifndef __common_threads_h__
 2 #define __common_threads_h__
 3 
 4 #include <pthread.h>
 5 #include <assert.h>
 6 #include <sched.h>
 7 
 8 #ifdef __linux__
 9 #include <semaphore.h>
10 #endif
11 
12 #define Pthread_create(thread, attr, start_routine, arg) assert(pthread_create(thread, attr, start_routine, arg) == 0);
13 #define Pthread_join(thread, value_ptr)                  assert(pthread_join(thread, value_ptr) == 0);
14 
15 #define Pthread_mutex_init(m, v)                         assert(pthread_mutex_init(m, v) == 0);
16 #define Pthread_mutex_lock(m)                            assert(pthread_mutex_lock(m) == 0);
17 #define Pthread_mutex_unlock(m)                          assert(pthread_mutex_unlock(m) == 0);
18 
19 #define Pthread_cond_init(cond, v)                       assert(pthread_cond_init(cond, v) == 0);
20 #define Pthread_cond_signal(cond)                        assert(pthread_cond_signal(cond) == 0);
21 #define Pthread_cond_wait(cond, mutex)                   assert(pthread_cond_wait(cond, mutex) == 0);
22 
23 #define Mutex_init(m)                                    assert(pthread_mutex_init(m, NULL) == 0);
24 #define Mutex_lock(m)                                    assert(pthread_mutex_lock(m) == 0);
25 #define Mutex_unlock(m)                                  assert(pthread_mutex_unlock(m) == 0);
26 
27 #define Cond_init(cond)                                  assert(pthread_cond_init(cond, NULL) == 0);
28 #define Cond_signal(cond)                                assert(pthread_cond_signal(cond) == 0);
29 #define Cond_wait(cond, mutex)                           assert(pthread_cond_wait(cond, mutex) == 0);
30 
31 #ifdef __linux__
32 #define Sem_init(sem, value)                             assert(sem_init(sem, 0, value) == 0);
33 #define Sem_wait(sem)                                    assert(sem_wait(sem) == 0);
34 #define Sem_post(sem)                                    assert(sem_post(sem) == 0);
35 #endif // __linux__
36 
37 #endif // __common_thre
common_threads.h