Skip to content
Toggle navigation
Projects
Groups
Snippets
Help
涂亚平
/
subsidy
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit e4fc42d1
authored
Jul 26, 2022
by
邓敏
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
webSocket新增心跳机制
1 parent
a3bbd3a0
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
25 additions
and
18 deletions
src/main/java/com/subsidy/util/websocket/WebSocketUtil.java
src/main/java/com/subsidy/util/websocket/WebSocketUtil.java
View file @
e4fc42d
...
@@ -40,6 +40,10 @@ public class WebSocketUtil implements WebSocketHandler {
...
@@ -40,6 +40,10 @@ public class WebSocketUtil implements WebSocketHandler {
@Autowired
@Autowired
private
OprMemDictMapper
oprMemDictMapper
;
private
OprMemDictMapper
oprMemDictMapper
;
private
int
heartbeatMin
=
1
;
// 断连最小心跳次数
private
int
heartbeatMax
=
3
;
// 断连最大心跳次数
private
int
reconnectionSeconds
=
60
;
//每次断连间隔重新连接秒
/**
/**
* 存放建立连接webSocket对象
* 存放建立连接webSocket对象
*/
*/
...
@@ -58,14 +62,13 @@ public class WebSocketUtil implements WebSocketHandler {
...
@@ -58,14 +62,13 @@ public class WebSocketUtil implements WebSocketHandler {
Long
id
=
Long
.
valueOf
(
params
.
split
(
"="
)[
1
]);
Long
id
=
Long
.
valueOf
(
params
.
split
(
"="
)[
1
]);
if
(
null
!=
webSocketMap
&&
webSocketMap
.
get
(
id
)
!=
null
)
{
if
(
null
!=
webSocketMap
&&
webSocketMap
.
get
(
id
)
!=
null
)
{
if
(
webSocketMap
.
get
(
id
).
isOpen
())
{
if
(
webSocketMap
.
get
(
id
).
isOpen
())
{
/*
/* 相同账户进行挤号,发送消息给WebSocket通知账户已在其他地方登录 */
相同账户进行挤号,发送消息给前者WebSocket通知账户已在其他地方登录
*/
webSocketMap
.
get
(
id
).
sendMessage
(
new
TextMessage
(
JSONObject
.
toJSONString
(
ResponseData
.
generateCreatedResponse
(
1011
))));
webSocketMap
.
get
(
id
).
sendMessage
(
new
TextMessage
(
JSONObject
.
toJSONString
(
ResponseData
.
generateCreatedResponse
(
1011
))));
webSocketMap
.
get
(
id
).
close
();
webSocketMap
.
get
(
id
).
close
();
}
else
{
}
else
{
/*
/*
* 如果上次连接的WebSocket状态是关闭,并且上一次记录时间大于60秒的,则判断为这次登陆是免密码登陆的重新记录上线时间
* 如果上次连接的WebSocket状态是关闭,
* 并且上一次记录时间大于 { heartbeatMax * reconnectionSeconds } 秒(心跳检测机制),则判断为这次登陆是免密码登陆的重新记录上线时间
*/
*/
List
<
OprMemDictDO
>
list
=
oprMemDictMapper
.
selectList
(
new
QueryWrapper
<
OprMemDictDO
>()
List
<
OprMemDictDO
>
list
=
oprMemDictMapper
.
selectList
(
new
QueryWrapper
<
OprMemDictDO
>()
.
lambda
()
.
lambda
()
...
@@ -74,7 +77,7 @@ public class WebSocketUtil implements WebSocketHandler {
...
@@ -74,7 +77,7 @@ public class WebSocketUtil implements WebSocketHandler {
Calendar
calendar
=
Calendar
.
getInstance
();
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTime
(
DateFormatUtil
.
localDateTimeToDate
(
list
.
get
(
0
).
getCreateDate
()));
calendar
.
setTime
(
DateFormatUtil
.
localDateTimeToDate
(
list
.
get
(
0
).
getCreateDate
()));
calendar
.
add
(
Calendar
.
SECOND
,
60
);
calendar
.
add
(
Calendar
.
SECOND
,
heartbeatMax
*
reconnectionSeconds
);
if
(
calendar
.
getTime
().
after
(
DateFormatUtil
.
localDateTimeToDate
(
list
.
get
(
0
).
getCreateDate
())))
{
if
(
calendar
.
getTime
().
after
(
DateFormatUtil
.
localDateTimeToDate
(
list
.
get
(
0
).
getCreateDate
())))
{
if
(
list
.
get
(
0
).
getOprType
().
equals
(
"登出"
))
{
if
(
list
.
get
(
0
).
getOprType
().
equals
(
"登出"
))
{
OprMemDictDO
oprMemDictDO
=
new
OprMemDictDO
();
OprMemDictDO
oprMemDictDO
=
new
OprMemDictDO
();
...
@@ -127,9 +130,6 @@ public class WebSocketUtil implements WebSocketHandler {
...
@@ -127,9 +130,6 @@ public class WebSocketUtil implements WebSocketHandler {
@Override
@Override
public
void
afterConnectionClosed
(
WebSocketSession
session
,
CloseStatus
closeStatus
)
throws
Exception
{
public
void
afterConnectionClosed
(
WebSocketSession
session
,
CloseStatus
closeStatus
)
throws
Exception
{
if
(
null
!=
session
)
{
if
(
null
!=
session
)
{
/*
断开连接后不会马上判断为下线状态,而是进入60秒的心跳检测机制,如果60秒内没有进行重连,则判断为下线,记录下线时间和状态
*/
if
(
null
!=
webSocketMap
)
{
if
(
null
!=
webSocketMap
)
{
String
params
=
session
.
getUri
().
getQuery
();
String
params
=
session
.
getUri
().
getQuery
();
Long
id
=
Long
.
valueOf
(
params
.
split
(
"="
)[
1
]);
Long
id
=
Long
.
valueOf
(
params
.
split
(
"="
)[
1
]);
...
@@ -141,29 +141,36 @@ public class WebSocketUtil implements WebSocketHandler {
...
@@ -141,29 +141,36 @@ public class WebSocketUtil implements WebSocketHandler {
}
}
/**
/**
* 断开连接后
60秒后进行
判断是否重新连接了,如果没有连接成功则判断为下线
* 断开连接后
进行三次心跳验证
判断是否重新连接了,如果没有连接成功则判断为下线
*
*
* @param session
* @param session
*/
*/
public
void
heartbeat
(
WebSocketSession
session
)
{
public
void
heartbeat
(
WebSocketSession
session
)
{
ScheduledExecutorService
service
=
Executors
.
newScheduledThreadPool
(
1
);
ScheduledExecutorService
service
=
Executors
.
newScheduledThreadPool
(
1
);
ScheduledFuture
scheduledFuture
=
service
.
scheduleAtFixedRate
(
new
Runnable
()
{
ScheduledFuture
scheduledFuture
=
service
.
scheduleAtFixedRate
(
new
Runnable
()
{
int
beatsNum
=
heartbeatMin
;
@SneakyThrows
@SneakyThrows
@Override
@Override
public
void
run
()
{
public
void
run
()
{
if
(
null
!=
session
&&
!
session
.
isOpen
())
{
if
(
null
!=
session
&&
!
session
.
isOpen
())
{
String
params
=
session
.
getUri
().
getQuery
();
while
(
beatsNum
>
heartbeatMax
)
{
Long
id
=
Long
.
valueOf
(
params
.
split
(
"="
)[
1
]);
String
params
=
session
.
getUri
().
getQuery
();
OprMemDictDO
oprMemDictDO
=
new
OprMemDictDO
();
Long
id
=
Long
.
valueOf
(
params
.
split
(
"="
)[
1
]);
oprMemDictDO
.
setUserId
(
id
);
OprMemDictDO
oprMemDictDO
=
new
OprMemDictDO
();
oprMemDictDO
.
setResult
(
1
);
oprMemDictDO
.
setUserId
(
id
);
oprMemDictDO
.
setOprType
(
"登出"
);
oprMemDictDO
.
setResult
(
1
);
oprMemDictDO
.
setIpAddress
(
session
.
getRemoteAddress
().
getHostName
());
oprMemDictDO
.
setOprType
(
"登出"
);
oprMemDictMapper
.
insert
(
oprMemDictDO
);
oprMemDictDO
.
setIpAddress
(
session
.
getRemoteAddress
().
getHostName
());
oprMemDictMapper
.
insert
(
oprMemDictDO
);
taskMap
.
get
(
session
.
getId
()).
cancel
(
true
);
}
beatsNum
++;
}
else
if
(
null
!=
session
&&
session
.
isOpen
())
{
/* 时间段内重新连接了结束验证 */
taskMap
.
get
(
session
.
getId
()).
cancel
(
true
);
taskMap
.
get
(
session
.
getId
()).
cancel
(
true
);
}
}
}
}
},
1
,
1
,
TimeUnit
.
MINUTE
S
);
},
0
,
reconnectionSeconds
,
TimeUnit
.
SECOND
S
);
taskMap
.
put
(
session
.
getId
(),
scheduledFuture
);
taskMap
.
put
(
session
.
getId
(),
scheduledFuture
);
}
}
...
...
Write
Preview
Markdown
is supported
Attach a file
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to post a comment