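Real-time push with WebSocket + scheduled tasks
Sometimes the business requires that, once a WebSocket connection is established, the server pushes data to the client at a fixed interval. This calls for combining WebSocket with a scheduled task.
The scheduling here is done with Spring's TaskScheduler.
Scheduled tasks with TaskScheduler
- The TaskScheduler interface provides several scheduling methods for running tasks.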
public interface TaskScheduler {
    // Run the task whenever the trigger fires
    ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
    // Run the task once at startTime
    ScheduledFuture<?> schedule(Runnable task, Date startTime);
    ScheduledFuture<?> schedule(Runnable task, Instant startTime);
    // Starting at startTime, run the task once every period
    ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);
    ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);
    // Run the task once every period, starting as soon as possible (a long period is in milliseconds)
    ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);
    ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period);
    // Starting at startTime, wait delay between the end of one run and the start of the next
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
    // Wait delay between the end of one run and the start of the next
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);
}
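The difference between the fixed-rate and fixed-delay methods is easy to miss from the signatures alone. The sketch below is one way to observe it using only the JDK's ScheduledExecutorService, whose scheduleAtFixedRate and scheduleWithFixedDelay follow the same semantics. The class name RateVsDelaySketch, the helper countRuns, and the timings are assumptions made for illustration; none of them come from Spring.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RateVsDelaySketch {

    /**
     * Hypothetical helper: counts how many times a task that takes workMs
     * starts within windowMs under the chosen scheduling policy.
     */
    static int countRuns(boolean fixedRate, long intervalMs, long workMs, long windowMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable task = () -> {
            runs.incrementAndGet();
            try {
                Thread.sleep(workMs); // simulate the work done by the task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        if (fixedRate) {
            // The interval is measured from start to start
            executor.scheduleAtFixedRate(task, 0, intervalMs, TimeUnit.MILLISECONDS);
        } else {
            // The interval is measured from the end of one run to the start of the next
            executor.scheduleWithFixedDelay(task, 0, intervalMs, TimeUnit.MILLISECONDS);
        }
        try {
            Thread.sleep(windowMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("fixed rate runs:  " + countRuns(true, 100, 60, 1000));
        System.out.println("fixed delay runs: " + countRuns(false, 100, 60, 1000));
    }
}
```

With a 100 ms interval and a task that takes about 60 ms, the fixed-rate variant should start a run roughly every 100 ms, while the fixed-delay variant starts one roughly every 160 ms and so completes fewer runs in the same window. Exact counts depend on the machine.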
- A quick test
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Quick test of TaskScheduler.
 *
 * @author yjj
 * @version 1.0
 * @since 2022-12-28 15:45:17
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TaskSchedulerTest {
    private final TaskScheduler taskScheduler;
    // Runs once after this bean has been constructed and registers the task
    @PostConstruct
    public void test() {
        // Fire every 3 seconds
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        // Fire every second
        //Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS);
        taskScheduler.schedule(new MyThread(), trigger);
    }
    private static class MyThread implements Runnable {
        @Override
        public void run() {
            log.info("scheduled thread name=[{}], execution time=[{}]", Thread.currentThread().getName(), DateUtil.date());
        }
    }
}
The result is that the task runs once every 3 seconds.
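A cron expression such as "0/3 * * * * *" does not mean "3 seconds after the task was scheduled". It fires whenever the second-of-minute is 0, 3, 6, ..., 57, so the first run can come anywhere from a moment to 3 seconds after startup. The plain-Java sketch below imitates only that seconds field to make the alignment visible. The class CronEveryThreeSecondsSketch and the helper nextFire are hypothetical names and are not how Spring's CronTrigger is implemented.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class CronEveryThreeSecondsSketch {

    /**
     * Hypothetical helper: returns the first whole second strictly after 'now'
     * whose second-of-minute is a multiple of 3 (the seconds field "0/3").
     */
    static LocalDateTime nextFire(LocalDateTime now) {
        // Drop the fractional part, then step forward one second at a time
        LocalDateTime candidate = now.truncatedTo(ChronoUnit.SECONDS).plusSeconds(1);
        while (candidate.getSecond() % 3 != 0) {
            candidate = candidate.plusSeconds(1);
        }
        return candidate;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println("now:       " + now);
        System.out.println("next fire: " + nextFire(now));
    }
}
```

Because 3 divides 60, the spacing stays even across minute boundaries. A step that does not divide 60, for example "0/7", would fire at second 56 and then again at second 0 of the next minute, only 4 seconds later. A fixed-rate schedule or a PeriodicTrigger avoids that wall-clock alignment.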
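Real-time push with WebSocket + scheduled tasks
The requirement is as follows: once a client connects, the server pushes a message to it every 3 seconds. For the basic WebSocket setup, see the article "websocket简单实现" (a simple WebSocket implementation).
- TestWebsocket.java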
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
/**
 * WebSocket test handler
 *
 * @author yjj
 * @version 1.0
 * @since 2022-12-28 14:55:29
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TestWebsocket implements WebSocketHandler {
    protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>();
    /**
     * Scheduled tasks, keyed by session id
     */
    private final Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>();
    /**
     * taskScheduler
     */
    private final TaskScheduler taskScheduler;
    /**
     * Called after the connection has been established
     *
     * @param session the connected session
     * @throws Exception exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // Register the session first so that it also receives the broadcast below
        WEB_SOCKET_SESSIONS.add(session);
        sendMessage("Connected, sessionId=" + session.getId());
        // Trigger that fires every 3 seconds
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        // Start a scheduled task for this session
        ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger);
        // Keep the task's future in the map, keyed by session id, so it can be cancelled later
        stringScheduledFutureMap.put(session.getId(), schedule);
    }
    private class CustomizeTask implements Runnable {
        private final String sessionId;
        CustomizeTask(String sessionId) {
            this.sessionId = sessionId;
        }
        @Override
        public void run() {
            try {
                String message = CharSequenceUtil.format("scheduled thread name=[{}], execution time=[{}]", Thread.currentThread().getName(), DateUtil.date());
                sendMessage(JSONUtil.toJsonStr(message), sessionId);
            } catch (IOException e) {
                log.error("Failed to push message, sessionId=[{}]", sessionId, e);
            }
        }
    }
    /**
     * Handles a message received from the client
     *
     * @param session the connected session
     * @param message the received message
     * @throws Exception exception
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        sendMessage("Received message=[" + message.getPayload() + "], sessionId=[" + session.getId() + "], reply=[Hello!]");
    }
    /**
     * Called when a transport error occurs on the connection
     *
     * @param session the connected session
     * @param exception exception
     * @throws Exception exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            sendMessage("WebSocket transport error, closing this session, sessionId=[" + session.getId() + "]");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
    }
    /**
     * Called after the connection has been closed
     *
     * @param session the connected session
     * @param closeStatus the close status
     * @throws Exception exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        if (session.isOpen()) {
            sendMessage("WebSocket connection is closing, sessionId=[" + session.getId() + "]");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
        String sessionId = session.getId();
        ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class);
        if (scheduledFuture != null) {
            // Cancel the scheduled task that was started for this session
            scheduledFuture.cancel(true);
            // Remove it from the map
            stringScheduledFutureMap.remove(sessionId);
        }
    }
    /**
     * Whether partial messages are supported
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
    /**
     * Broadcasts a message to all connected sessions
     *
     * @param message the message
     * @throws IOException ioException
     */
    public void sendMessage(String message) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                webSocketSession.sendMessage(new TextMessage(message));
            }
        }
    }
    /**
     * Sends a message to one specific session
     *
     * @param message the message
     * @param sessionId id of the target session
     * @throws IOException ioException
     */
    public void sendMessage(String message, String sessionId) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                if (sessionId.equals(webSocketSession.getId())) {
                    webSocketSession.sendMessage(new TextMessage(message));
                }
            }
        }
    }
}
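The core of the handler above is a bookkeeping pattern: start one periodic task per session on connect, remember its ScheduledFuture under the session id, and cancel it on close. The sketch below isolates that pattern with JDK classes only, so it can be run without Spring or a WebSocket server. SessionTaskRegistry and its methods are hypothetical names introduced for illustration, and a fixed rate stands in for the cron trigger.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SessionTaskRegistry {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();

    /** Starts a periodic task for the session, cancelling any task registered earlier under the same id. */
    public void start(String sessionId, Runnable task, long periodMs) {
        ScheduledFuture<?> future = executor.scheduleAtFixedRate(task, 0, periodMs, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = tasks.put(sessionId, future);
        if (previous != null) {
            previous.cancel(true);
        }
    }

    /** Cancels and forgets the session's task; returns true if one was registered. */
    public boolean stop(String sessionId) {
        // remove() looks up and deletes the entry in one atomic step
        ScheduledFuture<?> future = tasks.remove(sessionId);
        if (future == null) {
            return false;
        }
        future.cancel(true);
        return true;
    }

    /** Number of sessions that currently have a task. */
    public int activeCount() {
        return tasks.size();
    }

    /** Stops the worker threads so the JVM can exit. */
    public void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        SessionTaskRegistry registry = new SessionTaskRegistry();
        registry.start("session-1", () -> System.out.println("push to session-1"), 300);
        Thread.sleep(1000);          // let a few pushes happen
        registry.stop("session-1");  // what afterConnectionClosed would do
        System.out.println("active tasks: " + registry.activeCount());
        registry.shutdown();
    }
}
```

Removing the map entry and cancelling the future in the same method keeps the two in step. Without the cancel, the task would keep firing for a session that no longer exists.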
- Binding the WebSocket handler to a URL
import com.yjj.test.websocket.TestWebsocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import javax.annotation.Resource;
/**
 * WebSocket configuration
 *
 * @author yjj
 * @version 1.0
 * @since 2022-12-28 15:10:11
 */
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
    @Resource
    private TestWebsocket testWebsocket;
    /**
     * Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired.
     *
     * @param registry the handler registry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*");
    }
}
- When WebSocket and scheduled tasks are used together, add a configuration that defines a thread pool to manage the scheduler threads
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
 * Fixes the error that occurs when scheduled tasks and WebSocket are used together
 *
 * @author yjj
 * @version 1.0
 * @since 2022-04-28 17:35:54
 */
@Configuration
public class ScheduledConfig {
    /**
     * By default the scheduler runs tasks on a single thread; a pool lets tasks run concurrently
     *
     * @return the task scheduler
     */
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler();
        scheduling.setPoolSize(20);
        return scheduling;
    }
}
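The pool size matters here because every connected session gets its own scheduled task. With a single scheduler thread, a slow push to one client would hold up the pushes to all the others. The sketch below shows the effect with the JDK's ScheduledThreadPoolExecutor, the executor type that ThreadPoolTaskScheduler wraps. PoolSizeSketch, elapsedMillis, and the numbers are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizeSketch {

    /**
     * Hypothetical helper: returns the milliseconds needed to finish 'tasks' jobs
     * of 'workMs' each when they are all scheduled at once on 'poolSize' threads.
     */
    static long elapsedMillis(int poolSize, int tasks, long workMs) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(poolSize);
        CountDownLatch done = new CountDownLatch(tasks);
        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            executor.schedule(() -> {
                try {
                    Thread.sleep(workMs); // stands in for a slow push to one client
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }, 0, TimeUnit.MILLISECONDS);
        }
        try {
            done.await(); // wait until every job has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("pool size 1: " + elapsedMillis(1, 3, 200) + " ms");
        System.out.println("pool size 3: " + elapsedMillis(3, 3, 200) + " ms");
    }
}
```

With one thread the three jobs run back to back, so the total is about three times the job length. With three threads they overlap and finish in roughly the time of a single job. A pool of 20, as configured above, gives the same headroom for up to 20 tasks running at the same moment.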
- Result
Once connected, the client receives a message pushed by the server every 3 seconds.