基于redis实现队列
/**
* Redis队列 实现类
*
*/
public class RedisQueueTemplate {
private Log logger = LogFactory.getLog(RedisQueueTemplate.class);
private StringRedisTemplate redisTemplate;
private static final String QUESUE_LOCK_KEY_SUFFIX = "_gsoft_lock";
public RedisQueueTemplate(StringRedisTemplate redisTemplate) {
if (redisTemplate == null) {
throw new RuntimeException("redisTemplate can not be null ");
}
this.redisTemplate = redisTemplate;
}
public long rPush(final String queueName, final String value) {
return redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
return connection.rPush(redisTemplate.getStringSerializer().serialize((queueName)),
redisTemplate.getStringSerializer().serialize(value));
}
});
}
public String lPop(final String queueName) {
return redisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
byte[] value = connection.lPop(redisTemplate.getStringSerializer().serialize((queueName)));
return redisTemplate.getStringSerializer().deserialize(value);
}
});
}
public long rPush(final String queueName, final List<String> values) {
if (values != null && values.size() > 0) {
return redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
List<byte[]> valuesByte = new ArrayList<>();
for (String value : values) {
if (value != null) {
valuesByte.add(redisTemplate.getStringSerializer().serialize(value));
}
}
return connection.rPush(redisTemplate.getStringSerializer().serialize((queueName)),
valuesByte.toArray(new byte[valuesByte.size()][]));
}
});
}
return 0L;
}
public List<String> lPop(final String queueName, final int length) {
return redisTemplate.execute(new RedisCallback<List<String>>() {
// long s = ();
@Override
public List<String> doInRedis(RedisConnection connection) throws DataAccessException {
List<String> values = new ArrayList<>();
byte[] key = redisTemplate.getStringSerializer().serialize((queueName));
byte[] lockKey = redisTemplate.getStringSerializer().serialize((queueName) + QUESUE_LOCK_KEY_SUFFIX);
try {
while (!connection.setNX(lockKey, key)) {// 队列加锁
try {
Thread.sleep(10);
} catch (Exception e) {
logger.warn(" RedisQueueTemplate sleep Exception .....");
}
}
connection.expire(lockKey, 5);// 设计锁的超时时间
List<byte[]> valuesByte = connection.lRange(key, 0, length - 1);
connection.lTrim(key, valuesByte.size(), -1);
for (byte[] valueByte : valuesByte) {
values.add(redisTemplate.getStringSerializer().deserialize(valueByte));
}
} finally {
connection.del(lockKey);
}
// long s2 = ();
// if (s2 - 1000 > s) {
// ("lPop WARN Time consuming: " + (s2 - 1000 > s));
// }
return values;
}
});
}
public List<String> lRange(final String queueName, final int length) {
return redisTemplate.execute(new RedisCallback<List<String>>() {
@Override
public List<String> doInRedis(RedisConnection connection) throws DataAccessException {
List<String> values = new ArrayList<>();
byte[] key = redisTemplate.getStringSerializer().serialize((queueName));
List<byte[]> valuesByte = connection.lRange(key, 0, length - 1);
for (byte[] valueByte : valuesByte) {
values.add(redisTemplate.getStringSerializer().deserialize(valueByte));
}
return values;
}
});
}
public void lRem(final String queueName, final int length) {
redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
byte[] key = redisTemplate.getStringSerializer().serialize((queueName));
connection.lTrim(key, length, -1);
return 1L;
}
});
}
public long lLen(final String queueName) {
return redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
byte[] key = redisTemplate.getStringSerializer().serialize((queueName));
return connection.lLen(key);
}
});
}
}