记录springboot的一次使用socketio的经历

发布时间 2023-12-08 10:00:46作者: XSWClevo

pom中加入依赖

        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>2.0.6</version>
        </dependency>

        <dependency>
            <groupId>io.socket</groupId>
            <artifactId>socket.io-client</artifactId>
            <version>2.1.0</version>
        </dependency>

netty socketio 配置信息

# netty-socketio 配置
socketio:
  host: 127.0.0.1
  port: 8889
  contextPath: /mwapi/ws/spl
  # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
  maxFramePayloadLength: 1048576
  # 设置http交互最大内容长度
  maxHttpContentLength: 1048576
  # socket连接数大小(如只监听一个端口boss线程组为1即可)
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
  upgradeTimeout: 1000000
  # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
  pingTimeout: 6000000
  # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
  pingInterval: 25000

java socketio config配置

@Configuration
public class SocketIOConfig {

    @Value("${socketio.host}")
    private String host;

    @Value("${socketio.port}")
    private Integer port;

    @Value("${socketio.contextPath}")
    private String contextPath;

    @Value("${socketio.bossCount}")
    private int bossCount;

    @Value("${socketio.workCount}")
    private int workCount;

    @Value("${socketio.allowCustomRequests}")
    private boolean allowCustomRequests;

    @Value("${socketio.upgradeTimeout}")
    private int upgradeTimeout;

    @Value("${socketio.pingTimeout}")
    private int pingTimeout;

    @Value("${socketio.pingInterval}")
    private int pingInterval;

    @Bean
    public SocketIOServer socketIOServer() {
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setSoLinger(0);
        socketConfig.setReuseAddress(true);

        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setSocketConfig(socketConfig);
        config.setHostname(host);
        config.setPort(port);
        // config.setContext(contextPath);

        config.setBossThreads(bossCount);
        config.setWorkerThreads(workCount);
        config.setAllowCustomRequests(allowCustomRequests);
        config.setUpgradeTimeout(upgradeTimeout);
        config.setPingTimeout(pingTimeout);
        config.setPingInterval(pingInterval);
        config.setOrigin("*");
        SocketIOServer socketIOServer = new SocketIOServer(config);
        socketIOServer.addNamespace("/mynamespace");

        return socketIOServer;
    }

    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}

java server 通用代码编写

@Component
public class MessageEventHandler {

