StringRedisTemplate 批量插入数据并且设置过期时间,批量获取redis数据信息

时间:2025-03-22 09:30:50

在 Spring Boot 中使用 StringRedisTemplate 对 Redis 进行批量插入时,可以通过管道(executePipelined)操作来提高效率。

 

// Batch insert into Redis.
// NOTE(review): the receivers and method names of most calls in this snippet appear to have
// been stripped during extraction (e.g. the call that wraps the RedisCallback, the loop bound,
// the getters used to build keys/values, and the Redis commands themselves). Per the article
// text the wrapping call is presumably stringRedisTemplate.executePipelined(...) — confirm
// against the original source before reusing this code.
    public void insertRedisBatch(final List<OutboundRequest> outs){
        (new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                // The connection is cast so that String-based commands can be issued.
                StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
                // One iteration per request; presumably iterates over outs — TODO confirm.
                for (int i=0;i<();i++) {
                    OutboundRequest out=(i);
                    // Key has the shape "RCS:REVOKE:<a>:<b>"; the two parts come from stripped calls.
                    String key="RCS:REVOKE:"+()+":"+();
                    // Value is two stripped parts joined by a comma.
                    String value=()+","+();
                    ((), ());//batch write to a list
                   // (("aaa"+i).getBytes(),"nnn".getBytes());//batch write of a plain String

                    // Second key has the shape "RCS:REPORT:<a>:<b>" and is written with field "task".
 String key2="RCS:REPORT:"+()+":"+();
                        String filed2="task";
                        String value2= ();
                        ((),(),());//batch write to a Map (hash)
                    // The second argument is multiplied by 60, so the source value is presumably
                    // in minutes — TODO confirm.
                    ((),()*60);//expiration time, unit is seconds
                }
                // The callback returns null on every path.
                return null;
            }
        });
    }

    // Batch read from Redis.
    // Returns whatever the wrapping call on the RedisCallback produces, as a List<Object>.
    // NOTE(review): the wrapping call, the loop bound, the key getters and the read command
    // were stripped from this snippet; presumably stringRedisTemplate.executePipelined(...)
    // with one read per key — confirm against the original source.
    public  List<Object> queryRedisBatch(final List<OutboundRequest> outs){
        List<Object> results = (
                new RedisCallback<Object>() {
                    @Override
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
                        // One command per request; presumably iterates over outs — TODO confirm.
                        for (int i=0;i<();i++) {
                            OutboundRequest out=(i);
                            // Same "RCS:REVOKE:<a>:<b>" key shape as used by insertRedisBatch.
                            String key="RCS:REVOKE:"+()+":"+();
                            (key);
                        }
                        // The callback itself returns null; results come from the wrapping call.
                        return null;
                    }
                });
        return results;
    }

 

 

 

Application.java

// Registers a bean named "cache": a GuavaCache called "cache" capped at 30000 entries whose
// entries expire a fixed time after being written.
// NOTE(review): several calls were stripped from this snippet — the lookup of
// "revokeQueueTimeout", the fallback used when it is null, the parsing of the configured
// value, the cache builder factory, and the time unit passed to expireAfterWrite. Confirm
// all of them against the original source.
@Bean(name="cache")
public Cache cache() {
    // Configured timeout as a string; may be null when the setting is absent.
    String cv= ("revokeQueueTimeout");
    // Use a default when nothing is configured, otherwise the configured value.
    Integer revokeQueueTimeout=cv==null?():(cv);
    return new GuavaCache("cache", ().maximumSize(30000).expireAfterWrite(revokeQueueTimeout, ).build());
}
/**
 * Component that drains a static in-memory queue of {@code OutboundRequest} objects on a
 * background thread and writes them to Redis in batches; it also offers a batch read.
 *
 * NOTE(review): most call receivers and method names in this class appear to have been
 * stripped during extraction (queue operations, logging calls, sleeps, cache lookups and the
 * Redis commands). Comments below describe only what the remaining tokens show and mark the
 * rest as assumptions to confirm.
 */
@Slf4j
@Component
public class RevokeQueueRedis {

