好了,现在我们来将服务器的连接做一个简单的封装,在这之前,我们要将我我们所发的数据做一个规定,采用TLV的格式,来进行封装。目的是解决TCP传输的粘包问题。
5.1 Message消息封装
先创建一个message.h头文件
lars_reactor/include/message.h
#pragma once//解决tcp粘包问题的消息头struct msg_head{int msgid;int msglen;};//消息头的二进制长度,固定数#define MESSAGE_HEAD_LEN 8//消息头+消息体的最大长度限制#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)
接下来我们每次在server和 client之间传递数据的时候,都发送这种数据格式的头再加上后面的数据内容即可。
5.2 创建一个tcp_conn连接类
lars_reactor/include/tcp_conn.h
#pragma once#include "reactor_buf.h"#include "event_loop.h"//一个tcp的连接信息class tcp_conn{public://初始化tcp_conntcp_conn(int connfd, event_loop *loop);//处理读业务void do_read();//处理写业务void do_write();//销毁tcp_connvoid clean_conn();//发送消息的方法int send_message(const char *data, int msglen, int msgid);private://当前链接的fdint _connfd;//该连接归属的event_pollevent_loop *_loop;//输出bufoutput_buf obuf;//输入bufinput_buf ibuf;};
简单说明一下里面的成员和方法:
成员:
_connfd:server刚刚accept成功的套接字
_loop:当前链接所绑定的事件触发句柄.
obuf:链接输出缓冲,向对端写数据
ibuf:链接输入缓冲,从对端读数据
方法:
tcp_client():构造,主要在里面实现初始化及创建链接链接的connect过程。
do_read():读数据处理业务,主要是EPOLLIN事件触发。
do_write():写数据处理业务,主要是EPOLLOUT事件触发。
clean_conn():清空链接资源。
send_message():将消息打包成TLV格式发送给对端。
接下来,实现以下`tcp_conn`类.
lars_reactor/src/tcp_conn.cpp
#include <unistd.h>#include <fcntl.h>#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <string.h>#include "tcp_conn.h"#include "message.h"//回显业务void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn){conn->send_message(data, len, msgid);}//连接的读事件回调static void conn_rd_callback(event_loop *loop, int fd, void *args){tcp_conn *conn = (tcp_conn*)args;conn->do_read();}//连接的写事件回调static void conn_wt_callback(event_loop *loop, int fd, void *args){tcp_conn *conn = (tcp_conn*)args;conn->do_write();}//初始化tcp_conntcp_conn::tcp_conn(int connfd, event_loop *loop){_connfd = connfd;_loop = loop;//1. 将connfd设置成非阻塞状态int flag = fcntl(_connfd, F_GETFL, 0);fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);//2. 设置TCP_NODELAY禁止做读写缓存,降低小包延迟int op = 1;setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h//3. 将该链接的读事件让event_loop监控_loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);//4 将该链接集成到对应的tcp_server中//TODO}//处理读业务void tcp_conn::do_read(){//1. 从套接字读取数据int ret = ibuf.read_data(_connfd);if (ret == -1) {fprintf(stderr, "read data from socket\n");this->clean_conn();return ;}else if ( ret == 0) {//对端正常关闭printf("connection closed by peer\n");clean_conn();return ;}//2. 解析msg_head数据msg_head head;//[这里用while,可能一次性读取多个完整包过来]while (ibuf.length() >= MESSAGE_HEAD_LEN) {//2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LENmemcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);this->clean_conn();break;}if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {//缓存buf中剩余的数据,小于实际上应该接受的数据//说明是一个不完整的包,应该抛弃break;}//2.2 再根据头长度读取数据体,然后针对数据体处理 业务//TODO 添加包路由模式//头部处理完了,往后偏移MESSAGE_HEAD_LEN长度ibuf.pop(MESSAGE_HEAD_LEN);//处理ibuf.data()业务数据printf("read data: %s\n", ibuf.data());//回显业务callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);//消息体处理完了,往后便宜msglen长度ibuf.pop(head.msglen);}ibuf.adjust();return ;}//处理写业务void tcp_conn::do_write(){//do_write是触发玩event事件要处理的事情,//应该是直接将out_buf力度数据io写会对方客户端//而不是在这里组装一个message再发//组装message的过程应该是主动调用//只要obuf中有数据就写while (obuf.length()) {int ret = obuf.write2fd(_connfd);if (ret == -1) {fprintf(stderr, "write2fd error, close conn!\n");this->clean_conn();return ;}if (ret == 0) {//不是错误,仅返回0表示不可继续写break;}}if (obuf.length() == 0) {//数据已经全部写完,将_connfd的写事件取消掉_loop->del_io_event(_connfd, EPOLLOUT);}return ;}//发送消息的方法int tcp_conn::send_message(const char *data, int msglen, int msgid){printf("server send_message: %s:%d, msgid = %d\n", data, msglen, msgid);bool active_epollout = false;if(obuf.length() == 0) {//如果现在已经数据都发送完了,那么是一定要激活写事件的//如果有数据,说明数据还没有完全写完到对端,那么没必要再激活等写完再激活active_epollout = true;}//1 先封装message消息头msg_head head;head.msgid = msgid;head.msglen = msglen;//1.1 写消息头int ret = obuf.send_data((const char *)&head, MESSAGE_HEAD_LEN);if (ret != 0) {fprintf(stderr, "send head error\n");return -1;}//1.2 写消息体ret = obuf.send_data(data, msglen);if (ret != 0) {//如果写消息体失败,那就回滚将消息头的发送也取消obuf.pop(MESSAGE_HEAD_LEN);return -1;}if (active_epollout == true) {//2. 激活EPOLLOUT写事件_loop->add_io_event(_connfd, conn_wt_callback, EPOLLOUT, this);}return 0;}//销毁tcp_connvoid tcp_conn::clean_conn(){//链接清理工作//1 将该链接从tcp_server摘除掉//TODO//2 将该链接从event_loop中摘除_loop->del_io_event(_connfd);//3 buf清空ibuf.clear();obuf.clear();//4 关闭原始套接字int fd = _connfd;_connfd = -1;close(fd);}
具体每个方法的实现,都很清晰。其中`conn_rd_callback()`和`conn_wt_callback()`是注册读写事件的回调函数,设置为static是因为函数类型没有this指针。在里面分别再调用`do_read()`和`do_write()`方法。
5.3 修正tcp_server对accept之后的处理方法
lars_reactor/src/tcp_server.cpp
//...//开始提供创建链接服务void tcp_server::do_accept(){int connfd;while(true) {//accept与客户端创建链接printf("begin accept\n");connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);if (connfd == -1) {if (errno == EINTR) {fprintf(stderr, "accept errno=EINTR\n");continue;}else if (errno == EMFILE) {//建立链接过多,资源不够fprintf(stderr, "accept errno=EMFILE\n");}else if (errno == EAGAIN) {fprintf(stderr, "accept errno=EAGAIN\n");break;}else {fprintf(stderr, "accept error");exit(1);}}else {//accept succ!// ============= 将之前的触发回调的删掉,改成如下====tcp_conn *conn = new tcp_conn(connfd, _loop);if (conn == NULL) {fprintf(stderr, "new tcp_conn error\n");exit(1);}// ============================================printf("get new connection succ!\n");break;}}}//...
这样,每次accept成功之后,创建一个与当前客户端套接字绑定的tcp_conn对象。在构造里就完成了基本的对于EPOLLIN事件的监听和回调动作.现在可以先编译一下,保证没有语法错误,但是如果想测试,就不能够使用`nc`指令测试了,因为现在服务端只能够接收我们自定义的TLV格式的报文。那么我们需要自己写一个客户端来完成基本的测试。