    private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class);

    /**
     * 服务器socket对象
     */
    public SocketIOServer socketIoServer;

    /**
     * 客户端集合
     */
    public List<String> listClient = new CopyOnWriteArrayList<>();

    public SocketInstance socketInstance = SocketInstance.getSocketInstance();

    /**
     * 超时时间
     */
    static final int limitSeconds = 60;

    /**
     * 初始化消息事件处理器
     *
     * @param server 服务器socket对象
     */
    @Autowired
    public MessageEventHandler(SocketIOServer server) {
        logger.info("初始化SOCKET消息事件处理器");
        this.socketIoServer = server;
    }

    /**
     * 客户端发起连接时触发
     *
     * @param client 客户端Socket对象信息
     */
    @OnConnect
    public void onConnect(SocketIOClient client) {
        logger.info("客户端{}已连接", client.getSessionId());
        String sessionId = getSessionId(client);
        listClient.add(sessionId);
        socketInstance.insertSocketClient(sessionId, client);
        //向前端发送接收数据成功标识
        client.sendEvent("connect_success", "已经成功连接");

    }

    /**
     * 客户端断开连接时触发
     *
     * @param client 客户端Socket对象信息
     */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        logger.info("客户端{}断开连接", client.getSessionId());
        String sessionId = getSessionId(client);

        listClient.remove(sessionId);
        socketInstance.remoteClient(sessionId);

    }


    /**
     * 客户端发送消息时触发
     *
     * @param client  客户端Socket对象信息
     * @param request AckRequest 回调对象
     * @param data    消息信息实体
     */
    @OnEvent(value = SocketConstants.SocketEvent.MESSAGE)
    public void onEvent(SocketIOClient client, AckRequest request, String data) {
        System.out.println("发来消息:" + data);
        request.sendAckData("服务端已收到");
        client.sendEvent("messageevent", "back data");
        //socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data");
    }

    @OnEvent(value = SocketConstants.SocketEvent.BROADCAST)
    public void onEventByBroadcast(SocketIOClient client, AckRequest request, String data) {
        System.out.println("发来消息:" + data);
        request.sendAckData("服务端-广播事件已收到");
        client.sendEvent(SocketConstants.SocketEvent.BROADCAST, "广播事件 " + DateUtil.now());
        //socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data");
    }


    /**
     * 广播消息 函数可在其他类中调用
     */
    public void sendBroadcast(byte[] data) {
        //向已连接的所有客户端发送数据,map实现客户端的存储
        for (SocketIOClient client : socketInstance.getClientSocketAll().values()) {
            if (client.isChannelOpen()) {
                client.sendEvent("message_event", data);
            }
        }
    }

    /**
     * 获取客户端的session Id
     *
     * @param client: 客户端
     */
    private String getSessionId(SocketIOClient client) {
        return client.getSessionId().toString();

    }

    /**
     * 获取连接的客户端ip地址
     *
     * @param client: 客户端
     * @return 获取连接的客户端ip地址
     */
    private String getIpByClient(SocketIOClient client) {
        String sa = client.getRemoteAddress().toString();
        return sa.substring(1, sa.indexOf(":"));
    }

}

自定义命名空间,事件处理

@Slf4j
@Component
public class MyNamespaceHandler {
    //测试使用
    @OnEvent("message")
    public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {

        log.info("SplSearch:{}", data);

        if (ackRequest.isAckRequested()) {
            //返回给客户端,说我接收到了
            ackRequest.sendAckData("SplSearch", data);
        }

    }
}

项目启动加载并且项目关闭时关闭socket io

可以使用springboot方式也可以使用spring的方式,这里方式很多根据自己喜好来定
如果使用spring方式来处理可以实现:ApplicationListener<...>
实现类同时希望拥有启动加载和关闭销毁两个功能,可以这样做

  1. spring 实现应用监听
@Component
public class MyApplicationListener implements ApplicationListener<ApplicationEvent> {

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            // 处理应用程序启动事件
            System.out.println("Application Started");
        } else if (event instanceof ContextClosedEvent) {
            // 处理应用程序关闭事件
            System.out.println("Application Closed");
        }
    }
}
  1. springboot方式
@Slf4j
@Component
@Order(value = 1)
public class MyCommandLineRunner implements CommandLineRunner, DisposableBean {

    private final SocketIOServer server;

    private final MyNamespaceHandler myNamespaceHandler;

    @Autowired
    public MyCommandLineRunner(SocketIOServer server, MyNamespaceHandler myNamespaceHandler) {
        this.myNamespaceHandler = myNamespaceHandler;
        this.server = server;
        System.out.println("初始化MyCommandLineRunner");
    }

