Problem: build a Twitter analytics tool that reads tweets from one Redis database, processes them through several bolts, and stores the results as lists in another Redis database. The results are: a list of all topics and how often they appear in the tweets, a list of all users and how often they appear in the tweets, and a list of users with the topics they mention and the corresponding frequencies.
1. Problem Explanation:
The sample tweets are:
@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.
@Sunny @chris #Storm# is simple, can be used with any programming#language#, and is a lot of fun to use.
@John @bob #Storm# has many use cases: #realtime# analytics, online machine learning, continuous #computation#, distributed RPC, ETL, and more.
Storm real-time results:
User frequency: @John: 2, @alex: 1, @Sunny: 1, @chris: 1, @bob: 1
Topic tag frequency: #Storm#: 3, #realtime#: 2, and so on
User + topic tag frequency: @John#Storm#: 2, @John#realtime#: 2, @alex#realtime#: 1, and so on. This should make the task clear.
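Before diving into the topology, here is a minimal plain-Java sketch (my own illustration, not part of the project code; the class name FrequencySketch is hypothetical) that reproduces the three counts on the sample tweets above, using the same token rules the bolts below apply: split on spaces, a token starting with @ is a user, and a token enclosed in #...# is a topic tag.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FrequencySketch {

    public static void main(String[] args) {
        List<String> tweets = new ArrayList<>();
        tweets.add("@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.");
        tweets.add("@Sunny @chris #Storm# is simple, can be used with any programming#language#, and is a lot of fun to use.");
        tweets.add("@John @bob #Storm# has many use cases: #realtime# analytics, online machine learning, continuous #computation#, distributed RPC, ETL, and more.");

        Map<String, Integer> userFreq = new HashMap<>();
        Map<String, Integer> tagFreq = new HashMap<>();
        Map<String, Integer> userTagFreq = new HashMap<>();

        for (String tweet : tweets) {
            // Collect distinct users and tags per tweet, like the splitter bolts do.
            List<String> users = new ArrayList<>();
            List<String> tags = new ArrayList<>();
            for (String token : tweet.split(" ")) {
                if (token.startsWith("@") && !users.contains(token)) {
                    users.add(token);
                } else if (token.startsWith("#") && token.endsWith("#") && !tags.contains(token)) {
                    tags.add(token);
                }
            }
            for (String user : users) {
                userFreq.merge(user, 1, Integer::sum);
                for (String tag : tags) {
                    userTagFreq.merge(user + tag, 1, Integer::sum);
                }
            }
            for (String tag : tags) {
                tagFreq.merge(tag, 1, Integer::sum);
            }
        }

        System.out.println(userFreq);    // e.g. {@John=2, @alex=1, @Sunny=1, @chris=1, @bob=1} (order may vary)
        System.out.println(tagFreq);     // e.g. {#Storm#=3, #realtime#=2, #computation#=1}
        System.out.println(userTagFreq); // includes @John#Storm#=2, @John#realtime#=2, @alex#realtime#=1
    }
}

Note that this simple rule ignores tags glued to other words or followed by punctuation (e.g. "programming#language#," and "#computation#,"), which is exactly how the bolts below behave.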
2. Topology Design
a. TweetsTransactionalSpout: reads the tweet data and tweet ids from Redis in batches, using a pagination scheme (TransactionMetadata: from & quantity), and emits each batch through the Emitter.
b. UserSplitterBolt: receives each tweet of a batch from TweetsTransactionalSpout via shuffle grouping, tokenizes the text, extracts the users (words starting with @), and sends each @user together with the tweet id to the next bolt.
c. TopicTagSplitterBolt: receives each tweet of a batch from TweetsTransactionalSpout via shuffle grouping, tokenizes the text, extracts the topic tags (#Topic#), and sends each #Tag# together with the tweet id to the next bolt.
d. UserTopicTagJoinBolt: receives the outputs of UserSplitterBolt and TopicTagSplitterBolt, joined via fields grouping on tweet-id, and computes the frequency of each @user #TopicTag# pair.
e. TweetRedisCommiterBolt: receives the outputs of UserSplitterBolt, TopicTagSplitterBolt and UserTopicTagJoinBolt via global grouping (to aggregate the frequency data) and, based on the source component name, accumulates the user frequencies, the topic-tag frequencies and the user + topic-tag frequencies.
3. Code
TweetsTransactionalTopology
package com.john.learn.storm.transaction.tweets.analytics.redis;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.transactional.TransactionalTopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.TopicTagSplitterBolt;
import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.TweetRedisCommiterBolt;
import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.UserSplitterBolt;
import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.UserTopicTagJoinBolt;
import com.john.learn.storm.transaction.tweets.analytics.redis.spout.TweetsTransactionalSpout;

public class TweetsTransactionalTopology {

    public static void main(String[] args) {

        TransactionalTopologyBuilder topologyBuilder = new TransactionalTopologyBuilder(
                "TweetsTransactionalTopology", "TweetsTransactionalSpout", new TweetsTransactionalSpout());

        topologyBuilder.setBolt("Users-Splitter", new UserSplitterBolt(), 4)
                .shuffleGrouping("TweetsTransactionalSpout");

        topologyBuilder.setBolt("TopicTags-Splitter", new TopicTagSplitterBolt(), 4)
                .shuffleGrouping("TweetsTransactionalSpout");

        topologyBuilder.setBolt("User-TopicTag-Join", new UserTopicTagJoinBolt(), 8)
                .fieldsGrouping("Users-Splitter", "Users", new Fields("tweetId"))
                .fieldsGrouping("TopicTags-Splitter", "TopicTags", new Fields("tweetId"));

        topologyBuilder.setBolt("Tweet-RedisCommiter", new TweetRedisCommiterBolt())
                .globalGrouping("User-TopicTag-Join")
                .globalGrouping("Users-Splitter", "Users")
                .globalGrouping("TopicTags-Splitter", "TopicTags");

        Config conf = new Config();

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TiwtterReachTopology", conf, topologyBuilder.buildTopology());

        // Utils.sleep(5000);
        // cluster.killTopology("TiwtterReachTopology");
        // cluster.shutdown();
    }
}
TweetsRedis: a helper class that wraps the Redis operations
package com.john.learn.storm.transaction.tweets.analytics.redis;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;

import com.john.learn.storm.transaction.tweets.analytics.redis.spout.TransactionMetadata;
import com.john.learn.storm.transaction.tweets.analytics.redis.spout.TweetsTransactionalSpoutCoordinator;

import redis.clients.jedis.Jedis;

public class TweetsRedis {

    public static final String NEXT_READ_POSTION = "Tweets.Redis.NEXT_READ_POSTION";
    public static final String NEXT_WRITE_POSTION = "Tweets.Redis.NEXT_WRITE_POSTION";

    private Jedis jedis;

    public TweetsRedis(String hostName, int port, String password) {
        jedis = new Jedis(hostName, port);
        if (!StringUtils.isEmpty(password)) {
            jedis.auth(password);
        }
    }

    public TweetsRedis(String hostName, int port) {
        this(hostName, port, null);
    }

    public List<String> getMessages(long from, int quantity) {
        String[] keys = new String[quantity];
        for (int i = 0; i < quantity; i++) {
            keys[i] = MESSAGE_ID_KEY + "." + (from + i);
        }
        System.out.println("keys:" + Arrays.toString(keys));
        return jedis.mget(keys);
    }

    public String get(String key) {
        return this.jedis.get(key);
    }

    public Jedis getJedis() {
        return this.jedis;
    }

    public void close() {
        try {
            jedis.disconnect();
        } catch (Exception e) {
        }
    }

    public void clear() {
        int quantity = 1000;
        String[] keys = new String[quantity];
        for (int i = 0; i < quantity; i++) {
            keys[i] = MESSAGE_ID_KEY + "." + i;
        }
        jedis.del(keys);
        jedis.del(NEXT_WRITE_POSTION);
        jedis.del(NEXT_READ_POSTION);
    }

    public void addMessage(String message) {
        long index = jedis.incr(NEXT_WRITE_POSTION);
        jedis.set(MESSAGE_ID_KEY + "." + index, message);
    }

    public long getNextWrite() {
        String position = jedis.get(NEXT_WRITE_POSTION);
        if (position == null) {
            return 1;
        }
        return Long.valueOf(position) + 1;
    }

    public long getNextRead() {
        String position = jedis.get(NEXT_READ_POSTION);
        if (position == null) {
            return 1;
        }
        return Long.valueOf(position);
    }

    public void setNextRead(long position) {
        jedis.set(NEXT_READ_POSTION, String.valueOf(position));
    }

    public long getAvailableToRead(long current) {
        long items = getNextWrite() - current;
        return items > 0 ? items : 0;
    }

    /**
     * Simulates continuously sending tweets.
     */
    // public static void main2(String[] args) throws InterruptedException {
    //     TweetsRedis tweetsRedis = new TweetsRedis("127.0.0.1", 6379, "6379Auth");
    //     int maxCount = 10;
    //     int count = 0;
    //     while (count < maxCount) {
    //         for (int i = 0; i < 1; i++) {
    //             long tx = System.currentTimeMillis();
    //             tweetsRedis.addMessage(
    //                     "@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.");
    //         }
    //         Thread.sleep(1000);
    //         count++;
    //     }
    //     for (String key : tweetsRedis.jedis.keys("Tweets.Redis.*.Frequency")) {
    //         System.out.println("-------------" + key + "-------------");
    //         System.out.println(tweetsRedis.jedis.hgetAll(key));
    //     }
    // }

    public static void main(String[] args) {
        TweetsRedis tweetsRedis = new TweetsRedis("127.0.0.1", 6379, "6379Auth");

        // Clear the existing data so we can start over.
        tweetsRedis.clear();

        tweetsRedis.addMessage("Hi @Tom @Simith 1. I want to #USA# and #Hometown# city.");
        tweetsRedis.addMessage("Hi @John @david @vivian 2. I want to #China# and #BeiJing# city.");
        tweetsRedis.addMessage(
                "@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.");
        tweetsRedis.addMessage("Hi @david @vivian 4. I want to #China# and #BeiJing# city.");
        tweetsRedis.addMessage(
                "@Lily @Toe 5. #Storm# has many use cases: #realtime# analytics, online machine learning, continuous #computation#, distributed RPC, ETL, and more.");
        tweetsRedis.addMessage(
                "@Tim #Storm# integrates with the queueing and #database# technologies you already use.");
        tweetsRedis.addMessage(
                "@Sunny @chris #Storm# is simple, can be used with any programming#language#, and is a lot of fun to use.");
        tweetsRedis.addMessage(
                "@nathan #Storm# is simple, can be used with any programming #language#, and is a lot of fun to use.");
        tweetsRedis.addMessage(
                "@Lily #Storm# is simple, can be used with any programming #language#, and is a lot of fun to use.");
        tweetsRedis.addMessage(
                "@bob #Storm# has many use cases: #realtime# analytics, online machine learning, continuous #computation#, distributed RPC, ETL, and more.");
        tweetsRedis.addMessage(
                "@carissa Apache #Storm# is a free and open source distributed #realtime# #computation# system.");

        System.out.println("------------- Test TweetsTransactionalSpoutCoordinator Start-------------");

        TweetsTransactionalSpoutCoordinator transactionalSpoutCoordinator = new TweetsTransactionalSpoutCoordinator();
        TransactionMetadata transactionMetadata = null;
        while (transactionalSpoutCoordinator.isReady()) {
            transactionMetadata = transactionalSpoutCoordinator.initializeTransaction(BigInteger.valueOf(1),
                    transactionMetadata);
            System.out.println("SpoutCoordinator Initialize Transaction Meta: " + transactionMetadata);
        }

        System.out.println("------------- Test TweetsTransactionalSpoutCoordinator End-------------");
    }

    private static final String MESSAGE_ID_KEY = "Tweets.Message";
}
Note: running the main method initializes the Redis data; the main2 method simulates continuously sending tweet messages.
Next, we implement the spout of the transactional topology.
TransactionMetadata
package com.john.learn.storm.transaction.tweets.analytics.redis.spout;

import java.io.Serializable;

public class TransactionMetadata implements Serializable {

    private static final long serialVersionUID = 1L;

    public long from;
    public int quantity;

    public TransactionMetadata(long from, int quantity) {
        this.from = from;
        this.quantity = quantity;
    }

    public TransactionMetadata() {
    }

    @Override
    public String toString() {
        return "from:" + from + " To:" + (from + quantity - 1) + " quantity:" + quantity;
    }
}
TweetsTransactionalSpoutCoordinator
package com.john.learn.storm.transaction.tweets.analytics.redis.spout;

import java.math.BigInteger;

import org.apache.storm.transactional.ITransactionalSpout.Coordinator;

import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsRedis;

public class TweetsTransactionalSpoutCoordinator implements Coordinator<TransactionMetadata> {

    public TweetsTransactionalSpoutCoordinator() {
        this.tweetsRedis = new TweetsRedis("127.0.0.1", 6379, "6379Auth");
        nextRead = tweetsRedis.getNextRead();
        System.out.println("nextRead:" + nextRead);
    }

    @Override
    public void close() {
        tweetsRedis.close();
    }

    @Override
    public TransactionMetadata initializeTransaction(BigInteger txId, TransactionMetadata transactionMetadata) {
        // Determine the size of this batch; the final batch may hold fewer
        // tweets than TRANSACTION_MAX_SIZE.
        int quantity = Math.min(TRANSACTION_MAX_SIZE, (int) tweetsRedis.getAvailableToRead(nextRead));
        transactionMetadata = new TransactionMetadata(nextRead, quantity);
        nextRead += quantity;
        return transactionMetadata;
    }

    @Override
    public boolean isReady() {
        // Check whether there is still unread data. This matters: if it always
        // returned true, the spout would keep emitting empty batches forever.
        return tweetsRedis.getAvailableToRead(nextRead) > 0;
    }

    private transient TweetsRedis tweetsRedis;
    private long nextRead = 0;

    private static final int TRANSACTION_MAX_SIZE = 1000;
}
// Determine the size of this batch; the final batch may hold fewer tweets than TRANSACTION_MAX_SIZE.
int quantity = Math.min(TRANSACTION_MAX_SIZE, (int) tweetsRedis.getAvailableToRead(nextRead));
It is worth mentioning that there is only one coordinator instance in the whole topology. When it is created, it reads from Redis a sequence number, starting at 1, that identifies the next tweet to read.
The first method is isReady. It is called before initializeTransaction to confirm that the data source is ready to be read, and it must return true or false accordingly. In this example, the number of tweets written is compared with the number already read; the difference is the number of tweets available. If it is greater than 0, there are still tweets to consume.
Finally, initializeTransaction runs. As you can see, it receives txid and prevMetadata as parameters. The first is a transaction ID generated by Storm that uniquely identifies the batch; prevMetadata is the metadata object the coordinator produced for the previous transaction.
In this example, we first find out how many tweets can be read. Once that is known, we create a TransactionMetadata object that records the position of the first tweet to read (the from property) and the number of tweets to read (the quantity property).
Once the metadata object is returned, Storm stores it together with the txid in Zookeeper. This guarantees that if a failure occurs, Storm can have the emitter (see below) resend the batch.
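To make the pagination concrete, here is a small self-contained sketch (illustrative only, not project code; it assumes a batch size of 3 instead of the project's TRANSACTION_MAX_SIZE of 1000, and 8 pending tweets) of how repeated isReady/initializeTransaction calls page through the source:

import java.util.ArrayList;
import java.util.List;

public class CoordinatorPagingSketch {

    static final int MAX_BATCH = 3; // assumed small batch size, for illustration

    public static void main(String[] args) {
        long written = 8;   // pretend 8 tweets have been written
        long nextRead = 1;  // positions are 1-based, as in TweetsRedis
        List<String> batches = new ArrayList<>();
        // isReady(): there is more to read while written - (nextRead - 1) > 0
        while (written - (nextRead - 1) > 0) {
            // initializeTransaction(): clamp the batch to what is available
            int quantity = (int) Math.min(MAX_BATCH, written - (nextRead - 1));
            batches.add("from=" + nextRead + " quantity=" + quantity);
            nextRead += quantity;
        }
        System.out.println(batches);
        // [from=1 quantity=3, from=4 quantity=3, from=7 quantity=2]
    }
}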
TweetsTransactionalSpoutEmitter
package com.john.learn.storm.transaction.tweets.analytics.redis.spout;

import java.math.BigInteger;

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.transactional.ITransactionalSpout.Emitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Values;

import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsRedis;

public class TweetsTransactionalSpoutEmitter implements Emitter<TransactionMetadata> {

    public TweetsTransactionalSpoutEmitter() {
        this.tweetsRedis = new TweetsRedis("127.0.0.1", 6379, "6379Auth");
    }

    @Override
    public void cleanupBefore(BigInteger txId) {
    }

    @Override
    public void close() {
        tweetsRedis.close();
    }

    @Override
    public void emitBatch(TransactionAttempt transactionAttempt, TransactionMetadata transactionMetadata,
            BatchOutputCollector collector) {
        /**
         * The emitter reads from the data source and emits the batch into the
         * stream. It must be able to emit the same batch for the same
         * transaction id and metadata, so that if a failure occurs while the
         * batch is being processed, Storm can replay it through the emitter.
         * Storm increments the attempt count (attempt id) in the
         * TransactionAttempt object, which is how you can tell that a batch
         * has been replayed.
         */
        long tweetId = transactionMetadata.from;
        for (String tweet : tweetsRedis.getMessages(transactionMetadata.from, transactionMetadata.quantity)) {
            if (StringUtils.isEmpty(tweet)) {
                continue;
            }
            collector.emit(new Values(transactionAttempt, String.valueOf(tweetId), tweet));
            tweetId++;
        }
        // Advance the read position past this batch.
        tweetsRedis.setNextRead(transactionMetadata.from + transactionMetadata.quantity);
    }

    private transient TweetsRedis tweetsRedis;
}
The emitter reads data from the source and emits it into the stream. It must always be able to send the same batch for the same transaction id and transaction metadata: that way, if a failure occurs while a batch is being processed, Storm can repeat the transaction through the emitter with the same id and metadata and have the batch replayed. Storm increments the attempt count (the attempt id) in the TransactionAttempt object, which is how you can tell that a batch has been replayed.
Here emitBatch is the important method. It uses the metadata passed in to fetch the tweets from Redis and emits them into the topology, and it also advances the next-read position that Redis maintains.
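Because messages are stored under position-based keys (Tweets.Message.<n>) and fetched with MGET, a given (from, quantity) pair always resolves to the same keys, so a replayed batch re-reads exactly the same tweets. A tiny sketch of that key computation (illustrative only; the helper class is hypothetical):

import java.util.Arrays;

public class BatchKeysSketch {

    // Mirrors the key scheme used by TweetsRedis.getMessages: purely
    // position-based, so the same metadata always maps to the same Redis keys.
    static String[] batchKeys(long from, int quantity) {
        String[] keys = new String[quantity];
        for (int i = 0; i < quantity; i++) {
            keys[i] = "Tweets.Message." + (from + i);
        }
        return keys;
    }

    public static void main(String[] args) {
        // Replaying metadata (from=4, quantity=3) yields the same keys every time:
        System.out.println(Arrays.toString(batchKeys(4, 3)));
        // [Tweets.Message.4, Tweets.Message.5, Tweets.Message.6]
    }
}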
TweetsTransactionalSpout
package com.john.learn.storm.transaction.tweets.analytics.redis.spout;

import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalSpout;
import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.tuple.Fields;

public class TweetsTransactionalSpout extends BaseTransactionalSpout<TransactionMetadata> {

    public TweetsTransactionalSpout() {
    }

    @Override
    public ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(Map config, TopologyContext context) {
        return new TweetsTransactionalSpoutCoordinator();
    }

    @Override
    public ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(Map config, TopologyContext context) {
        return new TweetsTransactionalSpoutEmitter();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {
        // Must match what the Emitter sends:
        // collector.emit(new Values(transactionAttempt, String.valueOf(tweetId), tweet));
        fieldsDeclarer.declare(new Fields("txid", "tweetId", "tweet"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Note: getCoordinator tells Storm which class coordinates the generation of batches, and getEmitter which class reads the batches and emits them into the topology's streams. Finally, as before, we declare the fields to emit.
Bolts
UserSplitterBolt: receives the tuples, parses the tweet text, and emits the words that start with @ (the Twitter users).
package com.john.learn.storm.transaction.tweets.analytics.redis.bolt;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class UserSplitterBolt implements IBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {
        fieldsDeclarer.declareStream("Users", new Fields("txid", "tweetId", "user"));
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        String tweetId = tuple.getString(1);
        String tweet = tuple.getString(2);

        Set<String> users = new HashSet<>();
        for (String user : StringUtils.splitByWholeSeparator(tweet, " ")) {
            // Make sure this is a real user mention that has not already been
            // seen in this tweet.
            if (user.startsWith("@") && !users.contains(user)) {
                users.add(user);
                collector.emit("Users", new Values(transactionAttempt, tweetId, user));
                System.out.println("Tx:" + tuple.getValue(0) + " [" + tuple.getString(1) + "]:" + user);
            }
        }
    }

    @Override
    public void prepare(Map config, TopologyContext context) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void cleanup() {
    }

    private static final long serialVersionUID = 1L;
}
TopicTagSplitterBolt: receives the tuples, parses the tweet text, and emits the words that start and end with # (the topic tags).
package com.john.learn.storm.transaction.tweets.analytics.redis.bolt;

import java.util.HashSet;
import java.util.Set;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class TopicTagSplitterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        String tweetId = tuple.getString(1);
        String tweet = tuple.getString(2);

        Set<String> topicTags = new HashSet<>();
        for (String topicTag : StringUtils.splitByWholeSeparator(tweet, " ")) {
            // Make sure this is a real topic tag (#...#) that has not already
            // been seen in this tweet.
            if (topicTag.startsWith("#") && topicTag.endsWith("#") && !topicTags.contains(topicTag)) {
                topicTags.add(topicTag);
                collector.emit("TopicTags", new Values(transactionAttempt, tweetId, topicTag));
                System.out.println("Tx:" + tuple.getValue(0) + " [" + tuple.getString(1) + "]:" + topicTag);
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {
        fieldsDeclarer.declareStream("TopicTags", new Fields("txid", "tweetId", "topicTag"));
    }

    private static final long serialVersionUID = 1L;
}
UserTopicTagJoinBolt
package com.john.learn.storm.transaction.tweets.analytics.redis.bolt;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBatchBolt;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Receives tuples fields-grouped by tweetId.
public class UserTopicTagJoinBolt extends BaseBatchBolt<TransactionAttempt> {

    @Override
    public void prepare(Map config, TopologyContext context, BatchOutputCollector collector,
            TransactionAttempt transactionAttempt) {
        System.out.println(
                "prepare() Task Id:" + context.getThisTaskId() + " transactionAttempt:" + transactionAttempt);
        this.collector = collector;
        // Reset the state for every batch.
        userTweets = new HashMap<>();
        topicTagTweets = new HashMap<>();
        this.transactionAttempt = transactionAttempt;
    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println("userTweets size " + userTweets.size() + " Task Id:" + tuple.getSourceTask());
        TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        String tweetId = tuple.getString(1);
        String source = tuple.getSourceStreamId();

        if (source.equalsIgnoreCase("Users")) {
            String user = tuple.getString(2);
            // Record every tweet id (message id) per user.
            add(userTweets, user, tweetId);
            return;
        }

        if (source.equalsIgnoreCase("TopicTags")) {
            String topicTag = tuple.getString(2);
            // Record every topic tag per tweet id.
            add(topicTagTweets, tweetId, topicTag);
            return;
        }

        System.err.println("sorry, source not found " + source);
    }

    @Override
    public void finishBatch() {
        System.out.println("finishBatch call().............");
        // Produce the per-user/topic counts now that the batch is complete.
        for (String user : userTweets.keySet()) {
            // All tweet ids for this user.
            Set<String> tweetIds = getTweetIdsByUser(user);
            countOccurFrequencyPerUserAndTopicTag(user, tweetIds);
        }
    }

    /**
     * Counts how often each user/topic-tag pair occurs.
     */
    private void countOccurFrequencyPerUserAndTopicTag(String user, Set<String> tweetIds) {
        Map<String, Integer> topicTagCounter = new HashMap<String, Integer>();
        for (String tweetId : tweetIds) {
            Set<String> topicTags = getTweetTopicTagsById(tweetId);
            if (topicTags == null) {
                continue;
            }
            for (String topicTag : topicTags) {
                Integer topicTagCount = topicTagCounter.get(topicTag);
                if (topicTagCount == null) {
                    topicTagCount = 0;
                }
                topicTagCount++;
                topicTagCounter.put(topicTag, topicTagCount);
            }
        }
        for (String topicTag : topicTagCounter.keySet()) {
            // Emit the count for each user/topic pair.
            System.out.println("@" + user + "#" + topicTag + "# Counts:" + topicTagCounter.get(topicTag));
            this.collector.emit(new Values(transactionAttempt, user, topicTag, topicTagCounter.get(topicTag)));
        }
    }

    private Set<String> getTweetTopicTagsById(String tweetId) {
        return this.topicTagTweets.get(tweetId);
    }

    private Set<String> getTweetIdsByUser(String user) {
        return this.userTweets.get(user);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {
        fieldsDeclarer.declare(new Fields("txid", "user", "topicTag", "count"));
    }

    private void add(Map<String, Set<String>> bag, String key, String value) {
        Set<String> slot = bag.get(key);
        if (slot == null) {
            slot = new HashSet<>();
            bag.put(key, slot);
        }
        slot.add(value);
    }

    private Map<String, Set<String>> userTweets;
    private Map<String, Set<String>> topicTagTweets;
    private BatchOutputCollector collector;
    private TransactionAttempt transactionAttempt;

    private static final long serialVersionUID = 1L;
}
A note on the UserTopicTagJoinBolt implementation: the first thing to notice is that it is a BaseBatchBolt. Its execute method processes the received tuples but does not emit anything new; when the batch is complete, Storm calls finishBatch.
@Override
public void execute(Tuple tuple) {
    System.out.println("userTweets size " + userTweets.size() + " Task Id:" + tuple.getSourceTask());
    TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);
    String tweetId = tuple.getString(1);
    String source = tuple.getSourceStreamId();

    if (source.equalsIgnoreCase("Users")) {
        String user = tuple.getString(2);
        // Record every tweet id (message id) per user.
        add(userTweets, user, tweetId);
        return;
    }

    if (source.equalsIgnoreCase("TopicTags")) {
        String topicTag = tuple.getString(2);
        // Record every topic tag per tweet id.
        add(topicTagTweets, tweetId, topicTag);
        return;
    }

    System.err.println("sorry, source not found " + source);
}
Since we need to count, for each user mentioned in a tweet, how often every topic occurs, we have to join the two streams produced by the previous bolts. This is done per batch: when the batch is complete, finishBatch is called.
@Override
public void finishBatch() {
    System.out.println("finishBatch call().............");
    // Produce the per-user/topic counts now that the batch is complete.
    for (String user : userTweets.keySet()) {
        // All tweet ids for this user.
        Set<String> tweetIds = getTweetIdsByUser(user);
        countOccurFrequencyPerUserAndTopicTag(user, tweetIds);
    }
}
This method computes the number of occurrences of each user-topic pair and emits a tuple for each of them.
The committer: TweetRedisCommiterBolt
So far we have seen how batches move through the topology, driven by the coordinator and the emitter. Within the topology, the tuples of these batches are processed in parallel, in no particular order.
Committer bolts are a special kind of batch bolt: they either implement ICommitter or are registered through TransactionalTopologyBuilder's setCommitterBolt method. They differ from other batch bolts in that their finishBatch method runs only when the transaction is ready to commit, which happens after all previous transactions have committed successfully. Moreover, finishBatch is executed sequentially: if transactions with IDs 1 and 2 are in flight at the same time, finishBatch for ID 2 runs only after finishBatch for ID 1 has completed without errors.
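The topology above uses the first mechanism: TweetRedisCommiterBolt implements ICommitter, so a plain setBolt call is enough. For reference, here is a sketch of the alternative wiring through setCommitterBolt (same component names as above; the helper class and method are hypothetical):

import org.apache.storm.transactional.TransactionalTopologyBuilder;

import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.TweetRedisCommiterBolt;

public class CommitterWiringSketch {

    // Equivalent to the setBolt call in TweetsTransactionalTopology, but makes
    // the committer role explicit instead of relying on the ICommitter marker.
    static void wireCommitter(TransactionalTopologyBuilder topologyBuilder) {
        topologyBuilder.setCommitterBolt("Tweet-RedisCommiter", new TweetRedisCommiterBolt())
                .globalGrouping("User-TopicTag-Join")
                .globalGrouping("Users-Splitter", "Users")
                .globalGrouping("TopicTags-Splitter", "TopicTags");
    }
}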
package com.john.learn.storm.transaction.tweets.analytics.redis.bolt;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;

import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsRedis;

import redis.clients.jedis.Transaction;

public class TweetRedisCommiterBolt extends BaseTransactionalBolt implements ICommitter {

    @Override
    public void prepare(Map config, TopologyContext context, BatchOutputCollector collector,
            TransactionAttempt transactionAttempt) {
        this.transactionAttempt = transactionAttempt;
        sourceCounterMapCounters = new HashMap<>();
        // Avoid re-creating the Redis connection for every batch.
        if (tweetsRedis == null) {
            this.tweetsRedis = new TweetsRedis("127.0.0.1", 6379, "6379Auth");
        }
    }

    /**
     * Upstream wiring, for reference:
     *
     * topologyBuilder.setBolt("Users-Splitter", new UserSplitterBolt(), 4)
     *         .shuffleGrouping("TweetsTransactionalSpout");
     *
     * topologyBuilder.setBolt("TopicTags-Splitter", new TopicTagSplitterBolt(), 4)
     *         .shuffleGrouping("TweetsTransactionalSpout");
     *
     * topologyBuilder.setBolt("User-TopicTag-Join", new UserTopicTagJoinBolt(), 8)
     *         .fieldsGrouping("Users-Splitter", "Users", new Fields("tweetId"))
     *         .fieldsGrouping("TopicTags-Splitter", "TopicTags", new Fields("tweetId"));
     */
    @Override
    public void execute(Tuple tuple) {
        String source = tuple.getSourceComponent();

        if (source.equalsIgnoreCase("Users-Splitter")) {
            count("Tweets.Redis.Users.Frequency", tuple.getStringByField("user"), 1);
            return;
        }

        if (source.equalsIgnoreCase("TopicTags-Splitter")) {
            count("Tweets.Redis.TopicTags.Frequency", tuple.getStringByField("topicTag"), 1);
            return;
        }

        if (source.equalsIgnoreCase("User-TopicTag-Join")) {
            count("Tweets.Redis.UserTopicTags.Frequency",
                    tuple.getStringByField("user") + ":" + tuple.getStringByField("topicTag"),
                    tuple.getIntegerByField("count"));
        }
    }

    private void count(String sourceCounterMapKey, String counterTag, Integer count) {
        if (!sourceCounterMapCounters.containsKey(sourceCounterMapKey)) {
            sourceCounterMapCounters.put(sourceCounterMapKey, new HashMap<>());
        }
        Map<String, Integer> counters = sourceCounterMapCounters.get(sourceCounterMapKey);
        Integer prevTotalCount = counters.get(counterTag);
        if (prevTotalCount == null) {
            prevTotalCount = 0;
        }
        counters.put(counterTag, prevTotalCount + count);
    }

    @Override
    public void finishBatch() {
        // Idempotency check: skip the batch if this transaction id has already
        // been committed.
        String lastCommitTransaction = tweetsRedis.get(LAST_COMMITED_TRANSACTION);
        if (String.valueOf(this.transactionAttempt.getTransactionId()).equals(lastCommitTransaction)) {
            return;
        }
        // Write the transaction id and all counter increments atomically.
        Transaction multi = tweetsRedis.getJedis().multi();
        multi.set(LAST_COMMITED_TRANSACTION, String.valueOf(transactionAttempt.getTransactionId()));
        for (String sourceCounterKey : sourceCounterMapCounters.keySet()) {
            Map<String, Integer> sourceTotalMap = sourceCounterMapCounters.get(sourceCounterKey);
            for (String counterTag : sourceTotalMap.keySet()) {
                multi.hincrBy(sourceCounterKey, counterTag, sourceTotalMap.get(counterTag));
            }
        }
        multi.exec();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {
    }

    private Map<String, Map<String, Integer>> sourceCounterMapCounters;
    private TransactionAttempt transactionAttempt;
    private static TweetsRedis tweetsRedis;

    public static final String LAST_COMMITED_TRANSACTION = "Tweets.Redis.LAST_COMMIT";

    private static final long serialVersionUID = 1L;
}
The implementation is straightforward, but finishBatch has one important detail: it opens a Redis transaction so the updates are applied atomically, which prevents double counting when a failed batch is rolled back and resent.
Transaction multi = tweetsRedis.getJedis().multi();

multi.set(LAST_COMMITED_TRANSACTION, String.valueOf(transactionAttempt.getTransactionId()));

for (String sourceCounterKey : sourceCounterMapCounters.keySet()) {
    Map<String, Integer> sourceTotalMap = sourceCounterMapCounters.get(sourceCounterKey);
    for (String counterTag : sourceTotalMap.keySet()) {
        multi.hincrBy(sourceCounterKey, counterTag, sourceTotalMap.get(counterTag));
    }
}

multi.exec();
Tips: here we store in the database the ID of the last committed transaction. Why? Remember that if a transaction fails, Storm replays it as many times as necessary. Unless you can tell whether a transaction has already been applied, you will over-count, and the transactional topology is useless. So: save the last committed transaction ID, and check it before committing.
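Here is a compact restatement of that pattern with Jedis (illustrative only; the class and the demo.* key names are hypothetical). Calling it twice with the same transaction id applies the increments only once:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class IdempotentCommitSketch {

    // Generic form of the pattern used in finishBatch: skip an already-committed
    // transaction id, otherwise write the id and the increments atomically.
    static void commit(Jedis jedis, long txId, String counterKey, String field, long delta) {
        String last = jedis.get("demo.LAST_COMMIT");
        if (String.valueOf(txId).equals(last)) {
            return; // replayed batch: the counters were already applied
        }
        Transaction multi = jedis.multi();
        multi.set("demo.LAST_COMMIT", String.valueOf(txId));
        multi.hincrBy(counterKey, field, delta);
        multi.exec();
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        commit(jedis, 42, "demo.Frequency", "@John", 2);
        commit(jedis, 42, "demo.Frequency", "@John", 2); // no-op: same txId
        System.out.println(jedis.hget("demo.Frequency", "@John")); // 2 on a fresh Redis
        jedis.close();
    }
}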
Results: initialize the data by running the TweetsRedis main method shown above; after the topology has processed the tweets, the frequency hashes in Redis look like this:
-------------Tweets.Redis.UserTopicTags.Frequency-------------
{@John:#China#=2, @John:#realtime#=2, @alex:#computation#=2, @david:#BeiJing#=4, @Simith:#Hometown#=2, @bob:#realtime#=2, @carissa:#realtime#=2, @Toe:#realtime#=2, @Toe:#Storm#=2, @chris:#Storm#=2, @bob:#Storm#=2, @alex:#Storm#=2, @nathan:#Storm#=2, @John:#BeiJing#=2, @Tim:#database#=2, @Lily:#realtime#=2, @Simith:#USA#=2, @Tom:#USA#=2, @vivian:#BeiJing#=4, @vivian:#China#=4, @Tim:#Storm#=2, @John:#computation#=2, @John:#Storm#=2, @alex:#realtime#=2, @carissa:#computation#=2, @Lily:#Storm#=4, @david:#China#=4, @Sunny:#Storm#=2, @carissa:#Storm#=2, @Tom:#Hometown#=2}
-------------Tweets.Redis.Users.Frequency-------------
{@Lily=4, @david=4, @John=4, @Toe=2, @nathan=2, @Tim=2, @Sunny=2, @vivian=4, @bob=2, @Tom=2, @alex=2, @Simith=2, @chris=2, @carissa=2}
-------------Tweets.Redis.TopicTags.Frequency-------------
{#realtime#=8, #Hometown#=2, #computation#=4, #database#=2, #China#=4, #USA#=2, #Storm#=16, #BeiJing#=4}
To simulate continuously sending tweet messages, run the following directly:
public static void main(String[] args) throws InterruptedException {

    TweetsRedis tweetsRedis = new TweetsRedis("127.0.0.1", 6379, "6379Auth");

    int maxCount = 10;
    int count = 0;

    while (count < maxCount) {
        for (int i = 0; i < 10; i++) {
            long tx = System.currentTimeMillis();
            tweetsRedis.addMessage(
                    "@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.");
        }
        Thread.sleep(1000);
        count++;
    }

    for (String key : tweetsRedis.jedis.keys("Tweets.Redis.*.Frequency")) {
        System.out.println("-------------" + key + "-------------");
        System.out.println(tweetsRedis.jedis.hgetAll(key));
    }
}