Blame view

WebSocketUtil.java 8.2 KB
邓敏 committed
1 2
package com.subsidy.util.websocket;

邓敏 committed
3 4 5
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.subsidy.common.ResponseData;
邓敏 committed
6
import com.subsidy.mapper.ClassDictMapper;
邓敏 committed
7 8
import com.subsidy.mapper.OprMemDictMapper;
import com.subsidy.model.OprMemDictDO;
邓敏 committed
9
import com.subsidy.util.DateFormatUtil;
邓敏 committed
10
import com.subsidy.vo.classdict.ClassSettingsVO;
邓敏 committed
11
import lombok.SneakyThrows;
邓敏 committed
12
import lombok.extern.slf4j.Slf4j;
邓敏 committed
13
import org.apache.commons.collections.CollectionUtils;
邓敏 committed
14
import org.springframework.beans.factory.annotation.Autowired;
邓敏 committed
15 16 17 18 19 20
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
邓敏 committed
21 22
import java.io.IOException;
import java.util.Calendar;
邓敏 committed
23
import java.util.Date;
邓敏 committed
24 25 26 27 28 29
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
邓敏 committed
30 31 32 33 34 35 36 37 38 39 40 41 42

/**
 * <p>
 *  WebSocket
 * </p>
 *
 * @author DengMin
 * @since 2022/7/13
 */
@Slf4j
@Component
public class WebSocketUtil implements WebSocketHandler {

邓敏 committed
43 44 45
    @Autowired
    private OprMemDictMapper oprMemDictMapper;

邓敏 committed
46 47 48
    @Autowired
    private ClassDictMapper classDictMapper;

邓敏 committed
49 50 51 52
    private int heartbeatMin = 1; // 断连最小心跳次数
    private int heartbeatMax = 3; // 断连最大心跳次数
    private int reconnectionSeconds = 60; //每次断连间隔重新连接秒

邓敏 committed
53 54 55
    /**
     * 存放建立连接webSocket对象
     */
邓敏 committed
56 57 58
    private ConcurrentHashMap<Long, WebSocketSession> webSocketMap = new ConcurrentHashMap();

    ConcurrentHashMap<String, ScheduledFuture> taskMap = new ConcurrentHashMap<>(1);
邓敏 committed
59 60 61 62 63 64

    /**
     * 处理成功连接WebSocket
     * @param session
     */
    @Override
邓敏 committed
65
    public void afterConnectionEstablished(WebSocketSession session) throws IOException {
邓敏 committed
66
        if(null != session) {
邓敏 committed
67 68 69 70
            String params = session.getUri().getQuery();
            Long id = Long.valueOf(params.split("=")[1]);
            if(null != webSocketMap && webSocketMap.get(id) != null) {
                if(webSocketMap.get(id).isOpen()) {
邓敏 committed
71
                    /* 相同账户进行挤号,发送消息给WebSocket通知账户已在其他地方登录 */
邓敏 committed
72 73 74 75
                    webSocketMap.get(id).sendMessage(new TextMessage(JSONObject.toJSONString(ResponseData.generateCreatedResponse(1011))));
                    webSocketMap.get(id).close();
                } else {
                    /*
邓敏 committed
76 77
                     * 如果上次连接的WebSocket状态是关闭,
                     * 并且上一次记录时间大于 { heartbeatMax * reconnectionSeconds } 秒(心跳检测机制),则判断为这次登陆是免密码登陆的重新记录上线时间
邓敏 committed
78 79 80 81 82 83 84 85
                     */
                    List<OprMemDictDO> list = oprMemDictMapper.selectList(new QueryWrapper<OprMemDictDO>()
                            .lambda()
                            .eq(OprMemDictDO::getUserId, id)
                            .orderByDesc(OprMemDictDO::getCreateDate));

                    Calendar calendar = Calendar.getInstance();
                    calendar.setTime(DateFormatUtil.localDateTimeToDate(list.get(0).getCreateDate()));
邓敏 committed
86
                    calendar.add(Calendar.SECOND,heartbeatMax * reconnectionSeconds);
邓敏 committed
87 88 89 90 91 92 93 94 95 96 97 98
                    if(calendar.getTime().after(DateFormatUtil.localDateTimeToDate(list.get(0).getCreateDate()))) {
                        if(list.get(0).getOprType().equals("登出")) {
                            OprMemDictDO oprMemDictDO = new OprMemDictDO();
                            oprMemDictDO.setUserId(id);
                            oprMemDictDO.setResult(1);
                            oprMemDictDO.setOprType("登录");
                            oprMemDictDO.setIpAddress(session.getRemoteAddress().getHostName());
                            oprMemDictMapper.insert(oprMemDictDO);
                        }
                    }
                }
                webSocketMap.remove(id);
邓敏 committed
99
            }
邓敏 committed
100
            webSocketMap.put(id, session);
邓敏 committed
101

邓敏 committed
102 103 104 105 106
            List<ClassSettingsVO> list = classDictMapper.getClassSettings(id);
            if(CollectionUtils.isNotEmpty(list)) {
                webSocketMap.get(id).sendMessage(new TextMessage(JSONObject.toJSONString(ResponseData.generateCreatedResponse(0, list))));
            }

邓敏 committed
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
            String httpSessionId = session.getId();
            String host = session.getUri().getHost();
            String query = session.getUri().getQuery();
            log.info("----> webSocket connection success");
            log.info("parameter:[ httpSessionId: {}, host: {}, {} ]", httpSessionId, host, query);
            log.info("connection time: {}", DateFormatUtil.format(new Date(), DateFormatUtil.FMT_sdf14_L));
        }
    }

