C++中IO多路复用(select、poll、epoll)的实现

来自:网络
时间:2024-06-09
阅读:

什么是IO多路复用

I/O多路复用(IO multiplexing)是一种并发处理多个I/O操作的机制。它允许一个进程或线程同时监听多个文件描述符(如套接字、管道、标准输入等)的I/O事件,并在有事件发生时进行处理。

传统的I/O模型中,通常使用阻塞I/O和非阻塞I/O来处理单个I/O操作。如果需要同时处理多个I/O操作,那么需要使用多个线程或多个进程来管理和执行这些I/O操作。这种方式会导致系统资源的浪费,且编程复杂度较高。

而I/O多路复用通过提供一个统一的接口,如selectpollepoll等,来同时监听多个文件描述符的I/O事件。它们会在任意一个文件描述符上有I/O事件发生时立即返回,并告知应用程序哪些文件描述符有事件发生。应用程序可以根据返回的结果来针对有事件发生的文件描述符进行读取、写入或其他操作。

I/O多路复用的优点包括:

  • 单个进程或线程可以同时处理多个I/O操作,提高了系统的并发性。
  • 避免了大量的进程或线程切换,节约了系统资源
  • 使用较少的线程或进程,简化了编程模型和维护工作。

IO多路复用的方式简介

主要的 I/O 多路复用方式有以下几种:

  • selectselect 是最早的一种 I/O 多路复用方式,可以同时监听多个文件描述符的可读、可写和异常事件。通过在调用 select 时传递关注的文件描述符集合,及时返回有事件发生的文件描述符,然后应用程序可以对这些文件描述符进行读写操作。

  • pollpoll 是 select 的一种改进版,也能够同时监听多个文件描述符的可读、可写和异常事件。通过调用 poll 时传递关注的文件描述符数组,返回有事件发生的文件描述符,应用程序执行对应的读写操作。

  • epollepoll 是 Linux 特有的一种 I/O 多路复用机制,相较于 select 和 poll 具有更高的性能,适用于高并发环境。epoll 使用了回调机制来通知应用程序文件描述符上的事件发生,并且支持水平触发(LT,level triggered)和边缘触发(ET,edge triggered)两种模式。

select方式

select 是一种 I/O 多路复用的机制,用于同时监听多个文件描述符的可读、可写和异常事件。它是最早的一种实现,适用于多平台。select几乎在所有的操作系统上都可用,并且拥有相似的接口和语义。这使得应用程序在多个平台上能够以相似的方式使用 select

select运行原理

select 函数在阻塞过程中,主要依赖于一个名为 fd_set 的数据结构来表示文件描述符集合。通过向 select 函数传递待检测的 fd_set 集合,可以指定需要检测哪些文件描述符。fd_set 结构一般是通过使用宏函数以及相关操作进行初始化和处理。

fd_set 结构可以用于传递三种不同类型的文件描述符集合,包括读缓冲区、写缓冲区和异常状态。通过将文件描述符放入相应的集合中,程序员可以选择性地检查特定类型的事件或操作。通过使用传出变量,程序员可以获取与就绪状态对应的文件描述符集合,并相应地处理与就绪内容相关的操作。

下面两张图展示了select函数在运行时的逻辑(读缓冲区为例)

C++中IO多路复用(select、poll、epoll)的实现

C++中IO多路复用(select、poll、epoll)的实现

select函数使用方法

select函数原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds, struct timeval *timeout);
  • nfds:需要监视的最大文件描述符加1,即待监视的文件描述符的最大值加1。
  • readfds:可读性检查的文件描述符集合。
  • writefds:可写性检查的文件描述符集合。
  • exceptfds:异常条件的文件描述符集合。
  • timeout:最长等待时间,也可以设置为 NULL,表示一直阻塞直到有事件发生。

函数返回值如下:

  • 大于 0:返回值为有事件发生的文件描述符的总数。
  • 0:表示超时,没有事件发生。
  • -1:出错,可以通过查看全局变量 errno 来获取错误码。

一些值得注意的小细节:

  • nfds 的值必须是所有待监视文件描述符中最大的值加1。
  • 在某些平台上,select 的文件描述符集大小有可能有限制。
  • 调用 select 会阻塞等待,直到有事件发生,这会导致效率问题。
  • 在多个线程中使用 select 可能需要使用互斥锁来保护传递的文件描述符集。

操作fd_set的API:

       void FD_CLR(int fd, fd_set *set);
       int  FD_ISSET(int fd, fd_set *set);
       void FD_SET(int fd, fd_set *set);
       void FD_ZERO(fd_set *set);

1. FD_ZERO(fd_set *set):清空指定的文件描述符集合 set,将其所有位都置为0。

2. FD_SET(int fd, fd_set *set):将指定的文件描述符 fd 添加到文件描述符集合 set 中,相应的位将被置为1。

3. FD_CLR(int fd, fd_set *set):将指定的文件描述符 fd 从文件描述符集合 set 中移除,相应的位将被清零(置为0)。

