CSAPP-12并发编程

Posted by SH on June 20, 2020

CSAPP-12并发编程

如果逻辑控制流在时间上重叠,那么它们就是并发的(concurrent)。

  • 系统层面:硬件异常处理程序、进程、Linux信号处理程序。
  • 应用程序中:Linux信号处理程序允许响应异步事件。

应用级并发是很有用的:

  • 访问慢速I/O设备。
  • 与人交互。
  • 通过推迟工作以降低延迟。
  • 服务多个网络客户端。
  • 在多核机器上进行并行计算。

使用应用级并发的应用程序称为并发程序,现代操作系统提供了三种基本的构造并发程序的方法:

  • 进程。每个逻辑控制流都是一个进程,由内核来调度和维护,进程有独立的虚拟地址空间,想要和其他流通信,控制流必须使用某种显式的进程间通信(interprocess communication,IPC)机制。
  • I/O多路复用。应用程序在一个进程的上下文中显式地调度它们自己的逻辑流。逻辑流被模型化为状态机,数据到达文件描述符后,主程序显式地从一个状态转换到另一个状态。因为程序是一个单独的进程,所有流共享同一个地址空间。
  • 线程。线程是运行在一个单一进程上下文中的逻辑流,由内核进行调度。可以把线程看成是其他两种方式的混合体,像进程流一样由内核进行调度,二像I/O多路复用流一样共享同一个虚拟地址空间。

基于进程的并发编程

fork、exec、waitpid等函数。

基于进程的并发服务器

基于进程的并发echo服务器,父进程派生一个子进程来处理每个新的连接请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include "csapp.h"
void echo(int connfd);

// 通常服务器会运行很长时间,必须要包括一个SIGCHLD处理程序,来回收僵死(zombie)子进程的资源
void sigchld_handler(int sig)
{
	while (waitpid(-1, 0, WNOHANG) > 0);
	return;
}

int main(int argc, char **argv)
{
	int listenfd, connfd;
	socklen_t clientlen;
	struct sockaddr_storage clientaddr;

	if (argc != 2)
	{
		fprintf(stderr, "usage: %s <port>\n", argv[0]);
		exit(0);
	}

	// 回收僵死子进程的资源
	Signal(SIGCHLD, sigchld_handler);
	listenfd = Open_listenfd(argv[1]);
	while (1)
	{
		clientlen = sizeof(struct sockaddr_storage);
		connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
		// 父进程派生一个子进程来处理每个新的连接请求。
		if (Fork() == 0)
		{
			Close(listenfd); /* Child closes its listening socket */
			echo(connfd);  /* Child services client */
			// 父、子进程必须关闭它们各自的connfd
			Close(connfd); /* Child closes connection with client */
			exit(0);	   /* Child exits */
		}
		// 父、子进程必须关闭它们各自的connfd
		// 因为父、子进程中的已连接描述符都指向同一个文件表表项,所以父进程关闭它的已连接描述符的副本至关重要。
		// 否则,将永不会释放引起内存泄漏,最终消耗光可用的内存,使系统崩溃
		Close(connfd); /* Parent closes connected socket (important!) */

		// 因为套接字的文件表表项中的引用计数,直到父子进程的connfd都关闭了,到客户端的连接才会终止。
	}
}

进程的优劣

父、子进程间共享状态信息,进程有一个非常清晰的模型:共享文件表,但是不共享用户地址空间。进程间有独立的地址空间既是优点也是缺点。

优点:

  • 一个进程不可能不小心覆盖另一个进程的虚拟内存,这就消除了许多令人迷惑的错误;

缺点:

  • 独立的地址空间使得进程共享状态信息变得更加困难。为了共享信息,它们必须使用显式的IPC机制。
  • 另一个缺点是,它们往往比较,因为进程控制和IPC的开销很高。

Unix IPC通常指所有允许进程和同一台主机上其他进程进行通信的技术,包括:管道、先进先出、系统V共享内存、系统V信号量。

基于IO多路复用的并发编程

需求:echo服务器,也能对用户从标准输入键入的交互命令做出响应。

这种情况下,服务器必须响应两个互相独立的I/O事件:1)网络客户端发起连接请求,2)用户在键盘上键入命令行。先等待哪个事件呢?

解决办法就是I/O多路复用技术:使用select函数,要求内核挂起进程,只有在一个或多个I/O事件发生后,才将控制返回给应用程序。

