Redis集群下过期key监听的实现代码

时间:2022-04-10 08:19:21

1. 前言

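在使用Redis集群时,发现过期key始终监听不到。原因在于:集群中的键空间通知(keyspace notification)只在key所在的节点本地发布,不会像普通Pub/Sub消息那样在集群内广播,因此通过集群连接只订阅到某一个节点时,其余节点上的过期事件是收不到的(同时各节点还需要开启notify-keyspace-events配置)。网上也没有现成的解决方案。于是想,既然不能直接监听整个集群,那我可以建立多个Redis连接,分别对每个Redis节点的key过期事件进行监听。这种做法可能不尽如人意,目前也没找到更好的解决方案,如果有好的想法,请留言告知哦!不多说,直接贴我自己的代码!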

2. 代码实现

关于Redis集群配置代码此处不贴,直接贴配置监听类代码!

各Redis节点的地址配置:
# Host/port pairs of the six Redis cluster nodes.
# Each pair is injected into RedisListenerConfig through @Value("${redis.hostN}") / @Value("${redis.portN}").
redis.host1: 10.113.56.68
redis.port1: 7030
redis.host2: 10.113.56.68
redis.port2: 7031
redis.host3: 10.113.56.68
redis.port3: 7032
redis.host4: 10.113.56.68
redis.port4: 7033
redis.host5: 10.113.56.68
redis.port5: 7034
redis.host6: 10.113.56.68
redis.port6: 7035
配置监听类 RedisListenerConfig:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;
 
import java.util.Arrays;
 
/**
 * @Author xiabing5
 * @Create 2019/8/6 14:46
 * @Desc  监听redis中Key过期事件
 **/
@Configuration
public class RedisListenerConfig {
 
  @Value("${redis.host1}")
  private String host1;
 
  @Value("${redis.host2}")
  private String host2;
 
  @Value("${redis.host3}")
  private String host3;
 
  @Value("${redis.host4}")
  private String host4;
 
  @Value("${redis.host5}")
  private String host5;
 
  @Value("${redis.host6}")
  private String host6;
 
  @Value("${redis.port1}")
  private int port1;
 
  @Value("${redis.port2}")
  private int port2;
 
  @Value("${redis.port3}")
  private int port3;
 
  @Value("${redis.port4}")
  private int port4;
 
  @Value("${redis.port5}")
  private int port5;
 
  @Value("${redis.port6}")
  private int port6;
 
  @Bean
  JedisPoolConfig jedisPoolConfig(){
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
    jedisPoolConfig.setMaxIdle(100);
    jedisPoolConfig.setMaxWaitMillis(1000);
    return jedisPoolConfig;
  }
 
  // redis-cluster不支持key过期监听,建立多个连接,对每个redis节点进行监听
  @Bean
  RedisMessageListenerContainer redisContainer1() {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
    jedisConnectionFactory.setHostName(host1);
    jedisConnectionFactory.setPort(port1);
    jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
    jedisConnectionFactory.afterPropertiesSet();
    container.setConnectionFactory(jedisConnectionFactory);
    return container;
  }
 
  @Bean
  RedisMessageListenerContainer redisContainer2() {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
    jedisConnectionFactory.setHostName(host2);
    jedisConnectionFactory.setPort(port2);
    jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
    jedisConnectionFactory.afterPropertiesSet();
    container.setConnectionFactory(jedisConnectionFactory);
    return container;
  }
 
  @Bean
  RedisMessageListenerContainer redisContainer3() {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
    jedisConnectionFactory.setHostName(host3);
    jedisConnectionFactory.setPort(port3);
    jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
    jedisConnectionFactory.afterPropertiesSet();
    container.setConnectionFactory(jedisConnectionFactory);
    return container;
  }
 
  @Bean
  RedisMessageListenerContainer redisContainer4() {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
    jedisConnectionFactory.setHostName(host4);
    jedisConnectionFactory.setPort(port4);
    jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
    jedisConnectionFactory.afterPropertiesSet();
    container.setConnectionFactory(jedisConnectionFactory);
    return container;
  }
 
  @Bean
  RedisMessageListenerContainer redisContainer5() {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
    jedisConnectionFactory.setHostName(host5);
    jedisConnectionFactory.setPort(port5);
    jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
    jedisConnectionFactory.afterPropertiesSet();
    container.setConnectionFactory(jedisConnectionFactory);
    return container;
  }
 
  @Bean
  RedisMessageListenerContainer redisContainer6() {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
    jedisConnectionFactory.setHostName(host6);
    jedisConnectionFactory.setPort(port6);
    jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
    jedisConnectionFactory.afterPropertiesSet();
    container.setConnectionFactory(jedisConnectionFactory);
    return container;
  }
 
  @Bean
  RedisKeyExpirationListener redisKeyExpirationListener1() {
    return new RedisKeyExpirationListener(redisContainer1());
  }
 
  @Bean
  RedisKeyExpirationListener redisKeyExpirationListener2() {
    return new RedisKeyExpirationListener(redisContainer2());
  }
 
  @Bean
  RedisKeyExpirationListener redisKeyExpirationListener3() {
    return new RedisKeyExpirationListener(redisContainer3());
  }
 
  @Bean
  RedisKeyExpirationListener redisKeyExpirationListener4() {
    return new RedisKeyExpirationListener(redisContainer4());
  }
 
  @Bean
  RedisKeyExpirationListener redisKeyExpirationListener5() {
    return new RedisKeyExpirationListener(redisContainer5());
  }
 
  @Bean
  RedisKeyExpirationListener redisKeyExpirationListener6() {
    return new RedisKeyExpirationListener(redisContainer6());
  }
 
}
过期监听类 RedisKeyExpirationListener:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.util.Date;
 
/**
 * @Author xiabing5
 * @Create 2019/9/4 9:47
 * @Desc  redis过期监听
 **/
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
  @Autowired
  RedisUtil redisUtil;
  @Autowired
  LoginUserStatisticsMapper loginUserStatisticsMapper;
  public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
    super(listenerContainer);
  }
  @Override
  public void onMessage(Message message, byte[] pattern) {
    // 用户做自己的业务处理即可,message.toString()可以获取失效的key
    String mesg = message.toString();  
  }
}

3. Redis防止过期key重复监听

在项目集群部署的情况下,同一个服务会部署多个实例,这时同一个key的过期事件容易被多个实例同时监听到,从而重复执行相同的业务逻辑,这不是我们期望的。单机部署下,方法的同步可以采用synchronized关键字;但集群部署下,就得采用分布式锁:在需要互斥的业务逻辑前后分别加锁和解锁即可。此处正好写到Redis,那就贴一个自己用的Redis分布式锁。

Redis分布式锁 RedisLock:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import java.util.Collections;
import java.util.UUID;
/**
 * @Author xiabing5
 * @Create 2019/9/6 15:54
 * @Desc  redis分布式锁
 **/
@Component
public class RedisLock {
  @Autowired
  Jedis jedis;
  private static final String SET_IF_NOT_EXIST = "NX"; // NX表示如果不存在key就设置value
  private static final String SET_WITH_EXPIRE_TIME = "PX"; // PX表示毫秒
  // 加锁
  public String tryLock(String key,Long acquireTimeout) {
    // 生成随机value
    String identifierValue = UUID.randomUUID().toString();
    // 设置超时时间
    Long endTime = System.currentTimeMillis() + acquireTimeout;
    // 循环获取锁
    while (System.currentTimeMillis() < endTime) {
      String result = jedis.set(key,identifierValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, acquireTimeout);
      if("OK".equals(result)) {
        return identifierValue;
      }
    }
    return null;
  }
  // 解锁
//  public void delLock(String key,String identifierValue) {
//    // 判断是否是同一把锁
//    try{
//      if(jedis.get(key).equals(identifierValue)){
//        // 此处操作非原子性,容易造成释放非自己的锁
//        jedis.del(key);
//      }
//    }catch(Exception e) {
//      e.printStackTrace();
//    }
//  }
  // 使用Lua代码解锁
  public void delLock(String key,String identifierValue) {
    try{
      String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
      Long result = (Long) jedis.eval(script, Collections.singletonList(key), Collections.singletonList(identifierValue));
      if (1 == result) {
        System.out.println(result+"释放锁成功");
      } if (0 == result) {
        System.out.println(result+"释放锁失败");
      }
    }catch (Exception e) {
      e.printStackTrace();
    }
  }
}

 

4. 总结

自己实现的一个小demo,废话比较少。小白自己写的配置类,理解有问题请留言!自己实现的方案感觉不妥,只是基本完成需求,还得继续研究。

以上所述是小编给大家介绍的Redis集群下过期key监听的实现代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

原文链接:https://www.cnblogs.com/xiaobingblog/archive/2019/09/09/11490726.html