python websocket

发布时间 2023-09-22 00:30:00作者: 爱编程_喵

websocket

简介

1. websocket特点
	全双工实时通信
	连接建立一次 可携带有状态信息 
    数据传输相比http更高效
    减少重复请求和响应的开销
	
2. 相关依赖
2.1 flask-socketio 
2.1.1 可选异步服务启动方案: eventlet(推荐)|gevent|Flask自带开发服务器(Werkzeug, 即threading模式)
2.1.2 上面三种方案 会根据安装对应的依赖自动选择
2.1.3 github源码:https://github.com/miguelgrinberg/Flask-SocketIO/blob/main/example
2.1.4 参考文档:https://flask-socketio.readthedocs.io/en/latest/

2.2 tornado-websocket
2.2.1 参考文档: https://www.tornadoweb.org/en/stable/

2.3 websockets
2.3.1 参考文档:https://pypi.org/project/websockets/
2.3.2 github源码: https://github.com/python-websockets/websockets

2.4 python-socketio
2.4.1 参考文档:https://python-socketio.readthedocs.io/en/latest/index.html


实例

python-socketio

依赖

pip install python-socketio
pip install websocket-client

客户端

import socketio

# Socket.IO client with both Socket.IO and Engine.IO logging switched on.
sio = socketio.Client(logger=True, engineio_logger=True)
# Namespace used by my_background_task when emitting.
namespace = "/task"


def my_background_task(args=0):
    """Emit a ``my_task`` event carrying ``args`` on ``namespace``.

    :param args: payload sent with the event (defaults to ``0``).
    """
    # Background task
    sio.emit("my_task", args, namespace=namespace)


# 方式1
# @sio.on("connect", namespace=namespace)
# def on_connect():
#     print("connected...")
#
#
# @sio.event(namespace=namespace)
# def connect_error(e):
#     print(f"connectError:{e}")
#
#
# @sio.event(namespace=namespace)
# def disconnect():
#     print("disconnected...")
#
#
# @sio.on("task_data", namespace=namespace)
# def my_return(data):
#     print("recv", data)


# 方式2
class MyCustomNamespace(socketio.ClientNamespace):
    """Class-based client handlers; each ``on_<event>`` method handles ``<event>``."""

    def on_connect(self):
        """Log that the namespace connection was established."""
        print("taskClient-connected...")

    def on_disconnect(self):
        """Log that the namespace connection was closed."""
        print("taskClient-disconnected...")

    def on_task_data(self, data):
        """Print the payload of a ``task_data`` event.

        :param data: payload delivered with the event.
        """
        print(f"taskClient-receive:{data}")


# Register the handlers on the same ``namespace`` constant that
# my_background_task emits on, so the two can never drift apart.
sio.register_namespace(MyCustomNamespace(namespace))



if __name__ == '__main__':
    # ``transports`` is documented as a list of allowed transports; listing
    # only "websocket" skips the initial long-polling phase.
    sio.connect("http://localhost:5000", transports=["websocket"])
    sio.start_background_task(my_background_task)
    # Block until the connection ends.
    sio.wait()
    # sio.sleep(10)
    # sio.disconnect()

服务端

import socketio
from flask import Flask

# Socket.IO server running in "threading" async mode, with Socket.IO and
# Engine.IO logging enabled.
# NOTE(review): cors_allowed_origins="*" accepts every origin - fine for a
# local demo, confirm before exposing publicly.
sio = socketio.Server(logger=True,
                      engineio_logger=True,
                      cors_allowed_origins="*",
                      async_mode="threading",
                      )

app = Flask(__name__)
# Wrap Flask's WSGI app so Socket.IO traffic is routed to ``sio`` and
# everything else falls through to Flask.
app.wsgi_app = socketio.WSGIApp(sio, app.wsgi_app)

# Namespace all handlers below are registered on.
namespace = "/task"


@sio.event(namespace=namespace)
def connect(sid, environ):
    """Log a new client connection on ``namespace``.

    :param sid: session id of the connecting client.
    :param environ: request environment passed by the server (unused here).
    """
    print(f"{sid} connected...")


@sio.event(namespace=namespace)
def disconnect(sid):
    """Log that the client identified by ``sid`` has disconnected."""
    print(f"{sid} disconnected...")