    @Override
    public void run(String... args) {
        try {
            server.getNamespace("/mynamespace").addListeners(myNamespaceHandler);
            server.start();
            System.out.println("socket.io启动成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void destroy() {
        //如果用kill -9  这个监听是没用的,有可能会导致你服务kill掉了,但是socket服务没有kill掉
        server.stop();
        log.info("SocketIOServer==============================关闭成功");
    }
}

唯一socket实例

public class SocketInstance {

    /**
     * 客户端Socket连接对象容器
     */
    private static Map<String, SocketIOClient> socketClients = null;

    /**
     * 私有构造
     */
    private SocketInstance() {
        //从缓存中获取socketClients
        socketClients = new HashMap<>();
    }

    /**
     * 定义一个私有的内部类,在第一次用这个嵌套类时,会创建一个实例。而类型为SocketInstanceHolder的类,
     * 只有在SocketInstance.getSocketInstance()中调用,
     * 由于私有的属性,他人无法使用SocketInstanceHolder,不调用SocketInstance.getSocketInstance()就不会创建实例。
     * 优点:达到了lazy loading的效果,即按需创建实例。
     * 无法适用于分布式集群部署
     */
    private static class SocketInstanceHolder {
        /**
         * 创建全局唯一实例
         */
        private final static SocketInstance instance = new SocketInstance();
    }

    /**
     * 获取全局唯一实例
     *
     * @return SocketInstance对象
     */
    public static SocketInstance getSocketInstance() {
        return SocketInstanceHolder.instance;
    }

    /**
     * 新增客户端连接到容器
     *
     * @param encode         设备En号
     * @param socketIOClient 客户端socket对象
     */
    public void insertSocketClient(String encode, SocketIOClient socketIOClient) {
        socketClients.put(encode, socketIOClient);
    }

    /**
     * 获取客户端Socket对象
     *
     * @param encode 设备encode
     * @return 客户端Socket对象
     */
    public SocketIOClient getClientSocket(String encode) {
        return socketClients.get(encode);
    }

    /**
     * 获取所有客户端Socket对象
     *
     * @return 客户端Socket对象
     */
    public Map<String, SocketIOClient> getClientSocketAll() {
        return socketClients;
    }


    /**
     * 删除客户端
     * @param sessionId 客户端的id
     */
    public void remoteClient(String sessionId) {
        SocketIOClient oldSocketIOClient = socketClients.get(sessionId);
        if (oldSocketIOClient != null) {
            try {
                //关闭客户端连接
                oldSocketIOClient.disconnect();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        socketClients.remove(sessionId);
    }
}

public class SocketConstants {

    /**
     * Socket事件类
     */
    public class SocketEvent {

        /**
         * 效验连接状况
         */
        public static final String HEALTH_CHECK = "HEALTH_CHECK";

        /**
         * 消息接收事件名称
         */
        public static final String MESSAGE = "message";

        public static final String BROADCAST = "broadcast";


    }
}

客户端代码

默认没有命名空间的,需要命名空间在url后面加/xxx

public class SocketIOClientLaunch {
    public static void main(String[] args) {
        // 服务端socket.io连接通信地址
        String url = "http://127.0.0.1:8889";
        try {
            IO.Options options = new IO.Options();

            options.transports = new String[]{"websocket"};
            options.reconnectionAttempts = 2;
            // 失败重连的时间间隔
            options.reconnectionDelay = 1000;
            // 连接超时时间(ms)
            options.timeout = 500;
            // userId: 唯一标识 传给服务端存储
            final Socket socket = IO.socket(url + "?userId=1", options);

            socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello..."));

            // 自定义事件`connected` -> 接收服务端成功连接消息
            socket.on(SocketConstant.CONNECTION, objects -> {
                int length = objects.length;
                log.info("服务端自定义事件`connected`:" + objects[0].toString());
            });

            // 自定义事件`push_data_event` -> 接收服务端消息
            socket.on(SocketConstant.PUSH_DATA_EVENT, objects -> log.info("服务端自定义`push_data_event`:" + objects[0].toString()));

            // 自定义事件`myBroadcast` -> 接收服务端广播消息
            socket.on(SocketConstant.BROADCAST, objects -> log.info("服务端广播消息:" + objects[0].toString()));

            socket.connect();

            while (true) {
                Thread.sleep(3000);
                // 自定义事件`push_data_event` -> 向服务端发送消息
                socket.emit(SocketConstant.PUSH_DATA_EVENT, "发送数据 " + DateUtil.now());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

postman 调用方式

回调事件

image

发送事件

image

配置

image

下一篇:k8s中配置socket io,gateway 配置socket io