使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)

时间:2022-02-27 01:28:53

前言:

本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar

其中jedis连接池需要依赖commons-pool2包,json包用于对象实例和json字符串的相互转换

1、jedis的消息队列方法简述

1.1、发布消息方法

(其中,channel是对应消息通道,message是对应消息体)

jedis.publish(channel, message);

1.2、监听消息方法

(其中,jedisPubSub用于处理监听到的消息,channels是对应的通道)

jedis.subscribe(jedisPubSub, channels);

2、发布消息

/**
* 从jedis连接池获取jedis操作实例
* @return
*/
public static Jedis getJedis() {
return RedisPoolManager.getJedis();
} /**
* 推入消息到redis消息通道
*
* @param String
* channel
* @param String
* message
*/
public static void publish(String channel, String message) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.publish(channel, message);
} finally {
jedis.close();
}
} /**
* 推入消息到redis消息通道
*
* @param byte[]
* channel
* @param byte[]
* message
*/
public void publish(byte[] channel, byte[] message) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.publish(channel, message);
} finally {
jedis.close();
} }

3、监听消息

3.1、监听消息主体方法

/**
* 监听消息通道
* @param jedisPubSub - 监听任务
* @param channels - 要监听的消息通道
*/
public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.subscribe(jedisPubSub, channels);
} finally {
jedis.close();
}
} /**
* 监听消息通道
* @param jedisPubSub - 监听任务
* @param channels - 要监听的消息通道
*/
public static void subscribe(JedisPubSub jedisPubSub, String... channels) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.subscribe(jedisPubSub, channels);
} finally {
jedis.close();
}
}

3.2、处理监听到的消息任务

class Tasker implements Runnable {
private String[] channel = null;//监听的消息通道
private JedisPubSub jedisPubSub = null;//消息处理任务 public Tasker(JedisPubSub jedisPubSub, String ...channel) {
this.jedisPubSub = jedisPubSub;
this.channel = channel;
} @Override
public void run() {
// 监听channel通道的消息
RedisMQ.subscribe(jedisPubSub, channel);
} }

3.3、处理监听到的消息主体类实现

package cn.eguid.livePushServer.redisManager;

import java.util.Map;

import org.json.JSONObject;

import cc.eguid.livepush.PushManager;
import redis.clients.jedis.JedisPubSub; public class RedisMQHandler extends JedisPubSub{
PushManager pushManager = null; public RedisMQHandler(PushManager pushManager) {
super();
this.pushManager = pushManager;
} @Override
// 接收到消息后进行分发执行
public void onMessage(String channel, String message) {
JSONObject jsonObj = new JSONObject(message);
System.out.println(channel+","+message);
if ("push".equals(channel)) {
Map<String,Object> map=jsonObj.toMap();
System.out.println("接收到一条推流消息,准备推流:"+map);
// String appName=pushManager.push(map);
//推流完成后还需要发布一个成功消息到返回队列 } else if ("close".equals(channel)) {
String appName=jsonObj.getString("appName");
System.out.println("接收到一条关闭消息,准备关闭应用:"+appName);
// pushManager.closePush(appName);
}
}
}

4、测试消息队列发布和监听

public static void main(String[] args) throws InterruptedException {

		PushManager pushManager= new PushManagerImpl();
Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));
Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));
t1.start();
t2.start(); LivePushEntity livePushInfo=new LivePushEntity();
livePushInfo.setAppName("test1");
JSONObject json=new JSONObject(livePushInfo);
publish("push",json.toString());
publish("close", json.toString());
Thread.sleep(2000);
publish("push", json.toString());
publish("close",json.toString());
Thread.sleep(2000);
publish("push", json.toString());
publish("close",json.toString()); }