@sio.event(namespace=namespace)
def my_task(sid, data):
    """Handle a ``my_task`` event and answer with a ``task_data`` event.

    :param sid: session id of the sender.
    :param data: payload received with the event.
    """
    print(f"{sid} recv:{data}")
    # No recipient is given, so the reply is emitted on the whole namespace.
    reply = {"sid": sid, "data": data}
    sio.emit("task_data", reply, namespace=namespace)


if __name__ == '__main__':
    # Serve on every interface, port 5000, with Flask debug mode on.
    # NOTE(review): debug=True together with host 0.0.0.0 is for local
    # experiments only.
    app.run(host="0.0.0.0", port=5000, debug=True)

image


flask-socketio

依赖

pip install flask-socketio eventlet

客户端

// jquery.min.js
https://cdnjs.cloudflare.com/ajax/libs/jquery/3.5.1/jquery.min.js
// socket.io.js
https://cdnjs.cloudflare.com/ajax/libs/socket.io/3.0.4/socket.io.js
<!DOCTYPE HTML>
<html>
<head>
    <title>Flask-SocketIO</title>
    <script src="js/jquery.min.js" ></script>
    <script src="js/socket.io.js" ></script>

    <script type="text/javascript" charset="utf-8">
        $(document).ready(function() {
            // Connect to the Socket.IO server.
            // The connection URL has the following format, relative to the current page:
            //     http[s]://<domain>:<port>[/<namespace>]
            var socket = io();

            // Event handler for new connections.
            // The callback function is invoked when a connection with the
            // server is established.
            socket.on('connect', function() {
                socket.emit('my_event', {data: 'I\'m connected!'});
            });

            // Event handler for server sent data.
            // The callback function is invoked whenever the server emits data
            // to the client. The data is then displayed in the "Received"
            // section of the page.
            socket.on('my_response', function(msg, cb) {
                $('#log').append('<br>' + $('<div/>').text('Received #' + msg.count + ': ' + msg.data).html());
                if (cb)
                    cb();
            });

            // Interval function that tests message latency by sending a "ping"
            // message. The server then responds with a "pong" message and the
            // round trip time is measured.
            var ping_pong_times = [];
            var start_time;
            window.setInterval(function() {
                start_time = (new Date).getTime();
                $('#transport').text(socket.io.engine.transport.name);
                socket.emit('my_ping');
            }, 1000);

            // Handler for the "pong" message. When the pong is received, the
            // time from the ping is stored, and the average of the last 30
            // samples is averaged and displayed.
            socket.on('my_pong', function() {
                var latency = (new Date).getTime() - start_time;
                ping_pong_times.push(latency);
                ping_pong_times = ping_pong_times.slice(-30); // keep last 30 samples
                var sum = 0;
                for (var i = 0; i < ping_pong_times.length; i++)
                    sum += ping_pong_times[i];
                $('#ping-pong').text(Math.round(10 * sum / ping_pong_times.length) / 10);
            });

            // Handlers for the different forms in the page.
            // These accept data from the user and send it to the server in a
            // variety of ways
            $('form#emit').submit(function(event) {
                socket.emit('my_event', {data: $('#emit_data').val()});
                return false;
            });
            $('form#broadcast').submit(function(event) {
                socket.emit('my_broadcast_event', {data: $('#broadcast_data').val()});
                return false;
            });
            $('form#join').submit(function(event) {
                socket.emit('join', {room: $('#join_room').val()});
                return false;
            });
            $('form#leave').submit(function(event) {
                socket.emit('leave', {room: $('#leave_room').val()});
                return false;
            });
            $('form#send_room').submit(function(event) {
                socket.emit('my_room_event', {room: $('#room_name').val(), data: $('#room_data').val()});
                return false;
            });
            $('form#close').submit(function(event) {
                socket.emit('close_room', {room: $('#close_room').val()});
                return false;
            });
            $('form#disconnect').submit(function(event) {
                socket.emit('disconnect_request');
                return false;
            });
        });
    </script>
