“Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.”
Pulsar是pub-sub模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。
Pulsar由雅虎开发并开源的下一代消息系统,目前是Apache软件基金会的孵化器项目。
本片文章简单介绍Pulsar的Producer,包含以下内容:
- Producer的设计
- 消息发送的实现
1. Producer设计
1.1 创建Producer
以上是Pulsar官网上创建一个Producer的示例代码。
创建的过程如下:
- 指定serviceUrl创建PulsarClient
- 指定Producer发送消息的Topic,通过PulsarClient创建Producer
通过上述的创建代码可以推测:
- serviceUrl应该是用于做服务发现的,通过serviceUrl查找Broker的信息
- Producer指定了Topic,那么一个Producer只能往特定的Topic发送消息
1.2 Producer API
Pulsar中,发送相关的接口为Producer,如上图所示:
- Producer定义了发送接口
- ProducerBase作为抽象类,提供了基础实现
- ProducerImpl则是真正的实现类
- PartitionedProducerImpl看着和分区相关,这个之后再看
Producer 接口具体如下:
public interface Producer<T> extends Closeable {
/**
* 返回Producer发送消息的Topic
*/
String getTopic();
/**
* Producer的名称
*/
String getProducerName();
/**
* 同步发送消息
*/
MessageId send(T message) throws PulsarClientException;
/**
* 有发送消息
*/
CompletableFuture<MessageId> sendAsync(T message);
/**
* Flush客户端完成中的消息并等待所有消息成功持久化
* @since 2.1.0
* @see #flushAsync()
*/
void flush() throws PulsarClientException;
/**
* 异步Flush客户端完成中的消息并等待所有消息成功持久化
* @since 2.1.0
* @see #flush()
*/
CompletableFuture<Void> flushAsync();
/**
* 创建TypedMessageBuilder,用于构建消息
*/
TypedMessageBuilder<T> newMessage();
/**
* 同步发送消息,已经被弃用
*/
@Deprecated
MessageId send(Message<T> message) throws PulsarClientException;
/**
* 异步发送消息,已经被弃用
*/
@Deprecated
CompletableFuture<MessageId> sendAsync(Message<T> message);
/**
* 获取Producer发送的最后一个序列号
*/
long getLastSequenceId();
/**
* 获取Producer的统计信息
*/
ProducerStats getStats();
/**
* 异步关闭Producer并且释放资源
*/
CompletableFuture<Void> closeAsync();
/**
* 返回Producer是否连接到Broker上
*/
boolean isConnected();
}
通过Producer接口可以看出Pulsar Producer提供的能力:
- 同步发送消息
- 异步发送消息
- 一个Producer只能向一个特定的Topic发送消息(Producer#topic()返回了一个Topic,说明Producer会绑定到一个Topic上)
- 批量发送(flush方法说明了应该是支持批量的,消息会在客户端内存中保存)
- 包含了sequenceId是否可以做幂等之类的事情?
- 统计能力
1.3 ProducerBase
ProducerBase作为抽象类,实现了Producer接口。
ProducerBase包含四个属性:
- producerCreatedFuture:异步创建Producer的Future
- conf:Producer的配置
- schema:消息相关的Schema信息
- interceptors:Producer的拦截器,在发送前后插入一些操作
producerCreatedFuture
重命名上看这个属性是用于异步创建Producer。
但是在一个基类中提供异步创建实体的Future显得比较难理解。一般的编程思路会在基类中定义一些基础的公共的属性,用于保存状态或者配置,比如conf。这里的producerCreatedFuture实际用于PartitionedProducerImpl异步创建多个Producer,这个后续再看。
conf
ProducerConfigurationData提供了Producer相关的配置信息,包含是否批量发送、内存缓存消息的大小、发送的Timeout等。
schema
Schema指明了消息的格式,通过Schema完成对消息的encode和decode。
interceptors
ProducerInterceptor是Producer提供的拦截器,包含两个方法:beforeSend、onSendAcknowledgement,分别用于在发送前和发送后插入行为。
1.4 ProducerImpl
ProducerImpl继承了ProducerBase,是Producer接口的核心实现。
ProducerImpl在ProducerBase的基础上增加了大量的属性,包含:
- producerId:通过AtomicLong生成的进程内唯一的Producer ID
- msgIdGenerator:消息ID
- pendingMessages:内存中缓存的消息
- pendingCallbacks:内存中缓存的消息对应的Callback
- timeout:发送的超时配置
- batchMessageContainer:批量消息的容器
- producerName:全局唯一的Producer名称
- 等等...(在后续发送实现中介绍相关的属性)
ProducerImpl实现了具体的发送行为,比如同步发送、异步发送(后续在消息发送的实现部分介绍)。
1.5 PartitionedProducerImpl
Producer提供的发送相关的API定义,ProducerBase提供了基础实现,ProducerImpl提供了具体的实现,那么PartitionedProducerImpl做什么?
通过PartitionedProducerImpl的属性可以看到内部包含了一个ProducerImpl列表,那么可以PartitionedProducerImpl和ProducerImpl是一个组合的关系。
通过start方法可以看出,PartitionedProducerImpl根据对应的topicMetadata的分区数创建了对应数量的ProducerImpl实例(这里也说明了ProducerBase中producerCreatedFuture的用途)。
为什么在PartitionedProducerImpl中需要创建一组ProducerImpl实例?
PartitionedProducerImpl另外增加了一个routerPolicy属性,其接口为:
public interface MessageRouter extends Serializable {
@Deprecated
default int choosePartition(Message<?> msg) {
throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");
}
default int choosePartition(Message<?> msg, TopicMetadata metadata) {
return choosePartition(msg);
}
}
通过接口的定义不难理解MessageRouter接口实现Message和Partition的映射。
通过internalSendAsync方法的实现可以看出,发送消息时通过routerPolicy将消息映射到Partition,通过Partition选择对应的Producer执行发送,那么久解释了为什么在PartitionedProducerImpl会创建和对应Topic的分区数相同的ProducerImpl。
通过以上内容,能总结出Producer模块的各个类的职责:
- Producer:定义发送接口,用户使用的核心API
- ProducerBase:Producer接口的基础实现
- ProducerImpl:实现具体的发送行为,一个ProducerImpl只能向一个Topic写入消息
- PartitionedProducerImpl:整合多个ProducerImpl,用于向多分区发送消息的场景
2. 消息发送的实现
在对Producer模块有个整体的认识后,后续内容具体阐述一条消息的发送流程。
在消息系统中,从Producer的视角看,一条消息写入过程一般包含:
- 消息校验
- 消息属性增强(添加一些必要的系统属性)
- 消息路由(选择目标分区)
- 消息序列化
- 消息数据写入网络
- 等待写入结果响应
- 返回写入结果
下面将通过ProducerImpl的实现来了解Pulsar的Producer发送消息的过程。
2.1 寻址
要发送一条消息,除了校验消息是否合法,首先要这条消息的写入目标(通过路由找到消息目标的Partition)。
在ProducerImpl的构造方法最后一行调用了grabCnx()方法创建了链接(构建了链接的上下文)。
grabCnx方法通过PulsarClient创建Connection,而PubsarClient内部则通过LookupService接口来完成Topic到Broker的映射并建立链接。
LookupService接口提供了BinaryProtoLookupService和HttpLookupService实现,通过LookupService用户也可以实现自己的服务发现模块。
2.2 消息发送
发送消息的调用链如上图所示,最终通过ProducerImpl的internalSendAsync将消息发送出去。无论同步发送还是异步发送,最终都会通过异步的方式执行发送(同时只是在异步的基础上等待发送结果),这里可以看到Pulsar Producer在API实现上比较注重代码的复用性即API的最小功能原则。
以单挑消息发送为例,sendAsync的具体实现如下:
- 在必要的校验后,将消息包装成OpSendMsg对象(包含异步发送完成后的Callback)
- 将消息添加到pendingMessages
- 通过Connection的EventLoop执行发送操作
ProducerImpl将在ackReceived方法中处理服务端对写入消息的响应,通过消息的sequenceId来识别对应的OpSendMsg,并调用对应Callback来执行回调逻辑。实际在Callback完成了响应用户的操作及发送行为的一些统计。
ProducerImpl只会建立一个链接,且发送和ACK都是通过synchronized执行的,所以中间通过pendingMessages来完成消息发送和响应的对应,以及超时的处理。这块具体可以看一下代码实现。
总结
本文介绍了Pulsar Producer模块的设计,包含各个类的职责,并简单介绍了消息的发送过程。Puslar Producer在设计上和RocketMQ的思想差异还是比较大的,比如Puslar Producer会将Producer对应到分区上,每个分区有自己的Producer,这样可以比较容易完成一些幂等之类的操作。
Pulsar-Producer实现简介的更多相关文章
-
RocketMQ学习笔记(8)----RocketMQ的Producer API简介
在RocketMQ中提供了三种发送消息的模式: 1.NormalProducer(普通) 2.OrderProducer(顺序) 3.TransactionProducer(事务) 下面来介绍一下pr ...
-
【Apache Pulsar】Apache Pulsar单机环境及Go语言开发环境搭建
0x01 简介 Apache Pulsar是一个开源的分布式发布-订阅消息系统,与Kafka类似,但比后者更加强大.Pulsar最初由Yahoo开发并维护,目前已经成为Apache软件组织的一个孵化子 ...
-
Pulsar部署和实践(一)
前言 本地Docker部署Pulsar消息代理实现消息发布和消息订阅 介绍 相关概念,后面有时间再花时间整理下. 实践步骤 1.使用dokcer本地部署pulsar docker run -it \ ...
-
Apache Pulsar简介
Apache Pulsar What is Pulsar "Pulsar is a distributed pub-sub messaging platform with a very fl ...
-
kafka producer 发送消息简介
kafka 的 topic 由 partition 组成,producer 会根据 key,选择一个 partition 发送消息,而 partition 有多个副本,副本有 leader 和 fol ...
-
分布式消息队列Apache Pulsar
Pulsar简介 Apache Pulsar是一个企业级的分布式消息系统,最初由Yahoo开发并在2016年开源,目前正在Apache基金会下孵化.Plusar已经在Yahoo的生产环境使用了三年多, ...
-
最佳实践:Pulsar 为批流处理提供融合存储
非常荣幸有机会和大家分享一下 Apache Pulsar 怎样为批流处理提供融合的存储.希望今天的分享对做大数据处理的同学能有帮助和启发. 这次分享,主要分为四个部分: 介绍与其他消息系统相比, Ap ...
-
Apache Pulsar 在能源互联网领域的落地实践
关于 Apache Pulsar Apache Pulsar 是 Apache 软件基金会*项目,是下一代云原生分布式消息流平台,集消息.存储.轻量化函数式计算为一体,采用计算与存储分离架构设计,支 ...
-
Kafka简介
Kafka简介 转载请注明出处:http://www.cnblogs.com/BYRans/ Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成 ...
随机推荐
-
Linux crontab 命令格式与详细例子
基本格式 :* * * * * command分 时 日 月 周 命令 第1列表示分钟1-59 每分钟用*或者 */1表示第2列表示小时1-23(0表示0点)第3列表示日期1-31第4列表示月份1-1 ...
-
汇编语言中有一种移位指令叫做循环左移(ROL),现在有个简单的任务,就是用字符串模拟这个指令的运算结果。对于一个给定的字符序列S,请你把其循环左移K位后的序列输出。例如,字符序列S=”abcXYZdef”,要求输出循环左移3位后的结果,即“XYZdefabc”。是不是很简单?OK,搞定它!
// test20.cpp : 定义控制台应用程序的入口点. // #include "stdafx.h" #include<iostream> #include< ...
-
基于Http替补新闻WebService数据交换
该系统的工作之间的相互作用.随着信息化建设的发展,而业界SOA了解并带来低TOC(总拥有成本)其他优势.越来越多的高层次的信息使用者关注. 这里暂且不提SOA这种架构规划.在系统间集成协议简单的讨论. ...
-
Sublime Text 2
常用功能: 安装Package Control:https://sublime.wbond.net/ 多行选择.多行编辑鼠标选中多行,按下 Ctrl+Shift+L (Command+Shift+L) ...
-
在ZendStudio中增加新的php模板
步骤: 找到目录:D:\Program Files\Zend\Zend Studio 12.5.1\plugins , 这要根据自己的安装情况来找,再找到以下文件org.eclipse.php.ui_ ...
-
我为什么要花大力气从头研发智表ZCELL(一个仿EXCEL的前端插件)
为什么呢,一个前端用的,类似EXCEL的操作的JS 插件,从头研发真的有必要吗?可能你会觉得没有必要吧,其实我自己也问过自己好多遍.因为业界有更加强大的spreadjs,也有比较轻型的JEXCEL,自 ...
-
Bootstrap -- 插件: 模态框、滚动监听、标签页
Bootstrap -- 插件: 模态框.滚动监听.标签页 1. 模态框(Modal): 覆盖在父窗体上的子窗体. 使用模态框: <!DOCTYPE html> <html> ...
-
C++回顾day03---<;多态>;
一:错误理解下的多态 #include <iostream> using namespace std; class Parent { public: Parent() { cout < ...
-
姿势摆好,一招学会android的布局优化!
作为android应用来讲,无论应用本身多么美观,功能多么强大,内容多么丰富.但如果App本身打开界面缓慢超过手机16ms刷新一次页面的时间,就会产生卡顿.用户体验都会变得极差,导致用户量减少.所以我 ...
-
开源通用爬虫框架YayCrawler-页面的抽取规则定义
本节我将向大家介绍一下YayCrawler的核心-页面的抽取规则定义,这也是YayCrawler能够做到通用的主要原因之一.如果我要爬去不同的网站的数据,尽管他们的网站采用的开发技术不同.页面的结构不 ...