高级IO(Linux)

五种IO模型

什么是IO?
就拿 read系统调用来说,从缓冲区中读取数据;首先要保证缓冲区有数据,若没有,操作就会被阻塞,也就是等待资源就绪;若有,将数据拷贝完之后直接返回;所以,IO分为两部分:等待资源就绪+拷贝数据;其中等待资源就绪在整个IO过程中占比非常大,如何降低等待的占比,也就是需要学习的目的

举个栗子:钓鱼的故事
张三,一个非常固执的钓鱼佬,在钓鱼的时候,只盯着鱼竿,身边发生什么都不在乎;
李四,一个随心所欲的钓鱼佬,在等待鱼上钩的时候,做着其他的事情,是不是地观察一个鱼竿,有没有鱼上钩;
王五,一个“非常懒”的钓鱼佬,把鱼竿放好,在上面安装一个报警器,一个有鱼上钩,直接把杆;
赵六,一个“多金”的钓鱼佬,每次都拿上十只鱼竿来钓鱼,没有都忙的不亦乐乎;
田七,一个爱吃鱼不爱钓鱼的有钱人,雇了一个小王,让他去钓鱼;

这五个人的钓鱼方式可以类比为五种IO方式:
张三-阻塞式IO,两耳不闻窗外事,一心只观水中浮
李四-非阻塞IO,闲来没事看两眼
王五-信号驱动IO,报警之后,立刻提杆
赵六-多路转接/多路复用
田七-异步IO

在整个钓鱼的过程中,鱼就是数据,河就是内核空间,鱼漂表明事件就绪,鱼竿就是文件描述符,钓鱼的动作也就是系统调用操作

钓鱼的人,等的占比越低,单位时间,钓鱼的效率就越高

多路转接/多路复用是高级IO的原因就是等待资源就绪的占比低

高级IO重要概念

同步通信 vs 异步通信

同步和异步关注的是消息通信机制

  • 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回. 但是一旦调用返回,就得到返回值了; 换句话说,就是由调用者主动等待这个调用的结果
  • 异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用

多进程多线程中, 也提到同步和互斥. 这里的同步通信和进程之间的同步是完全不想干的概念

  • 进程/线程同步也是进程/线程之间直接的制约关系
  • 是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系. 尤其是在访问临界资源的时候

阻塞 vs 非阻塞

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起. 调用线程只有在得到结果之后才会返回
  • 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程

非阻塞IO

fcntl

一个文件描述符, 默认都是阻塞IO

int fcntl(int fd, int cmd, ... /* arg */ );

fcntl函数有5种功能

  • 复制一个现有的描述符(cmd=F_DUPFD)
  • 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD)
  • 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL)
  • 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN)
  • 获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)

此处只是用第三种功能, 获取/设置文件状态标记, 就可以将一个文件描述符设置为非阻塞

实现函数SetNoBlock

基于fcntl, 我们实现一个SetNoBlock函数, 将文件描述符设置为非阻塞

void SetNonBlock(int fd)
{
    int fl=fcntl(fd,F_GETFL);
    if(fl<0)
    {
        std::cerr<<"fcntl: "<<strerror(errno)<<std::endl;
        return ;
    }
    fcntl(fd,F_SETFL,fl|O_NONBLOCK);
}

轮询方式读取标准输入

void SetNonBlock(int fd)
{
    int fl = fcntl(fd, F_GETFL);
    if (fl < 0)
    {
        std::cerr << "fcntl: " << strerror(errno) << std::endl;
        return;
    }
    fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}

int main()
{
    SetNonBlock(0);
    char buffer[1024];
    while (true)
    {
        printf(">>>> ");
        fflush(stdout);
        ssize_t s = read(0, buffer, sizeof(buffer) - 1);
        if (s > 0)
        {
            buffer[s - 1] = 0;
            std::cout << "echo# " << buffer << std::endl;
        }
        else if (s == 0)
        {
            std::cout << "read end" << std::endl;
            break;
        }
        else
        {
        }
        sleep(1);
    }
}