    // Injected bean qualified as "cache".
    @Autowired
    @Qualifier("cache")
    private Cache cache;
    @Autowired
    private Config config;
    @Autowired
    public StringRedisTemplate stringRedisTemplate;

    // Queue of revoked phone numbers waiting to be inserted into Redis (capacity 5000).
    public static BlockingQueue<OutboundRequest> revokeQueueRedis = new LinkedBlockingQueue<>(5000);

    // Runs once after dependency injection and starts the background worker.
    @PostConstruct
    public void  insert(){ //start a thread that inserts data into Redis
        runThread();
    }
    // Starts a new thread that loops forever, collecting up to 100 requests per pass and
    // handing them to insertRedisBatch. No exit condition is visible in the loop.
    public  void runThread(){
        new Thread(){
            @Override
            public void run() {
                ("---------开启批量插入撤回队列的线程");
                while(true){
                    try{
                    // Presumably checks that the queue is non-empty — TODO confirm.
                    if(()>0){
                        // Batch size is at most 100, reduced when fewer items are available.
                        int tempcount = 100;
                        if (tempcount > ()) {
                            tempcount = ();
                        }
                        List<OutboundRequest>  outs=new ArrayList<>();
                        for (int i =0; i < tempcount; i++) {
                            // Presumably a non-blocking take from the queue; null results are skipped.
                            OutboundRequest out = ();
                            if (null != out) {
                                (out);
                            }
                        }
                        // A failed batch is reported and the loop continues; the collected
                        // items are not put back into the queue.
                        try {
                            insertRedisBatch(outs);
                        } catch (Exception e) {
                            ("insert revokeQueueRedis error,msg={}",e);
                        }
                        // Presumably a short sleep (10 ms) after a batch — TODO confirm.
                        (10);
                    }
                    // Presumably a sleep (50 ms) between polling passes — TODO confirm.
                    (50);
                } catch (Exception e) {
                    ("insert revokeQueueRedis error={}",e);
                    // Presumably a longer sleep (100 ms) after an unexpected error — TODO confirm.
                    (100);
                }
                }
            }
        }.start();
    }

    // Batch insert into Redis.
    // For each request a "RCS:REVOKE:<a>:<b>" key and a comma-joined value are built and
    // written; the expiration is issued only when the key is not found in the local cache.
    // NOTE(review): the wrapping call is presumably stringRedisTemplate.executePipelined(...)
    // — confirm against the original source.
    public void insertRedisBatch(final List<OutboundRequest> outs){
        (new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
                // One iteration per request; presumably iterates over outs — TODO confirm.
                for (int i=0;i<();i++) {
                    OutboundRequest out=(i);
                    String key="RCS:REVOKE:"+()+":"+();
                    String value=()+","+();
                    ((), ());//batch write to a list
                   // (("aaa"+i).getBytes(),"nnn".getBytes());//batch write of a plain String
                    // Presumably a lookup in the local cache bean — TODO confirm.
                    if(null==(key,)) {//absent means no expiration has been set for this key yet
                        String cv=("revokeQueueTimeout");
                        Integer revokeQueueTimeout=cv==null?():(cv);
                        // The timeout is multiplied by 60, so it is presumably in minutes.
                        ((), revokeQueueTimeout * 60);//expiration time, unit is seconds
                        // Presumably records the key in the local cache so the expiration is
                        // not issued again for it — TODO confirm.
                        (key,out);
                    }
                }
                // The callback returns null on every path.
                return null;
            }
        });
    }

    // Batch read from Redis.
    // Issues one command per request using the "RCS:REVOKE:<a>:<b>" key and returns the
    // List<Object> produced by the wrapping call (presumably executePipelined — TODO confirm).
    public  List<Object> queryRedisBatch(final List<OutboundRequest> outs){
        List<Object> results = (
                new RedisCallback<Object>() {
                    @Override
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
                        for (int i=0;i<();i++) {
                            OutboundRequest out=(i);
                            String key="RCS:REVOKE:"+()+":"+();
                            (key);
                        }
                        // The callback itself returns null; results come from the wrapping call.
                        return null;
                    }
                });
        return results;
    }

}