rabbitmq取消自动重连_RabbitMQ 连接断开处理-自动恢复

时间:2025-04-13 18:38:55

Rabbitmq 官方给的NET consumer示例代码如下,但使用过程,会遇到connection断开的问题,一旦断开,这个代码就会报错,如果你的消费者端是这样的代码的话,就会导致消费者挂掉。

usingSystem;;;;classReceiveLogs

{public static voidMain()

{var factory = new ConnectionFactory() { HostName = "localhost"};using (var connection =())

{using (var channel =())

{

("logs", "fanout");var queueName =().QueueName;

(queueName,"logs", "");var consumer = newQueueingBasicConsumer(channel);

(queueName,true, consumer);

("[*] Waiting for logs." +

"To exit press CTRL+C");while (true)

{var ea =(BasicDeliverEventArgs)();var body =;var message =Encoding.(body);

("[x] {0}", message);

}

}

}

}

}

那么如何会异常恢复呢?

之前我的操作方式是,建立一个ConnectionPool,在出现异常后,重建channel,也就是说,整个的异常恢复过程是自己处理的。最近研究因为研究Orleans,担心RabbitMQ的NET client使用Task时,会遇到Orleans的坑,所以顺手研究了下RabbitMQ NET Client的源码,研究发现一种自动的错误恢复机制 AutomaticRecoveryEnabled = true 使用方式如下

usingSystem;;;;classReceiveLogs

{public static voidMain()

{var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true};using (var connection =())

{using (var channel =())

{

("logs", "fanout");var queueName =().QueueName;

(queueName,"logs", "");var consumer = newQueueingBasicConsumer(channel);

(queueName,true, consumer);

("[*] Waiting for logs." +

"To exit press CTRL+C");while (true)

{var ea =(BasicDeliverEventArgs)();var body =;var message =Encoding.(body);

("[x] {0}", message);

}

}

}

}

}

具体的恢复机制如下

1.在AutoRecoveringConnection初始化时,在链接关闭事件委托上增加断开处理

public voidinit()

{

m_delegate= new Connection(m_factory, false, m_factory.CreateFrameHandler());

AutorecoveringConnection self= this;

EventHandler recoveryListener = (_, args) =>{lock(recoveryLockTarget)

{if(ShouldTriggerConnectionRecovery(args))

{try{

();

}catch(Exception e)

{//TODO: logging

("BeginAutomaticRecovery() failed: {0}", e);

}

}

}

};lock(m_eventLock)

{

ConnectionShutdown+=recoveryListener;if (!m_recordedShutdownEventHandlers.Contains(recoveryListener))

{

m_recordedShutdownEventHandlers.Add(recoveryListener);

}

}

}

观察调用的方式BeginAutomaticRecovery,可以看到这个方法内部调用了PerformAutomaticRecovery方法。我们直接看这个方法的内容,其中第一个调用的是方法RecoverConnectionDelegate

protected voidPerformAutomaticRecovery()

{lock(recoveryLockTarget)

{

RecoverConnectionDelegate();

RecoverConnectionShutdownHandlers();

RecoverConnectionBlockedHandlers();

RecoverConnectionUnblockedHandlers();

RecoverModels();if(m_factory.TopologyRecoveryEnabled)

{

RecoverEntities();

RecoverConsumers();

}

RunRecoveryEventHandlers();

}

}

这个方法中调用的是

protected voidRecoverConnectionDelegate()

{bool recovering = true;while(recovering)

{try{

m_delegate= new Connection(m_factory, false, m_factory.CreateFrameHandler());

recovering= false;

}catch(Exception)

{//TODO: exponential back-off

(m_factory.NetworkRecoveryInterval);//TODO: provide a way to handle these exceptions

}

}

}

可以看出,它是执行了死循环,直到连接重新打开,当然,如果遇到异常,它会调用来等待一下,然后再次执行连接恢复。