WebSocket连接实现实时数据推送

发布时间 2023-12-28 10:52:14作者: 戒爱学Java

WebSocket连接实现实时数据推送

1、前端

1-1、webSocket.js

//暴露自定义websocket对象
export const socket = {
    //后台请求路径
    url: "",
    //websocket对象
    websocket: null,
    //websocket状态
    websocketState: false,
    //重新连接次数
    reconnectNum: 0,
    //重连锁状态,保证重连按顺序执行
    lockReconnect: false,
    //定时器信息
    timeout: null,
    clientTimeout: null,
    serverTimeout: null,
    //初始化方法,根据url创建websocket对象封装基本连接方法,并重置心跳检测
    initWebSocket(newUrl) {
      socket.url = newUrl;
      socket.websocket = new WebSocket(socket.url);
      socket.websocket.onopen = socket.websocketOnOpen;
      socket.websocket.onerror = socket.websocketOnError;
      socket.websocket.onclose = socket.websocketOnClose;
      this.resetHeartbeat()
    },
    reconnect() {
      //判断连接状态
      if (socket.lockReconnect) return;
      socket.reconnectNum += 1;
      //重新连接三次还未成功调用连接关闭方法
      if (socket.reconnectNum === 3) {
        socket.reconnectNum = 0;
        socket.websocket.onclose()
        return;
      }
      //等待本次重连完成后再进行下一次
      socket.lockReconnect = true;
      //5s后进行重新连接
      socket.timeout = setTimeout(() => {
        socket.initWebSocket(socket.url);
        socket.lockReconnect = false;
      }, 5000);
    },
    //重置心跳检测
    resetHeartbeat() {
      socket.heartbeat();
    },
    //心跳检测
    heartbeat() {
      socket.clientTimeout = setTimeout(() => {
        if (socket.websocket) {
          //向后台发送消息进行心跳检测
          socket.websocket.send(JSON.stringify({ type: "heartbeat" }));
          socket.websocketState = false;
          //一分钟内服务器不响应则关闭连接
          socket.serverTimeout = setTimeout(() => {
            if (!socket.websocketState) {
              socket.websocket.onclose()
            } else {
              this.resetHeartbeat()
            }
          }, 60 * 1000);
        }
      }, 3 * 1000);
    },
    //发送消息
    sendMsg(message) {
      socket.websocket.send(message);
    },
    websocketOnOpen(event) {
      //连接开启后向后台发送消息进行一次心跳检测
      console.log(event);
      socket.sendMsg(JSON.stringify({ type: "heartbeat" }));
      socket.sendDataRequest();
    },
    websocketOnError(error) {
      console.log(error);
      socket.reconnect();
    },
    websocketOnClose() {
      socket.websocket.close();
    },
  };

1-2、组件

<script>
import { socket } from "@/request/websocket";

export default {
  data() {
        return {
          tableData: [],
          loading: true,
          websocketCount: -1,
          //查询条件
          queryCondition: {
            type: "message",
          },
        }
  },
  created(){
    socket.initWebSocket(
      `ws://localhost:9999/notice/` +"guest"  //要连接的socket服务器地址,以及连接用户名
    );
    //绑定接收消息方法
    socket.websocket.onmessage = this.websocketOnMessage;
  },
  methods:{
    timestampToDate(time){
      console.log(time);
      return new Date(time).toLocaleString();
    },


    init() {
      this.queryCondition.type = "message";
      socket.sendMsg(JSON.stringify(this.queryCondition));
    },


    websocketOnMessage(event) {
      //初始化界面时,主动向后台发送一次消息,获取数据
      this.websocketCount += 1;
      if (this.websocketCount === 0) {
        this.init();
      }
      console.log(event.data);
      let info = JSON.parse(event.data);
      console.log(info);
      switch (info.type) {
        case "heartbeat":
          socket.websocketState = true;
          break;
        case "message":
          this.loading = true;
          this.$nextTick(() => {
            this.consumeMessage(info);
          })
          break;
        case "error":
          this.loading = false;
          break;
      }
    },
    consumeMessage(info) {
      //拿到最新数据重新渲染界面
      var resp = JSON.parse(info.content);
      if(resp == this.tableData){
        return;
      }else{
        this.tableData=[];  
        this.tableData=resp;  
      }
    },
  }
}
</script>

2、后端

2-1、编写配置类

跨域配置(统一设定)

@Configuration
public class WebConfig implements WebMvcConfigurer {
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOrigins("*")
                .allowedMethods("*")
                .allowedHeaders("*");
    }
}

webSocket配置

