使用redis管道(pipeline)实现批量查询,批量修改

时间:2025-03-22 09:31:39

 Pipeline:“管道”,和很多设计模式中的“管道”具有同样的概念,pipleline的操作,将明确client与server端的交互,都是“单向的”:你可以将多个command,依次发给server,但在此期间,你将无法获得单个command的响应数据,此后你可以关闭“请求”,然后依次获取每个command的响应结果。

    从简单来说,在IO操作层面,对于client而言,就是一次批量的连续的“write”请求,然后是批量的连续的“read”操作。其实对于底层Socket-IO而言,对于client而言,只不过是多次write,然后一次read的操作;对于server端,input通道上read到数据之后,就会立即被实施,也会和非pipeline一样在output通道上输出执行的结果,只不过此时数据会被阻塞在网络缓冲区上,直到client端开始read或者关闭链接。

redisService:

public interface RedisService {
    /**
     * 根据 redis  的key的集合批量查询对应存在数据
     * @param keys
     * @param useParallel  是否使用parallel 在没有顺序要求的时候,提高效率,true为表示使用,false 表示不用,默认为true
     * @return
     */
    Map<String,Object> batchQueryByKeys(List<String> keys,Boolean useParallel);


    /**
     * 使用管道的方式,批量添加数据
     * @param keyValueMap
     * @param expire 过期时间
     * @return
     */
    List<Object> batchPutInPipelined(Map<String,Object> keyValueMap,long expire);

    /**
     * batch increment an integer value in pipelined
     *
     * @param  entityList
     *
     * @return
     *
     */
    List<MapEntity> batIncByPipelined(List<MapEntity> entityList);


}

AbstractRedisService 抽象类

public abstract class AbstractRedisService implements RedisService{
    private static final Logger logger = ();
    protected abstract RedisTemplate getRedisTemplate();

    @Override
    public Boolean expire(String key, long timeout, TimeUnit timeUnit) {
        return getRedisTemplate().expire(key,timeout,timeUnit);
    }


    @Override
   public Map<String,Object> batchQueryByKeys(List<String> keys,Boolean useParallel){
        if(null == keys || () == 0 ){
            return null;
        }

        if(null == useParallel){
            useParallel = true;
        }

        List<Object> results = getRedisTemplate().executePipelined(
                new RedisCallback<Object>() {
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
                        for(String key:keys) {
                            (key);
                        }
                        return null;
                    }
                });
        if(null == results || () == 0 ){return null;}

        Map<String,Object> resultMap  =  null;

        if(useParallel){

            Map<String,Object> resultMapOne  = (new HashMap<String,Object>());

            ().forEach(t -> {
                (t,((t)));
            });

            resultMap = resultMapOne;

        }else{

            Map<String,Object> resultMapTwo  = new HashMap<>();

            for(String t:keys){
                (t,((t)));
            }

            resultMap = resultMapTwo;
        }

        return  resultMap;

    }

    @Override
   public List<Object> batchPutInPipelined(Map<String,Object> keyValueMap,long expire){

        List<Object> results = getRedisTemplate().executePipelined(
                new RedisCallback<Object>() {
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
                        for(String key:()) {
                            if(null != (key)){
                                (key, (key).toString());
                                (key,expire);
                            }
                        }
                        return null;
                    }
                });

        return  results;

    }


    @Override
    public List<MapEntity> batIncByPipelined(List<MapEntity> params){

        List<Object> results = getRedisTemplate().executePipelined(
                new RedisCallback<Object>() {
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
                        for(MapEntity entity:params) {
                            if(null != entity){
                                ((), ());
                            }
                        }
                        return null;
                    }
                });

        List<MapEntity> resultList  = ();

        for(MapEntity entity:params){
            (new MapEntity((),(Long)((entity))));
        }


        return  resultList;

    }
}

 RedisServiceImpl 实现类

public class RedisServiceImpl extends AbstractRedisService  {
    private RedisTemplate redisTemplate;

    public RedisServiceImpl(RedisTemplate redisTemplate) {
         = redisTemplate;
    }

    @Override
    protected RedisTemplate getRedisTemplate() {
        return redisTemplate;
    }
}

补充: 具体RedisTemplate 替代 jedis 方法 点这里 /yinfengjiujian/p/