在这里插入图片描述

结果与预期的一致,不过这里还存在一个问题,当s<0时,数据读取失败,打印结果又是怎么样的呢?

在这里插入图片描述

资源准备未就绪

I/O多路转接之select

初识select

系统提供select函数来实现多路复用输入/输出模型

  • select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
  • 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

select函数原型

select的函数原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);

参数解释

  • 参数nfds是需要监视的最大的文件描述符值+1
  • rdset,wrset,exset分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集 合及异常文件描述符的集合,其本质是位图结构
  • 参数timeout为结构timeval,用来设置select()的等待时间

参数timeout取值

  • NULL:则表示select()没有timeout,select将一直被阻塞,直到某个文件描述符上发生了事件
  • 0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生
  • 特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回

关于fd_set结构

在这里插入图片描述

其实这个结构就是一个整数数组, 更严格的说, 是一个 “位图”. 使用位图中对应的位来表示要监视的文件描述符
提供了一组操作fd_set的接口, 来比较方便的操作位图

// 用来清除描述词组set中相关fd 的位
void FD_CLR(int fd, fd_set *set); 
// 用来测试描述词组set中相关fd 的位是否为真
int FD_ISSET(int fd, fd_set *set); 
// 用来设置描述词组set中相关fd的位
void FD_SET(int fd, fd_set *set); 
// 用来清除描述词组set的全部位
void FD_ZERO(fd_set *set); 

关于timeval结构

timeval结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回,返回值为0

函数返回值

  • 执行成功则返回文件描述词状态已改变的个数
  • 如果返回0代表在描述词状态改变前已超过timeout时间,没有返回
  • 当有错误发生时则返回-1,错误原因存于errno,此时参数readfds,writefds, exceptfds和timeout的值变成不可预测

错误值可能是:

  • EBADF 文件描述词为无效的或该文件已关闭
  • EINTR 此调用被信号所中断
  • EINVAL 参数n 为负值
  • ENOMEM 核心内存不足

三级目录

理解select执行过程

取fd_set长度为1字节,fd_set中的每一bit可以对应一个文件描
述符fd。则1字节长的fd_set最大可以对应8个fd;
作为输入时:表示用户告知内核,关心一下,集合中所有的fd事件;
比特位的位置,表示fd的数值;比特位的内容,表示是否关心

作为输出时:内核告知用户,所关心的多个fd中,有哪些已经就绪;
比特位的位置,表示fd的数值;比特位的内容,表示fd对应的事件已经就绪

socket就绪条件

读就绪

  • socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0
  • socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0
  • 监听的socket上有新的连接请求
  • socket上有未处理的错误

写就绪

  • socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0
  • socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号
  • socket使用非阻塞connect连接成功或失败之后
  • socket上有未读取的错误

select使用示例

Tcpserver:只处理读取,只获取数据的server
首先,监听端口可以交付给select,监听端口的连接就绪事件,其实就是读事件就绪

err.hpp

enum{
    USAGE_ERR=1,
    SOCKET_ERR,
    BIND_ERR,
    LISTEN_ERR
};

log.hpp

#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4

const char*to_levelstr(int level)
{
    switch(level)
    {
        case DEBUG: return "DEBUG";
        case NORMAL: return "NORMAL";
        case WARNING: return "WARNING";
        case ERROR: return "ERROR";
        case FATAL: return "FATAL";
        default: return nullptr;
    }
}

void logMessage(int level,const char* format,...)
{
#define NUM 1024
    char logprefix[NUM];
    snprintf(logprefix,sizeof(logprefix),"[%s][%ld][pid:%d]",
    to_levelstr(level),(long int)time(nullptr),getpid());

    char logcontent[NUM];
    va_list arg;
    va_start(arg,format);
    vsnprintf(logcontent,sizeof(logcontent),format,arg);
    std::cout<<logprefix<<logcontent<<std::endl;
}

