RabbitMQ实战经验分享

时间:2022-06-14 06:40:44

前言

最近在忙一个高考项目,看着系统顺利完成了这次高考,终于可以松口气了。看到那些即将参加高考的学生,也想起当年高三的自己。

下面分享下RabbitMQ实战经验,希望对大家有所帮助:


一、生产消息

关于RabbitMQ的基础使用,这里不再介绍了,项目中使用的是Exchange中的topic模式。

先上发消息的代码

private bool MarkErrorSend(string[] lstMsg)
{
try
{
var factory = new ConnectionFactory()
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost",//ConfigurationManager.AppSettings["sHostName"],
};
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
try
{
//定义一个Direct类型交换机
channel.ExchangeDeclare(
exchange: "TestTopicChange", //exchange名称
type: ExchangeType.Topic, //Topic模式,采用路由匹配
durable: true,//exchange持久化
autoDelete: false,//是否自动删除,一般设成false
arguments: null//一些结构化参数,比如:alternate-exchange
); //定义测试队列
channel.QueueDeclare(
queue: "Test_Queue", //队列名称
durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效)
exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除
autoDelete: false,//是否自动删除,一般设成false
arguments: null
); //将队列绑定到交换机
string routeKey = "TestRouteKey.*";//*匹配一个单词
channel.QueueBind(
queue: "Test_Queue",
exchange: "TestTopicChange",
routingKey: routeKey,
arguments: null
); //消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一起使用才有效)
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = ; channel.ConfirmSelect();//发送确认机制
foreach (var itemMsg in lstMsg)
{
byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);
//发布消息
channel.BasicPublish(
exchange: "TestTopicChange",
routingKey: "TestRouteKey.one",
basicProperties: properties,
body: sendBytes
);
}
bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均发送才返回true
return isAllPublished;
}
catch (Exception ex)
{
//写错误日志
return false;
}
finally
{
channel.Close();
connection.Close();
}
}
catch
{
//RabbitMQ.Client.Exceptions.BrokerUnreachableException:
//When the configured hostname was not reachable.
return false;
}
}

发消息没啥特别的。关于消息持久化的介绍这里也不再介绍,不懂的可以看上篇文章。发消息需要注意的地方是,可以选择多条消息一起发送,最后才确定消息发送成功,这样效率比较高;此外,需要尽量精简每条消息的长度(楼主在这里吃过亏),不然会因消息过长从而增加发送时间。在实际项目中一次发了4万多条数据没有出现问题。


二、接收消息

接下来说下消费消息的过程,我使用的是单个连接多个channel,每个channel每次只取一条消息方法。有人会问单个TCP连接,多个channel会不会影响通信效率。这个理论上肯定会有影响的,看影响大不大而已。我开的channel数一般去到30左右,并没有觉得影响效率,有可能是因为我每个channel是拿一条消息的原因。通过单个连接多个channel的方法,可以少开了很多连接。至于我为什么选每个channel每次只取一条消息,这是外界因素限制了,具体看自己需求。

接下接收消息的过程,首先定义一个RabbitMQHelper类,里面有个全局的conn连接变量,此外还有创建连接、关闭连接和验证连接是否打开等方法。程序运行一个定时器,当

检测到连接未打开的情况下,主动创建连接处理消息。

 public class RabbitMQHelper
{
public IConnection conn = null; /// <summary>
/// 创建RabbitMQ消息中间件连接
/// </summary>
/// <returns>返回连接对象</returns>
public IConnection RabbitConnection(string sHostName, ushort nChannelMax)
{
try
{
if (conn == null)
{
var factory = new ConnectionFactory()
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = sHostName,//ConfigurationManager.AppSettings["MQIP"],
AutomaticRecoveryEnabled = false,//取消自动重连,改用定时器定时检测连接是否存在
RequestedConnectionTimeout = ,//请求超时时间设成10秒,默认的为30秒
RequestedChannelMax = nChannelMax//与开的线程数保持一致
};
//创建连接
conn = factory.CreateConnection();
Console.WriteLine("RabbitMQ连接已创建!");
} return conn;
}
catch
{
Console.WriteLine("创建连接失败,请检查RabbitMQ是否正常运行!");
return null;
}
} /// <summary>
/// 关闭RabbitMQ连接
/// </summary>
public void Close()
{
try
{
if (conn != null)
{
if (conn.IsOpen)
conn.Close();
conn = null;
Console.WriteLine("RabbitMQ连接已关闭!");
}
}
catch { }
} /// <summary>
/// 判断RabbitMQ连接是否打开
/// </summary>
/// <returns></returns>
public bool IsOpen()
{
try
{
if (conn != null)
{
if (conn.IsOpen)
return true;
}
return false;
}
catch
{
return false;
}
}
}

接下来我们看具体如何接收消息。

private static AutoResetEvent myEvent = new AutoResetEvent(false);
private RabbitMQHelper rabbit = new RabbitMQHelper();
private ushort nChannel = ;//一个连接的最大通道数和所开的线程数一致

首先初始化一个rabbit实例,然后通过RabbitConnection方法创建RabbitMQ连接。

当连接打开时候,用线程池运行接收消息的方法。注意了,这里开的线程必须和开的channel数量一致,不然会有问题(具体问题是,设了RabbitMQ连接超时时间为10秒,有时候不管用,原因未查明。RabbitMQ创建连接默认超时时间为30秒,假如在这个时间内再去调用创建的话,就有可能得到两倍的channel;)

