storm 实战及实例讲解(二)

时间:2022-12-29 20:32:30

storm 实战及实例讲解(二)

                                                                                                                                                                                             ——comaple.zhang      

            前面已近介绍了storm集群的搭建,和使用场景,那么现在让我们一起来探讨一下storm具体该怎么使用吧。

           首先,我们要明白如何创建一个topology,topology是storm集群上面运行的基本单元,而一个topology又可以有若干个sport和bolt以某种策略组合而成,关于storm的流分组等概念我们可以,参考这里的一些资料。提醒一下这些概念很重要。我们开发storm应用的第一步就是定义一个topolog,下面我讲直接上代码,如果这些概念搞不懂的话很难弄清楚。我回尽量把注释写清楚。下面这个例子定义了一个简单的topology,它包括一个数据喷发节点spout,和一个数据处理节点bolt。


package com.jd.comaple.storm.test.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import com.jd.comaple.storm.test.bolt.SimpleBolt;
import com.jd.comaple.storm.test.spout.SimpleSpout;

/**
 * Created by IntelliJ IDEA.
 * User: comaple.zhang
 * Date: 12-8-28
 * Time: 下午2:11
 * To change this template use File | Settings | File Templates.
 */
public class SimpleTopology {

    public static void main(String[] args) {
        try {
            //实例化topologyBuilder类。
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            //设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
            topologyBuilder.setSpout("simple-spout", new SimpleSpout(), 1);
            // 设置数据处理节点,并分配并发数。指定该几点接收喷发节点的策略为随机方式。
            topologyBuilder.setBolt("simple-bolt", new SimpleBolt(), 3).shuffleGrouping("simple-spout");
            Config config = new Config();
            config.setDebug(false);
            if (args != null && args.length > 0) {
                /*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程
                 如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了
                 一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交
                 但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。
                */
                config.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } else {
                //这里是本地模式下运行的启动代码。
                config.setMaxTaskParallelism(1);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("simple", config,
                        topologyBuilder.createTopology());
            }
        } catch (Exception e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
    }
}