4. FD_ISSET(int fd, fd_set *set):检查指定的文件描述符 fd 是否在文件描述符集合 set 中,如果存在,则返回非零值(true);否则,返回零值(false)。

实例

下面是一个利用select实现的客户端与服务器端相互传输的简单示例:

服务器端:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <iostream>

using namespace std;

int main() // 基于多路复用select函数实现的并行服务器
{
    // 1 创建监听的fd
    int lfd = socket(AF_INET, SOCK_STREAM, 0);

    // 2 绑定
    struct sockaddr_in addr; // struct sockaddr_in是用于表示IPv4地址的结构体,它是基于struct sockaddr的扩展。
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9997);
    addr.sin_addr.s_addr = INADDR_ANY;
    bind(lfd, (struct sockaddr *)&addr, sizeof(addr));

    // 3 设置监听
    listen(lfd, 128);

    // 将监听的fd的状态交给内核检测
    int maxfd = lfd;
    // 初始化检测的读集合
    fd_set rdset;
    fd_set rdtemp;
    // 清零
    FD_ZERO(&rdset);
    // 将监听的lfd设置到集合当中
    FD_SET(lfd, &rdset);

    // 通过select委托内核检测读集合中的文件描述符状态, 检测read缓冲区有没有数据
    // 如果有数据, select解除阻塞返回
    while (1)
    {

        rdtemp = rdset;
        int num = select(maxfd + 1, &rdtemp, NULL, NULL, NULL);

        // 判断连接请求还在不在里面,如果在,则运行accept
        if (FD_ISSET(lfd, &rdtemp))
        {
            struct sockaddr_in cliaddr;
            int cliaddrLen = sizeof(cliaddr);
            int cfd = accept(lfd, (struct sockaddr *)&cliaddr, (socklen_t *)&cliaddrLen);

            // 得到了有效的客户端文件描述符,将这个文件描述符放入读集合当中,并更新最大值
            FD_SET(cfd, &rdset);
            maxfd = cfd > maxfd ? cfd : maxfd;
        }

        // 如果没有建立新的连接,那么就直接通信
        for (int i = 0; i < maxfd + 1; i++)
        {
            if (i != lfd && FD_ISSET(i, &rdtemp))
            {

                // 接收数据,一次接收10个字节,客户端每次发送100个字节,下一轮select检测的时候, 内核还会标记这个文件描述符缓冲区有数据 -> 再读一次
                //  	循环会一直持续, 知道缓冲区数据被读完位置
                char buf[10] = {0};
                int len = read(i, buf, sizeof(buf));
                cout << "len=" <<len<< endl;

                if (len == 0) // 客户端关闭了连接,,因为如果正好读完,会在select过程中删除
                {
                    printf("客户端关闭了连接.....\n");
                    // 将该文件描述符从集合中删除
                    FD_CLR(i, &rdset);
                    close(i);
                }
                else if (len > 0) // 收到了数据
                {
                    // 发送数据
                    if (len > 2)
                    {
                        write(i, buf, strlen(buf) + 1);
                        cout << "写了一次" << endl;
                        sleep(0.1);
                    }
                }
                else
                {
                    // 异常
                    perror("read");
                    FD_CLR(i, &rdset);
                }
            }
        }
    }

    return 0;
}

客户端:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <iostream>

using namespace std;


int main()  //网络通信的客户端
{
    // 1 创建用于通信的套接字
    int fd=socket(AF_INET,SOCK_STREAM,0);
    if(fd==-1)
    {
        perror("socket");
        exit(0);
    }

    // 2 连接服务器
    struct sockaddr_in addr;
    addr.sin_family=AF_INET; //ipv4
    addr.sin_port=htons(9997);// 服务器监听的端口, 字节序应该是网络字节序
    inet_pton(AF_INET,"127.0.0.1",&addr.sin_addr.s_addr);
    int ret=connect(fd,(struct sockaddr*)&addr,sizeof(addr));
    if(ret==-1)
    {
        perror("connect");
        exit(0);
    }

    //通信
    while (1)
    {
        //读数据
        char recvBuf[1024];
        //写数据
        fgets(recvBuf,sizeof(recvBuf),stdin);
        write(fd,recvBuf,strlen(recvBuf)+1);
        int oriLen=strlen(recvBuf)-1;

        cout<<"strlen(recvBuf)="<<oriLen<<endl;
        
        int total_get=0;
        while (read(fd,recvBuf,sizeof(recvBuf)))
        {
            total_get+=10;
            cout<<"total_get="<<total_get<<"          strlen(recvBuf)="<<oriLen<<endl;
            printf("recv buf: %s\n", recvBuf);
            if (total_get>=oriLen)
            {
                cout<<"out"<<endl;
                break;
            }
            
            

        }
        
        
        sleep(1);
    }

    close(fd);

    return 0;
}

注意的点

