Storm Parallelism

Date: 2024-06-29 22:34:33

1 Basic concepts of Storm parallelism

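[Figure: Storm parallelism]
  • A machine in a Storm cluster can run one or more worker processes, belonging to one or more topologies.
  • A worker process runs one or more executor threads. Each worker belongs to exactly one topology.
  • An executor is a single thread. Each executor runs one or more tasks of the same component (spout or bolt).
  • A task performs the actual data processing.
A practical example: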
| What | Description | Configuration option | How to set in your code (examples) |
|---|---|---|---|
| #worker processes | How many worker processes to create for the topology across machines in the cluster. | Config#TOPOLOGY_WORKERS | Config#setNumWorkers |
| #executors (threads) | How many executors to spawn per component. | ? | TopologyBuilder#setSpout() and TopologyBuilder#setBolt(). Note that as of Storm 0.8 the parallelism_hint parameter now specifies the initial number of executors (not tasks!) for that bolt. |
| #tasks | How many tasks to create per component. | Config#TOPOLOGY_TASKS | ComponentConfigurationDeclarer#setNumTasks() |

Here is an example code snippet to show these settings in practice:

Configuring the parallelism of a Storm bolt

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
    .setNumTasks(4)
    .shuffleGrouping("blue-spout");

In the above code we configured Storm to run the bolt GreenBolt with an initial number of two executors and four associated tasks. Storm will run two tasks per executor (thread). If you do not explicitly configure the number of tasks, Storm will by default run one task per executor.
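The arithmetic behind "two tasks per executor" can be sketched in plain Java. This is only an illustrative model, not Storm's scheduler code: the class `TaskDistribution` and the method `tasksPerExecutor` are hypothetical names, and the sketch assumes that tasks are spread as evenly as possible across a component's executors.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskDistribution {
    /**
     * Splits numTasks over numExecutors as evenly as possible.
     * Assumption of this sketch: when the split is uneven, the first
     * executors each take one extra task.
     */
    public static List<Integer> tasksPerExecutor(int numTasks, int numExecutors) {
        if (numExecutors < 1 || numExecutors > numTasks) {
            throw new IllegalArgumentException("need 1 <= #executors <= #tasks");
        }
        List<Integer> counts = new ArrayList<>();
        int base = numTasks / numExecutors;
        int remainder = numTasks % numExecutors;
        for (int i = 0; i < numExecutors; i++) {
            counts.add(i < remainder ? base + 1 : base);
        }
        return counts;
    }

    public static void main(String[] args) {
        // green-bolt from the snippet above: parallelism hint 2, setNumTasks(4)
        System.out.println(tasksPerExecutor(4, 2)); // prints [2, 2]
        // default when setNumTasks() is not called: #tasks == #executors
        System.out.println(tasksPerExecutor(2, 2)); // prints [1, 1]
    }
}
```

Because every task of an executor shares that executor's single thread, a larger number in this list means more work is serialized on one thread, not more parallelism.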

For details, see: http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/

2 Should you increase the number of workers?

(1) Ideally, a topology should use only one worker per machine, mainly because this reduces data transfer between workers.

How many Workers should I use?

The total number of workers is set by the supervisors – there’s some number of JVM slots each supervisor will superintend. The thing you set on the topology is how many worker slots it will try to claim.

There’s no great reason to use more than one worker per topology per machine.

With one topology running on three 8-core nodes, and parallelism hint 24, each bolt gets 8 executors per machine, i.e. one for each core. There are three big benefits to running three workers (with 8 assigned executors each) compared to running, say, 24 workers (one assigned executor each).

First, data that is repartitioned (shuffles or group-bys) to executors in the same worker will not have to hit the transfer buffer. Instead, tuples are deposited directly from send to receive buffer. That’s a big win. By contrast, if the destination executor were on the same machine in a different worker, it would have to go send -> worker transfer -> local socket -> worker recv -> exec recv buffer. It doesn’t hit the network card, but it’s not as big a win as when executors are in the same worker.

Second, you’re typically better off with three aggregators having very large backing cache than having twenty-four aggregators having small backing caches. This reduces the effect of skew, and improves LRU efficiency.

Lastly, fewer workers reduces control flow chatter.
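To make the first benefit concrete, here is a rough back-of-the-envelope sketch in Java. It is a simplified model rather than a measurement: it assumes that executors are spread evenly over the workers and that a shuffle picks each destination executor with equal probability. The class and method names are hypothetical.

```java
public class WorkerLocality {
    /** Executors hosted by each worker, assuming an even spread over all workers. */
    public static int executorsPerWorker(int totalExecutors, int numWorkers) {
        if (numWorkers < 1 || totalExecutors % numWorkers != 0) {
            throw new IllegalArgumentException("this sketch expects an even split");
        }
        return totalExecutors / numWorkers;
    }

    /**
     * Fraction of uniformly shuffled tuples whose destination executor lives
     * in the sender's own worker and can therefore skip the transfer buffer.
     */
    public static double sameWorkerFraction(int totalExecutors, int numWorkers) {
        return (double) executorsPerWorker(totalExecutors, numWorkers) / totalExecutors;
    }

    public static void main(String[] args) {
        int hint = 24; // parallelism hint from the example above
        for (int workers : new int[] {3, 24}) {
            System.out.printf("%d workers: %d executors each, same-worker fraction %.3f%n",
                    workers, executorsPerWorker(hint, workers), sameWorkerFraction(hint, workers));
        }
    }
}
```

Under these assumptions, one third of the shuffled tuples stay inside the sending worker with three workers, against one in twenty-four with 24 workers.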

(2)

Increasing the number of workers (responsible for running one or more executors for one or more components) might also give you a performance benefit, but this is also relative, as I found from this discussion where nathanmarz says:

Having more workers might have better performance, depending on where your bottleneck is. Each worker has a single thread that passes tuples on to the 0mq connections for transfer to other workers, so if you're bottlenecked on CPU and each worker is dealing with lots of tuples, more workers will probably net you better throughput.

So basically there is no definite answer to this; you should try different configurations based on your environment and design.

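3 Number of executors

The number of executors is the real (de facto) degree of parallelism. (The number of tasks is the parallelism you would like to configure.)

Initial number of executors = number of spouts + number of bolts + number of ackers. (By default, with one task per executor, this sum is also the number of tasks.)

The numbers of spouts, bolts, and ackers do not change at runtime, but the number of executors can.

4 Should you increase the number of tasks?

Tasks exist only to give a topology the flexibility to scale; they have nothing to do with the degree of parallelism.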

Disclaimer: I wrote the article you referenced in your question above.

However I'm a bit confused by the concept of "task". Is a task a running instance of the component (spout or bolt)? An executor having multiple tasks actually is saying the same component is executed multiple times by the executor, am I correct?

Yes, and yes.

Moreover, in a general parallelism sense, Storm will spawn a dedicated thread (executor) for a spout or bolt, but what is contributed to the parallelism by an executor (thread) having multiple tasks?

Running more than one task per executor does not increase the level of parallelism -- an executor always has one thread that it uses for all of its tasks, which means that tasks run serially on an executor.

As I wrote in the article please note that:

  • The number of executor threads can be changed after the topology has been started (see the storm rebalance command).
  • The number of tasks of a topology is static.

And by definition there is the invariant of #executors <= #tasks.

So one reason for having 2+ tasks per executor thread is to give you the flexibility to expand/scale up the topology through the storm rebalance command in the future without taking the topology offline. For instance, imagine you start out with a Storm cluster of 15 machines but already know that next week another 10 boxes will be added. Here you could opt for running the topology at the anticipated parallelism level of 25 machines already on the 15 initial boxes (which is of course slower than 25 boxes). Once the additional 10 boxes are integrated you can then storm rebalance the topology to make full use of all 25 boxes without any downtime.

Another reason to run 2+ tasks per executor is for (primarily functional) testing. For instance, if your dev machine or CI server is only powerful enough to run, say, 2 executors alongside all the other stuff running on the machine, you can still run 30 tasks (here: 15 per executor) to see whether code such as your custom Storm grouping is working as expected.

In practice we normally run 1 task per executor.

PS: Note that Storm will actually spawn a few more threads behind the scenes. For instance, each executor has its own "send thread" that is responsible for handling outgoing tuples. There are also "system-level" background threads for e.g. acking tuples that run alongside "your" threads. IIRC the Storm UI counts those acking threads in addition to "your" threads.

Reposted from: http://*.com/questions/17257448/what-is-the-task-in-twitter-storm-parallelism
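The rebalancing scenario in the answer above (15 machines growing to 25) can be checked with a few lines of Java. This is a hedged sketch: the class `RebalancePlan` and its methods are hypothetical, and it simplifies the scenario to one executor per machine for a single component.

```java
public class RebalancePlan {
    /** Checks the invariant #executors <= #tasks for a requested executor count. */
    public static boolean satisfiesInvariant(int numTasks, int requestedExecutors) {
        return requestedExecutors >= 1 && requestedExecutors <= numTasks;
    }

    /** Largest number of tasks that any one executor has to run serially (ceiling division). */
    public static int maxTasksPerExecutor(int numTasks, int numExecutors) {
        if (!satisfiesInvariant(numTasks, numExecutors)) {
            throw new IllegalArgumentException("need 1 <= #executors <= #tasks");
        }
        return (numTasks + numExecutors - 1) / numExecutors;
    }

    public static void main(String[] args) {
        int numTasks = 25; // static for the topology's lifetime, sized for the anticipated 25 machines

        System.out.println(maxTasksPerExecutor(numTasks, 15)); // prints 2: before the rebalance some executors run 2 tasks
        System.out.println(maxTasksPerExecutor(numTasks, 25)); // prints 1: after the rebalance, one task per executor
        System.out.println(satisfiesInvariant(numTasks, 30));  // prints false: 30 executors would exceed the 25 tasks
    }
}
```

The same arithmetic covers the testing case from the answer: 30 tasks on 2 executors gives `maxTasksPerExecutor(30, 2) == 15`.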


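What makes a running topology: worker processes, executors and tasks

Storm distinguishes between the following three main entities that are used to actually run a topology in a Storm cluster:

  1. Worker processes
  2. Executors (threads)
  3. Tasks

The following figure illustrates how they relate to each other: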

[Figure: relationship between worker processes, executors, and tasks]

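The three captions in the figure read, in order:

  • A single machine in a Storm cluster may run one or more worker processes for one or more topologies. Each worker process runs executors for a specific topology.
  • One or more executors may run within a single worker process, with each executor being a thread spawned by the worker process. Each executor runs one or more tasks of the same component (spout or bolt).
  • A task performs the actual data processing.

A worker process executes a subset of a topology. A worker process belongs to a specific topology and may run one or more executors for one or more components (spouts or bolts) of this topology. A running topology consists of many such processes running on many machines within a Storm cluster.

An executor is a thread that is spawned by a worker process. It may run one or more tasks for the same component (spout or bolt).

A task performs the actual data processing: each spout or bolt that you implement in your code executes as many tasks across the cluster. The number of tasks for a component is always the same throughout the lifetime of a topology, but the number of executors (threads) for a component can change over time. This means that the following condition always holds: #threads <= #tasks. By default, the number of tasks is set to be the same as the number of executors, that is, Storm will run one task per thread.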


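Configuring the parallelism of a topology

Note that in Storm's terminology "parallelism" is specifically used to describe the so-called parallelism hint, which means the initial number of executors (threads) of a component. In this document we use the term "parallelism" in a more general sense, to describe how you can configure not only the number of executors but also the number of worker processes and the number of tasks of a topology. We will specifically call out when "parallelism" is used in the narrow sense.

The following sections give an overview of the various configuration options and how to set them in your code. There is more than one way of setting these options, and the lists below show only some of them. Storm currently has the following order of precedence for configuration settings: defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration.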
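One way to picture this order of precedence is as a series of map merges in which later layers override earlier ones. The Java sketch below is a simplified illustration and not Storm's actual configuration loader: the class `ConfigPrecedence` and the sample values are assumptions made for the example, while `topology.workers` and `topology.tasks` are the keys behind `Config#TOPOLOGY_WORKERS` and `Config#TOPOLOGY_TASKS`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigPrecedence {
    /** Merges config layers listed from lowest to highest precedence; later layers win. */
    public static Map<String, Object> merge(List<Map<String, Object>> layersLowToHigh) {
        Map<String, Object> effective = new HashMap<>();
        for (Map<String, Object> layer : layersLowToHigh) {
            effective.putAll(layer);
        }
        return effective;
    }

    public static void main(String[] args) {
        // Illustrative values only; they are not taken from a real cluster.
        Map<String, Object> defaultsYaml = new HashMap<>();
        defaultsYaml.put("topology.workers", 1);

        Map<String, Object> stormYaml = new HashMap<>(); // nothing overridden here

        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put("topology.workers", 2);

        Map<String, Object> componentConf = new HashMap<>();
        componentConf.put("topology.tasks", 4);

        Map<String, Object> effective =
                merge(Arrays.asList(defaultsYaml, stormYaml, topologyConf, componentConf));
        System.out.println(effective.get("topology.workers")); // prints 2
        System.out.println(effective.get("topology.tasks"));   // prints 4
    }
}
```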

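Number of worker processes

  • Description: How many worker processes to create for the topology across machines in the cluster
  • Configuration option: TOPOLOGY_WORKERS
  • How to set in your code (examples):
    • Config#setNumWorkers

Number of executors (threads)

  • Description: How many executors to spawn per component
  • Configuration option: ?
  • How to set in your code (examples):
    • TopologyBuilder#setSpout()
    • TopologyBuilder#setBolt()
    • Note that as of Storm 0.8 the parallelism_hint parameter specifies the initial number of executors for that bolt.

Number of tasks

  • Description: How many tasks to create per component
  • Configuration option: TOPOLOGY_TASKS
  • How to set in your code (examples):
    • ComponentConfigurationDeclarer#setNumTasks()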

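Here is a simple code example showing how to set these in practice:

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
    .setNumTasks(4)
    .shuffleGrouping("blue-spout");

In the above code we configured Storm to run the bolt GreenBolt with an initial number of two executors and four associated tasks. Storm will run two tasks per executor (thread). If you do not explicitly configure the number of tasks, Storm will by default run one task per executor.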


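An example of a running topology

The following figure shows what a simple topology looks like in operation. The topology consists of three components: one spout called BlueSpout and two bolts called GreenBolt and YellowBolt. BlueSpout sends its output to GreenBolt, which in turn sends its own output to YellowBolt.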

[Figure: example running topology with BlueSpout, GreenBolt, and YellowBolt]

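A brief analysis of the figure above:

The parallelism of the three components adds up to 10 (2 + 2 + 6), so the topology has 10 executors in total. There are 2 workers, so each worker spawns 10 / 2 = 5 threads.

The green bolt is configured with 2 executors and 4 tasks, so each of its executors runs 2 tasks.

The following code configures these three components: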

Config conf = new Config();
conf.setNumWorkers(2); // use 2 worker processes

TopologyBuilder topologyBuilder = new TopologyBuilder();

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // parallelism hint of 2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
    .setNumTasks(4)
    .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
    .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology("mytopology", conf, topologyBuilder.createTopology());
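The totals quoted in the analysis of this example (10 executors, 5 threads per worker) can be recomputed from the parallelism hints. The following Java sketch does not use the Storm API. The class `TopologyMath` is a hypothetical helper, and the sketch assumes that executors are divided evenly among the workers and, like the analysis above, it ignores system threads such as ackers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TopologyMath {
    /** Sums the parallelism hints (initial executor counts) of all components. */
    public static int totalExecutors(Map<String, Integer> parallelismHints) {
        int total = 0;
        for (int hint : parallelismHints.values()) {
            total += hint;
        }
        return total;
    }

    /** Threads per worker, assuming the executors divide evenly over the workers. */
    public static int executorsPerWorker(int totalExecutors, int numWorkers) {
        if (numWorkers < 1) {
            throw new IllegalArgumentException("need at least one worker");
        }
        return totalExecutors / numWorkers;
    }

    public static void main(String[] args) {
        Map<String, Integer> hints = new LinkedHashMap<>();
        hints.put("blue-spout", 2);
        hints.put("green-bolt", 2);
        hints.put("yellow-bolt", 6);

        int total = totalExecutors(hints);
        System.out.println(total);                        // prints 10
        System.out.println(executorsPerWorker(total, 2)); // prints 5
    }
}
```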

And of course Storm comes with additional configuration settings to control the parallelism of a topology, including:


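  • TOPOLOGY_MAX_TASK_PARALLELISM: This setting puts a ceiling on the number of executors that can be spawned for a single component. It is typically used during testing to limit the number of threads spawned when running a topology in local mode. You can set this option via e.g. Config#setMaxTaskParallelism().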
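The effect of such a ceiling can be sketched as a simple minimum. The code below is an illustrative model and not Storm's implementation: `MaxTaskParallelism` and `effectiveParallelism` are hypothetical names, and a `null` ceiling stands for "option not set".

```java
public class MaxTaskParallelism {
    /** Executor count for one component after an optional per-component ceiling is applied. */
    public static int effectiveParallelism(int parallelismHint, Integer maxTaskParallelism) {
        if (maxTaskParallelism == null) {
            return parallelismHint; // no ceiling configured
        }
        return Math.min(parallelismHint, maxTaskParallelism);
    }

    public static void main(String[] args) {
        // yellow-bolt asks for 6 executors, but a local test run caps every component at 3
        System.out.println(effectiveParallelism(6, 3));    // prints 3
        // blue-spout asks for 2, which is already below the ceiling
        System.out.println(effectiveParallelism(2, 3));    // prints 2
        // without a ceiling the hint is used unchanged
        System.out.println(effectiveParallelism(6, null)); // prints 6
    }
}
```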

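How to change the parallelism of a running topology

A nifty feature of Storm is that you can increase or decrease the number of worker processes and/or executors without having to restart the cluster or the topology. The act of doing so is called rebalancing.

You have two options to rebalance a topology:

  1. Use the Storm web UI to rebalance the topology.
  2. Use the command-line tool to rebalance the topology, as described below: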

# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors, and
# the bolt "yellow-bolt" to use 10 executors.
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

Reposted from: http://www.imooo.com/zonghe/open-source/1270481.htm
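If rebalancing is scripted, the command line shown above can be assembled programmatically. The Java sketch below only builds the string and does not invoke Storm. The class `RebalanceCommand` is a hypothetical helper, and it uses only the `-n` and `-e` options that appear in the command above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RebalanceCommand {
    /** Builds a "storm rebalance" command from a worker count and per-component executor counts. */
    public static String build(String topology, int numWorkers, Map<String, Integer> executors) {
        StringBuilder cmd = new StringBuilder("storm rebalance ")
                .append(topology)
                .append(" -n ").append(numWorkers);
        for (Map.Entry<String, Integer> entry : executors.entrySet()) {
            cmd.append(" -e ").append(entry.getKey()).append('=').append(entry.getValue());
        }
        return cmd.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the components in insertion order
        Map<String, Integer> executors = new LinkedHashMap<>();
        executors.put("blue-spout", 3);
        executors.put("yellow-bolt", 10);
        System.out.println(build("mytopology", 5, executors));
        // prints: storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
    }
}
```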

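References:

(1) The author of this blog has written extensively about practical Storm usage. http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/

This article was compiled from online sources by crazyhacking.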