持久化WebSocket协议消息推送

发布时间 2023-11-07 19:04:25作者: cherrymint

1、什么是 WebSocket

1.1 WebSocket 诞生背景

早期,很多网站为了实现推送技术,所用的技术都是轮询(也叫短轮询)。轮询是指由浏览器每隔一段时间向服务器发出 HTTP 请求,然后服务器返回最新的数据给客户端。

常见的轮询方式分为轮询与长轮询,它们的区别如下图所示:

img

1.2短轮询

短轮询,就是web端不停地间隔一段时间向服务端发一个 HTTP 请求,如果有新消息,就会在某次请求返回。

适用场景:

  1. 扫码登录:短时间内频繁查询二维码状态
  2. 小OA系统:客户端使用量不大的情况下可以使用

缺点:

  1. 大量无效请求:大量的无效请求,浪费服务器资源
  2. 服务端请求压力大:万人群聊频繁访问,上万并发服务扛不住。

1.3长轮询

长轮询和短轮询相比,一个最大的改进之处在于:

  1. 短轮询模式下,服务端不管本轮有没有新消息产生,都会马上响应并返回。而长轮询模式当本次请求没有获取到新消息时,并不会马上结束返回,而是会在服务端“悬挂(hang)”,等待一段时间;
  2. 如果在等待的这段时间内有新消息产生,就能马上响应返回。

这也意味着web端的请求超时时长得设置长一些。

优点:相比短轮询模式

  1. 大幅降低短轮询模式中客户端高频无用的轮询导致的网络开销和功耗开销
  2. 降低了服务端处理请求的 QPS

缺点:

  1. 无效请求:长轮询在超时时间内没有获取到消息时,会结束返回,因此仍然没有完全解决客户端“无效”请求的问题。
  2. 服务端压力大:服务端悬挂(hang)住请求,只是降低了入口请求的 QPS,并没有减少对后端资源轮询的压力。假如有 1000 个请求在等待消息,可能意味着有 1000 个线程在不断轮询消息存储资源。(轮询转移到了后端)

3.Websocket长连接

长轮询短轮询都算作是服务端没法主动向客户端推送的一种曲线救国的方式,那最好的方案,就是能不能解决这个问题,因此诞生了websocket。

实现原理:客户端和服务器之间维持一个 TCP/IP 长连接,全双工通道。

2、什么是 WebSocketwebsocket代码实现方案

2.1 tomcat实现websocket

参考 https://blog.csdn.net/devcloud/article/details/124681914

2.2 netty实现websocket

2.2.1 实现过程

心跳、编解码器、业务处理

public void run() throws InterruptedException {
    // 服务器启动引导对象
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 128)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                //30秒客户端没有向服务器发送心跳则关闭连接
                pipeline.addLast(new IdleStateHandler(30, 0, 0));
                // 因为使用http协议,所以需要使用http的编码器,解码器
                pipeline.addLast(new HttpServerCodec());
                // 以块方式写,添加 chunkedWriter 处理器
                pipeline.addLast(new ChunkedWriteHandler());
                /**

   * 说明:
     中是分段的,HttpObjectAggregator可以把多个段聚合起来;
        *  2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
                  */
                pipeline.addLast(new HttpObjectAggregator(8192));
                //保存用户ip
                pipeline.addLast(new HttpHeadersHandler());
                /**
               * 说明:
               * 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
               * 2. 可以看到 WebSocketFrame 下面有6个子类
               * 3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri
               * 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接;
               * 是通过一个状态码 101 来切换的
                  */
                pipeline.addLast(new WebSocketServerProtocolHandler("/"));
                // 自定义handler ,处理业务逻辑
                pipeline.addLast(new NettyWebSocketServerHandler());
            }
        });
    // 启动服务器,监听端口,阻塞直到启动成功
    serverBootstrap.bind(WEB_SOCKET_PORT).sync();
    System.out.println("启动成功");
}

2.2.2 获取用户ip和连接

在http协议升级成WebSocket之前获取赋值存储用户ip

public class HttpHeadersHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            UrlBuilder urlBuilder = UrlBuilder.ofHttp(request.uri());

            // 获取token参数
            String token = Optional.ofNullable(urlBuilder.getQuery()).map(k->k.get("token")).map(CharSequence::toString).orElse("");
            NettyUtil.setAttr(ctx.channel(), NettyUtil.TOKEN, token);

            // 获取请求路径
            request.setUri(urlBuilder.getPath().toString());
            HttpHeaders headers = request.headers();
            String ip = headers.get("X-Real-IP");
            if (StringUtils.isEmpty(ip)) {//如果没经过nginx,就直接获取远端地址
                InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                ip = address.getAddress().getHostAddress();
            }
            NettyUtil.setAttr(ctx.channel(), NettyUtil.IP, ip);
            ctx.pipeline().remove(this);
            ctx.fireChannelRead(request);
        }else
        {
            ctx.fireChannelRead(msg);
        }
    }
}

2.2.3 请求处理

实现NettyWebSocketServerHandler接收来自WebSocket的信息。

@Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        WSBaseReq wsBaseReq = JSONUtil.toBean(msg.text(), WSBaseReq.class);
        WSReqTypeEnum wsReqTypeEnum = WSReqTypeEnum.of(wsBaseReq.getType());
        switch (wsReqTypeEnum) {
            case LOGIN:
                this.webSocketService.handleLoginReq(ctx.channel());
                log.info("请求二维码 = " + msg.text());
                break;
            case HEARTBEAT:
                break;
            default:
                log.info("未知类型");
        }
    }