Topology design in Storm

Date: 2025-01-21 10:02:56

I: Introducing the Storm design model

1.Topology

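  A Topology is Storm's abstraction of a job: it breaks a real-time data analysis task down into stages.

  Nodes: compute components (Spout, Bolt)

  Edges: data flow. Data moves from one component to the next, so every edge is directed.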
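  To make the node-and-edge picture concrete, here is a minimal plain-Java sketch that models a topology as a directed graph. It does not use Storm's API. The class TopologyGraphSketch and its methods are hypothetical names made up for illustration; the component names are the ones used in the wordcount example later in this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a topology as a directed graph.
// This is not Storm's API; it only illustrates "nodes = components, edges = data flow".
final class TopologyGraphSketch {
    // Maps each component name to the components it sends tuples to.
    private final Map<String, List<String>> downstream = new LinkedHashMap<String, List<String>>();

    // Registers a node if it is not known yet.
    void addComponent(String name) {
        if (!downstream.containsKey(name)) {
            downstream.put(name, new ArrayList<String>());
        }
    }

    // Adds a directed edge: tuples flow from 'from' to 'to'.
    void connect(String from, String to) {
        addComponent(from);
        addComponent(to);
        downstream.get(from).add(to);
    }

    // Returns a copy of the components that receive data from 'name'.
    List<String> downstreamOf(String name) {
        List<String> targets = downstream.get(name);
        return targets == null ? new ArrayList<String>() : new ArrayList<String>(targets);
    }

    // Builds the wordcount pipeline: spout -> split -> count -> print.
    static TopologyGraphSketch wordCount() {
        TopologyGraphSketch graph = new TopologyGraphSketch();
        graph.connect("sentenceSpout", "splitBolt");
        graph.connect("splitBolt", "countBolt");
        graph.connect("countBolt", "printBolt");
        return graph;
    }

    public static void main(String[] args) {
        TopologyGraphSketch graph = wordCount();
        for (Map.Entry<String, List<String>> entry : graph.downstream.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

  In real Storm code the same wiring is expressed through TopologyBuilder, where each grouping call adds one edge.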

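2.Tuple

  Storm wraps every record in a tuple.

  A tuple is essentially an ordered sequence of key-value pairs (field name and value).

  This makes it easy for components to read the data.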
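  The idea of "ordered key-value pairs" can be sketched in plain Java as follows. This is a simplified model under my own assumptions, not Storm's Tuple class: TupleSketch, getValue and getValueByField are hypothetical names chosen to mirror how a component looks a value up by position or by field name.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of a tuple: an ordered list of field names
// paired with an ordered list of values. Storm's real Tuple class is richer.
final class TupleSketch {
    private final List<String> fields;
    private final List<Object> values;

    TupleSketch(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("fields and values must have the same length");
        }
        this.fields = fields;
        this.values = values;
    }

    // Looks a value up by its position in the tuple.
    Object getValue(int index) {
        return values.get(index);
    }

    // Looks a value up by field name, the way a downstream component would.
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        TupleSketch tuple = new TupleSketch(
                Arrays.asList("word", "count"),
                Arrays.<Object>asList("storm", 3));
        System.out.println(tuple.getValueByField("word"));
        System.out.println(tuple.getValue(1));
    }
}
```

  The field names play the role of the keys declared in declareOutputFields, and the values are what emit sends.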

3.Spout

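  Data collector

  How does a topology receive and process an endless stream of log records?

  A Spout reads data from the data source, does some light processing, wraps each record in a tuple, and emits it to the downstream Bolts.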

4.Bolt

  Data processor

  

II: Developing the wordcount example

1.Drawing the node-and-edge diagram of the overall design

  sentenceSpout -> splitBolt -> countBolt -> printBolt

2.Program structure

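3.Modifying the pom file

  Note: on the cluster the Storm jars are already present, so they must not be bundled again. Enable the provided scope on storm-core when packaging.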

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cj.it</groupId>
    <artifactId>storm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hbase.version>0.98.6-cdh5.3.6</hbase.version>
        <hdfs.version>2.5.0-cdh5.3.6</hdfs.version>
        <storm.version>0.9.6</storm.version>
    </properties>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!-- Keep the next line commented out when running in IDEA; uncomment it when packaging -->
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>${storm.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hdfs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>cz.mallat.uasparser</groupId>
            <artifactId>uasparser</artifactId>
            <version>0.6.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/src.xml</descriptor>
                    </descriptors>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4.src.xml

<?xml version="1.0" encoding="UTF-8"?>
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id>jar-with-dependencies</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>false</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
    <fileSets>
        <fileSet>
            <directory>/lib</directory>
        </fileSet>
    </fileSets>
</assembly>

5.log

log4j.rootLogger=info,console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.SimpleLayout

log4j.logger.com.ibeifeng=INFO

6.SentenceSpout.java

package com.jun.it;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Random;

public class SentenceSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(SentenceSpout.class);

    // Sample data standing in for a real data source
    private static final String[] SENTENCES = {
            "hadoop oozie storm hive",
            "hadoop spark sqoop hbase",
            "error flume yarn mapreduce"
    };

    // Reused across calls instead of creating a new Random per tuple
    private final Random random = new Random();
    private SpoutOutputCollector collector;

    public SentenceSpout() {
        super();
    }

    // Initialize the collector
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    // Declare the key (field name) of the emitted tuples
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    // Build and emit one tuple per call; records containing "error" are logged and dropped
    @Override
    public void nextTuple() {
        String sentence = SENTENCES[random.nextInt(SENTENCES.length)];
        if (sentence.contains("error")) {
            logger.error("Bad record: " + sentence);
        } else {
            this.collector.emit(new Values(sentence));
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag so the worker can shut down cleanly
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
        super.activate();
    }

    @Override
    public void deactivate() {
        super.deactivate();
    }

    @Override
    public void ack(Object msgId) {
        super.ack(msgId);
    }

    @Override
    public void fail(Object msgId) {
        super.fail(msgId);
    }
}

7.SplitBolt.java

package com.jun.it;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitBolt implements IRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    // Split each sentence on spaces and emit one tuple per word
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        if (sentence != null && !"".equals(sentence)) {
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

8.CountBolt.java

package com.jun.it;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt implements IRichBolt {
    private Map<String, Integer> counts;
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        counts = new HashMap<>();
    }

    // Keep a running count per word and emit the updated (word, count) pair
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = 1;
        if (counts.containsKey(word)) {
            count = counts.get(word) + 1;
        }
        counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
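
  To see what the split and count steps compute without starting Storm, the same logic can be tried in plain Java. This is only a sketch: WordCountSketch is a hypothetical helper class, not part of the project, and it reuses two of the sample sentences from SentenceSpout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of what SplitBolt and CountBolt compute, without Storm.
final class WordCountSketch {
    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    // Mirrors SplitBolt: one output per space-separated word.
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<String>();
        if (sentence != null && !sentence.isEmpty()) {
            for (String word : sentence.split(" ")) {
                words.add(word);
            }
        }
        return words;
    }

    // Mirrors CountBolt: returns the running count after seeing 'word'.
    int count(String word) {
        Integer previous = counts.get(word);
        int current = previous == null ? 1 : previous + 1;
        counts.put(word, current);
        return current;
    }

    public static void main(String[] args) {
        WordCountSketch counter = new WordCountSketch();
        String[] sentences = {"hadoop oozie storm hive", "hadoop spark sqoop hbase"};
        for (String sentence : sentences) {
            for (String word : split(sentence)) {
                // Same output format as PrintBolt
                System.out.println("word:" + word + ", count:" + counter.count(word));
            }
        }
    }
}
```

  Because the count is a running total, a word that appears in several sentences is printed once per occurrence with an increasing count, which is also what the topology prints.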

9.PrintBolt.java

package com.jun.it;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import java.util.Map;

public class PrintBolt implements IRichBolt {
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    // Terminal bolt: print every (word, count) pair it receives
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");
        System.out.println("word:" + word + ", count:" + count);
    }

    @Override
    public void cleanup() {
    }

    // Emits nothing, so there are no output fields to declare
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

10.WordCountTopology.java

package com.jun.it;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT = "sentenceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String PRINT_BOLT = "printBolt";

    public static void main(String[] args) {
        // Wire the components: spout -> split -> count -> print
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout(SENTENCE_SPOUT, new SentenceSpout());
        topologyBuilder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(SENTENCE_SPOUT);
        topologyBuilder.setBolt(COUNT_BOLT, new CountBolt()).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
        topologyBuilder.setBolt(PRINT_BOLT, new PrintBolt()).globalGrouping(COUNT_BOLT);

        Config config = new Config();
        if (args == null || args.length == 0) {
            // No arguments: run in an in-process local cluster
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
        } else {
            // Otherwise submit to the cluster, using args[0] as the topology name
            config.setNumWorkers(1);
            try {
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }
}
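
  The fieldsGrouping on "word" is what lets CountBolt keep a correct count: tuples with the same word always go to the same CountBolt task. The sketch below illustrates that idea in plain Java under loud assumptions. FieldsGroupingSketch and chooseTask are hypothetical names, the task count of 4 is made up (the topology above uses the default parallelism), and Storm's real hashing and task assignment are internal and may differ.

```java
// Hypothetical sketch of the idea behind fieldsGrouping: route by a hash of the
// grouping field so that equal words always land on the same task.
// Storm's actual hashing and task assignment are internal and may differ.
final class FieldsGroupingSketch {
    // Picks a task index in [0, numTasks) for the given word.
    static int chooseTask(String word, int numTasks) {
        int remainder = word.hashCode() % numTasks;
        // hashCode() can be negative, so shift negative remainders into range.
        return remainder < 0 ? remainder + numTasks : remainder;
    }

    public static void main(String[] args) {
        int numTasks = 4; // assumed parallelism, for illustration only
        String[] words = {"hadoop", "storm", "hadoop", "hive", "storm"};
        for (String word : words) {
            System.out.println(word + " -> task " + chooseTask(word, numTasks));
        }
    }
}
```

  With shuffleGrouping, by contrast, the same word could reach different tasks, and each task would then hold only a partial count.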

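III: Running locally

1.Prerequisites

  I originally assumed Storm had to be started first, but it turns out that is not necessary.

  When main is called without arguments it submits the topology to an in-process LocalCluster, so it is enough to run main from the IDE.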

2.Result

IV: Running on the cluster

1.Packaging in IDEA

  Use the jar that bundles the dependencies (the jar-with-dependencies one).

2.Uploading to the datas directory

3.Running

   bin/storm jar /opt/datas/storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.jun.it.WordCountTopology wordcount

4.Result in the UI
