大数据入门第三天——基础补充与ActiveMQ

时间:2021-10-08 05:07:41

一、多线程基础回顾

  先导知识在基础随笔篇:http://www.cnblogs.com/jiangbei/p/6664555.html

  以下此部分以补充为主

  1.概念

    进程:进行中的程序,内存中有独立的内存空间

    线程:进程中的多个顺序控制流

  2.Java中实现线程的两种方式

    参考上文(继承thread类与实现runnable接口)

  3.同步synchronized的用法

    参考上文(同一时间只能有一个线程执行)

  4.lock

    外部的锁类,参考:https://www.cnblogs.com/dolphin0520/p/3923167.html

    更多的介绍与细节,将在JUC的基础随笔中进行补充...

  5.更多JUC特性

    线程池,队列blockqueue等其他特性,将在基础JUC中进行补充...

二、JMS

  1.什么是JMS

  JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

查看更多JMS介绍

  // 介绍来自百度百科

    所以,实质上,JMS是类似JDBC的一套规范(一组接口)

  2.体系架构

JMS由以下元素组成。
JMS提供者provider:连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
JMS客户:生产或消费基于消息的Java的应用程序或对象。
JMS生产者:创建并发送消息的JMS客户。
JMS消费者:接收消息的JMS客户。
JMS消息:包括可以在JMS客户之间传递的数据的对象
JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。
JMS主题:一种支持发送消息给多个订阅者的机制。

  3.JMS两种模型 

    1、 点对点或队列模型

      大数据入门第三天——基础补充与ActiveMQ

    2、发布者/订阅者模型

      大数据入门第三天——基础补充与ActiveMQ

  更多JMS基本概念介绍与编程接口讲解,参考http://blog.csdn.net/jiuqiyuliang/article/details/46701559

三、ActiveMQ入门

  1.概述

    消息中间件的概念

消息中间件

我们简单的介绍一下消息中间件,对它有一个基本认识就好,消息中间件(MOM:Message Orient middleware)。

消息中间件有很多的用途和优点:
. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;
. 负责建立网络通信的通道,进行数据的可靠传送。
. 保证数据不重发,不丢失
. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务

    ActiveMQ就是JMS的一种具体实现,是一种受欢迎的消息中间件

Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server.

  2.下载

    这里示例使用windows版本,实际生产应该是Linux版了;这里就不下载最新版了,使用5.12.1演示(注意查看版本位置)

    下载地址:http://activemq.apache.org/download.html

  3.配置与安装

    和tomcat一样,解压

    大数据入门第三天——基础补充与ActiveMQ

    打开conf/activemq.xml配置文件,修改相关的IP0.0.0.0为locaohost

    大数据入门第三天——基础补充与ActiveMQ

    4.启动

bin\activemq start

  对于报错无法加载主类一闪而过的,是因为安装路径有空格,解决方法参考:https://www.cnblogs.com/anan1688/p/4681965.html

  对于双击bat文件一闪而过,应该是由于版本问题(老版本可以使用此方式启动),查看官方get started即可完美解决:点击这里

   5.测试

Open the administrative interface
URL: http://127.0.0.1:8161/admin/
Login: admin
Passwort: admin
Navigate to "Queues"
Add a queue name and click create
Send test message by klicking on "Send to"

四、Java连接Helloworld

  1.准备

    将解压出来的包中的activemq-all-5.11.1.jar加入lib(IDEA如何引入外部jar请参考IDEA相关随笔),maven依赖方式此处暂略

  2.开始

package cn.itcast_03_mq.topic;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerTool implements MessageListener,ExceptionListener {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url =ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "mytopic";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
public static Boolean isconnection=false;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic(subject);
consumer = session.createConsumer(destination);
} // 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
consumer.setMessageListener(this);
connection.setExceptionListener(this);
isconnection=true;
System.out.println("Consumer:->Begin listening...");
// 开始监听
// Message message = consumer.receive();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
// 消息处理函数
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} public void onException(JMSException arg0) {
isconnection=false;
}
}

  更多helloworld示例,参考博文或者官网示例

  J2EE中使用activeMQ没问题,大数据方向后续将会有kafka的介绍

五、反射与动态代理

  反射参考基础随笔篇:http://www.cnblogs.com/jiangbei/p/6829755.html

  动态代理参考基础增强篇:http://www.cnblogs.com/jiangbei/p/6828086.html

  其他基础(例如socket等)请在Java基础篇补充查看