activemq 实战 四 传输连接器-Transport connectors 4.2

时间:2024-12-03 11:35:49

In order to exchange messages, producers and consumers (clients) need to connect to

the broker. This client-to-broker communication is performed through transport connectors.

ActiveMQ provides an impressive list of protocols clients can use to exchange

messages. The requirements of ActiveMQ users in terms of connectivity are diverse.

Some users focus on performance, others on security, and so on. ActiveMQ tries to

cover all these aspects and provide a connector for every use case.

In this section you’ll learn how transport connectors are configured in the

ActiveMQ configuration files and adapt the stock portfolio example to demonstrate

various connectors. In the following sections, we’ll go through protocols available for

connecting to the broker over the network, as well as introduce the concept of the

embedded broker and Virtual Machine Protocol used for communicating with brokers

inside your application (a topic that will be continued in chapter 7).

为了交换消息,消息生产者和消费者(客户端)需要连接到代理(broker).

这种客户端-代理之间的通信就是通过传输连接(transport connector)来完成.

ActiveMQ提供一系列的连接协议,客户端使用这些协议可以交换消息.

ActiveMQ用户对连接的需求是多种多样的,比如有些用户关注性能,有些关注安全等等,

ActiveMQ提供连接器以满足所有用户的需求.

本节中,你将学习如何在ActiveMQ配置文件中配置传输连接器,同时会修改 stock portfolio例子,

以便阐述各种各样的链接器.在接下来的章节中,我们将讨论连接协议,使用这些连接协议,可以通过网络连接到代理.

另外,本节中还将介绍嵌入式代理(embedded broker)和虚拟机协议,使用虚拟机协议,允许在应用程序内部与代理直接通信

(这个专题将在第七章继续讨论).

4.2.1 Configuring transport connectors

From the broker’s perspective, the transport connector is a mechanism used to accept

and listen to connections from clients. If you take a look at the ActiveMQ demo configuration

file (conf/activemq-demo.xml), you’ll see the configuration snippet for transport

connectors similar to the following example:

<transportConnectors>

<transportConnector name=”openwire” uri=”tcp://localhost:61616″ discoveryUri=”multicast://default”/>

<transportConnector name=”ssl” uri=”ssl://localhost:61617″/>

<transportConnector name=”stomp” uri=”stomp://localhost:61613″/>

<transportConnector name=”xmpp” uri=”xmpp://localhost:61222″/>

</transportConnectors>

从代理的(broker)的角度来说,传输连接(transport connector)是一种机制,用来处理和监听客户端的连接.

如果你查看ActiveMQ demo的配置文件(conf/activemq-demo.xml),你会看到,传输连接配置类似于下面的代码片段:

<transportConnectors>

<transportConnector name=”openwire” uri=”tcp://localhost:61616″ discoveryUri=”multicast://default”/>

<transportConnector name=”ssl” uri=”ssl://localhost:61617″/>

<transportConnector name=”stomp” uri=”stomp://localhost:61613″/>

<transportConnector name=”xmpp” uri=”xmpp://localhost:61222″/>

</transportConnectors>

As you can see, transport connectors are defined within the <transportConnectors>

element. You define particular connectors with the appropriate nested <transport-Connector> element.

ActiveMQ simultaneously supports many protocols listening on

different ports. The configuration for a connector must uniquely define the name

and the URI attributes. In this case, the URI defines the network protocol and optional

parameters through which ActiveMQ will be exposed for connectivity. The

discoveryUri attribute as shown on the OpenWire connector is optional and will be

discussed further in section 4.3.1.

正如你看到的,传输连接使用<transportConnectors>元素来定义.通过嵌套的<transport-Connector>元素

来定义特定的连接.ActiveMQ支持同时支持不同的连接协议监听不同的端口.连接器配置的name和URI属性必须唯一.

这样,通过URI定义网络协议以及可选的参数,ActiveMQ便产生了一个连接.在OpenWire connector配置中的

discoveryUri属相是可选的,这部分将在4.3.1节中讨论.

The preceding snippet defines four transport connectors. Upon starting up

ActiveMQ using such a configuration file, you’ll see the following log in the console as

these connectors start up:

INFO TransportServerThreadSupport – Listening for connections at:tcp://localhost:61616

INFO TransportConnector – Connector openwire Started

INFO TransportServerThreadSupport – Listening for connections at:ssl://localhost:61617

INFO TransportConnector – Connector ssl Started

INFO TransportServerThreadSupport – Listening for connections at:stomp://localhost:61613

INFO TransportConnector – Connector stomp Started

INFO TransportServerThreadSupport – Listening for connections at:xmpp://localhost:61222

INFO TransportConnector – Connector xmpp Started

前面的代码片断中,一共定义了4个传输连接.使用包含上面代码片段的配置文件启动ActiveMQ时,控制台

会显示下面的日志信息:

INFO TransportServerThreadSupport – Listening for connections at:tcp://localhost:61616

INFO TransportConnector – Connector openwire Started

INFO TransportServerThreadSupport – Listening for connections at:ssl://localhost:61617

INFO TransportConnector – Connector ssl Started

INFO TransportServerThreadSupport – Listening for connections at:stomp://localhost:61613

INFO TransportConnector – Connector stomp Started

INFO TransportServerThreadSupport – Listening for connections at:xmpp://localhost:61222

INFO TransportConnector – Connector xmpp Started

From the client’s perspective, the transport connector URI is used to create a connection

to the broker in order to send and receive messages. Sending and receiving messages

will be discussed in detail in chapter 7, but the following code snippet should be

enough to demonstrate the usage of the transport connector URIs in Java applications:

从客户端的角度来说,使用传输连接的URI可以创建一个到代理的连接,以便接收和发送消息.

接收和发送消息将在第7章详细讨论,下面的代码片段足以说明Java应用程序中传输连接URI

(transport connector URI)的用处.

ActiveMQConnectionFactory factory =

