Semaphore

Dijkstra 大佬提出了一种解决不同线程之前同步和互斥问题的方法。这种方法就是 PV 操作,它基于 Semaphore(信号量)。

可以把信号量简单看成一个非负整数,只能使用两种操作来改变它的值,这两种操作就是 P 和 V,用伪代码描述如下(与教材《操作系统教程》上不同,这里的信号量不能为负值)。PV 操作的伪代码实现如下:

  • P(s)

    P(s) {
        if (p != 0) {
         s--;
        } else {
         append this thread to list;
         sleep();
        }
    }
    
  • V(s)

    V(s) {
        s++;
        if (list is no empty) {
            list.pop().wakeup();
        }
    }
    

P 和 V 的执行过程都是不可打断的,并且 P 和 V 要成对的出现。这样就保证了程序不可能进入信号量为负值的状态,可以利用这个特性实现进程之间的互斥和同步。

Binary semaphore

Binary semaphore(二元信号量),是信号量的一种,因为其初始化信号量为1,所以二元信号量只能取值 0 或 1。使用二元信号量可以实现互斥,所以也把它叫做互斥锁(mutex)。

Semaphore in C

在使用信号量实现互斥之前,先来介绍一下 POSIX 标准中的线程(pthread)。在 C 语言中可以使用 pthread 接口来编写多进程的程序。

下面是一个简单的 hello world(进程版),参考了CSAPP 中的代码,所以使用 CSAPP 对 pthread 的 wrapper 函数:

void *thread1(void *vargp);

int main(int argc, char const *argv[]) {
  //平时编写的C程序只有一个线程--主线程
  pthread_t tid1;
  //创建线程,同时开始运行
  Pthread_create(&tid1, NULL, thread1, NULL);
  //阻塞主线程,直到tid1线程结束
  Pthread_join(tid1, NULL);
  return 0;
}

void *thread1(void *vargp) {
  printf("Hello world\n");
  return NULL;
}

对 pthread 不再过多介绍,还不如去看man page

如果两个并发的进程,在访问同一个个共享变量的时候不加限制,可能会造成结果与预期不同。比如下面的计数器例子:

#include "csapp.h"

void *thread(void *arg);
volatile int cnt = 0; // shared counter

int main(int argc, char const *argv[]) {
  if (argc != 2) {
    printf("Usage: %s <n>\n", argv[0]);
    exit(1);
  }
  int n = atoi(argv[1]);
  pthread_t tid1, tid2;
  Pthread_create(&tid1, NULL, thread, &n);
  Pthread_create(&tid2, NULL, thread, &n);
  Pthread_join(tid1, NULL);
  Pthread_join(tid2, NULL);

  if (cnt != (2 * n))
    printf("BOOM! cnt=%d\n", cnt);
  else
    printf("OK cnt=%d\n", cnt);

  return 0;
}

void *thread(void *arg) {
  int n = *(int *)arg;
  for (int i = 0; i < n; i++) {
    cnt++;
  }
  return NULL;
}

cnt变量被两个进程同时访问,在访问它的过程中随时可能被打断,即使cnt++看起来是不会被打断的操作,但是当它被编译成汇编语言时:

movl cnt, %eax
incl %eax
movl %eax, cnt

这样看来,机器在执行的时候可能会有问题。具体一点,一开始 cnt 为 0 ,一个进程如果在第二条指令被另一个进程打断,%eax 已经加一,而 cnt 的值还未保存。另一个进程完成操作后, cnt 为 1。接着原先的进程继续运行, cnt 也为 1。所以结果应该会比两倍的 n 要小。

为了解决这个问题,就可以使用二元信号量,对 临界区(critical section)加锁,来保证同时只有一个进程访问它。在上面的例子里,临界区就是那三条汇编指令,即 cnt++

sem_t mutex;

int main() {
    //...
	// init mutex with 1
    sem_init(&mutex, 0, 1);
    //...
}

