01_Storm体系概要

时间:2023-12-11 10:58:08

1. Storm发展历史

Storm历史

1. 2010年12月,backtype公司Nathan,提出Storm的核心概念
2. backtype, 提供数据分析,数据处理服务的一个公司
3. 2011年4月,backtype完成storm开发,正式问世
4. 2011年5月,backtype被twitter收购,Storm开始开源
5. 2013年,Nathan加入Apache, Storm成为Apache开源项目
6. 2014年,Storm成为顶尖开源项目

Storm版本

01_Storm体系概要

2. Storm术语

集群架构

01_Storm体系概要

zookeeper的作用

1) nimbus和supervisor间的通信
2)监控storm集群各个物理节点的状态(online,offline)
Nimbus通过zk监控各个supervisor节点的状态(子节点列表事件)
supervisor失效,Nimbus收到Event通知,重新分发该Supervisor上的任务给其他Superviosr

注意:只有1个nimbus, 失效后不影响已经运行的topology,但新的topology无法提交(Nimbus不支持高可用)

1. nimbus
Storm集群的主节点,负责资源分配和任务调度,类似Hadoop中的JobTracker
只有1个nimbus(只有1台机器运行nimbus进程)

2. supervisor
Storm集群的从节点,多个supervisor(多台机器,每台上运行supervisor进程)
接收nimbus分配的任务,管理(起/停/监控)本节点上的多个worker进程

3. worker
Supervisor上的工作进程,1个supervisor上可以有多个worker进程
1个worker --- 1个JVM(worker内部会启动1个或多个executor线程默认1个executor只执行1个task, 通过Conf可以针对每个spout,bolt在代码中设置executor个数和task个数,可以通过命令行调整worker进程数,spout/bolt各自的executor线程数

4. task
worker下的工作线程,0.8版本后是逻辑线程,不再是物理线程
多个task共用1个物理线程(executor)

01_Storm体系概要

编程模型

5. Tolopogy

01_Storm体系概要

计算拓扑,由Spout,Bolt组成的图状结构
1个topology = 1个mapreduce Job

Topology实例分析

1)通过专用的数据接入API,将用户行为log输入给Spout
2) Bolt A, 进行数据去噪处理(丢弃格式不规范的数据,清理无效数据)并完成数据格式统一
3)Bolt B, 将规范后的数据写入磁盘进行存储(HDFS)
4)Bolt C, 可以筛选出VIP用户的数据
5)Bolt D, 将VIP行为日志存储到数据库(mysql-VIP行为日志)
6)Bolt F, 将VIP行为日志写入消息队列或缓存(redis)等,提供给其他系统使用

Topology的提交和运行
# storm jar code.jar MyTopology arg1 arg2
1) storm jar将连接nimbus, 上传code.jar到storm集群
2) MyTopology是要运行的主类(包含main方法,main中定义了topology)
3) Nimbus本质上是1个thrift服务,可以提交任何语言创建的topology

6. Spout
Storm编程模型中的消息源,提供消息的可靠传输(ack/fail机制)及 不可靠传输
从外部的消息队列(kafka), 数据库(HBase), API调用,分布式文件系统(HDFS)接入消息
主要开发接口是数据接入和nextTuple方法

7. Bolt
Storm编程模型中的消息处理组件,execute方法定义了实际的数据处理逻辑
execute中可以进行:输入tuple的过滤,合并等处理,对处理过的tuple进行ack或者set fail

Storm开发总结:
1)Spout的数据输入
2)Blot的处理逻辑(数据去噪,写入HDFS,写入数据库MySQL,写入消息队列或缓存)
3)Spout-Bolt-Bolt间的数据流向
4)Strom本身不存储数据,要自己设计数据落地

数据流

8.Stream
Topology中的消息流,节点间的边,由1个个tuple构成
每一条steam都会有1个id
通过id来检查消息是否被完整处理

9. tuple
消息的基本单位,消息封装为多个tuple, 多个tuple组成stream, 每一个tuple拥有tuple id

10. Stream Groupings 数据流分组策略

01_Storm体系概要

决定spout, bolt, bolt间的数据流动: 是否建立“边”,建立哪些边

1)shuffle Grouping: 随机选择1个bolt,向其发送数据;但storm提供了负载均衡,保证了bolt接收到的tuple数目基本相同
2)Fields Grouping: 按字段分组,相同数据内容构成的tuple都发往同一个bolt(单词统计)

3)All Grouping: 广播发送,每个tuple发送到所有bolt

4) Global Grouping:全局分组,所有tuple发送给1个bolt(task_id最小的)
5)Non Grouping:不分组,和Shuffle Grouping类似,发布者和订阅者是同一个bolt,spout
6) Direct Grouping:直接分组,使用特殊的发送方法
7)Custom Grouping:自定义分组,需要自己实现

3. 扩展、可靠、容错

3.1 高扩展性

1)Supervisor节点的水平扩展,千级节点
2)工作进程worker的扩展,1个从节点上可以运行多个worker进程
3)每个worker进程,可以创建多个executor线程
4)每个executor线程,可以执行多个task(轮询方式),task(spout,bolt)是真正的数据处理实体
task是逻辑线程,共享executor线程

3.2 高可靠性

消息,以消息树 + ack/fail机制保证每个消息都得到处理或者失败提示
1)每个spout和bolt,对处理的tuple发送ack或fail (发送给ack线程,ack将最终判定结果发送给spout)
2)多次都失败的消息,Spout将该消息丢弃
3)可靠消息处理机制,是storm区别于其他实时处理框架的特色

3.3 高容错

1)节点级别容错
nimbus通过zk集群监控supervisor状态,将失效的supervisor上的工作转移给其他supervisor

2)worker级别容错
supervisor会尝试重启出错的worker进程,并尝试一定次数
Storm在其他Supervisor上,启动新Worker进程,运行之前worker进程中的task
缺点:Nimbus没有高容错

3)数据容错(可靠消息处理如何实现?)

理想情况
每1个topology都包含1个acker组件(特殊的bolt, 默认1个,数量可设置)
针对每一个tuple,利用tuple id来维护1个tuple树(该tuple被发往了哪些bolt)
每个bolt处理完该tuple,反馈ack信息给acker, acker将该bolt从tuple树中删除
当tuple数被清空(所有bolt都成功处理),acker发送信息给spout, 说明该tuple被正常处理完成

现实情况:tuple树改良
1)每个tuple都要维护1个tuple数,内存占用巨大
2)实际实现:
* 使用固定大小的内存(20M),跟踪各个组件返回给acker的该tuple的ack value
* 各个组件(spout,bolt)返回的ack value = (输入tupleid) 异或(输出tupleid)
* spout反馈的ack value = 各个方向输出的tupleid的异或值
* 只接收tuple的bolt反馈的ack avalue = 各个方向输入的tupleid的异或值
* acker,将该tuple的所有ack value进行异或,最终结果为0则表示该tuple被正常处理,acker发送信息给spout,spout调用相应的ack方法,反馈tuple处理完成

开启tuple跟踪的前提:
1. 在spout emit tuple的时候,要加上第3个参数messageid
2. 在配置中acker数目至少为1
3. 在bolt emit的时候,要加上第二个参数anchor tuple,以保持tracker链路。