在服务器端中,调用select函数时,因为select函数会将检测的结果写回fd_set,所以如果不做其他操作的话,写回的数据会覆盖掉最初的fd_set,造成错误。所以我们在调用select函数之前可以将fd_set暂时先赋给一个临时变量,如下:

fd_set rdset;
fd_set rdtemp;


rdtemp = rdset;
int num = select(maxfd + 1, &rdtemp, NULL, NULL, NULL);

代码整体工程、在以上内容中加入线程和线程池实现通信的版本可参考:GitHub - BanLi-Official/CppSelect

poll方式

poll方式运行原理

poll 函数是一种 I/O 多路复用机制,类似于 select 函数,但相比 select 更加高效和灵活。poll 通过轮询方式,在用户空间和内核空间之间进行交互。与 select 不同的是,poll 可以支持更大的文件描述符集合,且不会有文件描述符数量限制的问题。同时poll与select不同,select有跨平台的特点,而poll只能在Linux上使用。

poll函数使用方法

poll函数原型如下:

#include <poll.h>

struct pollfd
  {
    int fd;			/* File descriptor to poll.  */
    short int events;		/* Types of events poller cares about.  */
    short int revents;		/* Types of events that actually occurred.  */
  };


int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • fds一个指向 struct pollfd 结构体数组的指针,用于指定待监视的文件描述符及其感兴趣的事件。每个 struct pollfd 结构包含一个文件描述符 fd 和一个短整型 events,用于指定关注的事件类型。revents 字段在 poll 返回时被内核修改,用于指示发生的事件类型。

  • nfds:表示 fds 数组的大小,即待监视的文件描述符数量。

  • timeout:指定阻塞等待的时间(以毫秒为单位)

poll 函数会阻塞,直到以下三种情况之一发生:

  • 有一个或多个文件描述符准备好监听的事件。
  • 指定的超时时间到达。
  • 发生一个错误。

函数返回值如下:

poll 函数返回一个正整数表示就绪的文件描述符数量,或者返回以下几种特定的值:

  • 返回大于 0 的整数:表示有文件描述符就绪的数量。可以通过遍历监视的文件描述符集合,检查 revents 字段来确定哪些文件描述符具体就绪。
  • 返回 0:表示在无限等待模式下超时,即指定的超时时间到达,但没有文件描述符就绪。
  • 返回 -1:表示发生错误,可以使用 errno 变量获取具体的错误代码。

值得注意的一些小细节:

  poll 函数返回后,struct pollfd 结构中的 revents 字段会被修改,以指示每个文件描述符发生的事件类型。可以通过遍历 struct pollfd 数组,在 revents 字段中检查位来判断每个文件描述符的具体就绪事件。在处理 poll 的返回值时,通常的做法是使用 if 或 switch 语句根据每个文件描述符的 revents 值来执行相应的操作,例如读取数据、写入数据、处理异常等。

实例

下面是一个利用poll实现的客户端与服务器端相互传输的简单示例:

服务器端:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <iostream>
#include <poll.h>

using namespace std;

int main() // 基于多路复用select函数实现的并行服务器
{
    // 1 创建监听的fd
    int lfd = socket(AF_INET, SOCK_STREAM, 0);

    // 2 绑定
    struct sockaddr_in addr; // struct sockaddr_in是用于表示IPv4地址的结构体,它是基于struct sockaddr的扩展。
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9995);
    addr.sin_addr.s_addr = INADDR_ANY;
    bind(lfd, (struct sockaddr *)&addr, sizeof(addr));

    // 3 设置监听
    listen(lfd, 128);

    // 将监听的fd的状态交给内核检测
    int maxfd = lfd;

    //创建文件描述符的队列
    struct pollfd myfd[100];
    for(int i=0;i<100;i++)
    {
        myfd[i].fd=-1;
        myfd[i].events=POLLIN;
    }

    myfd[0].fd=lfd;


    while (1)
    {
        //sleep(5);
        
        cout<<"poll等待开始"<<endl;
        int num=poll(myfd,maxfd+1,-1);
        cout<<"poll等待结束~"<<endl;

        // 判断连接请求还在不在里面,如果在,则运行accept

        if(myfd[0].fd && myfd[0].revents==POLLIN)
        {
            struct sockaddr_in cliaddr;
            int cliaddrLen = sizeof(cliaddr);
            int cfd = accept(lfd, (struct sockaddr *)&cliaddr, (socklen_t *)&cliaddrLen);

            // 得到了有效的客户端文件描述符,将这个文件描述符放入读集合当中,并更新最大值
            for(int i=0 ; i<1024 ;i++)//找到空的位置
            {
                if(myfd[i].fd==-1 && myfd[i].events==POLLIN)
                {
                    myfd[i].fd=cfd;
                    cout<<"连接成功!      fd放在了"<<i<<endl;
                    break;
                }

            }
            maxfd = cfd > maxfd ? cfd : maxfd;
            
        }

        // 如果没有建立新的连接,那么就直接通信


        for (int i = 0; i < maxfd + 1; i++)
        {
            if (myfd[i].fd && myfd[i].revents==POLLIN && i!=0)
            {

                // 接收数据,一次接收10个字节,客户端每次发送100个字节,下一轮select检测的时候, 内核还会标记这个文件描述符缓冲区有数据 -> 再读一次
                //  	循环会一直持续, 知道缓冲区数据被读完位置
                char buf[10] = {0};
                cout<<"                  外读"<<endl;
                int len = read(myfd[i].fd, buf, sizeof(buf));
                cout<<"len="<<len<<"          i="<<i<<endl;
                if(len==0)  //外部中斷導致的連接中斷
                {
                    printf("客户端关闭了连接.....\n");
                    // 将该文件描述符从集合中删除
                    myfd[i].fd=-1;
                    break;
                }


                cout<<"Get read len="<<len<<endl;
                if (len == 0) // 客户端关闭了连接,,因为如果正好读完,会在select过程中删除
                {
                    printf("客户端关闭了连接.....\n");
                    // 将该文件描述符从集合中删除
                    myfd[i].fd=-1;
                    break;

                }
                else if (len > 0) // 收到了数据
                {
                    // 发送数据
                    if(len<=2)
                    {
                        cout<<"          out!!"<<endl;
                        break;
                    }
                    write(myfd[i].fd, buf, strlen(buf) + 1);
                    if(len<10)
                    {
                        cout<<"          out!!"<<endl;
                        break;
                    }
                    sleep(0.1);
                    cout<<"写了一次   寫的內容是:"<<string(buf)<<"###"<<endl;

                }
                else
                {
                    // 异常
                    perror("read");
                    myfd[i].fd=-1;
                    break;
                }


            }

        }
    }

    return 0;
}

