Java Redis实现消息队列,异步记录日志

时间:2024-03-22 22:21:52

通过Redis实现消息队列主要用的是Redis数据类型List,Lpush,Brpop两个命令。Redis实现的消息队列肯定不如MQ的功能强大,灵活。但是实现一些不复杂的生产者-消费者模型还是可以参考的。毕竟不用单独搭建一套MQ。

Lpush:命令将一个或多个值插入到列表头部。 如果 key 不存在,一个空列表会被创建并执行 LPUSH 操作。 当 key 存在但不是列表类型时,返回一个错误。

Brpop :命令移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

下面就以异步记录访问日志为例通过Redis实现简单的消息队列异步记录系统访问日志。

原理:记录日志时通过lpush放入list中,开启一个线程监听这个list,如果list没有记录则线程阻塞。

我的项目采用ssm+jedis

spring配置文件:

Java Redis实现消息队列,异步记录日志

Redis实现类:

package com.example.demo.redis.jedisUtils;
 
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
 
public class JedisClientPool implements JedisClient {
    //创建JedisPool来操作单机版的数据库
    @Autowired
    private JedisPool jedisPool;
 
    @Override
    public String set(String key, String value) {
        Jedis jedis = jedisPool.getResource();
        String result = jedis.set(key, value);
        jedis.close();
        return result;
    }
 
    @Override
    public String get(String key) {
        Jedis jedis = jedisPool.getResource();
        String result = jedis.get(key);
        jedis.close();
        return result;
    }
 
    @Override
    public Boolean exists(String key) {
        Jedis jedis = jedisPool.getResource();
        Boolean result = jedis.exists(key);
        jedis.close();
        return result;
    }
 
    @Override
    public Long expire(String key, int seconds) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.expire(key, seconds);
        jedis.close();
        return result;
    }
 
    @Override
    public Long ttl(String key) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.ttl(key);
        jedis.close();
        return result;
    }
 
    @Override
    public Long incr(String key) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.incr(key);
        jedis.close();
        return result;
    }
 
    @Override
    public Long hset(String key, String field, String value) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.hset(key, field, value);
        jedis.close();
        return result;
    }
 
    @Override
    public String hget(String key, String field) {
        Jedis jedis = jedisPool.getResource();
        String result = jedis.hget(key, field);
        jedis.close();
        return result;
    }
 
    @Override
    public Long hdel(String key, String... field) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.hdel(key, field);
        jedis.close();
        return result;
    }

    @Override
    public long lpush(String key, String value) {
        Jedis jedis = jedisPool.getResource();
        Long result = jedis.lpush(key, value);
        jedis.close();
        return result;
    }

    @Override
    public List<String> brpop(String key) {
        Jedis jedis = jedisPool.getResource();
        List<String> result = jedis.brpop(0, key);
        jedis.close();
        return result;
    }
 
}

 

 

 

线程监听处理任务类:

package com.example.demo.redis.task;

import java.util.List;

import net.sf.json.JSONObject;

import com.example.demo.redis.bean.RequestLogBean;
import com.example.demo.redis.dao.RequestLogDao;
import com.example.demo.redis.jedisUtils.JedisClient;

public class WriteRequestLogTaskTask {
    private JedisClient jedis;
    private RequestLogDao requestLogDao;
    WriteRequestLogTaskTask(JedisClient jedisClient,RequestLogDao requestLogDao){
        this.jedis=jedisClient;
        this.requestLogDao=requestLogDao;
        if(jedis!=null){
             init();
        }
    }
    private void init(){
        //这块开启了一个线程记录日志,也可以用线程池开启多个线程记录 redis 多个线程操作一个队列 也不会出现 现场安全问题的
        Thread thread=new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("日志记录线程已经启动");
                while(true){
                    List<String>list=jedis.brpop(JedisClient.REQUEST_LOG_KEY);//brpop 如果队列为空 会一直阻塞
                    if(list!=null&&list.size()>0){
                        for(String str:list){
                            if(!JedisClient.REQUEST_LOG_KEY.equalsIgnoreCase(str)){
                                JSONObject json=JSONObject.fromObject(str);
                                RequestLogBean bean=(RequestLogBean)JSONObject.toBean(json,RequestLogBean.class);
                                requestLogDao.saveLog(bean);
                            }
                            //字符串转成实体类
                            //写入数据库
                        }
                        
                    }
                }
                
            }
        });
        
        thread.start();
        
    }

}

记录日志:

package com.example.demo.redis.controller;

import java.io.IOException;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import net.sf.json.JSONObject;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import com.example.demo.redis.bean.RequestLogBean;
import com.example.demo.redis.jedisUtils.JedisClient;

@Controller
@RequestMapping("/test")
public class TestRedisController {
    @Autowired
    public JedisClient jedis;
    @RequestMapping("/redis")
    public void redis(HttpServletRequest request,HttpServletResponse response) throws IOException{
        String requestId=request.getParameter("requestid");
        String params=request.getParameter("params");
        String ip =request.getRemoteAddr();
        String result="SUCCESS";
        //业务请求逻辑
        
        //记录日志
        RequestLogBean requestLogBean=new RequestLogBean(requestId,params,result,System.currentTimeMillis()+"",ip);
        jedis.lpush(JedisClient.REQUEST_LOG_KEY, JSONObject.fromObject(requestLogBean).toString());//这块存进redis的是json字符串 有可以是其他类型,后面转成实体(如果直接存 实体类型要序列化)
        response.getWriter().write("请求成功!");
}
}