python通过SSE与html主动通讯

发布时间 2023-07-26 14:17:24作者: Dapenson

博客:使用 Python 通过 SSE 与 HTML 实现主动通讯

在现代 Web 应用中,实时性和交互性成为了越来越重要的需求。服务器向客户端主动推送数据,而不是等待客户端发送请求,就是一种实现实时通讯的方式。Server-Sent Events(SSE)正是一种用于实现这种服务器主动推送的技术。本文将介绍如何使用 Python 和 Flask 框架,通过 SSE 与 HTML 页面实现主动通讯,让前端实时接收服务器端的数据并进行展示。

1. 什么是 Server-Sent Events (SSE)

Server-Sent Events(SSE)是 HTML5 规范的一部分,它允许服务器端通过单向的 HTTP 连接,向客户端(通常是浏览器)实时地发送数据。相比传统的轮询或长轮询方式,SSE 更加高效,因为它不需要频繁地建立和关闭连接,而是保持长连接,服务器可以在有新数据时立即发送给客户端。

2. 使用 Python 和 Flask 实现 SSE 服务器

首先,我们需要安装 Flask 框架,它是一个轻量级的 Python Web 框架,方便我们构建 Web 应用。

pip install flask

接下来,我们创建一个 Python 文件 sse_flask_demo.py,并实现 SSE 服务器端代码:

# 导入所需的模块
import json
import time
import datetime
from flask import Flask, request, Response, render_template

app = Flask(__name__)

# 解决跨域问题
@app.after_request
def after_request(response):
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
    response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
    response.headers.add('Access-Control-Allow-Credentials', 'true')
    return response

# 获取当前时间,并转换为 JSON 格式
def get_time_json():
    dt_ms = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    return json.dumps({'time': dt_ms}, ensure_ascii=False)

# 设置路由,返回 SSE 流
@app.route('/')
def hello_world():
    return render_template('sse.html')

@app.route('/sse')
def stream():
    user_id = request.args.get('user_id')  # 可选,用于区分不同用户的连接
    print(user_id)

    def eventStream():
        id = 0
        while True:
            id += 1
            time.sleep(1/50)  # 50Hz,每秒发送约 50 条数据
            event_name = 'time_reading'
            str_out = f'id: {id}\nevent: {event_name}\ndata: {get_time_json()}\n\n'
            print(str_out)  # 在服务器端打印发送的数据
            yield str_out

    return Response(eventStream(), mimetype="text/event-stream")

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5678, debug=True)

在上述代码中,我们创建了一个 Flask 应用并定义了两个路由,/ 路由返回了一个 HTML 页面(稍后会讲解),/sse 路由则返回 SSE 流。在 stream() 函数中,我们使用一个无限循环来模拟不断向客户端发送数据,每次发送都包含一个唯一的 ID 和当前时间的 JSON 字符串。

3. 前端 HTML 页面

为了接收服务器端的 SSE 数据,我们需要在前端创建一个 HTML 页面。在 templates 文件夹下,创建一个名为 sse.html 的文件,并将以下代码复制进去:

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Server-Sent Events</title>
</head>
<body>
    <div id="content">
        <h1>Server-Sent Events</h1>
        <p>time: <span id="time_show"></span></p>
    </div>
</body>
<script>
    function connectSSE() {
        if (window.EventSource) {
            let sse_url = 'http://localhost:5678/sse';
            // 创建 EventSource 对象连接服务器
            const source = new EventSource(sse_url);

            // 连接成功后会触发 open 事件
            source.addEventListener('open', () => {
                console.log('Connected');
            }, false);

            // 当接收到服务器端发送的数据时会触发 time_reading 事件
            source.addEventListener('time_reading', function (e) {
                console.log("time_reading", e.data);
                // 把时间显示到页面上,这里需要解析一下 JSON
                document.getElementById("time_show").innerHTML = JSON.parse(e.data).time;
            }, false);

            // 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件
            source.addEventListener('message', e => {
                console.log(`data: ${e.data}`);
            }, false);

            // 连接异常时会触发 error 事件并自动重连
            source.addEventListener('error', e => {
                if (e.target.readyState === EventSource.CLOSED) {
                    console.log('Disconnected');
                } else if (e.target.readyState === EventSource.CONNECTING) {
                    console.log('Connecting...');
                }
            }, false);
        } else {
            console.error('Your browser doesn\'t support SSE');
        }
    }

    connectSSE();