客户端:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <iostream>

using namespace std;


int main()  //网络通信的客户端
{
    // 1 创建用于通信的套接字
    int fd=socket(AF_INET,SOCK_STREAM,0);
    if(fd==-1)
    {
        perror("socket");
        exit(0);
    }

    // 2 连接服务器
    struct sockaddr_in addr;
    addr.sin_family=AF_INET; //ipv4
    addr.sin_port=htons(9995);// 服务器监听的端口, 字节序应该是网络字节序
    inet_pton(AF_INET,"127.0.0.1",&addr.sin_addr.s_addr);
    int ret=connect(fd,(struct sockaddr*)&addr,sizeof(addr));
    if(ret==-1)
    {
        perror("connect");
        exit(0);
    }

    //通信
    while (1)
    {
        //读数据
        char recvBuf[1024];
        //写数据
        fgets(recvBuf,sizeof(recvBuf),stdin);
        write(fd,recvBuf,strlen(recvBuf)+1);

        int oriLen=strlen(recvBuf)-1;

        cout<<"strlen(recvBuf)="<<oriLen<<endl;
        
        int total_get=0;
        while (total_get<oriLen)
        {
            //cout<<"開始讀"<<endl;
            read(fd,recvBuf,sizeof(recvBuf));
            total_get+=10;
            //cout<<"total_get="<<total_get<<"          strlen(recvBuf)="<<oriLen<<endl;
            printf("recv buf: %s\n", recvBuf);
            if (total_get>=oriLen)
            {
                cout<<"out"<<endl;
                break;
            }
            
            

        }
        
        
        sleep(1);
    }

    close(fd);

    return 0;
}

整体工程与线程池版本可以参考:GitHub - BanLi-Official/CppPoll: C++ Network Programming: Linux Operating System Poll Example

epoll方式

epoll运行原理

epoll是Linux下的一种I/O 多路复用机制,可以高效地处理大量的并发连接。

epoll模型使用一个文件描述符(epoll fd)来管理多个其他文件描述符(event fd)。在epoll fd上注册了感兴趣的事件,当有感兴趣的事件发生时,epoll会通知应用程序。相比于传统的select和poll模型,epoll模型有以下几个优势:

  • 高效:在大规模并发连接的场景下,epoll模型可以显著提高效率。使用一个文件描述符来管理多个连接,避免了遍历所有连接的开销。并且epoll使用了“事件通知”的方式,只有在有事件发生时才会通知应用程序,避免了无效轮询。

  • 更快的响应速度:由于epoll是基于事件驱动的模型,在有事件发生时立即通知应用程序,可以更快地响应客户端的请求。

  • 可扩展性好:epoll模型采用了无锁设计,将连接集合的管理交给内核处理,并利用回调函数机制处理连接的读写事件,减少了锁竞争,提高了系统的可扩展性。

epoll使用红黑树来存储和管理注册的事件。红黑树是一种自平衡的二叉搜索树,具有以下特点:

  • 二叉搜索树的性质:红黑树是一棵二叉搜索树,即对于任意一个节点,其左子树的值都小于该节点的值,右子树的值都大于该节点的值。

  • 自平衡性:红黑树通过对节点进行一系列旋转和重新着色操作来保持树的平衡。具体来说,红黑树通过五个性质来保持平衡:根节点是黑色的、叶子节点(NIL节点)是黑色的、红色节点的两个子节点都是黑色的、从任一节点到其叶子节点的所有路径都包含相同数目的黑色节点、新插入的节点是红色的。