select是是一个复杂的函数,有许多不同的使用场景。只讨论一种场景:等待一组描述符准备好读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include "csapp.h"
void echo(int connfd);
void command(void);

int main(int argc, char **argv)
{
    int listenfd, connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    fd_set read_set, ready_set;

    if (argc != 2)
    {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }
    // 打开一个监听描述符
    listenfd = Open_listenfd(argv[1]);

    // 创建一个空的读集合
    FD_ZERO(&read_set);              /* Clear read set */
    // 添加标准输入
    FD_SET(STDIN_FILENO, &read_set); /* Add stdin to read set */
    // 添加监听描述符
    FD_SET(listenfd, &read_set);     /* Add listenfd to read set */

    while (1)
    {
        ready_set = read_set;
        // select函数,一直阻塞,直到监听描述符或者标准输入准备好可以读,返回ready_set
        Select(listenfd + 1, &ready_set, NULL, NULL, NULL);
        // 标准输入,调用command函数
        if (FD_ISSET(STDIN_FILENO, &ready_set))
            command(); /* Read command line from stdin */
        // 监听,调用accept函数
        if (FD_ISSET(listenfd, &ready_set))
        {
            clientlen = sizeof(struct sockaddr_storage);
            connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
            echo(connfd); /* Echo client input until EOF */
            Close(connfd);
        }

        // 不完善:一旦它连接到某个客户端,就会连续回送输入行,直到客户端关闭这个连接中它的那一端。输入标准输入不会得到响应。
        // 更好的方法是更细粒度的多路复用,服务器每次循环(至多)回送一个文本行。
    }
}

void command(void)
{
    char buf[MAXLINE];
    if (!Fgets(buf, MAXLINE, stdin))
        exit(0);       /* EOF */
    printf("%s", buf); /* Process the input command */
}

并发事件驱动服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/* 
 * echoservers.c - A concurrent echo server based on select
 */
#include "csapp.h"

// pool中维护着活动客户端的集合
typedef struct
{								 /* Represents a pool of connected descriptors */
	int maxfd;					 /* Largest descriptor in read_set */
	fd_set read_set;			 /* Set of all active descriptors */
	fd_set ready_set;			 /* Subset of descriptors ready for reading  */
	int nready;					 /* Number of ready descriptors from select */
	int maxi;					 /* Highwater index into client array */
	int clientfd[FD_SETSIZE];	 /* Set of active descriptors */
	rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
} pool;
void init_pool(int listenfd, pool *p);
void add_client(int connfd, pool *p);
void check_clients(pool *p);

int byte_cnt = 0; /* Counts total bytes received by server */

int main(int argc, char **argv)
{
	int listenfd, connfd;
	socklen_t clientlen;
	struct sockaddr_storage clientaddr;
	static pool pool;

	if (argc != 2)
	{
		fprintf(stderr, "usage: %s <port>\n", argv[0]);
		exit(0);
	}
	// 打开一个监听描述符
	listenfd = Open_listenfd(argv[1]);
	// 初始化pool
	init_pool(listenfd, &pool);

	while (1)
	{
		/* Wait for listening/connected descriptor(s) to become ready */
		pool.ready_set = pool.read_set;
		// 调用select函数来检测两种不同类型的输入事件
		pool.nready = Select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);

		/* If listening descriptor ready, add new client to pool */
		// 事件 a)来自一个新客户端的连接请求到达,打开连接,添加到pool里
		if (FD_ISSET(listenfd, &pool.ready_set))
		{
			clientlen = sizeof(struct sockaddr_storage);
			connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
			add_client(connfd, &pool);
		}

		/* Echo a text line from each ready connected descriptor */
		// 事件 b)一个已存在的客户端的已连接描述符准备好可以读了
		// 把来自每个准备好的已连接描述符的一个文本行回送回去
		check_clients(&pool);
	}
}

// 初始化客户端池pool
void init_pool(int listenfd, pool *p)
{
	/* Initially, there are no connected descriptors */
	int i;
	p->maxi = -1;
	for (i = 0; i < FD_SETSIZE; i++)
		p->clientfd[i] = -1;

	/* Initially, listenfd is only member of select read set */
	p->maxfd = listenfd;
	FD_ZERO(&p->read_set);
	FD_SET(listenfd, &p->read_set);
}