Sock.hpp

class Sock
{
    const static int backlog=30;
public:
    //创建socket文件套接字对象
    static int Socket()
    {
        int sock=socket(AF_INET,SOCK_STREAM,0);
        if(sock<0)
        {
            logMessage(FATAL,"创建套接字文件对象失败!");
            exit(SOCKET_ERR);
        }
        logMessage(NORMAL,"创建套接字文件对象成功: %d",sock);
        //设置地址复用
        int opt=1;
        setsockopt(sock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));
        return sock;
    }

    //绑定自己的网络信息
    static void Bind(int sock,int port)
    {
        struct sockaddr_in local;
        memset(&local,0,sizeof(local));
        local.sin_family=AF_INET;
        local.sin_port=htons(port);
        local.sin_addr.s_addr=INADDR_ANY;
        if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
        {
            logMessage(FATAL,"绑定失败!");
            exit(BIND_ERR);
        }
        logMessage(NORMAL,"绑定成功!");
    }

    //设置socket为监听状态
    static void Listen(int sock)
    {
        if(listen(sock,backlog)<0)
        {
            logMessage(FATAL,"监听失败!");
            exit(LISTEN_ERR);
        }
        logMessage(NORMAL,"监听成功!");
    }

    static int Accept(int listensock,std::string*clientip,uint16_t*clientport)
    {
        struct sockaddr_in peer;
        socklen_t len=sizeof(peer);
        int sock=accept(listensock,(struct sockaddr*)&peer,&len);
        if(sock<0)
        {
            logMessage(ERROR,"接收失败!");
        }
        else
        {
            logMessage(NORMAL,"接收一个新连接,sock: %d",sock);
            *clientip=inet_ntoa(peer.sin_addr);
            *clientport=ntohs(peer.sin_port);
        }
        return sock;
    }
};

SelectServer.hpp

namespace select_yjm
{
    static const int defaultport=8082;
    static const int fdnum=sizeof(fd_set)*8;
    static const int defaultfd=-1;

    using func_t =std::function<std::string(const std::string&)>;

    class SelectServer
    {
    public:
        SelectServer(func_t f,int port=defaultport)
            :_func(f)
            ,_port(port)
            ,_listensock(-1)
            ,fdarray(nullptr)
        {

        }

        
        ~SelectServer()
        {
            if(_listensock<0) close(_listensock);
            if(fdarray) delete[] fdarray;
        }

        void initServer()
        {
            _listensock=Sock::Socket();
            Sock::Bind(_listensock,_port);
            Sock::Listen(_listensock);
            fdarray=new int[fdnum];
            for(int i=0;i<fdnum;i++) fdarray[i]=defaultfd;
            fdarray[0]=_listensock;
        }

        void Print()
        {
            std::cout<<"fd list: ";
            for(int i=0;i<fdnum;i++)
            {
                if(fdarray[i]!=defaultfd) std::cout<<fdarray[i]<<" ";
            }
            std::cout<<std::endl;
        }

        void Accepter(int listensock)
        {
            logMessage(DEBUG,"Accepter in");
            //此时,listen不会阻塞,已经就绪
            std::string clientip;
            uint16_t clinetport=0;
            int sock=Sock::Accept(listensock,&clientip,&clinetport);
            if(sock<0)
            {
                return ;
            }
            logMessage(NORMAL,"accept success [%s:%d]",clientip.c_str(),clinetport);
            //此时不能直接读取,只有select有资格检测事件是否就绪
            //将新的sock交给select,本质就是将sock添加到fdarray数组中即可
            int i=0;
            for(;i<fdnum;i++)
            {
                if(fdarray[i]!=defaultfd) continue;
                else break;
            }
            
            if(i==fdnum)
            {
                logMessage(WARNING,"server id full,please wait!");
                close(sock);
            }
            else
            {
                fdarray[i]=sock;
            }

            Print();
            logMessage(DEBUG,"Accepter out");
        }

