CSAPP 并发编程读书笔记

CSAPP 并发编程笔记

并发和并行

构造并发程序的方法

现代操作系统提供了 3 种基本的构造并发程序的方法:

12.1 基于进程的并发编程

示例代码

void sigchld_handler(int sig)
{
    while(waitpid(-1, 0, WNOHANG) > 0) 
        ;
}

int main()
{
    signal(SIGCHLD, sigchld_handler); // 回收僵死子进程资源
    listenfd = open_listenfd();
    while (1) {
      connfd = accept(...);
      if (fork() == 0) { // 子进程
        close(listenfd); // 关闭父进程 fd,不关闭问题不大,子进程结束时自动关闭
        process(connfd);
        close(connfd);
        exit(0);
      }
      close(connfd); // 父进程关闭 fd。重要!否则永远不会释放 connfd 连接描述符,导致内存泄漏!
    }
}

父、子进程中的已连接描述符都指向同一个文件表表项,所以父进程关闭 connfd 至关重要。否则,永远不会释放已连接描述符 4 connfd 的文件表条目,引起内存泄漏。因为 socket 文件表表项中的引用计数,直到父子进程 connfd 都关闭了,到客户端的连接才会终止。

进程并发优缺点

共享文件表,但不共享地址空间(是优点,也是缺点)。不方便共享数据,只能通过显式 IPC。进程控制和 IPC 开销大。

12.2 基于 I/O 多路复用的并发编程

背景知识

通过 select 函数,等待一组描述符 ready。

#include <sys/select.h>
int select(int n, fd_set *fdset, NULL, NULL, NULL); // 返回 ready fd 的个数,出错返回 -1

FD_ZERO(fd_set *fdset);
FD_CLR(int fd, fd_set *fdset);
FD_SET(int fd, fd_set *fdset);
FD_ISSET(int fd, fd_set *fdset);

select 阻塞,直到至少一个 fd ready(即读取一个字节不阻塞)

示例代码

注意:select 有副作用,会修改入参 fdset 的内容!

fd_set read_set;
FD_ZERO(&read_set);
FD_SET(STDIN_FILENO, &read_set);
FD_SET(listenfd, &read_set);

