Spring Boot 中使用 StringRedisTemplate 对 Redis 进行批量插入:通过管道(executePipelined)操作提高效率。
/**
 * Batch insert into Redis.
 *
 * For every element of {@code outs} the callback builds a key prefixed with
 * {@code "RCS:REVOKE:"} and a second key prefixed with {@code "RCS:REPORT:"}, then issues
 * three calls which, per the inline comments, are a list write, a map (hash) write with
 * the field {@code "task"}, and an expiry given in seconds.
 *
 * NOTE(review): in this listing the text in front of most "(" is missing (call receivers
 * and method names were lost), so the exact Redis commands and the getters used to build
 * the keys and values cannot be confirmed from here. Presumably the outer call is
 * StringRedisTemplate.executePipelined - verify against the real source.
 *
 * @param outs the requests to write; read by index inside the callback
 */
public void insertRedisBatch(final List<OutboundRequest> outs){
(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
// The connection is cast to the String-based view; this fails with ClassCastException
// if the supplied connection is not a StringRedisConnection.
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
for (int i=0;i<();i++) {
OutboundRequest out=(i);
// Key/value for the "RCS:REVOKE:" entry; the key has two ':'-separated parts and the
// value two ','-separated parts (the source getters are not visible in this listing).
String key="RCS:REVOKE:"+()+":"+();
String value=()+","+();
((), ());// batch list
// (("aaa"+i).getBytes(),"nnn".getBytes());// batch String
// Key/field/value for the "RCS:REPORT:" entry.
String key2="RCS:REPORT:"+()+":"+();
String filed2="task";
String value2= ();
((),(),());// batch Map
// The second argument is multiplied by 60 before use, i.e. the configured figure is
// presumably minutes converted to seconds - TODO confirm.
((),()*60);// expiration time, unit: seconds
}
// The callback always returns null; results, if any, come from the outer call.
return null;
}
});
}
/**
 * Batch read from Redis.
 *
 * For every element of {@code outs} the callback builds a key prefixed with
 * {@code "RCS:REVOKE:"} and issues one call with that key; the list produced by the
 * outer call is returned unchanged.
 *
 * NOTE(review): the receivers/method names in front of "(" are missing in this listing,
 * so which Redis read command is issued per key cannot be confirmed from here.
 * Presumably the outer call is StringRedisTemplate.executePipelined, in which case the
 * result list would hold one entry per queued command - verify against the real source.
 *
 * @param outs the requests whose keys are looked up; read by index inside the callback
 * @return the list returned by the outer call
 */
public List<Object> queryRedisBatch(final List<OutboundRequest> outs){
List<Object> results = (
new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
// The connection is cast to the String-based view of the connection.
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
for (int i=0;i<();i++) {
OutboundRequest out=(i);
// Same key layout as used by insertRedisBatch for the "RCS:REVOKE:" entries.
String key="RCS:REVOKE:"+()+":"+();
(key);
}
// The callback itself yields nothing; the values are collected by the outer call.
return null;
}
});
return results;
}
Application.java
/**
 * Registers the Spring bean named "cache": a GuavaCache called "cache" whose builder is
 * capped at 30000 entries (maximumSize) and configured with expireAfterWrite using the
 * "revokeQueueTimeout" setting, with a fallback value when that setting is null.
 *
 * NOTE(review): the call receivers, the fallback value and the time-unit argument of
 * expireAfterWrite are missing from this listing, so the actual expiry duration cannot be
 * confirmed from here. Presumably the builder is Guava's CacheBuilder.newBuilder() -
 * verify against the real source.
 */
@Bean(name="cache") public Cache cache() { String cv= ("revokeQueueTimeout"); Integer revokeQueueTimeout=cv==null?():(cv); return new GuavaCache("cache", ().maximumSize(30000).expireAfterWrite(revokeQueueTimeout, ).build()); }
/**
 * Spring component that buffers {@code OutboundRequest} items in a bounded in-memory
 * queue and, from its {@code @PostConstruct} hook, starts a background thread that
 * collects them in batches of at most 100 and passes each batch to
 * {@link #insertRedisBatch(List)}.
 *
 * NOTE(review): in this listing the text in front of most "(" is missing (call receivers
 * and method names were lost). Comments below that describe such calls are therefore
 * hedged and must be checked against the real source.
 */
