RabbitMQ -- unacked

时间:2024-09-23 00:04:45

RabbitMQ解决大量unacked问题

为了快速响应用户请求,我们需要消息异步处理机制,比较简单的做法是用redis的List结构,我们项目使用更专业的RabbitMQ。关于redis和RabbitMQ队列处理的性能比较可以查看这篇文章http://blog.****.net/educast/article/details/34521603

这里不扯RabbitMQ的一些定义了,我们遇到的问题是,入队并发和速度速度很快,但是消费端的处理速度慢得惊人。为了数据安全我们做了持久化而且消费端需要ack或者unack响应。从其提供的web控制台可以看到量大的时候产生大量的unacked消息,也就是说MQ把数据放到channel里,很长时间过去了channel没有给任何响应。我们创建了600个channel问题也依旧。查看了硬件没有瓶颈,网上有文章说是channel连接断了mq无法及时识别,还会往这些失效的channel里传递数据,误认子弟啊,拉出去枪毙了。Netstat一下,发现一共MQ服务器一共有4个外部连接连入了5672端口,两个productor两个consumer跟预想的一样。Jstack一下,发现pool-xxx线程远远没有预想的多,才50个,我们创建了600个channel啊。恍然大悟,尼玛的channel中文意思虽然是通道,但是MQ使用了线程池技术,它们是共享线程的,而不是一个通道一个线程,这个有点类似http请求和http服务器的关系。

把结构图给画出来先:

RabbitMQ -- unacked

Java代码

  1. Properties prop = new Properties();
  2. InputStream inStream = this.getClass().getResourceAsStream("/config.properties");
  3. prop.load(inStream);
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost(prop.getProperty("RabbitMQHost"));
  6. factory.setUsername(prop.getProperty("RabbitMQUserName"));
  7. factory.setPassword(prop.getProperty("RabbitMQPassword"));
  8. // 关键所在,指定线程池
  9. ExecutorService service = Executors.newFixedThreadPool(500);
  10. factory.setSharedExecutor(service);
  11. factory.setAutomaticRecoveryEnabled(true);
  12. factory.setConnectionTimeout(15000);// 15秒
  13. factory.setRequestedHeartbeat(60);
  14. Connection connection = factory.newConnection();

线程方面已经没有问题了,不存在什么失效连接无法及时识别这一说法,如果还慢,则jstack去跟踪这些线程池的线程,看看都在干啥,关于jstack抽空再写了,很强大的jdk工具。

---------------------

本文来自 weinianjie1 的**** 博客 ,全文地址请点击:https://blog.****.net/weinianjie1/article/details/50611379?utm_source=copy