Chatgpt基本调用(revgpt+openai+流式输出)

发布时间 2023-06-28 02:10:47作者: dd随风

创建chatbot类调用

  • 定义ChatBot类

这个主要是通过open ai 来自定义类,自己有账号即可
1.登录openai官网
2.右上角View Api keys
3.然后点击**Create new secret key **即可生成新的API Key

async def async_task_wrapper(name, func, args):  # 创建异步任务
    return name, await func(*args)

def get_or_create_eventloop():  # 创建or拿事件池
    '''making flask concurrent mechanism happy'''
    try:
        return asyncio.get_running_loop()
    except RuntimeError as ex:
        if "no running event loop" in str(ex):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return asyncio.get_event_loop()
        else:
            raise ex

class ChatBot():
    def __init__(self, temperature, role="user", system_msg=None, engine="gpt-3.5-turbo") -> None:
        self.temperature = temperature
        self.role = role
        self.other_role = "user" if role == "assistant" else "assistant"
        self.engine = engine
        if system_msg:
            self.messages = [
                {"role": "system", "content": system_msg},
            ]
        else:
            self.messages = [
                {"role": "system",
                 "content": "You are ChatGPT, a large language model trained by OpenAI. Answer as concisely as possible."},
            ]

    def get_api_key(self):
        # api_key为字符串
        return api_key

    def ask(self, prompt, return_full=False):
        self.messages += [{"role": self.role, "content": prompt}]
        response = openai.ChatCompletion.create(
            model=self.engine,
            temperature=self.temperature,
            messages=self.messages,
        )
        result_txt = response["choices"][0]["message"]["content"]
        self.messages += [{"role": self.other_role, "content": result_txt}]
        # usage = response["usage"]["total_tokens"] / 1000 * 0.002
        return result_txt


    async def ask_async(self, prompt, return_full=False):
        self.messages += [{"role": self.role, "content": prompt}]
        response = await openai.ChatCompletion.acreate(
            model="gpt-3.5-turbo",
            temperature=self.temperature,
            messages=self.messages
        )
        result_txt = response["choices"][0]["message"]["content"]
        self.messages += [{"role": self.other_role, "content": result_txt}]
        usage = response["usage"]["total_tokens"] / 1000 * 0.002
        if return_full:
            return response
        else:
            return result_txt, usage

def stream(prompt, temperature=0.0, engine="gpt-3.5-turbo"):
    # 流式输出函数
    completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=[
        {"role": "user", "content": prompt}], stream=True, temperature=0)
    for line in completion:
        if 'content' in line['choices'][0]['delta']:
            yield line['choices'][0]['delta']['content']



ask 为正常的回复函数,async为异步函数,需要配合事件池来做异步任务.

产生一个chatbot对象,调用类中的get_api_key方法获取key ,然后就可以调用ask方法传prompt获取回答了

# django框架:

# 流式输出代码 
# 需要在stream函数中指定stream=True,并且需要使用StreamingHttpResponse来进行流式处理
chat_bot_obj = ChatBot(temperature=0)
openai.api_key = chat_bot_obj.get_api_key()
response = StreamingHttpResponse(stream(prompt=request.data.get('content'))
return response

配置openai.proxy='ip:prot' ,正常在国内需要配这个代理来访问chatgpt

RevserseGPT

GitHub - acheong08/ChatGPT: Reverse engineered ChatGPT API

  • 流式输出
from revChatGPT.V1 import Chatbot
def get_response(prompt,access_token):
    chatbot = Chatbot(config={
        "access_token": access_token,
        "collect_analytics": True,
    })
    result = chatbot.ask(prompt)
    return result
  • 获取access_token

这个就更方便一些

from revChatGPT.V1 improt Chatbot
 chatbot= Chatbot(config={
        "email": "xxx",
        "password": "xxx",
    })
chatbot.config.print()
## 直接就能获取账号的access_token

django chnnels websocket流式输出chatgpt

from revChatGPT.V1 import Chatbot
async def get_response(prompt):
    res = requests.get('http://192.168.12.52:8982/api/v1/get_key')
    access_token = res.json()['access_token']
    chatbot = Chatbot(config={
        "access_token": access_token,
        "collect_analytics": True,
        "proxy": "socks5h://xxxxx"
    })
    try:
        result = chatbot.ask(prompt)
    except Exception as e:
        raise APIException('xxx')
    yield result
class Chatting(AsyncWebsocketConsumer):

    async def connect(self):
        print('连接成功')

        await self.accept()

    async def disconnect(self, close_code):
        print('断开连接')

    async def receive(self, text_data):
        print('连接成功')
        response = get_response(prompt=prompt)
        async for result in response:
            for word in result:
                await self.send(text_data=word['message'])
                await asyncio.sleep(0)
        await self.send(text_data=json.dumps({"msg": '发完了'}))

必须异步的websocket 不然会阻塞其他接口(我不知道为什么..)

Fastapi sse链接 流式输出chatgpt(openai)

from sse_starlette.sse import EventSourceResponse, ServerSentEvent
async def stream(data: PostData = Body(...)):
    async def event_generator():
        result = ask_chatgpt(prompt='你好')
        for content in result:
            event = ServerSentEvent(data=content)
            yield event
    return EventSourceResponse(event_generator())

RevGPT 踩坑

使用revgpt需要注意版本5.0.0:
我一开始一直用的就是pip install revGPT 没有太注意版本带来的问题
直到有一天我有获取access_token的需求,结果一看报错了!
会报
image
这两种
需要更改revgpt site-package的V1源码
ctrl+f找 login

把之前的改成

if not self.config.get("email") and not self.config.get("password"):

即可!

chatgpt输出需要prompt的控制,不然不稳定~

websocket流式 (适用django channels/ flask/ fastapi)
sse 流式 (fastapi)
django的 StreamingHttpResponse 流式 >> 这个也简单
他们各自的效果其实大差不差,只不过fastapi相比这两个更方便更简单。