</head>
<body>
    <h1>Flask-SocketIO</h1>
    <p>
      Async mode is: <b>{{ async_mode }}</b><br>
      Current transport is: <b><span id="transport"></span></b><br>
      Average ping/pong latency: <b><span id="ping-pong"></span>ms</b>
    </p>
    <h2>Send:</h2>
    <form id="emit" method="POST" action='#'>
        <input type="text" name="emit_data" id="emit_data" placeholder="Message">
        <input type="submit" value="Echo">
    </form>
    <form id="broadcast" method="POST" action='#'>
        <input type="text" name="broadcast_data" id="broadcast_data" placeholder="Message">
        <input type="submit" value="Broadcast">
    </form>
    <form id="join" method="POST" action='#'>
        <input type="text" name="join_room" id="join_room" placeholder="Room Name">
        <input type="submit" value="Join Room">
    </form>
    <form id="leave" method="POST" action='#'>
        <input type="text" name="leave_room" id="leave_room" placeholder="Room Name">
        <input type="submit" value="Leave Room">
    </form>
    <form id="send_room" method="POST" action='#'>
        <input type="text" name="room_name" id="room_name" placeholder="Room Name">
        <input type="text" name="room_data" id="room_data" placeholder="Message">
        <input type="submit" value="Send to Room">
    </form>
    <form id="close" method="POST" action="#">
        <input type="text" name="close_room" id="close_room" placeholder="Room Name">
        <input type="submit" value="Close Room">
    </form>
    <form id="disconnect" method="POST" action="#">
        <input type="submit" value="Disconnect">
    </form>
    <h2>Receive:</h2>
    <div id="log"></div>
</body>
</html>

服务端

"""
|----font
|----font/js
|----font/html
|----run.py
"""

from threading import Lock
from flask import Flask, render_template, session, request, jsonify, g
from flask_socketio import SocketIO, Namespace, emit, join_room, leave_room, \
    close_room, rooms, disconnect, send

# Async mode (eventlet | gevent_uwsgi | gevent | threading).
# When None, the modes are tried in the order above, provided the matching
# dependency is installed.
async_mode = None

# Static files are served from font/js and templates are loaded from font/html.
app = Flask(__name__, static_folder="font/js", template_folder="font/html")
# NOTE(review): hard-coded demo secret - replace for any real deployment.
app.config['SECRET_KEY'] = 'secret!'
# Flask-SocketIO server bound to ``app``; Socket.IO and Engine.IO logging on.
# NOTE(review): ping_interval / ping_timeout are presumably seconds and
# max_http_buffer_size / compression_threshold bytes - confirm in the
# Flask-SocketIO docs. cors_allowed_origins="*" accepts every origin.
mSocketIo = SocketIO(app=app,
                     async_mode=async_mode,
                     ping_interval=25,
                     ping_timeout=5,
                     max_http_buffer_size=1000000,
                     allow_upgrades=True,
                     http_compression=True,
                     compression_threshold=1024,
                     cors_allowed_origins="*",
                     logger=True,
                     engineio_logger=True
                    )
# Handle of the background task; MyNamespace.on_connect starts it once,
# guarded by ``thread_lock``.
thread = None
thread_lock = Lock()


def background_thread():
    """Push a server-generated ``my_response`` event every 10 seconds.

    Runs forever; it is meant to be started with
    ``mSocketIo.start_background_task``. ``count`` is the number of events
    sent so far and is included in every payload.
    """
    count = 0
    while True:
        # Use the server's sleep so the selected async mode can yield.
        mSocketIo.sleep(10)
        count += 1
        # Emit on the default namespace: the handlers in this file are
        # registered on '/' and the page connects with ``io()``. The former
        # '/test' namespace had no clients, so the event was never delivered.
        mSocketIo.emit('my_response',
                       {'data': 'Server generated event', 'count': count},
                       namespace='/')


@app.route('/')
def index():
    """Render the demo page (official example).

    :return: ``flaskSocketio.html`` rendered with the server's async mode.
    """
    active_mode = mSocketIo.async_mode
    return render_template('flaskSocketio.html', async_mode=active_mode)


