loading

匿名管道与命名管道

  • Home
  • Blog
  • 匿名管道与命名管道

匿名管道与命名管道

匿名管道与命名管道

一,进程间通信什么是进程间通信进程间通信的目的管道的概念

二,匿名管道匿名管道的创建匿名管道使用匿名管道的特性以及四种场景匿名管道的原理通过匿名管道实现简易进程池。

三,命名管道命名管道的创建命名管道的使用命名管道实现简易的进程池

管道总结

一,进程间通信

什么是进程间通信

🚀进程间通信就是让两个进程进行交流,但是我们知道进程具有独立性,每个进程OS都会为其维护一个pcb,一个进程中的数据在另一个进程中是看不到的,那怎么实现进程间通信的呢? 🚀两个进程要想通信,首先它们需要看到同一份资源,其实就是操作系统出面,为它们提供了一块公共的资源,就是一段内存,可能以文件的方式提供,也可能就是提供的原始的内存块。

进程间通信的目的

🚀数据传输 :一个进程需要将它的数据发送给另一个进程。 🚀资源共享 :多个进程之间共享同样的资源。 🚀通知事件 :一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。 🚀进程控制 :有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

管道的概念

🚀 管道是一种古老的进程间通信的方式,最早出现于Unix系统种。我们把一个进程连接到另一个进程的一个数据流称为一个“管道”。 🚀当前使用过Linux操作系统的应该都使用过管道,例如,我们用两个指令的组合来算出一个文本有多少行。

cat test.txt | wc -l

🚀管道就是OS通过以文件的形式,给两个进程创建一份共享资源。其实就是两个进程公用一块内核缓冲区,一个进程从里面读取数据每一个进程往其中写数据,进而达到通信的效果。

二,匿名管道

匿名管道的创建

🚀首先介绍一个系统接口pipe

int pipe(int pipefd[2]);

这个系统调用的功能就是创建一个匿名管道,注意到它的参数是一个int类型的数组,数组有两个元素,其实这是两个输出型参数pipefd[0] 是以读方式打开管道文件返回的文件描述符,pipefd[1] 是以写方式打开管道文件返回的文件描述符。 如果创建成功0会被返回,反之-1被返回。 🚀创建了匿名管道之后,再通过fork系统调用创建子进程,这样子进程就会继承来自父进程的文件描述符表,那么子进程也会看到这个管道文件,从而达到两个进程看到同一份资源。对于一个进程来说只能从这一管道文件读或者是写,不能既读又写,所以我们创建子进程后要关闭父进程与子进程的多余的文件描述符。

匿名管道使用

一般步骤: 🚀使用pipe创建管道文件 🚀使用fork创建子进程 🚀父子进程关掉相应的文件描述符 🚀父子进程间实现通信 🚀通信结束后关掉文件描述符 🚀对于命名管道还要使用unlink将磁盘文件删除

#include

#include

#include

#include

#include

#include

int main()

{

// 1.创建管道

int pipe_fd[2] = {0};

int res = pipe(pipe_fd);

if (res < 0)

{

perror("pipe");

exit(errno);

}

// 2.创建子进程

pid_t id = fork();

if (id < 0)

{

perror("fork");

exit(errno);

}

if (id == 0) // child

{

// 3.关闭相应的fd

close(pipe_fd[0]);

int cnt = 0;

std::string str = "我是子进程我正在向管道中写";

char buffer[1024] = {'\0'};

while (true)

{

//char ch = 'Y';

snprintf(buffer, sizeof(buffer), "cnt = %d,%s", cnt++, str.c_str());

//std::cout << "子进程正在写第" << cnt++ << "次" << std::endl;

int n = write(pipe_fd[1], buffer, strlen(buffer));

//t n = write(pipe_fd[1], &ch, 1);

assert(n >= 0);

(void)n;

sleep(1);

}

//5.通信结束后关掉相应的文件描述符

close(pipe_fd[1]);

exit(0);

}

// parent

// 3.关闭相应的fd

close(pipe_fd[1]);

//4.进程间通信

char buffer[1024] = {'\0'};

while (true)

{

int n = read(pipe_fd[0], buffer, sizeof(buffer) - 1);

assert(n >= 0);

(void)n;

std::cout << "我是父进程,我从子进程中读取的数据是: " << buffer << std::endl;

//sleep(20);

sleep(1);

}

//5.通信结束后关掉相应的文件描述符

close(pipe_fd[0]);

return 0;

}

