4) 事件触发event_loop
接下来我们要尝试添加多路IO的处理机制,当然linux的平台下, 最优的选择就是使用epoll来做,但是用原生的epoll实际上编程起来扩展性不是很强,那么我们就需要封装一套IO事件处理机制。
4.1 io_event基于IO事件封装
我们首先定义一个IO事件类来包括一个时间需要拥有的基本成员信息.
lars_reactor/include/event_base.h
#pragma once/** 定义一些IO复用机制或者其他异常触发机制的事件封装** */class event_loop;//IO事件触发的回调函数typedef void io_callback(event_loop *loop, int fd, void *args);/** 封装一次IO触发实现* */struct io_event{io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {}int mask; //EPOLLIN EPOLLOUTio_callback *read_callback; //EPOLLIN事件 触发的回调io_callback *write_callback;//EPOLLOUT事件 触发的回调void *rcb_args; //read_callback的回调函数参数void *wcb_args; //write_callback的回调函数参数};
一个`io_event`对象应该包含 一个epoll的事件标识`EPOLLIN/EPOLLOUT`,和对应事件的处理函数`read_callback`,`write_callback`。他们都应该是`io_callback`类型。然后对应的函数形参。
4.2 event_loop事件循环处理机制
接下来我们就要通过event_loop类来实现io_event的基本增删操作,放在原生的`epoll`堆中。
lars_reactor/include/event_loop.h
#pragma once/*** event_loop事件处理机制** */#include <sys/epoll.h>#include <ext/hash_map>#include <ext/hash_set>#include "event_base.h"#define MAXEVENTS 10// map: fd->io_eventtypedef __gnu_cxx::hash_map<int, io_event> io_event_map;//定义指向上面map类型的迭代器typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;//全部正在监听的fd集合typedef __gnu_cxx::hash_set<int> listen_fd_set;class event_loop{public://构造,初始化epoll堆event_loop();//阻塞循环处理事件void event_process();//添加一个io事件到loop中void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);//删除一个io事件从loop中void del_io_event(int fd);//删除一个io事件的EPOLLIN/EPOLLOUTvoid del_io_event(int fd, int mask);private:int _epfd; //epoll fd//当前event_loop 监控的fd和对应事件的关系io_event_map _io_evs;//当前event_loop 一共哪些fd在监听listen_fd_set listen_fds;//一次性最大处理的事件struct epoll_event _fired_evs[MAXEVENTS];};
属性:
_epfd:是epoll原生堆的fd。
_io_evs:是一个hash_map对象,主要是方便我们管理fd<—>io_event的对应关系,方便我们来查找和处理。
_listen_fds:记录目前一共有多少个fd正在本我们的event_loop机制所监控.
_fried_evs:已经通过epoll_wait返回的被激活需要上层处理的fd集合.
方法:
event_loop():构造函数,主要初始化epoll.
event_process():永久阻塞,等待触发的事件,去调用对应的函数callback方法。
add_io_event():绑定一个fd和一个io_event的关系,并添加对应的事件到event_loop中。
del_io_event():从event_loop删除该事件。
具体实现方法如下:
lars_reactor/src/event_loop.cpp
#include "event_loop.h"#include <assert.h>//构造,初始化epoll堆event_loop::event_loop(){//flag=0 等价于epll_craete_epfd = epoll_create1(0);if (_epfd == -1) {fprintf(stderr, "epoll_create error\n");exit(1);}}//阻塞循环处理事件void event_loop::event_process(){while (true) {io_event_map_it ev_it;int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);for (int i = 0; i < nfds; i++) {//通过触发的fd找到对应的绑定事件ev_it = _io_evs.find(_fired_evs[i].data.fd);assert(ev_it != _io_evs.end());io_event *ev = &(ev_it->second);if (_fired_evs[i].events & EPOLLIN) {//读事件,掉读回调函数void *args = ev->rcb_args;ev->read_callback(this, _fired_evs[i].data.fd, args);}else if (_fired_evs[i].events & EPOLLOUT) {//写事件,掉写回调函数void *args = ev->wcb_args;ev->write_callback(this, _fired_evs[i].data.fd, args);}else if (_fired_evs[i].events &(EPOLLHUP|EPOLLERR)) {//水平触发未处理,可能会出现HUP事件,正常处理读写,没有则清空if (ev->read_callback != NULL) {void *args = ev->rcb_args;ev->read_callback(this, _fired_evs[i].data.fd, args);}else if (ev->write_callback != NULL) {void *args = ev->wcb_args;ev->write_callback(this, _fired_evs[i].data.fd, args);}else {//删除fprintf(stderr, "fd %d get error, delete it from epoll\n", _fired_evs[i].data.fd);this->del_io_event(_fired_evs[i].data.fd);}}}}}/** 这里我们处理的事件机制是* 如果EPOLLIN 在mask中, EPOLLOUT就不允许在mask中* 如果EPOLLOUT 在mask中, EPOLLIN就不允许在mask中* 如果想注册EPOLLIN|EPOLLOUT的事件, 那么就调用add_io_event() 方法两次来注册。* *///添加一个io事件到loop中void event_loop::add_io_event(int fd, io_callback *proc, int mask, void *args){int final_mask;int op;//1 找到当前fd是否已经有事件io_event_map_it it = _io_evs.find(fd);if (it == _io_evs.end()) {//2 如果没有操作动作就是ADD//没有找到final_mask = mask;op = EPOLL_CTL_ADD;}else {//3 如果有操作董酒是MOD//添加事件标识位final_mask = it->second.mask | mask;op = EPOLL_CTL_MOD;}//4 注册回调函数if (mask & EPOLLIN) {//读事件回调函数注册_io_evs[fd].read_callback = proc;_io_evs[fd].rcb_args = args;}else if (mask & EPOLLOUT) {_io_evs[fd].write_callback = proc;_io_evs[fd].wcb_args = args;}//5 epoll_ctl添加到epoll堆里_io_evs[fd].mask = final_mask;//创建原生epoll事件struct epoll_event event;event.events = final_mask;event.data.fd = fd;if (epoll_ctl(_epfd, op, fd, &event) == -1) {fprintf(stderr, "epoll ctl %d error\n", fd);return;}//6 将fd添加到监听集合中listen_fds.insert(fd);}//删除一个io事件从loop中void event_loop::del_io_event(int fd){//将事件从_io_evs删除_io_evs.erase(fd);//将fd从监听集合中删除listen_fds.erase(fd);//将fd从epoll堆删除epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);}//删除一个io事件的EPOLLIN/EPOLLOUTvoid event_loop::del_io_event(int fd, int mask){//如果没有该事件,直接返回io_event_map_it it = _io_evs.find(fd);if (it == _io_evs.end()) {return ;}int &o_mask = it->second.mask;//修正masko_mask = o_mask & (~mask);if (o_mask == 0) {//如果修正之后 mask为0,则删除this->del_io_event(fd);}else {//如果修正之后,mask非0,则修改struct epoll_event event;event.events = o_mask;event.data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event);}}
这里`del_io_event`提供两个重载,一个是直接删除事件,一个是修正事件。
4.3 Reactor集成event_loop机制
好了,那么接下来,就让让Lars Reactor框架集成`event_loop`机制。
首先简单修正一个tcp_server.cpp文件,对之前的do_accept()的调度时机做一下修正。
1. 在`tcp_server`成员新增`event_loop`成员。
lars_reactor/include/tcp_server.h
#pragma once#include <netinet/in.h>#include "event_loop.h"class tcp_server{public://server的构造函数tcp_server(event_loop* loop, const char *ip, uint16_t port);//开始提供创建链接服务void do_accept();//链接对象释放的析构~tcp_server();private:int _sockfd; //套接字struct sockaddr_in _connaddr; //客户端链接地址socklen_t _addrlen; //客户端链接地址长度// ============= 新增 ======================//event_loop epoll事件机制event_loop* _loop;// ============= 新增 ======================};
- 构造函数在创建完listen fd之后,添加accept事件。
lars_reactor/src/tcp_server.cpp
//listen fd 客户端有新链接请求过来的回调函数void accept_callback(event_loop *loop, int fd, void *args){tcp_server *server = (tcp_server*)args;server->do_accept();}//server的构造函数tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port){bzero(&_connaddr, sizeof(_connaddr));//忽略一些信号 SIGHUP, SIGPIPE//SIGPIPE:如果客户端关闭,服务端再次write就会产生//SIGHUP:如果terminal关闭,会给当前进程发送该信号if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {fprintf(stderr, "signal ignore SIGHUP\n");}if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {fprintf(stderr, "signal ignore SIGPIPE\n");}//1. 创建socket_sockfd = socket(AF_INET, SOCK_STREAM /*| SOCK_NONBLOCK*/ | SOCK_CLOEXEC, IPPROTO_TCP);if (_sockfd == -1) {fprintf(stderr, "tcp_server::socket()\n");exit(1);}//2 初始化地址struct sockaddr_in server_addr;bzero(&server_addr, sizeof(server_addr));server_addr.sin_family = AF_INET;inet_aton(ip, &server_addr.sin_addr);server_addr.sin_port = htons(port);//2-1可以多次监听,设置REUSE属性int op = 1;if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)) < 0) {fprintf(stderr, "setsocketopt SO_REUSEADDR\n");}//3 绑定端口if (bind(_sockfd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {fprintf(stderr, "bind error\n");exit(1);}//4 监听ip端口if (listen(_sockfd, 500) == -1) {fprintf(stderr, "listen error\n");exit(1);}// ============= 新增 ======================//5 将_sockfd添加到event_loop中_loop = loop;//6 注册_socket读事件-->accept处理_loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);// ============= 新增 ======================}
- 修改do_accept()方法
lars_reactor/src/tcp_server.cpp
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <strings.h>#include <unistd.h>#include <signal.h>#include <sys/types.h> /* See NOTES */#include <sys/socket.h>#include <arpa/inet.h>#include <errno.h>#include "tcp_server.h"#include "reactor_buf.h"//临时的收发消息struct message{char data[m4K];char len;};struct message msg;void server_rd_callback(event_loop *loop, int fd, void *args);void server_wt_callback(event_loop *loop, int fd, void *args);//...省略其他代码//...省略其他代码//server read_callbackvoid server_rd_callback(event_loop *loop, int fd, void *args){int ret = 0;struct message *msg = (struct message*)args;input_buf ibuf;ret = ibuf.read_data(fd);if (ret == -1) {fprintf(stderr, "ibuf read_data error\n");//删除事件loop->del_io_event(fd);//对端关闭close(fd);return;}if (ret == 0) {//删除事件loop->del_io_event(fd);//对端关闭close(fd);return ;}printf("ibuf.length() = %d\n", ibuf.length());//将读到的数据放在msg中msg->len = ibuf.length();bzero(msg->data, msg->len);memcpy(msg->data, ibuf.data(), msg->len);ibuf.pop(msg->len);ibuf.adjust();printf("recv data = %s\n", msg->data);//删除读事件,添加写事件loop->del_io_event(fd, EPOLLIN);loop->add_io_event(fd, server_wt_callback, EPOLLOUT, msg);}//server write_callbackvoid server_wt_callback(event_loop *loop, int fd, void *args){struct message *msg = (struct message*)args;output_buf obuf;//回显数据obuf.send_data(msg->data, msg->len);while(obuf.length()) {int write_ret = obuf.write2fd(fd);if (write_ret == -1) {fprintf(stderr, "write connfd error\n");return;}else if(write_ret == 0) {//不是错误,表示此时不可写break;}}//删除写事件,添加读事件loop->del_io_event(fd, EPOLLOUT);loop->add_io_event(fd, server_rd_callback, EPOLLIN, msg);}//...省略其他代码//...省略其他代码//开始提供创建链接服务void tcp_server::do_accept(){int connfd;while(true) {//accept与客户端创建链接printf("begin accept\n");connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);if (connfd == -1) {if (errno == EINTR) {fprintf(stderr, "accept errno=EINTR\n");continue;}else if (errno == EMFILE) {//建立链接过多,资源不够fprintf(stderr, "accept errno=EMFILE\n");}else if (errno == EAGAIN) {fprintf(stderr, "accept errno=EAGAIN\n");break;}else {fprintf(stderr, "accept error");exit(1);}}else {//accept succ!// ============= 新增 ======================this->_loop->add_io_event(connfd, server_rd_callback, EPOLLIN, &msg);break;// ============= 新增 ======================}}}//...省略其他代码//...省略其他代码
4.4 完成Lars Reactor V0.3开发
我们将lars_reactor/example/lars_reactor_0.2的代码复制一份到 lars_reactor/example/lars_reactor_0.3中。
lars_reactor/example/lars_reactor_0.3/lars_reactor.cpp
#include "tcp_server.h"int main(){event_loop loop;tcp_server server(&loop, "127.0.0.1", 7777);loop.event_process();return 0;}
编译。
启动服务器
$ ./lars_reactor
分别启动2个客户端
client1
$ nc 127.0.0.1 7777hello Iam client1hello Iam client1 回显
client2
$ nc 127.0.0.1 7777hello Iam client2hello Iam client2 回显
服务端打印
$ ./lars_reactorbegin acceptibuf.length() = 18recv data = hello Iam client1begin acceptibuf.length() = 18recv data = hello Iam client2
目前我们已经成功将event_loop机制加入到reactor中了,接下来继续添加功能。
