RabbitMQ工作模式 - 简单模式和work工作模式多个竞争的消费者

时间:2024-01-26 14:53:37

RabbitMQ 是一个消息队列中间件,用于在分布式系统中进行消息传递。在 RabbitMQ 中,有几种工作模式,其中简单模式和工作模式是其中两种基本的模式之一。

  1. 简单模式(Simple Mode):

    • 在简单模式中,有一个生产者(Producer)将消息发送到一个队列(Queue)中,然后有一个消费者(Consumer)从队列中接收并处理消息。
    • 这是最基本的消息队列模式,适用于单个生产者和单个消费者的场景。
    • 生产者将消息发送到队列,而消费者从队列中接收并处理消息,消息的传递是单向的。
  2. 工作模式(Work Queue Mode):

    • 工作模式也被称为竞争消费者模式。在这种模式下,有多个消费者监听同一个队列,但每条消息只能被其中一个消费者接收和处理。
    • 当消息被发送到队列时,它将被发送给下一个空闲的消费者,从而实现消息的分发和并发处理。
    • 这种模式对于处理大量工作的情况很有用,可以通过增加消费者的数量来提高消息处理的速度。

在 RabbitMQ 中,简单模式和工作模式的实现通常使用一些基本的概念,包括生产者、消费者、队列和消息。生产者负责发送消息到队列,而消费者则负责从队列中接收和处理消息。

下面是一个使用 RabbitMQ 和 Node.js(使用 amqplib 库)以及 TypeScript 实现工作模式的简单示例。在这个例子中,我们将使用 amqplib 库来连接 RabbitMQ 服务器,并使用 TypeScript 来编写代码。

首先,确保你已经安装了 amqplib 库。可以使用以下命令进行安装:

npm install amqplib

接下来,创建一个生产者和一个消费者的 TypeScript 文件。以下是示例代码:

producer.ts:

import * as amqp from 'amqplib';

async function produce() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'work_queue';

  await channel.assertQueue(queue, { durable: true });

  for (let i = 0; i < 10; i++) {
    const message = `Message ${i}`;
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    console.log(` [x] Sent '${message}'`);
  }

  setTimeout(() => {
    connection.close();
    process.exit(0);
  }, 500);
}

produce();

consumer.ts:

import * as amqp from 'amqplib';

async function consume() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'work_queue';

  await channel.assertQueue(queue, { durable: true });
  channel.prefetch(1);

  console.log(' [*] Waiting for messages. To exit press CTRL+C');

  channel.consume(queue, async (msg) => {
    if (msg !== null) {
      const message = msg.content.toString();
      console.log(` [x] Received ${message}`);

      // Simulate some work
      await new Promise(resolve => setTimeout(resolve, 1000));

      console.log(' [x] Done');
      channel.ack(msg);
    }
  });
}

consume();

这个示例中,生产者将消息发送到名为 work_queue 的队列中,而消费者则监听该队列并处理消息。消费者使用 channel.prefetch(1) 来确保一次只接收一个消息,从而实现竞争消费者模式。

记得在运行前启动 RabbitMQ 服务器,并确保 TypeScript 文件已编译成 JavaScript。你可以使用以下命令进行编译:

tsc producer.ts
tsc consumer.ts

然后,分别运行 producer.jsconsumer.js。这样你就可以在 RabbitMQ 中看到消息的生产和消费过程。

RabbitMQ消息持久化和手动应答

在 RabbitMQ 中,消息持久化和手动应答是两个关键的概念,它们可以帮助确保消息的可靠传递和处理。下面简要介绍这两个概念:

  1. 消息持久化(Message Durability):

    • 默认情况下,RabbitMQ 中的消息是瞬时的,也就是说,如果 RabbitMQ 服务器停止或崩溃,所有未处理的消息都会丢失。
    • 通过将消息标记为持久化,你可以确保消息在 RabbitMQ 服务器重启后仍然可用。要实现消息持久化,需要在发送消息时设置消息的 deliveryMode 属性为 2persistent)。
    • 例如,在生产者端设置消息为持久化:
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    
    • 在消费者端,你需要确保队列和消息都被声明为持久化:
    channel.assertQueue(queue, { durable: true });
    

    这样,即使 RabbitMQ 服务器重启,持久化的消息也不会丢失。

  2. 手动应答(Manual Acknowledgment):

    • 默认情况下,RabbitMQ 使用自动应答(auto-acknowledgment)模式,即一旦消息被传递给消费者,RabbitMQ 就将其标记为已处理。
    • 在某些情况下,你可能需要更细粒度的控制,以确保消息在被消费者完全处理之后才被标记为已处理。这就是手动应答的用途。
    • 在消费者端,需要将 noAck 设置为 false,表示手动应答模式:
channel.consume(queueName, async (msg: Message | null) => {
  if (msg) {
    const data: EmailTask = JSON.parse(msg.content.toString());
    console.log('Processing mail task:', msg.content.toString());
    try {
      //模拟邮件发送
      await new Promise(resolve => setTimeout(resolve, 1000));
      console.log(' [x] Done');
      channel.ack(msg);
    } catch (error) {
      console.log('error:', data);
      // 处理消息失败,判断是否需要进行重试
      if (canRetry(msg)) {
        // 重新入队,进行下一次尝试
        channel.reject(msg, true);
      } else {
        // 不进行重试,将消息从队列中移除
        channel.ack(msg);
      }
    }
  }
});

  • 在这种情况下,消费者需要在处理完消息后显式调用 channel.ack(msg) 来确认消息已被处理。如果消费者崩溃或在处理消息时发生错误,消息将保持在队列中,直到被明确确认。
  • 在 RabbitMQ 中,channel.reject 方法用于拒绝一条消息。它的参数如下channel.reject(msg, requeue);
    msg: 要拒绝的消息对象。
    requeue: 如果设置为 true,则被拒绝的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true。

综合使用消息持久化和手动应答,可以确保在面对不同情况时,消息的可靠传递和处理。

重试间隔和次数

在进行消息重试时,你可以考虑添加一些延迟和控制重试次数的逻辑。以下是修改后的代码,考虑了重试间隔和次数的情况:

function canRetry(msg: Message) {
    const maxRetryAttempts = 3; // 最大重试次数

    // 从消息的属性中获取重试次数
    const retryCount = msg.properties.headers['x-retry-count'] || 0;

    // 判断是否达到最大重试次数
    return retryCount < maxRetryAttempts;
}

async function processMailTask(msg: Message) {
    const data: EmailTask = JSON.parse(msg.content.toString());
    console.log('Processing mail task:', msg.content.toString());
    
    try {
        // 模拟邮件发送
        await new Promise(resolve => setTimeout(resolve, 1000));
        console.log(' [x] Done');
        channel.ack(msg);
    } catch (error) {
        console.log('error:', data);
        // 处理消息失败,判断是否需要进行重试
        if (canRetry(msg)) {
            // 获取当前重试次数
            const retryCount = msg.properties.headers['x-retry-count'] || 0;

            // 计算下一次重试的延迟时间,可以根据重试次数进行指数退避
            const delay = Math.pow(2, retryCount) * 1000;

            // 在一定的延迟后重新入队,进行下一次尝试
            setTimeout(() => {
                channel.nack(msg, false, false);
            }, delay);
        } else {
            // 不进行重试,将消息从队列中移除
            channel.ack(msg);
        }
    }
}

channel.consume(queueName, async (msg: Message | null) => {
    if (msg) {
        await processMailTask(msg);
    }
});

在这个示例中,我添加了一个 processMailTask 函数来处理邮件任务。在处理失败的情况下,根据重试次数计算下一次重试的延迟时间,然后使用 setTimeout 在一定的延迟后重新入队。这里使用了指数退避的策略,即每次重试的延迟时间是上一次的两倍。

channel.nack(msg, allUpTo, requeue);
  • msg: 要否定的消息对象。
  • allUpTo(可选参数): 如果设置为 true,则所有比当前消息更早的消息也将被否定。默认为 false
  • requeue(可选参数): 如果设置为 true,则被否定的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true

在你的代码中,channel.nack(msg, false, false); 表示对当前消息 msg 进行否定,并且不重新将消息放回队列,而是将其删除。这在处理重试逻辑时很重要,因为我们通过 setTimeout 自定义了重试的时间,并手动重新入队。如果 requeue 设置为 true,消息会立即被重新入队,这可能与我们的定制重试逻辑冲突。