这段代码就是严格按照上面的五个步骤,其中由于管道是单项通信的,这段代码中子进程保留了写端,父进程保留了读端。达到了父子进程间的数据传输。

匿名管道的特性以及四种场景

特点: 🚀管道实现的是单向通信,具有半双工的特点。 🚀管道的本质是文件,由于fd的声明周期是随进程的—>管道的声明周期是随进程的

管道文件是一个内存级的缓冲区,不会将数据刷新到磁盘上。其本质就是在内核中创建的struct file结构体,以读方式打开管道文件的进程将数据或者是命令写入struct file中的内核级缓冲区,以读方式打开管道文件的进程,从缓冲区中,将数据读出去。每个文件都有预期对应的inode结构体,而inode结构体中有两个计数器-i_count-i_link,这两个分别是针对内存上和磁盘上的,当有进程打开文件的时候i_count就会加1,而close的时候会减1,当减到0的时候在内核中创建的struct file结构体就会被OS回收掉,而i_link是指这个文件的连接数,使用unlink函数或者指令可以使i_link这个计数器减1,同样的在减到0的时候,这个文件就会被在磁盘上删除。而对于匿名管道如果所有的进程都切断了与struct file的关系,也就是i_count减为0,此时struct file会被OS回收,并且在磁盘上的inode结构体等也会被删除。

🚀匿名管道通信通常是有血缘关系的进程间通信的,例如父子进程,兄弟进程等。

匿名管道之所以能够通信就是因为,子进程继承了父进程的文件描述表,从而看到了相同的struct file结构体,进而能够进行进程间的通信,那么爷孙进程能够通信吗?答案是肯定的,父进程中的文件描述表继承自它的父进程,子进程又继承了父进程的文件描述符表,所以爷孙进程是可以看到同一份资源的,进而爷孙进程间是可以通信的。 对于兄弟进程,由于兄弟进程都继承了来自它们父进程的文件描述符表,所以它们同样可以看大一份相同的资源,从而可以实现进程间的通信。

🚀在管道通信中,写入的次数与读取的次数不是严格匹配的,表现为字节流。 🚀管道具有一定的协同能力,能让读端与写端按照一定的规则进程通信,是自带同步机制的。

管道通信的四种场景 🚀如果读端读完了所有数据,写端没有继续写,那么读端只能等待。

就用上面那份代码做一个实验:上面的代码中子进程是写端父进程是读端,我们让子进程写完一次数据后休眠十秒,这期间让父进程一直从管道中读取数据。

父进程每读出一条数据够要等上几秒中才能从管道中读取出下一条数据,这就很好的验证了,如果读端将管道中的数据都读取了出来,而写端并没有写入,这一期间读端会一直等待。

🚀如果读端不读写端一直写入,那么将管道写满后就不能再写入了。

让子进程每次写入一个字节的数据,而父进程休眠上30秒,在这期间已经足够让子进程把管道写满,同时还可以在子进程写入时记录一下次数。

可以看到,子进程写入了65000多次的时候,就不能再写入了说明此时管道已经被写满了,同时也从侧面证明了管道的大小大约是4KB。

🚀如果关闭了写端,读取完毕管道数据,再读就会返回0,表面那个读到了文件结尾。

可以看到,杀死写端的子进程后,父进程在读取的完管道中的数据后,read的返回值变为0,并且重复读到的都是管道中最后的那条数据。

🚀如果写端一直写,读端关闭,那么写端的进程就会被OS杀死,因为OS不会维护没有意义,低效率或者是浪费资源的事情。简单说就是如果读端关闭那么OS会杀死写端的进程。

