在Server和Client通讯中,由于网络等原因很可能会发生数据丢包的现象。如果数据缺失,服务端接收的信息不完整,就会造成混乱。
我们需要在Server和Client之间建立一个通讯协议,通过协议中的规则,判断当前接收到的信息是否完整。根据信息的完整情况,采取不同的处理方式。
通讯协议protocol的核心就是设计一个头部。如果传来的信息不包含这个头部,就说明当前信息和之前的信息是同一条。那么就把当前信息和之前的那条信息合并成一条。
而协议主要包含的功能是封装(Enpack)和解析(Depack)。Enpack是客户端对信息进行数据封装。封装之后可以传递给服务器。Depack是服务器对信息进行数据解析。
其中有个Const部分,用于定义头部、头部长度、客户端传入信息长度。
在代码中,我们这样定义:
const (ConstHeader = "Headers"ConstHeaderLength = 7ConstMLength = 4)
头部的内容为”Headers”,长度为7。所以ConstHeaderLenth=7.
而信息传递过程中,我们会把int类型转换成byte类型。一个int的长度等于4个byte的长度。因此,我们设置ConstMLength=4.代表客户端的传来的信息大小。
自定义协议protocal的代码示例如下:
/*** protocol* @Author: Jian Junbo* @Email: junbojian@qq.com* @Create: 2017/9/14 11:49** Description: 通讯协议处理*/package protocolimport ("bytes""encoding/binary")const (ConstHeader = "Headers"ConstHeaderLength = 7ConstMLength = 4)//封包func Enpack(message []byte) []byte {return append(append([]byte(ConstHeader), IntToBytes(len(message))...), message...)}//解包func Depack(buffer []byte) []byte {length := len(buffer)var i intdata := make([]byte, 32)for i = 0; i < length; i++ {if length < i + ConstHeaderLength + ConstMLength{break}if string(buffer[i:i+ConstHeaderLength]) == ConstHeader {messageLength := ByteToInt(buffer[i+ConstHeaderLength : i+ConstHeaderLength+ConstMLength])if length < i+ConstHeaderLength+ConstMLength+messageLength {break}data = buffer[i+ConstHeaderLength+ConstMLength : i+ConstHeaderLength+ConstMLength+messageLength]}}if i == length {return make([]byte, 0)}return data}//字节转换成整形func ByteToInt(n []byte) int {bytesbuffer := bytes.NewBuffer(n)var x int32binary.Read(bytesbuffer, binary.BigEndian, &x)return int(x)}//整数转换成字节func IntToBytes(n int) []byte {x := int32(n)bytesBuffer := bytes.NewBuffer([]byte{})binary.Write(bytesBuffer, binary.BigEndian, x)return bytesBuffer.Bytes()}
Server端主要通过协议来解析客户端发送来的信息。建立一个函数,用来完成连接对接收信息的处理。其中建立了通道readerChannel,并把接收来的信息放在通道里。在放入通道之前,使用protocol和Depack对信息进行解析。
//连接处理func handleConnection(conn net.Conn) {//缓冲区,存储被截断的数据tmpBuffer := make([]byte, 0)//接收解包readerChannel := make(chan []byte, 10000)go reader(readerChannel)buffer := make([]byte, 1024)for{n, err := conn.Read(buffer)if err != nil{Log(conn.RemoteAddr().String(), "connection error: ", err)return}tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...))readerChannel <- tmpBuffer //接收的信息写入通道}defer conn.Close()}
如果信息读取发生错误(包括读取到信息结束符EOF),都会打印错误信息,并跳出循环。
Log(conn.RemoteAddr().String(), "connection error: ", err)return
由于通道内的数据是[]byte型的。需要转换成string。这个工作有专门的获取通道数据的reader(readerChannel chan []byte)来完成。
//获取通道数据func reader(readerchannel chan []byte) {for{select {case data := <-readerchannel:Log(string(data)) //打印通道内的信息}}}
查看Server端代码示例:
/*** MySocketProtocalServer* @Author: Jian Junbo* @Email: junbojian@qq.com* @Create: 2017/9/14 13:54* Copyright (c) 2017 Jian Junbo All rights reserved.** Description: 服务端,接收客户端传来的信息*/package mainimport ("net""fmt""os""log""protocol")func main() {netListen, err := net.Listen("tcp", "localhost:7373")CheckErr(err)defer netListen.Close()Log("Waiting for client ...") //启动后,等待客户端访问。for{conn, err := netListen.Accept() //监听客户端if err != nil {Log(conn.RemoteAddr().String(), "发了了错误:", err)continue}Log(conn.RemoteAddr().String(), "tcp connection success")go handleConnection(conn)}}//连接处理func handleConnection(conn net.Conn) {//缓冲区,存储被截断的数据tmpBuffer := make([]byte, 0)//接收解包readerChannel := make(chan []byte, 10000)go reader(readerChannel)buffer := make([]byte, 1024)for{n, err := conn.Read(buffer)if err != nil{Log(conn.RemoteAddr().String(), "connection error: ", err)return}tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...))readerChannel <- tmpBuffer //接收的信息写入通道}defer conn.Close()}//获取通道数据func reader(readerchannel chan []byte) {for{select {case data := <-readerchannel:Log(string(data)) //打印通道内的信息}}}//日志处理func Log(v ...interface{}) {log.Println(v...)}//错误处理func CheckErr(err error) {if err != nil {fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())os.Exit(1)}}
客户端使用Enpack封装要发送到服务端的信息后,写入连接conn中。
/*** MySocketProtocalClient* @Author: Jian Junbo* @Email: junbojian@qq.com* @Create: 2017/9/14 15:23* Copyright (c) 2017 Jian Junbo All rights reserved.** Description:*/package mainimport ("net""time""strconv""protocol""fmt""os")//发送100次请求func send(conn net.Conn) {for i := 0; i < 100; i++ {session := GetSession()words := "{\"ID\":\""+strconv.Itoa(i)+"\",\"Session\":\""+session+"20170914165908\",\"Meta\":\"golang\",\"Content\":\"message\"}"conn.Write(protocol.Enpack([]byte(words)))fmt.Println(words) //打印发送出去的信息}fmt.Println("send over")defer conn.Close()}//用当前时间做识别。当前时间的十进制整数func GetSession() string {gs1 := time.Now().Unix()gs2 := strconv.FormatInt(gs1, 10)return gs2}func main() {server := "localhost:7373"tcpAddr, err := net.ResolveTCPAddr("tcp4", server)if err != nil{fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())os.Exit(1)}conn, err := net.DialTCP("tcp", nil, tcpAddr)if err != nil{fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())os.Exit(1)}fmt.Println("connect success")send(conn)}