void *thread(void *arg) {
  int n = *(int *)arg;
  for (int i = 0; i < n; i++) {
    P(&mutex); // sem_wait(sem_t *s)
    cnt++;
    V(&mutex); // sem_post(sem_t *s)
  }
  return NULL;
}

其中的P()V()两个函数分别是对 Linux 下信号量函数的简单封装

void P(sem_t *sem) {
  if (sem_wait(sem) < 0)
    unix_error("P error");
}

void V(sem_t *sem) {
  if (sem_post(sem) < 0)
    unix_error("V error");
}

接下来讨论几个老师上课讲到的和没讲到的:cry:几个进程同步问题,分别为生产者-消费者问题读者-写者问题睡眠理发师问题吸烟者问题和尚吃水问题吃水果问题。顺便可以写写代码(

Producer-Consumer

一般信号量,是一个非负整数,可以用来表示可用资源的数量。对生产者消费者问题,对于多个消费者,多个生产者和缓存区大小大于 1 的情况:

线程 关系
C-C 互斥
P-C 同步
P-P 互斥

生产者和消费者共享一个大小为n的缓存区,生产者不断生产数据,把他们放入缓存区;消费者不断地从缓存区取走数据,然后使用它们。

两个生产者之前对缓存区的插入是互斥的,它们使用一个共享变量来表明当前要插入的位置。消费者也类似。所以必须保证生产者和消费者分别对缓存区的访问互斥。

同时,我们还需要对调度对缓存区的访问,如果缓存区是空的,那么消费者不能从中取走数据,直到生产者生产了一个数据;如果缓存区的满的,那么生产者不能再往里面插入数据,直到消费者取走了一个数据。

好了,说了那么多,总结起来就是:

  • 生产者和消费者分别对缓存区的访问互斥
  • 对缓存区中可用资源的计数
typedef struct {
  int *buf; // buffer array
  int n; // size of solts, the buffer size is n - 1, because index 0 will not be
         // filled.
  int front;     // buf[(front+1)%n] is the first item
  int rear;      // buf[rear%n] is the last item
  sem_t mutex_p; // protect access to buf for prodecter
  sem_t mutex_c;
  sem_t slots; // counts available solts
  sem_t items; // counts available items;
} sbuf_t;

我们使用两个互斥锁来保证缓存区的访问是互斥,同时使用信号量:slots 和 items 来对缓存区中的空位和数据进行计数。

sbuf_t *sp;

sem_t mutex_c, mutex_p;
int p_cnt = 0, c_cnt = 0;

void *producter_thread(void *arg) {
  for (int i = 0; i < 10; i++) {
    printf("produce: %d\n", i);
    sbuf_insert(sp, i);
    P(&mutex_p);
    p_cnt++;
    V(&mutex_p);
  }
  return NULL;
}

void *consumer_thread(void *arg) {
  for (int i = 0; i < 10; i++) {
    printf("consume: %d\n", sbuf_remove(sp));
    P(&mutex_c);
    c_cnt++;
    V(&mutex_c);
  }
  return NULL;
}

int main(int argc, char const *argv[]) {
  sp = Calloc(1, sizeof(sbuf_t));
  sbuf_init(sp, 10);
  Sem_init(&mutex_p, 0, 1);
  Sem_init(&mutex_c, 0, 1);

  pthread_t tids[10];

  for (int i = 0; i < 10; i++) {
    if (i % 2 == 0)
      Pthread_create(&tids[i], NULL, producter_thread, NULL);
    else
      Pthread_create(&tids[i], NULL, consumer_thread, NULL);
    Pthread_join(tids[i], NULL);
  }
  sbuf_deinit(sp);
  printf("producters run: %d times\n", p_cnt);
  printf("consumers run: %d times\n", c_cnt);
  return 0;
}

// Create an empty, bounded, shared FIFO buffer with n slots
void sbuf_init(sbuf_t *sp, int n) {
  sp->buf = Calloc(n, sizeof(int));
  sp->n = n;                    /* Buffer holds max of n items */
  sp->front = sp->rear = 0;     /* Empty buffer iff front == rear */
  Sem_init(&sp->mutex_p, 0, 1); /* Binary semaphore for locking */
  Sem_init(&sp->mutex_c, 0, 1); /* Binary semaphore for locking */
  Sem_init(&sp->slots, 0, n);   /* Initially, buf has n empty slots */
  Sem_init(&sp->items, 0, 0);   /* Initially, buf has zero data items */
}

void sbuf_deinit(sbuf_t *sp) {
  Free(sp->buf);
  Free(sp);
}

// add to rear
void sbuf_insert(sbuf_t *sp, int item) {
  P(&sp->slots);  		// Request solts
  P(&sp->mutex_p); 		// Lock buf for producter
  sp->buf[(++sp->rear) % sp->n] = item;
  V(&sp->mutex_p);  	// unlock
  V(&sp->items); 		// Make a item
}

// remove from head
int sbuf_remove(sbuf_t *sp) {
  int item;
  P(&sp->items);
  P(&sp->mutex_c);
  item = sp->buf[(++sp->front) % sp->n];
  V(&sp->mutex_c);
  V(&sp->slots);
  return item;
}

从 main 函数 看起,初始化信号量和缓存区,接着我们创建 10 个进程:5 个消费者,5 个生产者。

在生产者进程中,我们模拟生产10条数据便结束进程。消费者进程相同地取走10条数据也就结束了。同时引入 c_cnt, p_cnt来验证我们的结果是否正确,对他们的访问也需要加锁。

sbuf_insert 函数等待一个可用的空位,加互斥锁,插入数据,然后宣布一个数据可用,来唤醒可能在等待的消费者进程;sbuf_remove函数等待可用的数据,加互斥锁,取走数据,然后宣布一个空位可用。

如果要进一步提高它们的并发性,当插入数据和取出数据很耗时时,把它们放在互斥锁中是不合理的。比如一个消费者正在取走数据,这需要花费很久时间,而其他消费者也只能等待,即使还有可用的空位。可以使用一个局部变量保存数据的下标,把读写操作放到互斥锁之后。

// add to rear
void sbuf_insert(sbuf_t *sp, int item) {
  int tmp;
  P(&sp->slots);
  P(&sp->mutex_p);
  tmp = (++sp->rear) % sp->n;
  V(&sp->mutex_p);
  sp->buf[tmp] = item; // assume this operation consume too much time,
                       // so put it out of the lock of the buffer, make
                       // producters more concurrent.
  V(&sp->items);
}

// remove from head
int sbuf_remove(sbuf_t *sp) {
  int item;
  int tmp;
  P(&sp->items);
  P(&sp->mutex_c);
  tmp = (++sp->front) % sp->n;
  V(&sp->mutex_c);
  item = sp->buf[tmp]; // same as productor
  V(&sp->slots);
  return item;
}

Reader-Writer

读者-写者问题的描述如下:

有两组并发进程:读者和写者,它们共同读写共享文件F,要求:

  1. 允许多个读者同时读文件
  2. 只允许一个写者同时写文件
  3. 写者在完成操作之前,不允许其他读者或写者操作
  4. 写者在执行操作之前,其他读者或写者必须全部退出

根据上面的几个要求,可以得知读者和写者之前的同步互斥关系:

进程 关系
R-R 同步:
已经有读者在读:直接进行读操作;
无读者在读:等待写操作完成
R-W 互斥
W-W 互斥

读者和写者之前,需要判断是否已经有读者在读,那么仅仅使用信号量不能解决这问题。所以,我们定义reader_cnt来统计当前的读者进程数,mutex为对read_cnt这个共享变量的互斥锁,而w为读者数量的信号量,因为最大只允许一个写者进行写操作,所以也相当于一个二元信号量。

#define THREAD_SIZE 10

char file[20] = "Init file"; // simulate file I/O
int reader_cnt;
sem_t mutex; // mutux for reader_cnt
sem_t w;     // mutex for writer

main 函数,初始化信号量mutexw为 1 ,并且创建读者和写者进程,使得读者进程个数大于写着进程:

int main(int argc, char const *argv[]) {
  pthread_t tid_list[THREAD_SIZE];
  Sem_init(&mutex, 0, 1);
  Sem_init(&w, 0, 1);

  for (int i = 0; i < THREAD_SIZE; i++) {
    if (i % 2 == 0 || i % 3 == 0) // more reader
      Pthread_create(&tid_list[i], NULL, reader_thread, NULL);
    else
      Pthread_create(&tid_list[i], NULL, writer_thread, NULL);
    Pthread_join(tid_list[i], NULL); // WAIT
  }

  puts("Finally file:");
  puts(file);
  return 0;
}

读者进程,每个读者进程模拟进行5次读操作:

void *reader_thread(void *arg) {
  int cnt = 0;
  while (cnt++ < 5) {
    // printf("%d\n", cnt);
    P(&mutex);
    reader_cnt++;
    if (reader_cnt == 1) // first reader
      P(&w);
    V(&mutex);

    // read file...
    printf("Read file: %s\n", file);

    P(&mutex);
    reader_cnt--;
    if (reader_cnt == 0) // last reader
      V(&w);
    V(&mutex);
  }
  return NULL;
}

写者进程,每个写者同样模拟5次写操作:

void *writer_thread(void *arg) {
  int cnt = 0;
  while (cnt++ < 5) {
    P(&w);
    // critical section
    printf("Writing file\n");
    strcpy(file, "Hello world");
    V(&w);
  }
  return NULL;
}

运行结果:

sem-rw

Sleepy barber

睡眠理发师问题:理发店里有一位理发师,一把理发椅和多把等候的椅子;如果没有顾客,理发师在理发椅上睡觉;当有顾客来时,他唤醒理发师,进行理发,此时如果有其他顾客进来,如果有空椅子,则坐下等待,否则直接离开。

定义4个全局变量分别为:barber_ready表示等待顾客的理发师数,因为理发师只有一个,也可以看做是二元信号量,用户阻塞顾客进程;cust_ready表示等待理发的顾客数,用于阻塞理发师进程,初始为0;mutex是用于保护共享变量wating的互斥锁;而free_seats则是常量,表示等待理发的椅子数量。

sem_t barber_ready, cust_ready;
sem_t mutex;
int free_seats, wating = 0;

理发师进程:

void *barber(void *arg) {
  static int cnt = 0;
  while (1) {
    P(&cust_ready);  // sleetp to wait customer
    P(&mutex);
    wating--;
    V(&barber_ready);  // ready to cut
    V(&mutex);
    cnt++;
    if (cnt >= (int)arg) {
      wating = free_seats + 1;
      printf("close shop\n");
      return NULL;  // barber close shop after cut <arg> customers
    }
    // cuthair
    printf("I'm cutting your hair\n");
  }
  return NULL;
}

顾客进程:

void *customer(void *arg) {
  P(&mutex);
  if (wating < free_seats) {
    wating++;
    V(&cust_ready);  // notif barber
    V(&mutex);
    P(&barber_ready);  // wait for barber
    printf("cut customer %d's hair\n", (int)arg);
  } else {
    V(&mutex);
    printf("I'm leaving without cut\n");
    return NULL;
  }
  return NULL;
}

main 函数:

int main(int argc, char const *argv[]) {
  // scanf("%d", &free_seats);
  free_seats = 5;
  Sem_init(&barber_ready, 0, 0);
  Sem_init(&cust_ready, 0, 0);
  Sem_init(&mutex, 0, 1);
  pthread_t b_tid, c_tids[free_seats * 2];
  Pthread_create(&b_tid, NULL, barber, free_seats + 2);
  for (int i = 0; i < free_seats * 2; i++) {
    Pthread_create(&c_tids[i], NULL, customer, (void *)i);
  }
  Pthread_join(b_tid, NULL);
  for (int i = 0; i < free_seats * 2; i++) {
    Pthread_join(c_tids[i], NULL);
  }

  return 0;
}

Cigarette smokers

吸烟者问题:三个抽烟者和一个供应者。每个抽烟者需要有三种材料:烟草、纸和火柴,才可吸烟。三个抽烟者分别拥有三种材料中的一种。 供应者每次随机将两种材料放到桌子上,拥有剩下那种材料的抽烟者卷一根烟并抽掉它。材料被使用后,供应者按上述原则继续提供材料。

  • 供应者与三个抽烟者分别是同步关系。由于供应者无法同时满足两个或 以上的抽烟者,三个抽烟者对抽烟这个动作互斥。

  • 设置四个进程,供应者作为生产者向三个抽烟者提供材料。

  • 信号量设置:信号量offer1offer2offer3分别表示烟草和火柴的三种两两组合。信号量finish用于互斥进行抽烟动作。

按着上面的思路,敲得下面的代码。但是运行起来还有点问题。。。

#include "csapp.h"

sem_t offer1;  // offer paper and matchs
sem_t offer2;  // offer tobacco and matchs
sem_t offer3;  // offer tobacco and paper
sem_t finish;

int times = 10;  // offer 10 times somke

void *agent_thread(void *arg) {
  while (times-- > 0) {
    int kase = rand() % 3;
    printf("time%d: %d\n", times, kase);
    if (kase == 0) {
      V(&offer1);
    } else if (kase == 1) {
      V(&offer2);
    } else {
      V(&offer3);
    }
    P(&finish);  // wait somker finish
  }
  return NULL;
}

// i hava tobacco
void *somker1_thread(void *arg) {
  while (times > 0) {
    P(&offer1);
    // smoke
    puts("somker1: that feeling...");
    V(&finish);
  }
  return NULL;
}
// i have paper
void *somker2_thread(void *arg) {
  while (times > 0) {
    P(&offer2);
    // smoke
    puts("somker2: that feeling...");
    V(&finish);
  }
  return NULL;
}
// i have matchs
void *somker3_thread(void *arg) {
  while (times > 0) {
    P(&offer3);
    // smoke
    puts("somker3: that feeling...");
    V(&finish);
  }
  return NULL;
}

int main(int argc, char const *argv[]) {
  srand(time(NULL));

  Sem_init(&offer1, 0, 0);
  Sem_init(&offer2, 0, 0);
  Sem_init(&offer3, 0, 0);
  Sem_init(&finish, 0, 0);
  pthread_t somker1, somker2, somker3;
  pthread_t agent;
  Pthread_create(&somker1, NULL, somker1_thread, NULL);
  Pthread_create(&somker2, NULL, somker1_thread, NULL);
  Pthread_create(&somker3, NULL, somker1_thread, NULL);
  Pthread_create(&agent, NULL, agent_thread, NULL);
  Pthread_join(somker3, NULL);
  Pthread_join(somker2, NULL);
  Pthread_join(somker1, NULL);
  Pthread_join(agent, NULL);
  puts("all threads created\n");
  return 0;
}

和尚吃水问题

某寺庙,有小、老和尚若干,有一水缸,有小和尚提水入缸供老和尚饮用。水缸可容 10 桶水,水取自同一井中。水井径窄,每次只能容一个桶取水。水桶总数为 3 个。每次入、取缸水仅为 1 桶,且不可同时进行。

  • 每次入、取缸水仅为 1 桶,所以小和尚和老和尚有互斥关系
  • 水井每次只能容一个桶取水,所以小和尚之前也有互斥关系
  • 信号量:mutex1用于小和尚之前互斥的从水井取水;mutex2用于互斥的从水缸倒水或取水;empty表示水缸还可放入的水;full表示水缸中已有的水;pail则是表示水桶的数量的信号量。
sem_t mutex1, mutex2;
sem_t empty;  // 水缸中的空位
sem_t full;   //水缸中已经放入的水
sem_t pail;   //水桶

int water = 0;  //水缸中的水量

小和尚取水:

void *get_water(void *arg) {
  while (1) {
    P(&empty);
    P(&pail);
    P(&mutex1);
    // 从井中取水
    printf("little monk get water from wall\n");
    V(&mutex1);

    P(&mutex2);
    // 倒入水缸
    printf("little monk put water to vat, water: %d\n", ++water);
    V(&mutex2);
    V(&full);
    V(&pail);

    Sleep(1);  // slow down...
  }
}

老和尚喝水:

void *use_water(void *arg) {
  while (1) {
    P(&full);
    P(&pail);
    P(&mutex2);
    // 从水缸取水
    printf("old monk get water from vat, water: %d\n", --water);
    V(&mutex2);
    V(&empty);
    // 喝水
    printf("old monk drink water\n");
    V(&pail);

    Sleep(1);
  }
}

main 函数:

int main(int argc, char const *argv[]) {
  Sem_init(&mutex1, 0, 1);
  Sem_init(&mutex2, 0, 1);
  Sem_init(&empty, 0, 10);  //水缸中最多放10桶水
  Sem_init(&full, 0, 0);
  Sem_init(&pail, 0, 3);  //三个木桶

  pthread_t little_id, old_id;
  Pthread_create(&little_id, NULL, get_water, NULL);
  Pthread_create(&old_id, NULL, use_water, NULL);
  Pthread_join(little_id, NULL);
  Pthread_join(old_id, NULL);
  return 0;
}

运行结果:

sem-monk

吃水果问题

桌上有一个可以容纳3个水果的盘子,每次只能放或取一个水果,爸爸负责放苹果,妈妈负责放桔子,儿子只吃苹果,女儿只吃橘子。

  • 因为每次只能放或取一个水果,所以四人之前都存在互斥关系。
  • 爸爸放苹果,儿子吃苹果,两者是同步关系
  • 妈妈放橘子,女儿吃橘子,两者也是同步关系
  • 很显然需要四个进程,表示四个人
  • 信号量:empty表示盘子中还可放的水果数量,organge表示盘子中橘子的数量,apple表示盘子中苹果的数量,mutex是二元信号量,用来互斥的放或取水果。
sem_t empty, orange, apple;
sem_t mutex;

// 盘子中的苹果数和橘子数
int apple_num = 0, orange_num = 0;

四个进程分别如下:

void father(void *arg) {
  while (1) {
    P(&empty);
    P(&mutex);
    // put an apple
    apple_num++;
    printf("Put an apple: %d apples, %d oranges\n", apple_num, orange_num);
    V(&mutex);
    V(&apple);
  }
}

void mother(void *arg) {
  while (1) {
    P(&empty);
    P(&mutex);
    // put an orange
    orange_num++;
    printf("Put an orange: %d apples, %d oranges\n", apple_num, orange_num);
    V(&mutex);
    V(&orange);
  }
}

void son(void *arg) {
  while (1) {
    P(&apple);
    P(&mutex);
    // eat apple
    apple_num--;
    printf("Eat an apple: %d apples, %d oranges\n", apple_num, orange_num);
    V(&mutex);
    V(&empty);
  }
}

void daugther(void *arg) {
  P(&orange);
  P(&mutex);
  // eat an orange
  orange_num--;
  printf("Eat an orange: %d apples, %d oranges\n", apple_num, orange_num);
  V(&mutex);
  V(&empty);
}

main 函数:

int main(int argc, char const *argv[]) {
  Sem_init(&empty, 0, 3);  // 盘子最多放三个水果
  Sem_init(&orange, 0, 0);
  Sem_init(&apple, 0, 0);
  Sem_init(&mutex, 0, 1);
  pthread_t father_id, mother_id, son_id, daugther_id;
  Pthread_create(&father_id, NULL, father, NULL);
  Pthread_create(&mother_id, NULL, mother, NULL);
  Pthread_create(&father_id, NULL, son, NULL);
  Pthread_create(&daugther_id, NULL, daugther, NULL);

  Pthread_join(father_id, NULL);
  Pthread_join(mother_id, NULL);
  Pthread_join(son_id, NULL);
  Pthread_join(daugther_id, NULL);
  return 0;
}

运行结果:

sem-fruit

参考