GRPC server的四种传输模式

发布时间 2023-05-24 16:48:45作者: 南北丿

GRPC作用,为什么要有GRPC?

gRPC(gRPC Remote Procedure Call)是一种高性能、通用的远程过程调用(RPC)框架,由Google开发并开源。它使用现代的、高效的协议缓冲区(Protocol Buffers)作为接口定义语言(IDL),并提供多种支持多种编程语言的客户端和服务器端库。

gRPC 的主要作用如下:

  1. 远程过程调用:gRPC 允许你像调用本地函数一样调用远程服务器上的函数,简化了分布式系统之间的通信。你可以定义一个接口,指定方法的输入参数和返回值,并生成客户端和服务器端的代码,从而实现跨网络的函数调用。
  2. 高性能:gRPC 使用基于HTTP/2 的传输协议,能够同时处理多个请求和响应,减少了网络延迟。它还使用 Protocol Buffers 作为默认的序列化机制,可以高效地序列化和传输数据,提高了性能和网络利用率。
  3. 多语言支持:gRPC 提供了多种编程语言的客户端和服务器端库,包括但不限于 Go、Java、Python、C++、C# 等。这意味着你可以使用自己喜欢的编程语言来开发 gRPC 的客户端和服务器端,方便了跨语言的开发和集成。
  4. 可扩展性:gRPC 支持基于服务定义的代码生成,可以自动生成服务端和客户端的代码,减少了开发工作量。同时,它还支持双向流式通信和流式传输,使得在大规模系统中处理大量请求和响应变得更加高效。

总而言之,gRPC 提供了一种高性能、跨语言的远程过程调用框架,使得分布式系统之间的通信变得更加简单和高效。它广泛应用于微服务架构、分布式系统和云原生应用开发中。

关于环境配置可参考上一篇博客:https://www.cnblogs.com/nanbeipie/p/17427171.html

创建项目

以go.mod项目为例

创建如下图所示的目录结构:

编写person.proto 以及四种传输服务

syntax = "proto3";  // 告诉编译器  用proto3  来解读

package person;   // 这个包不是goland的包  message 整体都是 person.xxx

//  option go_package="包路径(从mod下开始写);别名";

option go_package= "go_grpc/pb/person;person";   // import person go_grpc/pb/person

message PersonReq{
  string name = 1;   // 类型  变量  和goland中相反(golang中是先变量后类型)  = 1 就是唯一标识的意思
  int32  age = 2;    //   1 2 3 就是唯一标识符  顺序
}

message PersonRes{
  string name = 1;   // 类型  变量  和goland中相反(golang中是先变量后类型)  = 1 就是唯一标识的意思
  int32  age = 2;    //   1 2 3 就是唯一标识符  顺序
}


service SearchService{
  rpc Search(PersonReq) returns (PersonRes);   // 返回方式固定一个放入参 一个放出参,  传统的即刻响应
  rpc SearchIn(stream PersonReq) returns (PersonRes);   // 返回方式固定一个放入参 一个放出参,  入参为流的形式
  rpc SearchOut(PersonReq) returns (stream PersonRes);   // 返回方式固定一个放入参 一个放出参,  出参为流的形式
  rpc SearchIO(stream PersonReq) returns (stream PersonRes);   // 返回方式固定一个放入参 一个放出参,  入参出参都为流的形式
}

运行脚本生成对应的代码

即刻传输

server端

service SearchService{
  rpc Search(PersonReq) returns (PersonRes);   // 返回方式固定一个放入参 一个放出参,  传统的即刻响应
  rpc SearchIn(stream PersonReq) returns (PersonRes);   // 返回方式固定一个放入参 一个放出参,  入参为流的形式
  rpc SearchOut(PersonReq) returns (stream PersonRes);   // 返回方式固定一个放入参 一个放出参,  出参为流的形式
  rpc SearchIO(stream PersonReq) returns (stream PersonRes);   // 返回方式固定一个放入参 一个放出参,  入参出参都为流的形式
}

编写服务端和客户端的代码

服务端:主要步骤有四步:第一步:取出server服务、第二步:挂载方法、第三步:注册服务、第四步:创建监听



完整代码:

package main

import (
	"context"
	"fmt"
	"go_grpc/pb/person"
	"google.golang.org/grpc"

	"net"
)

type personServe struct {
	person.UnimplementedSearchServiceServer
}
// 将服务的方法引进进来
func (*personServe) Search(ctx context.Context,req *person.PersonReq) (*person.PersonRes, error) {
	name := req.GetName()
	res :=&person.PersonRes{Name: "我收到了"+name+"的信息"}
	return res, nil
}
func (*personServe) SearchIn(person.SearchService_SearchInServer) error {
	return nil
}
func (*personServe) SearchOut(*person.PersonReq, person.SearchService_SearchOutServer) error {
	return nil
}
func (*personServe) SearchIO(person.SearchService_SearchIOServer) error {
	return nil
}
// 创建完成了一个服务端
func main() {
	// 创建监听
	l, err := net.Listen("tcp", ":8888") // 使用 ":8888" 表示在所有可用 IP 地址上监听 8888 端口
	if err != nil {
		fmt.Printf("Failed to listen: %v\n", err)
		return
	}
	s := grpc.NewServer()
	person.RegisterSearchServiceServer(s,&personServe{})   // personServe注册进来
	s.Serve(l)  // 建立监听

	defer s.Stop() // 在程序退出时关闭服务器

	fmt.Println("Server started, listening on :8888")
	if err := s.Serve(l); err != nil {
		fmt.Printf("Failed to serve: %v\n", err)
		return
	}
}

