Spring引导嵌入的HornetQ集群不转发消息。

时间:2021-05-09 20:51:19

I'm trying to create a static cluster of two Spring Boot applications with embedded HornetQ servers. One application/server will be handling external events and generating messages to be sent to a message queue. The other application/server will be listening on the message queue and process incoming messages. Because the link between the two applications is unreliable, each will use only local/inVM clients to produce/consume messages on their respective server, and relying on the clustering functionality to forward the messages to the queue on the other server in the cluster.

我正在尝试用嵌入的HornetQ服务器创建两个Spring引导应用程序的静态集群。一个应用程序/服务器将处理外部事件并生成发送到消息队列的消息。另一个应用程序/服务器将监听消息队列并处理传入消息。由于这两个应用程序之间的链接不可靠,每个应用程序只使用本地/inVM客户端在各自的服务器上生成/使用消息,并依赖集群功能将消息转发到集群中的其他服务器上的队列。

I'm using the HornetQConfigurationCustomizer to customize the embedded HornetQ server, because by default it only comes with an InVMConnectorFactory.

我使用hornetqconfigurationcustomzer来定制嵌入的HornetQ服务器,因为默认情况下,它只附带一个InVMConnectorFactory。

I have created a couple of sample applications that illustrate this setup, throughout this example "ServerSend", refers to the server that will be producing messages, and "ServerReceive" refers to the server that will be consuming messages.

我已经创建了几个示例应用程序来演示这个设置,在这个示例中,“ServerSend”指的是将要生成消息的服务器,而“ServerReceive”指的是将要使用消息的服务器。

pom.xml for both applications contains:

砰的一声。两个应用程序的xml包含:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
<dependency>
    <groupId>org.hornetq</groupId>
    <artifactId>hornetq-jms-server</artifactId>
</dependency>

DemoHornetqServerSendApplication:

DemoHornetqServerSendApplication:

@SpringBootApplication
@EnableScheduling
public class DemoHornetqServerSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Server: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverReceiveConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverSendConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}

application.properties (ServerSend):

应用程序。属性(ServerSend):

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password

DemoHornetqServerReceiveApplication:

DemoHornetqServerReceiveApplication:

@SpringBootApplication
@EnableJms
public class DemoHornetqServerReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
    }

    @JmsListener(destination="${spring.hornetq.embedded.queues}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverSendConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverReceiveConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}

application.properties (ServerReceive):

应用程序。属性(ServerReceive):

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password

After starting both applications, log output shows this:

启动两个应用程序后,日志输出显示:

ServerSend:

ServerSend:

2015-04-09 11:11:58.471 INFO 7536 --- [ main] org.hornetq.core.server : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)
2015-04-09 11:11:58.501 INFO 7536 --- [ main] org.hornetq.core.server : HQ221045: libaio is not available, switching the configuration into NIO
2015-04-09 11:11:58.595 INFO 7536 --- [ main] org.hornetq.core.server : HQ221043: Adding protocol support CORE
2015-04-09 11:11:58.720 INFO 7536 --- [ main] org.hornetq.core.server : HQ221003: trying to deploy queue jms.queue.jms.testqueue
2015-04-09 11:11:59.568 INFO 7536 --- [ main] org.hornetq.core.server : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5445
2015-04-09 11:11:59.593 INFO 7536 --- [ main] org.hornetq.core.server : HQ221007: Server is now live
2015-04-09 11:11:59.593 INFO 7536 --- [ main] org.hornetq.core.server : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]

2015-04-09 11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:00服务器:HQ221000: live服务器从配置HornetQ配置开始(集群=true,backup=false,sharedStore=true,journalDirectory=C:\用户***\本地\Temp\ HornetQ -data/journal,bindingsDirectory=数据/绑定,largeMessagesDirectory=data/largemessages,pagingDirectory=数据/分页)2015-04-09 11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:00 --- --- --- --- --- --- --- --- --- --- --- --- --- --- --- --- --- --- --- -服务器:HQ221045: libaio不可用,将配置转换为NIO 2015-04-09 11:11 . 11:11 . 58.595 INFO 7536 --- [main] org.hornetq.core。服务器:HQ221043:添加协议支持核心2015-04-09 11:11:11 . 11:11:11:11 . 11:11:11:11:11:11 . 11:11:11:11:11:11 . 11:11:11:11:11:12:00 --- [main] org.hornetq.core。服务器:HQ221003:尝试部署队列jms.queue.jms。testqueue 2015-04-09 11:11 .568 --- --- - [main]。服务器:HQ221020:启动Netty Acceptor 4.0.13。最后localhost:5445 2015-04-09 11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:11:服务器:HQ221007:服务器现在运行2015-04-09 11:11:11 .593 INFO 7536 --- [main] org.hornetq.core。服务器:HQ221001: HornetQ服务器版本2.4.5。最后(Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]

