基于redis实现队列

时间:2025-04-13 11:02:46
/** * Redis队列 实现类 * */ public class RedisQueueTemplate { private Log logger = LogFactory.getLog(RedisQueueTemplate.class); private StringRedisTemplate redisTemplate; private static final String QUESUE_LOCK_KEY_SUFFIX = "_gsoft_lock"; public RedisQueueTemplate(StringRedisTemplate redisTemplate) { if (redisTemplate == null) { throw new RuntimeException("redisTemplate can not be null "); } this.redisTemplate = redisTemplate; } public long rPush(final String queueName, final String value) { return redisTemplate.execute(new RedisCallback<Long>() { @Override public Long doInRedis(RedisConnection connection) throws DataAccessException { return connection.rPush(redisTemplate.getStringSerializer().serialize((queueName)), redisTemplate.getStringSerializer().serialize(value)); } }); } public String lPop(final String queueName) { return redisTemplate.execute(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection connection) throws DataAccessException { byte[] value = connection.lPop(redisTemplate.getStringSerializer().serialize((queueName))); return redisTemplate.getStringSerializer().deserialize(value); } }); } public long rPush(final String queueName, final List<String> values) { if (values != null && values.size() > 0) { return redisTemplate.execute(new RedisCallback<Long>() { @Override public Long doInRedis(RedisConnection connection) throws DataAccessException { List<byte[]> valuesByte = new ArrayList<>(); for (String value : values) { if (value != null) { valuesByte.add(redisTemplate.getStringSerializer().serialize(value)); } } return connection.rPush(redisTemplate.getStringSerializer().serialize((queueName)), valuesByte.toArray(new byte[valuesByte.size()][])); } }); } return 0L; } public List<String> lPop(final String queueName, final int length) { return redisTemplate.execute(new RedisCallback<List<String>>() { // long s = (); @Override public List<String> doInRedis(RedisConnection connection) throws DataAccessException { List<String> values = new ArrayList<>(); byte[] key = redisTemplate.getStringSerializer().serialize((queueName)); byte[] lockKey = redisTemplate.getStringSerializer().serialize((queueName) + QUESUE_LOCK_KEY_SUFFIX); try { while (!connection.setNX(lockKey, key)) {// 队列加锁 try { Thread.sleep(10); } catch (Exception e) { logger.warn(" RedisQueueTemplate sleep Exception ....."); } } connection.expire(lockKey, 5);// 设计锁的超时时间 List<byte[]> valuesByte = connection.lRange(key, 0, length - 1); connection.lTrim(key, valuesByte.size(), -1); for (byte[] valueByte : valuesByte) { values.add(redisTemplate.getStringSerializer().deserialize(valueByte)); } } finally { connection.del(lockKey); } // long s2 = (); // if (s2 - 1000 > s) { // ("lPop WARN Time consuming: " + (s2 - 1000 > s)); // } return values; } }); } public List<String> lRange(final String queueName, final int length) { return redisTemplate.execute(new RedisCallback<List<String>>() { @Override public List<String> doInRedis(RedisConnection connection) throws DataAccessException { List<String> values = new ArrayList<>(); byte[] key = redisTemplate.getStringSerializer().serialize((queueName)); List<byte[]> valuesByte = connection.lRange(key, 0, length - 1); for (byte[] valueByte : valuesByte) { values.add(redisTemplate.getStringSerializer().deserialize(valueByte)); } return values; } }); } public void lRem(final String queueName, final int length) { redisTemplate.execute(new RedisCallback<Long>() { @Override public Long doInRedis(RedisConnection connection) throws DataAccessException { byte[] key = redisTemplate.getStringSerializer().serialize((queueName)); connection.lTrim(key, length, -1); return 1L; } }); } public long lLen(final String queueName) { return redisTemplate.execute(new RedisCallback<Long>() { @Override public Long doInRedis(RedisConnection connection) throws DataAccessException { byte[] key = redisTemplate.getStringSerializer().serialize((queueName)); return connection.lLen(key); } }); } }