Rabbitmq 官方给的NET consumer示例代码如下,但使用过程,会遇到connection断开的问题,一旦断开,这个代码就会报错,如果你的消费者端是这样的代码的话,就会导致消费者挂掉。
usingSystem;;;;classReceiveLogs
{public static voidMain()
{var factory = new ConnectionFactory() { HostName = "localhost"};using (var connection =())
{using (var channel =())
{
("logs", "fanout");var queueName =().QueueName;
(queueName,"logs", "");var consumer = newQueueingBasicConsumer(channel);
(queueName,true, consumer);
("[*] Waiting for logs." +
"To exit press CTRL+C");while (true)
{var ea =(BasicDeliverEventArgs)();var body =;var message =Encoding.(body);
("[x] {0}", message);
}
}
}
}
}
那么如何会异常恢复呢?
之前我的操作方式是,建立一个ConnectionPool,在出现异常后,重建channel,也就是说,整个的异常恢复过程是自己处理的。最近研究因为研究Orleans,担心RabbitMQ的NET client使用Task时,会遇到Orleans的坑,所以顺手研究了下RabbitMQ NET Client的源码,研究发现一种自动的错误恢复机制 AutomaticRecoveryEnabled = true 使用方式如下
usingSystem;;;;classReceiveLogs
{public static voidMain()
{var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true};using (var connection =())
{using (var channel =())
{
("logs", "fanout");var queueName =().QueueName;
(queueName,"logs", "");var consumer = newQueueingBasicConsumer(channel);
(queueName,true, consumer);
("[*] Waiting for logs." +
"To exit press CTRL+C");while (true)
{var ea =(BasicDeliverEventArgs)();var body =;var message =Encoding.(body);
("[x] {0}", message);
}
}
}
}
}
具体的恢复机制如下
1.在AutoRecoveringConnection初始化时,在链接关闭事件委托上增加断开处理
public voidinit()
{
m_delegate= new Connection(m_factory, false, m_factory.CreateFrameHandler());
AutorecoveringConnection self= this;
EventHandler recoveryListener = (_, args) =>{lock(recoveryLockTarget)
{if(ShouldTriggerConnectionRecovery(args))
{try{
();
}catch(Exception e)
{//TODO: logging
("BeginAutomaticRecovery() failed: {0}", e);
}
}
}
};lock(m_eventLock)
{
ConnectionShutdown+=recoveryListener;if (!m_recordedShutdownEventHandlers.Contains(recoveryListener))
{
m_recordedShutdownEventHandlers.Add(recoveryListener);
}
}
}
观察调用的方式BeginAutomaticRecovery,可以看到这个方法内部调用了PerformAutomaticRecovery方法。我们直接看这个方法的内容,其中第一个调用的是方法RecoverConnectionDelegate
protected voidPerformAutomaticRecovery()
{lock(recoveryLockTarget)
{
RecoverConnectionDelegate();
RecoverConnectionShutdownHandlers();
RecoverConnectionBlockedHandlers();
RecoverConnectionUnblockedHandlers();
RecoverModels();if(m_factory.TopologyRecoveryEnabled)
{
RecoverEntities();
RecoverConsumers();
}
RunRecoveryEventHandlers();
}
}
这个方法中调用的是
protected voidRecoverConnectionDelegate()
{bool recovering = true;while(recovering)
{try{
m_delegate= new Connection(m_factory, false, m_factory.CreateFrameHandler());
recovering= false;
}catch(Exception)
{//TODO: exponential back-off
(m_factory.NetworkRecoveryInterval);//TODO: provide a way to handle these exceptions
}
}
}
可以看出,它是执行了死循环,直到连接重新打开,当然,如果遇到异常,它会调用来等待一下,然后再次执行连接恢复。