在Spring整合websocket整合应用示例(上)文章中,我们已经实现了websocket,但还有一个核心的业务实现类没有实现,这里我们就实现这个业务核心类,因为老夫参与的这个系统使用websocket发送消息,所以其实现就是如何发送消息了。
7. NewsListenerImpl的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
package cn.bridgeli.websocket;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.lagou.common.base.util.date.DateUtil;
import com.lagou.platform.news.api.enumeration.PlatNewsCategoryType;
import com.lagou.platform.news.web.dao.ext.model.PlatNewsVo;
import com.lagou.platform.news.web.dao.ext.model.SearchCondition;
import com.lagou.platform.news.web.quartz.impl.TimingJob;
import com.lagou.platform.news.web.service.PlatNewsService;
import org.apache.commons.lang.StringUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Description : 站内消息监听器实现
* @Date : 16-3-7
*/
@Component
public class NewsListenerImpl implements NewsListener{
private static final Logger logger = LoggerFactory.getLogger(NewsListenerImpl. class );
Gson gson = new GsonBuilder().setDateFormat( "yyyy-MM-dd HH:mm:ss" ).create();
//线程池
private ExecutorService executorService = Executors.newCachedThreadPool();
//任务调度
private SchedulerFactory sf = new StdSchedulerFactory();
@Autowired
private PlatNewsService platNewsService;
@Override
public void afterPersist(PlatNewsVo platNewsVo) {
logger.info( "监听到有新消息添加。。。" );
logger.info( "新消息为:" +gson.toJson(platNewsVo));
//启动线程
if ( null != platNewsVo && !StringUtils.isBlank(platNewsVo.getCurrentoperatoremail())){
//如果是定时消息
if (platNewsVo.getNewsType() == PlatNewsCategoryType.TIMING_TIME.getCategoryId()){
startTimingTask(platNewsVo); //定时推送
} else {
//立即推送
executorService.execute( new AfterConnectionEstablishedTask(platNewsVo.getCurrentoperatoremail()));
}
}
}
@Override
public void afterConnectionEstablished(String email) {
logger.info( "建立websocket连接后推送新消息。。。" );
if (!StringUtils.isBlank(email)){
executorService.execute( new AfterConnectionEstablishedTask(email));
}
}
/**
* @Description : 如果新添加了定时消息,启动定时消息任务
* @param platNewsVo
*/
private void startTimingTask(PlatNewsVo platNewsVo){
logger.info( "开始定时推送消息任务。。。" );
Date timingTime = platNewsVo.getTimingTime();
if ( null == timingTime){
logger.info( "定时消息时间为null。" );
return ;
}
logger.info( "定时推送任务时间为:" +DateUtil.date2String(timingTime));
JobDetail jobDetail= JobBuilder.newJob(TimingJob. class )
.withIdentity(platNewsVo.getCurrentoperatoremail()+ "定时消息" +platNewsVo.getId(), "站内消息" )
.build();
//传递参数
jobDetail.getJobDataMap().put( "platNewsService" ,platNewsService);
jobDetail.getJobDataMap().put( "userEmail" ,platNewsVo.getCurrentoperatoremail());
Trigger trigger= TriggerBuilder
.newTrigger()
.withIdentity( "定时消息触发" +platNewsVo.getId(), "站内消息" )
.startAt(timingTime)
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds( 0 ) //时间间隔
.withRepeatCount( 0 ) //重复次数
)
.build();
//启动定时任务
try {
Scheduler sched = sf.getScheduler();
sched.scheduleJob(jobDetail,trigger);
if (!sched.isShutdown()){
sched.start();
}
} catch (SchedulerException e) {
logger.info(e.toString());
}
logger.info( "完成开启定时推送消息任务。。。" );
}
/**
* @Description : 建立websocket链接后的推送线程
*/
class AfterConnectionEstablishedTask implements Runnable{
String email ;
public AfterConnectionEstablishedTask(String email){
this .email = email;
}
@Override
public void run() {
logger.info( "开始推送消息给用户:" +email+ "。。。" );
if (!StringUtils.isBlank(email)){
SearchCondition searchCondition = new SearchCondition();
searchCondition.setOperatorEmail(email);
JSONArray jsonArray = new JSONArray();
for (PlatNewsCategoryType type : PlatNewsCategoryType.values()){
searchCondition.setTypeId(type.getCategoryId());
int count = platNewsService.countPlatNewsByExample(searchCondition);
JSONObject object = new JSONObject();
object.put( "name" ,type.name());
object.put( "description" ,type.getDescription());
object.put( "count" ,count);
jsonArray.add(object);
}
if ( null != jsonArray && jsonArray.size()> 0 ){
UserSocketVo userSocketVo = WSSessionLocalCache.get(email);
TextMessage reMessage = new TextMessage(gson.toJson(jsonArray));
try {
if ( null != userSocketVo){
//推送消息
userSocketVo.getWebSocketSession().sendMessage(reMessage);
//更新推送时间
userSocketVo.setLastSendTime(DateUtil.getNowDate());
logger.info( "完成推送新消息给用户:" +userSocketVo.getUserEmail()+ "。。。" );
}
} catch (IOException e) {
logger.error(e.toString());
logger.info( "站内消息推送失败。。。" +e.toString());
}
}
}
logger.info( "结束推送消息给" +email+ "。。。" );
}
}
}
|
这个类就是websocket的核心业务的实现,其具体肯定和业务相关,由于业务的不同,实现肯定不同,因为老夫参与的系统是发送消息,所以里面最核心的一句就是:
1
|
userSocketVo.getWebSocketSession().sendMessage(reMessage);
|
通过WebSocketSession的sendMessage方法把我们的消息发送出去。另外,这主要是后端的实现,至于前端的实现,因为老夫是后端程序猿比较关注后端,所以前端就不多做介绍了,大家可以自己去网上查资料。最后需要说明的是,老夫之前搜一些学习资料的时候,发现老夫该同事的写法和有一篇文章几乎一样,我想该同事应该是参考了这篇文章,所以列在下面,算作参考资料。