Commit ea11f978 by 邓敏

WebSocket

1 parent 858a7caf
package com.subsidy.common.configure;
import com.subsidy.common.interceptor.WebSocketInterceptor;
import com.subsidy.util.websocket.WebSocketUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
/**
* <p>
* WebSocket 配置
* </p>
*
* @author DengMin
* @since 2022/7/13
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private WebSocketUtil webSocketUtil;
@Autowired
private WebSocketInterceptor webSocketInterceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry
.addHandler(webSocketUtil, "socket")
.addInterceptors(webSocketInterceptor) // 自定义验证规则
.setAllowedOrigins("*");
}
}
\ No newline at end of file
......@@ -8,7 +8,9 @@ import com.subsidy.common.ResponseVO;
import com.subsidy.common.constant.Code;
import com.subsidy.common.exception.HttpException;
import com.subsidy.mapper.AdministerMapper;
import com.subsidy.mapper.MemberMapper;
import com.subsidy.model.AdministerDO;
import com.subsidy.model.MemberDO;
import com.subsidy.util.ConstantUtils;
import com.subsidy.util.JwtUtil;
import com.subsidy.util.Localstorage;
......@@ -45,6 +47,9 @@ public class AuthenticationInterceptor implements HandlerInterceptor {
private AdministerMapper administerMapper;
@Autowired
private MemberMapper memberMapper;
@Autowired
private RedisUtil redisUtil;
@Override
......@@ -81,10 +86,15 @@ public class AuthenticationInterceptor implements HandlerInterceptor {
return true;
}
} else if(ConstantUtils.MOBILE_TERMINATE.equals(type)) {
MemberDO memberDO = memberMapper.selectById(claimMap.get("id").asLong());
if(memberDO != null) {
Localstorage.setUser(memberDO);
return true;
}
/**
* 学生端设置单设备登录
*/
String tk = (String) redisUtil.get(ConstantUtils.MOBILE_TERMINATE+"_"+claimMap.get("id").asLong());
/* String tk = (String) redisUtil.get(ConstantUtils.MOBILE_TERMINATE+"_"+claimMap.get("id").asLong());
if(StringUtils.isNotBlank(tk)) {
if(tk.equals(token)) {
return true;
......@@ -93,7 +103,7 @@ public class AuthenticationInterceptor implements HandlerInterceptor {
throw new HttpException(1011);
} else {
throw new HttpException(1010);
}
}*/
} else {
throw new HttpException(1010);
}
......
package com.subsidy.common.interceptor;
import com.subsidy.mapper.MemberMapper;
import com.subsidy.model.MemberDO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpServletRequest;
import java.util.Map;
/**
* <p>
* WebSocket握手拦截器
* </p>
*
* @author DengMin
* @since 2022/7/14
*/
@Component
public class WebSocketInterceptor implements HandshakeInterceptor {
@Autowired
private MemberMapper memberMapper;
/**
* 自定义验证规则, 如果该用户不存在数据表中,拒绝此次连接
* @param request
* @param serverHttpResponse
* @param webSocketHandler
* @param map
* @return
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
if(request instanceof ServletServerHttpRequest) {
HttpServletRequest httpServletRequest = ((ServletServerHttpRequest) request).getServletRequest();
Long userId = Long.parseLong(httpServletRequest.getParameter("userId"));
MemberDO memberDO = memberMapper.selectById(userId);
if(null != memberDO) {
return true;
}
}
return false;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
}
}
......@@ -387,7 +387,8 @@ public class MemberServiceImpl extends ServiceImpl<MemberMapper, MemberDO> imple
userRoleVO.setRotationImgDictDOS(rotationImgDictDOS);
String token = JwtUtil.generateToken(memberDO.getId(), ConstantUtils.MOBILE_TERMINATE);
redisUtil.set(ConstantUtils.MOBILE_TERMINATE + "_" + memberDO.getId(), token);
//redisUtil.set(ConstantUtils.MOBILE_TERMINATE + "_" + memberDO.getId(), token);
Localstorage.setUser(memberDO);
userRoleVO.setToken(token);
return userRoleVO;
}
......@@ -441,7 +442,8 @@ public class MemberServiceImpl extends ServiceImpl<MemberMapper, MemberDO> imple
//redisUtil.set(RedisPrefixConstant.SUBSIDY_MEMBER_LOGIN_PREFIX + memberDO.getId() + ":" + System.currentTimeMillis(), 1);
String token = JwtUtil.generateToken(memberDO.getId(), ConstantUtils.MOBILE_TERMINATE);
redisUtil.set(ConstantUtils.MOBILE_TERMINATE + "_" + memberDO.getId(), token);
//redisUtil.set(ConstantUtils.MOBILE_TERMINATE + "_" + memberDO.getId(), token);
Localstorage.setUser(memberDO);
memberVO.setToken(token);
return memberVO;
} else {
......
package com.subsidy.util.websocket;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpSession;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint(value = "/chat",configurator = GetHttpSessionConfigurator.class) //与前端的那个对应
@Component
public class ChatEndPoint {
/**
* 用来存储每一个客户端对象对应的ChatEndPoint
*/
private static Map<String, ChatEndPoint> onlineUsers = new ConcurrentHashMap<>();
/**
* 声明session对象 通过该对象可以发送消息给指定的用户/客户端
*/
private Session session;
/**
* 声明一个HttpSession对象,我们之前在HttpSession对象中存储了用户名
*/
private HttpSession httpSession;
/**
* 连接建立时候被调用
*/
@OnOpen
public void open(Session session, EndpointConfig endpointConfig) {
//将局部的Session对象赋值给成员Session
this.session = session;
//获取httpSession对象
HttpSession httpSession = (HttpSession) session.getUserProperties().get(HttpSession.class.getName());
this.httpSession = httpSession;
//从httpSession对象中获取用户名
String userName = (String) httpSession.getAttribute("user");
//将当前对象存储到容器里面 当前chatEndpoint对象
onlineUsers.put(userName, this);
//将当前在线用户的用户名推送给所有的客户端
//1.获取消息
//所有在线的用户名
String msg = MessageUtils.getMessage(true, null, getName());
// 2.调用方法进行系统消息的推送
broadcastAllUsers(msg);
}
private void broadcastAllUsers(String message){
try {
//要将该消息推送给所有的客户端
Set<String> set = onlineUsers.keySet();
for (String user : set){
ChatEndPoint chatEndPoint = onlineUsers.get(user);
chatEndPoint.session.getBasicRemote().sendText(message);
}
}catch (Exception e){
e.printStackTrace();
}
}
private Set<String> getName() {
return onlineUsers.keySet();
}
/**
* 接收到客户端发送的数据时候被调用
*/
@OnMessage(maxMessageSize = 102400)
public void onMessage(String msg, Session session) {
try {
//将msg转换成Message对象
ObjectMapper objectMapper = new ObjectMapper();
Message message = objectMapper.readValue(msg,Message.class);
//获取要接受数据的人
String toName = message.getToName();
//获取消息数据
String data = message.getMessage();
//获取当前登录的用户
String userName = (String) httpSession.getAttribute("user");
//获取推送给指定用户的消息格式的数据
String result = MessageUtils.getMessage(false,userName,data);
//发送数据
onlineUsers.get(toName).session.getBasicRemote().sendText(result);
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 客户端关闭时候被调用
*/
@OnClose
public void onClose(Session session) {
//获取当前登录的用户
String userName = (String) httpSession.getAttribute("user");
// 从容器中删除指定的用户
onlineUsers.remove(userName);
//获取推送消息
String msg = MessageUtils.getMessage(true,null,getName());
broadcastAllUsers(msg);
}
}
package com.subsidy.util.websocket;
import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
public class GetHttpSessionConfigurator extends ServerEndpointConfig.Configurator {
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
HttpSession httpSession = (HttpSession) request.getHttpSession();
//将HttpSession对象存储到配置对象中
sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
// super.modifyHandshake(sec, request, response);
}
}
package com.subsidy.util.websocket;
import lombok.Data;
/**
* 浏览器给服务端发的websocket数据
*/
@Data
public class Message {
private String toName;
private String message;
}
package com.subsidy.util.websocket;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* 封装消息的工具类
*/
public class MessageUtils {
public static String getMessage(Boolean isSystemMessage,String fromName,Object message){
try {
ResultMessage resultMessage = new ResultMessage();
resultMessage.setIsSystem(isSystemMessage);
resultMessage.setMessage(message);
if (fromName != null){
resultMessage.setFromName(fromName);
}
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(resultMessage);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
}
package com.subsidy.util.websocket;
import lombok.Data;
/**
* 登陆响应给浏览器的数据
*/
@Data
public class Result {
private Boolean flag;
private String message;
}
package com.subsidy.util.websocket;
import lombok.Data;
/**
* 服务端发送给浏览器的websocket数据
*/
@Data
public class ResultMessage {
private Boolean isSystem;
private String fromName;
private Object message; //系统消息的话就是数组
}
package com.subsidy.util.websocket;
import com.subsidy.util.DateFormatUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import java.util.Date;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* <p>
* WebSocket
* </p>
*
* @author DengMin
* @since 2022/7/13
*/
@Slf4j
@Component
public class WebSocketUtil implements WebSocketHandler {
/**
* 存放建立连接webSocket对象
*/
private static CopyOnWriteArraySet<WebSocketSession> webSocketMap = new CopyOnWriteArraySet<>();
/**
* 处理成功连接WebSocket
* @param session
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
if(null != session) {
if(webSocketMap.contains(session)) {
webSocketMap.remove(session);
}
webSocketMap.add(session);
String httpSessionId = session.getId();
String host = session.getUri().getHost();
String query = session.getUri().getQuery();
log.info("----> webSocket connection success");
log.info("parameter:[ httpSessionId: {}, host: {}, {} ]", httpSessionId, host, query);
log.info("connection time: {}", DateFormatUtil.format(new Date(), DateFormatUtil.FMT_sdf14_L));
}
}
/**
* 处理WebSocket transport error
* @param session
* @param throwable
* @throws Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
if(session.isOpen()) {
session.close();
}
webSocketMap.remove(session);
log.error("<---- webSocket transport error");
log.error("error message: {}", throwable.getMessage());
}
/**
* 在两端WebSocket connection都关闭或transport error发生后执行
* @param session
* @param closeStatus
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
webSocketMap.remove(session);
System.out.println("<---- webSocket is close");
log.info("<---- webSocket is close");
log.info("session {} close, status: {}", session.getId(), closeStatus);
}
/**
* 接收WebSocket客户端Message
* @param session
* @param message
* @throws Exception
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if(message instanceof TextMessage) {
System.out.println(message.getPayload());
}
}
@Override
public boolean supportsPartialMessages() {
return false;
}
}
\ No newline at end of file
package com.subsidy.util.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebsocketConfig {
/**
* 注入 ServerEndpointExporter 对象,可以自动注册使用了@ServerEndpoint注解的bean
*/
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
......@@ -47,6 +47,7 @@
</appender>
<springProfile name="dev">
<logger name="com.subsidy" level="debug" />
<logger name="com.subsidy" level="error" />
<root level="INFO">
<appender-ref ref="CONSOLE" />
......@@ -56,6 +57,7 @@
</springProfile>
<springProfile name="pre">
<logger name="com.subsidy" level="debug" />
<logger name="com.subsidy" level="error" />
<root level="INFO">
<appender-ref ref="ERROR_FILE" />
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!