HTTP事件流 text/event-stream

发布时间 2024-01-06 14:46:13作者: 梅丹隆

GitHub All-in-one OpenAI Demo

一、依赖

<dependency>
  <groupId>org.asynchttpclient</groupId>
  <artifactId>async-http-client</artifactId>
  <version>2.12.3</version>
</dependency>

二、事件流处理器

// Streams an SSE (text/event-stream) HTTP response reactively.
// Status/headers are handled here; the body parts are handed to a
// ChatGptSubscriber through the reactive-streams Publisher in onStream().
@Slf4j
public class ChatGptStreamedHandler implements StreamedAsyncHandler<Object> {

    /**
     * Invoked once the response body starts streaming; subscribes our
     * token parser to the publisher of raw body parts.
     *
     * @param publisher reactive source of HTTP body chunks
     * @return CONTINUE to keep consuming the stream
     */
    @Override
    public State onStream(Publisher<HttpResponseBodyPart> publisher) {
        publisher.subscribe(new ChatGptSubscriber());
        log.info("》》》》》 onStream");
        return State.CONTINUE;
    }

    /** Aborts the exchange unless the server answered 200 OK. */
    @Override
    public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
        log.info("》》》》》onStatusReceived");
        return responseStatus.getStatusCode() == 200 ? State.CONTINUE : State.ABORT;
    }

    /** Header processing — nothing to inspect here, just continue. */
    @Override
    public State onHeadersReceived(HttpHeaders headers) throws Exception {
        log.info("》》》》》 头信息处理");
        return State.CONTINUE;
    }

    /**
     * Body parts are also delivered here, but actual parsing happens in the
     * subscriber wired up by onStream(); this callback only traces progress.
     */
    @Override
    public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
        log.info("》》》》》 onBodyPartReceived");
        return State.CONTINUE;
    }

    /** Transport-level failure — log with the full stack trace. */
    @Override
    public void onThrowable(Throwable t) {
        log.error("发生异常", t);
    }

    /** Stream finished; ABORT signals no further processing is wanted. */
    @Override
    public Object onCompleted() throws Exception {
        log.info("》》》》》 完成");
        return State.ABORT;
    }
}

三、事件订阅者

{
  "bodyByteBuffer": {
    "array": "eyJlcnJvcl9jb2RlIjoxMDAsImVycm9yX21zZyI6IkludmFsaWQgcGFyYW1ldGVyIn0=",
    "limit": 50,
    "position": 0
  },
  "bodyPartBytes": "eyJlcnJvcl9jb2RlIjoxMDAsImVycm9yX21zZyI6IkludmFsaWQgcGFyYW1ldGVyIn0=",
  "last": false
}
{
  "id": "chatcmpl-7WHp5USDDqtzkqXXXXXXXXXXXXX",
  "object": "chat.completion.chunk",
  "created": 1687929363,
  "model": "gpt-3.5-turbo-0613",
  "choices": [
    {
      "index": 0,
      "delta": {
        "content": "你"
      },
      "finish_reason": null
    }
  ]
}
@Slf4j
public class ChatGptSubscriber implements Subscriber {

    private final static String DATA_SEPARATOR = "data: ";
    private final static String FINISH_MARK = "[DONE]";

    @Override
    public void onSubscribe(Subscription s) {
        log.info(">>>>> onSubscribe  {}", JSON.toJSONString(s));
        s.request(Long.MAX_VALUE);
    }

    // 暂存字段
    private String tempStorage = Strings.EMPTY;

    @Override
    public void onNext(Object o) {
        String bodyDatas = new String(JSON.parseObject(JSON.toJSONString(o)).getBytes("bodyPartBytes"));
        log.info(">>>>> onNext  {}", bodyDatas);
        boolean firstLine = false;
        StringBuilder words = new StringBuilder();
        if(bodyDatas.startsWith(DATA_SEPARATOR)){
            firstLine = true;
            // 结尾刚好为完整JSON
            if(StringUtil.isNotEmpty(tempStorage)){
                // 通用处理
                words.append(getWord(tempStorage));
            }
        }
        String[] singleWords = bodyDatas.split(DATA_SEPARATOR);

        for(int i=0; i<singleWords.length; i++){
            String singleWordJsonStr = singleWords[i];
            // 校验
            if(StringUtil.isEmpty(singleWordJsonStr)){
                continue;
            }
            if(FINISH_MARK.equals(singleWordJsonStr)){
                break;
            }
            // 结尾处理
            if(i == singleWords.length -1){
                tempStorage = singleWordJsonStr;
                break;
            }
            // 开头处理
            if(!firstLine && i==0 && StringUtil.isNotEmpty(tempStorage)){
                singleWordJsonStr = tempStorage + singleWordJsonStr;
            }
            // 通用处理
            String word = getWord(singleWordJsonStr);
            if(StringUtil.isNotEmpty(word)){
                words.append(word);
            }
        }

        log.info(">>>>> ai says: {}", words);
    }

    private String getWord(String singleWordJsonStr) {
        ChatCompletionResponseDTO chatCompletionResponseDTO = OpenAiUtil.checkAndGetResponse(singleWordJsonStr, ChatCompletionResponseDTO.class);
        return chatCompletionResponseDTO.getChoices().get(0).getDelta().getContent();
    }

    @Override
    public void onError(Throwable t) {
        //这里就是监听到的响应
        log.info(">>>>> onError  {}", JSON.toJSONString(t));
    }

    @Override
    public void onComplete() {
        log.info(">>>>> onComplete ");
    }
}

四、请求调用

/**
 * Demo entry point: posts a streaming (SSE) chat-completion request to the
 * OpenAI API and lets ChatGptStreamedHandler consume the event stream.
 */
public static void main(String[] args) throws Exception {
    // API endpoint
    String apiUrl = "https://api.openai.com/v1/chat/completions";
    String apiKey = "";
    List<JSONObject> msgData = new ArrayList<>();
    msgData.add(
            new JSONObject()
                    .fluentPut("role", "user")
                    .fluentPut("content", "200字夸奖我一下")
    );
    JSONObject req = new JSONObject()
            // Maximum number of tokens to generate. Prompt tokens plus
            // max_tokens must not exceed the model's context length
            // (2048 for most models, 4096 for the newest ones).
            .fluentPut("max_tokens", 512)
//                .fluentPut("model", "gpt-3.5-turbo")
            .fluentPut("model", "gpt-4")
            // Optional, default 1. Sampling temperature in [0, 2]: higher
            // values (e.g. 0.8) make output more random, lower values
            // (e.g. 0.2) more focused and deterministic. Change this OR
            // top_p, not both.
            .fluentPut("temperature", 0.6)
            // Ask for a streamed response; FALSE returns one complete
            // (slower) JSON body instead.
            .fluentPut("stream", Boolean.TRUE)
            .fluentPut("messages", msgData);

    log.info("》》 {}", JSON.toJSONString(req));
    // try-with-resources guarantees the client is closed even if the request
    // throws — the original leaked the client on any failure before close().
    try (AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient()) {
        asyncHttpClient.preparePost(apiUrl)
                .addHeader("Content-Type", "application/json")
                .addHeader("Authorization", "Bearer " + apiKey)
                .addHeader("Accept", "text/event-stream")
                .setBody(JSON.toJSONString(req))
                .execute(new ChatGptStreamedHandler())
                .get();
    }
}

五、输出

image.png
通过输出我们可以发现,实际上也是先获取全部结果,再按照流式进行返回,并没有缩短等待时长。


参考:
JAVA 监听 text/event-stream ,事件流 SSE_java text/event-stream_高冷滴互联网农民工的博客-CSDN博客