即时通讯 离线消息处理初版

时间:2024-10-23 07:40:58

离线消息处理

NotOnlineExecute

package com.example.im.infra.executor.send;

import com.example.im.endpoint.WebSocketEndpoint;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Set;

/**
 * Handles messages addressed to users who are not currently connected.
 *
 * <p>Undelivered messages are stored per receiver in a Redis set and pushed
 * over the receiver's WebSocket session the next time the receiver connects.
 * Because a Redis set is used, identical message strings for the same
 * receiver are stored once and delivery order is not guaranteed.
 *
 * @author PC
 */
@Component
public class NotOnlineExecute {

    private static final Logger logger = LoggerFactory.getLogger(NotOnlineExecute.class);

    /** Prefix of the Redis key under which a receiver's offline messages are stored. */
    private static final String OFFLINE_MESSAGE_KEY_PREFIX = "offline_messages:";

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Stores a message for every receiver that is currently offline.
     *
     * @param notOnlineReceiverSet names of the receivers that are not online; a
     *                             {@code null} or empty set is a no-op
     * @param message              the message to store
     */
    public void notOnlineMessageSave(Set<String> notOnlineReceiverSet, String message) {
        if (CollectionUtils.isEmpty(notOnlineReceiverSet)) {
            return;
        }
        notOnlineReceiverSet.forEach(receiver -> redisTemplate.opsForSet().add(offlineKey(receiver), message));
    }

    /**
     * Delivers the stored offline messages to a receiver that has just come online.
     *
     * <p>A message is removed from Redis only after it has been sent successfully,
     * so messages that fail to send (or that are stored while delivery is in
     * progress) remain available for the next attempt.
     *
     * @param receiverName name of the receiver
     */
    public void sendOnline(String receiverName) {
        String offlineKey = offlineKey(receiverName);
        Set<String> messageSet = redisTemplate.opsForSet().members(offlineKey);
        if (CollectionUtils.isEmpty(messageSet)) {
            logger.info("User {} has no offline messages", receiverName);
            return;
        }

        // Endpoints are registered under the hash code of the user name.
        int receiverNameHashCode = receiverName.hashCode();
        WebSocketEndpoint endpoint = WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.get(receiverNameHashCode);
        if (endpoint == null) {
            // Keep the messages in Redis; there is no connection to deliver them on.
            logger.warn("No WebSocket endpoint registered for user: {}, offline messages are kept", receiverName);
            return;
        }

        for (String message : messageSet) {
            try {
                endpoint.getSession().getBasicRemote().sendText(message);
                // Remove only what was actually delivered instead of deleting the whole key.
                redisTemplate.opsForSet().remove(offlineKey, message);
            } catch (IOException e) {
                logger.error("An error occurred when the user: {} received an offline message: {}",
                        receiverName, message, e);
            }
        }
    }

    /** Builds the Redis key that holds the offline messages of the given receiver. */
    private static String offlineKey(String receiverName) {
        return OFFLINE_MESSAGE_KEY_PREFIX + receiverName;
    }
}

com.example.im.infra.executor.send.DefaultSendExecutor#sendToUser

向 Redis 添加未送达消息

// Drop entries rejected by StringUtils.isNotEmpty before storing anything for them.
notOnlineReceiverSet = notOnlineReceiverSet.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(notOnlineReceiverSet)) {
    // Hand the undelivered message over to the offline handler for the remaining receivers
    notOnlineExecute.notOnlineMessageSave(notOnlineReceiverSet, generatorMessage(message));
    logger.info("not online number is " + notOnlineReceiverSet.size());
    logger.info("The user : {} is not online", String.join(",", notOnlineReceiverSet));
}

com.example.im.endpoint.WebSocketEndpoint#onOpen

用户登录时触发离线消息发送

// Register this endpoint under hashCode before triggering delivery, since the
// offline handler looks the endpoint up in this map.
WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);
// Trigger sending of the offline messages for the user that just connected
notOnlineExecute.sendOnline(userName);

测试

test2向test1发送消息,test1不进行连接

test1连接,接收到离线消息

参考资料

[1].处理离线消息代码