// Command server hosts the SearchService gRPC service on TCP port 8888 and
// demonstrates unary, client-streaming, server-streaming and bidirectional
// streaming handlers.
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"strconv"
	"time"

	"google.golang.org/grpc"

	"grpc_test_stream/pb/person"
)

// Sever provides the SearchService handlers registered in main. It embeds
// person.UnimplementedSearchServiceServer. The type name is kept as-is so
// existing references keep compiling.
type Sever struct {
	person.UnimplementedSearchServiceServer
}

// Search is the unary handler: it logs the request and replies with a
// PersonRes whose Name embeds the requested name.
func (Sever) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
	fmt.Println(req)
	return &person.PersonRes{Name: "我收到了来自" + req.GetName() + "的请求"}, nil
}

// SearchIn is the client-streaming handler. It logs every request it receives
// and, once the client has closed its send side (Recv reports io.EOF), answers
// with a single PersonRes. Any other receive error, and any error from
// SendAndClose, is returned to the gRPC runtime instead of being swallowed.
func (Sever) SearchIn(stream person.SearchService_SearchInServer) error {
	for {
		req, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return stream.SendAndClose(&person.PersonRes{Name: "读取完成"})
		}
		if err != nil {
			return err
		}
		// Only log messages that were actually received.
		fmt.Println("读取到的内容是:", req)
	}
}

// SearchOut is the server-streaming handler. It sends ten responses derived
// from the request name, pausing one second after each. It stops early and
// returns the error if a send fails or the stream context is done.
func (Sever) SearchOut(req *person.PersonReq, stream person.SearchService_SearchOutServer) error {
	ctx := stream.Context()
	name := req.GetName()
	for i := 0; i < 10; i++ {
		res := &person.PersonRes{Name: name + "服务端收到了请求" + strconv.Itoa(i)}
		if err := stream.Send(res); err != nil {
			return err
		}
		// Wait between messages, but give up as soon as the stream is done.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(1 * time.Second):
		}
	}
	fmt.Println("服务端响应完成")
	return nil
}

// SearchAll is the bidirectional-streaming handler. A goroutine receives
// requests and hands their names to the handler loop, which acknowledges each
// one with a response. The handler returns nil once the client has closed its
// send side (the client must call CloseSend), or the first send/receive error.
func (Sever) SearchAll(stream person.SearchService_SearchAllServer) error {
	ctx := stream.Context()
	names := make(chan string)
	// Buffered so the receiver can report a failure without blocking even if
	// the handler loop has already returned.
	recvErr := make(chan error, 1)

	// Receive side: closing names is the completion signal, so no in-band
	// sentinel value is needed and any request name is valid.
	go func() {
		defer close(names)
		for {
			req, err := stream.Recv()
			if err != nil {
				if !errors.Is(err, io.EOF) {
					recvErr <- err
				}
				return
			}
			select {
			case names <- req.GetName():
			case <-ctx.Done():
				// The stream is finished; nobody will read names anymore.
				return
			}
		}
	}()

	// Send side: acknowledge every received name.
	for name := range names {
		if err := stream.Send(&person.PersonRes{Name: "已接收到:" + name}); err != nil {
			return err
		}
	}

	// names is closed only after any receive error has been stored, so a
	// non-blocking read is sufficient here.
	select {
	case err := <-recvErr:
		return err
	default:
		return nil
	}
}

// main listens on :8888, registers Sever as the SearchService implementation
// and serves until the gRPC server stops.
func main() {
	listener, err := net.Listen("tcp", ":8888")
	if err != nil {
		fmt.Println(err)
		fmt.Println("监听失败1")
		// Without a listener there is nothing to serve on.
		return
	}

	grpcServer := grpc.NewServer()
	person.RegisterSearchServiceServer(grpcServer, &Sever{})

	// Serve blocks until the server stops, so announce start-up beforehand.
	fmt.Println("服务端启动成功")
	if err := grpcServer.Serve(listener); err != nil {
		fmt.Println(err)
		fmt.Println("监听失败2")
	}
}
// Command client connects to the SearchService gRPC server on :8888 and runs
// the bidirectional-streaming SearchAll example. Examples for the other RPCs
// are kept below as commented-out code.
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strconv"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"grpc_test_stream/pb/person"
)