ServerReceive:

ServerReceive:

2015-04-09 11:12:04.401 INFO 4528 --- [ main] org.hornetq.core.server : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)
2015-04-09 11:12:04.410 INFO 4528 --- [ main] org.hornetq.core.server : HQ221045: libaio is not available, switching the configuration into NIO
2015-04-09 11:12:04.520 INFO 4528 --- [ main] org.hornetq.core.server : HQ221043: Adding protocol support CORE
2015-04-09 11:12:04.629 INFO 4528 --- [ main] org.hornetq.core.server : HQ221003: trying to deploy queue jms.queue.jms.testqueue
2015-04-09 11:12:05.545 INFO 4528 --- [ main] org.hornetq.core.server : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5446
2015-04-09 11:12:05.578 INFO 4528 --- [ main] org.hornetq.core.server : HQ221007: Server is now live
2015-04-09 11:12:05.578 INFO 4528 --- [ main] org.hornetq.core.server : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]

2015-04-09 11:12 04.401信息4528 --- [main] org.hornetq.core。服务器:HQ221000: live服务器从配置HornetQ配置开始(集群=true,backup=false,sharedStore=true,journalDirectory=C:\Users*** \AppData\Local\Temp\ HornetQ -data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/分页)2015-04-09 11:12:04.410 INFO 4528 --- [main] org.hornetq.core。服务器:HQ221045: libaio不可用,将配置转换为NIO 2015-04-09 11:12:04.520 INFO 4528 --- [main] .hornetq.core。服务器:HQ221043:添加协议支持核心2015-04-09 11:12:04.629 INFO 4528 --- [main] org.hornetq.core。服务器:HQ221003:尝试部署队列jms.queue.jms。testqueue 2015-04-09 11:12 05.545 INFO 4528 --- [main] org.hornetq.core。服务器:HQ221020:启动Netty Acceptor 4.0.13。最后localhost:5446 2015-04-09 11:12:05.578 INFO 4528 --- [main] org.hornetq.core。服务器:HQ221007:服务器现在运行2015-04-09 11:12:05.578 INFO 4528 --- [main] org.hornetq.core。服务器:HQ221001: HornetQ服务器版本2.4.5。最后(Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]

I see clustered=true in both outputs, and this would show false if I removed the cluster configuration from the HornetQConfigurationCustomizer, so it must have some effect.

我在两个输出中都看到了集群=true,如果我从HornetQConfigurationCustomizer中删除了集群配置,这将显示错误,因此它必须具有一定的效果。

Now, ServerSend shows this in the console output:

现在,ServerSend在控制台输出中显示了这一点:

Sending message: Timestamp from Server: 1428574324910
Sending message: Timestamp from Server: 1428574329899
Sending message: Timestamp from Server: 1428574334904

发送消息:从服务器发送的时间戳:1428574324910发送消息:来自服务器的时间戳:1428574329899发送消息:来自服务器的时间戳:1428574334904。

However, ServerReceive shows nothing.

然而,ServerReceive显示。

It appears that the messages are not forwarded from ServerSend to ServerReceive.

似乎消息没有从服务器端转发到ServerReceive。

I did some more testing, by creating two further Spring Boot applications (ClientSend and ClientReceive), which do not have a HornetQ server embedded and instead connect to a "native" server.

我做了更多的测试,创建了两个新的Spring引导应用程序(ClientSend和ClientReceive),它们没有嵌入的HornetQ服务器,而是连接到一个“本地”服务器。

pom.xml for both client applications contains:

砰的一声。客户机应用程序的xml包含:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>

DemoHornetqClientSendApplication:

DemoHornetqClientSendApplication:

@SpringBootApplication
@EnableScheduling
public class DemoHornetqClientSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Client: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }
}

application.properties (ClientSend):

应用程序。属性(ClientSend):

spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5446

queue=jms.testqueue

DemoHornetqClientReceiveApplication:

DemoHornetqClientReceiveApplication:

