Storm累计求和进群运行代码

时间:2024-06-01 18:35:56

打成jar包放在主节点上去运行.

 import java.util.Map;

 import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils; /**
* 在集群运行
* 数字累加求和
* 先添加storm依赖
*
* @author Administrator
*
*/
public class StormTopologySum { /**
* spout需要继承baserichspout,实现未实现的方法
* @author Administrator
*
*/
public static class MySpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector; /**
* 初始化方法,只会执行一次
* 在这里面可以写一个初始化的代码
* Map conf:其实里面保存的是topology的一些配置信息
* TopologyContext context:topology的上下文,类似于servletcontext
* SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
*/
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
} int num = 1;
/**
* 这个方法是spout中最重要的方法,
* 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
* 每调用一次,会向外发射一条数据
*/
@Override
public void nextTuple() {
System.out.println("spout发射:"+num);
//把数据封装到values中,称为一个tuple,发射出去
this.collector.emit(new Values(num++));
Utils.sleep(1000);
} /**
* 声明输出字段
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//给values中的数据起个名字,方便后面的bolt从这个values中取数据
//fields中定义的参数和values中传递的数值是一一对应的
declarer.declare(new Fields("num"));
} } /**
* 自定义bolt需要实现baserichbolt
* @author Administrator
*
*/
public static class MyBolt extends BaseRichBolt{
private Map stormConf;
private TopologyContext context;
private OutputCollector collector; /**
* 和spout中的open方法意义一样
*/
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
} int sum = 0;
/**
* 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
*/
@Override
public void execute(Tuple input) {
//input.getInteger(0);//也可以根据角标获取tuple中的数据
Integer value = input.getIntegerByField("num");
sum+=value;
System.out.println("和:"+sum);
} /**
* 声明输出字段
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
//如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
} }
/**
* 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
* @param args
*/
public static void main(String[] args) {
//组装topology
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout1", new MySpout());
//.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
topologyBuilder.setBolt("bolt1", new MyBolt()).setNumTasks(2).shuffleGrouping("spout1"); //创建本地storm集群
/*LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());*/ //在集群运行
String simpleName = StormTopologySum.class.getSimpleName();
try {
Config stormConf = new Config();
//stormConf.setNumWorkers(2);
StormSubmitter.submitTopology(simpleName, stormConf, topologyBuilder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}
}