如果匿名管道的读端关闭,那么OS会给写端的进程发送SIGPIPE这个信号杀死写端进程,为了验证这一结论,父进程读取一条数据后休眠10秒休眠后close掉读端的fd,然后waitpid等待子进程读取子进程获取到的信号。

while (true)

{

int n = read(pipe_fd[0], buffer, sizeof(buffer) - 1);

assert(n >= 0);

(void)n;

std::cout << "read的返回值:" << n << "我是父进程,我从子进程中读取的数据是: " << buffer << std::endl;

sleep(10);

break;

//sleep(1);

}

//5.通信结束后关掉相应的文件描述符

close(pipe_fd[0]);

int status = 0;

waitpid(id,&status,0);

std::cout << "singal : " << (status & 0x7f) << std::endl;

当读端关闭后,OS会给写端的进程发送13号SIGPIPE信号杀死该进程。

匿名管道的原理

🚀匿名管道就是一块内核缓冲区(在struct file内)。 🚀并且匿名管道文件对应的struct file是不会进行刷盘操作。

进程间通信就是要让不同的进程看到相同的资源,而使用匿名管道这种通信方式,是子进程继承了来自父进程的文件描述符表,从而和父进程看到了同一块资源,达到进程间通信的效果。

通过匿名管道实现简易进程池。

🚀让父进程通过fork系统调用创建多个子进程,并且父进程与每个子进程间都会通过匿名管道来进行进程间通信,通过匿名管道父进程给子进程发配任务信号,从而使子进程完成相应的任务。 步骤: 🚀首先循环创建出多个匿名管道和多个子进程 🚀将每个子进程与某个匿名管道建立起联系 🚀父进程关闭相应读端,子进程关闭相应写端 🚀通过进程间通信达到父进程对子进程的控制 🚀使用结束后关闭相应的文件描述符

在Task.hpp中创建相应的任务。

#pragma once

#include

#include

#include

typedef void (*func_t)();

void PrintLog()

{

std::cout << " pid : " << getpid() << "打印日志任务,正在被执行..." << std::endl;

}

void InsertMysql()

{

std::cout << " pid : " << getpid() << "访问数据库任务,正在被执行..." << std::endl;

}

void NetRequest()

{

std::cout << " pid : " << getpid() << "访问网络的任务,正在被执行..." << std::endl;

}

#define COMMAND_LOG 0

#define COMMAND_MYSQL 1

#define CONMAND_REQUEST 2

#define QUIT 3

class Task

{

public:

Task()

{

functions.push_back(PrintLog);

functions.push_back(InsertMysql);

functions.push_back(NetRequest);

}

void Execute(int index)

{

functions[index]();

}

~Task()

{

}

private:

std::vector functions;

};

在ctrlProcess.cpp中完成进程的控制

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include "Task.hpp"

const int num = 6;

Task t;

class EndPoint

{

public:

int write_fd;

pid_t child_pid;

std::string _name;

public:

EndPoint(int fd, pid_t id)

: write_fd(fd), child_pid(id)

{

char namebuffer[64];

snprintf(namebuffer, sizeof(namebuffer), "Process-%d[%d-%d]", number++, child_pid, write_fd);

_name = namebuffer;

}

std::string getname() const

{

return _name;

}

std::string getname()

{

return _name;

}

~EndPoint()

{

}

private:

static int number;

};

int EndPoint::number = 1;

void WaitCommand()

{

while (true)

{

int command = 0;

int n = read(0, &command, sizeof(command));

if (n > 0)

{

t.Execute(command);

}

else if (n == 0)

{

break;

}

else

{

break;

}

sleep(1);

}

}

void CreateProcess(std::vector &end_points)

