一.简介
基于redis队列的生产者消费者实现主要是利用redis的blpop或者brpop命令,以下是官方文档对这两个命令的描述:
BLPOP 是列表的阻塞式(blocking)弹出原语。
它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。
当给定多个 key
参数时,按参数 key
的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。
BRPOP的描述差不多,这里就不重复了。
那么有了这两个命令,实现生产者消费者模式就有思路了,我们从外界数据源不停的传入数据到redis指定的list里面,此时不管有没有消费者,我们的数据是会存储在list里的。
然后消费者的程序只需要调用blpop命令,如果指定的list里面有数据,就能从里面取得list最左边的数据;如果指定的list里面没有数据,那么就会阻塞在那,直到list里面来了新数据或者已经达到阻塞时间为止。
二.普通生产者消费者代码:
生产者我们就用自己生成的数据模仿。
public class RedisProducer { public static void main(String[] args) throws InterruptedException { Jedis jedis=JavaRedisUtils.getJedis(); jedis.select(4); int count=0; while(count<100){ Thread.sleep(300); jedis.lpush("mylist",String.valueOf(count)); count++; } jedis.close(); } }然后消费者得集成Thread类,重写run方法,我们可以在run方法里面写一些对取出来的数据需要进行的业务操作,我这里就是简单的打印出来判断是否取出数据。
public class Consumer extends Thread{ String name; public Consumer(String name) { this.name = name; } @Override public void run(){ Jedis jedis = JavaRedisUtils.getJedis(); while(true) { jedis.select(4); //阻塞式brpop,List中无数据时阻塞 //参数0表示一直阻塞下去,直到List出现数据 List<String> list = jedis.blpop(0, "mylist"); for(String s : list) { System.out.println(name+" "+s); } jedis.close(); } } }
下面是程序的consumer执行类:
public class RedisConsumer { public static void main(String[] args) { Consumer mq1=new Consumer("mq1"); Consumer mq2=new Consumer("mq2"); mq1.start(); mq2.start(); }
下面是程序运行部分结果:
我们可以从结果中看到,我们的消费者是真的取到了数据并且在原始没有数据的时候,我们的消费者是阻塞了的,直到新数据来临才继续取数据。
为了更加方便的观看到生产者和消费者的程序执行情况,我们将从"mylist"中的消费数据利用redis的brpoplpush命令将数据从mylist消费到各个消费者自己名字的列表中。
下面是brpoplpush的解释:
BRPOPLPUSH 是 RPOPLPUSH 的阻塞版本,当给定列表 source
不为空时, BRPOPLPUSH 的表现和 RPOPLPUSH 一样。
当列表 source
为空时, BRPOPLPUSH 命令将阻塞连接,直到等待超时,或有另一个客户端对 source
执行 LPUSH 或 RPUSH 命令为止。
超时参数 timeout
接受一个以秒为单位的数字作为值。超时参数设为 0
表示阻塞时间可以无限期延长(block indefinitely) 。
- 返回值:
-
假如在指定时间内没有任何元素被弹出,则返回一个
nil
和等待时长。反之,返回一个含有两个元素的列表,第一个元素是被弹出元素的值,第二个元素是等待时长。
public class Consumer extends Thread{ String name; public Consumer(String name) { this.name = name; } @Override public void run(){ Jedis jedis = JavaRedisUtils.getJedis(); jedis.select(4); while(true) { //调用brpoplpush方法 从mylist取出来然后放到对应name的list去 String a=jedis.brpoplpush("mylist",name,0); } } }
运行程序之后,redis库中出现了mq1以及mq2的list,并且他们分别消费了mylist中的所有数据:
以及它们分别消费的数目:
三.在消费过程中新增加消费者
上面我们已经做过实验了,它能够做到生产者和消费者能做到的事情:当list没有数据的时候,消费者会阻塞,当list新来数据的时候,它会接着进行消费。那么当新来一个新的消费者的时候,它会有什么变化呢?
新加入消费者的代码如下:
public class addconsumer { public static void main(String[] args) { Consumer mq3=new Consumer("mq3"); mq3.start(); } }
下面我们先运行生产者,紧接着运行消费者mq1和mq2,等它们消费一段时间,并且生产者数据还在传输的时候,我们开启消费者mq3。让我们来看看结果会是怎么样。
redis数据库中产生了三个列表。
它们分别的数据量为:
说明,当新加入消费者的时候,它会和其它两个消费者内部竞争,然后一起消费没有消费过的数据。
以上是redis队列实现的消费者和生产者demo,希望可以给大家提供到帮助。