storm 实战及实例讲解(三)
——comaple.zhang
——2012-09-13
本讲将接着上一讲,把一个完成的topology完成。上一节主要介绍了一个基本的topology的构造过程,以及每一步所对应的storm集群中分配的资源情况。要想开发storm应用必须对上一讲我提到的那些概念有完全的了解,否则开发出来的应用很有可能有这样那样的问题而无法工作。那么接下来我们来一起定义一个spot节点和bolt节点。
spot节点:在实际的开发中这个节点可以起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务(该方式我会在后续章节中谈到)。这里我们将开发一个简单的模拟数据喷发的节点。具体方式见代码:
package com.jd.comaple.storm.test.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
/**
* Created by IntelliJ IDEA.
* User: comaple.zhang
* Date: 12-8-28
* Time: 下午2:11
* To change this template use File | Settings | File Templates.
*/
public class SimpleSpout extends BaseRichSpout {
/**
* 用来发射数据的工具类
*/
private SpoutOutputCollector collector;
private static String[] info = new String[]{
"comaple\t,12424,44w46,654,12424,44w46,654,",
"lisi\t,435435,6537,12424,44w46,654,",
"lipeng\t,45735,6757,12424,44w46,654,",
"*\t,45735,6757,12424,44w46,654,",
"jiangmin\t,23545,6457,2455,7576,qr44453",
"beijing\t,435435,6537,12424,44w46,654,",
"xiaoming\t,46654,8579,w3675,85877,077998,",
"xiaozhang\t,9789,788,97978,656,345235,09889,",
"ceo\t,46654,8579,w3675,85877,077998,",
"cto\t,46654,8579,w3675,85877,077998,",
"zhansan\t,46654,8579,w3675,85877,077998,"};
Random rd = new Random();
/**
* 这里初始化collector
* @param conf
* @param context
* @param collector
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
/**
* 该方法会在SpoutTracker类中被调用每调用一次就可以向storm集群中发射一条数据(一个tuple元组)
* 该方法会被不停的调用
*/
@Override
public void nextTuple() {
try {
String msg = info[rd.nextInt(10)];
//调用发射方法
collector.emit(new Values(msg));
//模拟等待100ms
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 这里定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
* 该declarer变量有很大作用,我们还可以调用 declarer.declareStream(); 来定义stramId,该id可以用来定义
* 更加复杂的流拓扑结构
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source"));
}
}
bolt节点: 处理节点,该节点接收喷发节点发送的数据进行简单的处理后,发射出去。
package com.jd.comaple.storm.test.bolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
* Created by IntelliJ IDEA.
* User: comaple.zhang
* Date: 12-8-28
* Time: 下午2:11
* To change this template use File | Settings | File Templates.
*/
public class SimpleBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields( "info"));
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
try {
String mesg = input.getString(0);
if (mesg != null)
collector.emit(new Values( mesg+"mesg is processed!"));
} catch (Exception e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
相关文章
- Selenium2学习-027-WebUI自动化实战实例-025-JavaScript 在 Selenium 自动化中的应用实例之三(页面滚屏,模拟鼠标拖动滚动条)
- Android实战简易教程-第三十五枪(将二维码扫描和生成Demo引入项目实例)
- Python字典实现简单的三级菜单(实例讲解)
- java判断三位数的实例讲解
- TensorFlow 实战之实现卷积神经网络的实例讲解
- 区块链入门教程之从比特币到以太访再到智能合约从架构概念到应用实战(DAPP)(三、以太访介绍、原理、概念讲解)
- 《FPGA全程进阶---实战演练》第三十二章 Signal Tap II 应用实例
- storm实战入门:开发简易Topology实例
- python密码错误三次锁定(实例讲解)
- storm 实战及实例讲解(三)