// main dials the server, opens a SearchAll stream, sends eleven requests one
// second apart while concurrently printing every response, and returns once
// both directions have finished.
func main() {
	conn, err := grpc.Dial(":8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		fmt.Println("客户端启用失败")
		fmt.Println(err)
		return
	}
	// Register Close only after the error check: conn must not be used when
	// Dial failed.
	defer conn.Close()

	client := person.NewSearchServiceClient(conn)
	fmt.Println("客户端启动成功")

	// Unary call example.
	//personRes1, err := client.Search(context.Background(), &person.PersonReq{
	//	Name: "黄宽",
	//	Age:  18,
	//})
	//fmt.Println(personRes1.GetAge(), personRes1.GetName())

	// Client-streaming example.
	//cin, err := client.SearchIn(context.Background())
	//for i := 0; i < 10; i++ {
	//	cin.Send(&person.PersonReq{Name: "传入消息" + strconv.Itoa(i)})
	//	time.Sleep(1 * time.Second)
	//}
	//res, err := cin.CloseAndRecv()
	//if err != nil {
	//	fmt.Println(err)
	//}

	// Server-streaming example.
	//out, err := client.SearchOut(context.Background(), &person.PersonReq{Name: "黄宽1"})
	//if err != nil {
	//	fmt.Println(err)
	//}
	//for {
	//	recv, err := out.Recv()
	//	if err != nil {
	//		fmt.Println(err)
	//		break
	//	}
	//	fmt.Println(recv)
	//}

	// Bidirectional streaming.
	all, err := client.SearchAll(context.Background())
	if err != nil {
		fmt.Println(err)
		// The stream is unusable when SearchAll fails.
		return
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// Sender: eleven requests, then close the send side so the server can
	// observe end-of-stream.
	go func() {
		defer wg.Done()
		defer func() {
			if err := all.CloseSend(); err != nil {
				fmt.Println(err)
			}
		}()
		for i := 0; i <= 10; i++ {
			if err := all.Send(&person.PersonReq{Name: "黄宽" + strconv.Itoa(i)}); err != nil {
				fmt.Println(err)
				return
			}
			time.Sleep(1 * time.Second)
		}
	}()

	// Receiver: print responses until the server ends the stream.
	go func() {
		defer wg.Done()
		for {
			res, err := all.Recv()
			if errors.Is(err, io.EOF) {
				// Normal end of stream; nothing to report.
				return
			}
			if err != nil {
				fmt.Println(err)
				return
			}
			fmt.Println(res.GetName())
		}
	}()

	wg.Wait()
	fmt.Println("客户端完成")
}
syntax = "proto3";

package person;

option go_package = "./;person";

// PersonReq is the request message shared by every SearchService RPC.
message PersonReq {
  string name = 1;
  int32 age = 2;
}

// PersonRes is the response message shared by every SearchService RPC.
message PersonRes {
  string name = 1;
  int32 age = 2;
}

// SearchService exposes one RPC for each gRPC call shape.
service SearchService {
  // Unary: a single request answered by a single response.
  rpc Search(PersonReq) returns (PersonRes);

  // Client streaming: a stream of requests answered by a single response.
  rpc SearchIn(stream PersonReq) returns (PersonRes);

  // Server streaming: a single request answered by a stream of responses.
  rpc SearchOut(PersonReq) returns (stream PersonRes);

  // Bidirectional streaming: both requests and responses are streams.
  rpc SearchAll(stream PersonReq) returns (stream PersonRes);
}