可以利用redis存储数据类型的list类型实现消息发送与消费的一对一模式,使用lpush向list的左端推送数据(发送消息),使用rpop从右端接收数据(消费消息)。由于rpop需要周期性的从list中获取数据,可以考虑使用brpop代替rpop,brpop是一个阻塞方法,直到获取到数据。代码如下
生产者的pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tansun</groupId> <artifactId>ProducerTest</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <!-- https://mvnrepository.com/artifact/redis.clients/jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> </dependencies> </project>生产者的主方法
package com.tansun; import redis.clients.jedis.Jedis; public class ProducerTest { @SuppressWarnings("resource") public static void main(String[] args) { Jedis jedis = new Jedis("192.168.229.128", 6379); // 向键为“test queue”的值的左端推入数据 jedis.lpush("test queue", "message: hello redis queue"); } }
消费者的pom文件与生产者相同
消费者的主方法
package com.tansun; import java.util.List; import redis.clients.jedis.Jedis; public class ConsumerTest { @SuppressWarnings("resource") public static void main(String[] args) { Jedis jedis = new Jedis("192.168.229.128", 6379); while(true){ // 设置超时时间为0,表示无限期阻塞 List<String> message = jedis.brpop(0, "test queue"); System.out.println(message); } } }
至此,已经实现了消息队列的queue模式发送和消费消息。
看一下brpop方法源码,发现还有一个重载的方法,源码如下:
public List<String> brpop(final int timeout, final String... keys) { return brpop(getArgsAddTimeout(timeout, keys)); }
public List<String> brpop(String... args) { checkIsInMultiOrPipeline(); client.brpop(args); client.setTimeoutInfinite(); try { return client.getMultiBulkReply(); } finally { client.rollbackTimeout(); } }
public void brpop(final String[] args) { final byte[][] bargs = new byte[args.length][]; for (int i = 0; i < bargs.length; i++) { bargs[i] = SafeEncoder.encode(args[i]); } brpop(bargs); }
protected Connection sendCommand(final Command cmd, final byte[]... args) { try { connect(); Protocol.sendCommand(outputStream, cmd, args); pipelinedCommands++; return this; } catch (JedisConnectionException ex) { /* * When client send request which formed by invalid protocol, Redis send back error message * before close connection. We try to read it to provide reason of failure. */ try { String errorMessage = Protocol.readErrorLineIfPossible(inputStream); if (errorMessage != null && errorMessage.length() > 0) { ex = new JedisConnectionException(errorMessage, ex.getCause()); } } catch (Exception e) { /* * Catch any IOException or JedisConnectionException occurred from InputStream#read and just * ignore. This approach is safe because reading error message is optional and connection * will eventually be closed. */ } // Any other exceptions related to connection? broken = true; throw ex; } }
从源码中可知这个方法可以从多个key获取元素,并且先从第一个key中获取元素,再依次向后获取,根据这个方法的特点,我们可以实现具有优先级的任务队列,代码不再赘述。
如果想了解redis实现消息队列的发布订阅模式,可以参考我的另一篇文章 http://blog.csdn.net/jia_costa/article/details/79033899