Node.js连接RabbitMQ,断线重连,动态绑定routing key

时间:2022-12-24 03:15:01

RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基于回调的。

下面将给出基于Promise式的写法。并且实现动态的队列绑定

初始化配置


const amqp = require('amqplib')
// rabbitMQ地址
const {amqpAddrHost} = require('../config/index.js') // 交换机名称
const ex = 'amq.topic' const amqpAddr = `amqp://${amqpAddrHost}` // 读取HOSTNAME, 在跑多实例时,例如在k8s中,HOSTNAME可以获取当前pod的名称
// 多实例时,写日志,或者建立连接时,最好带上pod名称,如果出现问题,也比较好定位哪个pod出现的问题。
const hostName = process.env.HOSTNAME // 队列的属性设置
// 一般来说,最好设置队列自动删除autoDelete,当链接断开时,队列也会删除,这样不会产生非常多的无用队列
// durable是用来的持久化的,最好也可以设置成不持久化 const queueAttr = {autoDelete: true, durable: false} // 定义channel的引用,当链接建立时,所有方法都可以通过引用CH来获取channel方法
let CH = null

向队列发送消息的函数


// 向队列发送消息的函数
function publishMessage (msg) {
if (!CH) {
return ''
} msg = JSON.stringify(msg)
// 指定交换机ex, routing key, 以及消息的内容
CH.publish(ex, eventBusTopic, Buffer.from(msg))
}

当链接rabbitMQ断开时,要主动去重连


function reconnectRabbitMq () {
log.info('reconnect_rabbit_mq')
connectRabbitMq()
}

连接rabbitMQ的主要函数


function connectRabbitMq () {
amqp.connect(amqpAddr, {
// 设置connection_name的属性,可以在rabbitMQ的控制台的UI上,看到连接是来自哪个实例
clientProperties: {
connection_name: hostName
}
})
.then((conn) => {
log.info('rabbitmq_connect_successd')
// 一定要加上链接的报错事件处理,否则一旦报error错,如果不处理这个错误,程序就会崩溃
// error是个特别的事件,务必要处理的
// 报错就直接去重连
conn.on('error', (err) => {
log.error('connect_error ' + err.message, err)
reconnectRabbitMq()
})
// 创建channel
return conn.createChannel()
})
.then((ch) => {
CH = ch
// 初始化交换机
ch.assertExchange(ex, 'topic', {durable: true})
// 初始化一个队列,队列名就用hostName, 比较容易从对列名上知道是哪个实例创建的队列
return ch.assertQueue(hostName, queueAttr)
})
.then((q) => {
// 可以在队列初始化完毕就立即绑定routing key, 也可以暂时不绑定,后续动态的绑定
// CH.bindQueue(q.queue, ex, 'some.topic.aaa')
// 消费者,获取消息
CH.consume(q.queue, (msg) => {
var _msg = msg.content.toString()
var MSG = JSON.parse(_msg)
log.info(_msg, MSG)
}, {noAck: true})
})
.catch((err) => {
console.log(err)
})
}

动态给队列绑定或者解绑routing key


function toggleBindQueue (routingKey, bind) {
return new Promise((resolve, reject) => {
if (!CH) {
log.error('channel not established')
reject(new Error('channel not established'))
return ''
}
// 初始化队列,如果队列已经存在,就会直接使用
CH.assertQueue(`${hostName}`, queueAttr)
.then((q) => {
// 如果bind是true,就绑定。否则就解绑
if (bind) {
log.info(`bindQueue ${hostName} ${topic}`)
return CH.bindQueue(q.queue, ex, topic)
} else {
return CH.unbindQueue(q.queue, ex, topic)
}
})
.then((res) => {
resolve()
})
.catch((err) => {
reject(err)
log.error(err)
})
})
} module.exports = {
connectRabbitMq,
toggleBindQueue,
publishMessage
}

使用方法

加入你的服务端用的是Express, 那么在app.js中可以


...
const {connectRabbitMq} = require('./connect-mq.js')
connectRabbitMq()
...

完整代码


// onnect-mq.js
const amqp = require('amqplib')
// rabbitMQ地址
const {amqpAddrHost} = require('../config/index.js') // 交换机名称
const ex = 'amq.topic' const amqpAddr = `amqp://${amqpAddrHost}` // 读取HOSTNAME, 在跑多实例时,例如在k8s中,HOSTNAME可以获取当前pod的名称
// 多实例时,写日志,或者建立连接时,最好带上pod名称,如果出现问题,也比较好定位哪个pod出现的问题。
const hostName = process.env.HOSTNAME // 队列的属性设置
// 一般来说,最好设置队列自动删除autoDelete,当链接断开时,队列也会删除,这样不会产生非常多的无用队列
// durable是用来的持久化的,最好也可以设置成不持久化 const queueAttr = {autoDelete: true, durable: false} // 定义channel的引用,当链接建立时,所有方法都可以通过引用CH来获取channel方法
let CH = null // 向队列发送消息的函数
function publishMessage (msg) {
if (!CH) {
return ''
} msg = JSON.stringify(msg)
// 指定交换机ex, routing key, 以及消息的内容
CH.publish(ex, eventBusTopic, Buffer.from(msg))
} // 当链接rabbitMQ断开时,要主动去重连
function reconnectRabbitMq () {
log.info('reconnect_rabbit_mq')
connectRabbitMq()
} // 链接rabbitMQ的主要函数
function connectRabbitMq () {
amqp.connect(amqpAddr, {
// 设置connection_name的属性,可以在rabbitMQ的控制台的UI上,看到链接是来自哪个实例
clientProperties: {
connection_name: hostName
}
})
.then((conn) => {
log.info('rabbitmq_connect_successd')
// 一定要加上链接的报错事件处理,否则一旦报error错,如果不处理这个错误,程序就会崩溃
// error是个特别的事件,务必要处理的
// 报错就直接去重连
conn.on('error', (err) => {
log.error('connect_error ' + err.message, err)
reconnectRabbitMq()
})
// 创建channel
return conn.createChannel()
})
.then((ch) => {
CH = ch
// 初始化交换机
ch.assertExchange(ex, 'topic', {durable: true})
// 初始化一个队列,队列名就用hostName, 比较容易从对列名上知道是哪个实例创建的队列
return ch.assertQueue(hostName, queueAttr)
})
.then((q) => {
// 可以在队列初始化完毕就立即绑定routing key, 也可以暂时不绑定,后续动态的绑定
// CH.bindQueue(q.queue, ex, 'some.topic.aaa')
// 消费者,获取消息
CH.consume(q.queue, (msg) => {
var _msg = msg.content.toString()
var MSG = JSON.parse(_msg)
log.info(_msg, MSG)
}, {noAck: true})
})
.catch((err) => {
console.log(err)
})
} // 动态给队列绑定或者解绑routing key
function toggleBindQueue (routingKey, bind) {
return new Promise((resolve, reject) => {
if (!CH) {
log.error('channel not established')
reject(new Error('channel not established'))
return ''
}
// 初始化队列,如果队列已经存在,就会直接使用
CH.assertQueue(`${hostName}`, queueAttr)
.then((q) => {
// 如果bind是true,就绑定。否则就解绑
if (bind) {
log.info(`bindQueue ${hostName} ${topic}`)
return CH.bindQueue(q.queue, ex, topic)
} else {
return CH.unbindQueue(q.queue, ex, topic)
}
})
.then((res) => {
resolve()
})
.catch((err) => {
reject(err)
log.error(err)
})
})
} module.exports = {
connectRabbitMq,
toggleBindQueue,
publishMessage
}

来源:https://segmentfault.com/a/1190000016807727