How to stop tuple processing in Storm and run other code

Time: 2022-06-08 07:37:20

I'm a Storm newbie. I'm using it for a university project.

I created my topology, with a Spout linked to a MySQL database, and two Bolts. The first bolt, connected to the spout, prepares the tuples and removes unnecessary information; the second filters the tuples.

I'm working in local mode.

My question is: why, after running the topology, do I see output like the lines below in my console?

38211 [Thread-14-movie-SPOUT] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
67846 [Thread-10-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67846 [Thread-8-cleaning-genre-bolt] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67852 [Thread-10-__acker] INFO  backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@3c270095> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]]
67853 [Thread-8-cleaning-genre-bolt] INFO  backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@38c3d111> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]]
67854 [Thread-13-filtering-genre-BOLT] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67855 [Thread-13-filtering-genre-BOLT] INFO  backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6d5c75a9> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]]

I read that these lines, appearing after the last tuple is processed, are to be considered normal. Is that right?

And how can I run other code after submitting the topology? For example, I want to print the results of the filtering done in the second bolt, which are saved in a HashMap. If I put my code after the line containing the submitTopology() call, the code runs before the tuples have finished processing.

The second and last question is: why, in every Storm example, do I see "Thread.sleep(1000)" in the Spout?

Maybe it's linked to my first question.

I hope my questions are clear. Thank you in advance!

1 Solution

#1


I read that these lines, appearing after the last tuple is processed, are to be considered normal. Is that right?

Those are just INFO messages. So no need to worry about them.

If I put my code after the line containing the submitTopology() call, the code runs before the tuples have finished processing.

If you submit your topology, it gets executed in the background (i.e., multi-threaded). This is required, as your topology runs "forever" (until you stop it explicitly -- or your Java application terminates, since you are running in local mode).

Running code "after your topology finished" does not align with Storm's concepts: Storm is a stream processing system, and there is "no end in processing" (the input stream is infinite, so processing runs forever). If you want to process a finite data set, you might want to consider a batch processing framework like Flink or Spark instead.

Thus, if you want to make this work in Storm, you need to be able to determine when all data has been processed. Then, after topology submission, you block and wait explicitly until that point before running your remaining code.
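
In plain Java, that blocking pattern can be sketched like this. No Storm classes are used here: a background thread stands in for the topology that submitTopology() starts, and a CountDownLatch is one hypothetical way the final bolt could signal that the last tuple was processed. All names are illustrative, not Storm's API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BlockAfterSubmit {
    // Stand-in for submitTopology(): starts background work and returns
    // a latch the caller can block on until processing completes.
    static boolean submitAndAwait() throws InterruptedException {
        CountDownLatch allTuplesProcessed = new CountDownLatch(1);

        Thread topology = new Thread(() -> {
            // ... process tuples in the background ...
            allTuplesProcessed.countDown(); // signal "last tuple done"
        });
        topology.start();

        // Right after submission: block here instead of reading results
        // immediately; use a timeout so the program never hangs forever.
        return allTuplesProcessed.await(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(submitAndAwait()
            ? "all tuples processed"
            : "timed out");
        // ... now it is safe to read the HashMap of results ...
    }
}
```

In a real topology the countDown() call would have to come from whatever component can recognize the last tuple (e.g. a sentinel value emitted by the spout).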

However, for your use case, why not just print your result from within the last bolt?
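
As a sketch of that idea (plain Java shaped like a bolt, not Storm's actual IRichBolt interface): execute() is where Storm hands you each tuple, and cleanup() is the hook Storm calls on shutdown in local mode, which is a natural place to dump the HashMap. The class and field names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class FilteringBoltSketch {
    // The result map the question wants to print.
    private final Map<String, Integer> genreCounts = new HashMap<>();

    // Per-tuple work: accumulate into the map (a real bolt would also
    // emit and ack the tuple here).
    public void execute(String genre) {
        genreCounts.merge(genre, 1, Integer::sum);
    }

    // Invoked once when the topology is killed in local mode:
    // print the final results from inside the bolt itself.
    public void cleanup() {
        genreCounts.forEach((genre, count) ->
            System.out.println(genre + " -> " + count));
    }

    public Map<String, Integer> results() {
        return genreCounts;
    }
}
```

Note that Storm only guarantees cleanup() is called in local mode; on a real cluster the worker JVM may be killed without it.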

About Thread.sleep(): I am not sure which example you are referring to. There is no reason to put it in production code. Maybe it's there for demo purposes, to slow down processing artificially.
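
A stand-in loop illustrates the effect. Storm calls nextTuple() in a tight loop, so a demo spout without a pause would peg a CPU core and flood the console; the sleep simply bounds the emission rate. This is plain Java, not Storm's ISpout API, and the method name and numbers are invented for the demo.

```java
public class SpoutSleepSketch {
    // Emit "tuples" for the given duration, pausing after each one,
    // the way demo spouts pause inside nextTuple().
    public static int emitFor(long millis, long sleepPerTuple)
            throws InterruptedException {
        int emitted = 0;
        long deadline = System.currentTimeMillis() + millis;
        while (System.currentTimeMillis() < deadline) {
            emitted++;                   // stand-in for collector.emit(...)
            Thread.sleep(sleepPerTuple); // the pause seen in demo spouts
        }
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        // With a 50 ms pause, only a handful of emissions fit in 200 ms;
        // without the sleep, the same loop would run millions of times.
        System.out.println(emitFor(200, 50));
    }
}
```

A better-behaved real spout simply returns from nextTuple() when it has nothing to emit and lets Storm apply its own wait strategy.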