{

std::vector close_array;

for (int i = 0; i < num; ++i)

{

int pipe_fd[2] = {0};

int n = pipe(pipe_fd);

if (n < 0)

{

perror("pipe");

exit(errno);

}

pid_t id = fork();

if (id < 0)

{

perror("fork");

exit(errno);

}

if (id == 0)

{

// 先关掉从父进程那里继承得关于别的管道得写端

for (auto e : close_array)

{

close(e);

}

// child

close(pipe_fd[1]);

// 输入重定向

dup2(pipe_fd[0], 0);

WaitCommand();

close(pipe_fd[0]);

exit(0);

}

// parent

close(pipe_fd[0]);

end_points.push_back(EndPoint(pipe_fd[1], id));

close_array.push_back(pipe_fd[1]);

}

}

int select_command()

{

int command = 0;

std::cout << "#############################" << std::endl;

std::cout << "#0.PrintLog #1.InsertMysql#" << std::endl;

std::cout << "#2.NetRequest #3.Quit ###" << std::endl;

std::cout << "#############################" << std::endl;

std::cout << "Please select# ";

std::cin >> command;

return command;

}

void MakeProcess(std::vector &end_points)

{

int cnt = 0;

while (true)

{

// 1.选择任务

int command = select_command();

if (command == 3)

break;

if (command < 0 || command > 2)

continue;

// 2.选择进程---以轮询的方式

int index = cnt++;

cnt %= end_points.size();

std::cout << "父进程选择了" << end_points[index].getname() << " 处理任务" << std::endl;

// 3.下发任务

write(end_points[index].write_fd, &command, sizeof(command));

sleep(1);

}

}

void RecycleProcess(std::vector &end_points)

{

// 依次关闭父进程的写端,子进程就会退出

for (int i = 0; i < end_points.size(); i++)

{

close(end_points[i].write_fd);

int res = waitpid(end_points[i].child_pid, nullptr, 0);

std::cout << "父进程回收了 " << end_points[i].child_pid << " 子进程" << std::endl;

assert(res);

(void)res;

sleep(1);

}

}

int main()

{

srand((size_t)time(NULL));

std::vector end_points;

CreateProcess(end_points);

MakeProcess(end_points);

RecycleProcess(end_points);

return 0;

}

注意一个小问题,我在代码中,没fork出一个子进程后,都要先关掉它从父进程中继承的多余的文件描述符,如果不这样做那么会在你关闭父进程写端的时候会出现bug。下面来解释一下:

父进程在创建第一个子进程的时候,子进程1会继承第一个管道的读端和写端,然后父进程关闭掉管道1的读端,子进程1关掉管道1的写端。但是父进程在创建第2个管道的时候,第二个子进程不仅会继承了管道2的读写端,还会继承了管道1的写端,同样子进程3会进程了管道1的写端和管道2的写端。这样的话在想结束进程前,关闭父进程的写端,使对应子进程对应得管道得写端关闭那么子进程读完管道内得所有数据后就会退出,但是在父进程关闭管道1得写端得时候,就会出现问题,不仅父进程是管道1得写端,子进程2和子进程3都是父进程得写端,导致waitpid这段代码一直待等待子进程1的退出,而一直卡在这。

int res = waitpid(end_points[i].child_pid, nullptr, 0);

🚀解决的方法就是在创建一个新的子进程的时候,首先关掉继承自父进程关于其他管道的写端的文件描述符,达到一个匿名管道文件的写端只有父进程。

演示:

选择任务始于用户交互式的,发配任务是让子进程轮询的去执行内务的。

三,命名管道

命名管道的创建

🚀由于匿名管道只支持有血缘关系的进程实现通信,难道两个毫不相干的进程就无法实现通信嘛?答案是否定的,没有血缘关系的两个进程可以通过命名管道的方式实现进程间的通信。 🚀命名管道与匿名管道的区别就是命名管道有名字,可以被你看到,可以通过mkfifo指令创建一个命名管道。

值得注意的是即使你向管道文件中写了数据,你仍然会看到,管道文件的大小是0,这是因为命名管道文件在磁盘上没有其对应的data block数据块,意味着它里面的数据都是在内存上的不会冲刷到磁盘上。