@Slf4j
@Component
public class RevokeQueueRedis {
// Injected cache bean qualified as "cache"; used in insertRedisBatch to decide whether
// an expiry still has to be set for a key.
@Autowired
@Qualifier("cache")
private Cache cache;
// Injected configuration holder; presumably the source of "revokeQueueTimeout" - confirm.
@Autowired
private Config config;
@Autowired
public StringRedisTemplate stringRedisTemplate;
// Queue of revoked phone numbers waiting to be inserted into Redis (capacity 5000).
// It is a public static, non-final field, so any class can offer to it or replace it.
public static BlockingQueue<OutboundRequest> revokeQueueRedis = new LinkedBlockingQueue<>(5000);
/**
 * Lifecycle hook: starts the thread that inserts data into Redis.
 */
@PostConstruct
public void insert(){ // start a thread that inserts data into Redis
runThread();
}
/**
 * Starts a new thread that loops forever: when there is work it gathers up to 100 queued
 * items into a list and calls {@link #insertRedisBatch(List)} with it.
 *
 * There is no exit condition in the loop and no handle to the thread is kept.
 */
public void runThread(){
new Thread(){
@Override
public void run() {
("---------开启批量插入撤回队列的线程");
while(true){
try{
// Presumably checks that the queue is non-empty - receiver not visible, confirm.
if(()>0){
// Batch size: 100, lowered to the value of the (stripped) call below when that is
// smaller - presumably the current queue size.
int tempcount = 100;
if (tempcount > ()) {
tempcount = ();
}
List<OutboundRequest> outs=new ArrayList<>();
for (int i =0; i < tempcount; i++) {
// Presumably a non-blocking take from the queue; null results are skipped.
OutboundRequest out = ();
if (null != out) {
(out);
}
}
// A failing batch is reported and dropped; the items are not re-queued.
try {
insertRedisBatch(outs);
} catch (Exception e) {
// NOTE(review): if this is an SLF4J call, the "{}" placeholder stays unfilled because
// the only argument is the exception (it is logged as a stack trace) - confirm.
("insert revokeQueueRedis error,msg={}",e);
}
// Presumably a short pause (10, unit not visible) after a batch - confirm.
(10);
}
// Presumably an idle pause (50, unit not visible) between polls - confirm.
(50);
} catch (Exception e) {
// NOTE(review): this broad catch would also swallow an interruption raised by the
// pauses above if they are Thread.sleep calls; the loop then simply continues.
("insert revokeQueueRedis error={}",e);
(100);
}
}
}
}.start();
}
/**
 * Batch insert into Redis.
 *
 * For every element of {@code outs} the callback builds a key prefixed with
 * {@code "RCS:REVOKE:"} and issues a call which, per the inline comment, is a list write.
 * If a lookup for that key yields null, an expiry of {@code revokeQueueTimeout * 60}
 * seconds is issued and the key is recorded so later batches skip the expiry.
 *
 * NOTE(review): the lookup and the recording call presumably go to the injected
 * {@code cache}; receivers are not visible in this listing - confirm.
 *
 * @param outs the requests to write; read by index inside the callback
 */
public void insertRedisBatch(final List<OutboundRequest> outs){
(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
for (int i=0;i<();i++) {
OutboundRequest out=(i);
String key="RCS:REVOKE:"+()+":"+();
String value=()+","+();
((), ());// batch list
// (("aaa"+i).getBytes(),"nnn".getBytes());// batch String
if(null==(key,)) {// absent means no expiration time has been set for this key yet
// Read the timeout setting, with a fallback when it is null (fallback not visible).
String cv=("revokeQueueTimeout");
Integer revokeQueueTimeout=cv==null?():(cv);
// Multiplied by 60, i.e. the setting is presumably in minutes - TODO confirm.
((), revokeQueueTimeout * 60);// expiration time, unit: seconds
// Remember the key so the expiry is not issued again while the entry is present.
(key,out);
}
}
// The callback always returns null; results, if any, come from the outer call.
return null;
}
});
}
/**
 * Batch read from Redis.
 *
 * For every element of {@code outs} the callback builds a key prefixed with
 * {@code "RCS:REVOKE:"} and issues one call with that key; the list produced by the
 * outer call is returned unchanged.
 *
 * NOTE(review): the Redis read command and the outer call are not visible in this
 * listing; presumably a pipelined execution on {@code stringRedisTemplate} - confirm.
 *
 * @param outs the requests whose keys are looked up; read by index inside the callback
 * @return the list returned by the outer call
 */
public List<Object> queryRedisBatch(final List<OutboundRequest> outs){
List<Object> results = (
new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
for (int i=0;i<();i++) {
OutboundRequest out=(i);
// Same key layout as used by insertRedisBatch.
String key="RCS:REVOKE:"+()+":"+();
(key);
}
// The callback itself yields nothing; the values are collected by the outer call.
return null;
}
});
return results;
}
}