
在RabbitMQ中,可以设置消息的优先级,也就相当于在队列中置顶某条消息,让某个消息优先得到处理的功能。
既然是设置消息的优先级,那么就是针对生产者,也就是消息发布端。
设置消息的优先级一共有2个步骤:
1、设置队列的x-max-priority参数;
2、设置消息的Priority参数。
话不多说,上代码!
发送端:
var factory = new ConnectionFactory() { HostName = "localhost",UserName="ty2017",Password="",VirtualHost="log" };
using (var connection = factory.CreateConnection()) {
using (var channel = connection.CreateModel()) {
//声明一个队列,设置x-max-priority参数
channel.QueueDeclare("q.test", true, false, false, new Dictionary<string, object> { { "x-max-priority", } }); for (int i = ; i < ; i++) {
var body = Encoding.UTF8.GetBytes(string.Format("第{0}个消息",i+));
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
//设置消息的优先级
properties.Priority = (byte)((i == )?:i);
//发布消息
channel.BasicPublish(exchange: "",
routingKey: "q.test",
basicProperties: properties,
body: body);
} }
}
将第四个消息的优先级设置为最大,我们打开Consumer端,看看第四个消息是否被优先处理!
Consumer端代码如下:
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ty2017", Password = "",VirtualHost="log" };
using (var connection = factory.CreateConnection()) {
using (var channel = connection.CreateModel()) {
channel.QueueDeclare("q.test", true, false, false, new Dictionary<string, object> { { "x-max-priority", } }); channel.BasicQos(, , false); var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {
var body = ea.Body; var message = Encoding.UTF8.GetString(body);
Console.WriteLine("优先级:{0} 消息内容:{1}",ea.BasicProperties.Priority, message);
channel.BasicAck(ea.DeliveryTag, false); };
channel.BasicConsume(queue: "q.test",
noAck: false,
consumer: consumer);
Console.ReadLine();
}
}
运行Consumer端程序,得到如下结果:
根据结果可以看出,优先级越大的消息越是被优先处理!