Langchain语言模型提问请求,提问使用非标准的sse请求获取流式数据,java后台版解决方式

发布时间 2023-10-26 19:34:27作者: 我就是coder

问题描述:请求后接收的数据流,不走EventSourceListener的onEvent事件,但onOpen onClosed都是正常走的。

 

问题原因:默认的接口返回是StreamingResponse不是EventSourceResponse,无法走标准sse协议的onEvent()方法

 

目标需求:在不改动模型方面接口的情况下,接收到流式数据并通过sse协议转发前端。

 

网上现成方案:

可以使用SSE协议
1、 pip install sse_starlette.sse
2、引入依赖: from sse_starlette.sse import EventSourceResponse
3、将StreamingResponse改为EventSourceResponse

前端使用EventSource进行请求,因原生EventSource支持get请求,可以使用Fetch请求或者微软的 @microsoft/fetch-event-source进行请求

 

最终解决方式:因为不能修改模型api的基础上,不能够使用标准sse来获取数据流,所以最终自己实现实时读取流式数据。故使用OkHttp包,请求链接,并把返回的请求生成一个缓冲区,通过逐字读取,匹配到两个会话之间断点的标志(“}{”)来实现分割,并把分割的数据进行转发。

 

参考code

public void sendQuery(ChatGlmDto chatGlmDto) {

    OkHttpClient client = new OkHttpClient.Builder()
            .connectTimeout(1, TimeUnit.DAYS)
            .readTimeout(1, TimeUnit.DAYS)
            .build();
    okhttp3.RequestBody formBody = okhttp3.RequestBody.create(JSON.toJSONString(chatGlmDto), MediaType.parse("application/json; charset=utf-8"));
    Request.Builder requestBuilder = new Request.Builder();
    Request request = requestBuilder.url(apiUrl).post(formBody).build();
    try (Response response = client.newCall(request).execute()) {
        ResponseBody responseBody = response.body();
        if (responseBody == null) {
            // 处理响应体为空的情况
            return;
        }

        try (BufferedSource source = responseBody.source()) {
            // 将源缓冲区包装成一个流式输入流
            InputStream inputStream = source.inputStream();

            // 此处需要将编码格式设置为UTF_8,解决 InputStream 流读取时的中文乱码问题
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
            int byteRead;
            int preRead = 0;

            String str = "";

            while ((byteRead = reader.read()) != -1) {

                char nowStr = (char) byteRead;
                char preStr = (char) preRead;
                if (preStr == '}' && nowStr == '{' ){
                    // 转发给客户端
                    log.info("Received message: " + str);
                    callback.send(str);
                    str = "";
                }
                str = str + nowStr;
                preRead = byteRead;
            }
            //最后一个为关联文档,
            log.info("Received message: " + str);
            callback.send(str);

        }
    } catch (IOException e) {
        log.error("获取失败",e);
    }
}