# 方式1-常规
namespace = "/chat"
CLIENT = None
@app.route("/client/<msg>")
def client(msg):
    """Exercise the ``namespace`` handlers with an in-process test client.

    :param msg: text taken from the URL; sent as the payload of each demo
        event and used to build the room name.
    :return: JSON summary of the test client's state.
    """
    c = mSocketIo.test_client(app=app)
    c.connect(namespace=namespace, auth={"key": 1})
    if c.is_connected(namespace=namespace):
        # Sending with an explicit event name is the recommended way.
        for event_name in ("my_event1", "my_event2", "my_event3"):
            c.emit(event_name, msg, namespace=namespace)
        # global CLIENT
        # CLIENT = c
        # Stash the test client on flask.g for the rest of this request.
        g.c = c
        room_name = f"room-{msg}"
        c.emit("join", {"room": room_name, "username": c.eio_sid}, namespace=namespace)
        c.emit("leave", {"room": room_name, "username": c.eio_sid}, namespace=namespace)
        c.disconnect(namespace=namespace)
    summary = {
        "client": c.eio_sid,
        "sendData": msg,
        "connected": c.connected,
        "rooms": str(c.clients),
    }
    return jsonify(summary)


# Catch exceptions raised by handlers of this namespace
@mSocketIo.on_error(namespace=namespace)
def error(e):
    """Print the exception passed to the namespace error handler."""
    print("error=" + str(e))


# Event binding, style 1: the function name is the event name
@mSocketIo.event(namespace=namespace)
def my_event1(message):
    """Print the payload of a ``my_event1`` event."""
    print("rec1>>>", message)


# Event binding, style 2: the event name is given to the decorator
@mSocketIo.on("my_event2", namespace=namespace)
def my_event2_(message):
    """Print the payload of a ``my_event2`` event."""
    print("rec2>>>", message)


# Event binding, style 3: plain function registered explicitly below
def my_event3_(message):
    """Print the payload of a ``my_event3`` event."""
    print("rec3>>>", message)


# Register my_event3_ as the handler of "my_event3" without a decorator.
mSocketIo.on_event("my_event3", my_event3_, namespace=namespace)


@mSocketIo.on("room", namespace=namespace)
def on_room(data):
    """Print the payload of a ``room`` event received from a client."""
    print(f"roomRec:{data}")


@mSocketIo.on("join", namespace=namespace)
def on_join(data):
    """Put the calling client into a room and announce it to that room.

    :param data: mapping with a ``"username"`` and a ``"room"`` key.
    :raises KeyError: if either key is missing.
    """
    username = data["username"]
    room = data["room"]
    join_room(room, namespace=namespace)
    # Broadcast with the server-side ``emit``. The previous ``g.c.emit`` relied
    # on a test client stored on flask.g by the /client route (absent for any
    # real client) and the test client ignores ``to=``, so nothing was
    # actually sent to the room. Everyone in the room, including the client
    # that just joined, now receives the message.
    emit("room", username + " join the room", to=room, namespace=namespace)


@mSocketIo.on("leave", namespace=namespace)
def on_leave(data):
    """Remove the calling client from a room and tell the remaining members.

    :param data: mapping with a ``"username"`` and a ``"room"`` key.
    :raises KeyError: if either key is missing.
    """
    username = data["username"]
    room = data["room"]
    leave_room(room, namespace=namespace)
    # Broadcast with the server-side ``emit``. The previous ``g.c.emit`` relied
    # on a test client stored on flask.g by the /client route (absent for any
    # real client) and the test client ignores ``to=``. The caller has already
    # left, so only the remaining members receive the message.
    emit("room", username + " leave the room", to=room, namespace=namespace)


@mSocketIo.on("connect", namespace=namespace)
def on_connect():
    """Log a new connection on ``namespace``."""
    print("connected...")


@mSocketIo.on("disconnect", namespace=namespace)
def on_disconnect():
    """Log a disconnection on ``namespace``."""
    print("disconnect...")