🚀使用mkfifo函数创建命名管道

int mkfifo(const char *pathname, mode_t mode);

第一个参数是命名管道的名字,第二个参数是创建命名管道的权限。如果创建成功返回0,否则返回-1。 注意: 第二个参数的权限是要与umask掩码运算后得到最终的权限的。

命名管道的使用

🚀首先创建出命名管道 🚀一个进程以读方式打开该管道,另一个进程以写方式打开该管道 🚀实现进程间通信 🚀通信结束后关闭相应的文件描述符 🚀在关闭文件描述符后使用unlink函数删除管道文件(关闭文件描述符是让其i_count计数器为0,使用unlink是让其i_link计数器为0 ,是这个文件在磁盘上被删除)

comm.hpp

#pragma once

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

using namespace std;

string file_name = "fifo";

server.cpp

#include "comm.hpp"

int main()

{

// 1.创建命名管道

int n = mkfifo(file_name.c_str(), 0664);

if (n == -1)

{

cerr << errno << strerror(errno) << endl;

return 1;

}

// 2.打开管道文件

int res = open(file_name.c_str(), O_RDONLY);

if (res == -1)

{

perror("open");

return 2;

}

// 3.通信

while (true)

{

char buffer[1024];

int ret = read(res, buffer, sizeof(buffer) - 1);

// int ret = read(res, &buffer[0], sizeof(char));

if (ret > 0)

{

buffer[ret] = '\0';

cout << buffer << endl;

// fflush(stdout);

}

else if (ret == 0)

{

cout << "写端关闭" << endl;

break;

}

else

break;

}

// 4.关闭命名管道

close(res);

unlink(file_name.c_str());

return 0;

}

client.cpp

#include "comm.hpp"

int main()

{

// 客户端打卡命名管道文件

int n = open(file_name.c_str(), O_WRONLY);

if (n == -1)

{

perror("open");

return 3;

}

//通信

char buffer[1024];

memset(buffer, '\0', sizeof(buffer));

while (true)

{

cout << "请输入# ";

fgets(buffer, sizeof(buffer), stdin);

buffer[strlen(buffer) - 1] = '\0';

write(n, buffer, strlen(buffer));

// system("stty raw");

// int c = getchar();

// system("stty -raw");

// write(n, (char *)&c, sizeof(char));

}

// 关闭文件

close(n);

return 0;

}

这段代码实现了客户端输入消息,在服务端可以接收到消息。

命名管道实现简易的进程池

大体的思想与匿名管道实现进程池的思想一致,就是在创建管道,和使不同的进程与管道建立联系的方式不同。 首先在头文件中定义了三个管道文件的名字,在主进程中首先创建这三个管道,并且以写方式打开管道,在其他三个次进程中,都以读方式打开相应的进程,例如Process1就打开1.pipe管道文件。通过主进程给此进程发送任务编号,次进程从管道中读取到信息后,去执行相应的任务。最后在主进程中关闭掉所有的写端,则在次进程中对应的管道写端关闭在其read完所有数据后就会退出,也会关闭掉相应的文件描述符,最终在主进程中使用unlink函数删除掉所有的管道文件。

comm.hpp

#pragma once

#include

#include

#include

#include

#include

#include

#include

#include

using namespace std;

const int num = 6;

string S[num] =

{

"1.pipe",

"2.pipe",

"3.pipe"};

// vector s(3);

// void init_s()

// {

// s[0] = "1.pipe";

// s[1] = "2.pipe";

// s[2] = "3.pipe";

// }

typedef void (*func_t)();

void PrintLog()

{

std::cout << " pid : " << getpid() << "打印日志任务,正在被执行..." << std::endl;

}

void InsertMysql()

{

std::cout << " pid : " << getpid() << "访问数据库任务,正在被执行..." << std::endl;

}

void NetRequest()

{

std::cout << " pid : " << getpid() << "访问网络的任务,正在被执行..." << std::endl;

}