// 添加一个新的客户端到活动客户端池中
void add_client(int connfd, pool *p)
{
	int i;
	p->nready--;
	for (i = 0; i < FD_SETSIZE; i++) /* Find an available slot */
		if (p->clientfd[i] < 0)
		{
			/* Add connected descriptor to the pool */
			p->clientfd[i] = connfd;
			Rio_readinitb(&p->clientrio[i], connfd);

			/* Add the descriptor to descriptor set */
			FD_SET(connfd, &p->read_set);

			/* Update max descriptor and pool highwater mark */
			if (connfd > p->maxfd)
				p->maxfd = connfd;
			if (i > p->maxi)
				p->maxi = i;
			break;
		}
	if (i == FD_SETSIZE) /* Couldn't find an empty slot */
		app_error("add_client error: Too many clients");
}

// 回送来自每个准备好的已连接描述符的一个文本行。
void check_clients(pool *p)
{
	int i, connfd, n;
	char buf[MAXLINE];
	rio_t rio;

	for (i = 0; (i <= p->maxi) && (p->nready > 0); i++)
	{
		connfd = p->clientfd[i];
		rio = p->clientrio[i];

		/* If the descriptor is ready, echo a text line from it */
		if ((connfd > 0) && (FD_ISSET(connfd, &p->ready_set)))
		{
			p->nready--;
			if ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0)
			{
				byte_cnt += n;
				printf("Server received %d (%d total) bytes on fd %d\n",
					   n, byte_cnt, connfd);
				Rio_writen(connfd, buf, n);
			}

			/* EOF detected, remove descriptor from pool */
			else
			{
				Close(connfd);
				FD_CLR(connfd, &p->read_set);
				p->clientfd[i] = -1;
			}
		}
	}
}

IO多路复用的优劣

优点:

  • 比基于进程的设计给了程序员更多的对程序行为的控制。
  • 运行在单一进程上下文中的,使得在流之间共享数据变得很容易。可以用GDB调试像顺序程序那样。
  • 常常比基于进程的设计要高效得多,不需要进程上下文切换来调度新的流。

缺点:

  • 编码复杂。
  • 不能充分利用多核处理器。

基于线程的并发编程

线程(thread)就是运行在进程上下文中的逻辑流。线程由内核自动调度。

每个线程都有它自己的线程上下文,包括:一个唯一的整数线程ID、栈、栈指针、程序计数器、通用目的寄存器、条件码。

线程共享进程的整个虚拟地址空间,包括:代码、数据、堆、共享库、打开的文件。

线程执行模型

image-20210121150517802

线程执行不同于进程:

  • 线程的上下文要比进程的上下文小得多,切换要快得多。
  • 线程不是按照严格的父子层次来组织的,和一个进程相关的线程组成一个对等的线程池。
    • 主线程和其他线程的区别仅仅在于它总是进程中第一个运行的线程。
    • 一个线程可以杀死它的任何对等线程,或者等待它的任意对等线程终止。
    • 每个对等线程都能读写相同的共享数据。

Posix线程

Posix线程(Pthreads)是在C程序中处理线程的一个标准接口。它定义了大约60个函数,允许程序创建、杀死和回送线程,与对等线程安全地共享数据,还可以通知对等线程系统状态的变化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include "csapp.h"
void *thread(void *vargp);

int main()
{
    pthread_t tid;
    Pthread_create(&tid, NULL, thread, NULL);
    Pthread_join(tid, NULL);
    exit(0);
}

void *thread(void *vargp) /* thread routine */
{
    printf("Hello, world!\n");                 
    return NULL;
}
  • 创建线程:
    • pthread_create
  • 终止线程:
    • pthread_exit
    • pthread_cancel
  • 回收已终止线程的资源:
    • pthread_join
  • 分离线程:
    • pthread_detach
  • 初始化线程
    • pthread_once

基于线程的并发服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/* 
 * echoservert.c - A concurrent echo server using threads
 */
#include "csapp.h"

void echo(int connfd);
void *thread(void *vargp);

int main(int argc, char **argv)
{
    int listenfd, *connfdp;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    pthread_t tid;

    if (argc != 2)
    {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }
    listenfd = Open_listenfd(argv[1]);

    while (1)
    {
        clientlen = sizeof(struct sockaddr_storage);
        // 为了避免竞争:必须将accept返回的每个已连接描述符分配到它自己的动态分配的内存块。
        connfdp = Malloc(sizeof(int));
        *connfdp = Accept(listenfd, (SA *)&clientaddr, &clientlen);
        Pthread_create(&tid, NULL, thread, connfdp);
    }
}

