Storm from Beginner to Master, Lecture 25: Storm Batch Transactions - TweetsTransactionalSpout (Base) [Hands-on Run] "Getting Started With Storm"

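Date: 2021-11-28 20:47:14

Problem: build a Twitter analysis tool that reads tweets from one Redis database, processes them through several bolts, and saves the results to lists in another Redis database. The results are: a list of all topics with the number of times each appears in the tweets, a list of all users with the number of times each appears in the tweets, and a list of users together with the topics they mention and how often.

1. Explanation of the problem:

The test tweets are as follows: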

@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.

@Sunny @chris #Storm# is simple, can be used with any programming#language#, and is a lot of fun to use.

@John @bob #Storm# has many use cases: #realtime# analytics, online machine learning, continuous #computation#, distributed RPC, ETL, and more.

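Real-time results from Storm:

User frequency: @John: 2, @alex: 1, @Sunny: 1, @chris: 1, @bob: 1

Topic tag frequency: #Storm#: 3, #realtime#: 2, and so on

User plus topic tag frequency: @John #Storm#: 2, @John #realtime#: 2, @alex #realtime#: 1, and so on. This should make the intent of the problem clear.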
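To make the expected numbers concrete, here is a minimal single-threaded sketch of the counting rules, without Storm or Redis. The class name TweetFrequencySketch and its fields are hypothetical and not part of the topology; the sketch assumes the same word rules the bolts use later (split on spaces, users start with @, topic tags start and end with #). Note that under these rules a tag followed directly by punctuation, such as "#computation#,", is not counted.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper for illustration only: counts everything in one pass.
class TweetFrequencySketch {

    final Map<String, Integer> users = new TreeMap<>();
    final Map<String, Integer> topicTags = new TreeMap<>();
    final Map<String, Integer> userTopicTags = new TreeMap<>();

    void add(String tweet) {
        // Sets make sure a user or tag is counted once per tweet.
        Set<String> tweetUsers = new HashSet<>();
        Set<String> tweetTags = new HashSet<>();
        for (String word : tweet.split(" ")) {
            if (word.startsWith("@")) {
                tweetUsers.add(word);
            } else if (word.startsWith("#") && word.endsWith("#")) {
                tweetTags.add(word);
            }
        }
        for (String user : tweetUsers) {
            users.merge(user, 1, Integer::sum);
        }
        for (String tag : tweetTags) {
            topicTags.merge(tag, 1, Integer::sum);
        }
        // Every user mentioned in a tweet is paired with every tag of that tweet.
        for (String user : tweetUsers) {
            for (String tag : tweetTags) {
                userTopicTags.merge(user + ":" + tag, 1, Integer::sum);
            }
        }
    }

    public static void main(String[] args) {
        TweetFrequencySketch sketch = new TweetFrequencySketch();
        sketch.add("@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.");
        sketch.add("@Sunny @chris #Storm# is simple, can be used with any programming#language#, and is a lot of fun to use.");
        sketch.add("@John @bob #Storm# has many use cases: #realtime# analytics, online machine learning, continuous #computation#, distributed RPC, ETL, and more.");
        System.out.println(sketch.users);
        System.out.println(sketch.topicTags);
        System.out.println(sketch.userTopicTags);
    }
}
```

The topology below computes the same three maps, but splits the work across bolts and commits the totals batch by batch.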

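2. Topology design

a. TweetsTransactionalSpout: reads tweets and their tweet ids from Redis in several batches, page by page (TransactionMetadata holds from and quantity), and emits each batch through the Emitter.

b. UserSplitterBolt: subscribes to TweetsTransactionalSpout with shuffle grouping, splits each tweet of the batch into words, picks out the users (words starting with @), and sends @user and the tweet id to the next bolt.

c. TopicTagSplitterBolt: subscribes to TweetsTransactionalSpout with shuffle grouping, splits each tweet of the batch into words, picks out the topic tags (#Topic#), and sends #Tag# and the tweet id to the next bolt.

d. UserTopicTagJoinBolt: subscribes to both UserSplitterBolt and TopicTagSplitterBolt with fields grouping on tweetId, and counts how often each @user #TopicTag# pair occurs.

e. TweetRedisCommiterBolt: subscribes to UserSplitterBolt, TopicTagSplitterBolt and UserTopicTagJoinBolt with global grouping (to aggregate the counts) and, based on the source component name, accumulates the user frequency, the topic tag frequency, and the user + topic tag frequency.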
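The join in step d only works if the user tuples and the topic tag tuples of the same tweet end up in the same task. The sketch below illustrates that idea under a simplifying assumption: the target task is chosen by hashing the grouping values modulo the number of tasks. FieldsGroupingSketch and chooseTask are hypothetical names, and this is not Storm's actual implementation.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for fields grouping (assumption: hash of the values modulo task count).
class FieldsGroupingSketch {

    static int chooseTask(List<Object> groupingValues, int numTasks) {
        // floorMod keeps the result in [0, numTasks) even for negative hash codes.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int joinTasks = 8;
        // The same tweetId arriving on the "Users" and "TopicTags" streams...
        int fromUsers = chooseTask(Arrays.<Object>asList("3"), joinTasks);
        int fromTopicTags = chooseTask(Arrays.<Object>asList("3"), joinTasks);
        // ...is routed to the same join task.
        System.out.println(fromUsers == fromTopicTags); // true
    }
}
```

Because the choice depends only on the tweetId value, the parallelism of the join bolt can be raised without breaking the join.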


                                                        TweetsTransactionalTopology

3. Code

TweetsTransactionalTopology

package com.john.learn.storm.transaction.tweets.analytics.redis;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.transactional.TransactionalTopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.TopicTagSplitterBolt;
import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.TweetRedisCommiterBolt;
import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.UserSplitterBolt;
import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.UserTopicTagJoinBolt;
import com.john.learn.storm.transaction.tweets.analytics.redis.spout.TweetsTransactionalSpout;

public class TweetsTransactionalTopology {

	public static void main(String[] args) {

		TransactionalTopologyBuilder topologyBuilder = new TransactionalTopologyBuilder("TweetsTransactionalTopology",
				"TweetsTransactionalSpout", new TweetsTransactionalSpout());

		topologyBuilder.setBolt("Users-Splitter", new UserSplitterBolt(), 4)
				.shuffleGrouping("TweetsTransactionalSpout");

		topologyBuilder.setBolt("TopicTags-Splitter", new TopicTagSplitterBolt(), 4)
				.shuffleGrouping("TweetsTransactionalSpout");

		topologyBuilder.setBolt("User-TopicTag-Join", new UserTopicTagJoinBolt(), 8)
				.fieldsGrouping("Users-Splitter", "Users", new Fields("tweetId"))
				.fieldsGrouping("TopicTags-Splitter", "TopicTags", new Fields("tweetId"));

		topologyBuilder.setBolt("Tweet-RedisCommiter", new TweetRedisCommiterBolt())
				.globalGrouping("User-TopicTag-Join").globalGrouping("Users-Splitter", "Users")
				.globalGrouping("TopicTags-Splitter", "TopicTags");

		// Run the topology on an in-process local cluster.
		Config conf = new Config();

		LocalCluster cluster = new LocalCluster();

		cluster.submitTopology("TweetsTransactionalTopology", conf, topologyBuilder.buildTopology());

		// To stop after a short run, uncomment the following lines:
		// Utils.sleep(5000);
		// cluster.killTopology("TweetsTransactionalTopology");
		// cluster.shutdown();
	}
}

TweetsRedis implementation: wraps the Redis operations

package com.john.learn.storm.transaction.tweets.analytics.redis;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.utils.Utils;

import com.john.learn.storm.transaction.tweets.analytics.redis.spout.TransactionMetadata;
import com.john.learn.storm.transaction.tweets.analytics.redis.spout.TweetsTransactionalSpoutCoordinator;

import redis.clients.jedis.Jedis;


public class TweetsRedis {

	public static final String NEXT_READ_POSTION = "Tweets.Redis.NEXT_READ_POSTION";

	public static final String NEXT_WRITE_POSTION = "Tweets.Redis.NEXT_WRITE_POSTION";

	private Jedis jedis;

	public TweetsRedis(String hostName, int port, String password) {

		jedis = new Jedis(hostName, port);

		if (!StringUtils.isEmpty(password)) {

			jedis.auth(password);
		}

	}

	public TweetsRedis(String hostName, int port) {

		this(hostName, port, null);
	}

	public List<String> getMessages(long from, int quantity) {

		String[] keys = new String[quantity];

		for (int i = 0; i < quantity; i++) {

			keys[i] = MESSAGE_ID_KEY + "." + (from + i);
		}

		System.out.println("keys:" + Arrays.toString(keys));

		return jedis.mget(keys);
	}

	public String get(String key) {

		return this.jedis.get(key);
	}

	public Jedis getJedis() {

		return this.jedis;
	}

	public void close() {

		try {

			jedis.disconnect();

		} catch (Exception e) {
			// Best-effort close: errors while disconnecting are ignored.
		}
	}

	public void clear() {

		int quantity = 1000;

		String[] keys = new String[quantity];

		for (int i = 0; i < quantity; i++) {

			keys[i] = MESSAGE_ID_KEY + "." + i;
		}

		jedis.del(keys);
		jedis.del(NEXT_WRITE_POSTION);
		jedis.del(NEXT_READ_POSTION);
	}

	public void addMessage(String message) {

		long index = jedis.incr(NEXT_WRITE_POSTION);

		jedis.set(MESSAGE_ID_KEY + "." + index, message);

	}

	public long getNextWrite() {

		String position = jedis.get(NEXT_WRITE_POSTION);

		if (position == null) {

			return 1;
		}

		return Long.valueOf(position) + 1;
	}

	public long getNextRead() {

		String position = jedis.get(NEXT_READ_POSTION);

		if (position == null) {
			return 1;
		}

		return Long.valueOf(position);
	}

	public void setNextRead(long position) {

		jedis.set(NEXT_READ_POSTION, String.valueOf(position));
	}

	public long getAvailableToRead(long current) {

		long items = getNextWrite() - current;

		return items > 0 ? items : 0;
	}

	/**
	 * Simulates a continuous stream of incoming tweets.
	 * 
	 * @param args
	 */
	// public static void main2(String[] args) throws InterruptedException {
	//
	// TweetsRedis tweetsRedis = new TweetsRedis("127.0.0.1", 6379, "6379Auth");
	//
	// int maxCount = 10;
	//
	// int count = 0;
	//
	// while (count < maxCount) {
	//
	// for (int i = 0; i < 1; i++) {
	//
	// long tx = System.currentTimeMillis();
	//
	// tweetsRedis.addMessage(
	// "@John @alex 3. Apache #Storm# is a free and open source distributed
	// #realtime# #computation# system.");
	//
	// }
	//
	// Thread.sleep(000);
	//
	// count++;
	// }
	//
	// for (String key : tweetsRedis.jedis.keys("Tweets.Redis.*.Frequency")) {
	//
	// System.out.println("-------------" + key + "-------------");
	// System.out.println(tweetsRedis.jedis.hgetAll(key));
	// }
	//
	// }

	public static void main(String[] args) {

		TweetsRedis tweetsRedis = new TweetsRedis("127.0.0.1", 6379, "6379Auth");

		// Optionally clear the existing data and start from scratch
		tweetsRedis.clear();

		tweetsRedis.addMessage("Hi @Tom @Simith 1. I want to #USA# and #Hometown# city.");

		tweetsRedis.addMessage("Hi @John @david @vivian 2. I want to #China# and #BeiJing# city.");
		tweetsRedis.addMessage(
				"@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.");
		tweetsRedis.addMessage("Hi @david @vivian 4. I want to #China# and #BeiJing# city.");
		tweetsRedis.addMessage(
				"@Lily @Toe 5. #Storm# has many use cases: #realtime# analytics, online machine learning, continuous #computation#, distributed RPC, ETL, and more.");
		tweetsRedis
				.addMessage("@Tim #Storm# integrates with the queueing and #database# technologies you already use.");
		tweetsRedis.addMessage(
				"@Sunny @chris #Storm# is simple, can be used with any programming#language#, and is a lot of fun to use.");
		tweetsRedis.addMessage(
				"@nathan #Storm# is simple, can be used with any programming #language#, and is a lot of fun to use.");
		tweetsRedis.addMessage(
				"@Lily #Storm# is simple, can be used with any programming #language#, and is a lot of fun to use.");

		tweetsRedis.addMessage(
				"@bob #Storm# has many use cases: #realtime# analytics, online machine learning, continuous #computation#, distributed RPC, ETL, and more.");
		tweetsRedis.addMessage(
				"@carissa Apache #Storm# is a free and open source distributed #realtime# #computation# system.");

		System.out.println("------------- Test TweetsTransactionalSpoutCoordinator Start-------------");

		TweetsTransactionalSpoutCoordinator transactionalSpoutCoordinator = new TweetsTransactionalSpoutCoordinator();

		TransactionMetadata transactionMetadata = null;

		while (transactionalSpoutCoordinator.isReady()) {

			transactionMetadata = transactionalSpoutCoordinator.initializeTransaction(BigInteger.valueOf(1),
					transactionMetadata);

			System.out.println("SpoutCoordinator Initialize Transaction Meta: " + transactionMetadata);

		}

		System.out.println("------------- Test TweetsTransactionalSpoutCoordinator End-------------");

	}

	private static final String MESSAGE_ID_KEY = "Tweets.Message";

}

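Hint: running the main method initializes the data in Redis. The main2 method simulates a continuous stream of tweets.

Next, implement the spout of the transactional topology.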

TransactionMetadata

package com.john.learn.storm.transaction.tweets.analytics.redis.spout;

import java.io.Serializable;

public class TransactionMetadata implements Serializable {

	private static final long serialVersionUID = 1L;

	public long from;

	public int quantity;

	public TransactionMetadata(long from, int quantity) {
		this.from = from;
		this.quantity = quantity;
	}

	public TransactionMetadata() {

	}
	
	@Override
	public String toString() {
		
		return "from:"+from + " To:"+ (from+quantity-1) +" quantity:"+quantity ;
	}

}

TweetsTransactionalSpoutCoordinator

package com.john.learn.storm.transaction.tweets.analytics.redis.spout;

import java.math.BigInteger;

import org.apache.storm.transactional.ITransactionalSpout.Coordinator;

import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsRedis;

public class TweetsTransactionalSpoutCoordinator implements Coordinator<TransactionMetadata> {

	public TweetsTransactionalSpoutCoordinator() {

		this.tweetsRedis =  new TweetsRedis("127.0.0.1", 6379, "6379Auth");

		nextRead = tweetsRedis.getNextRead();

		System.out.println("nextRead:" + nextRead);

	}

	@Override
	public void close() {

		tweetsRedis.close();
	}

	@Override
	public TransactionMetadata initializeTransaction(BigInteger txId, TransactionMetadata transactionMetadata) {
		// Size of this batch: capped at TRANSACTION_MAX_SIZE; the last batch may be smaller.
		int quantity = (int) Math.min(TRANSACTION_MAX_SIZE, tweetsRedis.getAvailableToRead(nextRead));

		transactionMetadata = new TransactionMetadata(nextRead, quantity);

		nextRead += quantity;

		return transactionMetadata;
	}

	@Override
	public boolean isReady() {

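		// Check whether there is data left to read. This matters: if this always
		// returned true, Storm would keep starting transactions with nothing to send.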
		return tweetsRedis.getAvailableToRead(nextRead) > 0;
	}

	private transient TweetsRedis tweetsRedis;

	private long nextRead = 0;

	private static final int TRANSACTION_MAX_SIZE = 1000;

}

// Determine the size of the batch (the last one may be smaller)
int quantity = Math.min(TRANSACTION_MAX_SIZE, (int) tweetsRedis.getAvailableToRead(nextRead));

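Note that only one coordinator instance exists in the whole topology. When the coordinator is created, it reads from Redis a sequence number, starting at 1, that identifies the next tweet to read.

The first method is isReady. It is called before initializeTransaction to confirm that the data source is ready and can be read, and it should return true or false accordingly. In this example, the number of tweets written is compared with the number already read; the difference is the number of tweets available to read. If it is greater than 0, there are still unread tweets.

Finally, initializeTransaction is executed. As you can see, it receives txid and prevMetadata as parameters. The first is the transaction ID generated by Storm, which uniquely identifies the batch. prevMetadata is the metadata object the coordinator generated for the previous transaction.

In this example, we first determine how many tweets are available to read. Once that is known, a TransactionMetadata object is created that identifies the first tweet to read (the from property) and the number of tweets to read (the quantity property).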
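The paging arithmetic can be checked in isolation. The following is a minimal sketch, assuming the same conventions as TweetsRedis (positions start at 1 and nextWrite is the index the next tweet would get); BatchPlannerSketch and plan are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the {from, quantity} pairs a coordinator would hand out.
class BatchPlannerSketch {

    static List<long[]> plan(long nextRead, long nextWrite, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<long[]> batches = new ArrayList<>();
        // Same condition as isReady(): continue while something is left to read.
        while (nextWrite - nextRead > 0) {
            long available = nextWrite - nextRead;
            long quantity = Math.min(maxBatchSize, available);
            batches.add(new long[] { nextRead, quantity });
            nextRead += quantity;
        }
        return batches;
    }

    public static void main(String[] args) {
        // 11 tweets stored at positions 1..11, so the next write position is 12.
        for (long[] batch : plan(1, 12, 5)) {
            System.out.println("from:" + batch[0] + " quantity:" + batch[1]);
        }
    }
}
```

With 11 stored tweets and a maximum batch size of 5, this yields the batches (1, 5), (6, 5) and (11, 1): only the last batch is smaller than the maximum.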

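As soon as the metadata object is returned, Storm stores it in ZooKeeper together with the txid. This guarantees that if a failure occurs, Storm can have the emitter (see below) re-emit the batch.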

TweetsTransactionalSpoutEmitter

package com.john.learn.storm.transaction.tweets.analytics.redis.spout;

import java.math.BigInteger;

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.transactional.ITransactionalSpout.Emitter;

import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsRedis;

import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Values;

public class TweetsTransactionalSpoutEmitter implements Emitter<TransactionMetadata> {

	public TweetsTransactionalSpoutEmitter() {

		this.tweetsRedis =  new TweetsRedis("127.0.0.1", 6379, "6379Auth");


	}

	@Override
	public void cleanupBefore(BigInteger txId) {

	}

	@Override
	public void close() {

		tweetsRedis.close();
	}

	@Override
	public void emitBatch(TransactionAttempt transactionAttempt, TransactionMetadata transactionMetadata,
			BatchOutputCollector collector) {

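		/*
		 * The emitter reads from the data source and emits the data to the streams. It must always
		 * be able to emit the same batch for the same transaction id and transaction metadata.
		 * That way, if a failure occurs while a batch is being processed, Storm can have the
		 * emitter repeat the batch with the same transaction id and metadata. Storm increments the
		 * attempt id in the TransactionAttempt object, so a replayed batch can be recognized.
		 */
		long tweetId = transactionMetadata.from;

		for (String tweet : tweetsRedis.getMessages(transactionMetadata.from, transactionMetadata.quantity)) {

			// Skip missing entries, but still advance the id so it stays aligned with the Redis key.
			if (!StringUtils.isEmpty(tweet)) {

				collector.emit(new Values(transactionAttempt, String.valueOf(tweetId), tweet));
			}

			tweetId++;
		}

		// Persist the next read position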
		tweetsRedis.setNextRead(transactionMetadata.from + transactionMetadata.quantity);

	}

	private transient TweetsRedis tweetsRedis;

}

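The emitter reads from the data source and emits the data to the streams. It must always be able to emit the same batch for the same transaction ID and transaction metadata. That way, if a failure occurs while a batch is being processed, Storm can have the emitter repeat the batch with the same transaction ID and metadata. Storm increments the attempt count (attempt id) in the TransactionAttempt object, which makes it possible to tell that a batch has been replayed.

The important method here is emitBatch. It uses the metadata object passed in to fetch the tweets from Redis, advances the read position kept in Redis, and of course emits the tweets it has read to the topology.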
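The replay requirement boils down to this: the emitted batch must be a pure function of the metadata (from, quantity) and the stored data. Below is a minimal sketch of that property with an in-memory map standing in for Redis; ReplayableEmitterSketch and its methods are hypothetical names, and the "id|tweet" string is just an illustrative encoding of a tuple.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the emitter: no Storm, no Redis.
class ReplayableEmitterSketch {

    private final Map<Long, String> store = new HashMap<>(); // plays the role of Redis

    void put(long id, String tweet) {
        store.put(id, tweet);
    }

    // Emits "tweetId|tweet" for every stored tweet in [from, from + quantity).
    List<String> emitBatch(long from, int quantity) {
        List<String> emitted = new ArrayList<>();
        for (long id = from; id < from + quantity; id++) {
            String tweet = store.get(id);
            if (tweet == null || tweet.isEmpty()) {
                continue; // a gap is skipped, but the id still follows the position
            }
            emitted.add(id + "|" + tweet);
        }
        return emitted;
    }

    public static void main(String[] args) {
        ReplayableEmitterSketch emitter = new ReplayableEmitterSketch();
        emitter.put(1, "first");
        emitter.put(2, "second");
        emitter.put(4, "fourth");
        // A replay with the same metadata produces exactly the same batch.
        System.out.println(emitter.emitBatch(1, 4).equals(emitter.emitBatch(1, 4))); // true
    }
}
```

Deriving each id from its position in the range, rather than from a counter of emitted tuples, keeps the ids stable even when an entry is missing.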

TweetsTransactionalSpout

package com.john.learn.storm.transaction.tweets.analytics.redis.spout;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.topology.base.BaseTransactionalSpout;
import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsRedis;

public class TweetsTransactionalSpout extends BaseTransactionalSpout<TransactionMetadata> {

	public TweetsTransactionalSpout() {

	}

	@Override
	public Coordinator<TransactionMetadata> getCoordinator(Map config, TopologyContext context) {

		return new TweetsTransactionalSpoutCoordinator();
	}

	@Override
	public Emitter<TransactionMetadata> getEmitter(Map config, TopologyContext context) {

		return new TweetsTransactionalSpoutEmitter();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {

		/**
		 * Refer to Emitter collector.emit(new Values(transactionAttempt,
		 * String.valueOf(tweetId), tweet));
		 */

		fieldsDeclarer.declare(new Fields("txid", "tweetId", "tweet"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {

		return null;
	}

	private SpoutOutputCollector collector;
}

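Note: getCoordinator tells Storm which class coordinates the generation of batches. getEmitter returns the class responsible for reading the batches and emitting them to the streams of the topology. Finally, as before, the fields to be emitted must be declared.

Bolts

UserSplitterBolt: receives tuples, parses the tweet text, and emits the words that start with @, that is, the Twitter users.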

package com.john.learn.storm.transaction.tweets.analytics.redis.bolt;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class UserSplitterBolt implements IBasicBolt {

	@Override
	public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {

		fieldsDeclarer.declareStream("Users", new Fields("txid", "tweetId", "user"));

	}

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {

		TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);
		String tweetId = tuple.getString(1);
		String tweet = tuple.getString(2);

		Set<String> users = new HashSet<>();

		for (String user : StringUtils.splitByWholeSeparator(tweet, " ")) {

			// Make sure this is a real user and has not already been seen in this tweet
			if (user.startsWith("@") && !users.contains(user)) {

				users.add(user);

				collector.emit("Users", new Values(transactionAttempt, tweetId, user));

				System.out.println("Tx:" + tuple.getValue(0) + " [" + tuple.getString(1) + "]:" + user);
			}
		}

	}

	@Override
	public void prepare(Map config, TopologyContext context) {

	}

	@Override
	public Map<String, Object> getComponentConfiguration() {

		return null;
	}

	@Override
	public void cleanup() {

	}

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

}

TopicTagSplitterBolt: receives tuples, parses the tweet text, and emits the words that start and end with #, that is, the topic tags.

package com.john.learn.storm.transaction.tweets.analytics.redis.bolt;

import java.util.HashSet;
import java.util.Set;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class TopicTagSplitterBolt extends BaseBasicBolt {

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {

		TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);
		String tweetId = tuple.getString(1);
		String tweet = tuple.getString(2);

		Set<String> topicTags = new HashSet<>();

		for (String topicTag : StringUtils.splitByWholeSeparator(tweet, " ")) {

			// Make sure this is a topic tag (#...#) and has not already been seen in this tweet
			if (topicTag.startsWith("#") && topicTag.endsWith("#") && !topicTags.contains(topicTag)) {

				topicTags.add(topicTag);

				collector.emit("TopicTags", new Values(transactionAttempt, tweetId, topicTag));

				System.out.println("Tx:" + tuple.getValue(0) + " [" + tuple.getString(1) + "]:" + topicTag);
			}
		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {

		fieldsDeclarer.declareStream("TopicTags", new Fields("txid", "tweetId", "topicTag"));

	}

	private static final long serialVersionUID = 1L;

}

UserTopicTagJoinBolt

package com.john.learn.storm.transaction.tweets.analytics.redis.bolt;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBatchBolt;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Joins the Users and TopicTags streams on tweetId
public class UserTopicTagJoinBolt extends BaseBatchBolt<TransactionAttempt> {

	@Override
	public void prepare(Map config, TopologyContext context, BatchOutputCollector collector,
			TransactionAttempt transactionAttempt) {

		System.out.println(
				"prepare()   Task Id:" + context.getThisTaskId() + " transactionAttempt:" + transactionAttempt);

		this.collector = collector;

		// Re-initialize the per-batch state for every batch
		userTweets = new HashMap<>();

		topicTagTweets = new HashMap<>();

		this.transactionAttempt = transactionAttempt;
	}

	@Override
	public void execute(Tuple tuple) {

		System.out.println("userTweets size " + userTweets.size() + " Task Id:" + tuple.getSourceTask());

		TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);

		String tweetId = tuple.getString(1);

		String source = tuple.getSourceStreamId();

		if (source.equalsIgnoreCase("Users")) {

			String user = tuple.getString(2);

			// All tweet ids (message ids) that mention each user
			add(userTweets, user, tweetId);

			return;
		}

		if (source.equalsIgnoreCase("TopicTags")) {

			String topicTag = tuple.getString(2);

			// All topic tags contained in each tweet id
			add(topicTagTweets, tweetId, topicTag);

			return;
		}

		System.err.println("sorry, source not found " + source);

	}

	@Override
	public void finishBatch() {

		System.out.println("finishBatch call().............");

		// Count the occurrences of each user/topic pair
		for (String user : userTweets.keySet()) {

			// Get the set of tweet ids for this user
			Set<String> tweetIds = getTweetIdsByUser(user);

			countOccurFrequencyPerUserAndTopicTag(user, tweetIds);
		}
	}

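	/**
	 * Counts the number of occurrences of each user/topic pair.
	 * 
	 * @param user     the user whose tweets are examined
	 * @param tweetIds ids of the tweets that mention the user
	 */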
	private void countOccurFrequencyPerUserAndTopicTag(String user, Set<String> tweetIds) {

		Map<String, Integer> topicTagCounter = new HashMap<String, Integer>();

		for (String tweetId : tweetIds) {

			Set<String> topicTags = getTweetTopicTagsById(tweetId);

			if (topicTags == null) {

				continue;

			}

			for (String topicTag : topicTags) {

				Integer topicTagCount = topicTagCounter.get(topicTag);

				if (topicTagCount == null) {
					topicTagCount = 0;

				}

				topicTagCount++;

				topicTagCounter.put(topicTag, topicTagCount);
			}
		}

		for (String topicTag : topicTagCounter.keySet()) {

			// Emit the number of occurrences of each user/topic pair.
			// user already starts with "@" and topicTag is already wrapped in "#".
			System.out.println(user + " " + topicTag + " Counts:" + topicTagCounter.get(topicTag));
			this.collector.emit(new Values(transactionAttempt, user, topicTag, topicTagCounter.get(topicTag)));
		}

	}

	private Set<String> getTweetTopicTagsById(String tweetId) {

		return this.topicTagTweets.get(tweetId);
	}

	private Set<String> getTweetIdsByUser(String user) {

		return this.userTweets.get(user);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {

		fieldsDeclarer.declare(new Fields("txid", "user", "topicTag", "count"));
	}

	private void add(Map<String, Set<String>> bag, String key, String value) {

		Set<String> slot = bag.get(key);

		if (slot == null) {

			slot = new HashSet<>();

			bag.put(key, slot);
		}

		slot.add(value);
	}

	private Map<String, Set<String>> userTweets;

	private Map<String, Set<String>> topicTagTweets;

	private BatchOutputCollector collector;

	private TransactionAttempt transactionAttempt;

	private static final long serialVersionUID = 1L;
}

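Implementation of UserTopicTagJoinBolt: the first thing to note is that it is a BaseBatchBolt. This means that the execute method works on the tuples received but does not emit any new tuples. When the batch is complete, Storm calls the finishBatch method.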

public void execute(Tuple tuple) {

		System.out.println("userTweets size " + userTweets.size() + " Task Id:" + tuple.getSourceTask());

		TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);

		String tweetId = tuple.getString(1);

		String source = tuple.getSourceStreamId();

		if (source.equalsIgnoreCase("Users")) {

			String user = tuple.getString(2);

			// All tweet ids (message ids) that mention each user
			add(userTweets, user, tweetId);

			return;
		}

		if (source.equalsIgnoreCase("TopicTags")) {

			String topicTag = tuple.getString(2);

			// All topic tags contained in each tweet id
			add(topicTagTweets, tweetId, topicTag);

			return;
		}

		System.err.println("sorry, source not found " + source);

	}

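Since we need to count all the topics in combination with the users mentioned in the tweet, the two streams created by the previous bolts have to be joined. This is done per batch, and when the batch has been fully processed, the finishBatch method is called.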

@Override
	public void finishBatch() {

		System.out.println("finishBatch call().............");

		// Count the occurrences of each user/topic pair
		for (String user : userTweets.keySet()) {

			// Get the set of tweet ids for this user
			Set<String> tweetIds = getTweetIdsByUser(user);

			countOccurFrequencyPerUserAndTopicTag(user, tweetIds);
		}
	}

This method counts the occurrences of each user-topic pair, and generates and emits a tuple for each.
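Stripped of the Storm plumbing, the join is an accumulate-then-finish pattern over two maps. The sketch below mirrors that logic under the assumption that both inputs of a tweet reach the same instance; UserTopicJoinSketch, onUser and onTopicTag are hypothetical names, and the returned map stands in for the emitted tuples.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for the join bolt: accumulate per batch, compute at the end.
class UserTopicJoinSketch {

    private final Map<String, Set<String>> userTweets = new HashMap<>();     // user -> tweet ids
    private final Map<String, Set<String>> tweetTopicTags = new HashMap<>(); // tweet id -> tags

    // Corresponds to a tuple on the "Users" stream.
    void onUser(String tweetId, String user) {
        userTweets.computeIfAbsent(user, k -> new HashSet<>()).add(tweetId);
    }

    // Corresponds to a tuple on the "TopicTags" stream.
    void onTopicTag(String tweetId, String topicTag) {
        tweetTopicTags.computeIfAbsent(tweetId, k -> new HashSet<>()).add(topicTag);
    }

    // Corresponds to finishBatch(): one count per "user:topicTag" pair.
    Map<String, Integer> finishBatch() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : userTweets.entrySet()) {
            for (String tweetId : entry.getValue()) {
                Set<String> tags = tweetTopicTags.getOrDefault(tweetId, Collections.<String>emptySet());
                for (String tag : tags) {
                    counts.merge(entry.getKey() + ":" + tag, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        UserTopicJoinSketch join = new UserTopicJoinSketch();
        join.onUser("1", "@John");
        join.onUser("3", "@John");
        join.onTopicTag("1", "#Storm#");
        join.onTopicTag("3", "#Storm#");
        System.out.println(join.finishBatch().get("@John:#Storm#")); // 2
    }
}
```

Nothing is produced while tuples arrive; the pair counts only exist once the whole batch has been seen, which is why the work happens in the finish step.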

Committer: TweetRedisCommiterBolt

As we have seen, batches are passed through the topology by the coordinator and the emitter. Within the topology, the tuples of these batches are processed in parallel and in no particular order.

Committer bolts are a special kind of batch bolt: they implement ICommitter, or were registered through setCommitterBolt on the TransactionalTopologyBuilder. The main difference from other batch bolts is that a committer bolt's finishBatch method is executed only when the commit is ready, which happens after all previous transactions have been committed successfully. In addition, finishBatch is executed sequentially. So if transaction ID 1 and transaction ID 2 are being processed at the same time, finishBatch for ID 2 runs only after finishBatch for ID 1 has completed without errors.
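The ordering guarantee can be pictured with a small simulation: batches may finish their processing phase in any order, but commits are released strictly by ascending transaction ID. This is only an illustration of the rule, not Storm's coordination code; CommitOrderSketch and batchProcessed are hypothetical names, and transaction IDs are assumed to start at 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical simulation of "commit in txid order".
class CommitOrderSketch {

    private final TreeSet<Long> processed = new TreeSet<>(); // processed but not yet committed
    private long nextToCommit = 1;

    // Called when a batch has finished processing; returns the txids committed as a result.
    List<Long> batchProcessed(long txid) {
        processed.add(txid);
        List<Long> committed = new ArrayList<>();
        // Release commits only while the next expected txid is available.
        while (processed.remove(nextToCommit)) {
            committed.add(nextToCommit);
            nextToCommit++;
        }
        return committed;
    }

    public static void main(String[] args) {
        CommitOrderSketch sequencer = new CommitOrderSketch();
        System.out.println(sequencer.batchProcessed(2)); // [] : txid 1 has not committed yet
        System.out.println(sequencer.batchProcessed(1)); // [1, 2]
    }
}
```

This sequencing is what makes it safe for the committer to keep a single "last committed transaction ID" in the database.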

package com.john.learn.storm.transaction.tweets.analytics.redis.bolt;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsRedis;

import redis.clients.jedis.Transaction;

public class TweetRedisCommiterBolt extends BaseTransactionalBolt implements ICommitter {

	@Override
	public void prepare(Map config, TopologyContext context, BatchOutputCollector collector,
			TransactionAttempt transactionAttempt) {

		this.transactionAttempt = transactionAttempt;

		sourceCounterMapCounters = new HashMap<>();

		// Avoid re-creating the Redis connection for every batch
		if (tweetsRedis == null) {

			this.tweetsRedis = new TweetsRedis("127.0.0.1", 6379, "6379Auth");

		}

	}

	/**
	 * topologyBuilder.setBolt("Users-Splitter", new UserSplitterBolt(), 4)
	 * .shuffleGrouping("TweetsTransactionalSpout");
	 * 
	 * topologyBuilder.setBolt("TopicTags-Splitter", new TopicTagSplitterBolt(), 4)
	 * .shuffleGrouping("TweetsTransactionalSpout");
	 * 
	 * topologyBuilder.setBolt("User-TopicTag-Join", new UserTopicTagJoinBolt(), 1)
	 * .fieldsGrouping("Users-Splitter", "Users", new Fields("tweetId"))
	 * .fieldsGrouping("TopicTags-Splitter", "TopicTags", new Fields("tweetId"));
	 * 
	 */
	@Override
	public void execute(Tuple tuple) {

		String source = tuple.getSourceComponent();

		if (source.equalsIgnoreCase("Users-Splitter")) {

			count("Tweets.Redis.Users.Frequency", tuple.getStringByField("user"), 1);

			return;
		}

		if (source.equalsIgnoreCase("TopicTags-Splitter")) {

			count("Tweets.Redis.TopicTags.Frequency", tuple.getStringByField("topicTag"), 1);

			return;
		}

		if (source.equalsIgnoreCase("User-TopicTag-Join")) {

			count("Tweets.Redis.UserTopicTags.Frequency",
					tuple.getStringByField("user") + ":" + tuple.getStringByField("topicTag"),
					tuple.getIntegerByField("count"));
		}

	}

	private void count(String sourceCounterMapKey, String counterTag, Integer count) {

		if (!sourceCounterMapCounters.containsKey(sourceCounterMapKey)) {

			sourceCounterMapCounters.put(sourceCounterMapKey, new HashMap<>());
		}

		Map<String, Integer> counters = sourceCounterMapCounters.get(sourceCounterMapKey);

		Integer prevTotalCount = counters.get(counterTag);

		if (prevTotalCount == null) {

			prevTotalCount = 0;
		}

		counters.put(counterTag, prevTotalCount + count);

	}

	@Override
	public void finishBatch() {

		String lastCommitTransaction = tweetsRedis.get(LAST_COMMITED_TRANSACTION);

		if (String.valueOf(this.transactionAttempt.getTransactionId()).equals(lastCommitTransaction)) {

			return;
		}

		Transaction multi = tweetsRedis.getJedis().multi();

		multi.set(LAST_COMMITED_TRANSACTION, String.valueOf(transactionAttempt.getTransactionId()));

		for (String sourceCounterKey : sourceCounterMapCounters.keySet()) {

			Map<String, Integer> sourceTotalMap = sourceCounterMapCounters.get(sourceCounterKey);

			for (String counterTag : sourceTotalMap.keySet()) {

				multi.hincrBy(sourceCounterKey, counterTag, sourceTotalMap.get(counterTag));
			}

		}

		multi.exec();


	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {

	}

	private Map<String, Map<String, Integer>> sourceCounterMapCounters;

	private TransactionAttempt transactionAttempt;

	private static TweetsRedis tweetsRedis;

	public static final String LAST_COMMITED_TRANSACTION = "Tweets.Redis.LAST_COMMIT";

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
}

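The implementation is simple, but there is one detail in finishBatch: it opens a Redis transaction (MULTI/EXEC) so that the last committed transaction ID and all counter updates of the batch are written together. This prevents double counting; if something goes wrong, the whole batch is replayed.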

         Transaction multi = tweetsRedis.getJedis().multi();

	multi.set(LAST_COMMITED_TRANSACTION, String.valueOf(transactionAttempt.getTransactionId()));

	for (String sourceCounterKey : sourceCounterMapCounters.keySet()) {

		Map<String, Integer> sourceTotalMap = sourceCounterMapCounters.get(sourceCounterKey);

		for (String counterTag : sourceTotalMap.keySet()) {

			multi.hincrBy(sourceCounterKey, counterTag, sourceTotalMap.get(counterTag));
		}

	}

	multi.exec();

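Tips: the last committed transaction ID is saved to the database here. Why? Remember that if a transaction fails, Storm replays it as many times as necessary. If you cannot tell whether a transaction has already been processed, you will overcount, and the transactional topology becomes useless. So remember: save the last committed transaction ID, and check it before committing.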
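The effect of that check is easy to demonstrate without Redis. In the following minimal sketch an in-memory map and a field play the roles of the frequency hashes and the last-commit key; IdempotentCommitSketch and its methods are hypothetical names, and a real implementation would apply both updates atomically, as the MULTI/EXEC block above does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the committer's database writes.
class IdempotentCommitSketch {

    private String lastCommittedTxId;                        // stands in for the last-commit key
    private final Map<String, Long> counters = new HashMap<>();

    // Applies the batch totals unless this txid was the last one committed.
    boolean commit(String txId, Map<String, Integer> deltas) {
        if (txId.equals(lastCommittedTxId)) {
            return false; // replayed batch: already counted
        }
        lastCommittedTxId = txId;
        for (Map.Entry<String, Integer> delta : deltas.entrySet()) {
            counters.merge(delta.getKey(), delta.getValue().longValue(), Long::sum);
        }
        return true;
    }

    long get(String key) {
        return counters.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        IdempotentCommitSketch db = new IdempotentCommitSketch();
        Map<String, Integer> batch = new HashMap<>();
        batch.put("@John", 2);
        db.commit("1", batch);
        db.commit("1", batch); // replay of the same transaction is ignored
        System.out.println(db.get("@John")); // 2
    }
}
```

Comparing against only the last committed ID is enough here because commits happen strictly in transaction order.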

Run results:

 

TweetsRedis

 
public static void main(String[] args) {


		TweetsRedis tweetsRedis =  new TweetsRedis("127.0.0.1", 6379, "6379Auth");

		// Optionally clear the existing data and start from scratch
		tweetsRedis.clear();


		tweetsRedis.addMessage("Hi @Tom @Simith 1. I want to #USA# and #Hometown# city.");


		tweetsRedis.addMessage("Hi @John @david @vivian 2. I want to #China# and #BeiJing# city.");
		tweetsRedis.addMessage(
				"@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.");
		tweetsRedis.addMessage("Hi @david @vivian 4. I want to #China# and #BeiJing# city.");
		tweetsRedis.addMessage(
				"@Lily @Toe 5. #Storm# has many use cases: #realtime# analytics, online machine learning, continuous #computation#, distributed RPC, ETL, and more.");
		tweetsRedis
				.addMessage("@Tim #Storm# integrates with the queueing and #database# technologies you already use.");
		tweetsRedis.addMessage(
				"@Sunny @chris #Storm# is simple, can be used with any programming#language#, and is a lot of fun to use.");
		tweetsRedis.addMessage(
				"@nathan #Storm# is simple, can be used with any programming #language#, and is a lot of fun to use.");
		tweetsRedis.addMessage(
				"@Lily #Storm# is simple, can be used with any programming #language#, and is a lot of fun to use.");


		tweetsRedis.addMessage(
				"@bob #Storm# has many use cases: #realtime# analytics, online machine learning, continuous #computation#, distributed RPC, ETL, and more.");
		tweetsRedis.addMessage(
				"@carissa Apache #Storm# is a free and open source distributed #realtime# #computation# system.");


		System.out.println("------------- Test TweetsTransactionalSpoutCoordinator Start-------------");


		TweetsTransactionalSpoutCoordinator transactionalSpoutCoordinator = new TweetsTransactionalSpoutCoordinator();


		TransactionMetadata transactionMetadata = null;


		while (transactionalSpoutCoordinator.isReady()) {


			transactionMetadata = transactionalSpoutCoordinator.initializeTransaction(BigInteger.valueOf(1),
					transactionMetadata);


			System.out.println("SpoutCoordinator Initialize Transaction Meta: " + transactionMetadata);


		}


		System.out.println("------------- Test TweetsTransactionalSpoutCoordinator End-------------");


	}

-------------Tweets.Redis.UserTopicTags.Frequency-------------

{@John:#China#=2, @John:#realtime#=2, @alex:#computation#=2, @david:#BeiJing#=4, @Simith:#Hometown#=2, @bob:#realtime#=2, @carissa:#realtime#=2, @Toe:#realtime#=2, @Toe:#Storm#=2, @chris:#Storm#=2, @bob:#Storm#=2, @alex:#Storm#=2, @nathan:#Storm#=2, @John:#BeiJing#=2, @Tim:#database#=2, @Lily:#realtime#=2, @Simith:#USA#=2, @Tom:#USA#=2, @vivian:#BeiJing#=4, @vivian:#China#=4, @Tim:#Storm#=2, @John:#computation#=2, @John:#Storm#=2, @alex:#realtime#=2, @carissa:#computation#=2, @Lily:#Storm#=4, @david:#China#=4, @Sunny:#Storm#=2, @carissa:#Storm#=2, @Tom:#Hometown#=2}

-------------Tweets.Redis.Users.Frequency-------------

{@Lily=4, @david=4, @John=4, @Toe=2, @nathan=2, @Tim=2, @Sunny=2, @vivian=4, @bob=2, @Tom=2, @alex=2, @Simith=2, @chris=2, @carissa=2}

-------------Tweets.Redis.TopicTags.Frequency-------------

{#realtime#=8, #Hometown#=2, #computation#=4, #database#=2, #China#=4, #USA#=2, #Storm#=16, #BeiJing#=4}

Run the following directly to simulate continuously sending tweets

 
public static void main(String[] args) throws InterruptedException {

		TweetsRedis tweetsRedis =  new TweetsRedis("127.0.0.1", 6379, "6379Auth");


		int maxCount = 10;

		int count = 0;

		while (count < maxCount) {

			for (int i = 0; i < 10; i++) {

				long tx = System.currentTimeMillis();

				tweetsRedis.addMessage(
						"@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.");

			}

			Thread.sleep(1000);

			count++;
		}

		for (String key : tweetsRedis.jedis.keys("Tweets.Redis.*.Frequency")) {

			System.out.println("-------------" + key + "-------------");
			System.out.println(tweetsRedis.jedis.hgetAll(key));
		}

	}