new ActiveMQConnectionFactory(“tcp://localhost:61616″);

Connection connection = factory.createConnection();

connection.start();

Session session =

connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Note in the preceding example that the transport connector URIs defined in

ActiveMQ configuration are used by the client application to create a connection to

the broker. In this case, the URI for the TCP transport is used and is shown in bold

text.

注意,前面的例子中,客户端程序使用ActiveMQ配置文件中定义的的传输连接器的URI来创建到代理的连接.

代码中,展示了如何使用TCP transport的URI并以斜体显示.

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(“tcp://localhost:61616″);

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

NOTE The important thing to know is that we can use the query part of the

URI to configure connection parameters both on the server and client sides.

Usually most of the parameters apply both for client and server sides of the

connection, but some of them are specific to one or the other, so be sure you

check the protocol reference before using the particular query parameter.

With this basic understanding of configuring transport connectors, it’s important to

become aware of and understand the available transport connectors in ActiveMQ. But

before we start explaining particular connectors, we must first adapt our stock portfolio

example so it can be used with different transport connectors.

注意:需要重点了解的是:我们可以使用URI中查询字符串部分的来配置服务端和客户端的连接参数.

通常这些参数同时适用于服务端和客户端,但是也有一些仅适用于其中之一,所以在使用查询字符串之前

需要查看连接协议的使用手册.有了这些配置传输连接的基本知识后,需要重点了解ActiveMQ提供了哪些可用传输连接了.

但在解释这些连接器之前,我们首先修改下stock portfolio的例子,这样可以先熟悉不同的传输连接.

4.2.2 Adapting the stock portfolio example

Chapter 3 introduced a stock portfolio example that uses ActiveMQ to publish and

consume stock exchange data. There, we used the fixed standard connector URI since

we wanted to make those introductory examples as simple as possible. In this chapter,

we’ll explain all protocols and demonstrate them by running the stock portfolio

example using each of them. For that reason, we need to modify the stock portfolio

example so it will work using any of the protocols.

Listing 4.1 is a modified version of the main() method from the stock portfolio

publisher.

第三章介绍了一个stock portfolio实例,在这个实例中使用ActiveMQ发送和接收处理消息,以便进行股票价格信息交换.

为了使stock portfolio这个入门性的实例看起来尽量简单,我们使用了固定的标准的URI配置连接器.

本章将解释所有连接协议,并在stock portfolio这个例子中使用各种连接协议.为此,我们需要修改

stock portfolio例子,使得这个例子可以在使用任何连接实例时能够正常运行.

清单4.1是stock portfolio例子中修改后的Publisher类的main()方法.

Listing 4.1 Modifying stock portfolio publisher to support various connector URIs

public static void main(String[] args) throws JMSException

{

if (args.length == 0)

{

System.err.println(“Please define a connection URI!”);

return;

}

Publisher publisher = new Publisher(args[0]);

String[] topics = new String[args.length – 1];

System.arraycopy(args, 1, topics, 0, args.length – 1);

while (total < 1000)

{

for (int i = 0; i < count; i++)

{

publisher.sendMessage(topics);

}

total += count;

System.out.println(“Published ‘” + count + “‘ of ‘” + total + “‘ price messages”);

try

{

Thread.sleep(1000);

}

catch (InterruptedException x)

{

}

}

publisher.close();

}

The preceding code ensures that the connector URI is passed as the first argument

and extracts topic names from the rest of the arguments passed to the application.

Now the stock portfolio publisher can be run with the following command:

前面的代码保证了程序运行时第一个命令行参数是连接器的URI,剩下的参数会被提取出来作为主题的名称.

至此可有下面的命令执行那个stock portfolio实例中的publisher.

$ mvn exec:java -Dexec.mainClass=org.apache.activemq.book.ch4.Publisher -Dexec.args=”tcp://localhost:61616 CSCO ORCL”

Sending: {price=65.713356601409, stock=JAVA, offer=65.779069958011,up=true}on destination: topic://STOCKS.JAVA

Sending: {price=66.071605671946, stock=JAVA, offer=66.137677277617,up=true}on destination: topic://STOCKS.JAVA

Sending: {price=65.929035001620, stock=JAVA, offer=65.994964036622,up=false}on destination: topic://STOCKS.JAVA

Note that one more argument has been added to the publisher: the URL to be used to

connect to the appropriate broker.

注意在执行publisher类时,新增了一个URI参数连接器使用这个参数连接到合适的代理.

The same principle can be used to modify the stock portfolio consumer. In the following

listing, you’ll find the stock portfolio consumer’s main() method modified to

accept the connection URI as a first parameter.

使用同样的方法,可以修改consumer类.在下面的代码清单中,你会看到stock portfolio实例的

consumer类的main()方法已经被修改过了以便使用第一个命令行参数作为连接器的URI.

Listing 4.2 Modifying stock portfolio consumer to support various connector URIs

public static void main(String[] args) throws JMSException

{

if (args.length == 0)

{

System.err.println(“Please define connection URI!”);

return;

}

Consumer consumer = new Consumer(args[0]);

String[] topics = new String[args.length – 1];

System.arraycopy(args, 1, topics, 0, args.length – 1);

for (String stock : topics)

{

Destination destination = consumer.getSession().createTopic(“STOCKS.” + stock);

MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);

messageConsumer.setMessageListener(new Listener());

}

}

In order to achieve the same functionality as in the chapter 3 example, you should

run the consumer with an extra URI argument.

The following example shows how to do this:

下面的命令展示了执行consumer的命令行命令,改命令使用了一个额外的URI参数,以便使得这里的consumer

类与第三章中的consumer类功能相同.

$ mvn exec:java -Dexec.mainClass=org.apache.activemq.book.ch4.Consumer -Dexec.args=”tcp://localhost:61616 CSCO ORCL”

ORCL 65.71 65.78 up

ORCL 66.07 66.14 up

ORCL 65.93 65.99 down

CSCO 23.30 23.33 up

Note that the message flow between the producer and the consumer is the same as in

the original example. With these changes, the examples are now ready to be run using

a variety of supported protocols. Let’s now dig into the particular transport connectors.

In the following section we’ll see what options you have if you want to connect to

the broker over the network.

注意,修改后例子里面producer和consumer这个之间的消息流与未修改之前是相同的.做完这些修改后,

这个例子可以支持多种连接协议了.现在让我们深入探讨各种传输连接.在接下来的章节中,可以看到使用哪些

方法可以连接到代理.