    /**
     * 处理WebSocket transport error
     * @param session
     * @param throwable
     * @throws Exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
        if(session.isOpen()) {
            session.close();
        }

邓敏 committed
128 129 130
        String params = session.getUri().getQuery();
        Long id = Long.valueOf(params.split("=")[1]);
        webSocketMap.remove(id);
邓敏 committed
131 132 133 134 135 136 137 138 139 140 141
        log.error("<---- webSocket transport error");
        log.error("error message: {}", throwable.getMessage());
    }

    /**
     * 在两端WebSocket connection都关闭或transport error发生后执行
     * @param session
     * @param closeStatus
     * @throws Exception
     */
    @Override
邓敏 committed
142
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception{
邓敏 committed
143
        if(null != session) {
邓敏 committed
144 145 146 147 148
            if(null != webSocketMap) {
                String params = session.getUri().getQuery();
                Long id = Long.valueOf(params.split("=")[1]);
                heartbeat(webSocketMap.get(id));
            }
邓敏 committed
149 150 151
            log.info("<---- webSocket is close");
            log.info("session {} close, status: {}", session.getId(), closeStatus);
        }
邓敏 committed
152 153 154
    }

    /**
邓敏 committed
155
     * 断开连接后进行三次心跳验证判断是否重新连接了,如果没有连接成功则判断为下线
邓敏 committed
156 157 158 159 160 161
     *
     * @param session
     */
    public void heartbeat(WebSocketSession session) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        ScheduledFuture scheduledFuture = service.scheduleAtFixedRate(new Runnable() {
邓敏 committed
162
            int beatsNum = heartbeatMin;
邓敏 committed
163 164 165 166
            @SneakyThrows
            @Override
            public void run() {
                if(null != session && !session.isOpen()) {
邓敏 committed
167 168 169 170 171 172 173 174 175 176 177 178 179 180
                    while (beatsNum > heartbeatMax) {
                        String params = session.getUri().getQuery();
                        Long id = Long.valueOf(params.split("=")[1]);
                        OprMemDictDO oprMemDictDO = new OprMemDictDO();
                        oprMemDictDO.setUserId(id);
                        oprMemDictDO.setResult(1);
                        oprMemDictDO.setOprType("登出");
                        oprMemDictDO.setIpAddress(session.getRemoteAddress().getHostName());
                        oprMemDictMapper.insert(oprMemDictDO);
                        taskMap.get(session.getId()).cancel(true);
                    }
                    beatsNum++;
                } else if (null != session && session.isOpen()) {
                    /* 时间段内重新连接了结束验证 */
邓敏 committed
181 182 183
                    taskMap.get(session.getId()).cancel(true);
                }
            }
邓敏 committed
184
        }, 0, reconnectionSeconds, TimeUnit.SECONDS);
邓敏 committed
185 186 187 188
        taskMap.put(session.getId(), scheduledFuture);
    }

    /**
邓敏 committed
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
     * 接收WebSocket客户端Message
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        if(message instanceof TextMessage) {
            System.out.println(message.getPayload());
        }
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}