        void Recver(int sock,int pos)
        {
            logMessage(DEBUG,"in Recver");
            //读取请求,并不会阻塞
            char buffer[1024];
            ssize_t s=recv(sock,buffer,sizeof(buffer)-1,0);
            if(s>0)
            {
                buffer[s]=0;
                logMessage(NORMAL,"client# %s",buffer);
            }
            else if(s==0)
            {
                close(sock);
                fdarray[pos]=defaultfd;
                logMessage(NORMAL,"client quit");
                return;
            }
            else
            {
                close(sock);
                fdarray[pos]=defaultfd;
                logMessage(ERROR,"client quit: %s",strerror(errno));
                return;
            }

            //处理响应
            std::string response=_func(buffer);

            //返回response
            write(sock,response.c_str(),response.size());

            logMessage(DEBUG,"out Recver");
        }

        //
        void HandlerReadEvent(fd_set& rfds)
        {
            for(int i=0;i<fdnum;i++)
            {
                //过滤非法的fd
                if(fdarray[i]==defaultfd) continue;

                //正常的fd,不一定就绪,需要判断
                //此时只有监听事件就绪
                if(FD_ISSET(fdarray[i],&rfds)&&fdarray[i]==_listensock) Accepter(_listensock);
                //获取新连接之后,进行读取
                else if(FD_ISSET(fdarray[i],&rfds)) Recver(fdarray[i],i);
                else {}
            }
        }

        void start()
        {
            for(;;)
            {
                fd_set rfds;
                FD_ZERO(&rfds);
                int maxfd=fdarray[0];

                for(int i=0;i<fdnum;i++)
                {
                    if(fdarray[i]==defaultfd) continue;
                    //合法的fd全部添加到读文件描述符集中
                    FD_SET(fdarray[i],&rfds);
                    //更新所有fd中最大的fd
                    if(maxfd<fdarray[i]) maxfd=fdarray[i];
                }
                logMessage(NORMAL,"max fd is: %d",maxfd);
                //使用select,需要程序员维护一个保存所有合法的fd的数组
                int n=select(maxfd+1,&rfds,nullptr,nullptr,nullptr);
                switch(n)
                {
                    case 0:
                        logMessage(NORMAL,"timeout...");
                        break;
                    case -1:
                        logMessage(WARNING,"select error,code: %d,err string: %s",errno,strerror(errno));
                        break;
                    default:
                        //表明有事件就绪,目前只有监听事件就绪
                        logMessage(NORMAL,"have event ready!");
                        HandlerReadEvent(rfds);
                        break;
                }
            }
        }

    private:
        int _port;
        int _listensock;
        int *fdarray;
        func_t _func;
    };
}

在这里插入图片描述

select的特点

  1. select能够同时等待的文件fd是有上限的
  2. 必须借助第三方数组来维护合法的fd
  3. select的大部分参数都是输入输出型的,调用select之前,要重新设置所有的fd;调用之后,还要检查更新所有的fd,带来了遍历的成本
  4. select第一个参数的目的是为了确定遍历范围
  5. select采取位图的方式,用户到内核,内核到用户,来回地进行数据拷贝,造成拷贝成本的问题

select缺点

  1. select的fd有上限的问题
  2. 每次调用都要重新设置关心的fd

I/O多路转接之epoll

epoll的相关系统调用

epoll 有3个相关的系统调用

epoll_create

int epoll_create(int size);

在这里插入图片描述

创建一个epoll的句柄或者说是创建一个epoll模型(下面介绍):用完之后, 必须调用close()关闭

epoll_ctl

用户告知内核需要关心哪个文件描述符上的什么事件

int epoll_ctl(int epfd, int op, int fd,
 struct epoll_event *event);

epoll的事件注册函数

  • 它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型.
  • 第一个参数是epoll_create()的返回值(epoll的句柄)
  • 第二个参数表示动作,用三个宏来表示(增删改)
  • 第三个参数是需要监听的fd
  • 第四个参数是告诉内核需要监听什么事