红黑树介绍可以参考百度百科:红黑树_百度百科

C++中IO多路复用(select、poll、epoll)的实现

在epoll模型中,当应用程序调用epoll_ctl函数注册事件时,epoll将会将文件描述符和其对应的事件信息存储到红黑树中,这样可以方便地查询和管理事件。红黑树的高效查询特性可以快速找到特定文件描述符对应的事件信息,并且可以保持事件信息的有序性。

当有事件发生时,epoll调用epoll_wait函数去查询红黑树上已注册的事件,如果有匹配的事件发生,就会通知应用程序进行处理。红黑树是epoll实现高效I/O多路复用的关键技术之一。通过使用红黑树,epoll可以将事件的查询、插入和删除等操作的时间复杂度降低到O(log n),使得在大规模并发连接的场景下也能够高效地处理事件。

epoll函数使用方法

在Linux下,epoll函数主要包括以下几个:

#include <sys/epoll.h>  //头文件

int epoll_create(int size);   //创建一个epoll实例

int epoll_ctl(int epfd, int op, 
              int fd, struct epoll_event *event);  //控制epoll上的事件

int epoll_wait(int epfd, struct epoll_event *events,
               int maxevents, int timeout); //阻塞等待事件发生

同时在这些参数中,有一个重要的数据结构epoll_event。epoll_event结构体用于描述事件,包括文件描述符、事件类型和事件数据。其中的定义如下:

typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

struct epoll_event
{
  uint32_t events;	/* Epoll events */
  epoll_data_t data;	/* User data variable */
} __EPOLL_PACKED;

其中,events是事件类型,包括以下几种:

  • EPOLLIN:可读事件,表示连接上有数据可读。
  • EPOLLOUT:可写事件,表示连接上可以写入数据。
  • EPOLLPRI:紧急事件,表示连接上有紧急数据可读。
  • EPOLLRDHUP:连接关闭事件,表示连接已关闭。
  • EPOLLERR:错误事件,表示连接上发生错误。
  • EPOLLHUP:挂起事件,表示连接被挂起。

结构体中的epoll_data是一个联合体,用于在epoll_event结构体中传递事件数据。它有四个成员变量,可以根据具体的需求选择使用其中的一个。通常可以选择int类型的fd,用于存储发生对应事件的文件描述符

epoll_create函数:创建一个epoll fd,返回一个新的epoll文件描述符。参数size用于指定监听的文件描述符个数,但是在Linux 2.6.8之后的版本,该参数已经没有实际意义。传入一个大于0的值即可。

int epfd=epoll_create(1);

epoll_ctl函数:用于控制epoll事件的函数之一。它用于向epoll实例中添加、修改或删除关注的文件描述符和对应事件。函数原型如下:

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

函数参数:

  • epfd:epoll文件描述符,通过epoll_create函数创建获得。
  • op:操作类型,可以是以下三种取值之一:
    • EPOLL_CTL_ADD:将文件描述符添加到epoll实例中。
    • EPOLL_CTL_MOD:修改已添加到epoll实例中的文件描述符的关注事件。
    • EPOLL_CTL_DEL:从epoll实例中删除文件描述符。
  • fd:要控制的文件描述符。
  • event:指向epoll_event结构体的指针,用于指定要添加、修改或删除的事件。

函数返回值:

  • 成功时返回0,表示操作成功。
  • 失败时返回-1,并设置errno错误码来指示具体错误原因。

epoll_wait函数:用于等待事件的发生。它会一直阻塞直到有事件发生或超时。函数原型如下:

#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);

函数参数:

  • epfd:epoll文件描述符,通过epoll_create函数创建获得。
  • events:用于接收事件的epoll_event结构体数组。
  • maxeventsevents数组的大小,表示最多可以接收多少个事件。
  • timeout:超时时间,单位为毫秒,表示epoll_wait函数阻塞的最长时间。常用的取值有以下三种:
    • -1:表示一直阻塞,直到有事件发生。
    • 0:表示立即返回,不管有没有事件发生。
    • > 0:表示等待指定的时间(以毫秒为单位),如果在指定时间内没有事件发生,则返回。

函数返回值:

  • 成功时返回接收到的事件的数量。如果超时时间为0并且没有事件发生,则返回0。
  • 失败时返回-1,并设置errno错误码来指示具体错误原因。

一些要注意的点:

在epoll_wait函数中用于接收事件的epoll_event结构体数组是一个传出参数,需要定义一个epoll_event的数组,比如:

    struct epoll_event evens[100];//用于接取传出的内容
    int len=sizeof(evens)/sizeof(struct epoll_event);

工作模式

epoll的工作模式可以分为两种:边缘触发(Edge Triggered, ET)模式和水平触发(Level Triggered, LT)模式。一般epoll运行的模式默认是水平触发模式。

