Pipeline:“管道”,和很多设计模式中的“管道”具有同样的概念,pipleline的操作,将明确client与server端的交互,都是“单向的”:你可以将多个command,依次发给server,但在此期间,你将无法获得单个command的响应数据,此后你可以关闭“请求”,然后依次获取每个command的响应结果。
从简单来说,在IO操作层面,对于client而言,就是一次批量的连续的“write”请求,然后是批量的连续的“read”操作。其实对于底层Socket-IO而言,对于client而言,只不过是多次write,然后一次read的操作;对于server端,input通道上read到数据之后,就会立即被实施,也会和非pipeline一样在output通道上输出执行的结果,只不过此时数据会被阻塞在网络缓冲区上,直到client端开始read或者关闭链接。
redisService:
public interface RedisService {
/**
* 根据 redis 的key的集合批量查询对应存在数据
* @param keys
* @param useParallel 是否使用parallel 在没有顺序要求的时候,提高效率,true为表示使用,false 表示不用,默认为true
* @return
*/
Map<String,Object> batchQueryByKeys(List<String> keys,Boolean useParallel);
/**
* 使用管道的方式,批量添加数据
* @param keyValueMap
* @param expire 过期时间
* @return
*/
List<Object> batchPutInPipelined(Map<String,Object> keyValueMap,long expire);
/**
* batch increment an integer value in pipelined
*
* @param entityList
*
* @return
*
*/
List<MapEntity> batIncByPipelined(List<MapEntity> entityList);
}
AbstractRedisService 抽象类
public abstract class AbstractRedisService implements RedisService{
private static final Logger logger = ();
protected abstract RedisTemplate getRedisTemplate();
@Override
public Boolean expire(String key, long timeout, TimeUnit timeUnit) {
return getRedisTemplate().expire(key,timeout,timeUnit);
}
@Override
public Map<String,Object> batchQueryByKeys(List<String> keys,Boolean useParallel){
if(null == keys || () == 0 ){
return null;
}
if(null == useParallel){
useParallel = true;
}
List<Object> results = getRedisTemplate().executePipelined(
new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
for(String key:keys) {
(key);
}
return null;
}
});
if(null == results || () == 0 ){return null;}
Map<String,Object> resultMap = null;
if(useParallel){
Map<String,Object> resultMapOne = (new HashMap<String,Object>());
().forEach(t -> {
(t,((t)));
});
resultMap = resultMapOne;
}else{
Map<String,Object> resultMapTwo = new HashMap<>();
for(String t:keys){
(t,((t)));
}
resultMap = resultMapTwo;
}
return resultMap;
}
@Override
public List<Object> batchPutInPipelined(Map<String,Object> keyValueMap,long expire){
List<Object> results = getRedisTemplate().executePipelined(
new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
for(String key:()) {
if(null != (key)){
(key, (key).toString());
(key,expire);
}
}
return null;
}
});
return results;
}
@Override
public List<MapEntity> batIncByPipelined(List<MapEntity> params){
List<Object> results = getRedisTemplate().executePipelined(
new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
for(MapEntity entity:params) {
if(null != entity){
((), ());
}
}
return null;
}
});
List<MapEntity> resultList = ();
for(MapEntity entity:params){
(new MapEntity((),(Long)((entity))));
}
return resultList;
}
}
RedisServiceImpl 实现类
public class RedisServiceImpl extends AbstractRedisService {
private RedisTemplate redisTemplate;
public RedisServiceImpl(RedisTemplate redisTemplate) {
= redisTemplate;
}
@Override
protected RedisTemplate getRedisTemplate() {
return redisTemplate;
}
}
补充: 具体RedisTemplate 替代 jedis 方法 点这里 /yinfengjiujian/p/