@SpringBootApplication
@EnableJms
public class DemoHornetqClientReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientReceiveApplication.class, args);
    }

    @JmsListener(destination="${queue}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

application.properties (ClientReceive):

应用程序。属性(ClientReceive):

spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5445

queue=jms.testqueue

Now the console shows this:

现在控制台显示了这个:

ServerReveive:

ServerReveive:

Received message: Timestamp from Client: 1428574966630
Received message: Timestamp from Client: 1428574971600
Received message: Timestamp from Client: 1428574976595

收到的信息:时间戳:客户端:1428574966630收到的信息:时间戳来自客户:1428574971600收到的信息:时间戳来自客户:1428574976595。

ClientReceive:

ClientReceive:

Received message: Timestamp from Server: 1428574969436
Received message: Timestamp from Server: 1428574974438
Received message: Timestamp from Server: 1428574979446

收到消息:来自服务器的时间戳:1428574969436收到消息:来自服务器的时间戳:1428574974438收到的消息:来自服务器的时间戳:1428574979446。

If I have ServerSend running for a while, and then start ClientReceive, it also receives all the messages queued up to that point, so this shows that the messages don't just disappear somewhere, or get consumed from somewhere else.

如果我有一个ServerSend运行一段时间,然后启动ClientReceive,它也会接收到所有排队到那个点的消息,所以这表明消息不会消失在某个地方,或者从其他地方被消费掉。

For completeness sake I've also pointed ClientSend to ServerSend and ClientReceive to ServerReceive, to see if there is some issue with clustering and the InVM clients, but again there was no outout indicating that any message was received in either ClientReceive or ServerReceive.

为了完整性起见,我还将ClientSend发送到ServerSend和ClientReceive到ServerReceive,以查看集群和InVM客户端是否存在问题,但是仍然没有outout指示在ClientReceive或ServerReceive中接收到任何消息。

So it appears that message delivery to/from each of the embedded brokers to directly connected external clients works fine, but no messages are forwarded between brokers in the cluster.

因此,从每个嵌入式代理到直接连接外部客户端的消息传递都可以正常工作,但是在集群中的代理之间没有转发消息。

So, after all this, the big question, what's wrong with the setup that messages aren't forwarded within the cluster?

所以,在这之后,最大的问题是,在集群中没有转发消息的设置有什么问题?

1 个解决方案

#1


0  

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html#d0e595

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html d0e595

"HornetQ core is designed as a set of simple POJOs so if you have an application that requires messaging functionality internally but you don't want to expose that as a HornetQ server you can directly instantiate and embed HornetQ servers in your own application."

“HornetQ核心被设计成一组简单的pojo,所以如果您有一个需要内部消息功能的应用程序,但您不想公开它作为一个HornetQ服务器,您可以直接实例化并将HornetQ服务器嵌入到您自己的应用程序中。”

If you are embedding it, you aren't exposing it as a server. Each of your containers has a seperate instance. It is the equivalent of starting up 2 copies of hornet and giving them the same queue name. One writes to that queue on the first instance and the other listens to the queue on the second instance.

如果将其嵌入,就不会将其作为服务器公开。每个容器都有一个独立的实例。它相当于启动了两个hornet的副本,并给它们相同的队列名称。一个在第一个实例中写入队列,另一个在第二个实例上侦听队列。

If you want to decouple your apps in this way, you need to have a single place that is acting as a server. Probably, you want to cluster. This isn't specific to Hornet, BTW. You'll find this pattern often.

如果你想以这种方式解耦你的应用程序,你需要一个单独的地方作为一个服务器。可能,您想要集群。这不是特别针对黄蜂,顺便说一下。你会经常发现这种模式。

#1


0  

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html#d0e595

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html d0e595

"HornetQ core is designed as a set of simple POJOs so if you have an application that requires messaging functionality internally but you don't want to expose that as a HornetQ server you can directly instantiate and embed HornetQ servers in your own application."

“HornetQ核心被设计成一组简单的pojo,所以如果您有一个需要内部消息功能的应用程序,但您不想公开它作为一个HornetQ服务器,您可以直接实例化并将HornetQ服务器嵌入到您自己的应用程序中。”

If you are embedding it, you aren't exposing it as a server. Each of your containers has a seperate instance. It is the equivalent of starting up 2 copies of hornet and giving them the same queue name. One writes to that queue on the first instance and the other listens to the queue on the second instance.

如果将其嵌入,就不会将其作为服务器公开。每个容器都有一个独立的实例。它相当于启动了两个hornet的副本,并给它们相同的队列名称。一个在第一个实例中写入队列,另一个在第二个实例上侦听队列。

If you want to decouple your apps in this way, you need to have a single place that is acting as a server. Probably, you want to cluster. This isn't specific to Hornet, BTW. You'll find this pattern often.

如果你想以这种方式解耦你的应用程序,你需要一个单独的地方作为一个服务器。可能,您想要集群。这不是特别针对黄蜂,顺便说一下。你会经常发现这种模式。