# Style 2: class-based namespace (official example)
class MyNamespace(Namespace):
    """Class-based handlers: a method named ``on_<event>`` handles ``<event>``.

    Most handlers bump a per-session ``receive_count`` and echo it back in a
    ``my_response`` event.
    """

    @staticmethod
    def _bump_receive_count():
        """Increment the session's ``receive_count`` and return the new value."""
        total = session.get('receive_count', 0) + 1
        session['receive_count'] = total
        return total

    def on_my_event(self, message):
        """Echo ``message['data']`` back to the sender.

        :param message: mapping with a ``'data'`` key.
        """
        count = self._bump_receive_count()
        emit('my_response', {'data': message['data'], 'count': count})

    def on_my_broadcast_event(self, message):
        """Send ``message['data']`` with ``broadcast=True``."""
        count = self._bump_receive_count()
        payload = {'data': message['data'], 'count': count}
        emit('my_response', payload, broadcast=True)

    def on_join(self, message):
        """Join ``message['room']`` and report the caller's current rooms."""
        join_room(message['room'])
        count = self._bump_receive_count()
        room_list = ', '.join(rooms())
        emit('my_response', {'data': 'In rooms: ' + room_list, 'count': count})

    def on_leave(self, message):
        """Leave ``message['room']`` and report the caller's current rooms."""
        leave_room(message['room'])
        count = self._bump_receive_count()
        room_list = ', '.join(rooms())
        emit('my_response', {'data': 'In rooms: ' + room_list, 'count': count})

    def on_close_room(self, message):
        """Notify ``message['room']`` that it is closing, then close it."""
        count = self._bump_receive_count()
        notice = {'data': 'Room ' + message['room'] + ' is closing.',
                  'count': count}
        emit('my_response', notice, room=message['room'])
        close_room(message['room'])

    def on_my_room_event(self, message):
        """Send ``message['data']`` to ``message['room']``."""
        count = self._bump_receive_count()
        payload = {'data': message['data'], 'count': count}
        emit('my_response', payload, room=message['room'])

    def on_disconnect_request(self):
        """Acknowledge the request, then disconnect the caller."""
        count = self._bump_receive_count()
        emit('my_response', {'data': 'Disconnected!', 'count': count})
        disconnect()

    def on_my_ping(self):
        """Answer a ``my_ping`` with ``my_pong``."""
        emit('my_pong')

    def on_connect(self):
        """Start the background task on first connect and greet the client."""
        global thread
        with thread_lock:
            # Only the first connection starts the background task.
            if thread is None:
                thread = mSocketIo.start_background_task(target=background_thread)
        emit('my_response', {'data': 'Connected', 'count': 0})

    def on_disconnect(self):
        """Log the session id of the client that disconnected."""
        print('Client disconnected', request.sid)


# Attach the class-based handlers to the default namespace.
mSocketIo.on_namespace(MyNamespace('/'))


if __name__ == '__main__':
    # Start through Flask-SocketIO's runner (not app.run), with the reloader
    # explicitly turned off.
    mSocketIo.run(app, host="0.0.0.0", port=5000, debug=True, use_reloader=False)

image


tornado-websocket

依赖

pip install tornado

客户端

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Tornado-Websocket</title>
</head>
<body>
    <h2 style="text-align: center">WebSocket</h2>
    <div id="contents" style="height:500px;overflow:auto;border: 1px grey dashed;"></div>
    <div>
        <input id="msg" value="">
        <button onclick="sendMsg()">发送</button>
        <button onclick="connected()">重新建立连接</button>
        <button onclick="closed()">关闭当前连接</button>
    </div>
    <script src="{{ static_url('jquery.min.js') }}"></script>
    <script type="text/javascript">
        var WS_URL = 'ws://127.0.0.1:5000/ws';
        var ws = null;

        // Open a socket and attach every handler to it. Used for the initial
        // connection and for the "reconnect" button, so a replacement socket
        // is never left without onopen/onclose/onmessage callbacks.
        function connected() {
            // Close the previous socket first so the page never keeps two
            // live connections.
            if (ws && ws.readyState !== WebSocket.CLOSED) {
                ws.close();
            }
            var socket = new WebSocket(WS_URL);
            // Fired when the connection is established
            socket.onopen = function(e){console.log('connected:' + socket.readyState)};
            // Fired when the connection is closed
            socket.onclose = function(e){console.log('closed:'+ socket.readyState)};
            // Fired when the server sends data
            socket.onmessage = function(e) {
                // .text() escapes the payload: messages are relayed from
                // other users and must not be parsed as HTML.
                $("#contents").append($("<p>").text(e.data));
            };
            ws = socket;
        }

        function sendMsg() {
            // Keep the typed text if there is no open connection to send on.
            if (!ws || ws.readyState !== WebSocket.OPEN) {
                console.log('not connected, message not sent');
                return;
            }
            var msg = $("#msg").val();
            ws.send(msg);
            $("#msg").val("");
        }
        function closed() {
            if (ws) {
                ws.close();
            }
        }

        connected();
    </script>