水平模式 

有事件就一直不断通知(默认就是这个)

  • 当被监控的文件描述符上的状态发生变化时,epoll会不断通知应用程序,直到应用程序处理完事件并返回。
  • 如果应用程序没有处理完事件,而文件描述符上的状态再次发生变化,epoll会再次通知应用程序。
  • 应用程序可以使用阻塞或非阻塞I/O来处理事件。
  • 水平触发模式适合处理低并发的I/O场景。

实例

服务器端:

// server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <iostream>

using namespace std;

int main()
{
    // 1. 创建监听的套接字
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    if(lfd == -1)
    {
        perror("socket");
        exit(0);
    }

    // 2. 将socket()返回值和本地的IP端口绑定到一起
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9996);   // 大端端口
    // INADDR_ANY代表本机的所有IP, 假设有三个网卡就有三个IP地址
    // 这个宏可以代表任意一个IP地址
    // 这个宏一般用于本地的绑定操作
    addr.sin_addr.s_addr = INADDR_ANY;  // 这个宏的值为0 == 0.0.0.0
//    inet_pton(AF_INET, "192.168.8.161", &addr.sin_addr.s_addr);
    int ret = bind(lfd, (struct sockaddr*)&addr, sizeof(addr));
    if(ret == -1)
    {
        perror("bind");
        exit(0);
    }

    // 3. 设置监听
    ret = listen(lfd, 128);
    if(ret == -1)
    {
        perror("listen");
        exit(0);
    }

    int epfd=epoll_create(1);
    struct epoll_event even;
    even.events=EPOLLIN;  //用水平触发模式来检测
    even.data.fd=lfd;
    ret=epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&even);

    struct epoll_event evens[100];//用于接取传出的内容
    int len=sizeof(evens)/sizeof(struct epoll_event);

    while (1)
    {
        cout<<"                     開始等待!!!"<<endl;
        int num=epoll_wait(epfd,evens,len,-1);
        cout<<"                     等待結束!!!"<<"   num="<<num<<endl;
        for(int i=0;i<num;i++)//取出所有的检测到的事件
        {

            int curfd = evens[i].data.fd;
            if(evens[i].data.fd==lfd)
            {
                struct sockaddr_in *add;
                int len=sizeof(struct sockaddr_in);
                int cfd=accept(evens[i].data.fd,NULL,NULL);
                struct epoll_event even;
                even.events=EPOLLIN;
                even.data.fd=cfd;
                //将接收到的cfd放入epoll检测的红黑树当中
                ret=epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&even);
                if(ret==-1)
                {
                    cout<<"登錄失敗"<<endl;
                }
                else
                {
                    cout<<"登陸成功,已加入紅黑樹"<<endl;
                }
                
            }
            else
            {
                // 接收数据
                char buf[10];
                memset(buf, 0, sizeof(buf));
                cout<<"正在讀!!!!"<<endl;
                int len = read(evens[i].data.fd, buf, sizeof(buf));
                if(len > 0)
                {
                    // 发送数据
                    if(len<=2)
                    {
                        cout<<"          out!!"<<endl;
                        break;
                    }
                    printf("客户端say: %s\n", buf);
                    write(evens[i].data.fd, buf, len);
                    sleep(0.1);
                }
                else if(len  == 0)
                {
                    printf("客户端断开了连接...\n");
                    ret=epoll_ctl(epfd,EPOLL_CTL_DEL,evens[i].data.fd,NULL);        
                    close(curfd);
                    //break;
                }
                else
                {
                    perror("read");
                    ret=epoll_ctl(epfd,EPOLL_CTL_DEL,evens[i].data.fd,NULL);       
                    close(curfd); 
                    //break;
                }
            }

            

        }
    }
    



    return 0;
}

客户端:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <iostream>

using namespace std;


int main()  //网络通信的客户端
{
    // 1 创建用于通信的套接字
    int fd=socket(AF_INET,SOCK_STREAM,0);
    if(fd==-1)
    {
        perror("socket");
        exit(0);
    }

    // 2 连接服务器
    struct sockaddr_in addr;
    addr.sin_family=AF_INET; //ipv4
    addr.sin_port=htons(9996);// 服务器监听的端口, 字节序应该是网络字节序
    inet_pton(AF_INET,"127.0.0.1",&addr.sin_addr.s_addr);
    int ret=connect(fd,(struct sockaddr*)&addr,sizeof(addr));
    if(ret==-1)
    {
        perror("connect");
        exit(0);
    }

    //通信
    while (1)
    {
        //读数据
        char recvBuf[1024];
        //写数据
        fgets(recvBuf,sizeof(recvBuf),stdin);
        write(fd,recvBuf,strlen(recvBuf)+1);

        int oriLen=strlen(recvBuf)-1;

        cout<<"strlen(recvBuf)="<<oriLen<<endl;
        
        int total_get=0;
        while (total_get<oriLen)
        {
            //cout<<"開始讀"<<endl;
            char recvBuf2[1024];
            read(fd,recvBuf2,sizeof(recvBuf2));
            total_get+=10;
            cout<<"total_get="<<total_get<<"          strlen(recvBuf)="<<oriLen<<endl;
            printf("recv buf: %s\n", recvBuf2);
            if (total_get>=oriLen)
            {
                cout<<"out"<<endl;
                break;
            }
            
            

        }
        
        
        sleep(1);
    }

    close(fd);

    return 0;
}

