离线消息处理
NotOnlineExecute
package com.example.im.infra.executor.send;
import com.example.im.endpoint.WebSocketEndpoint;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Set;
/**
* @author PC
* 不在线处理方式
*/
@Component
public class NotOnlineExecute {
private final static Logger logger = LoggerFactory.getLogger(NotOnlineExecute.class);
private RedisTemplate<String, String> redisTemplate;
@Autowired
public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 用户不在线时保存离线消息
*
* @param notOnlineReceiverSet 不在线人员列表
* @param message 消息
*/
public void notOnlineMessageSave(Set<String> notOnlineReceiverSet, String message) {
//离线消息
notOnlineReceiverSet.forEach(receiver -> redisTemplate.opsForSet().add("offline_messages:" + receiver, message));
}
/**
* 用户上线时进行发送
*
* @param receiverName 接收人
*/
public void sendOnline(String receiverName) {
int receiverNameHashCode = receiverName.hashCode();
Set<String> messageSet = redisTemplate.opsForSet().members("offline_messages:" + receiverName);
if (CollectionUtils.isEmpty(messageSet)) {
logger.info(receiverName + "no offline messages");
return;
}
messageSet.forEach(message -> {
try {
WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.get(receiverNameHashCode).getSession().getBasicRemote()
.sendText(message);
} catch (IOException e) {
logger.error("An error occurred when the user: {} received an offline message: {}", receiverName, message);
}
});
redisTemplate.delete("offline_messages:" + receiverName);
}
}
com.example.im.infra.executor.send.DefaultSendExecutor#sendToUser
向redis添加未送达消息
notOnlineReceiverSet = notOnlineReceiverSet.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(notOnlineReceiverSet)) {
//处理未送达消息
notOnlineExecute.notOnlineMessageSave(notOnlineReceiverSet, generatorMessage(message));
logger.info("not online number is " + notOnlineReceiverSet.size());
logger.info("The user : {} is not online", String.join(",", notOnlineReceiverSet));
}
com.example.im.endpoint.WebSocketEndpoint#onOpen
用户登录时触发离线消息发送
WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);
//触发离线消息发送
notOnlineExecute.sendOnline(userName);
测试
test2向test1发送消息,test1不进行连接
test1连接,接收到离线消息
参考资料
[1].处理离线消息代码