Asp-Net-Core学习笔记:gRPC快速入门

发布时间 2023-07-11 23:12:27作者: 程序设计实验室

前言

此前,我在做跨语言调用时,用的是 Facebook 的 Thrift,挺轻量的,还不错。

Thrift是一种接口描述语言和二进制通讯协议,它被用来定义和创建跨语言的服务。它被当作一个远程过程调用(RPC)框架来使用,是由Facebook为“大规模跨语言服务开发”而开发的。它通过一个代码生成引擎联合了一个软件栈,来创建不同程度的、无缝的跨平台高效服务,可以使用C#、C++(基于POSIX兼容系统)Cappuccino、Cocoa、Delphi、Erlang、Go、Haskell、Java、Node.js、OCaml、Perl、PHP、Python、Ruby和Smalltalk编程语言开发。 2007由Facebook开源,2008年5月进入Apache孵化器, 2010年10月成为Apache的顶级项目。

最近的项目中也有类似的需求,这次打算使用 Google 的 gRPC,原因是 gRPC 知名度也很高,之前一直想用但没有场景,加上 AspNetCore 的文档里有介绍,看起来官方也是推荐使用这种方式进行 RPC 调用。

所以就果断开始了 gRPC 实践,最终做出来的效果还是可以的(这是后话)。

本文主要介绍 C# 与 Python 之间,使用 gRPC 进行跨语言调用。

概念科普

什么是RPC?

