WebSocketUtil.java
8.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package com.subsidy.util.websocket;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.subsidy.common.ResponseData;
import com.subsidy.common.exception.HttpException;
import com.subsidy.mapper.ClassDictMapper;
import com.subsidy.mapper.ClassVodFaceCheckMapper;
import com.subsidy.mapper.MemberMapper;
import com.subsidy.mapper.OprMemDictMapper;
import com.subsidy.model.OprMemDictDO;
import com.subsidy.vo.classdict.ClassSettingsVO;
import com.subsidy.vo.classdict.SystemSettings;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.*;
/**
* WebSocket
* https://zhuanlan.zhihu.com/p/531474864
*
* @author DengMin
* @since 2022/7/13
*/
@Slf4j
@Component
public class WebSocketUtil implements WebSocketHandler {
@Autowired
private ClassVodFaceCheckMapper classVodFaceCheckMapper;
@Autowired
private ClassDictMapper classDictMapper;
@Autowired
private MemberMapper memberMapper;
private int heartbeatMin = 1; // 断连最小心跳次数
private int heartbeatMax = 3; // 断连最大心跳次数
private int reconnectionSeconds = 30; //每次断连间隔重新连接秒
/**
* selfExport
* 存放建立连接webSocket对象 Map<memberId,session>
*/
public static ConcurrentHashMap<Long, WebSocketSession> webSocketMap = new ConcurrentHashMap();
ConcurrentHashMap<String, ScheduledFuture> taskMap = new ConcurrentHashMap<>(1);
/**
* 处理成功连接WebSocket 建立直接后调用
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws IOException {
if (null != session) {
String params = session.getUri().getQuery();
//ws://www.youkehulian.cn/nginx/socket?userId=3340&token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpZCI6MzM0MCwidHlwZSI6Im1vYmlsZSIsImV4cCI6MTY3MzA1NjAzOSwiaWF0IjoxNjcyOTY5NjM5fQ.qx05-0AoWcaktgux30dJLmRE7AdtRuSWvk6cX7CNnao
// Long id = Long.valueOf(params.split("=")[1]);
String[] split = params.split("=");
Long id = Long.valueOf(split[1].split("&")[0]);
String token = split[2];
// OprMemDictDO oprMemDictDO = oprMemDictMapper.getLatestLoginInfo(id); //最近一次登录
// if (null != oprMemDictDO) {
// //如果最近一次不是登录成功,那么就是页面上刷新
// if (!("登录".equals(oprMemDictDO.getOprType()) && 1 == oprMemDictDO.getResult())) {
// webSocketMap.put(id, session);
// webSocketMap.get(id).sendMessage(new TextMessage(JSONObject.toJSONString(ResponseData.generateCreatedResponse(1012))));
// }
// }
//webSocketMap的操作
if (null != webSocketMap && webSocketMap.get(id) != null) {
if (webSocketMap.get(id).isOpen()) {
/* 相同账户进行挤号,发送消息给WebSocket通知账户已在其他地方登录 */
webSocketMap.get(id).sendMessage(new TextMessage(JSONObject.toJSONString(ResponseData.generateCreatedResponse(1011))));
webSocketMap.get(id).close();
}
webSocketMap.remove(id);
}
webSocketMap.put(id, session);
// 数据操作
SystemSettings systemSettings = memberMapper.companySettings(id); //公司配置
List<ClassSettingsVO> classSettings = classDictMapper.getClassSettings(id);
for (ClassSettingsVO csv : classSettings) {
//配置的视频id
List<Long> longs = classVodFaceCheckMapper.faceVodIds(csv.getId());
csv.setVodIds(longs);
}
systemSettings.setClassSettingsVOS(classSettings);
if (CollectionUtils.isNotEmpty(classSettings)) {
String data = JSONObject.toJSONString(ResponseData.generateCreatedResponse(0, systemSettings), SerializerFeature.WriteMapNullValue);
if (null != webSocketMap.get(id)) {
webSocketMap.get(id).sendMessage(new TextMessage(data));
}
}
}
}
/**
* 接收WebSocket客户端Message
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
this.handleMessage(session, message);
}
/**
* 链接发生错误:处理WebSocket transport error
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
if (session.isOpen()) {
session.close();
}
String params = session.getUri().getQuery();
String[] split = params.split("=");
Long id = Long.valueOf(split[1].split("&")[0]);
// String token = split[2];
// Long id = Long.valueOf(params.split("=")[1]);
webSocketMap.remove(id);
}
/**
* 关闭链接 :在两端WebSocket connection都关闭或transport error发生后执行
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
if (null != session) {
if (null != webSocketMap) {
// String params = session.getUri().getQuery();
// String[] split = params.split("=");
// Long id = Long.valueOf(split[1].split("&")[0]);
// String token = split[2];
// Long id = Long.valueOf(params.split("=")[1]);
// heartbeat(webSocketMap.get(id));
}
}
}
/**
* 支持分片消息
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 断开连接后进行三次心跳验证判断是否重新连接了,如果没有连接成功则判断为下线
*/
public void heartbeat(WebSocketSession session) {
// ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
// String params = session.getUri().getQuery();
// String[] split = params.split("=");
// Long id = Long.valueOf(split[1].split("&")[0]);
// String token = split[2];
//
// ScheduledFuture scheduledFuture = service.scheduleAtFixedRate(new Runnable() {
// int beatsNum = heartbeatMin; //至少一次
//
// @SneakyThrows
// @Override
// public void run() {
// if (null != session && !session.isOpen()) {
// //判断这个session在不在map里
// if (taskMap.containsKey(token)) {
// //在map里并且这个session还是断开的情况下,进行重连,超过次数就掉线
// if (beatsNum > heartbeatMax) { //
// OprMemDictDO oprMemDictDO = new OprMemDictDO();
// oprMemDictDO.setUserId(id);
// oprMemDictDO.setResult(1);
// oprMemDictDO.setOprType("登出");
// oprMemDictMapper.insert(oprMemDictDO);
// //结束任务
// taskMap.get(token).cancel(true);
// taskMap.remove(token);
// }
// beatsNum++;
// }
// } else if (null != session && session.isOpen()) {
// if (taskMap.containsKey(token)){
// /* 时间段内重新连接了结束验证 */
// beatsNum = heartbeatMin; //重连次数清零
// taskMap.get(token).cancel(true);
// taskMap.remove(token);
// }
//
// }
// }
// }, 0, reconnectionSeconds, TimeUnit.SECONDS);
// taskMap.put(token, scheduledFuture);
}
public static void main(String[] args) {
String url = "ws://www.youkehulian.cn/nginx/socket?userId=3340&token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpZCI6MzM0MCwidHlwZSI6Im1vYmlsZSIsImV4cCI6MTY3MzA1NjAzOSwiaWF0IjoxNjcyOTY5NjM5fQ.qx05-0AoWcaktgux30dJLmRE7AdtRuSWvk6cX7CNnao";
String[] split = url.split("=");
Long id = Long.valueOf(split[1].split("&")[0]);
String token = split[2];
System.out.println(id);
System.out.println(token);
}
}