/// <summary>
/// 单个RabbitMQ连接开多个线程,每个线程开一个channel接受消息
/// </summary>
private void CreateConnecttion()
{
try
{
rabbit.RabbitConnection("localhost", nChannel);
if (rabbit.conn != null)
{
ThreadPool.SetMinThreads(, );
ThreadPool.SetMaxThreads(, );
for (int i = ; i <= nChannel; i++)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), "");
}
myEvent.WaitOne();//等待所有线程工作完成后,才能关闭连接
rabbit.Close();
}
}
catch (Exception ex)
{
rabbit.Close();
Console.WriteLine(ex.Message);
}
}

接着就是接收消息的方法,处理消息的过程省略了。

  /// <summary>
/// 接收并处理消息,在一个连接中创建多个通道(channel),避免创建多个连接
/// </summary>
/// <param name="con">RabbitMQ连接</param>
private void ReceiveMsg(object obj)
{
IModel channel = null;
try
{
#region 创建通道,定义中转站和队列
channel = rabbit.conn.CreateModel();
channel.ExchangeDeclare(
exchange: "TestTopicChange", //exchange名称
type: ExchangeType.Topic, //Topic模式,采用路由匹配
durable: true,//exchange持久化
autoDelete: false,//是否自动删除,一般设成false
arguments: null//一些结构化参数,比如:alternate-exchange
); //定义阅卷队列
channel.QueueDeclare(
queue: "Test_Queue", //队列名称
durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效)
exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除
autoDelete: false,
arguments: null
);
#endregion
channel.BasicQos(, , false);//每次只接收一条消息 channel.QueueBind(queue: "Test_Queue",
exchange: "TestTopicChange",
routingKey: "TestRouteKey.*");
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
//处理消息方法
try
{
bool isMark = AutoMark(message);
if (isMark)
{
//Function.writeMarkLog(message);
//确认该消息已被消费,发消息给RabbitMQ队列
channel.BasicAck(ea.DeliveryTag, false);
}
else
{
if (MarkErrorSend(message))//把错误消息推到错误消息队列
channel.BasicReject(ea.DeliveryTag, false);
else
//消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者
channel.BasicReject(ea.DeliveryTag, true);
}
}
catch (Exception ex)
{
try
{
Console.WriteLine(ex.Message);
if (channel != null && channel.IsOpen)//处理RabbitMQ停止重启而自动评阅崩溃的问题
{
//消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者
channel.BasicReject(ea.DeliveryTag, true);
}
}
catch { }
}
};
//手动确认消息
channel.BasicConsume(queue: "Test_Queue",
autoAck: false,
consumer: consumer);
}
catch (Exception ex)
{
try
{
Console.WriteLine("接收消息方法出错:" + ex.Message);
if (channel != null && channel.IsOpen)//关闭通道
channel.Close();
if (rabbit.conn != null)//处理RabbitMQ突然停止的问题
rabbit.Close();
}
catch { }
}
}

三、处理错误消息

把处理失败的消息放到“错误队列”,然后把原队列的消息删除(这里主要解决问题是,存在多个处理失败或处理不了的消息时,如果把这些消息都放回原队列,它们会继续分发到其他线程的channel,但结果还是处理不了,就会造成一个死循环,导致后面的消息无法处理)。把第一次处理不了的消息放到“错误队列”后,重新再开一个新的连接去处理“错误队列”的消息。

/// <summary>
/// 把处理错误的消息发送到“错误消息队列”
/// </summary>
/// <param name="msg"></param>
/// <returns></returns>
private bool MarkErrorSend(string msg)
{
RabbitMQHelper MQ = new RabbitMQHelper();
MQ.RabbitConnection("localhost",);
//创建通道
var channel = MQ.conn.CreateModel();
try
{
//定义一个Direct类型交换机
channel.ExchangeDeclare(
exchange: "ErrorTopicChange", //exchange名称
type: ExchangeType.Topic, //Topic模式,采用路由匹配
durable: true,//exchange持久化
autoDelete: false,//是否自动删除,一般设成false
arguments: null//一些结构化参数,比如:alternate-exchange
); //定义阅卷队列
channel.QueueDeclare(
queue: "Error_Queue", //队列名称
durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效)
exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除
autoDelete: false,//是否自动删除,一般设成false
arguments: null
); //将队列绑定到交换机
string routeKey = "ErrorRouteKey.*";//*匹配一个单词
channel.QueueBind(
queue: "Error_Queue",
exchange: "ErrorTopicChange",
routingKey: routeKey,
arguments: null
); //消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一起使用才有效)
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = ; channel.ConfirmSelect();//发送确认机制
byte[] sendBytes = Encoding.UTF8.GetBytes(msg);
//发布消息
channel.BasicPublish(
exchange: "ErrorTopicChange",
routingKey: "ErrorRouteKey.one",
basicProperties: properties,
body: sendBytes
); bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均发送才返回true
return isAllPublished;
}
catch (Exception ex)
{
//写错误日志
return false;
}
finally
{
channel.Close();
MQ.conn.Close();
}
}

总结:RabbitMQ本身已经很稳定了,而且性能也很好,所有不稳定的因素都在我们处理消息的过程,所以可以放心使用。

Demo源码地址:https://github.com/Bingjian-Zhu/RabbitMQHelper