Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

时间:2023-01-22 22:13:04

前期博客

Storm编程入门API系列之Storm的Topology默认Workers、默认executors和默认tasks数目

Storm编程入门API系列之Storm的Topology多个Workers数目控制实现

继续编写

  StormTopologyMoreTask.java

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

package zhouls.bigdata.stormDemo;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils; public class StormTopologyMoreTask { public static class MySpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
} int num = ;
public void nextTuple() {
num++;
System.out.println("spout:"+num);
this.collector.emit(new Values(num));
Utils.sleep();
} public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
} } public static class MyBolt extends BaseRichBolt{ private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
} public void execute(Tuple input) {
Integer num = input.getIntegerByField("num");
System.out.println("线程id:"+Thread.currentThread().getId()+",接收的值为:"+num);
} public void declareOutputFields(OutputFieldsDeclarer declarer) { } } public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
String spout_id = MySpout.class.getSimpleName();
String bolt_id = MyBolt.class.getSimpleName(); topologyBuilder.setSpout(spout_id, new MySpout());
topologyBuilder.setBolt(bolt_id, new MyBolt()).setNumTasks().shuffleGrouping(spout_id); Config config = new Config();
String topology_name = StormTopologyMoreTask.class.getSimpleName();
if(args.length==){
//在本地运行
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
}else{
//在集群运行
try {
StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
} } }

打jar包

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

[hadoop@master jar]$ pwd
/home/hadoop/app/apache-storm-1.0./jar
[hadoop@master jar]$ ll
total
-rw-r--r-- hadoop hadoop Jul : StormTopology.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreExecutor.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreWorker.jar
[hadoop@master jar]$ rz [hadoop@master jar]$ ll
total
-rw-r--r-- hadoop hadoop Jul : StormTopology.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreExecutor.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreTask.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreWorker.jar
[hadoop@master jar]$

提交作业之前

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

  

为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

  因为,我之前运行的StormTopologyMoreExecutor没有停掉

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

  

为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现的更多相关文章

  1. Storm编程入门API系列之Storm的Topology多个Workers数目控制实现

    前期博客 Storm编程入门API系列之Storm的Topology默认Workers.默认executors和默认tasks数目 继续编写 StormTopologyMoreWorker.java ...

  2. Storm编程入门API系列之Storm的Topology多个Executors数目控制实现

    前期博客 Storm编程入门API系列之Storm的Topology默认Workers.默认executors和默认tasks数目 Storm编程入门API系列之Storm的Topology多个Wor ...

  3. Storm编程入门API系列之Storm的定时任务实现

    概念,见博客 Storm概念学习系列之storm的定时任务 Storm的定时任务,分为两种实现方式,都是可以达到目的的. 我这里,分为StormTopologyTimer1.java   和  Sto ...

  4. Storm编程入门API系列之Storm的Topology的stream grouping

    概念,见博客 Storm概念学习系列之stream grouping(流分组) Storm的stream grouping的Shuffle Grouping 它是随机分组,随机派发stream里面的t ...

  5. Storm编程入门API系列之Storm的可靠性的ACK消息确认机制

    概念,见博客 Storm概念学习系列之storm的可靠性  什么业务场景需要storm可靠性的ACK确认机制? 答:想要保住数据不丢,或者保住数据总是被处理.即若没被处理的,得让我们知道. publi ...

  6. Storm编程入门API系列之Storm的Topology默认Workers、默认executors和默认tasks数目

    关于,storm的启动我这里不多说了. 见博客 storm的3节点集群详细启动步骤(非HA和HA)(图文详解) 建立stormDemo项目 Group Id :  zhouls.bigdata Art ...

  7. Storm概念学习系列之storm的定时任务

    不多说,直接上干货! 至于为什么,有storm的定时任务.这个很简单.但是,这个在工作中非常重要! 假设有如下的业务场景 这个spoult源源不断地发送数据,boilt呢会进行处理.然后呢,处理后的结 ...

  8. Storm概念学习系列之storm的可靠性

    这个概念,对于理解storm很有必要. 1.worker进程死掉 worker是真实存在的.可以jps查看. 正是因为有了storm的可靠性,所以storm会重新启动一个新的worker进程. 2.s ...

  9. 第1节 storm编程:2、storm的基本介绍

    课程大纲: 1.storm的基本介绍 2.storm的架构模型 3.storm的安装 4.storm的UI管理界面 5.storm的编程模型 6.storm的入门程序 7.storm的并行度 8.st ...

随机推荐

  1. Play Framework 第一个应用

    熟悉的Hello World 新创建一个工程,了解下重要文件的结构 .\app controllers\models\views 目前比较流行的MVC架构 .\conf application.con ...

  2. Java开发之abstract 和 interface的区别

    Java开发abstract 和 interface的区别 java开发里面经常会用到虚函数和接口,这两者的区别是什么呢? abstract: 子类里面只能继承一个父类 interface: 子类可以 ...

  3. remove() 方法的兼容问题

    一直以为jq的remove()方法是兼容的,今天才发现,原来ie的写法不一样,特作此记录. removeNode方法的功能是删除一个节点,语法为node.removeNode(false)或者node ...

  4. QTREE2 spoj 913. Query on a tree II 经典的倍增思想

    QTREE2 经典的倍增思想 题目: 给出一棵树,求: 1.两点之间距离. 2.从节点x到节点y最短路径上第k个节点的编号. 分析: 第一问的话,随便以一个节点为根,求得其他节点到根的距离,然后对于每 ...

  5. 大Q品牌故事_大Q官网_腾讯旗下买卖宝公司倾力打造

    大Q品牌故事_大Q官网_腾讯旗下买卖宝公司倾力打造 走在大路上的改变者,有态度的互联网手机品牌

  6. bzoj4828 [Hnoi2017]大佬

    Description 人们总是难免会碰到大佬.他们趾高气昂地谈论凡人不能理解的算法和数据结构,走到任何一个地方,大佬的气场就能让周围的人吓得瑟瑟发抖,不敢言语.你作为一个OIER,面对这样的事情非常 ...

  7. SQL万能语句-经典操作

    一.基础 1.说明:创建数据库CREATE DATABASE database-name 2.说明:删除数据库drop database dbname3.说明:备份sql server--- 创建 备 ...

  8. C# winFrom窗体设计问题-部分文件打不开窗体设计器 变成类.cs

    https://zhidao.baidu.com/question/1513483178103163220.html C# winform程序设计的时候,出现了问题.默认主窗体form1(改名form ...

  9. Linux安装Tomcat-Nginx-FastDFS-Redis-Solr-集群——【第六集之基本命令使用】

    学习命令的方法:linux中所有操作都是命令操作,可想而知命令有多少,更严重的是每个命令有很多参数,记命令容易,记参数就难了,所以建议: 自己准备一个博客,把通常用到的命令及其功能记载下来,用到的时候 ...

  10. mysql count与sum的区别

    一.count()的结果为>=0           sum()结果可能是null 二.count()计算的行数               sum()计算的是某列的求和