1 stream.proto
syntax = "proto3";

option go_package = "./;proto";

// StreamReqData is the request message used by every Greeter RPC.
// It carries a single string payload.
message StreamReqData {
  string data = 1;
}

// StreamResData is the response message used by every Greeter RPC.
// It carries a single string payload.
message StreamResData {
  string data = 1;
}

// Greeter exposes one RPC for each gRPC streaming mode.
service Greeter {
  // Server-streaming mode: the client sends one request and the server
  // keeps pushing data back.
  rpc GetStream(StreamReqData) returns (stream StreamResData);

  // Client-streaming mode: the client sends a stream of requests and
  // receives a single response.
  rpc PutStream(stream StreamReqData) returns (StreamResData);

  // Bidirectional-streaming mode: both sides send a stream of messages.
  rpc AllStream(stream StreamReqData) returns (stream StreamResData);
}
protoc -I . stream.proto --go_out=plugins=grpc:.
2 server.go
// Command server implements the Greeter service and demonstrates the three
// gRPC streaming modes: server streaming, client streaming and bidirectional
// streaming.
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"

	"stream_grpc_test/proto"
)

// PORT is the TCP address the gRPC server listens on.
const PORT = ":50052"

// server is the Greeter service implementation. It holds no state.
type server struct{}

// GetStream handles the server-streaming RPC. It sends eleven responses, one
// per second, each carrying the current Unix timestamp as a string, and then
// ends the stream. The request payload is not inspected.
//
// A failed Send is returned immediately so the handler stops as soon as the
// stream can no longer deliver messages instead of sleeping through the
// remaining iterations.
func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
	for i := 1; i <= 11; i++ {
		msg := &proto.StreamResData{
			Data: fmt.Sprintf("%v", time.Now().Unix()),
		}
		if err := res.Send(msg); err != nil {
			return err
		}
		time.Sleep(time.Second)
	}
	return nil
}

// PutStream handles the client-streaming RPC. It prints the payload of every
// message received from the client.
//
// When the client finishes sending (Recv reports io.EOF) the single response
// required by the RPC is sent with SendAndClose; the response carries no
// payload. Any other receive error is returned so gRPC reports it to the
// client.
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
	for {
		msg, err := cliStr.Recv()
		if errors.Is(err, io.EOF) {
			return cliStr.SendAndClose(&proto.StreamResData{})
		}
		if err != nil {
			return err
		}
		fmt.Println(msg.Data)
	}
}

// AllStream handles the bidirectional-streaming RPC. One goroutine prints
// every message received from the client while a second goroutine sends a
// fixed greeting to the client once per second.
//
// The handler returns once the receiving side has ended and the sender has
// stopped: nil when the client closed its side cleanly, otherwise the first
// receive error, or the send error if receiving ended cleanly.
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
	var (
		wg      sync.WaitGroup
		recvErr error
		sendErr error
	)
	// recvDone is closed when the receive loop ends so the sender stops too.
	recvDone := make(chan struct{})

	wg.Add(2)

	// Receiver: runs until the client closes its side or the stream fails.
	go func() {
		defer wg.Done()
		defer close(recvDone)
		for {
			data, err := allStr.Recv()
			if err != nil {
				// io.EOF only means the client is done sending.
				if !errors.Is(err, io.EOF) {
					recvErr = err
				}
				return
			}
			fmt.Println("收到客户端消息:" + data.Data)
		}
	}()

	// Sender: one message per second until a send fails or the receiver ends.
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			if err := allStr.Send(&proto.StreamResData{Data: "我是服务器"}); err != nil {
				sendErr = err
				return
			}
			select {
			case <-recvDone:
				return
			case <-ticker.C:
			}
		}
	}()

	// wg.Wait orders the goroutines' writes to recvErr/sendErr before the
	// reads below.
	wg.Wait()

	if recvErr != nil {
		return recvErr
	}
	return sendErr
}

