通过Redis实现消息队列主要用的是Redis数据类型List,Lpush,Brpop两个命令。Redis实现的消息队列肯定不如MQ的功能强大,灵活。但是实现一些不复杂的生产者-消费者模型还是可以参考的。毕竟不用单独搭建一套MQ。
Lpush:命令将一个或多个值插入到列表头部。 如果 key 不存在,一个空列表会被创建并执行 LPUSH 操作。 当 key 存在但不是列表类型时,返回一个错误。
Brpop :命令移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
下面就以异步记录访问日志为例通过Redis实现简单的消息队列异步记录系统访问日志。
原理:记录日志时通过lpush放入list中,开启一个线程监听这个list,如果list没有记录则线程阻塞。
我的项目采用ssm+jedis
spring配置文件:
Redis实现类:
package com.example.demo.redis.jedisUtils;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class JedisClientPool implements JedisClient {
//创建JedisPool来操作单机版的数据库
@Autowired
private JedisPool jedisPool;
@Override
public String set(String key, String value) {
Jedis jedis = jedisPool.getResource();
String result = jedis.set(key, value);
jedis.close();
return result;
}
@Override
public String get(String key) {
Jedis jedis = jedisPool.getResource();
String result = jedis.get(key);
jedis.close();
return result;
}
@Override
public Boolean exists(String key) {
Jedis jedis = jedisPool.getResource();
Boolean result = jedis.exists(key);
jedis.close();
return result;
}
@Override
public Long expire(String key, int seconds) {
Jedis jedis = jedisPool.getResource();
Long result = jedis.expire(key, seconds);
jedis.close();
return result;
}
@Override
public Long ttl(String key) {
Jedis jedis = jedisPool.getResource();
Long result = jedis.ttl(key);
jedis.close();
return result;
}
@Override
public Long incr(String key) {
Jedis jedis = jedisPool.getResource();
Long result = jedis.incr(key);
jedis.close();
return result;
}
@Override
public Long hset(String key, String field, String value) {
Jedis jedis = jedisPool.getResource();
Long result = jedis.hset(key, field, value);
jedis.close();
return result;
}
@Override
public String hget(String key, String field) {
Jedis jedis = jedisPool.getResource();
String result = jedis.hget(key, field);
jedis.close();
return result;
}
@Override
public Long hdel(String key, String... field) {
Jedis jedis = jedisPool.getResource();
Long result = jedis.hdel(key, field);
jedis.close();
return result;
}
@Override
public long lpush(String key, String value) {
Jedis jedis = jedisPool.getResource();
Long result = jedis.lpush(key, value);
jedis.close();
return result;
}
@Override
public List<String> brpop(String key) {
Jedis jedis = jedisPool.getResource();
List<String> result = jedis.brpop(0, key);
jedis.close();
return result;
}
}
线程监听处理任务类:
package com.example.demo.redis.task;
import java.util.List;
import net.sf.json.JSONObject;
import com.example.demo.redis.bean.RequestLogBean;
import com.example.demo.redis.dao.RequestLogDao;
import com.example.demo.redis.jedisUtils.JedisClient;
public class WriteRequestLogTaskTask {
private JedisClient jedis;
private RequestLogDao requestLogDao;
WriteRequestLogTaskTask(JedisClient jedisClient,RequestLogDao requestLogDao){
this.jedis=jedisClient;
this.requestLogDao=requestLogDao;
if(jedis!=null){
init();
}
}
private void init(){
//这块开启了一个线程记录日志,也可以用线程池开启多个线程记录 redis 多个线程操作一个队列 也不会出现 现场安全问题的
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
System.out.println("日志记录线程已经启动");
while(true){
List<String>list=jedis.brpop(JedisClient.REQUEST_LOG_KEY);//brpop 如果队列为空 会一直阻塞
if(list!=null&&list.size()>0){
for(String str:list){
if(!JedisClient.REQUEST_LOG_KEY.equalsIgnoreCase(str)){
JSONObject json=JSONObject.fromObject(str);
RequestLogBean bean=(RequestLogBean)JSONObject.toBean(json,RequestLogBean.class);
requestLogDao.saveLog(bean);
}
//字符串转成实体类
//写入数据库
}
}
}
}
});
thread.start();
}
}
记录日志:
package com.example.demo.redis.controller;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import net.sf.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import com.example.demo.redis.bean.RequestLogBean;
import com.example.demo.redis.jedisUtils.JedisClient;
@Controller
@RequestMapping("/test")
public class TestRedisController {
@Autowired
public JedisClient jedis;
@RequestMapping("/redis")
public void redis(HttpServletRequest request,HttpServletResponse response) throws IOException{
String requestId=request.getParameter("requestid");
String params=request.getParameter("params");
String ip =request.getRemoteAddr();
String result="SUCCESS";
//业务请求逻辑
//记录日志
RequestLogBean requestLogBean=new RequestLogBean(requestId,params,result,System.currentTimeMillis()+"",ip);
jedis.lpush(JedisClient.REQUEST_LOG_KEY, JSONObject.fromObject(requestLogBean).toString());//这块存进redis的是json字符串 有可以是其他类型,后面转成实体(如果直接存 实体类型要序列化)
response.getWriter().write("请求成功!");
}
}