Storm stream groupings

Posted: 2023-03-09 23:31:32

Here we use the shuffleGrouping stream grouping, with the bolt's parallelism set to 3.
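As a rough standalone illustration of why shuffle grouping balances the load (plain Java, not Storm's actual implementation): each tuple goes to exactly one of the three bolt tasks, chosen more or less at random, so over time every task ends up with roughly the same share.

import java.util.Arrays;
import java.util.Random;

public class ShuffleGroupingSketch {
    public static void main(String[] args) {
        int numTasks = 3;                 // bolt parallelism hint used in the topology below
        int[] counts = new int[numTasks];
        Random random = new Random();
        for (int i = 0; i < 9000; i++) {
            counts[random.nextInt(numTasks)]++; // each tuple lands on exactly one task
        }
        // Prints roughly [3000, 3000, 3000] -- the load is evenly spread.
        System.out.println(Arrays.toString(counts));
    }
}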


Here is the output from a test run:

[screenshot: run output]

Reference code: StormTopologyShufferGrouping.java


package yehua.storm;


import java.util.Map;


import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;


/**
* shuffleGrouping
* Use this grouping unless there is a special requirement; it guarantees load balancing across the bolt tasks and is the most commonly used grouping in practice.
* @author yehua
*
*/


public class StormTopologyShufferGrouping {

public static class MySpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
// @Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
}


int num = 0;
//@Override
public void nextTuple() {
num++;
System.out.println("spout:"+num);
this.collector.emit(new Values(num));
Utils.sleep(1000);
}


//@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}

}

public static class MyBolt extends BaseRichBolt{

private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
// @Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}

//@Override
public void execute(Tuple input) {
Integer num = input.getIntegerByField("num");
System.err.println("thread:"+Thread.currentThread().getId()+",num="+num);
}


//@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}

public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
String spout_id = MySpout.class.getSimpleName();
String bolt_id = MyBolt.class.getSimpleName();

topologyBuilder.setSpout(spout_id, new MySpout());
topologyBuilder.setBolt(bolt_id, new MyBolt(),3).shuffleGrouping(spout_id);

Config config = new Config();
String topology_name = StormTopologyShufferGrouping.class.getSimpleName();
if(args.length==0){
// run locally
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
}else{
// run on the cluster
try {
StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}

}


}

 

Using the fieldsGrouping method


Here tuples are grouped by whether the number is odd or even (that is, grouped by a field).


The run output shows that one thread handles the odd numbers and another thread handles the even numbers.
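Conceptually, fields grouping picks the target task from a hash of the grouping field, so equal field values always land on the same task. A minimal standalone sketch of that idea (plain Java; the chooseTask helper is a hypothetical stand-in, not Storm's internal routing code):

import java.util.Objects;

public class FieldsGroupingSketch {
    // Hypothetical stand-in for Storm's routing: hash the grouping field,
    // then take it modulo the number of target tasks.
    static int chooseTask(Object fieldValue, int numTasks) {
        return Math.floorMod(Objects.hashCode(fieldValue), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 2; // two bolt threads, as in the topology below
        for (int num = 1; num <= 6; num++) {
            int flag = num % 2; // the "flag" field emitted by MySpout
            System.out.println("num=" + num + ", flag=" + flag
                    + " -> task " + chooseTask(flag, numTasks));
        }
    }
}

The same arithmetic also explains the cases discussed further below: since the flag only ever takes two distinct values, raising the bolt parallelism to three still leaves one task idle, and dropping it to one funnels everything into a single thread.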


Reference code: StormTopologyFieldsGrouping.java

 package yehua.storm;

 import java.util.Map;

 import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
* FieldsGrouping
* Grouping by field: tuples with the same field value go to the same bolt task
* @author yehua
*
*/

public class StormTopologyFieldsGrouping {

public static class MySpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
//@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
}

int num = 0;
//@Override
public void nextTuple() {
num++;
System.out.println("spout:"+num);
this.collector.emit(new Values(num,num%2));
Utils.sleep(1000);
}

//@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num","flag"));
}

}

public static class MyBolt extends BaseRichBolt{

private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
//@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}

//@Override
public void execute(Tuple input) {
Integer num = input.getIntegerByField("num");
System.err.println("thread:"+Thread.currentThread().getId()+",num="+num);
}

//@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}

public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
String spout_id = MySpout.class.getSimpleName();
String bolt_id = MyBolt.class.getSimpleName();

topologyBuilder.setSpout(spout_id, new MySpout());
// Note: fields grouping guarantees that tuples with the same grouping-field value are processed by the same thread
topologyBuilder.setBolt(bolt_id, new MyBolt(),2).fieldsGrouping(spout_id, new Fields("flag"));

Config config = new Config();
String topology_name = StormTopologyFieldsGrouping.class.getSimpleName();
if(args.length==0){
// run locally
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
}else{
// run on the cluster
try {
StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}

}

}

One additional note: suppose there are still two classes of data (the flag is 0 or 1) but three bolt threads.


Looking at the run output, only two of the three threads do any work: each flag value is pinned to one fixed thread, so the third thread never receives anything.


There is also the opposite case: only one bolt thread, still with two classes of data.


The run output shows that all of the work is done by that single thread.


The allGrouping method


Run result: every tuple the spout emits is received by all three bolt threads (there are few practical use cases for this).
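A trivial standalone sketch of the fan-out this implies (plain Java; the tuple count is just an assumed example number):

public class AllGroupingFanOutSketch {
    public static void main(String[] args) {
        int boltParallelism = 3; // three bolt threads, as configured below
        int tuplesEmitted = 10;  // assume the spout has emitted 10 tuples
        // allGrouping copies every tuple to every bolt thread, so execute()
        // runs (and prints a line) boltParallelism * tuplesEmitted times.
        System.out.println("execute() calls: " + boltParallelism * tuplesEmitted); // 30
    }
}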


Reference code: StormTopologyAllGrouping.java

 package yehua.storm;

 import java.util.Map;

 import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
* AllGrouping
* Broadcast grouping: every target task receives a copy of each tuple
* @author yehua
*
*/

public class StormTopologyAllGrouping {

public static class MySpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
//@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
}

int num = 0;
//@Override
public void nextTuple() {
num++;
System.out.println("spout:"+num);
this.collector.emit(new Values(num));
Utils.sleep(1000);
}

//@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}

}

public static class MyBolt extends BaseRichBolt{

private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
//@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}

//@Override
public void execute(Tuple input) {
Integer num = input.getIntegerByField("num");
System.err.println("thread:"+Thread.currentThread().getId()+",num="+num);
}

//@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}

public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
String spout_id = MySpout.class.getSimpleName();
String bolt_id = MyBolt.class.getSimpleName();

topologyBuilder.setSpout(spout_id, new MySpout());
topologyBuilder.setBolt(bolt_id, new MyBolt(),3).allGrouping(spout_id);

Config config = new Config();
String topology_name = StormTopologyAllGrouping.class.getSimpleName();
if(args.length==0){
// run locally
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
}else{
// run on the cluster
try {
StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}

}

}

The localOrShuffleGrouping method


The spout sends data only to bolt threads running in the same worker as itself; in other words, tuples are handed over inside one process. The benefit is that in-process transfer is efficient and avoids moving data across hosts.

But when the data volume is too large for that one thread to handle, this becomes a problem, so it is not recommended in practice.

This example uses 3 bolt threads (parallelism 3) and 2 processes (2 workers).


From the run output we can see that only one thread is receiving data.


There is one more case: if there is no local bolt thread in the spout's worker, the behavior is the same as shuffleGrouping.
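A rough standalone sketch of the routing rule just described (plain Java; chooseTask is a hypothetical helper, not Storm's implementation): prefer bolt tasks in the sender's own worker, and only shuffle across all tasks when there are none.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class LocalOrShuffleSketch {
    static final Random RANDOM = new Random();

    // Hypothetical helper: pick a target task, preferring tasks co-located
    // in the sender's worker; otherwise fall back to a plain shuffle.
    static int chooseTask(List<Integer> allTasks, List<Integer> sameWorkerTasks) {
        List<Integer> candidates = sameWorkerTasks.isEmpty() ? allTasks : sameWorkerTasks;
        return candidates.get(RANDOM.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        // 3 bolt tasks over 2 workers; only task 1 shares a worker with the spout.
        System.out.println(chooseTask(Arrays.asList(1, 2, 3), Arrays.asList(1))); // always 1
        // No local task at all: behaves like shuffleGrouping.
        System.out.println(chooseTask(Arrays.asList(1, 2, 3), Collections.emptyList())); // 1, 2 or 3
    }
}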


Reference code: StormTopologyLocalOrShufferGrouping.java

 package yehua.storm;

 import java.util.Map;

 import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
* LocalOrShuffleGrouping
* @author yehua
*
*/

public class StormTopologyLocalOrShufferGrouping {

public static class MySpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
//@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
}

int num = 0;
//@Override
public void nextTuple() {
num++;
System.out.println("spout:"+num);
this.collector.emit(new Values(num));
Utils.sleep(1000);
}

//@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}

}

public static class MyBolt extends BaseRichBolt{

private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
//@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}

//@Override
public void execute(Tuple input) {
Integer num = input.getIntegerByField("num");
System.err.println("thread:"+Thread.currentThread().getId()+",num="+num);
}

//@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}

public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
String spout_id = MySpout.class.getSimpleName();
String bolt_id = MyBolt.class.getSimpleName();

topologyBuilder.setSpout(spout_id, new MySpout());
topologyBuilder.setBolt(bolt_id, new MyBolt(),3).localOrShuffleGrouping(spout_id);

Config config = new Config();
config.setNumWorkers(2);
String topology_name = StormTopologyLocalOrShufferGrouping.class.getSimpleName();
if(args.length==0){
// run locally
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
}else{
// run on the cluster
try {
StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}

}

}