第二个参数的取值:

  • EPOLL_CTL_ADD:注册新的fd到epfd中
  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件
  • EPOLL_CTL_DEL:从epfd中删除一个fd

struct epoll_event结构如下:
在这里插入图片描述
events可以是以下几个宏的集合:

  • EPOLLIN:表示对应的文件描述符可以读 (包括对端SOCKET正常关闭)
  • EPOLLOUT:表示对应的文件描述符可以写
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读
  • EPOLLERR:: 表示对应的文件描述符发生错误
  • EPOLLHUP:表示对应的文件描述符被挂断
  • EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的
  • EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里

epoll_wait

内核告知用户,那些文件描述符上的什么事件已经就绪

int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);

收集在epoll监控的事件中已经发送的事件

  • 参数events是分配好的epoll_event结构体数组
  • epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)
  • maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size
  • 参数timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞)
  • 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败

epoll工作原理

在这里插入图片描述

  • 当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体(其实就是红黑树),这个结构体中有两个成员与epoll的使用方式密切相关
struct eventpoll{ 
 .... 
 /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/ 
 struct rb_root rbr; 
 /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/ 
 struct list_head rdlist; 
 .... 
}
  • 每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件;当使用epoll_ctl方法时相等于向红黑树中的节点中添加数据,而这个数据是由文件描述符和事件组成的键值对
  • 而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法;红黑树的每个结点中还存在着链表的结构,每当文件描述符上的事件就绪之后,这些结点就会前后连接组成一个链表
  • 这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中
  • 在epoll中,对于每一个事件,都会建立一个epitem结构体
struct epitem{ 
 struct rb_node rbn;//红黑树节点 
 struct list_head rdllink;//双向链表节点 
 struct epoll_filefd ffd; //事件句柄信息 
 struct eventpoll *ep; //指向其所属的eventpoll对象 
 struct epoll_event event; //期待发生的事件类型 
}
  • 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
  • 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户

epoll的使用过程就是三部曲

  • 调用epoll_create创建一个epoll句柄
  • 调用epoll_ctl, 将要监控的文件描述符进行注册
  • 调用epoll_wait, 等待文件描述符就绪

epoll的实例

namespace epoll_yjm
{
    static const int defaultport = 8083;
    static const int size = 128;
    static const int defaultvalue = -1;
    static const int defaultnum = 64;

    using func_t = std::function<std::string(const std::string &)>;

    class EpollServer
    {
    public:
        EpollServer(func_t f, uint16_t port = defaultport, int num = defaultnum)
            : _func(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultport), _epfd(defaultvalue)
        {
        }

        ~EpollServer()
        {
            if (_listensock != defaultvalue)
                close(_listensock);

            if (_epfd != defaultvalue)
                close(_epfd);

            if (_revs)
                delete[] _revs;
        }

        void initserver()
        {
            // 1.创建socket
            _listensock = Sock::Socket();
            Sock::Bind(_listensock, _port);
            Sock::Listen(_listensock);

            // 2.创建epoll模型
            _epfd = epoll_create(size);
            if (_epfd < 0)
            {
                logMessage(FATAL, "epoll create error: %s", strerror(errno));
                exit(EPOLL_CREATE_ERR);
            }
            // 3.添加listen到epoll模型中
            struct epoll_event ev;
            ev.events = EPOLLIN;
            // 当事件就绪,被重新捞上来时,需要知道哪一个fd就绪
            ev.data.fd = _listensock;
            epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);

            // 4.申请就绪事件的空间
            _revs = new struct epoll_event[_num];

            logMessage(NORMAL, "init server success!");
        }