</body>
</html>

服务端

# -*- coding:utf-8 -*-
"""
|----font
|----font/js
|----font/html
|----run.py
"""
import asyncio
import datetime
import json
import tornado.ioloop
import tornado.web
import tornado.websocket


class IndexHandler(tornado.web.RequestHandler):
    """Serve the chat page."""

    def get(self):
        """Render ``tornadoSocketio.html`` for GET requests."""
        self.render("tornadoSocketio.html")


class MyWebSocketHandler(tornado.websocket.WebSocketHandler):
    """Chat handler that relays every event to all connected sockets."""

    # Connected handlers. A class attribute, so one set is shared by every
    # connection.
    users = set()
    # Message queue
    # messages_queue = []

    def _broadcast(self, event):
        """Send a timestamped line ending in ``event`` to every connected user.

        Iterates over a snapshot so the shared set can be pruned while
        looping. A peer whose socket is already closed is dropped instead of
        aborting delivery to the remaining users.

        :param event: text appended after the ``user[...]-`` prefix.
        """
        for u in tuple(self.users):
            try:
                u.write_message(
                    f"[{datetime.datetime.now()}]-[{self.request.remote_ip}]-user[{u}]-{event}")
            except tornado.websocket.WebSocketClosedError:
                self.users.discard(u)

    def open(self):
        """Called once the WebSocket connection is established.

        :return:
        """
        self.users.add(self)
        self._broadcast("connected...")

    def on_message(self, message):
        """Called when the client sends a message; relays it to everyone.

        :param message: payload received from the client.
        :return:
        """
        self._broadcast(f"send:{message}")

    def on_close(self):
        """Called after the WebSocket connection is closed.

        :return:
        """
        # discard(): the handler may already have been pruned by _broadcast.
        self.users.discard(self)
        self._broadcast("disconnect...")

    def check_origin(self, origin: str) -> bool:
        """Accept connections from any origin.

        NOTE(review): this turns off the cross-origin check; restrict it
        before exposing the server outside a local demo.
        """
        return True


async def run():
    """Build the Tornado application, listen on port 5000 and never return."""
    routes = [
        (r"/", IndexHandler),
        (r"/ws", MyWebSocketHandler),
    ]
    application = tornado.web.Application(
        routes,
        template_path='font/html',
        static_path='font/js',
        debug=True,
    )
    application.listen(5000)
    # This Event is never set, so awaiting it keeps the coroutine alive.
    never_set = asyncio.Event()
    await never_set.wait()


if __name__ == "__main__":
    # Run the Tornado server coroutine on a fresh asyncio event loop.
    asyncio.run(run())

image


websockets

依赖

pip install websockets

客户端

import asyncio
import websockets


async def client():
    """Connect, send one greeting, print the single reply, then disconnect."""
    name = "websockets"
    uri = "ws://localhost:5000"
    # The context manager closes the connection when the block exits.
    async with websockets.connect(uri) as ws:
        await ws.send(name)
        print(f"Client-send:{name}")
        reply = await ws.recv()
        print(f"Client-recv:{reply}")


if __name__ == '__main__':
    # Run the client coroutine to completion.
    asyncio.run(client())

服务端

import asyncio

from websockets.server import serve


async def handler(websocket):
    """Read one message from the connection and answer with a greeting.

    :param websocket: connection object providing awaitable ``recv``/``send``.
    """
    # Receive a message
    incoming = await websocket.recv()
    print(f"Server-recv:{incoming}")
    greeting = f"hello {incoming}"
    # Send the reply
    await websocket.send(greeting)
    print(f"Server-send:{greeting}")


async def main():
    """Serve ``handler`` on port 5000 until the process is stopped."""
    server = serve(handler, "", 5000)
    async with server:
        # A Future that is never resolved keeps the server running.
        await asyncio.Future()


if __name__ == '__main__':
    # Run the server coroutine on a fresh asyncio event loop.
    asyncio.run(main())

image