// main listens on PORT, registers the Greeter service and serves until Serve
// returns. Listen and Serve failures are fatal.
func main() {
	lis, err := net.Listen("tcp", PORT)
	if err != nil {
		panic(err)
	}

	s := grpc.NewServer()
	proto.RegisterGreeterServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		panic(err)
	}
}
3 client.go
// Command client exercises the Greeter service in each of the three gRPC
// streaming modes, one after another: server streaming, client streaming and
// bidirectional streaming.
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"google.golang.org/grpc"

	"stream_grpc_test/proto"
)

// main dials the server on :50052 and runs the three streaming demos in
// order. Failing to dial or to open a stream is fatal; errors that occur on
// an already-open stream are printed by the demos themselves.
func main() {
	// NOTE(review): grpc.WithInsecure is deprecated in newer grpc-go
	// releases; confirm the pinned grpc version before replacing it.
	conn, err := grpc.Dial(":50052", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	c := proto.NewGreeterClient(conn)
	ctx := context.Background()

	if err := runServerStream(ctx, c); err != nil {
		panic(err)
	}
	if err := runClientStream(ctx, c); err != nil {
		panic(err)
	}
	if err := runBidiStream(ctx, c); err != nil {
		panic(err)
	}
}

// runServerStream sends one request and prints every response the server
// streams back. The error that ends the stream (io.EOF on normal completion)
// is printed. A non-nil error is returned only if the stream cannot be opened.
func runServerStream(ctx context.Context, c proto.GreeterClient) error {
	res, err := c.GetStream(ctx, &proto.StreamReqData{Data: "慕课网"})
	if err != nil {
		return fmt.Errorf("opening GetStream: %w", err)
	}
	for {
		a, err := res.Recv()
		if err != nil {
			fmt.Println(err)
			return nil
		}
		fmt.Println(a)
	}
}

// runClientStream sends eleven numbered messages, one per second, then closes
// the sending side and waits for the server's single response. An error from
// CloseAndRecv is printed. A non-nil error is returned only if the stream
// cannot be opened.
func runClientStream(ctx context.Context, c proto.GreeterClient) error {
	putS, err := c.PutStream(ctx)
	if err != nil {
		return fmt.Errorf("opening PutStream: %w", err)
	}
	for i := 1; i <= 11; i++ {
		msg := &proto.StreamReqData{
			Data: fmt.Sprintf("慕课网%d", i),
		}
		if err := putS.Send(msg); err != nil {
			// The stream has already ended; the reason is reported by
			// CloseAndRecv below.
			break
		}
		time.Sleep(time.Second)
	}
	if _, err := putS.CloseAndRecv(); err != nil {
		fmt.Println(err)
	}
	return nil
}

// runBidiStream opens the bidirectional stream and runs two goroutines: one
// prints every message received from the server, the other sends a fixed
// message once per second. It returns after both goroutines have stopped,
// which happens once receiving ends or a send fails. A non-nil error is
// returned only if the stream cannot be opened.
func runBidiStream(ctx context.Context, c proto.GreeterClient) error {
	allStr, err := c.AllStream(ctx)
	if err != nil {
		return fmt.Errorf("opening AllStream: %w", err)
	}

	// recvDone is closed when the receive loop ends so the sender stops too.
	recvDone := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(2)

	// Receiver: runs until the server ends the stream or the stream fails.
	go func() {
		defer wg.Done()
		defer close(recvDone)
		for {
			data, err := allStr.Recv()
			if err != nil {
				// io.EOF means the server finished the stream normally.
				if !errors.Is(err, io.EOF) {
					fmt.Println(err)
				}
				return
			}
			// These messages come from the server, so the label says so.
			fmt.Println("收到服务端消息:" + data.Data)
		}
	}()

	// Sender: one message per second until a send fails or the receiver ends.
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			if err := allStr.Send(&proto.StreamReqData{Data: "慕课网"}); err != nil {
				// The receiver reports why the stream ended.
				return
			}
			select {
			case <-recvDone:
				return
			case <-ticker.C:
			}
		}
	}()

	wg.Wait()
	return nil
}
