现在我们要为Zinx框架增加链接个数的限定,如果超过一定量的客户端个数,Zinx为了保证后端的及时响应,而拒绝链接请求。
9.1 创建链接管理模块
这里面我们就需要对链接有一个管理的模块.
我们在ziface和znet分别建立iconnmanager.go和connmanager.go文件
zinx/ziface/iconmanager.go
package ziface/*连接管理抽象层*/type IConnManager interface {Add(conn IConnection) //添加链接Remove(conn IConnection) //删除连接Get(connID uint32) (IConnection, error) //利用ConnID获取链接Len() int //获取当前连接ClearConn() //删除并停止所有链接}
这里定义了一些接口方法,添加链接、删除链接、根据ID获取链接、链接数量、和清除链接等。
zinx/znet/connmanager.go
package znetimport ("errors""fmt""sync""zinx/ziface")/*连接管理模块*/type ConnManager struct {connections map[uint32]ziface.IConnection //管理的连接信息connLock sync.RWMutex //读写连接的读写锁}/*创建一个链接管理*/func NewConnManager() *ConnManager {return &ConnManager{connections:make(map[uint32] ziface.IConnection),}}//添加链接func (connMgr *ConnManager) Add(conn ziface.IConnection) {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//将conn连接添加到ConnMananger中connMgr.connections[conn.GetConnID()] = connfmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())}//删除连接func (connMgr *ConnManager) Remove(conn ziface.IConnection) {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//删除连接信息delete(connMgr.connections, conn.GetConnID())fmt.Println("connection Remove ConnID=",conn.GetConnID(), " successfully: conn num = ", connMgr.Len())}//利用ConnID获取链接func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {//保护共享资源Map 加读锁connMgr.connLock.RLock()defer connMgr.connLock.RUnlock()if conn, ok := connMgr.connections[connID]; ok {return conn, nil} else {return nil, errors.New("connection not found")}}//获取当前连接func (connMgr *ConnManager) Len() int {return len(connMgr.connections)}//清除并停止所有连接func (connMgr *ConnManager) ClearConn() {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//停止并删除全部的连接信息for connID, conn := range connMgr.connections {//停止conn.Stop()//删除delete(connMgr.connections,connID)}fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())}
这里面ConnManager中,其中用一个map来承载全部的连接信息,key是连接ID,value则是连接本身。其中有一个读写锁connLock主要是针对map做多任务修改时的保护作用。
Remove()方法只是单纯的将conn从map中摘掉,而ClearConn()方法则会先停止链接业务,c.Stop(),然后再从map中摘除。
9.2 链接管理模块集成到Zinx中
A)ConnManager集成到Server中
现在需要将ConnManager添加到Server中
zinx/znet/server.go
//iServer 接口实现,定义一个Server服务类type Server struct {//服务器的名称Name string//tcp4 or otherIPVersion string//服务绑定的IP地址IP string//服务绑定的端口Port int//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法msgHandler ziface.IMsgHandle//当前Server的链接管理器ConnMgr ziface.IConnManager}/*创建一个服务器句柄*/func NewServer () ziface.IServer {utils.GlobalObject.Reload()s:= &Server {Name :utils.GlobalObject.Name,IPVersion:"tcp4",IP:utils.GlobalObject.Host,Port:utils.GlobalObject.TcpPort,msgHandler: NewMsgHandle(),ConnMgr:NewConnManager(), //创建ConnManager}return s}
那么,既然server具备了ConnManager成员,在获取的时候需要给抽象层提供一个获取ConnManager方法
zinx/ziface/iserver.go
type IServer interface{//启动服务器方法Start()//停止服务器方法Stop()//开启业务服务方法Serve()//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用AddRouter(msgId uint32, router IRouter)//得到链接管理GetConnMgr() IConnManager}
zinx/znet/server.go
//得到链接管理func (s *Server) GetConnMgr() ziface.IConnManager {return s.ConnMgr}
因为我们现在在server中有链接的管理,有的时候conn也需要得到这个ConnMgr的使用权,那么我们需要将Server和Connection建立能够互相索引的关系,我们在Connection中,添加Server当前conn隶属的server句柄。
zinx/znet/connection.go
type Connection struct {//当前Conn属于哪个ServerTcpServer ziface.IServer //当前conn属于哪个server,在conn初始化的时候添加即可//当前连接的socket TCP套接字Conn *net.TCPConn//当前连接的ID 也可以称作为SessionID,ID全局唯一ConnID uint32//当前连接的关闭状态isClosed bool//消息管理MsgId和对应处理方法的消息管理模块MsgHandler ziface.IMsgHandle//告知该链接已经退出/停止的channelExitBuffChan chan bool//无缓冲管道,用于读、写两个goroutine之间的消息通信msgChan chan []byte//有关冲管道,用于读、写两个goroutine之间的消息通信msgBuffChan chan []byte}
B) 链接的添加
那么我们什么选择将创建好的连接添加到ConnManager中呢,这里我们选择在初始化一个新链接的时候,加进来就好了
zinx/znet/connection.go
//创建连接的方法func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{//初始化Conn属性c := &Connection{TcpServer:server, //将隶属的server传递进来Conn: conn,ConnID: connID,isClosed: false,MsgHandler: msgHandler,ExitBuffChan: make(chan bool, 1),msgChan:make(chan []byte),msgBuffChan:make(chan []byte, utils.GlobalObject.MaxMsgChanLen),}//将新创建的Conn添加到链接管理中c.TcpServer.GetConnMgr().Add(c) //将当前新创建的连接添加到ConnManager中return c}
C) Server中添加链接数量的判断
在server的Start()方法中,在Accept与客户端链接建立成功后,可以直接对链接的个数做一个判断
zinx/znet/server.go
//开启网络服务func (s *Server) Start() {fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version: %s, MaxConn: %d, MaxPacketSize: %d\n",utils.GlobalObject.Version,utils.GlobalObject.MaxConn,utils.GlobalObject.MaxPacketSize)//开启一个go去做服务端Linster业务go func() {// ....//3 启动server网络连接业务for {//3.1 阻塞等待客户端建立连接请求conn, err := listenner.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}//=============//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {conn.Close()continue}//=============//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的dealConn := NewConntion(s, conn, cid, s.msgHandler)cid ++//3.4 启动当前链接的处理业务go dealConn.Start()}}()}
当然,我们应该在配置文件zinx.json或者在GlobalObject全局配置中,定义好我们期望的连接的最大数目限制MaxConn。
D) 连接的删除
我们应该在连接停止的时候,将该连接从ConnManager中删除,所以在connection的Stop()方法中添加。
zinx/znet/connecion.go
func (c *Connection) Stop() {fmt.Println("Conn Stop()...ConnID = ", c.ConnID)//如果当前链接已经关闭if c.isClosed == true {return}c.isClosed = true// 关闭socket链接c.Conn.Close()//关闭Writer Goroutinec.ExitBuffChan <- true//将链接从连接管理器中删除c.TcpServer.GetConnMgr().Remove(c) //删除conn从ConnManager中//关闭该链接全部管道close(c.ExitBuffChan)close(c.msgBuffChan)}
当然,我们也应该在server停止的时候,将全部的连接清空
zinx/znet/server.go
func (s *Server) Stop() {fmt.Println("[STOP] Zinx server , name " , s.Name)//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理s.ConnMgr.ClearConn()}
现在我们已经将连接管理成功的集成到了Zinx之中了。
9.3 链接的带缓冲的发包方法
我们之前给Connection提供了一个发消息的方法SendMsg(),这个是将数据发送到一个无缓冲的channel中msgChan。但是如果客户端链接比较多的话,如果对方处理不及时,可能会出现短暂的阻塞现象,我们可以做一个提供一定缓冲的发消息方法,做一些非阻塞的发送体验。
zinx/ziface/iconnection.go
//定义连接接口type IConnection interface {//启动连接,让当前连接开始工作Start()//停止连接,结束当前连接状态MStop()//从当前连接获取原始的socket TCPConnGetTCPConnection() *net.TCPConn//获取当前连接IDGetConnID() uint32//获取远程客户端地址信息RemoteAddr() net.Addr//直接将Message数据发送数据给远程的TCP客户端(无缓冲)SendMsg(msgId uint32, data []byte) error//直接将Message数据发送给远程的TCP客户端(有缓冲)SendBuffMsg(msgId uint32, data []byte) error //添加带缓冲发送消息接口}
zinx/znet/connection.go
type Connection struct {//当前Conn属于哪个ServerTcpServer ziface.IServer//当前连接的socket TCP套接字Conn *net.TCPConn//当前连接的ID 也可以称作为SessionID,ID全局唯一ConnID uint32//当前连接的关闭状态isClosed bool//消息管理MsgId和对应处理方法的消息管理模块MsgHandler ziface.IMsgHandle//告知该链接已经退出/停止的channelExitBuffChan chan bool//无缓冲管道,用于读、写两个goroutine之间的消息通信msgChan chan []byte//有关冲管道,用于读、写两个goroutine之间的消息通信msgBuffChan chan []byte //定义channel成员}//创建连接的方法func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{//初始化Conn属性c := &Connection{TcpServer:server,Conn: conn,ConnID: connID,isClosed: false,MsgHandler: msgHandler,ExitBuffChan: make(chan bool, 1),msgChan:make(chan []byte),msgBuffChan:make(chan []byte, utils.GlobalObject.MaxMsgChanLen), //不要忘记初始化}//将新创建的Conn添加到链接管理中c.TcpServer.GetConnMgr().Add(c)return c}
然后将SendBuffMsg()方法实现一下:
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send buff msg")}//将data封包,并且发送dp := NewDataPack()msg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg ")}//写回客户端c.msgBuffChan <- msgreturn nil}
我们在Writer中也要有对msgBuffChan的数据监控:
/*写消息Goroutine, 用户将数据发送给客户端*/func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")for {select {case data := <-c.msgChan://有数据要写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Data error:, ", err, " Conn Writer exit")return}//针对有缓冲channel需要些的数据处理case data, ok:= <-c.msgBuffChan:if ok {//有数据要写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")return}} else {breakfmt.Println("msgBuffChan is Closed")}case <-c.ExitBuffChan:return}}}
9.4 注册链接启动/停止自定义Hook方法功能
有的时候,在创建链接的时候,希望在创建链接之后、和断开链接之前,执行一些用户自定义的业务。那么我们就需要给Zinx增添两个链接创建后和断开前时机的回调函数,一般也称作Hook(钩子)函数。
我们可以通过Server来注册conn的hook方法
zinx/ziface/iserver.go
type IServer interface{//启动服务器方法Start()//停止服务器方法Stop()//开启业务服务方法Serve()//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用AddRouter(msgId uint32, router IRouter)//得到链接管理GetConnMgr() IConnManager//设置该Server的连接创建时Hook函数SetOnConnStart(func (IConnection))//设置该Server的连接断开时的Hook函数SetOnConnStop(func (IConnection))//调用连接OnConnStart Hook函数CallOnConnStart(conn IConnection)//调用连接OnConnStop Hook函数CallOnConnStop(conn IConnection)}
zinx/znet/server.go
//iServer 接口实现,定义一个Server服务类type Server struct {//服务器的名称Name string//tcp4 or otherIPVersion string//服务绑定的IP地址IP string//服务绑定的端口Port int//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法msgHandler ziface.IMsgHandle//当前Server的链接管理器ConnMgr ziface.IConnManager// =======================//新增两个hook函数原型//该Server的连接创建时Hook函数OnConnStart func(conn ziface.IConnection)//该Server的连接断开时的Hook函数OnConnStop func(conn ziface.IConnection)// =======================}
实现添加hook函数的接口和调用hook函数的接口
//设置该Server的连接创建时Hook函数func (s *Server) SetOnConnStart(hookFunc func (ziface.IConnection)) {s.OnConnStart = hookFunc}//设置该Server的连接断开时的Hook函数func (s *Server) SetOnConnStop(hookFunc func (ziface.IConnection)) {s.OnConnStop = hookFunc}//调用连接OnConnStart Hook函数func (s *Server) CallOnConnStart(conn ziface.IConnection) {if s.OnConnStart != nil {fmt.Println("---> CallOnConnStart....")s.OnConnStart(conn)}}//调用连接OnConnStop Hook函数func (s *Server) CallOnConnStop(conn ziface.IConnection) {if s.OnConnStop != nil {fmt.Println("---> CallOnConnStop....")s.OnConnStop(conn)}}
那么接下来,需要选定两个Hook方法的调用位置。
一个是创建链接之后:
zinx/znet/connection.go
//启动连接,让当前连接开始工作func (c *Connection) Start() {//1 开启用户从客户端读取数据流程的Goroutinego c.StartReader()//2 开启用于写回客户端数据流程的Goroutinego c.StartWriter()//==================//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法c.TcpServer.CallOnConnStart(c)//==================}
一个是停止链接之前:
zinx/znet/connection.go
//停止连接,结束当前连接状态Mfunc (c *Connection) Stop() {fmt.Println("Conn Stop()...ConnID = ", c.ConnID)//如果当前链接已经关闭if c.isClosed == true {return}c.isClosed = true//==================//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用c.TcpServer.CallOnConnStop(c)//==================// 关闭socket链接c.Conn.Close()//关闭Writerc.ExitBuffChan <- true//将链接从连接管理器中删除c.TcpServer.GetConnMgr().Remove(c)//关闭该链接全部管道close(c.ExitBuffChan)close(c.msgBuffChan)}
9.5 使用Zinx-V0.9完成应用程序
好了,现在我们基本上已经将全部的连接管理的功能集成到Zinx中了,接下来就需要测试一下链接管理模块是否可以使用了。
写一个服务端:
Server.go
package mainimport ("fmt""zinx/ziface""zinx/znet")//ping test 自定义路由type PingRouter struct {znet.BaseRouter}//Ping Handlefunc (this *PingRouter) Handle(request ziface.IRequest) {fmt.Println("Call PingRouter Handle")//先读取客户端的数据,再回写ping...ping...pingfmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))if err != nil {fmt.Println(err)}}type HelloZinxRouter struct {znet.BaseRouter}//HelloZinxRouter Handlefunc (this *HelloZinxRouter) Handle(request ziface.IRequest) {fmt.Println("Call HelloZinxRouter Handle")//先读取客户端的数据,再回写ping...ping...pingfmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.8"))if err != nil {fmt.Println(err)}}//创建连接的时候执行func DoConnectionBegin(conn ziface.IConnection) {fmt.Println("DoConnecionBegin is Called ... ")err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))if err != nil {fmt.Println(err)}}//连接断开的时候执行func DoConnectionLost(conn ziface.IConnection) {fmt.Println("DoConneciotnLost is Called ... ")}func main() {//创建一个server句柄s := znet.NewServer()//注册链接hook回调函数s.SetOnConnStart(DoConnectionBegin)s.SetOnConnStop(DoConnectionLost)//配置路由s.AddRouter(0, &PingRouter{})s.AddRouter(1, &HelloZinxRouter{})//开启服务s.Serve()}
我们这里注册了两个Hook函数一个是链接初始化之后DoConnectionBegin()和链接停止之前DoConnectionLost()。
DoConnectionBegin()会发给客户端一个消息2的文本,并且在服务端打印一个调试信息”DoConnecionBegin is Called … “
DoConnectionLost()在服务端打印一个调试信息”DoConneciotnLost is Called … “
客户端:
Client.go
package mainimport ("fmt""io""net""time""zinx/znet")/*模拟客户端*/func main() {fmt.Println("Client Test ... start")//3秒之后发起测试请求,给服务端开启服务的机会time.Sleep(3 * time.Second)conn,err := net.Dial("tcp", "127.0.0.1:7777")if err != nil {fmt.Println("client start err, exit!")return}for {//发封包message消息dp := znet.NewDataPack()msg, _ := dp.Pack(znet.NewMsgPackage(0,[]byte("Zinx V0.8 Client0 Test Message")))_, err := conn.Write(msg)if err !=nil {fmt.Println("write error err ", err)return}//先读出流中的head部分headData := make([]byte, dp.GetHeadLen())_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止if err != nil {fmt.Println("read head error")break}//将headData字节流 拆包到msg中msgHead, err := dp.Unpack(headData)if err != nil {fmt.Println("server unpack err:", err)return}if msgHead.GetDataLen() > 0 {//msg 是有data数据的,需要再次读取data数据msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetDataLen())//根据dataLen从io中读取字节流_, err := io.ReadFull(conn, msg.Data)if err != nil {fmt.Println("server unpack data err:", err)return}fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))}time.Sleep(1*time.Second)}}
代码不变。
启动服务端
$go run Server.go
启动客户端
$go run Client.go
服务端结果:
$ go run Server.goAdd api msgId = 0Add api msgId = 1[START] Server name: zinx v-0.8 demoApp,listenner at IP: 127.0.0.1, Port 7777 is starting[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096start Zinx server zinx v-0.8 demoApp succ, now listenning...Worker ID = 9 is started.Worker ID = 5 is started.Worker ID = 6 is started.Worker ID = 7 is started.Worker ID = 8 is started.Worker ID = 1 is started.Worker ID = 0 is started.Worker ID = 2 is started.Worker ID = 3 is started.Worker ID = 4 is started.connection add to ConnManager successfully: conn num = 1---> CallOnConnStart....DoConnecionBegin is Called ...[Writer Goroutine is running][Reader Goroutine is running]Add ConnID= 0 request msgID= 0 to workerID= 0Call PingRouter Handlerecv from client : msgId= 0 , data= Zinx V0.8 Client0 Test MessageAdd ConnID= 0 request msgID= 0 to workerID= 0Call PingRouter Handlerecv from client : msgId= 0 , data= Zinx V0.8 Client0 Test MessageAdd ConnID= 0 request msgID= 0 to workerID= 0Call PingRouter Handlerecv from client : msgId= 0 , data= Zinx V0.8 Client0 Test MessageAdd ConnID= 0 request msgID= 0 to workerID= 0Call PingRouter Handlerecv from client : msgId= 0 , data= Zinx V0.8 Client0 Test MessageAdd ConnID= 0 request msgID= 0 to workerID= 0Call PingRouter Handlerecv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Messageread msg head error read tcp4 127.0.0.1:7777->127.0.0.1:49510: read: connection reset by peerConn Stop()...ConnID = 0---> CallOnConnStop....DoConneciotnLost is Called ...connection Remove ConnID= 0 successfully: conn num = 0127.0.0.1:49510 [conn Reader exit!]127.0.0.1:49510 [conn Writer exit!]
客户端结果:
$ go run Client0.goClient Test ... start==> Recv Msg: ID= 2 , len= 21 , data= DoConnection BEGIN...==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping^Csignal: interrupt
客户端创建成功,回调Hook已经执行,并且Conn被添加到ConnManager 中, conn num = 1,
当我们手动CTRL+C 关闭客户端的时候, 服务器ConnManager已经成功将Conn摘掉,conn num = 0.
同时服务端也打印出 conn停止之后的回调信息。