        void HandlerEvent(int readynum)
        {
            logMessage(DEBUG, "HandlerEvent in");
            for (int i = 0; i < readynum; i++)
            {
                uint32_t events = _revs[i].events;
                int sock = _revs[i].data.fd;

                if (sock == _listensock && events & EPOLLIN)
                {
                    // listensock读事件就绪,获取新连接
                    std::string clientip;
                    uint16_t clientport;
                    int fd = Sock::Accept(sock, &clientip, &clientport);
                    if (fd < 0)
                    {
                        logMessage(WARNING, "accept error");
                        continue;
                    }

                    // 获取fd成功,不可以直接读,数据可能还没就绪
                    // 将fd放入epoll等待就绪
                    struct epoll_event ev;
                    ev.events = EPOLLIN;
                    ev.data.fd = fd;
                    epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
                }
                else if (events & EPOLLIN)
                {
                    // 普通事件就绪
                    char buffer[1024];
                    // 把本轮数据读完,不一定能够读取完整的请求
                    int n = recv(sock, buffer, sizeof(buffer), 0);
                    if (n > 0)
                    {
                        buffer[n] = 0;
                        logMessage(DEBUG, "client# %s", buffer);
                        std::string response = _func(buffer);
                        send(sock, response.c_str(), response.size(), 0);
                    }
                    else if (n == 0)
                    {
                        // 建议先将fd从epoll中移除,再关闭
                        epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
                        close(sock);
                        logMessage(NORMAL, "client quit");
                    }
                    else
                    {
                        // 建议先将fd从epoll中移除,再关闭
                        epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
                        close(sock);
                        logMessage(ERROR,"recv error,code: %d,errstring: %s",errno,strerror(errno));
                    }
                }
                else
                {

                }
            }
            logMessage(DEBUG,"HandlerEvent out");
        }

        void start()
        {
            int timeout = -1;
            for (;;)
            {
                int n = epoll_wait(_epfd, _revs, _num, timeout);
                switch (n)
                {
                case 0:
                    logMessage(NORMAL, "timeout ...");
                    break;
                case -1:
                    logMessage(WARNING, "epoll_wait failed,code: %d,errstring: %s", errno, strerror(errno));
                    break;
                default:
                    logMessage(NORMAL, "have event ready!");
                    HandlerEvent(n);
                    break;
                }
            }
        }

    private:
        uint16_t _port;
        int _listensock;
        int _epfd;
        struct epoll_event *_revs;
        int _num;
        func_t _func;
    };
}

在这里插入图片描述

epoll的优点

  • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
  • 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
  • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响
  • 没有数量限制: 文件描述符数目无上限

epoll工作方式

epoll有2种工作方式-水平触发(LT)和边缘触发(ET)
假如有这样一个例子

你网购了好几个快递,张三快递员先给你派发了几个快递,你并没有去拿,因为你知道张三肯定会一直通知你,直到把你所有的快递都给你为止,因此你一直在忙别的事,直到很晚采取拿快递;又过了几天,你又网购了几个快递,不过这次给你派发快递的小哥李四不同于张三,他打电话通过你,并告知你:如果你不抓紧拿快递,那么就再也不通知你,你没有办法只能一次性将所有的快递都拿走;如果你这次没有将所有的快递都拿走,当李四再次拿到需要给你派发的快递时,就会再次通知你一次

水平触发Level Triggered 工作模式

epoll默认状态下就是LT工作模式

  • 在上面的栗子中,张三的派发模式就是LT工作模式
  • 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分
  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回
  • 支持阻塞读写和非阻塞读写

边缘触发Edge Triggered工作模式

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式

  • 在上面的栗子中,李四的派发模式就是ET工作模式
  • 当epoll检测到socket上事件就绪时, 必须立刻处理
  • ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会
  • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epol
  • 只支持非阻塞的读写

epoll既可以支持LT, 也可以支持ET

对比LT和ET

LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完;ET的高效不仅仅体现在通知机制上,为了尽快让上层将数据取走,TCP可以给发送方提供一个更大的窗口大小,让对方更新出更大的滑动窗口,提高底层的数据发送效率

相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的

