chatgpt SSE传输,打字显示相关

发布时间 2023-09-20 15:26:45作者: PKGAME

一、SSE简介

二、代码实操

三、问题

 

一、简介

text/event-stream

  是一种用于服务器向客户端推送事件的媒体类型(Media Type)。它是基于 HTTP 协议的一种流式传输技术,也被称为 Server-Sent Events(SSE)

  格式:text/event-stream 使用纯文本的格式,基于换行符分隔每个事 件。每个事件由多个字段组成,包括事件类型、数据等。常见字段有 event、data、id、retry 等(包含四个字段:event、data、id和retry。event表示事件类型,data表示消息内容,id用于设置客户端EventSource对象的“last event ID string”内部属性,retry指定了重新连接的时间)。
  事件流:服务器通过持久连接(长轮询或HTTP/2流)将事件流式传输给客户端。客户端通过监听 onmessage 事件来接收并处理从服务器发送的事件。
  服务器推送:服务器可以在任何时间点向客户端推送事件,无需客户端发起请求。这使得实时更新、通知和推送等应用场景变得可行,例如聊天应用、实时股票报价、即时通知等。
  处理丢失连接:
    如果客户端与服务器之间的连接中断,客户端可以尝试重新连接以继续接收事件。
    客户端可以使用 Last-Event-ID 请求头字段来指定从特定事件 ID 开始接收事件,以便处理连接断开后的事件不丢失。

  标准的 SSE 响应解析流程:
  1. 先使用 \n\n 进行分割
  2. 分割的每个元素使用 \n 进行分割,从而得到 event 和 data
  3. 对 data 进行序列化解析

 

二、代码实操

  由于我们这是针对chatgpt得SSE传输,所以参考了官方文档内容,具体可以开外网访问:去看看 

  1.服务端的接受与发送。

    正常来讲,如果使用python,直接就可以实现事件流传输。

# imports
import openai  # for OpenAI API calls
# 设定api_key
openai.api_key = "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" # 已被封号

response = openai.ChatCompletion.create( model='gpt-3.5-turbo', messages=[ {'role': 'user', 'content': "What's 1+1? Answer in one word."} ], temperature=0, stream=True # this time, we set stream=True ) for chunk in response: print(chunk)

   打印内容如下:

{
  "choices": [
    {
      "delta": {
        "role": "assistant"
      },
      "finish_reason": null,
      "index": 0
    }
  ],
  "created": 1677825464,
  "id": "chatcmpl-6ptKyqKOGXZT6iQnqiXAH8adNLUzD",
  "model": "gpt-3.5-turbo-0301",
  "object": "chat.completion.chunk"
}
{
  "choices": [
    {
      "delta": {
        "content": "\n\n"
      },
      "finish_reason": null,
      "index": 0
    }
  ],
  "created": 1677825464,
  "id": "chatcmpl-6ptKyqKOGXZT6iQnqiXAH8adNLUzD",
  "model": "gpt-3.5-turbo-0301",
  "object": "chat.completion.chunk"
}
{
  "choices": [
    {
      "delta": {
        "content": "2"
      },
      "finish_reason": null,
      "index": 0
    }
  ],
  "created": 1677825464,
  "id": "chatcmpl-6ptKyqKOGXZT6iQnqiXAH8adNLUzD",
  "model": "gpt-3.5-turbo-0301",
  "object": "chat.completion.chunk"
}
{
  "choices": [
    {
      "delta": {},
      "finish_reason": "stop",
      "index": 0
    }
  ],
  "created": 1677825464,
  "id": "chatcmpl-6ptKyqKOGXZT6iQnqiXAH8adNLUzD",
  "model": "gpt-3.5-turbo-0301",
  "object": "chat.completion.chunk"
}

   openai 已经封装好了SSE传输,直接调用遍历就行了。

   但是,我这需要中转,所以在flask 重新设置了下,可以该路由实现服务端发送SSE

   先封装了send_msg 方法

def send_msg(content, stream=False):
    """
    :param content:
        生成文本
        message = []
        user = {"role": "user", "content": "Who won the world series in 2020?"},
        assistant = {"role": "assistant", "content": "Who won the world series in 2020?"},
        system = {"role": "system", "content": "你是一个编程助手"},
    :return:
        response Example:
        {
          "choices": [
            {
              "finish_reason": "stop",
              "index": 0,
              "message": {
                "content": "The Los Angeles Dodgers won the World Series in 2020.",
                "role": "assistant"
              }
            }
          ],
          "created": 1687922578,
          "id": "chatcmpl-7WG3eLnLxNxpPPfqXv9v9g6GLJYVO",
          "model": "gpt-3.5-turbo-0613",
          "object": "chat.completion",
          "usage": {
            "completion_tokens": 13,
            "prompt_tokens": 27,
            "total_tokens": 40
          }
        }
    """
    openai.api_key = "openai api key"  # 已付费账号 plus 1账号
    print(content)
    # max_tokens = 10,  token 代表返回字符长度
    if stream:
        response = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=content, n=1, temperature=0.5,
                                                timeout=300, max_tokens=2048, stream=True)
        return response
    else:
        response = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=content,  n=1, temperature=0.5,
                                                timeout=300, max_tokens=2048)
        generated_text = response.choices[0].message.content
        # 打印生成的文本
        print("*" * 100)
        print("%s " % generated_text)
        print("*" * 100)
        return response.choices[0].message

  在接口处调用

