目录
1. 中间件技术
2. MetaQ中间件
3. MetaQ编程实践
1. 中间件技术
0x1: 中间件简介
中间件(Middleware)是提供系统软件和应用软件之间连接的软件,以便于软件各部件之间的沟通,特别是应用软件对于系统软件的集中的逻辑,在现代信息技术应用框架如Web服务、面向服务的体系结构等中应用比较广泛,如:
1. 数据库
2. Apache的Tomcat
3. IBM公司的WebSphere
4. BEA公司的WebLogic[[应用服务器]
5. 东方通公司的Tong系列中间件
6. Kingdee公司的等
都属于中间件,中间件技术本质上就是在计算机系统不同层次的模块之间的异构、跨协议的通信问题
严格来讲,中间件技术已经不局限于应用服务器、数据库服务器。中间件技术创建在对应用软件部分常用功能的抽象上,将常用且重要的
1. 过程调用
2. 分布式组件
3. 消息队列
4. 事务
5. 安全
6. 连结器
7. 商业流程
8. 网络并发
9. HTTP服务器
10. Web Service
等功能集于一身或者分别在不同产品中分别完成 ,我国学术界一般认可的定义是中间件是指网络环境下处于操作系统、数据库等系统软件和应用软件之间的一种起连接作用的分布式软件,主要解决异构网络环境下分布式应用软件的互连与互操作问题,提供标准接口、协议,屏蔽实现细节,提高应用系统易移植性
0x2: 中间件的特征(内涵)
总的来说,中间件有几个非常重要的特征
1. 平台化
所谓平台就是能够独立运行并自主存在,为其所支撑的上层系统和应用提供运行所依赖的环境。中间件是一个平台,因此中间件是必须独立存在,是运行时刻的"系统软件",它为上层的网络应用系统提供一个运行环境,并通过标准的接口和API来隔离其支撑的系统,实现其独立性,也就是平台性。J2EE应用服务器提供JAVA应用的运行环境,就是经典的中间件
2. 应用支撑
中间件的最终目的是解决上层应用系统的问题,而且也是软件技术发展到今天对应用软件提供最完善彻底的解决方案。
1) 高级程序设计语言的发明,使得软件开发变成一个独立的科学和技术体系,而操作系统平台的出现,使得应用软件通过标准的API接口,实现了软件与硬件的分离。
2) 现代面向服务(SOA)的中间件在软件的模型、结构、互操作以及开发方法等四个方面提供了更强的应用支撑能力:
2.1) 模型:构件模型弹性粒度化
通过抽象层度更高的构件模型,实现具备更高结构独立性、内容自包含性和业务完整性的可复用构件,即服务(RESTFUL API就是一个最好的例子)。并且在细粒度服务基础上,提供了更粗粒度的服务封装方式,即业务层面的封装,形成业务组件,就可以实现从组件模型到业务模型的全生命周期企业建模的能力。
2.2) 结构:结构松散化
将"服务描述"和"服务功能实现"分离,将"服务的使用者"和"提供者"分离,从而避免分布式应用系统构建和集成时常见的技术、组织、时间等不良约束。
2.3) 互操作:交互过程标准化
将与互操作相关的内容进行标准化定义,如服务封装、描述、发布、发现、调用等契约,通信协议以及数据交换格式等等。最终实现访问互操作、连接互操作和语义互操作(RESTFUL、SOAP、WPF、WebService、SCA/SDO)
2.4) 开发集成方法:
应用系统的构建方式由代码编写转为主要通过服务间的快捷组合及编排,完成更为复杂的业务逻辑的按需提供和改善,从而大大简化和加速应用系统的搭建及重构过程(一种典型的轻耦合思想)而要最终解决软件的质量问题、效率问题、互操作问题、灵活应变问题这四大问题,需要在软件技术的内在结构(Structure)、架构(Architecture)层面进行思考。解决这些问题,技术的本质是复用、松耦合、互操作(标准)等软件技术的内在机制。这也是中间件技术和产品的本质特征
3. 软件复用
软件复用,即软件的重用,也叫再用,是指同一事物不作修改或稍加改动就多次重复使用。从软件复用技术的发展来看,就是不断提升抽象级别,扩大复用范围。最早的复用技术是子程序,人们发明子程序,就可以在不同系统之间进行复用了。但是,子程序是最原始的复用,因为这种复用范围是一个可执行程序内复用,静态开发期复用,如果子程序修改,意味着所有调用这个子程序的程序必须重新编译、测试和发布
复用对象复用范围
1) 子程序: 一个可执行程序内复用: 静态开发期复用
2) 组件(DLL、Com..): 系统内复用,动态运行期复用
3) 企业对象组件(Com+、.NET、EJB..): 企业网络内复用,不同系统之间复用
4) 服务(RESTFUL、SOAP、WPF、WebService、SCA/SDO): 不同企业之间、跨系统、跨异构环境复用,动态可配置
4. 耦合关系
1) 分布式对象技术将"连接逻辑"进行分离
2) 消息中间件将"连接逻辑"进行异步处理,增加了更大的灵活性
3) 消息代理和一些分布式对象中间件将数据转换也进行了分离
4) 而SOA架构,通过服务的封装,实现了业务逻辑与网络连接、数据转换等进行完全的解耦
5. 互操作性
在软件的互操作方面,传统中间件只是实现了访问互操作,即通过标准化的API实现了同类系统之间的调用互操作,而连接互操作还是依赖于特定的访问协议,如JAVA使用RMI,CORBA使用IIOP等。而SOA通过标准的、支持Internet、与操作系统无关的SOAP协议实现了连接互操作。而且,服务的封装是采用XML协议,具有自解析和自定义的特性,这样,基于SOA的中间件还可以实现语义互操作
总之,服务化体现的是中间件在完整业务复用、灵活业务组织方面的发展趋势,其核心目标是提升IT基础设施的业务敏捷性。因此,中间件将成为SOA的主要实现平台
Relevant Link:
http://zh.wikipedia.org/wiki/%E4%B8%AD%E9%97%B4%E4%BB%B6
http://kb.cnblogs.com/page/196448/
http://jm.taobao.org/
2. MetaQ中间件
0x1: MetaQ的应用场景
假设我们有这么一个应用场景,为了完成一个用户注册操作,可能需要将用户信息写入到用户库中,然后通知给红包中心给用户发新手红包,然后还需要通知博客系统给用户准备对应的博客账号,进行合法性验证,告知SNS系统给用户导入新的用户等10步操作。
那么针对这个场景,一个最简单的设计方法就是串行的执行整个流程
//全称串行操作
用户注册->通知给红包中心给用户发新手红包->博客系统给用户准备对应的博客账号->进行合法性验证->告知SNS系统给用户导入新的用户
通过业务分析我们能够得知,用户的实际的核心流程其实只有一个,就是用户注册。而后续的准备博客帐号,通知SNS等操作虽然必须要完成,但却是不需要让用户等待的。
这种模式有个专业的名词,就叫"最终一致",即实际上这并不是一个严格强制的串行操作,从业务的角度上来说,有很多步骤完全是可以异步完成的,只要最终的结果是"最终一致"的就可以
0x2: MetaQ技术原理
METAQ是一款完全的队列模型消息中间件,服务器使用Java语言编写,可在多种软硬件平台上部署。客户端支持Java、C++编程语言
MetaQ对外提供的是一个队列服务,内部实现也是完全的队列模型,这里的队列是持久化的磁盘队列,具有非常高的可靠性,并且充分利用了操作系统cache来提高性能
1. MetaQ是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
2. Producer、Consumer、队列都可以分布式。
3. 能够保证严格的消息顺序
4. 提供丰富的消息拉取模式
5. 高效的订阅者水平扩展能力
6. 实时的消息订阅机制
7. 亿级消息堆积能力
MetaQ的存储结构是根据大规模互联网应用需求,完全重新设计的一套存储结构,使用这套存储结构可以支持上万的队列模型,并且可以支持消息查询、分布式事务、定时队列等功能
MetaQ内部大部分功能都靠队列来驱动,那么必须支持足够多的队列,才能更好的满足业务需求,MetaQ可以在单机支持上万队列,这里的队列全部为持久化磁盘方式,从而对IO性能提出了挑战。MetaQ是这样解决的
1. Message全部写入到一个独立的队列,完全的顺序写
2. Message在文件的位置信息写入到另外的文件,串行方式写
通过以上方式,既做到数据可靠,又可以支持更多的队列
Relevant Link:
http://blog.csdn.net/blogdevteam/article/details/8449916
https://github.com/killme2008/Metamorphosis/wiki
http://m.bianceng.cn/web/Skills/201407/42090.htm
http://www.bkjia.com/ASPjc/871354.html
3. MetaQ编程实践
消息中间件中有两个角色: "消息生产者(Producer)"和"消息消费者(Consumer)"。Meta里同样有这两个概念,消息生产者负责创建消息并发送到Meta服务器(Broker),Meta服务器会将消息持久化到磁盘,消息消费者从Meta服务器拉取消息并提交给应用消费
回顾我们之前说的MetaQ的架构图
要使用MetaQ进行分布式消息通信编程学习,就必须要实现最基本的架构搭建
0x1: 配置Zookeeper集群
MetaQ使用zookeeper发布和订阅服务,并默认使用zookeeper存储消费者offset,因此,你需要首先安装一个zookeeper到某台机器上,或者使用某个现有的zk集群
/mate-queue/taobao/metamorphosis-server-wrapper/conf/server.ini
使用内置的zookeeper服务器进行搭建
[zookeeper]
zk.zkConnect=localhost:2181
zk.zkSessionTimeoutMs=30000
zk.zkConnectionTimeoutMs=30000
zk.zkSyncTimeMs=5000
0x2: 启动Zookeeper
//停止local模式启动的broker1,并重新以集群模式启动
bin/metaServer.sh start
0x3: 引入MetaQ需要依赖的JAR包
http://fnil.net/downloads/index.html
包括client、server的
0x4: Producer.java
package com.taobao.metamorphosis.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
public class Producer {
public static void main(String[] args) throws Exception {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
//设置zookeeper地址
zkConfig.zkConnect = "192.168.207.128:2181";
metaClientConfig.setZkConfig(zkConfig);
// New session factory,强烈建议使用单例
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
/*
* create producer,强烈建议使用单例
* 消息生产者的接口是MessageProducer,你可以通过它来发送消息
*/
MessageProducer producer = sessionFactory.createProducer();
// publish topic
final String topic = "test";
/*
* 这一步在发送消息前是必须的,你必须发布你将要发送消息的topic
* 这是为了让会话工厂帮你去查找接收这些topic的meta服务器地址并初始化连接
* 这个步骤针对每个topic只需要做一次,多次调用无影响
*/
producer.publish(topic);
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line = null;
while ((line = reader.readLine()) != null)
{
/*
* send message
* 在Meta里,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性:
* 1) id: Long型的消息id,消息的唯一id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。
* 2) topic: 消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,生产者通过指定发布的topic查找到需要连接的服务器地址,必须。
* 3) data: 消息的有效载荷,二进制数据,也就是消息内容,meta永远不会修改消息内容,你发送出去是什么样子,接收到就是什么样子。消息内容通常限制在1M以内,我的建议是最好不要发送超过上百K的消息,必须。数据是否压缩也完全取决于用户。
* 4) attribute: 消息属性,一个字符串,可选。发送者可设置消息属性来让消费者过滤。
*/
SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
// check result
if (!sendResult.isSuccess())
{
System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
}
else {
System.out.println("Send message successfully,sent to " + sendResult.getPartition());
}
}
}
}
0x5: AsyncConsumer.java
package com.taobao.metamorphosis.example;
import java.util.concurrent.Executor;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
public class AsyncConsumer {
public static void main(String[] args) throws Exception {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
//设置zookeeper地址
zkConfig.zkConnect = "192.168.207.128:2181";
metaClientConfig.setZkConfig(zkConfig);
// New session factory,强烈建议使用单例
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
// subscribed topic
final String topic = "test";
// consumer group
final String group = "meta-example";
/*
* create consumer,强烈建议使用单例
* 通过createConsumer方法来创建MessageConsumer,注意到我们传入一个ConsumerConfig参数,
* 这是消费者的配置对象。每个消息者都必须有一个ConsumerConfig配置对象,
* 我们这里只设置了group属性,这是消费者的分组名称。
* Meta的Producer、Consumer和Broker都可以为集群。
* 消费者可以组成一个集群共同消费同一个topic,发往这个topic的消息将按照一定的负载均衡规则发送给集群里的一台机器。
* 同一个消费者集群必须拥有同一个分组名称,也就是同一个group。我们这里将分组名称设置为meta-example
*/
MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
/*
* subscribe topic
* 订阅消息通过subscribe方法,这个方法接受三个参数
* 1) topic,订阅的主题
* 2) maxSize,因为meta是一个消费者主动拉取的模型,这个参数规定每次拉取的最大数据量,单位为字节,这里设置为1M,默认最大为1M。
* 3) MessageListener,消息监听器,负责消息消息。
*/
consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
public void recieveMessages(Message message) {
System.out.println("Receive message " + new String(message.getData()));
}
public Executor getExecutor() {
// Thread pool to process messages,maybe null.
return null;
}
});
// complete subscribe
consumer.completeSubscribe();
}
}
Relevant Link:
https://github.com/killme2008/Metamorphosis/wiki/%E5%A6%82%E4%BD%95%E5%BC%80%E5%A7%8B
http://www.it165.net/admin/html/201402/2409.html
Copyright (c) 2014 LittleHann All rights reserved