中间件:中间件是一种独立的系统软件或服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源。中间件位于客户机/ 服务器的操作系统之上,管理计算机资源和网络通讯。是连接两个独立应用程序或独立系统的软件。相连接的系统,即使它们具有不同的接口,但通过中间件相互之间仍能交换信息。执行中间件的一个关键途径是信息传递。通过中间件,应用程序可以工作于多平台或 OS 环境。
java中有好多中间件,比如IBM MQ,JMS,以及马上要讲的activeMQ等。
为什么要使用中间件以及使用中间件的好处就不说了。
到apache官网下载最新版本的activeMQ是apache-activemq-5.9.0。
解压到一个目录下,如果是本机用,基本上不用任何配置。如果配置的话,都在conf下配置即可。具体的请参考apache官方说明。
启动bin下的。如果想看例子,把webapps-demo下的demo复制到webapps下即可。
activemq内置jetty容器,而且在内又这样一段节点:
<bean class="" init-method="start">
<!-- the default port number for the web console -->
<property name="port" value="8161"/>
</bean>
熟悉tomcat的都应该明白这是对外访问的端口哦,所以可以通过localhost:8161访问jetty容器了。浏览器输入,即可看到页面,如果不相信是jetty的话可以再8161后随便输入一个路径,是不是返回如下:
Problem accessing /ddd. Reason:
Not Found
Powered by Jetty://
\webapps目录下有admin,顾名思义是管理jetty的啊,和tomcat的manager一样啊。别的不说了。
启动bin下的。
通过查看,这个文件内又有这样一段节点:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&=104857600"/>
</transportConnectors>
所以如果想访问mq,有好多种方式啊。
最常用的当然是tcp协议啦。
activemq的管理页面有Home | Queues | Topics | Subscribers | Connections | Network | Scheduled | Send
通过看文档知道消息传输有两种模型,queue和topic。
queue:
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。但是消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
topic:
消息生产者将消息发布到topic中,同时有多个消息消费者消费该消息。和queue不同,发布到topic的消息会被所有订阅者消费。
以上各特点可以通过实际代码验证。
下面就写个代码区测试测试了。
发送:
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
public class Sender {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,// admin,admin
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = ();
();
session = (,
Session.AUTO_ACKNOWLEDGE);
destination = ("testQueue");
producer = (destination);
(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 5; i++) {
TextMessage message = ("测试发送" + i);
(message);
}
();
} catch (Exception e) {
();
} finally {
try {
if (null != connection)
();
} catch (Throwable ignore) {
}
}
}
}
接受:
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
public class Receiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,// admin,admin
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = ();
();
session = (,
Session.AUTO_ACKNOWLEDGE);
destination = ("testQueue");
consumer = (destination);
while (true) {
TextMessage message = (TextMessage) (100000);
if (null != message) {
("收到" + ());
} else {
break;
}
}
} catch (Exception e) {
();
} finally {
try {
if (null != connection)
();
} catch (Throwable ignore) {
}
}
}
}
用户名和密码都是admin,如果不知道,就用ActiveMQConnection.DEFAULT_USER和ActiveMQConnection.DEFAULT_PASSWORD去替代吧。
自己运行看看吧。
下面展示topic:
发送:
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
public class Sender {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Topic destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,// admin,admin
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = ();
();
session = (,
Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("testTopic");
producer = (destination);
(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 5; i++) {
TextMessage message = ("测试发送" + i);
(message);
}
();
} catch (Exception e) {
();
} finally {
try {
if (null != connection)
();
} catch (Throwable ignore) {
}
}
}
}
接收者:
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
public class Receiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Topic destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,// admin,admin
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = ();
();
session = (,
Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("testTopic");
consumer = (destination);
while (true) {
TextMessage message = (TextMessage) ();
if (null != message) {
("收到" + ());
} else {
break;
}
}
} catch (Exception e) {
();
} finally {
try {
if (null != connection)
();
} catch (Throwable ignore) {
}
}
}
}
区别都用红线标明了,先运行接收者(多run几次),再运行发送者。切换控制台看吧。
如果你切换控制台,你会看到。queue方式下发送的消息被某个接收者接收后,别的接收者就接收不到啦。而topic方式下,发送者的消息会被所有接收者接收。
好了,大概就介绍这么多吧。
activemq和IBM MQ的机制差不多,但是IBM MQ要比activeMq复杂的多。
有些项目,特别是银行项目用中间件比较多。