websocket使用

发布时间 2023-12-13 17:18:34作者: 杨吃羊

WEBSocket(客户端和服务器能够双向同时传输数据): 应用层协议,客户端和服务器建立连接时采用http握手方式,建立连接后利用http协议的Upgrade属性将协议变更为WebSocket协议(通过TCP协议来传输数据)

http和websocket
相同点:1 都是建立在TCP之上,通过TCP协议来传输数据; 2 都是可靠性传输协议; 3 都是应用层协议
不同点:1 WebSocket支持持久连接,HTTP不支持持久连接 2 WebSocket是双向通信协议,HTTP是单向协议,只能由客户端发起,做不到服务器主动向客户端推送信息。

后端java使用websocket:

依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

 创建WebSocket服务端点

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/imserver/{conferenceId}/{userToken}", configurator = WebSocketConfigurator.class) @Component public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收 userToken
*/
private String userToken = "";
//可用@Autowired正常注入
@OnOpen
public void onOpen(Session session, @PathParam("userToken") String userToken, @PathParam("conferenceId") String conferenceId) throws IOException {
final Map<String, Object> properties = session.getUserProperties();
logger.info("open输出当前对象: " + this);
this.session = session;
this.userToken = userToken;
//根据userToken判断webSocketMap中是否已经有相同的key,如果有,先把旧的连接断开再加入新连接
for (Map.Entry<String, WebSocketServer> entry : webSocketMap.entrySet()) {
String preUserToken = entry.getKey();//先登陆的
if (userToken.equals(preUserToken)) {
WebSocketServer webSocket = entry.getValue();
webSocketMap.get(preUserToken).onClose();
}
}
webSocketMap.put(userToken, this);
}
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userToken) && webSocketMap.get(userToken) == this) {
logger.info("删除时 webSocketMap 成员数量为 {}, token 为 {} ,onClose 要删的对象是 {}", webSocketMap.size(), userToken, webSocketMap.get(userToken));
webSocketMap.remove(userToken);
}
logger.info("用户 memberId = {}, name = {}, token = {} 退出,当前在线人数为 {}", memberId, name, userToken, getOnlineCount());
}
 
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws IOException {
logger.debug("接收到用户 {} 消息,报文: {}", userToken, message);
JSONObject jsonMsg;
try {
jsonMsg = JSONObject.parseObject(message);
} catch (com.alibaba.fastjson.JSONException e) {
logger.warn("WebSocket服务收到了非 JSON 消息,报文:{}", message);
return;
}
if (jsonMsg != null) {
//业务逻辑
String type = jsonMsg.getString("type");
String uuid = jsonMsg.getString("uuid");
WebSocketEnum anEnum = WebSocketEnum.getEnum(type);
//前端根据用户操作,给后端发送不同类型的消息,后端根据消息类型分别处理
switch (anEnum) {
// 心跳
case heartbeat:
// 进行心跳包反馈
sendInfo(jsonMsguserToken);
break;
//倒计时的开启、关闭
case countPlayStop:
sendCountdownInfoForAll(jsonMsg);
break;
//切换议题
case conferenceIssue:
//修改会议状态
case conferenceStatus:
//开启、关闭同屏
case sameScreenStatus:
//切换资料
case conferenceAttachment:
//如果是接口调用去查当前用户
Member userInfo = getUserinfo.getUserInfo(this.userToken);
speakerId = userInfo.getId();
case conferenceSync:
if (anEnum == conferenceSync) {
Conference conference = conferenceService.getConferenceByAttachmentId(attachmentId);
if (conference == null) {
// attachmentId已被删除
JSONObject obj = new JSONObject();
obj.put("type", "deletedType");
obj.put("conferenceId", conferenceId);
obj.put("attachmentId", attachmentId);
obj.put("message", "当前会议资料已删除,请点击确认按钮更新会议资料");
try {
sendInfo(obj.toJSONString(), userToken);
} catch (IOException e) {
throw new RuntimeException(e);
}
return;
}
}
// 主控轮询修改版本号
ConferenceStatusDTO conferenceStatus = conferenceStatusComponent.getConferenceStatusById(conferenceId);

if (conferenceStatus != null) {
logger.debug("主控轮询 conferenceSync 操作,用户 {}, 当前会议状态信息:{},token 为 {},", memberId, JacksonHelper.toJsonString(conferenceStatus), userToken);
// 切换议题,主讲轮询的数据是切换议题前的数据,不修改
// 切换议题会有 >= 俩个值不同
// 议题是否匹配
boolean notMatchIssue = issueId.longValue() != conferenceStatus.getIssueId();
// 主讲人是否匹配
boolean notMatchSpeaker = speakerId.longValue() != conferenceStatus.getSpeakerId();
// 附件是否匹配
boolean notMatchAttachment = attachmentId.longValue() != conferenceStatus.getAttachmentId();
if (notMatchIssue && (notMatchSpeaker || notMatchAttachment)) {
logger.warn("主控轮询上报信息不匹配,议题是否匹配:{},上报主讲人是否匹配:{},附件是否匹配:{}", false, !notMatchSpeaker, !notMatchAttachment);
return;
}
// 其他情况每次只有一个不相同
if (notMatchSpeaker || notMatchIssue || notMatchAttachment) {
logger.warn("主控轮询上报信息不匹配,当前用户为 {},token 为 {},议题是否匹配:{},主讲人是否匹配:{},附件是否匹配:{}", memberId, userToken, !notMatchIssue, !notMatchSpeaker, !notMatchAttachment);
boolean success = conferenceStatusComponent.changeVersion(conferenceId, issueId, attachmentId, speakerId, type, false);
if (success) {
// 同步主控端消息推送
this.masterSyncSend(jsonMsg);
}
}
}
default:
if (StringUtils.isNotBlank(uuid)) {
//messageMap.remove(uuid);
Iterator<Map<String, String>> iterator = messageVector.iterator();
while (iterator.hasNext()) {
Map<String, String> next = iterator.next();
if (uuid.equals(next.get("uuid"))) {
iterator.remove();
}
}
logger.info("收到客户端消息用户:" + userToken + ",已清除消息:" + uuid);
//logger.warn("用户:" + userId + ",已清除消息:" + uuid);
}
break;
}
}
}
public static void sendInfo(String message, @PathParam("userToken") String userToken) throws IOException {
//log.info("发送消息到:"+userId+",报文:"+message);
if (StringUtils.isNotBlank(userToken) && webSocketMap.containsKey(userToken)) {
webSocketMap.get(userToken).sendMessage(message);
} else {
logger.warn("用户【{}】不在线,发送消息失败,消息内容为 {} ", userToken, message);
}
}
public static void sendCountdownInfoForAll(Map<String, Object> map) throws IOException {
//优先清理垃圾连接
cleanGcConnect();
for (Map.Entry<String, WebSocketServer> entry : webSocketMap.entrySet()) {
//只给同一会议中的人发
String conferenceId = map.get("conferenceId").toString();
if (StringUtils.equals(entry.getValue().conferenceId, conferenceId)) {
String userToken = entry.getKey();
sendInfo(JSON.toJSONString(map), userToken);
}
}
}
}