/**
 * @author LiuJunDong
 * @ ClassName WebSocketConfig
 * @ description: TODO
 * @ date 2023-10-17
 * @ version: 1.0
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

2-2、编写Socket服务

@Slf4j
@Component
@ServerEndpoint("/notice/{userId}")
public class WebSocketServer {
    /**
     * 解决无法注入bean:定义静态service对象,通过@Autowired在系统启动时为静态变量赋值
     * @ Autowired 注解作用在方法上,如果方法没有参数,spring容器会在类加载完后执行一次这个方法,
     * 如果方法中有参数的话,还会从容器中自动注入这个方法的参数,然后执行一次这个方法。
     */
    public static DataService dataService;

    @Autowired
    public void setXxService(DataService dataService) {
        WebSocketServer.dataService = dataService;
    }

    /**
     * 存储客户端session信息
     */
    public static Map<String, Session> clients = new ConcurrentHashMap<>();

    /**
     * 存储把不同用户的客户端session信息集合
     */
    public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();

    /**
     * 会话id
     */
    private String sid = null;

    /**
     * 建立连接的用户id
     */
    private String userId;

    /**
     * @ description: 当与前端的websocket连接成功时,执行该方法
     * @ PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 @PathVariable注解
     **/
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.sid = UUID.randomUUID().toString();
        this.userId = userId;
        clients.put(this.sid, session);
        //判断该用户是否存在会话信息,不存在则添加
        Set<String> clientSet = connection.get(userId);
        if (clientSet == null) {
            clientSet = new HashSet<>();
            connection.put(userId, clientSet);
        }
        clientSet.add(this.sid);
        log.info(this.userId + "用户建立连接," + this.sid + "连接开启!");
    }

    /**
     * @ description: 当连接失败时,执行该方法
     **/
    @OnClose
    public void onClose() {
        clients.remove(this.sid);
        log.info(this.sid + "连接断开");
    }

    /**
     * @ description: 当收到前台发送的消息时,执行该方法
     **/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到来自用户:" + this.userId + "的信息   " + message);
        //自定义消息实体
        ViewQueryInfoDTO viewQueryInfoDTO = JSON.parseObject(message, ViewQueryInfoDTO.class);
        viewQueryInfoDTO.setUserId(this.userId);
        //判断该次请求的消息类型是心跳检测还是获取信息
        if ("heartbeat".equals(viewQueryInfoDTO.getType())) {
            log.info("我在正常运行");
            //立刻向前台发送消息,代表后台正常运行
            sendMessageByUserId(this.userId, new MessageInfo("heartbeat", "ok"));
        }
        if ("message".equals(viewQueryInfoDTO.getType())) {
            log.info("我来取数据了");
            //执行业务逻辑
            MessageInfo messageInfo = dataService.list(viewQueryInfoDTO);
            sendMessageByUserId(this.userId, messageInfo);
        }
    }

    /**
     * @ description: 当连接发生错误时,执行该方法
     **/
    @OnError
    public void onError(Throwable error) {
        log.info("系统错误");
        error.printStackTrace();
    }

    /**
     * @ description: 通过userId向用户发送信息
     * 该类定义成静态可以配合定时任务实现定时推送
     **/
    public static void sendMessageByUserId(String userId, MessageInfo message) {
        if (!StringUtils.isEmpty(userId)) {
            Set<String> clientSet = connection.get(userId);
            //用户是否存在客户端连接
            if (Objects.nonNull(clientSet)) {
                Iterator<String> iterator = clientSet.iterator();
                while (iterator.hasNext()) {
                    String sid = iterator.next();
                    Session session = clients.get(sid);
                    //向每个会话发送消息
                    if (Objects.nonNull(session)) {
                        try {
                            String jsonString = JSON.toJSONString(message);
                            //同步发送数据,需要等上一个sendText发送完成才执行下一个发送
                            session.getBasicRemote().sendText(jsonString);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

3、方式一:前端定时器

//在websocket.js里添加定时器
     sendDataRequest(){
       socket.sendMsg(JSON.stringify({ type: "message" }));
       // 每隔2秒发送一次数据请求
       setTimeout(() => {
         this.sendDataRequest();
       }, 2000);
     }

//在websocketOnOpen里添加
socket.sendDataRequest();

4、方式二:后端处理后实时推送

这里以定时器模拟将数据处理结束之后,向客户端推送数据。

@Slf4j
@Component
@EnableScheduling
public class SendDataTask {

    @Autowired
    private DataService dataService ;

    @Scheduled(fixedRate = 5000)
    @Async
    public void sendMessageToClient(){
        log.info("定时任务:发送数据");
        try {
            TimeUnit.SECONDS.sleep(5);
            MessageInfo messageInfo = dataService.list(new ViewQueryInfoDTO());
            sendMessageByUserId("guest",messageInfo);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

模拟一个数据集(实际是从数据库取的)

/**
 * @author LiuJunDong
 * @ ClassName DataService
 * @ description: TODO
 * @ date 2023-10-17
 * @ version: 1.0
 */
@Service
public class DataService {
    public static CopyOnWriteArrayList<Device> list = new CopyOnWriteArrayList<>();
    public static AtomicInteger atomicInteger = new AtomicInteger(5);

    static {
        Device device = new Device("1", "博联", System.currentTimeMillis());
        Device device2 = new Device("2", "博联2", System.currentTimeMillis());
        Device device3= new Device("3", "博联3", System.currentTimeMillis());
        Device device4 = new Device("4", "博联4", System.currentTimeMillis());
        list.add(device);
        list.add(device2);
        list.add(device3);
        list.add(device4);
    }
    public MessageInfo list(ViewQueryInfoDTO viewQueryInfoDTO){
        String s = Integer.valueOf(atomicInteger.incrementAndGet()).toString();
        list.add(new Device(s,"博联"+s,System.currentTimeMillis()));
        String jsonString = JSONObject.toJSONString(list);
        return new MessageInfo("message","ok",jsonString);
    }
}

集合最好采用线程安全的,防止多线程环境下出现问题。