springboot整合websocket

发布时间 2023-05-27 19:18:49作者: 全他吗被取了

一、引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.5.14</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.15</version>
        </dependency>

二、创建WsSessionManager管理器


/**
 * websocket的session管理器
 */
@Slf4j
public class WsSessionManager {
    private WsSessionManager(){}

    /**
     * 记录当前在线连接数
     */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    /**
     * 保存连接 session 的地方
     */
    private static final ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>(1000);

    /**
     * 添加 session
     * @param key 键
     * @param session 值
     */
    public static void add(String key, WebSocketSession session) {
        // 添加 session
        SESSION_POOL.put(key, session);
    }

    /**
     * 删除 session,会返回删除的 session
     * @param key 键
     * @return 值
     */
    public static WebSocketSession remove(String key) {
        // 删除 session
        return SESSION_POOL.remove(key);
    }

    /**
     * 删除并同步关闭连接
     * @param key 键
     */
    public static void removeAndClose(String key) {
        WebSocketSession session = remove(key);
        if (session != null) {
            try {
                // 关闭连接
                session.close();
            } catch (IOException e) {
                log.error("关闭会话时出现异常{}{}",e.getMessage(),e);
            }finally {
                if(session.isOpen()){
                    try {
                        session.close();
                    } catch (IOException e) {
                        log.error("再次尝试关闭会话时出现异常{}{}",e.getMessage(),e);
                    }
                }
            }
        }
    }

    /**
     * 获得 session
     *
     * @param key 键
     * @return 值
     */
    public static WebSocketSession get(String key) {
        // 获得 session
        return SESSION_POOL.get(key);
    }

    /**
     * 获得 Map
     * @return 值
     */
    public static ConcurrentMap<String, WebSocketSession> getMap() {
        return SESSION_POOL;
    }
}

三、创建WebSocketInterceptor拦截器


/**
 * websocket拦截器
 */
@Component
@Slf4j
public class WebSocketInterceptor implements HandshakeInterceptor {

    /**
     * websocket连接前的握手拦截器
     * 握手前
     * @param request    请求对象
     * @param response   响应对象
     * @param wsHandler  请求处理器
     * @param attributes 属性域
     * @return true放行,false拒绝
     * @throws Exception 可能抛出的异常
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        // 获得请求参数
        Map<String, String> paramMap = HttpUtil.decodeParamMap(request.getURI().getQuery(), Charset.defaultCharset());
        String key = paramMap.get("key");
        if (CharSequenceUtil.isNotBlank(key)) {
            // 放入属性域
            attributes.put("key", key);
            log.info("与用户:{}握手成功!", key);
            return true;
        }
        return false;
    }

    /**
     * 握手后
     *
     * @param request   请求独享
     * @param response  响应对象
     * @param wsHandler 处理器
     * @param exception 抛出的异常
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Exception exception) {

        log.info("握手结束!");
    }
}

四、创建WebSocketHandler处理器


/**
 * websocket消息处理器
 * afterConnectionEstablished连接成功
 * afterConnectionClosed断开连接
 * handleTextMessage接收来自客户端的消息
 * sendMessage、sendMessageAll自定义发送消息
 */
@Component
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {
    private static final String KEY = "key";
    @Override
    public void afterConnectionEstablished(WebSocketSession session){
        Object key = session.getAttributes().get(KEY);
        if (key != null) {
            // 用户连接成功,放入在线用户缓存
            WsSessionManager.add(key.toString(), session);
            //这里可以执行服务端想要做的事情,比如发送信息给客户端
            sendMessage(key.toString(),"你好,我是服务端,这是第一次连接成功返回的消息!");
        }
    }

    /**
     * 接收消息事件
     *
     * @param session session对象
     * @param message 接收到的消息
     * @throws Exception 可能抛出的异常
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 获得客户端传来的消息
        String payload = message.getPayload();
        log.info("接收到客户端发来的信息:{}",payload);
    }

    /**
     * socket 断开连接时
     * @param session session对象
     * @param status  断开状态
     * @throws Exception 可能抛出的异常
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        Object key = session.getAttributes().get(KEY);
        if (key != null) {
            // 用户断开连接时,删除缓存
            WsSessionManager.removeAndClose(key.toString());
        }
    }

    /**
     * 发送消息给指定设备(用户)
     * @param key 序列号(用户id)
     * @param message      消息内容
     */
    public void sendMessage(String key, String message) {
        WebSocketSession webSocketSession = WsSessionManager.get(key);
        try {
            if (webSocketSession != null && webSocketSession.isOpen()) {
                webSocketSession.sendMessage(new TextMessage(message));
            }
        } catch (Exception e) {
            log.error("消息发送失败,设备{},失败原因{}{}", webSocketSession.getAttributes().get(KEY), e.getMessage(), e);
        }
    }

    /**
     * 广播消息
     * @param message 消息
     */
    public void sendMessageAll(String message) {
        WsSessionManager.getMap().keySet().forEach(e -> sendMessage(e, message));
    }
}

五、创建WebSocketConfig配置类


/**
 * websocket配置类
 */
@Slf4j
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private WebSocketHandler webSocketHandler;
    @Autowired
    private WebSocketInterceptor webSocketInterceptor;
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        //"/test"为websocket服务器地址(客户端可以通过访问ws://192.168.0.121:8080/test?key=999来连接服务器)
        //"?key=999"这个为客户端向服务器传递的参数,相当于一个标识,可以让服务器知道你是谁,服务器可以根据此标识给你返回消息
        // 自己定义即可,比如userId=123
        // 如果直接通过websocket在线测试工具等连接服务器,出现连接不上的问题,一般看是不是被安全框架拦截了,需要放行
        registry.addHandler(webSocketHandler,"/test")
                .addInterceptors(webSocketInterceptor)
                .setAllowedOrigins("*");
    }
}

六、测试
···
http://www.websocket-test.com/
用在线测试工具进行连通测试,在测试地址栏输入服务器websocket地址,比如我的地址是:
ws://192.168.0.104:8080/test?key=999点击连接,看是否连通
···