大白话strom——问题收集(持续更新ing)

时间:2023-03-08 17:38:26
本文导读:

1、基于storm的应用
2、storm的单点故障解决
3、strom与算法的结合学习
4、杂记——常见问题的解答
5、http://www.blogchong.com/catalog.asp?tags=问题整理(storm)

Storm存在的一些问题:(V 0.7.4之前)

1、编程门槛对普通用户较高
2、框架无持久化存储
3、框架不提供消息接入模块 —— kafka
4、storm ui 功能简单
5、跨topology的boit复用
6、nimbus单点故障
7、topology不支持动态部署

storm业务需求:

条件过滤
这是storm最基本的处理方式,对符合条件的数据进行实时过滤,将符合条件的数据保存下来,这种实时查询的业务需求在实际应用中很常见。
中间件计算
我们需要改变数据中某一个字段(例如是数值),我们需要利用一个中间值经过计算(值比较、求和、求平均等)后改变该值,然后将数据重新输出。
求TopN
相信大家对TopN类的业务需求也是比较熟悉的,在规定的时间窗口内,统计数据出现的TopN,该类处理在购物及电商业务需求中,比较常见。
分布式RPC
storm有对RPC进行专门的设计,分布式RPC用于对storm上大量的函数调用进行并行计算,最后将结果返回给客户端。
推荐系统
在实时处理时从mysql及hadoop中获取数据库中的信息,例如在电影推荐系统中,传入数据为用户当前点播电影信息,从后数据库中获取的是该用户之前的一些点播电影信息统计。
批处理
所谓批处理就是数据积攒到一定触发条件 ,就批量输出,所谓的触发条件类似事件窗口到了,统计数量够了及检测到某种数据传入等等。
热度统计
热度统计实现依赖于TimeCacheMap数据结构,该结构能够在内存中保持近期活跃的对象。我们可以使用它来实现例如论坛中的热帖排行统计等。

1、基于storm的应用系统:

(1)基于storm的网络爬虫系统的设计与实现:

大体框架:

大白话strom——问题收集(持续更新ing)

  众所周知,爬虫系统里几个必不可少的模块,像下载、解析、回写待爬资源、存储等,本质上他们像是一个责任链,但后一个module又基于前一个module,所以可以理解为一种流处理模型,从我们拿到待爬URL一直到处理完毕存储数据,这是一个完整的过程。如您看到的这张图,如果我们实现了storm化,那么基于storm强大的功能,我们的爬虫可以完美运行在storm集群上,并且每类处理器我们都可以非常灵活的分配其线程数,耗时的处理我们多开几个线程,可以实现资源合理利用,当然既然是集群,你的某个任务具体运行在哪里,storm已经帮您分配好了,并且帮我们实现了节点失效等处理。

  最后如果bolt间传输的消息量比较大,有可能网络是个瓶颈。

其他应用:——基于storm的实时交通状况的设计与分析

  1. 基于Storm实时路况分析和实时路径推荐系统
  2. 用storm来监测车辆速度是否超过80 km/h
  3. 公共交通出行服务大数据平台设计方案
  4. 基于Storm云平台的地图道路匹配算法研究

............


2、storm单点故障问题:

(1)采用master组解决单点故障

思路:

zookeeper解决方案还是相当复杂的,最近想到使用master组来解决这一问题,一个系统只有一个master组,它由若干个节点组成,好处:
1、结构简单,容易理解和实现
2、master组中的一个节点不管理集群内所有元数据,而只分担其中一部分,这样系统的扩展能力更强,不会受限于元数据
3、master组中的节点互备,可以解决单点故障问题
4、为性能而优化,专注于解决小文件性能问题,目标是达到实时检索

讨论:P2P解决方案

问:master组的实现,hadoop的热备份等都比较困难?

答:master数据备份可以直接采用多写方式,只有一个异常的master恢复时,才需要同步。

问:zookeeper是仅仅适用于hbase这种master slave模式还是也适应于multi master?

答:ZooKeeper使用没有限制的,比如它的分布式锁能力和配置能力,有广泛的应用。关于Master组腾讯的XFS就采用了,不过仍然有个单点的顶层Master,采用了ZooKeeper做主备切换。顶层的单点Master,通常有两个备,在数据未同步时,叫Newbie(学习者),只有数据一致后才叫Standby,数据的一致是通过两阶段来保证的。

----讨论-----后续见:storm源码之一个class解决nimbus单点问题【转】

--单点问题可以参考hdfs的实现~

--hdfs的namenode也是单节点,secondnode也只是为合并操作日志以缩短namenode的启动时间而设。目前hadoop的namenode单点问题也是下一代hadoop要解决的重要问题。

--那是2.0之前的版本,现在两个namenode之间采用nfs共享元数据,通过zookeeper选举一个作为当前active,另一个作为standby,当active挂了之后,能够在短时间内切换到standby节点,实现高可用~

--nimbus节点利用nfs共享存储也是一种解决方案。可以通过将Froastman实现的storage.clj作如下修改:

(^boolean isSupportDistributed [this]
true))))

  Nathanmarz因此在0.8.2版本的基础上,新开了storm-0.8.2-ha分支,专门用来解决nimbus单点问题,并将Frostman已完成的nimbus-storage代码合并到该分支。

  Rostman在nimbus-storage基础上继续增加了nimbus多节点选举机制,(目前尚未被Nathanmarz合并入storm-ha分支)。

  nimbus单点问题的解决思路

  1、Frostman的工作已为彻底解决nimbus单点问题奠定了重要基础:

    • nimbus ip地址动态获取
    • topology代码存储方案可定制
    • nimbus多节点选举,宕机自动切换
    • nimbus leader状态ui展示

  在Frostman工作的基础上继续深入,将极大减少工作量。
  2、Frostman并未解决topology代码如何在多个nimbus节点或集群所有节点间共享的问题。Nathamarz的理想规划是:实现storm集群中所有nimbus、supervisor机器之间通过P2P协议共享topology代码,但目前限于BitTorrent未完成的工作,目前暂停了nimbus-ha分支的开发。
  3、最终选定的解决方案:实现定制的nimbus-storage插件NimbusCloudStorage,使得所有nimbus节点在启动后均从leader 轮询下载本地不存在的topology代码。依次满足supervisor在nimbus节点切换后下载代码的需求。


3、Storm与算法的结合: 

  ...........

  基于storm的聚类算法的分析与实现,基于storm的关联规则的分析与实现,storm平台一致性哈希算法的分析与研究等等...


4、杂记:

    ——storm常见问题解答

转载于http://blog.****.net/hguisu/article/details/8454368#t3 (20121231)

一、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm做计算?

  你需要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算。怎么实现spout可以参考官方的kestrel spout实现:
https://github.com/nathanmarz/storm-kestrel

  如果你的数据源不支持事务性消费,那么就无法得到storm提供的可靠处理的保证,也没必要实现ISpout接口中的ack和fail方法。

二、Storm为了保证tuple的可靠处理,需要保存tuple信息,这会不会导致内存OOM

  Storm为了保证tuple的可靠处理,acker会保存该节点创建的tuple id的xor值,这称为ack value,那么每ack一次,就将tuple id和ack value做异或(xor)。当所有产生的tuple都被ack的时候, ack value一定为0。这是个很简单的策略,对于每一个tuple也只要占用约20个字节的内存。对于100万tuple,也才20M左右。关于可靠处理看这个:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

三、Storm计算后的结果保存在哪里?可以保存在外部存储吗?

  Storm不处理计算结果的保存,这是应用代码需要负责的事情,如果数据不大,你可以简单地保存在内存里,也可以每次都更新数据库,也可以采用NoSQL存储。storm并没有像s4那样提供一个Persist API,根据时间或者容量来做存储输出。这部分事情完全交给用户。

  数据存储之后的展现,也是你需要自己处理的,storm UI只提供对topology的监控和统计

四、Storm怎么处理重复的tuple

  因为Storm要保证tuple的可靠处理,当tuple处理失败或者超时的时候,spout会fail并重新发送该tuple,那么就会有tuple重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。一些可行的策略:
  (1)不处理,这也算是种策略。因为实时计算通常并不要求很高的精确度,后续的批处理计算会更正实时计算的误差。
  (2)使用第三方集中存储来过滤,比如利用mysql,memcached或者redis根据逻辑主键来去重。
  (3)使用bloom filter做过滤,简单高效。

五、Storm的动态增删节点

  我在storm和s4里比较里谈到的动态增删节点,是指storm可以动态地添加和减少supervisor节点。对于减少节点来说,被移除的supervisor上的worker会被nimbus重新负载均衡到其他supervisor节点上。在storm 0.6.1以前的版本,增加supervisor节点不会影响现有的topology,也就是现有的topology不会重新负载均衡到新的节点上,在扩展集群的时候很不方便,需要重新提交topology。因此我在storm的邮件列表里提了这个问题,storm的开发者nathanmarz创建了一个issue 54并在0.6.1提供了rebalance命令来让正在运行的topology重新负载均衡,具体见:
https://github.com/nathanmarz/storm/issues/54
  和0.6.1的变更:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246

  storm并不提供机制来动态调整worker和task数目。

六、Storm UI里spout统计的complete latency的具体含义是什么?为什么emit的数目会是acked的两倍

  这个事实上是storm邮件列表里的一个问题。Storm作者marz的解答:

  The complete latency is the time from the spout emitting a tuple to that
tuple being acked on the spout
. So it tracks the time for the whole tuple
tree to be processed.

  If you dive into the spout component in the UI, you'll see that a lot of
the emitted/transferred is on the __ack* stream. This is the spout
communicating with the ackers which take care of tracking the tuple trees. 

  简单地说,complete latency表示了tuple从emit到被acked经过的时间,可以认为是tuple以及该tuple的后续子孙(形成一棵树)整个处理时间。其次spout的emit和transfered还统计了spout和acker之间内部的通信信息,比如对于可靠处理的spout来说,会在emit的时候同时发送一个_ack_init给acker,记录tuple id到task id的映射,以便ack的时候能找到正确的acker task。

七、strom不能实现不同topology之间stream的共享

  Storm中Stream的概念是Topology内唯一的,只能在Topology内按照“发布-订阅”方式在不同的计算组件(Spout和Bolt)之间进行数据的流动,而Stream在Topology之间是无法流动的。

  对于不同topology前半部分共用Spouts和Bolts不能直接复用,解放方案有以下三种:

  (1)kill原topology,共用前半部分的Spouts和Bolts,在分支Bolt处分别处理,重新打包形成新的topology并提交;

  (2)从相同的外部数据源单独读取数据,设计与前一个topology相同的新topology处理后面的新任务;

  (3)通过Kafka消息中间件实现不同topology的Spout共享数据源,重新部署运行新的topology即可。

参考链接Storm数据流模型的分析及讨论


参考链接:

大白话storm——几点问题