storm 实战及实例讲解(三)

时间:2021-08-29 15:47:24
storm 实战及实例讲解(三)
                                     ——comaple.zhang
                                                                                              ——2012-09-13
            本讲将接着上一讲,把一个完成的topology完成。上一节主要介绍了一个基本的topology的构造过程,以及每一步所对应的storm集群中分配的资源情况。要想开发storm应用必须对上一讲我提到的那些概念有完全的了解,否则开发出来的应用很有可能有这样那样的问题而无法工作。那么接下来我们来一起定义一个spot节点和bolt节点。

spot节点:在实际的开发中这个节点可以起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务(该方式我会在后续章节中谈到)。这里我们将开发一个简单的模拟数据喷发的节点。具体方式见代码:

package com.jd.comaple.storm.test.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.Random;

/**
 * Created by IntelliJ IDEA.
 * User: comaple.zhang
 * Date: 12-8-28
 * Time: 下午2:11
 * To change this template use File | Settings | File Templates.
 */
public class SimpleSpout extends BaseRichSpout {

    /**
     * 用来发射数据的工具类
     */
    private SpoutOutputCollector collector;
    private static String[] info = new String[]{
            "comaple\t,12424,44w46,654,12424,44w46,654,",
            "lisi\t,435435,6537,12424,44w46,654,",
            "lipeng\t,45735,6757,12424,44w46,654,",
            "*\t,45735,6757,12424,44w46,654,",
            "jiangmin\t,23545,6457,2455,7576,qr44453",
            "beijing\t,435435,6537,12424,44w46,654,",
            "xiaoming\t,46654,8579,w3675,85877,077998,",
            "xiaozhang\t,9789,788,97978,656,345235,09889,",
            "ceo\t,46654,8579,w3675,85877,077998,",
            "cto\t,46654,8579,w3675,85877,077998,",
            "zhansan\t,46654,8579,w3675,85877,077998,"};

    Random rd = new Random();

    /**
     * 这里初始化collector
     * @param conf
     * @param context
     * @param collector
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;

    }

    /**
     * 该方法会在SpoutTracker类中被调用每调用一次就可以向storm集群中发射一条数据(一个tuple元组)
     * 该方法会被不停的调用
     */
    @Override
    public void nextTuple() {
        try {
            String msg = info[rd.nextInt(10)];
            //调用发射方法
            collector.emit(new Values(msg));
            //模拟等待100ms
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 这里定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
     * 该declarer变量有很大作用,我们还可以调用  declarer.declareStream();  来定义stramId,该id可以用来定义
     * 更加复杂的流拓扑结构
     * @param declarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("source"));

    }
}


bolt节点: 处理节点,该节点接收喷发节点发送的数据进行简单的处理后,发射出去。
package com.jd.comaple.storm.test.bolt;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Created by IntelliJ IDEA.
 * User: comaple.zhang
 * Date: 12-8-28
 * Time: 下午2:11
 * To change this template use File | Settings | File Templates.
 */
public class SimpleBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields( "info"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            String mesg = input.getString(0);
            if (mesg != null)
                collector.emit(new Values( mesg+"mesg is processed!"));
        } catch (Exception e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }

    }
}