远程过程调用(Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。 比如 Java RMI(远程方法调用(Remote
Method Invocation)。能够让在某个java虚拟机上的对象像调用本地对象一样调用另一个java虚拟机中的对象上的方法)。

从上图可以看出, RPC 本身是 client-server模型,也是一种 request-response 协议。
有些实现扩展了远程调用的模型,实现了双向的服务调用,但是不管怎样,调用过程还是由一个客户端发起,服务器端提供响应,基本模型没有变化。
服务的调用过程为:

  1. client调用client stub,这是一次本地过程调用
  2. client stub将参数打包成一个消息,然后发送这个消息。打包过程也叫做 marshalling
  3. client所在的系统将消息发送给server
  4. server的的系统将收到的包传给server stub
  5. server stub解包得到参数。 解包也被称作 unmarshalling
  6. 最后server stub调用服务过程. 返回结果按照相反的步骤传给client

关于 gRPC

gRPC 是一种与语言无关的高性能远程过程调用 (RPC) 框架。

前文有说过,这个是 Google 开发的,以下是 AspNetCore 的文档中,对 gRPC 的介绍。

gRPC 的主要优点是:

  • 现代高性能轻量级 RPC 框架。
  • 协定优先 API 开发,默认使用协议缓冲区,允许与语言无关的实现。
  • 可用于多种语言的工具,以生成强类型服务器和客户端。
  • 支持客户端、服务器和双向流式处理调用。
  • 使用 Protobuf 二进制序列化减少对网络的使用。

这些优点使 gRPC 适用于:

  • 效率至关重要的轻量级微服务。
  • 需要多种语言用于开发的 Polyglot 系统。
  • 需要处理流式处理请求或响应的点对点实时服务。

gRPC 使用 .proto 文件来定义接口和数据类型,同时提供了通过 .proto 文件生成不同语言调用代码的工具。

项目介绍

本文的项目是使用 C# 调用 Python 写的服务,这个服务是一个大语言模型。

在本项目中,C# 项目作为客户端,Python 项目作为服务端。

编写 .proto 文件

第一步,要先编写用于定义接口和数据结构的 .proto 文件。

将下面代码保存到 chat.proto 文件里,接下来在要接入 gRPC 的每个项目里,都要复制一份这个文件。

syntax = "proto3";

import "google/protobuf/wrappers.proto";

option csharp_namespace = "AIHub.Blazor";

package aihub;

service ChatHub {
  rpc Chat (ChatRequest) returns (ChatReply);
  rpc StreamingChat (ChatRequest) returns (stream ChatReply);
}

message ChatRequest {
  string prompt = 1;
  repeated google.protobuf.StringValue history = 2;
  int32 max_length = 3;
  float top_p = 4;
  float temperature = 5;
}

message ChatReply {
  string response = 1;
}

这里定义了两个 Chat 方法,其中一个是一元调用,一个是流式输出。

gRPC 服务可以有不同类型的方法,服务发送和接收消息的方式取决于所定义的方法的类型

  • 一元 - 调用完就返回,同步方法
  • 服务器流式处理 - 服务端流式返回数据
  • 客户端流式处理 - 客户端流式写入数据
  • 双向流式处理

都挺好理解的,在 proto 定义里面也很易懂,stream 放在哪里,哪里就是流式。

stream 放在请求参数,那客户端输入就是流式的,放在返回值前面,那服务端的返回就是流式。

都放就代表双向流式。

接着又定义了用到的数据结构,一个输入参数 ChatRequest,一个返回参数 ChatReply

string prompt = 1; 这里赋值的意思是这个参数的位置,不是定义变量的赋值?,第一次用差点整蒙了。

服务端 Python 项目

先在 Python 项目里,把 gRPC 服务器搭建起来。

把上面的 chat.proto 文件,复制到 Python 项目的目录里面。

Python 项目需要安装依赖

pip install grpcio
pip install grpcio-tools

执行命令生成代码

python -m grpc_tools.protoc -I . --python_out=. --pyi_out=. --grpc_python_out=. ./chat.proto

上面的命令,可以根据 proto 文件,生成以下代码

  • chat_pb2_grpc.py
  • chat_pb2.py
  • chat_pb2.pyi

编写服务器代码

from concurrent import futures
import logging
import grpc
import chat_pb2
import chat_pb2_grpc

class Greeter(chat_pb2_grpc.ChatHubServicer):
    def Chat(self, request: chat_pb2.ChatRequest, context):
        response, history = model.chat(
            tokenizer,
            request.prompt,
            history=[],
            max_length=request.max_length,
            top_p=request.top_p,
            temperature=request.temperature)
        return chat_pb2.ChatReply(response=response)

    def StreamingChat(self, request: chat_pb2.ChatRequest, context):
        past_key_values, history = None, []
        current_length = 0
        for response, history, past_key_values in model.stream_chat(
                tokenizer, request.prompt, history=history,
                past_key_values=past_key_values,
                return_past_key_values=True):
            print(response[current_length:], end="", flush=True)
            yield chat_pb2.ChatReply(response=response)
            current_length = len(response)
            
def serve():
    port = '50051'
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    chat_pb2_grpc.add_ChatHubServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:' + port)
    server.start()
    print("Server started, listening on " + port)
    server.wait_for_termination()

if __name__ == '__main__':
    logging.basicConfig()
    serve()

前面在 proto 里定义的俩方法:Chat, StreamingChat 要自己实现,这里我直接调用 Transformer ,一元方法就直接返回,流式输出使用 yield 生成结果。

扩展:Python gRPC客户端

本文把 Python 项目作为 gRPC 的服务端供 AspNetCore 项目调用

反过来也是没问题的

Python 作为客户端的代码如下

import logging
import grpc
import chat_pb2
import chat_pb2_grpc

def run():
    print("Will try to greet world ...")
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = chat_pb2_grpc.ChatHubStub(channel)
        response = stub.Chat(chat_pb2.ChatRequest(prompt='你好'))
    print("Greeter client received: " + response.message)

if __name__ == '__main__':
    logging.basicConfig()
    run()

客户端 AspNetCore 项目

AspNetCore 与 gRPC 的集成非常容易

先把上面的 proto 文件添加到项目中

在本项目中我放到 Protos/chat.proto

先安装 nuget 包

dotnet add Grpc.AspNetCore

然后编辑 .csproj 添加以下配置

<ItemGroup>
  <Protobuf Include="Protos\chat.proto" GrpcServices="Both" />
</ItemGroup>

其中 GrpcServices="Both" 表示同时生成服务端和客户端代码

完成之后,Build 项目,然后就会在 obj\Debug\net6.0\Protos 目录下生成对应的代码

不过这些代码不需要直接引用,编译器会自动处理。

客户端

编辑 Program.cs 文件

注册一个 gRPC 客户端

builder.Services.AddGrpcClient<ChatHub.ChatHubClient>(options => {
    options.Address = new Uri(builder.Configuration.GetValue<string>("gRPCServer"));
});

如果要注册多个相同的客户端,可以加个名字区分

builder.Services.AddGrpcClient<ChatHub.ChatHubClient>("client1", options => {
    options.Address = new Uri(builder.Configuration.GetValue<string>("gRPCServer"));
});
builder.Services.AddGrpcClient<ChatHub.ChatHubClient>("client2", options => {
    options.Address = new Uri(builder.Configuration.GetValue<string>("gRPCServer"));
});

使用的时候直接注入就完事了。

如果是命名客户端,就先注入 GrpcClientFactory 对象

var clinet1 = grpcClientFactory.CreateClient<ChatHub.ChatHubClient>("client1");
var clinet2 = grpcClientFactory.CreateClient<ChatHub.ChatHubClient>("client2");

发起一个请求(一元模式)

var request = new ChatRequest { Prompt = "哈喽" };
ChatReply reply = ChatClient.Chat(request);

读取流式返回数据

using (var call = ChatClient.StreamingChat(request)) {
  await foreach (var resp in call.ResponseStream.ReadAllAsync()) {
    Console.Write(resp.Response);
  }
}

简简单单,搞定~

服务端

同样很简单

注册服务、配置中间件

builder.Services.AddGrpc();
app.MapGrpcService<ChatService>();

ChatService 代码

public class ChatService : ChatHub.ChatHubBase {
  private readonly ILogger<ChatService> _logger;

  public ChatService(ILogger<ChatService> logger) {
    _logger = logger;
  }
  public override Task<ChatReply> Chat(ChatRequest request, ServerCallContext context) {
    return new Task<ChatReply>(() => new ChatReply { Response = $"你好,{request.Prompt}" });
  }

  public override async Task StreamingChat(ChatRequest request, IServerStreamWriter<ChatReply> responseStream,
                                           ServerCallContext context) {
    for (var i = 0; i < 5; i++) {
      await responseStream.WriteAsync(new ChatReply { Response = $"{i}" });
      await Task.Delay(TimeSpan.FromSeconds(1));
    }
  }
}

同样是把 protobuf 里定义的俩方法实现了一遍。

代码比较易懂,跟上面的 Python 版差不多,这里就不重复解释了。

小结

gRPC 使用起来非常的丝滑,目前来说也没遇到什么坑,可以非常平滑地与已有项目集成,如果有类似的场景,强烈推荐尝试一下 gRPC !

gRPC 的功能很多,本文仅介绍了最基本的使用,更多的请阅读文档,详细有了本文的基础铺垫,读者再阅读文档深入使用时,会比较轻松上手。

PS:最近这个使用到 gRPC 的项目,接下来我会写个小结博客介绍一下~

参考资料