三.RabbitMQ之异步消息队列(Work Queue)

时间:2022-05-11 23:41:58

  上一篇文章简要介绍了RabbitMQ的基本知识点,并且写了一个简单的发送和接收消息的demo.这一篇文章继续介绍关于Work Queue(工作队列)方面的知识点,用于实现多个工作进程的分发式任务。

  一.Work Queues:我们可以把它翻译成工作队列,他有什么用呢?它的主要作用就是规避了实时的执行资源密集型任务( resource-intensive task),因为这会造成响应时间过长,影响用户体验。Work Queues通过把一些实时性不强的任务存储到消息队列中,然后后台的工作者(worker)会在特定的情况下完成这些任务。

  举个例子来说,用户注册是一个资源密集型的任务,因为它需要经过存储用户基本信息(用户名,邮箱,密码),发送邮箱验证码、或者更有甚者,存入注册日志(操作日志)等步骤。传统的串行做法如下所示。

  三.RabbitMQ之异步消息队列(Work Queue)

  可以看到,在用户填写完注册信息并点击提交以后,需要经历3个步骤,其中第一个步骤,判断注册信息是否合法,合法则存入数据库,这是注册的核心步骤,而后面两个步骤并不是十分迫切,无需在这个请求中马上完成。而传统的串行模式一般都是在一个请求中塞满逻辑处理,无论是否迫切的逻辑请求。这样会大大加重一个请求的负担,无论是用户等待时间,程序的压力上,都不是一种好的做法。

  尤其是对于web应用,我们知道一个web请求是一个短连接,在一个短连接中做过于复杂的逻辑运算操作,显然是不合适的。所以消息分布队列在web应用中尤为有用。

三.RabbitMQ之异步消息队列(Work Queue)

  我们将上述串行的的方式改为用消息队列的形式来实现,可以看到此时我把一个请求里面做的事情分解到三个请求来实现,这样每个请求的时间都降低了,特别对于用户而言,他的等待时间大大减少,而这样也可以充分利用了cpu的性能。

以上便是工作队列主要的原理及优点。

二.一个work queues的demo

  延续上一个demo的轨迹,并结合我们举的注册的例子,模拟用户注册业务。

  1.首先,我们编写一个生产者,它除了执行将注册数据存储进数据库的方法外,还向RabbitMQ队列里发送了两条消息,分别用于存储有关邮箱验证和日志存储的内容。代码如下。

  

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import net.sf.json.JSONObject; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class NewTask {
private final static String QUEUE_NAME="register"; public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.1.195");//服务器ip
factory.setPort(5672);//端口
factory.setUsername("xdx");//登录名
factory.setPassword("xxxxx");//密码
Connection connection=factory.newConnection();//建立连接
Channel channel=connection.createChannel();//建立频道
channel.queueDeclare(QUEUE_NAME, false, false, false, null);//建立一个队列
System.out.println("首先,保存用户注册数据到数据库");
JSONObject jsonObjet1=new JSONObject();
jsonObjet1.put("msgType", "email");//该消息是针对发送验证邮件的。
jsonObjet1.put("content", "执行发送验证邮件到邮箱操作");
String message1=jsonObjet1.toString();
channel.basicPublish("", QUEUE_NAME, null, message1.getBytes());//发布第一个异步消息
System.out.println(channel+" Sent '"+message1+"'");
JSONObject jsonObject2=new JSONObject();
jsonObject2.put("msgType", "log");//该消息针对存储操作日志
jsonObject2.put("content", "执行存储操作日志的操作");
String message2=jsonObject2.toString();
channel.basicPublish("", QUEUE_NAME, null, message2.getBytes());//发布第二个异步消息
System.out.println(channel+" Sent '"+message2+"'");
channel.close();
connection.close();
}
}

  由上面的代码我们知道我们可以传输较为复杂的消息,我们用一个json类型对象来封装消息,并将该消息存储到消息队列中。执行上述代码,得到结果如下。

  首先,保存用户注册数据到数据库
  AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作"}'
  AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作"}'

  然后我们再到RabbitMQ的后台看看现在的queue的情况,发现多了一个名叫register的queue,并且在该queue中有两个消息,如下图所示。