epoll示例: epoll服务器(ET模式)

Reactor

Reactor=IO+协议定制+业务处理,基于ET模式下的Reactor,可以处理所有的IO

处理几个小细节:
在epollserver接收到数据之后,并不能确定数据的完整性,因为ET保证了数据被读取完,而协议才能保证数据的完整性;因此在epollserver还需要自己的接收缓冲区以保证数据的完整性

在epollserver同样也需要发送缓冲区,服务器刚开始启动,或者很多情况下,发送事件一直都是就绪的,可以直接发送;如果用户一次并没有把所有数据发送完,还需要再次发送;同样文件描述符上的发送事件也要注册到epoll模型中,一般按需设置

接下来就编写代码

tcpserver.hpp

namespace tcpserver
{
    class Connection;
    class Tcpserver;

    static const uint16_t defaultport = 8084;
    static const int num = 64;

    using func_t = std::function<void(Connection *)>;

    class Connection
    {
    public:
        Connection(int sock,Tcpserver*tsp)
            :_sock(sock)
            ,_tsp(tsp)
        {
        }

        void Register(func_t r,func_t s,func_t e)
        {
            _recver=r;
            _sender=s;
            _excepter=e;
        }

        ~Connection()
        {

        }

        void Close()
        {
            close(_sock);
        }
    public:
        int _sock;
        std::string _inbuffer;  // 输入缓冲区
        std::string _outbuffer; // 输出缓冲区

        func_t _recver;   // 从sock中读取
        func_t _sender;   // 向sock中写入
        func_t _excepter; // 处理sockIO的异常事件

        Tcpserver *_tsp; // 回指指针
        uint64_t lasttime;
    };

    class Tcpserver
    {
    private:
        void Recver(Connection *conn)
        {
            conn->lasttime = time(nullptr);

            char buffer[1024];
            while (true)
            {
                ssize_t s = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0);
                if (s > 0)
                {
                    buffer[s] = 0;
                    conn->_inbuffer += buffer; // 将读到的数据放入缓冲区中
                    logMessage(DEBUG, "n%s", conn->_inbuffer);
                    _service(conn);
                }
                else if (s == 0)
                {
                    if (conn->_excepter)
                    {
                        conn->_excepter(conn);
                        return;
                    }
                }
                else
                {
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                        break;
                    else if (errno == EINTR)
                        continue;
                    else
                    {
                        if (conn->_excepter)
                        {
                            conn->_excepter(conn);
                            return;
                        }
                    }
                }
            }
        }

