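The RabbitMQ Java client ships with a connection recovery mechanism. Just set
factory.setAutomaticRecoveryEnabled(true);
and it will connect through
AutorecoveringConnection
and recover automatically if the connection times out. Here is the recovery source code from amqp-client-4.2.0.jar: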
private synchronized void beginAutomaticRecovery() throws InterruptedException {
    Thread.sleep(this.params.getNetworkRecoveryInterval());
    this.notifyRecoveryListenersStarted();
    RecoveryAwareAMQConnection newConn = this.recoverConnection();
    if (newConn != null) {
        this.addAutomaticRecoveryListener(newConn);
        this.recoverShutdownListeners(newConn);
        this.recoverBlockedListeners(newConn);
        this.recoverChannels(newConn);
        this.delegate = newConn;
        if (this.params.isTopologyRecoveryEnabled()) {
            this.recoverEntities();
            this.recoverConsumers();
        }
        this.notifyRecoveryListenersComplete();
    }
}
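Unfortunately, recovery did not succeed on my side; it failed with an error.
Background: one queue has 5 consumers; each consumer has its own channel; they all share a single exchange of type direct; messages are persistent.
Problem: when the connection timed out, the RabbitMQ management UI showed that the connection, the channels, the consumers, and the relationships between them had all been cleared.
During recovery the channels were restored successfully, but restoring everything else failed.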
private void recoverEntities() {
    this.recoverExchanges();
    this.recoverQueues();
    this.recoverBindings();
}
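The error reported was channel.closed.
Approach:
On my side the exchanges and queues do not need to be recovered, but the relationships between consumer, queue, and channel do.
RabbitMQ buries these methods deep inside the client, so I gave up on overriding them. Instead, first configure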
factory.setTopologyRecoveryEnabled(false);
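This skips the built-in recovery of everything else; only the channels are recovered by the client.
Then register a recovery listener on
AutorecoveringConnection
to restore the relationship between channels and consumers.
Method:
Keep references to all consumers in a collection.
In the listener, iterate over them, assign a channel to each consumer, and restore its relationship with the queue.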
private Connection createConn() throws IOException, TimeoutException {
    conn = createFactory().newConnection();
    conn.addShutdownListener(new ShutdownListener() {
        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            logger.error("rabbitmq connection already close. reason: {}", cause);
        }
    });
    AutorecoveringConnection myAutorecoveringConnection = (AutorecoveringConnection) conn;
    myAutorecoveringConnection.addRecoveryListener(new RecoveryListener() {
        @Override
        public void handleRecovery(Recoverable recoverable) {
            logger.debug("recover by mqConsumers");
            if (CollectionUtils.isNotEmpty(mqConsumers)) {
                logger.debug("recover by mqConsumers size " + mqConsumers.size());
                try {
                    AutorecoveringConnection autorecoveringConnection = (AutorecoveringConnection) recoverable;
                    // Read the private "channels" map via reflection to get the recovered channels.
                    Field privateChannelsField = AutorecoveringConnection.class.getDeclaredField("channels");
                    privateChannelsField.setAccessible(true);
                    @SuppressWarnings("unchecked")
                    Map<Integer, AutorecoveringChannel> map =
                            (Map<Integer, AutorecoveringChannel>) privateChannelsField.get(autorecoveringConnection);
                    Integer[] keys = map.keySet().toArray(new Integer[0]);
                    // Expect exactly one recovered channel per consumer; otherwise give up.
                    if (keys.length != mqConsumers.size()) {
                        logger.error("rabbitmq channel length not equals mqconsumer length, so autorecoveringConnection error");
                        return;
                    }
                    logger.debug("dispatch mqConsumers by channel");
                    int index = 0;
                    for (MqConsumer mqConsumer : mqConsumers) {
                        int key = keys[index];
                        //Channel channel2 = conn.createChannel();
                        Channel channel2 = map.get(key);
                        channel2.basicQos(prefetch);
                        // Re-declare the durable queue and re-bind it, using the queue name as routing key.
                        channel2.queueDeclare(mqConsumer.getQueue(), true, false, false, null);
                        channel2.queueBind(mqConsumer.getQueue(), mqConsumer.getExchange(), mqConsumer.getQueue());
                        // Hand the recovered channel to the consumer and start consuming again.
                        mqConsumer.setChannel(channel2);
                        mqConsumer.consumer();
                        index++;
                    }
                } catch (Exception e) {
                    logger.error("rabbitmq autorecoveringConnection error ", e);
                }
            }
        }

        @Override
        public void handleRecoveryStarted(Recoverable recoverable) {
        }
    });
    return conn;
}
Oops, the approach above turned out to be flawed.
To explain:
RabbitMQ's built-in connection recovery only works when the consumer tags are all different.
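A rough illustration of why distinct tags matter. It assumes that consumers are tracked in a map keyed by consumer tag; the registry below is only a stand-in for that idea, not the client's actual code, and the helper names (distinctTags, trackedConsumers) are made up for this sketch. No RabbitMQ classes are involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConsumerTagSketch {

    // Hypothetical helper: builds one distinct tag per consumer, e.g. "my-consumer-0".
    public static List<String> distinctTags(String prefix, int count) {
        List<String> tags = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            tags.add(prefix + "-" + i);
        }
        return tags;
    }

    // Stand-in for a registry keyed by consumer tag (an assumption of this sketch):
    // returns how many entries survive after every tag has been registered.
    public static int trackedConsumers(List<String> tags) {
        Map<String, Integer> registry = new LinkedHashMap<>();
        for (int i = 0; i < tags.size(); i++) {
            registry.put(tags.get(i), i); // a repeated tag overwrites the earlier entry
        }
        return registry.size();
    }

    public static void main(String[] args) {
        // Five consumers that all share one tag.
        List<String> shared = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            shared.add("my-consumer");
        }
        System.out.println(trackedConsumers(shared));                         // prints 1
        System.out.println(trackedConsumers(distinctTags("my-consumer", 5))); // prints 5
    }
}
```

With a shared tag only one of the five consumers is still tracked, while distinct tags keep all five. A tag produced this way could then be passed as the consumerTag argument of channel.basicConsume(queue, autoAck, consumerTag, consumer).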
factory.setAutomaticRecoveryEnabled(true);
factory.setTopologyRecoveryEnabled(true);
With both settings enabled, remove the custom recovery listener from AutorecoveringConnection, and recovery works.
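Putting it together, a minimal sketch of the final setup might look like the following. It needs the amqp-client jar and a reachable broker; the host, queue name, exchange name, tag prefix, and prefetch value are placeholders invented for this example, not values from my project.

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RecoverableConsumers {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");              // placeholder broker address
        factory.setAutomaticRecoveryEnabled(true); // recover connection and channels
        factory.setTopologyRecoveryEnabled(true);  // recover exchanges, queues, bindings, consumers

        Connection conn = factory.newConnection();
        String queue = "demo.queue";               // placeholder names
        String exchange = "demo.exchange";

        // One channel per consumer, all consuming from the same queue.
        for (int i = 0; i < 5; i++) {
            final Channel channel = conn.createChannel();
            channel.basicQos(10);                  // placeholder prefetch
            channel.exchangeDeclare(exchange, "direct", true);
            channel.queueDeclare(queue, true, false, false, null);
            channel.queueBind(queue, exchange, queue);

            // Each consumer gets its own tag so that recovery can tell them apart.
            String consumerTag = "demo-consumer-" + i;
            channel.basicConsume(queue, false, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String tag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    // Process the message here, then acknowledge it.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }
}
```

No recovery listener is registered here; the client's own recovery is left to restore the channels and consumers.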