学习在 Linux 中进程是如何与其他进程进行同步的。
本篇是 Linux 下进程间通信(IPC)系列的第二篇文章。第一篇文章 聚焦于通过共享文件和共享内存段这样的共享存储来进行 IPC。这篇文件的重点将转向管道,它是连接需要通信的进程之间的通道。管道拥有一个写端用于写入字节数据,还有一个读端用于按照先入先出的顺序读入这些字节数据。而这些字节数据可能代表任何东西:数字、员工记录、数字电影等等。
管道有两种类型,命名管道和无名管道,都可以交互式的在命令行或程序中使用它们;相关的例子在下面展示。这篇文章也将介绍内存队列,尽管它们有些过时了,但它们不应该受这样的待遇。
在本系列的第一篇文章中的示例代码承认了在 IPC 中可能受到竞争条件(不管是基于文件的还是基于内存的)的威胁。自然地我们也会考虑基于管道的 IPC 的安全并发问题,这个也将在本文中提及。针对管道和内存队列的例子将会使用 POSIX 推荐使用的 API,POSIX 的一个核心目标就是线程安全。
请查看一些 mq_open 函数的 man 页,这个函数属于内存队列的 API。这个 man 页中有关 特性 的章节带有一个小表格:
上面的 MT-Safe(MT 指的是多线程)意味着 mq_open 函数是线程安全的,进而暗示是进程安全的:一个进程的执行和它的一个线程执行的过程类似,假如竞争条件不会发生在处于相同进程的线程中,那么这样的条件也不会发生在处于不同进程的线程中。MT-Safe 特性保证了调用 mq_open 时不会出现竞争条件。一般来说,基于通道的 IPC 是并发安全的,尽管在下面例子中会出现一个有关警告的注意事项。
无名管道
首先让我们通过一个特意构造的命令行例子来展示无名管道是如何工作的。在所有的现代系统中,符号 | 在命令行中都代表一个无名管道。假设我们的命令行提示符为 % ,接下来考虑下面的命令:
## 写入方在 | 左边,读取方在右边 % sleep 5 | echo "Hello, world!"
sleep 和 echo 程序以不同的进程执行,无名管道允许它们进行通信。但是上面的例子被特意设计为没有通信发生。问候语 “Hello, world!” 出现在屏幕中,然后过了 5 秒后,命令行返回,暗示 sleep 和 echo 进程都已经结束了。这期间发生了什么呢?
在命令行中的竖线 | 的语法中,左边的进程(sleep )是写入方,右边的进程(echo )为读取方。默认情况下,读取方将会阻塞,直到从通道中能够读取到字节数据,而写入方在写完它的字节数据后,将发送 流已终止的标志。(即便写入方过早终止了,一个流已终止的标志还是会发给读取方。)无名管道将保持到写入方和读取方都停止的那个时刻。
在上面的例子中,sleep 进程并没有向通道写入任何的字节数据,但在 5 秒后就终止了,这时将向通道发送一个流已终止的标志。与此同时,echo 进程立即向标准输出(屏幕)写入问候语,因为这个进程并不从通道中读入任何字节,所以它并没有等待。一旦 sleep 和 echo 进程都终止了,不会再用作通信的无名管道将会消失然后返回命令行提示符。
下面这个更加实用的示例将使用两个无名管道。我们假定文件 test.dat 的内容如下:
this is the way the world ends
下面的命令:
% cat test.dat | sort | uniq
会将 cat (连接的缩写)进程的输出通过管道传给 sort 进程以生成排序后的输出,然后将排序后的输出通过管道传给 uniq 进程以消除重复的记录(在本例中,会将两次出现的 “the” 缩减为一个):
ends is the this way world
下面展示的情景展示的是一个带有两个进程的程序通过一个无名管道通信来进行通信。
示例 1. 两个进程通过一个无名管道来进行通信
#include <sys/wait.h> /* wait */ #include <stdio.h> #include <stdlib.h> /* exit functions */ #include <unistd.h> /* read, write, pipe, _exit */ #include <string.h> -
#define ReadEnd 0 #define WriteEnd 1 -
void report_and_exit(const char* msg) { [perror][6](msg); [exit][7](-1); /** failure **/ } -
int main() { int pipeFDs[2]; /* two file descriptors */ char buf; /* 1-byte buffer */ const char* msg = "Nature's first green is gold\n"; /* bytes to write */ -
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD"); pid_t cpid = fork(); /* fork a child process */ if (cpid < 0) report_and_exit("fork"); /* check for failure */ -
if (0 == cpid) { /*** child ***/ /* child process */ close(pipeFDs[WriteEnd]); /* child reads, doesn't write */ -
while (read(pipeFDs[ReadEnd], &buf, 1) > 0) /* read until end of byte stream */ write(STDOUT_FILENO, &buf, sizeof(buf)); /* echo to the standard output */ -
close(pipeFDs[ReadEnd]); /* close the ReadEnd: all done */ _exit(0); /* exit and notify parent at once */ } else { /*** parent ***/ close(pipeFDs[ReadEnd]); /* parent writes, doesn't read */ -
write(pipeFDs[WriteEnd], msg, [strlen][8](msg)); /* write the bytes to the pipe */ close(pipeFDs[WriteEnd]); /* done writing: generate eof */ -
wait(NULL); /* wait for child to exit */ [exit][7](0); /* exit normally */ } return 0; }
上面名为 pipeUN 的程序使用系统函数 fork 来创建一个进程。尽管这个程序只有一个单一的源文件,在它正确执行的情况下将会发生多进程的情况。
下面的内容是对库函数 fork 如何工作的一个简要回顾:
|