go语言使用GRPC流处理模式

发布时间 2023-06-07 16:25:13作者: TaylorSWMM

go语言使用GRPC流处理模式

标签(空格分隔): go,grpc

proto文件

syntax = "proto3";
package four_kinds_method.v1;
option go_package="go-example/grpc/four_kinds_method/proto;four_kinds_method_pb";


// gRPC 允许您定义四种服务方法
// 1. 一元 RPC,其中客户端向服务器发送单个请求并返回单个响应,就像普通函数调用一样
// rpc SayHello(HelloRequest) returns (HelloResponse);
// 2. 服务器流式处理 RPC,其中客户端向服务器发送请求并获取流以读回消息序列。客户端从返回的流中读取,直到没有更多消息。gRPC 保证单个 RPC 调用中的消息排序
// rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
// 3. 客户端流式处理 RPC,其中客户端写入一系列消息并将其发送到服务器,再次使用提供的流。客户端完成消息写入后,它将等待服务器读取消息并返回其响应。同样,gRPC 保证单个 RPC 调用中的消息排序
// rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
// 4. 双向流式处理 RPC,其中双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按照它们喜欢的任何顺序进行读取和写入:例如,服务器可以等待接收所有客户端消息,然后再写入响应,或者它可以交替读取消息然后写入消息,或者读取和写入的某种其他组合。保留每个流中消息的顺序
// rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);


// 生成pb文件: protoc --go_out=plugins=grpc:. --go_opt=paths=source_relative ./four_kinds_method.proto

message StreamRequest {
  string data = 1;
}

message StreamResponse {
  string data = 1;
}

service Greeter {
  // 服务端流
  rpc GetStream(StreamRequest) returns (stream StreamResponse);
  // 客户端流
  rpc PutStream(stream StreamRequest) returns (StreamResponse);
  // 双向流
  rpc AllStream(stream StreamRequest) returns (stream StreamResponse);
}

server实现

package api

import (
	"fmt"
	four_kinds_method_pb "go-example/grpc/four_kinds_service_method/proto"
	"sync"
	"time"
)

type Service struct {
}

// GetStream 服务端流模式
func (s *Service) GetStream(req *four_kinds_method_pb.StreamRequest, res four_kinds_method_pb.Greeter_GetStreamServer) error {
	i := 0
	for true {
		i++
		res.Send(&four_kinds_method_pb.StreamResponse{
			Data: fmt.Sprintf("%s, %+v", req.Data, time.Now().Unix()),
		})
		if i >= 10 {
			break
		}
		time.Sleep(time.Second * 2)
	}
	return nil
}

// PutStream 客户端流模式
func (s *Service) PutStream(clientStream four_kinds_method_pb.Greeter_PutStreamServer) error {
	for {
		recv, err := clientStream.Recv()
		if err != nil {
			fmt.Println("PutStreamErr: ", err)
			break
		} else {
			fmt.Println("PutStreamRecv: ", recv.Data)
		}
	}
	return nil
}

// AllStream 双向流模式
func (s *Service) AllStream(allStream four_kinds_method_pb.Greeter_AllStreamServer) error {

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			recv, err := allStream.Recv()
			if err != nil {
				fmt.Println("服务端接受数据错误: ", err)
				break
			} else {
				fmt.Println("服务端接受数据成功: ", recv.Data)
			}

			// break
		}
	}()

	go func() {
		defer wg.Done()
		for {
			allStream.Send(&four_kinds_method_pb.StreamResponse{
				Data: "我是服务端:hello world",
			})
			// break
		}
	}()

	wg.Wait()
	return nil
}

server-main启动服务

package main

import (
	"go-example/grpc/four_kinds_service_method/api"
	four_kinds_method_pb "go-example/grpc/four_kinds_service_method/proto"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"net"
)

func main() {

	logger, _ := zap.NewDevelopment()

	server := grpc.NewServer()

	four_kinds_method_pb.RegisterGreeterServer(server, &api.Service{})

	listen, err := net.Listen("tcp", ":6662")
	if err != nil {
		logger.Fatal("net.Listen error", zap.Error(err))
	}
	logger.Info("grpc four_kinds_method service started [127.0.0.1:6662] ")
	err = server.Serve(listen)
	if err != nil {
		logger.Fatal("grpc serve error", zap.Error(err))
	}
}

client

package main

import (
	"context"
	"fmt"
	four_kinds_method_pb "go-example/grpc/four_kinds_service_method/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {

	conn, err := grpc.Dial("127.0.0.1:6662", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}

	client := four_kinds_method_pb.NewGreeterClient(conn)

	//stream, _ := client.GetStream(context.Background(), &four_kinds_method_pb.StreamRequest{Data: "Hello World"})
	//for {
	//	recv, err := stream.Recv()
	//	if err != nil {
	//		fmt.Println("err:", err)
	//		break
	//	}
	//	fmt.Println("recv: ", recv)
	//}

	//putStream, err := client.PutStream(context.Background())
	//if err != nil {
	//	panic(err)
	//}
	//
	//i := 0
	//for {
	//	i++
	//	putStream.Send(&four_kinds_method_pb.StreamRequest{
	//		Data: "Hello World" + strconv.Itoa(i),
	//	})
	//	time.Sleep(time.Second * 2)
	//	if i > 10 {
	//		break
	//	}
	//}

	stream, err := client.AllStream(context.TODO())
	if err != nil {
		fmt.Println("err1:", err)
	}
	//for {

	err = stream.Send(&four_kinds_method_pb.StreamRequest{
		Data: "hello php",
	})
	fmt.Println("err:", err)
	stream.CloseSend()
	//}

	//go func() {
	//	for {
	//		recv, _ := stream.Recv()
	//		fmt.Println(recv)
	//	}
	//}()
}