监控指定文件夹，读取其中文件的内容（新增的文件会被动态读取），并统计单词的数量。
FileSpout.java,监控文件夹,读取新文件内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
package com.test.stormtest.wordcount;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class FileSpout extends BaseRichSpout {
private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector;
private File target = new File( "F:" + File.separator + "test" );
private Collection<File> cacheFiles = null ;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this .collector = collector;
//启动的时候,将文件夹内的所有文件的内容发射出去
cacheFiles = FileUtils.listFiles(target, null , true );
for (File file : cacheFiles) {
emitFileConent(file);
}
}
public void nextTuple() {
try {
Thread.sleep( 5000 );
} catch (InterruptedException e1) {
e1.printStackTrace();
}
//监控新文件,将新文件的内容发射出去
Collection<File> files = FileUtils.listFiles(target, null , true );
for (File file : files) {
if (!cacheFiles.contains(file)) {
emitFileConent(file);
}
}
cacheFiles = files;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare( new Fields( "line" ));
}
//将文件内容按行发射出去
private void emitFileConent(File file) {
try {
List<String> lines = FileUtils.readLines(file);
for (String line : lines) {
this .collector.emit( new Values(line));
}
} catch (IOException e) {
e.printStackTrace();
}
}
} |
SplitBolt.java,将行拆分成单词
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
package com.test.stormtest.wordcount;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class SplitBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector = null ;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this .collector = collector;
}
public void execute(Tuple input) {
String line = input.getStringByField( "line" );
String[] words = line.split( " " );
for (String word : words) {
this .collector.emit( new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare( new Fields( "word" ));
}
} |
SumBolt.java 统计单词数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package com.test.stormtest.wordcount;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
public class SumBolt extends BaseRichBolt{
private static final long serialVersionUID = 1L;
private Map<String, Long> countMap = null ;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
countMap = new HashMap<String, Long>();
}
public void execute(Tuple input) {
String word = input.getStringByField( "word" );
Long count = countMap.get(word);
if (count == null ) {
count = 0L;
}
countMap.put(word, ++count);
System.out.println( "-----------------------------------------------" );
Set<Entry<String, Long>> entries = countMap.entrySet();
for (Entry<String, Long> entry : entries) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
} |
WordCountTopology.java：驱动类，以本地模式提交 topology
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package com.test.stormtest.wordcount;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class WordCountTopology {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout( "filespout" , new FileSpout());
builder.setBolt( "splitbolt" , new SplitBolt()).shuffleGrouping( "filespout" );
builder.setBolt( "sumtblot" , new SumBolt()).fieldsGrouping( "splitbolt" , new Fields( "word" ));
LocalCluster cluster = new LocalCluster();
Config config = new Config();
config.setDebug( true );
cluster.submitTopology( "wordcount" , config, builder.createTopology());
Utils.sleep( 20000 );
cluster.killTopology( "wordcount" );
cluster.shutdown();
}
} |