        void Sender(Connection *conn)
        {
            conn->lasttime = time(nullptr);
            while (true)
            {
                ssize_t s = send(conn->_sock, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
                if (s > 0)
                {
                    if (conn->_outbuffer.empty())
                        break;
                    else
                        conn->_outbuffer.erase(0, s);
                }
                else
                {
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                        break;
                    else if (errno == EINTR)
                        continue;
                    else
                    {
                        if (conn->_excepter)
                        {
                            conn->_excepter(conn);
                            return;
                        }
                    }
                }
            }

            //如果没有发送完毕,需要对对应的sock开启写事件的关心
            //如果发送完毕,需要关闭对写事件的关心
            if(!conn->_outbuffer.empty())
                conn->_tsp->EnableReadWrite(conn,true,true);
            else
                conn->_tsp->EnableReadWrite(conn,true,false);
        }

        void Excepter(Connection*conn)
        {
            logMessage(DEBUG,"Excepter begin");
            _epoller.Control(conn->_sock,0,EPOLL_CTL_DEL);
            conn->Close();
            _connections.erase(conn->_sock);
            logMessage(DEBUG,"关闭%d 文件描述符的所有资源",conn->_sock);

            delete conn;
        }

        void Accepter(Connection *conn)
        {
            for (;;)
            {
                std::string clientip;
                uint16_t clientport;
                int err;
                int sock = _sock.Accept(&clientip, &clientport, &err);
                if (sock > 0)
                {
                    AddConnection(
                        sock, EPOLLIN | EPOLLET,
                        std::bind(&Tcpserver::Recver, this, std::placeholders::_1),
                        std::bind(&Tcpserver::Sender, this, std::placeholders::_1),
                        std::bind(&Tcpserver::Excepter, this, std::placeholders::_1));

                    logMessage(DEBUG, "get a new link,info: [%s:%d]", clientip.c_str(), clientport);
                }
                else
                {
                    if (err == EAGAIN || err == EWOULDBLOCK)
                        break;
                    else if (err == EINTR)
                        continue;
                    else
                        break;
                }
            }
        }

        void AddConnection(int sock, uint32_t events, func_t recver, func_t sender, func_t excepter)
        {
            // 1.首先给该sock创建connection,并初始化,并添加到_connections中
            // 将该sock设置为非阻塞
            if (events & EPOLLET)
                Util::SetNonBlock(sock);
            Connection *conn = new Connection(sock, this);
            // 2.给对应的sock设置对应的回调处理方法
            conn->Register(recver, sender, excepter);
            // 3.将sock与其要关心的事件注册到epoll中
            bool r = _epoller.AddEvent(sock, events);
            assert(r);
            (void)r;
            // 4.将kv添加到_connections中
            _connections.insert(std::pair<int, Connection *>(sock, conn));
            logMessage(DEBUG, "add new sock: %d in epoll and unordered_map", sock);
        }

        bool IsConnectionExists(int sock)
        {
            auto iter = _connections.find(sock);
            return iter != _connections.end();
        }

        void Loop(int timeout)
        {
            // 获得已就绪事件
            int n = _epoller.Wait(_revs, _num, timeout);
            for (int i = 0; i < n; i++)
            {
                int sock = _revs[i].data.fd;
                uint32_t events = _revs[i].events;

                // 将所有异常问题,全部转化为读写问题
                if (events & EPOLLERR)
                    events |= (EPOLLIN | EPOLLOUT);
                if (events & EPOLLHUP)
                    events |= (EPOLLIN | EPOLLOUT);

                // listen事件就绪
                if ((events & EPOLLIN) && IsConnectionExists(sock) && _connections[sock]->_recver)
                    _connections[sock]->_recver(_connections[sock]);

                if ((events & EPOLLOUT) && IsConnectionExists(sock) && _connections[sock]->_sender)
                    _connections[sock]->_sender(_connections[sock]);
            }
        }

    public:
        Tcpserver(func_t func, uint16_t port = defaultport)
            : _service(func), _port(port), _revs(nullptr)
        {
        }

        ~Tcpserver()
        {
            _sock.Close();
            _epoller.Close();
            if (_revs == nullptr)
                delete[] _revs;
        }

        void initserver()
        {
            // 1.创建socket
            _sock.Socket();
            _sock.Bind(_port);
            _sock.Listen();
            // 2.创建epoll模型
            _epoller.Create();
            // 先将listenock设置为非阻塞
            // 3.将目前唯一一个sock,添加到epoll模型中
            AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,
                          std::bind(&Tcpserver::Accepter, this, std::placeholders::_1), nullptr, nullptr);

            _revs = new struct epoll_event[num];
            _num = num;
        }

        void EnableReadWrite(Connection *conn, bool readable, bool writeable)
        {
            uint32_t event = (readable ? EPOLLIN : 0) | (writeable ? EPOLLIN : 0) | EPOLLET;
            _epoller.Control(conn->_sock, event, EPOLL_CTL_MOD);
        }

        // 事件派发
        void Dispatcher()
        {
            int timeout = 1000;
            while (true)
            {
                Loop(timeout);
            }
        }

    private:
        uint16_t _port;
        Sock _sock;
        Epoll _epoller;
        std::unordered_map<int, Connection *> _connections;
        struct epoll_event *_revs;
        int _num;
        func_t _service;
    };
}

在这里插入图片描述