I want to configure my spout to emit tick tuples on 2 different frequencies on different streams. My questions are as follows:
我想配置我的spout在不同的流上发出2个不同频率的tick元组。我的问题如下:
-
I understand how this is done using the bolt. But, on a spout, will the tick tuple invoke the next Tuple method on every tick?
我知道如何使用螺栓完成这项工作。但是,在一个spout上,tick元组会在每个tick上调用下一个Tuple方法吗?
-
How can I determine the frequency at which the tick was invoked? Meaning, the actual value of the time I configured in the config object?
如何确定调用tick的频率?含义,我在配置对象中配置的时间的实际值?
2 个解决方案
#1
Only bolts can receive tick tuples. Spouts can only emit tuples.
只有螺栓可以接收刻度元组。 Spouts只能发出元组。
I'm assuming you're trying to do a "read" every so often from within your spout in order to emit a new tuple.
我假设你试图在你的鲸鱼喷水中经常“读”,以便发出一个新的元组。
For example, to sleep 50 milliseconds between reads:
例如,在读取之间睡50毫秒:
@Override
public void nextTuple() {
try {
String message = _mqClient.getMessage();
if (message != null) {
_collector.emit(new Values(message));
}
// sleep for 50 milliseconds
Utils.sleep(50);
} catch (Exception e) {
_collector.reportError(e);
LOG.error("MQ spout error {}", e);
}
}
#2
Maybe this can help you:
也许这可以帮助你:
https://github.com/ptgoetz/storm-signals
Storm-Signals aims to provide a way to send messages ("signals") to components (spouts/bolts) in a storm topology that are otherwise not addressable.
Storm-Signals旨在提供一种向风暴拓扑中的组件(spouts / bolt)发送消息(“信号”)的方法,否则这些消息不可寻址。
Storm topologies can be considered static in that modifications to a topology's behavior require redeployment. Storm-Signals provides a simple way to modify a topology's behavior at runtime, without redeployment.
风暴拓扑可以被认为是静态的,因为对拓扑行为的修改需要重新部署。 Storm-Signals提供了一种在运行时修改拓扑行为的简单方法,无需重新部署。
#1
Only bolts can receive tick tuples. Spouts can only emit tuples.
只有螺栓可以接收刻度元组。 Spouts只能发出元组。
I'm assuming you're trying to do a "read" every so often from within your spout in order to emit a new tuple.
我假设你试图在你的鲸鱼喷水中经常“读”,以便发出一个新的元组。
For example, to sleep 50 milliseconds between reads:
例如,在读取之间睡50毫秒:
@Override
public void nextTuple() {
try {
String message = _mqClient.getMessage();
if (message != null) {
_collector.emit(new Values(message));
}
// sleep for 50 milliseconds
Utils.sleep(50);
} catch (Exception e) {
_collector.reportError(e);
LOG.error("MQ spout error {}", e);
}
}
#2
Maybe this can help you:
也许这可以帮助你:
https://github.com/ptgoetz/storm-signals
Storm-Signals aims to provide a way to send messages ("signals") to components (spouts/bolts) in a storm topology that are otherwise not addressable.
Storm-Signals旨在提供一种向风暴拓扑中的组件(spouts / bolt)发送消息(“信号”)的方法,否则这些消息不可寻址。
Storm topologies can be considered static in that modifications to a topology's behavior require redeployment. Storm-Signals provides a simple way to modify a topology's behavior at runtime, without redeployment.
风暴拓扑可以被认为是静态的,因为对拓扑行为的修改需要重新部署。 Storm-Signals提供了一种在运行时修改拓扑行为的简单方法,无需重新部署。