#define COMMAND_LOG 1

#define COMMAND_MYSQL 2

#define CONMAND_REQUEST 3

#define QUIT 0

class Task

{

public:

Task()

{

functions.push_back(PrintLog);

functions.push_back(InsertMysql);

functions.push_back(NetRequest);

}

void Execute(int index)

{

functions[index]();

}

~Task()

{

}

private:

std::vector functions;

};

ctrlProcess.cpp

#include "comm.hpp"

class EndPoint

{

public:

EndPoint(int fd, string name)

: _write_fd(fd), _pipename(name)

{

}

public:

int _write_fd;

string _pipename;

};

int select_command()

{

int command = 0;

std::cout << "#############################" << std::endl;

std::cout << "#0.PrintLog #1.InsertMysql#" << std::endl;

std::cout << "#2.NetRequest #3.Quit ###" << std::endl;

std::cout << "#############################" << std::endl;

std::cout << "Please select# ";

std::cin >> command;

return command;

}

int main()

{

// 1.打开相应的管道。

for (int i = 0; i < 3; i++)

{

int n = mkfifo(S[i].c_str(), 0664);

if (n == -1)

{

perror("mkfifo");

return 1;

}

}

vector end_points;

// 2.打开相应文件

for (int i = 0; i < 3; i++)

{

int n = open(S[i].c_str(), O_WRONLY);

if (n == -1)

{

perror("open");

return 1;

}

end_points.push_back(EndPoint(n, string(S[i])));

}

cout << "size:----------" << end_points.size() << endl;

// 通信

int cnt = 0;

while (true)

{

// 1.选择任务

int command = select_command();

if (command == 3)

break;

if (command < 0 || command > 2)

continue;

// 2.选择进程

int index = cnt++;

// cnt %= end_points.size();

cnt %= 3;

// 3.下发任务

write(end_points[index]._write_fd, &command, sizeof(int));

sleep(1);

}

// 关闭

for (int i = 0; i < end_points.size(); i++)

{

close(end_points[i]._write_fd);

}

// 删除pipe文件

for (int i = 0; i < end_points.size(); i++)

{

unlink(end_points[i]._pipename.c_str());

}

std::cout << "已经彻底删除了pipe文件" << std::endl;

return 0;

}

Process1.cpp

#include "comm.hpp"

int main()

{

Task t;

// 打开管道文件

int n = open(S[0].c_str(), O_RDONLY);

if (n == -1)

{

perror("open");

return 2;

}

// 读取任务编号

int command = 0;

while (true)

{

int res = read(n, &command, sizeof(command));

if (res > 0)

{

t.Execute(command);

}

else if (res == 0)

{

cout << "写端关闭 " << endl;

break;

}

else

break;

}

close(n);

return 0;

}

Process2.cpp

#include "comm.hpp"

int main()

{

Task t;

// 打开管道文件

int n = open(S[1].c_str(), O_RDONLY);

if (n == -1)

{

perror("open");

return 3;

}

// 读取任务编号

int command = 0;

while (true)

{

int res = read(n, &command, sizeof(command));

if (res > 0)

{

t.Execute(command);

}

else if (res == 0)

{

cout << "写端关闭 " << endl;

break;

}

else

break;

}

close(n);

return 0;

}

Process3.cpp

#include "comm.hpp"

int main()

{

Task t;

// 打开管道文件

int n = open(S[2].c_str(), O_RDONLY);

if (n == -1)

{

perror("open");

return 4;

}

// 读取任务编号

int command = 0;

while (true)

{

int res = read(n, &command, sizeof(command));

if (res > 0)

{

t.Execute(command);

}

else if (res == 0)

{

cout << "写端关闭 " << endl;

break;

}

else

break;

}

close(n);

return 0;

}

演示:

管道总结

🚀管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。 🚀如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。 🚀命名管道是一种特殊类型的文件。 🚀匿名管道由pipe函数创建并打开。 🚀命名管道由mkfifo函数创建,打开用open。 🚀FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义。