void *thread(void *vargp)
{
    int connfd = *((int *)vargp);
    // 避免内存泄漏,不显示地收回线程,就必须分离每个线程,使得它在终止时它的内存资源能被收回。
    Pthread_detach(pthread_self());
    Free(vargp);
    echo(connfd);
    Close(connfd);
    return NULL;
}

多线程程序中的共享变量

一个变量是共享的,当且仅当多个线程引用这个变量的某个实例。

基本问题:

  • 线程的基础内存模型是什么?
  • 根据这个模型,变量实例是如何映射到内存的?
  • 有多少线程引用这些实例?

示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include "csapp.h"
#define N 2
void *thread(void *vargp);

char **ptr; // 全局变量

int main()
{
    int i;
    pthread_t tid;
    char *msgs[N] = {
        "Hello from foo",
        "Hello from bar"};

    ptr = msgs;
    // 创建N个对等线程
    for (i = 0; i < N; i++)
        // 主线程传递一个唯一的ID给每个对等线程
        Pthread_create(&tid, NULL, thread, (void *)i);
    Pthread_exit(NULL);
}

void *thread(void *vargp)
{
    // 每个对等线程利用主线程传递的唯一ID输出一条个性化的信息
    int myid = (int)vargp; // 本地自动变量
    static int cnt = 0;    // 本地静态变量,共享变量
    // 不同的线程栈是不对其他线程设防的。
    // 对等线程直接通过全局变量ptr间接引用主线程的栈的内容。
    printf("[%d]: %s (cnt=%d)\n", myid, ptr[myid], ++cnt);
    return NULL;
}

线程内存模型

一组并发线程运行在一个进程的上下文中。每个线程都有它自己独立的线程上下文,包括线程ID、栈、栈指针、程序计数器、条件码和通用目的寄存器。每个线程和其他线程一起共享进程上下文的剩余部分,这包括整个用户虚拟地址空间,它是由只读文本(代码)、读/写数据、堆以及所有的共享库代码和数据区域组成的。线程也共享相同的打开文件的集合。

从实际操作的角度来说,让一个线程去读或写另一个线程的寄存器值是不可能的。另一方面,任何线程都可以访问共享虚拟内存的任意位置。如果某个线程修改了一个内存位置,那么其他每个线程最终都能在它读这个位置时发现这个变化。因此,寄存器是从不共享的,而虚拟内存总是共享的。

各自独立的线程栈的内存模型不是那么整齐清除的。这些栈被保存在虚拟地址空间的栈区域中,并且通常是被相应的线程独立地访问的。说通常而不是总是,是因为不同的线程栈是不对其他线程设防的。所以,如果一个线程以某种方式得到一个指向其他线程栈的指针,那么它就可以读写这个栈的任何部分。

将变量映射到内存

变量根据它们的存储类型被映射到虚拟内存:

  • 全局变量。定义在函数之外的变量。运行时,虚拟内存的读/写区域只包含每个全局变量的一个实例,任何线程都可以引用。
  • 本地自动变量。定义在函数内部但是没有static属性的变量。运行时,每个线程的栈都包含它自己的所有本地自动变量的实例。
  • 本地静态变量。定义在函数内部并有static属性的变量。和全局变量一样,虚拟内存的读/写区域只包含程序中声明的每个本地静态变量的一个实例。

共享变量

说一个变量v是共享的,当且仅当它的一个实例被一个以上的线程引用。

用信号量同步线程

共享变量十分方便,但是它们引入了同步错误(synchronization error)的可能性。

错误的并发计数程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/* 
 * badcnt.c - An improperly synchronized counter program 
 */
/* WARNING: This code is buggy! */
#include "csapp.h"

void *thread(void *vargp); /* Thread routine prototype */

/* Global shared variable */
volatile long cnt = 0; /* Counter */

int main(int argc, char **argv)
{
    long niters;
    pthread_t tid1, tid2;

    /* Check input argument */
    if (argc != 2)
    {
        printf("usage: %s <niters>\n", argv[0]);
        exit(0);
    }
    niters = atoi(argv[1]);

    /* Create threads and wait for them to finish */
    Pthread_create(&tid1, NULL, thread, &niters);
    Pthread_create(&tid2, NULL, thread, &niters);
    Pthread_join(tid1, NULL);
    Pthread_join(tid2, NULL);

    /* Check result */
    if (cnt != (2 * niters))
        printf("BOOM! cnt=%ld\n", cnt);
    else
        printf("OK cnt=%ld\n", cnt);
    exit(0);
}