边沿模式

有事件只通知一次,后续一次处理没解决玩的内容需要程序员自己解决

  • 仅当被监控的文件描述符上的状态发生变化时,epoll才会通知应用程序。
  • 当文件描述符上有数据可读或可写时,epoll会立即通知应用程序,并且保证应用程序能够全部读取或写入数据,直到读写缓冲区为空。
  • 应用程序需要使用非阻塞I/O来处理事件,以避免阻塞其他文件描述符的事件通知。
  • 边缘触发模式适合处理高并发的网络通信场景。

实例

服务器端:

// server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <iostream>
#include <fcntl.h>
#include <errno.h>

using namespace std;

int main()
{
    // 1. 创建监听的套接字
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    if (lfd == -1)
    {
        perror("socket");
        exit(0);
    }

    // 2. 将socket()返回值和本地的IP端口绑定到一起
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9996); // 大端端口
    // INADDR_ANY代表本机的所有IP, 假设有三个网卡就有三个IP地址
    // 这个宏可以代表任意一个IP地址
    // 这个宏一般用于本地的绑定操作
    addr.sin_addr.s_addr = INADDR_ANY; // 这个宏的值为0 == 0.0.0.0
                                       //    inet_pton(AF_INET, "192.168.8.161", &addr.sin_addr.s_addr);
    int ret = bind(lfd, (struct sockaddr *)&addr, sizeof(addr));
    if (ret == -1)
    {
        perror("bind");
        exit(0);
    }

    // 3. 设置监听
    ret = listen(lfd, 128);
    if (ret == -1)
    {
        perror("listen");
        exit(0);
    }

    int epfd = epoll_create(1);
    struct epoll_event even;
    even.events = EPOLLIN | EPOLLET; //使用边沿触发模式检测
    even.data.fd = lfd;
    ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &even);

    struct epoll_event evens[100]; // 用于接取传出的内容
    int len = sizeof(evens) / sizeof(struct epoll_event);

    while (1)
    {
        cout << "                     開始等待!!!" << endl;
        int num = epoll_wait(epfd, evens, len, -1);
        cout << "                     等待結束!!!"
             << "   num=" << num << endl;
        for (int i = 0; i < num; i++) // 取出所有的检测到的事件
        {

            int curfd = evens[i].data.fd;
            if (evens[i].data.fd == lfd)
            {
                struct sockaddr_in *add;
                int len = sizeof(struct sockaddr_in);
                int cfd = accept(evens[i].data.fd, NULL, NULL);
                // 将这个文件标识符改为非阻塞模式
                int flag = fcntl(cfd, F_GETFL); // 获取该文件描述符的状态标志
                flag = O_NONBLOCK;              // 设置为 O_NONBLOCK,即非阻塞模式。
                fcntl(cfd, F_SETFL, flag);      // 将新的状态标志设置为非阻塞模式。
                struct epoll_event even;
                even.events = EPOLLIN | EPOLLET;//使用边沿触发模式检测
                even.data.fd = cfd;
                // 将接收到的cfd放入epoll检测的红黑树当中
                ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &even);
                if (ret == -1)
                {
                    cout << "登錄失敗" << endl;
                }
                else
                {
                    cout << "登陸成功,已加入紅黑樹" << endl;
                }
            }
            else
            {
                // 接收数据
                char buf[10];
                memset(buf, 0, sizeof(buf));
                cout << "正在讀!!!!" << endl;
                while (1) // 应对Epoll的ET模式而用的循环read,read要将文件标识符改为非阻塞版本
                {
                    int len = read(evens[i].data.fd, buf, sizeof(buf));
                    if (len > 0)
                    {
                        // 发送数据
                        if (len <= 2)
                        {
                            cout << "          out!!" << endl;
                            break;
                        }
                        printf("客户端say: %s\n", buf);
                        write(evens[i].data.fd, buf, len);
                        sleep(0.1);
                    }
                    else if (len == 0)
                    {
                        printf("客户端断开了连接...\n");
                        ret = epoll_ctl(epfd, EPOLL_CTL_DEL, evens[i].data.fd, NULL);
                        close(curfd);
                        break;
                    }
                    else
                    {

                        perror("read");
                        //ret = epoll_ctl(epfd, EPOLL_CTL_DEL, evens[i].data.fd, NULL);
                        //close(curfd);
                        if (errno == EAGAIN)
                        {
                            cout << "接收完毕!" << endl;
                            break;
                        }
                        // break;
                    }
                }
            }
        }
    }

    return 0;
}

客户端:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <iostream>