使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)的更多相关文章

  1. 为什么会需要消息队列&lpar;MQ&rpar;?

    为什么会需要消息队列(MQ)? #################################################################################### ...

  2. 消息队列一:为什么需要消息队列&lpar;MQ&rpar;?

    为什么会需要消息队列(MQ)? #################################################################################### ...

  3. 详解RPC远程调用和消息队列MQ的区别

    PC(Remote Procedure Call)远程过程调用,主要解决远程通信间的问题,不需要了解底层网络的通信机制. RPC框架 知名度较高的有Thrift(FB的).dubbo(阿里的). RP ...

  4. 消息队列MQ简介

    项目中要用到RabbitMQ,领导让我先了解一下.在之前的公司中,用到过消息队列MQ,阿里的那款RocketMQ,当时公司也做了简单的技术分享,自己也看了一些博客.自己在有道云笔记上,做了一些整理,但 ...

  5. 消息队列MQ&lpar;一&rpar;

    消息队列 为什么要用消息队列,都有什么优缺点? 要问的是消息队列都有哪些场景,然后项目里具体实现的什么场景,你在这个场景里用的什么消息队列? 期望的回答是,你们公司有个什么业务,这个业务场景有什么技术 ...

  6. java面试记录三:hashmap、hashtable、concurrentHashmap、ArrayList、linkedList、linkedHashmap、Object类的12个成员方法、消息队列MQ的种类

    口述题 1.HashMap的原理?(数组+单向链表.put.get.size方法) 非线程安全:(1)hash冲突:多线程某一时刻同时操作hashmap并执行put操作时,可能会产两个key的hash ...

  7. 消息通讯之关于消息队列MQ必须了解的相关概念

    目录 系统通讯方式有哪些? 消息队列的应用场景 消息队列通讯模型 常见的消息协议 AMQP MQTT ATOMP JMS 小结 系统通讯方式有哪些? RPC调用 RPC 全称 Remote Proce ...

  8. 消息队列 MQ 入门理解

    功能特性: 应用场景: 消息队列 MQ 可应用于如下几个场景: 分布式事务 在传统的事务处理中,多个系统之间的交互耦合到一个事务中,响应时间长,影响系统可用性.引入分布式事务消息,交易系统和消息队列之 ...

  9. 消息队列MQ集合

    消息队列MQ集合 消息队列简介 kafka简介 Centos7部署zookeeper和Kafka集群 .

随机推荐

  1. IRandomAccessStream&comma; IBuffer&comma; Stream&comma; byte&lbrack;&rsqb; 之间相互转换

    /* * 用于实现 IRandomAccessStream, IBuffer, Stream, byte[] 之间相互转换的帮助类 */ using System;using System.IO;us ...

  2. c&plus;&plus;表达式的一些小小的注意事项

    3+12>>1 = 7; 12>>1+3 =0; 3+(12>>1)=9;

  3. JS对日期时间的操作

    代码: //判断是否超期(有效期开始超过一年后算已超期) function IsEffect(effectDate) { var val = ""; var currentDate ...

  4. Java基础知识强化之IO流笔记19:FileOutputStream的三个write方法

    1. FileOutputStream的三个write方法:  void write(byte[] buffer)           Writes the entire contents of th ...

  5. Linux系统相关

    1. 图形界面启动的是哪个运行级别? 而我们平时用的命令行模式又是哪个运行级别? 除了图形和命令行模式两个常用级别外,其他运行级别代表什么涵义?如何更改系统的运行级别?图形界面启动的是5级别,命令行模 ...

  6. 可空类型 Nullable&lt&semi;T&gt&semi;

    Nullable<T> 内部实现了显示和隐式转换 显示转换: public static explicit operator T(T? value) { return value.Valu ...

  7. 数据结构之堆Heap

    1. 概述 堆(也叫优先队列),是一棵完全二叉树,它的特点是父节点的值大于(小于)两个子节点的值(分别称为大顶堆和小顶堆).它常用于管理算法执行过程中的信息,应用场景包括堆排序,优先队列等. 2. 堆 ...

  8. IE下获取XPATH小工具,支持32&sol;64位

    背景是曾经友情支持了测试组一小段时间,发现他们使用selenium做页面的自动化测试,需要用到XPath,但IE下没有获取XPath的工具,只能在Firefox和chrome下获取,步骤还比较麻烦.而 ...

  9. python基础07&lowbar;tuple&lowbar;dict

    tuple 元组     dict 字典 更详细参考:https://www.cnblogs.com/jin-xin/articles/7562422.html #!/usr/bin/env pyth ...

  10. P3868 &lbrack;TJOI2009&rsqb;猜数字

    [TJOI2009]猜数字 中国剩余定理 求解i=1 to n : x≡a[i] (mod b[i])的同余方程组 设 t= ∏i=1 to n b[i] 我们先求出 i=1 to n : x≡1 ( ...