void *thread(void *vargp)
{
    long i, niters = *((long *)vargp);
    for (i = 0; i < niters; i++)
        cnt++;

    return NULL;
}

for循环的汇编代码:

image-20210121161641185

关键点:

  • 一般而言,没有办法预测操作系统是否将为你的线程选择一个正确的顺序。
  • 有的顺序会产生一个不正确的cnt值。

进度图

image-20210121162044353

操作共享变量cnt内容的指令(Li, Ui, Si)构成一个临界区,这个临界区不应该和其他进程的临界区交替指向。

信号量

解决同步不同执行线程问题的方法:信号量(semaphore)。

信号量s是具有非负整数值的全局变量,只能由两种特殊的操作来处理P和V:

  • P(s):如果s是非零的,那么将s减1,并且立即返回。如果s为零,那么就挂起这个线程,直到s变为非零,而一个V操作会重启这个线程,在重启之后,P操作将s减1,并将控制返回给调用者。
  • V(s):V操作将s加1。如果有任何线程阻塞在P操作等待s变成非零,那么V操作会重启这些线程中的一个,然后该线程将s减1,完成它的P操作。

P和V中的操作不可分割。

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <semaphore.h>

void P(sem_t *sem) 
{
    if (sem_wait(sem) < 0)
	unix_error("P error");
}

void V(sem_t *sem) 
{
    if (sem_post(sem) < 0)
	unix_error("V error");
}

使用信号量来实现互斥

对共享变量的互斥访问:将每个共享变量与一个信号量s联系起来,然后用P(s)和V(s)操作将相应的临界区包围起来。叫做二元信号量,值总是0或者1。互斥锁。

image-20210121163129178

利用信号量来调度共享资源

生产者-消费者问题、读者-写者问题。

生产者-消费者问题

生产者线程 -> 有限的缓冲区 -> 消费者线程。

  • 插入和取出都涉及更新共享变量,所以必须保证对缓冲区的访问是互斥的。但是只保证互斥访问是不够的,还需要调度对缓冲区的访问。
  • 如果缓冲区是满的,那么生产者必须等待直到有一个槽位变为可用。
  • 如果缓冲区是空的,那么消费者必须等待直到有一个项目变为可用。

现实应用:

  • 多媒体系统,生产者编码视频帧,消费者解码并在屏幕上呈现出来。缓冲区的目的是为了减少视频流的抖动。

用来构造生产者-消费者程序的包SBUF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
typedef struct {
    int *buf;          /* Buffer array */         
    int n;             /* Maximum number of slots */
    int front;         /* buf[(front+1)%n] is first item */
    int rear;          /* buf[rear%n] is last item */
    sem_t mutex;       /* Protects accesses to buf */
    sem_t slots;       /* Counts available slots */
    sem_t items;       /* Counts available items */
} sbuf_t;

/* Create an empty, bounded, shared FIFO buffer with n slots */
void sbuf_init(sbuf_t *sp, int n)
{
    sp->buf = Calloc(n, sizeof(int)); 
    sp->n = n;                       /* Buffer holds max of n items */
    sp->front = sp->rear = 0;        /* Empty buffer iff front == rear */
    Sem_init(&sp->mutex, 0, 1);      /* Binary semaphore for locking */
    Sem_init(&sp->slots, 0, n);      /* Initially, buf has n empty slots */
    Sem_init(&sp->items, 0, 0);      /* Initially, buf has zero data items */
}

/* Clean up buffer sp */
void sbuf_deinit(sbuf_t *sp)
{
    Free(sp->buf);
}

/* Insert item onto the rear of shared buffer sp */
void sbuf_insert(sbuf_t *sp, int item)
{
    P(&sp->slots);                          /* Wait for available slot */
    P(&sp->mutex);                          /* Lock the buffer */
    sp->buf[(++sp->rear)%(sp->n)] = item;   /* Insert the item */
    V(&sp->mutex);                          /* Unlock the buffer */
    V(&sp->items);                          /* Announce available item */
}

/* Remove and return the first item from buffer sp */
int sbuf_remove(sbuf_t *sp)
{
    int item;
    P(&sp->items);                          /* Wait for available item */
    P(&sp->mutex);                          /* Lock the buffer */
    item = sp->buf[(++sp->front)%(sp->n)];  /* Remove the item */
    V(&sp->mutex);                          /* Unlock the buffer */
    V(&sp->slots);                          /* Announce available slot */
    return item;
}