三.RabbitMQ之异步消息队列(Work Queue)

  2.接下来我们编写一个消费者worker1,在worker1中,根据接收到的消息类型,调用不同的处理方法来处理消息中的任务。如下所示。

  

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import net.sf.json.JSONObject; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope; public class Worker1 {
private final static String QUEUE_NAME="register"; public static void main(String[] args) throws IOException, TimeoutException {
//下面的配置与生产者相对应
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.1.195");//服务器ip
factory.setPort(5672);//端口
factory.setUsername("xdx");//登录名
factory.setPassword("xxxxx");//密码
Connection connection=factory.newConnection();
final Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" worker1 Waiting for messages. To exit press CTRL+C");
//每次从队列获取的数量
channel.basicQos(1);
//defaultConsumer实现了Consumer,我们将使用它来缓存生产者发送过来储存在队列中的消息。当我们可以接收消息的时候,从中获取。
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
try {
JSONObject jsonObject=JSONObject.fromObject(message);
String msgType=jsonObject.get("msgType").toString();
System.out.println(" wokrer1 Received message,msgType is " + msgType);
if(msgType.equals("email")){
//调用邮箱验证代码
System.out.println("worker1 do "+jsonObject.get("content"));
}else{
//调用日志保存代码
System.out.println("worker1 do "+jsonObject.get("content"));
}
} catch (Exception e) {
channel.abort();
}finally{
System.out.println("Worker1 Done");
//注意这句为必须,否则会造成RabbitMQ因为重复的重新发送已处理的消息而内存溢出
channel.basicAck(envelope.getDeliveryTag(),false);
} }
};
//接收到消息以后,推送给RabbitMQ,确认收到了消息。第二个参数为false,表示手动确认消息处理完毕
channel.basicConsume(QUEUE_NAME, false, consumer);
} }

  执行上述的代码,可以得到如下结果

   worker1 Waiting for messages. To exit press CTRL+C
   wokrer1 Received message,msgType is email
  worker1 do 执行发送验证邮件到邮箱操作
  Worker1 Done
   wokrer1 Received message,msgType is log
  worker1 do 执行存储操作日志的操作
  Worker1 Done

  可以看到我们能够解析到消息里面的内容,并且根据不同的消息类别调用不同的处理逻辑,上述代码需要注意的知识点均有注释。执行完毕后,再到RabbitMQ后台查看,发现待处理消息已经为0.

三.RabbitMQ之异步消息队列(Work Queue)

  3.并发处理,我们稍微改动一下NewTask方法,让它一次性发送多条消息到队列中。

  

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import net.sf.json.JSONObject; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class NewTask {
private final static String QUEUE_NAME="register"; public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.1.195");//服务器ip
factory.setPort(5672);//端口
factory.setUsername("xdx");//登录名
factory.setPassword("xxxxx");//密码
Connection connection=factory.newConnection();//建立连接
Channel channel=connection.createChannel();//建立频道
channel.queueDeclare(QUEUE_NAME, false, false, false, null);//建立一个队列
System.out.println("向消息队列中插入10条邮箱验证消息和10条日志存储消息");
for(int i=0;i<10;i++){
JSONObject jsonObjet1=new JSONObject();
jsonObjet1.put("msgType", "email");//该消息是针对发送验证邮件的。
jsonObjet1.put("content", "执行发送验证邮件到邮箱操作"+i);
String message1=jsonObjet1.toString();
channel.basicPublish("", QUEUE_NAME, null, message1.getBytes());//发布第一个异步消息
System.out.println(channel+" Sent '"+message1+"'");
JSONObject jsonObject2=new JSONObject();
jsonObject2.put("msgType", "log");//该消息针对存储操作日志
jsonObject2.put("content", "执行存储操作日志的操作"+i);
String message2=jsonObject2.toString();
channel.basicPublish("", QUEUE_NAME, null, message2.getBytes());
System.out.println(channel+" Sent '"+message2+"'");
} channel.close();
connection.close();
}
}

  执行结果如下:

向消息队列中插入10条邮箱验证消息和10条日志存储消息
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作0"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作0"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作1"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作1"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作2"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作2"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作3"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作3"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作4"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作4"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作5"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作5"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作6"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作6"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作7"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作7"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作8"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作8"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作9"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作9"}'

 这样以后,现在队列中已经有了20条的数据,如下所示。

  三.RabbitMQ之异步消息队列(Work Queue)

  可以看到生产者已经生产成功,接下来我再编写一个消费者Worker2,用于分担Worker1的负担,它的代码与worker2基本类似,我们修改了worker1和worker2的代码,加入睡眠机制,每一个worker执行完消息的任务以后,如下。

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import net.sf.json.JSONObject; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope; public class Worker2 {
private final static String QUEUE_NAME="register"; public static void main(String[] args) throws IOException, TimeoutException {
//下面的配置与生产者相对应
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.1.195");//服务器ip
factory.setPort(5672);//端口
factory.setUsername("xdx");//登录名
factory.setPassword("xxxx");//密码
Connection connection=factory.newConnection();
final Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//每次从队列获取的数量
channel.basicQos(1);
//defaultConsumer实现了Consumer,我们将使用它来缓存生产者发送过来储存在队列中的消息。当我们可以接收消息的时候,从中获取。
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
try {
JSONObject jsonObject=JSONObject.fromObject(message);
String msgType=jsonObject.get("msgType").toString();
if(msgType.equals("email")){
//调用邮箱验证代码
System.out.println("worker2 do "+jsonObject.get("content"));
}else{
//调用日志保存代码
System.out.println("worker2 do "+jsonObject.get("content"));
}
} catch (Exception e) {
channel.abort();
}finally{
channel.basicAck(envelope.getDeliveryTag(),false);
//执行以后睡一会,好让其他的worker有机会执行任务
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} }
};
//接收到消息以后,推送给RabbitMQ,告诉他确认收到了消息。第二个参数为false,表示手动确认消息处理完毕
channel.basicConsume(QUEUE_NAME, false, consumer);
} }

  现在我同时执行worker1和worker2的代码。

三.RabbitMQ之异步消息队列(Work Queue)

三.RabbitMQ之异步消息队列(Work Queue)

这样以后,在RabbitMQ的控制台,已经没有未处理的消息了。

三.RabbitMQ之异步消息队列(Work Queue)

  可以看到worker1和worker2确实分工合作,共同处理了这些消息队列中的任务。

  三.扩展

  1.message acknowledgment(消息确认):如果消费者在没有处理完一个消息就挂掉了,则这个消息就会遗失,所以必须在消费者代码中通知给RabbitMQ。默认是手动通知的,这样可以确保消息不会遗失。如果没有接收到确认,RabbitMQ会指派另外一个消费者处理任务。 channel.basicAck(envelope.getDeliveryTag(),false);和channel.basicConsume(QUEUE_NAME, false, consumer);都是必须的,否则会造成RabbitMQ无法释放已经处理过的消息和导致内存溢出。

  2.Message durability(消息持久化):可修改channel.queueDeclare("task_queue", durable, false, false, null);及channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());来使消息能持久化。需要注意的是,如果一个queue已经定义为非持久化,则不能再改为持久化,会出错,此时必须定义一个新的queue(换个名字)

  3.Fair dispatch(分配策略):默认情况下,RabbitMQ会平均的分配消息给消费者,它不会管这个消费者目前手上有多少未完成的任务,这可能会造成有的消费者很忙,有的消费者很闲。通过channel.basicQos(1);可以指定消费者每次只接收一条消息,只有当这条消息已经处理完毕,并且确认以后,才接收下一条的消息。

