问题描述:请求后接收的数据流,不走EventSourceListener的onEvent事件,但onOpen onClosed都是正常走的。
问题原因:默认的接口返回是StreamingResponse不是EventSourceResponse,无法走标准sse协议的onEvent()方法
目标需求:在不改动模型方面接口的情况下,接收到流式数据并通过sse协议转发前端。
网上现成方案:
可以使用SSE协议
1、 pip install sse_starlette.sse
2、引入依赖: from sse_starlette.sse import EventSourceResponse
3、将StreamingResponse改为EventSourceResponse
前端使用EventSource进行请求,因原生EventSource支持get请求,可以使用Fetch请求或者微软的 @microsoft/fetch-event-source进行请求
最终解决方式:因为不能修改模型api的基础上,不能够使用标准sse来获取数据流,所以最终自己实现实时读取流式数据。故使用OkHttp包,请求链接,并把返回的请求生成一个缓冲区,通过逐字读取,匹配到两个会话之间断点的标志(“}{”)来实现分割,并把分割的数据进行转发。
参考code:
public void sendQuery(ChatGlmDto chatGlmDto) { OkHttpClient client = new OkHttpClient.Builder() .connectTimeout(1, TimeUnit.DAYS) .readTimeout(1, TimeUnit.DAYS) .build(); okhttp3.RequestBody formBody = okhttp3.RequestBody.create(JSON.toJSONString(chatGlmDto), MediaType.parse("application/json; charset=utf-8")); Request.Builder requestBuilder = new Request.Builder(); Request request = requestBuilder.url(apiUrl).post(formBody).build(); try (Response response = client.newCall(request).execute()) { ResponseBody responseBody = response.body(); if (responseBody == null) { // 处理响应体为空的情况 return; } try (BufferedSource source = responseBody.source()) { // 将源缓冲区包装成一个流式输入流 InputStream inputStream = source.inputStream(); // 此处需要将编码格式设置为UTF_8,解决 InputStream 流读取时的中文乱码问题 BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); int byteRead; int preRead = 0; String str = ""; while ((byteRead = reader.read()) != -1) { char nowStr = (char) byteRead; char preStr = (char) preRead; if (preStr == '}' && nowStr == '{' ){ // 转发给客户端 log.info("Received message: " + str); callback.send(str); str = ""; } str = str + nowStr; preRead = byteRead; } //最后一个为关联文档, log.info("Received message: " + str); callback.send(str); } } catch (IOException e) { log.error("获取失败",e); } }