</script>
</html>

上述 HTML 页面中,我们使用 JavaScript 创建了一个 EventSource 对象,并连接到服务器的 /sse 路由。当接收到服务器端发送的 time_reading 事件时,我们解析 JSON 数据,并将时间显示在页面上。

4. 运行并测试

在完成上述代码编写后,我们可以运行 Python 服务器。在命令行中执行以下命令:

python sse_flask_demo.py

服务器将会运行在 http://localhost:5678 上。

现在,打开浏览器并访问 http://localhost:5678/,你应该会

看到一个简单的页面显示 "Server-Sent Events" 以及时间的实时更新。

5. 结语

通过 Python 和 Flask 框架,我们成功地实现了使用 SSE 技术与 HTML 页面进行实时通讯的功能。这种方式可以在很多场景下派上用场,例如实时聊天、实时数据展示等。注意,在真实的应用中,我们可能会使用数据库或其他数据源提供实时数据,而不是简单地使用时间戳作为示例数据。

总结起来,SSE 技术为现代 Web 应用提供了一种高效、实时的服务器主动推送数据的方式,而 Python 和 Flask 框架的结合,让我们可以轻松实现这种功能,让 Web 应用变得更加动态、交互性更强。

6. 扩展esp32

以下代码提供esp32作为服务器的参考

esp32基于Arduino的代码

/*
 * @Author: Dapenson
 * @Date: 2023-07-26 13:58:08
 * @LastEditors: Dapenson
 * @LastEditTime: 2023-07-26 13:58:19
 * @FilePath: \SSE\esp32_demo.cpp
 */
#include <Arduino.h>
#include <WiFi.h>
#include <AsyncTCP.h>
#include <ESPAsyncWebServer.h>
#include <Arduino_JSON.h>
#include "SPIFFS.h"

const char *ssid = "REPLACE_WITH_YOUR_SSID";
const char *password = "REPLACE_WITH_YOUR_PASSWORD";

// 端口号
AsyncWebServer server(80);
// 事件接口
AsyncEventSource events("/sse");

JSONVar readings;

void initSPIFFS()
{
    if (!SPIFFS.begin())
    {
        Serial.println("An error has occurred while mounting SPIFFS");
    }
    Serial.println("SPIFFS mounted successfully");
}

void initWiFi()
{
    WiFi.mode(WIFI_STA);
    WiFi.begin(ssid, password);
    Serial.println("");
    Serial.print("Connecting to WiFi...");
    while (WiFi.status() != WL_CONNECTED)
    {
        Serial.print(".");
        delay(1000);
    }
    Serial.println("");
    Serial.println(WiFi.localIP());
}

void setup()
{
    Serial.begin(115200);
    initWiFi();
    initSPIFFS();

    server.on("/", HTTP_GET, [](AsyncWebServerRequest *request)
              { request->send(SPIFFS, "/index.html", "text/html"); });

    server.serveStatic("/", SPIFFS, "/");

    events.onConnect([](AsyncEventSourceClient *client)
                     {
    if(client->lastId()){
      Serial.printf("Client reconnected! Last message ID that it got is: %u\n", client->lastId());
    }
    client->send("hello!", NULL, millis(), 10000); });
    server.addHandler(&events);

    server.on("/reset", HTTP_GET, [](AsyncWebServerRequest *request)
              {
    readings["time"] = String(millis());
    events.send(JSON.stringify(readings).c_str(), "time_reading", millis());
    request->send(200, "text/plain", "OK"); });

    server.begin();
}

void loop()
{
    // 50Hz
    delay(1000 / 50);
    readings["time"] = String(millis());
    events.send(JSON.stringify(readings).c_str(), "time_reading", millis());
}