@chatgpt.route('/streamGPT3', methods=['POST'])
def stream3():
    """
       chatgpt 系统流式调试聊天的接口
         与上面类似,是流传输
       """
    json_data = request.json
    data = json_data.get("data")
    # 检查json
    if not all([data]):
        # 参数不全
        return JsonResponse.msg_fail("参数错误")
    # 拼接系统json
    try:
        back = send_msg(data, stream=True)

        def generate():
            """
            标准的 SSE 流响应结构:
            event: xxxx
            data: xxxx

            包含 event和data两部分,其中 event 可以为空,由于 openai gpt 接口的 event 为空,有些程序并没兼容含 event 的情况,会出错。
            标准的 SSE 响应解析流程:
            1. 先使用 \n\n 进行分割
            2. 分割的每个元素使用 \n 进行分割,从而得到 event 和 data
            3. 对 data 进行序列化解析
            :return:
            """

            for chunk in back:
                """
                {
                  "choices": [
                    {
                      "delta": {
                        "content": "2"
                      },
                      "finish_reason": null,
                      "index": 0
                    }
                  ],
                  "created": 1677825464,
                  "id": "chatcmpl-6ptKyqKOGXZT6iQnqiXAH8adNLUzD",
                  "model": "gpt-3.5-turbo-0301",
                  "object": "chat.completion.chunk"
                }
                由于不想暴露太多信息,所以只取了chunk['choices'][0]
                """
                str_trunk = 'data: ' + json.dumps(chunk['choices'][0]) + '\n\n'
                yield "event: answer\n"
                yield str_trunk


        headers = {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',
        }
        return Response(generate(), mimetype='text/event-stream;charset:UTF-8', headers=headers)
    except Exception as e:
        print(data)
        print(e)
        return JsonResponse.msg_fail("openai错误")

   在postman调试结果:

  2. 前端接收

  目前对SSE接收在前端只找到两个方法

  使用 EventSource 对象:

const endpoint = 'http://example.com/sse';

const eventSource = new EventSource(endpoint);

eventSource.addEventListener('open', event => {
  console.log('Connected to SSE server');
});

eventSource.addEventListener('message', event => {
  console.log(event.data);
});

eventSource.addEventListener('error', error => {
  console.error(error);
});

setTimeout(() => {
  eventSource.close();
  console.log('Connection closed');
}, 60000);

  EventSource 在这里不支持post请求,所以在这里不符合业务需求

  这里用的是第二中,fetch

const fetch_test = async () =>{
            const response = await fetch('/api/chatgpt/streamGPT3', {
                    method: "POST",
                    cache: "no-cache",
                    keepalive: true,
                    headers: {
                        "Content-Type": "application/json",
                        "Accept": "text/event-stream",
                    },
                    body: JSON.stringify(data),
                });
                
            const reader = response.body.getReader();
            let buffer = ''while (true) {
                const {value, done} = await reader.read();
                if (done) break;
                buffer = new TextDecoder().decode(value)
                const lines = buffer.split('\n');
                let new_ob = {}
                lines.forEach(line => {
                    // 解析 SSE 数据
                    const fields = line.split(':');
                    if (fields.length > 0){
                        let key = fields[0]
                        fields.shift()
                        new_ob[key] = fields.join(':')
                        // console.log(1, JSON.stringify(new_ob))
                    }
                });
                
                
                console.log(2, new_ob)
                new_ob['data'] = JSON.parse(new_ob['data'])
                 // 处理业务逻辑
                
            }
        }
        fetch_test()

   在new_ob就是每次解析出来的数据,在后面可以接上自己业务代码,这里有点麻烦的就是需要自己解析数据。

   如果不想解析数据,可以安装封装好的

   先安装好

npm i @microsoft/fetch-event-source

   调用代码

import { fetchEventSource } from '@microsoft/fetch-event-source'
let url = "你的地址"
let eventSource = fetchEventSource(url, {
            method: 'POST',
            headers: {
              "Content-Type": 'application/json',
            },
            body: JSON.stringify(data),
            onmessage(event) {
                // event
                // {
                //     data: ""
                //     event: ""
                //     id: ""
                //     retry: undefined
                // }
                // console.log(event)
                let data = JSON.parse(event.data)if (data['delta']['content']){
                 // 填写业务代码
                }
                if (data['finish_reason'] == 'stop'){
                    // sse 完成,
               
                }
            },
            onerror() {
                // 发生错误
            }
        })

 

   

 三、问题

  1.在使用python 使用 json.dumps(chunk)时,会出现前端无法解析的情况

    使用flask.json,就可以了