三.RabbitMQ之异步消息队列(Work Queue)的更多相关文章

  1. 八&period;利用springAMQP实现异步消息队列的日志管理

    经过前段时间的学习和铺垫,已经对spring amqp有了大概的了解.俗话说学以致用,今天就利用springAMQP来完成一个日志管理模块.大概的需求是这样的:系统中有很多地方需要记录操作日志,比如登 ...

  2. RabbitMQ AMQP &lpar;高级消息队列协议&rpar;

    目录 RabbitMQ AMQP (高级消息队列协议) Message Queue 简介 概念 基本组成 场景及作用 AMQP简介 模型架构 基础组件 AMQP-RabbitMQ 简介 模型 特性 参 ...

  3. C&num;实现异步消息队列

    原文:C#实现异步消息队列 拿到新书<.net框架设计>,到手之后迅速读了好多,虽然这本书不像很多教程一样从头到尾系统的讲明一些知识,但是从项目实战角度告诉我们如何使用我们的知识,从这本书 ...

  4. 异步消息队列Celery

    Celery是异步消息队列, 可以在很多场景下进行灵活的应用.消息中包含了执行任务所需的的参数,用于启动任务执行, suoy所以消息队列也可以称作 在web应用开发中, 用户触发的某些事件需要较长事件 ...

  5. 第二百九十二节,RabbitMQ多设备消息队列-Python开发

    RabbitMQ多设备消息队列-Python开发 首先安装Python开发连接RabbitMQ的API,pika模块 pika模块为第三方模块  对于RabbitMQ来说,生产和消费不再针对内存里的一 ...

  6. 第二百九十一节,RabbitMQ多设备消息队列-安装与简介

    RabbitMQ多设备消息队列-安装与简介 RabbitMQ简介 解释RabbitMQ,就不得不提到AMQP(Advanced Message Queuing Protocol)协议. AMQP协议是 ...

  7. C&num;后台异步消息队列实现

    简介 基于生产者消费者模式,我们可以开发出线程安全的异步消息队列. 知识储备 什么是生产者消费者模式? 为了方便理解,我们暂时将它理解为垃圾的产生到结束的过程. 简单来说,多住户产生垃圾(生产者)将垃 ...

  8. 【Redis】redis异步消息队列&plus;Spring自定义注解&plus;AOP方式实现系统日志持久化

    说明: SSM项目中的每一个请求都需要进行日志记录操作.一般操作做的思路是:使用springAOP思想,对指定的方法进行拦截.拼装日志信息实体,然后持久化到数据库中.可是仔细想一下会发现:每次的客户端 ...

  9. rabbitMQ学习1&colon;消息队列介绍与rabbitmq安装使用

    1. 什么是消息队列 生活里的消息队列,如同邮局的邮箱, 如果没邮箱的话, 邮件必须找到邮件那个人,递给他,才玩完成,那这个任务会处理的很麻烦,很慢,效率很低 但是如果有了邮箱, 邮件直接丢给邮箱,用 ...

随机推荐

  1. windows 物理内存获取

    由于我一般使用的虚拟内存, 有时我们需要获取到物理内存中的数据(也就是内存条中的真实数据), 按理说是很简单,打开物理内存,读取就可以了.但似乎没这么简单: #include "window ...

  2. NoClassDefFoundError

    //Java对象转化json格式 public static void toJsonByJettisonMappedXmlDriver(){ try { User user = getUser(); ...

  3. 【最短路】BAPC2014 B Button Bashing &lpar;Codeforces GYM 100526&rpar;

    题目链接: http://codeforces.com/gym/100526 http://acm.hunnu.edu.cn/online/?action=problem&type=show& ...

  4. IOS中使用像素位图&lpar;CGImageRef&rpar;对图片进行处理

    IOS中对图片进行重绘处理的方法总结 一.CGImageRef是什么 CGImageRef是定义在QuartzCore框架中的一个结构体指针,用C语言编写.在CGImage.h文件中,我们可以看到下面 ...

  5. Java基础笔记2

    1.   变量的定义 int money; int 变量类型   money 变量名 money=1000;变量的值 2.  自动类型转换 ①类型要兼容  容器 (水杯---竹篮---碗) ②目标类型 ...

  6. visual studio 版本管理从tfs迁移到svn

    1.首先要解除解决方案的tfs绑定 清除(删除)项目下的所有版本控制文件,这些文件有:*.vssscc,*.vspscc 删除这些版本控制文件比较简单,搜索这些后缀的文件,删除即可. 修改项目的解决方 ...

  7. 安装CentOS桌面环境

    CentOS 作为服务器的操作系统是很常见的,但是因为需要稳定而没有很时髦的更新,所以很少做为桌面环境.在服务器上通常不需要安装桌面环境,最小化地安装 CentOS(也就是 minimal CentO ...

  8. eclipse安装python

    在Eclipse中安装pydev插件 启动Eclipse, 点击Help->Install New Software...   在弹出的对话框中,点Add 按钮.  Name中填:Pydev,  ...

  9. maven install 跳过测试

    mvn命令跳过测试:mvn install -Dmaven.test.skip=true 测试类不会生成.class 文件mvn install -DskipTests 测试类会生成.class文件 ...

  10. &period;net维护的一些心得

    做了三个月的.net的开发,其实一直在做一个维护加二次开发的工作.现在这个项目告一段落,就此总结下我的所学所感吧.总的来说,.net和JAVA还是有许多地方是不同的,比如.net中的数据绑定问题,已经 ...