读者-写者问题

一组并发的线程要访问一个共享对象,例如一个主存中的数据结构,或者一个磁盘上的数据库。有些线程只读对象(读者),而其他的线程只修改对象(写者)。写者必须拥有对对象的独占的访问,而读者可以和无限多个其他的读者共享对象。

现实应用:

  • 在线航空预定系统中,允许有无限多个客户同时查看座位分配,但是正在预定座位的客户必须拥有对数据库的独占的访问。
  • 多线程缓存Web代理中,无限多个线程可以从共享页面缓存中取出已有的页面,但是任何向缓存中写入一个新页面的线程必须拥有独占的访问。

读者-写者问题变种,优先级不同:

  • 第一类:读者优先,要求不要让读者等待,除非已经把使用对象的权限赋予了一个写者。换句话说,读者不会因为有一个写者在等待而等待。
  • 第二类:写者优先,要求一旦一个写者准备好可以写,它就会尽可能快地完成它的写操作。

基于预线程化的并发服务器

image-20210121204241055

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/* 
 * echoservert_pre.c - A prethreaded concurrent echo server
 */
#include "csapp.h"
#include "sbuf.h"
// 处理线程数
#define NTHREADS 4
// 缓存大小
#define SBUFSIZE 16

void echo_cnt(int connfd);
void *thread(void *vargp);

sbuf_t sbuf; /* Shared buffer of connected descriptors */

int main(int argc, char **argv)
{
    int i, listenfd, connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    pthread_t tid;

    if (argc != 2)
    {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }
    listenfd = Open_listenfd(argv[1]);

    // 初始化缓冲区
    sbuf_init(&sbuf, SBUFSIZE);
    // 创建一组工作者线程
    for (i = 0; i < NTHREADS; i++) /* Create worker threads */
        Pthread_create(&tid, NULL, thread, NULL);

    // 处理线程,循环接受连接请求,将得到的已连接描述符插入到缓冲区sbuf中。
    while (1)
    {
        clientlen = sizeof(struct sockaddr_storage);
        connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
        sbuf_insert(&sbuf, connfd); /* Insert connfd in buffer */
    }
}

// 工作线程
void *thread(void *vargp)
{
    Pthread_detach(pthread_self());
    while (1)
    {
        // 等待直到能从缓冲区取出一个已连接描述符
        int connfd = sbuf_remove(&sbuf); /* Remove connfd from buffer */ //line:conc:pre:removeconnfd
        // 调用echo_cnt函数回送客户端的输入
        echo_cnt(connfd); /* Service client */
        Close(connfd);
    }
}

使用线程提高并行性

同步开销巨大,要尽可能避免。如果无可避免,必须要用尽可能多的有用计算弥补这个开销。

其他并发问题

线程安全

一个函数被称为线程安全的,当且仅当被多个并发线程反复地调用时,它会一直产生正确的结果。

线程不安全函数类:

  • 不保护共享变量的函数。
  • 保持跨越多个调用的状态的函数。
  • 返回指向静态变量的指针的函数。
  • 调用线程不安全函数的函数。

可重入性

可重入函数(reentrant function):当被多个线程调用时,不会引用任何共享数据。是线程安全函数的一个真子集。

  • 显示可重入的:所有的函数参数都是值传递的,没有指针;
  • 隐式可重入的:允许一些参数是引用传递的,如果调用线程小心地传递指向非共享数据的指针,它是可重入的。

库函数

常见的线程不安全的库函数:

线程不安全函数 线程不安全类 Linux线程安全版本
rand 2 rand_r
strtok 2 strtok_r
asctime 3 asctime_r
ctime 3 ctime_r
gethostbyaddr 3 gethostbyaddr_r
gethostbyname 3 gethostbyname_r
inet_ntoa 3 (无)
localtime 3 localtime_r

竞争

当一个程序的正确性依赖于一个线程要在另一个线程到达y点之前到达它的控制流中的x点时,就会发生竞争。

死锁

image-20210122094623931

  • 程序员使用P和V操作顺序不当,以至于两个信号量的禁止区域重叠。换句话说,程序死锁是因为每个线程都在等待其他线程执行一个根本不可能发生的V操作。
  • 避免死锁:给定所有互斥操作的一个全序,如果每个线程都是以一种顺序获得互斥锁并以相反顺序释放,那么这个程序就是无死锁的。