问题现象
由于项目需要连接到多个RabbitMQ实例或者一个实例的多个vhost上,需要自定义配置多个ConnectionFactory来区分连接,以达到可以消费或者向多个RabbitMQ实例/多个vhost发送消息。手动配置ConnectionFactory后,发现原来配置的发送确认回调无效了,ConnectionFactory的配置如下,
/**
* 自定义RabbitMQ不同实例/不同vhost的ConnectionFactory
* @param rabbitProperties 配置文件中rabbitMQ的配置属性
* @return
*/
@Bean("defaultRabbitConnectionFactory")
@Primary
public ConnectionFactory defaultRabbitConnectionFactory(RabbitProperties rabbitProperties){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(rabbitProperties.getHost());
cachingConnectionFactory.setPort(rabbitProperties.getPort());
cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
cachingConnectionFactory.setVirtualHost("/");
return cachingConnectionFactory;
}
/**
* 配置自定义的RabbitTemplate模板
* @param connectionFactory 连接工厂
* @param confirmCallback 消息发送后回调的实例对象
* @return
*/
@Bean("rabbitTemplate")
@Primary
public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory, MQConfirmCallback confirmCallback) {
CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) connectionFactory;
cachingConnectionFactory.setVirtualHost("/");
RabbitTemplate rabbitTemplate = new CorrelateRabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setConfirmCallback(confirmCallback);
return rabbitTemplate;
}
原因分析
从上面的自定义的ConnectionFactory配置来看,配置了连接的RabbitMQ实例地址和vhost,以及RabbitTemplate中配置定义的ConfirmCallback对象实例。此时即使在Properties文件中配置了-confirm-type=correlated
,在发送消息后是不会对实现了接口的Bean对象发起回调的。跟踪发送消息的接口,在RabbitTemplate的源码中,发现在
doSend
方法中,调用了setupConfirm
方法,用于设置回调,源码如下:
/**
* Send the given message to the specified exchange.
*
* @param channel The RabbitMQ Channel to operate within.
* @param exchangeArg The name of the RabbitMQ exchange to send to.
* @param routingKeyArg The routing key.
* @param message The Message to send.
* @param mandatory The mandatory flag.
* @param correlationData The correlation data.
* @throws IOException If thrown by RabbitMQ API methods.
*/
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
String exch = nullSafeExchange(exchangeArg);
String rKey = nullSafeRoutingKey(routingKeyArg);
if (logger.isTraceEnabled()) {
logger.trace("Original message to publish: " + message);
}
Message messageToUse = message;
MessageProperties messageProperties = messageToUse.getMessageProperties();
if (mandatory) {
messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
}
if (this.beforePublishPostProcessors != null) {
for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
messageToUse = processor.postProcessMessage(messageToUse, correlationData, exch, rKey);
}
}
setupConfirm(channel, messageToUse, correlationData);
if (this.userIdExpression != null && messageProperties.getUserId() == null) {
String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
if (userId != null) {
messageProperties.setUserId(userId);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Publishing message [" + messageToUse
+ "] on exchange [" + exch + "], routingKey = [" + rKey + "]");
}
sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
// Check if commit needed
if (isChannelLocallyTransacted(channel)) {
// Transacted channel created by this template -> commit.
RabbitUtils.commitIfNecessary(channel);
}
}
private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationDataArg) {
if ((this.publisherConfirms || this.confirmCallback != null) && channel instanceof PublisherCallbackChannel) {
PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
CorrelationData correlationData = this.correlationDataPostProcessor != null
? this.correlationDataPostProcessor.postProcess(message, correlationDataArg)
: correlationDataArg;
long nextPublishSeqNo = channel.getNextPublishSeqNo();
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
publisherCallbackChannel.addPendingConfirm(this, nextPublishSeqNo,
new PendingConfirm(correlationData, System.currentTimeMillis()));
if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
message.getMessageProperties().setHeader(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY,
correlationData.getId());
}
}
else if (channel instanceof ChannelProxy && ((ChannelProxy) channel).isConfirmSelected()) {
long nextPublishSeqNo = channel.getNextPublishSeqNo();
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
}
}
通过源码发现在发送消息前,在setupConfirm方法中设置具体的回调对象,要设置回调对象,则首先要判断是否开启了发送确认或者回调对象不为空,此时,这个条件肯定满足,还有个关键点就是channel instanceof PublisherCallbackChannel
,也就是说当前发送消息的Channel
必须是PublisherCallbackChannel
,既然是从Channel中判断的,那么这个设置的地方应该就Connect有关系了,因为在没有手动配置ConnectFactory之前,使用默认的配置,回调时没有问题,手动配置ConnectFacotry后,发送确认回调就无效了,此时,可以基本判断问题出现在自定义的ConnectFactory上。此时问题就要回到确认这个Channel是什么时候创建的以及创建时使用了哪些配置条件。
回到ConnectionFactory定义的地方,检查new的动作,注意在new CachingConnectionFactory
时,内部的属性private PublisherCallbackChannelFactory publisherChannelFactory = ();
,器(),此时创建的正是PublisherCallbackChannelImpl
public static PublisherCallbackChannelFactory factory() {
return (channel, exec) -> new PublisherCallbackChannelImpl(channel, exec);
}
在服务启动时,最终会执行到CachingConnectionFactory#doCreateBareChannel
方法,在705行的if条件中,需要判断是否是
类型的确认,我们再来看
是从什么时候设置值的。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-42lButdl-1622359786369)(/Users/yuxiao/Documents/)]
从源码得知,CachingConnectionFactory
中的confirmType
的默认值为,那么至此可以明白此值应该可以由外界配置传入,既然properties文件中已经配置了``-confirm-type=correlated
,那么应该就是在自定义
CachingConnectionFactory`的时候没有设置该值,至此问题得以了然。
解决方案
需要再声明自定的ConnectionFactory时,配置confirm-type,完成的ConnectionFactory的定义如下:
/**
* 自定义RabbitMQ不同实例/不同vhost的ConnectionFactory
* @param rabbitProperties 配置文件中rabbitMQ的配置属性
* @return
*/
@Bean("defaultRabbitConnectionFactory")
@Primary
public ConnectionFactory defaultRabbitConnectionFactory(RabbitProperties rabbitProperties){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(rabbitProperties.getHost());
cachingConnectionFactory.setPort(rabbitProperties.getPort());
cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
cachingConnectionFactory.setVirtualHost("/");
cachingConnectionFactory.setCacheMode(rabbitProperties.getCache().getConnection().getMode());
// 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
cachingConnectionFactory.setPublisherConfirmType(rabbitProperties.getPublisherConfirmType());
return cachingConnectionFactory;
}
参考资料
rabbitmq:publisher confirms - 知乎 ()