using namespace std;


int main()  //网络通信的客户端
{
    // 1 创建用于通信的套接字
    int fd=socket(AF_INET,SOCK_STREAM,0);
    if(fd==-1)
    {
        perror("socket");
        exit(0);
    }

    // 2 连接服务器
    struct sockaddr_in addr;
    addr.sin_family=AF_INET; //ipv4
    addr.sin_port=htons(9996);// 服务器监听的端口, 字节序应该是网络字节序
    inet_pton(AF_INET,"127.0.0.1",&addr.sin_addr.s_addr);
    int ret=connect(fd,(struct sockaddr*)&addr,sizeof(addr));
    if(ret==-1)
    {
        perror("connect");
        exit(0);
    }

    //通信
    while (1)
    {
        //读数据
        char recvBuf[1024];
        //写数据
        fgets(recvBuf,sizeof(recvBuf),stdin);
        write(fd,recvBuf,strlen(recvBuf)+1);

        int oriLen=strlen(recvBuf)-1;

        cout<<"strlen(recvBuf)="<<oriLen<<endl;
        
        int total_get=0;
        while (total_get<oriLen)
        {
            //cout<<"開始讀"<<endl;
            char recvBuf2[1024];
            read(fd,recvBuf2,sizeof(recvBuf2));
            total_get+=10;
            cout<<"total_get="<<total_get<<"          strlen(recvBuf)="<<oriLen<<endl;
            printf("recv buf: %s\n", recvBuf2);
            if (total_get>=oriLen)
            {
                cout<<"out"<<endl;
                break;
            }
            
            

        }
        
        
        sleep(1);
    }

    close(fd);

    return 0;
}

边沿模式需要注意的点

由于边沿模式只通知一次事件发生,所以当我们服务器端接收来自客户端的较为长的内容时,可能会出现,一次无法完全接收的情况。而边沿模式又只通知一次,所以此时没读取完的内容可能无法及时读取。为了应对这个问题,我们可以采取循环接收的方法,如:

while (1) // 应对Epoll的ET模式而用的循环read,
{
   int len = read(evens[i].data.fd, buf, sizeof(buf));
   if (len > 0)
   {
       // 发送数据               
   }
   else if (len == 0)
   {
        printf("客户端断开了连接...\n");
        break;
   }
   else
   {

        perror("read");
        break;
   }
}

应用程序在处理事件时需要使用非阻塞I/O,确保能够立即处理事件并避免阻塞其他事件的通知。需要注意将被监控的文件描述符设置为非阻塞状态,以确保事件的及时处理。可以使用fcntl函数的O_NONBLOCK标志来将文件描述符设置为非阻塞模式。(因为如果不设置为非阻塞模式的话,服务器端在循环读取客户端发来的内容时,如果读完了内容,应用程序就会阻塞在read函数部分)将其设置为非阻塞模式后,我们在读取完内容之后,就可以根据read返回的EAGAIN错误(接收缓冲区为空时会报)来跳出循环。设置方式如下:

int cfd = accept(evens[i].data.fd, NULL, NULL);


// 将这个文件标识符改为非阻塞模式
int flag = fcntl(cfd, F_GETFL); // 获取该文件描述符的状态标志
flag = O_NONBLOCK;              // 设置为 O_NONBLOCK,即非阻塞模式。
fcntl(cfd, F_SETFL, flag);      // 将新的状态标志设置为非阻塞模式。

struct epoll_event even;
even.events = EPOLLIN | EPOLLET;  //使用边沿触发模式检测读缓冲区
even.data.fd = cfd;

// 将接收到的cfd放入epoll检测的红黑树当中
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &even);

退出时判断的EAGAIN错误,存在erron.h库中。errno(error number)是C语言标准库(C Standard Library)提供的一个全局变量,用于表示上一次发生的错误代码。errno库提供了一些宏定义和函数,用于获取和处理错误代码。需要注意的是,errno是全局变量,在多线程环境下需要注意线程安全。如:

while (1) // 应对Epoll的ET模式而用的循环read,
{
   int len = read(evens[i].data.fd, buf, sizeof(buf));
   if (len > 0)
   {
       // 发送数据               
   }
   else if (len == 0)
   {
        printf("客户端断开了连接...\n");
        break;
   }
   else
   {
        perror("read");
        if (errno == EAGAIN) //判断是否读取完毕
        {
             cout << "接收完毕!" << endl;
             break;
        }
   }
}

整体工程与线程池版本可以参考:https://github.com/BanLi-Official/CppEpoll

参考资料

感谢苏丙榅大佬的教程

Linux 教程 | 爱编程的大丙

爱编程的大丙的个人空间-爱编程的大丙个人主页-哔哩哔哩视频

红黑树_百度百科

(C++通讯架构学习笔记):epoll介绍及原理详解_c++ epoll-CSDN博客

C++ select模型详解(多路复用IO)-CSDN博客

C++网络编程select函数原理详解_c++ select-CSDN博客

返回顶部
顶部