python websocket server and client 用户认证

发布时间 2023-10-19 14:35:53作者: lshan

 

WebSocketServer.py
pip install websockets expiringdict
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @mail    : lshan523@163.com
# @Time    : 2023/10/18 9:58
# @Author  : Sea
# @File    : WebSocketServer.py
# @history:  pip install websockets expiringdict
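# Commonly used parameters of websockets.serve(). The defaults quoted here are
# those of recent websockets releases; check the docs of the installed version.
# host: host name or address to bind; default None, which binds all interfaces.
# port: TCP port to listen on; there is no built-in default, pass it explicitly.
# path: not a serve() keyword; the URL path of each connection is read from the
#       connection inside the handler (unix_serve() takes a Unix socket path).
# max_size: maximum size of an incoming message in bytes; default 1 MiB,
#       None removes the limit.
# max_queue: maximum number of buffered incoming messages/frames; bounded by
#       default, None removes the limit.
# ping_interval: seconds between keepalive pings; default 20, None disables pings.
# ping_timeout: seconds to wait for the matching pong; default 20, None disables
#       the timeout.
# close_timeout: seconds to wait for the closing handshake; default 10.
# subprotocols: list of supported subprotocols in order of preference; default None.
# select_subprotocol: callback that picks the subprotocol; default None
#       (the built-in selection logic is used).
# compression: default "deflate" (per-message compression enabled); None disables it.
# server: NOTE(review): does not appear to be a documented serve() keyword;
#       verify before relying on it.
# origins: list of acceptable Origin header values; default None (no origin check).
# extensions: list of supported extensions; default None.
# These options are passed as keyword arguments to websockets.serve(), as the
# WebSocketServer class below does with ping_interval and ping_timeout.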
# ****************************
import asyncio
from concurrent.futures import ThreadPoolExecutor

import websockets
from expiringdict import ExpiringDict
import traceback
import threading
class WebSocketServer:
    """Small WebSocket server that registers clients by the id in their URL path.

    Clients are expected to connect to ``/<uid>/<name>/<token>``. The connection
    is stored in ``self.clients`` under ``uid`` and every received message is
    printed to stdout.
    """

    def __init__(self, host, port):
        """Store the bind address and create the client registry.

        Args:
            host: Host name or address handed to ``websockets.serve``.
            port: TCP port handed to ``websockets.serve``.
        """
        self.host = host
        self.port = port
        # uid -> websocket connection. The registry keeps at most 1000 entries
        # and each entry expires one hour after it was stored.
        self.clients = ExpiringDict(max_len=1000, max_age_seconds=60 * 60 * 1)
        self.pools = ThreadPoolExecutor(1)

    async def on_authenticate(self, websocket, path):
        """Parse the connection path and register the connection.

        Args:
            websocket: The connection being authenticated.
            path: Request path, expected to look like ``/uid/name/token``.

        Returns:
            A ``(authenticated, uid, username, token)`` tuple. For a path that
            does not split into exactly four parts the tuple is
            ``(True, str(path), "", "")`` and nothing is registered.
        """
        print(f"Connected to {path}")
        # "/uid/name/token".split("/") -> ['', 'uid', 'name', 'token']
        parts = str(path).split("/")
        if len(parts) == 4:
            _, uid, username, token = parts
            self.clients[str(uid)] = websocket
            print(str(username) + " connect success!")
            return True, uid, username, token
        # NOTE(review): the original comment says a failed authentication should
        # return False, but this fallback has always returned True and the token
        # is never checked. Behavior is kept as-is; tighten it deliberately if
        # unauthenticated connections must be rejected.
        return True, str(path), "", ""

    async def on_message(self, websocket):
        """Print every message received on ``websocket`` until the iteration ends."""
        async for message in websocket:
            print(f"Received message: {message}")

    async def on_close(self, websocket, username=""):
        """Close ``websocket`` and log which user it belonged to."""
        await websocket.close()
        print(username + " websocket  closed ")

    async def send_msg(self, websocket, message):
        """Log ``message`` and send its string form over ``websocket``."""
        print(f"send message: {message}")
        await websocket.send(f"{message}")

    async def handle_websocket(self, websocket, path=None):
        """Connection handler: authenticate, then consume messages.

        Args:
            websocket: The accepted connection.
            path: Request path. Optional because newer ``websockets`` releases
                call the handler with the connection only; in that case the
                path is read from the connection object.
        """
        if path is None:
            # Newer connections expose the path as ``request.path``; older ones
            # expose it directly as ``path``.
            request = getattr(websocket, "request", None)
            path = getattr(request, "path", None) or getattr(websocket, "path", "")
        # Bind these before the try block so the except branch can always log
        # them, even when on_authenticate itself raises.
        uid, username = str(path), ""
        try:
            authenticated, uid, username, _token = await self.on_authenticate(websocket, path)
            if not authenticated:
                await self.on_close(websocket, username)
                return
            await self.on_message(websocket)
        except Exception as e:
            # Boundary handler for one connection: log, close, and keep the
            # server running for the other clients.
            print(str(uid) + "  connect exception : " + str(e))
            await self.on_close(websocket, str(username))
            return

    async def _start_server(self):
        """Create the listening server from inside the running event loop."""
        return await websockets.serve(
            self.handle_websocket,
            self.host,
            self.port,
            ping_interval=30,
            ping_timeout=60,
        )

    def start(self):
        """Run the server on a fresh event loop; blocks the calling thread forever."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Start listening first, then keep the loop alive to serve connections.
        # The returned server object is held so it stays referenced.
        running_server = loop.run_until_complete(self._start_server())
        loop.run_forever()
        return running_server

    def start_daemon(self):
        """Run ``start`` in a background thread.

        Despite the name, the thread is not marked as a daemon thread, so it
        keeps the process alive after the main thread finishes.
        """
        thread = threading.Thread(target=self.start)
        thread.start()




if __name__ == '__main__':
    import time

    # Serve on localhost:8080 from a background thread.
    ws_server = WebSocketServer("localhost", 8080)
    ws_server.start_daemon()

    # Heartbeat on the main thread: print a marker every ten seconds.
    while True:
        print("ok")
        time.sleep(10)

 

WSClient.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time    : 11/23/21 10:06 PM
# @Author  : Sea
# @File    : WSClient.py
# @Software: pip install websocket-client
# ******************************
import json
import time
import websocket
from concurrent.futures import ThreadPoolExecutor
import logging
import threading
class WSClient():
    """Small wrapper around ``websocket.WebSocketApp`` that sends JSON messages.

    Args:
        url: WebSocket endpoint to connect to. When omitted, the class-level
            default ``WSClient.url`` is used.
    """

    # Default endpoint; the path follows the /id/name/token layout.
    url = "ws://127.0.0.1:8080/id/name/token"
    # The WebSocketApp instance; set by connect_websocket().
    wsClient = None
    cookie_get_time = time.time()  # timestamp taken when the class is defined
    pools = ThreadPoolExecutor(1)

    def __init__(self, url=None):
        # Only override the class default when a URL is actually given, so
        # WSClient() keeps connecting to the original endpoint.
        if url is not None:
            self.url = url

    def on_message(self, ws, message):
        """Print each message received from the server."""
        print("message  "+str(message))

    def send_msg(self, datastr, receiver, sender="2"):
        """Send a JSON envelope over the current connection.

        Args:
            datastr: Message payload.
            receiver: Single receiver id; it is wrapped in a one-element list.
            sender: Sender id, ``"2"`` by default.

        Raises:
            AttributeError: If called before ``connect_websocket`` has created
                ``self.wsClient`` (it is still ``None``).
        """
        response = {"message": datastr, "receiver": [receiver], "sender": sender}
        self.wsClient.send(json.dumps(response))

    def on_error(self, ws, error):
        """Print any error reported for the connection."""
        print(error)

    def on_close(self, ws, b, x):
        """Log that the connection closed.

        ``b`` and ``x`` are presumably the close status code and close message
        passed by websocket-client -- TODO confirm; they are not used here.
        """
        print("Connection closed ……")

    def on_open(self, ws):
        """Log the successful connection and send a first test message."""
        print("连接成功!")
        print("ok")
        self.send_msg("x","xx","xx")

    def connect_websocket(self):
        """Create the WebSocketApp for ``self.url`` and run it; blocks until it stops."""
        self.wsClient = websocket.WebSocketApp(self.url,
                                               on_open=self.on_open,
                                               on_message=self.on_message,
                                               on_error=self.on_error,
                                               on_close=self.on_close)
        self.wsClient.keep_running = True
        self.wsClient.run_forever(ping_timeout=60, ping_interval=200)

    def start_daemon(self):
        """Run ``connect_websocket`` in a background thread.

        Despite the name, the thread is not marked as a daemon thread, so it
        keeps the process alive after the main thread finishes.
        """
        thread = threading.Thread(target=self.connect_websocket)
        thread.start()



if __name__ == '__main__':
    # Connect in a background thread; that thread keeps the process running.
    client = WSClient()
    client.start_daemon()