while(1) {
  fd_set ready_set = read_set; // 因为 select 会修改入参 read_set 的内容,每次都重新从 read_set 拷贝!
  select(listenfd+1, &ready_set, NULL, NULL, NULL);
  if(FD_ISSET(STDIN_FILENO, &ready_set)
     // ...
  if(FD_ISSET(listenfd, &ready_set)
     // ...
}

I/O 多路复用优缺点

因为明显的性能优势,现代高性能服务器如 Node.js、nginx 和 Tornado 都是基于 I/O 多路复用的事件驱动

12.3 基于线程的并发编程

背景知识

# include <pthread.h>
typedef void *(func)(void *);

// 创建线程
int pthread_create(pthread_t *tid, pthread_attr_t *attr, func *f, void *arg);
// 返回调用者线程 ID
pthread_t pthread_self();
// 显示终止线程(threadFunc 返回即隐式终止),如果主线程调用,则等待所有其他对等线程终止,再终止主线程和整个进程
void pthread_exit(void *thread_return);
// 对等线程以当前线程 ID 作为参数,调用 pthread_cancel 来终止当前线程
int pthread_cancel(pthread_t tid); // 成功返回 0
// 回收已终止线程的资源
int pthread_join(pthread_t tid, void **thread_return); // 阻塞直到 tid 终止,成功返回 0
// 分离线程
int pthread_detach(pthread_t tid); // 成功返回 0
// 初始化线程 (可以用来实现单例模式)
pthread_once_t once = PTHREAD_ONCE_INIT; // 必须是全局或者静态变量,固定初始化为 PTHREAD_ONCE_INIT(主要用其地址)
int pthread_once(pthread_once_t *once_control, void(*init_routine)(void));

线程由内核自动调度,每个线程有自己的线程上下文。

线程上下文:线程 ID(TID)、栈、栈指针、程序计数器、通用目的寄存器和条件码。

示例代码

while (1) {
  pConnfd = new int();
  *pConnfd = accept(...);
  pthread_create(&tid, NULL, threadFunc, pConnfd);
}

void* threadFunc(void* p)
{
  int connfd = *p;
  pthread_detach(pthread_self());
  free(p);
}

为了避免 race condition,connfd 必须在堆中创建,在线程中释放,而不能直接把 connfd 的地址传给 threadFunc!

12.4 多线程共享变量

线程内存模型

将变量映射到内存

C++11 thread_local 存在哪里?

12.5 信号量

背景知识

#include <semaphore.h>

// 成功返回 0,出错返回 -1
int sem_init(sem_t *sem, 0, unsigned int value);
int sem_wait(sem_t *sem);  // P(s) 如果 s 为 0,挂起,直到 s 变为非零。V 操作可以重启这个线程。
int sem_post(sem_t *sem);  // V(s)

生产者-消费者

int buf[N]; // 理解成 queue<int>
sem_t mutex, slots, items;

void init(int n)
{
    sem_init(&mutex, 0, 1);
    sem_init(&slots, 0, n);
    sem_init(&items, 0, 0);
}

void producer(T item)
{
    sem_wait(&slots)
    sem_wait(&mutex);
    // insert item into buf
    sem_post(&mutex);
    sem_post(&items);
}

T consumer()
{
    T item;
    sem_wait(&items);
    sem_wait(&mutex);
    // pop item from buf
    sem_post(&mutex);
    sem_post(&slots);
    return item;
}

读者-写者

读者、写者平等地争夺 w,一旦读者获取了 w,将一直占有 w,直到最后一个读者离开,释放 w。

如果读者不断到达,写者可能无限等待,导致饥饿。

以下是一个读者优先的例子。(弱优先级,当最后一个读者释放 w,下一个获取 w 的不一定是等待 w 的读者,也有可能是等待 w 的写者!)

int readcnt = 0;
sem_t mutex; // 保护 readcnt
sem_t w;     // 读者或写者抢占 w

void init()
{
    sem_init(&mutex, 0, 1);
    sem_init(&w, 0, 1);
}

void reader()
{
    while(1)
    {
        sem_wait(&mutex);
        readcnt++;
        if(readcnt == 1)
            sem_wait(&w); // 如果这是第一个读者,抢占 w
        sem_post(&mutex);
      
        // Critical section
        // Reading...
      
        sem_wait(&mutex);
        readcnt--;
        if(readcnt == 0)
            sem_post(&w); // 如果这是最后一个读者,释放 w
        sem_post(&mutex);
    }
}

void writer()
{
    while(1)
    {
        sem_wait(&w);
            
        // Critical section
        // Writing...
        
        sem_post(&w);
    }
}

综合:基于预线程化的并发服务器

很好的例子,结合上述多种方式的优点,建议亲自写一遍。代码参考 CSAPP,不再赘述。

12.6 使用线程提高并行性

通用技术:向对等线程传递一个小整数,作为唯一的线程 ID。每个对等线程根据线程 ID 来决定它应该计算序列的哪一部分。

通常每个核上运行一个线程,在一个核上运行运行多个线程会有额外的上下文切换开销。

多线程求和的例子:

线程数 1 2 4 8 16 sum_mutex 68.00 432.00 719.00 552.00 599.00 sum_global 7.26 3.64 1.91 1.85 1.84 sum_local 1.06 0.54 0.28 0.29 0.30
// 加锁操作全局变量
void* sum_mutex(void* vargp)
{
    // 根据 vargp 确定计算范围
    for(i=start; i<end; i++) {
        sem_wait(&mutex);
        gsum += i;
        sem_post(&mutex);
    }
    return NULL;
}

// 每个线程独立位置存放结果,无需 mutex,直接累加到全局数组。主线程等待所有子线程完成
void* sum_global(void* vargp)
{
    long threadId = *((long*) vargp);
    // 根据 vargp 确定计算范围
    for(i=start; i<end; i++)
        gsum[threadId] += i;
    return NULL;
}

// 先用局部变量累加结果,减少不必要的内存引用,最后一次性赋给全局数组
void* sum_local(void* vargp)
{
    // 根据 vargp 确定计算范围
    int local_sum = 0;
    for(i=start; i<end; i++) {
        local_sum += i;
    }
    gsum[threadId] = local_sum;
    return NULL;
}

12.7 其他并发问题

线程安全

线程安全(thread-safe):多个并发线程反复调用,结果正确

可重入(reentrant):线程安全的真子集,不需要同步操作,比不可重入的线程安全的函数更高效。

四类不相交的线程不安全函数:

不安全类 说明 例子 变为线程安全的方法 1 不保护共享变量的函数 - 同步操作保护共享变量;缺点:慢 2 保持跨越多个调用的状态的函数 伪随机数生成器:当前调用结果依赖前次调用的中间结果 唯一方式是重写。不再依赖 static 数据,而是依靠调用者在参数中传递状态 3 返回指向静态变量的指针的函数 ctime、gethostbyname:将结果保存在 static 变量中,然后返回这个变量的指针 a) 重写:调用者传递存放结果的变量地址; b) 如果难以修改,则创建包装函数,进行加锁-复制。 4 调用线程不安全函数的函数 f 调用线程不安全函数 g 如果 g 是第 2 类,只能重写 g;如果 g 是 1、3 类,可以加锁(拷贝)

Linux 中线程不安全函数

大多数 Linux 函数都是线程安全的,包括定义在 C 库中的函数(例如 malloc、free、realloc、printf、scanf)

线程不安全函数 线程不安全类 Linux 线程安全版本 rand 2 rand_r strtok(已弃用) 2 strtok_r asctime 3 asctime_r ctime 3 ctime_r gethostbyaddr(已弃用,推荐 getaddrinfo) 3 gethostbyaddr_r gethostbyname(已弃用,推荐 getnameinfo) 3 gethostbyname_r net_ntoa(已弃用,推荐 inet_ntop) 3 无 localtime 3 localtime_r

死锁

注:现代 C++ 可以一次获得多个锁,从根源上避免了死锁。

12.8 小结

Reference