client端

客户端:主要步骤有四步:第一步:创建一个连接、第二步:new一个client、第三步:调用client方法、第四步:获取返回值

完整代码:

package main

import (
	"context"
	"fmt"
	"go_grpc/pb/person"
	"google.golang.org/grpc"
)

func main() {

	// 创建连接
	conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithInsecure())
	if err != nil {
		fmt.Printf("Failed to connect: %v\n", err)
		return
	}
	defer conn.Close()


	// 创建客户端实例
	client :=person.NewSearchServiceClient(conn)

	res, err := client.Search(context.Background(), &person.PersonReq{Name: "我是liup"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(res)


}

运行结果截图:

入参为流的形式、流式传入

service SearchService{
  rpc Search(PersonReq) returns (PersonRes);   // 返回方式固定一个放入参 一个放出参,  传统的即刻响应
  rpc SearchIn(stream PersonReq) returns (PersonRes);   // 返回方式固定一个放入参 一个放出参,  入参为流的形式
  rpc SearchOut(PersonReq) returns (stream PersonRes);   // 返回方式固定一个放入参 一个放出参,  出参为流的形式
  rpc SearchIO(stream PersonReq) returns (stream PersonRes);   // 返回方式固定一个放入参 一个放出参,  入参出参都为流的形式
}

server端

func (*personServe) SearchIn(server person.SearchService_SearchInServer) error {
	for{
		req, err := server.Recv()
		fmt.Println(req)
		if err !=nil{
			server.SendAndClose(&person.PersonRes{Name: "完成了"})
			break
		}
	}

	return nil
}

循环传输流

client端

	// 流式输入
	c,err :=client.SearchIn(context.Background())
	if err != nil {
		fmt.Printf("Failed to create stream: %v\n", err)
		return
	}
	i :=0
	for {
		if i>10{
			res, err := c.CloseAndRecv()
			if err != nil {
				fmt.Printf("Failed to receive response: %v\n", err)
				return
			}
			fmt.Println(res)
			break
		}
		time.Sleep(1*time.Second)
		c.Send(&person.PersonReq{Name: "我是进来的信息"})
		i++
	}

运行结果截图:


出参为流的形式

service SearchService{
  rpc Search(PersonReq) returns (PersonRes);   // 返回方式固定一个放入参 一个放出参,  传统的即刻响应
  rpc SearchIn(stream PersonReq) returns (PersonRes);   // 返回方式固定一个放入参 一个放出参,  入参为流的形式
  rpc SearchOut(PersonReq) returns (stream PersonRes);   // 返回方式固定一个放入参 一个放出参,  出参为流的形式
  rpc SearchIO(stream PersonReq) returns (stream PersonRes);   // 返回方式固定一个放入参 一个放出参,  入参出参都为流的形式
}

server端

func (*personServe) SearchOut(req *person.PersonReq,server person.SearchService_SearchOutServer) error {
	name :=req.Name
	i :=0
	for {
		if i>10{
			break
		}
		time.Sleep(1*time.Second)
		server.Send(&person.PersonRes{Name: "我拿到了"+name})
		i++
	}

	return nil
}

与入参流不同,这里只有一个Send方法

client端

	// 流式输出
	c,err :=client.SearchOut(context.Background(),&person.PersonReq{Name: "liup"})
	if err != nil {
		fmt.Printf("Failed to create stream: %v\n", err)
		return
	}

	for  {
		req,err :=c.Recv()
		if err !=nil {
			fmt.Println(err)
			break
		}
		fmt.Println(req)
	}

运行结果截图:

出参、入参都为流的形式

service SearchService{
  rpc Search(PersonReq) returns (PersonRes);   // 返回方式固定一个放入参 一个放出参,  传统的即刻响应
  rpc SearchIn(stream PersonReq) returns (PersonRes);   // 返回方式固定一个放入参 一个放出参,  入参为流的形式
  rpc SearchOut(PersonReq) returns (stream PersonRes);   // 返回方式固定一个放入参 一个放出参,  出参为流的形式
  rpc SearchIO(stream PersonReq) returns (stream PersonRes);   // 返回方式固定一个放入参 一个放出参,  入参出参都为流的形式
}

server端

func (*personServe) SearchIO(server person.SearchService_SearchIOServer) error {
	i :=0
	str := make(chan string)
	go func() {
		for {
			i++
			req,_ := server.Recv()
			if i>10{
				str<- "结束"

				break
			}

			str<-req.Name
		}
	}()
	for {
		s :=<-str
		if s=="结束"{
			server.Send(&person.PersonRes{Name: s})
			break
		}
		server.Send(&person.PersonRes{Name: s})
	}
	return nil
}


client端

	// 流式输入输出
	c,err :=client.SearchIO(context.Background())
	wg:=sync.WaitGroup{}    //创建等待的信息
	wg.Add(2)
	go func() {
		for {
			time.Sleep(1*time.Second)
			err :=c.Send(&person.PersonReq{Name: "liup"})
			if err!=nil{
				wg.Done()
				break
			}
		}
	}()
	go func() {
		for {
			req,err := c.Recv()
			if err !=nil{
				fmt.Println(err)
				wg.Done()
				break
			}
			fmt.Println(req)

		}
	}()
	wg.Wait()

运行结果