<iframe id="iframeu1728839_0" src="http://pos.baidu.com/acom?rdid=1728839&dc=2&di=u1728839&dri=0&dis=0&dai=3&ps=454x1133&dcb=BAIDU_UNION_define&dtm=BAIDU_DUP_SETJSONADSLOT&dvi=0.0&dci=-1&dpt=none&tsr=0&tpr=1457398129447&ti=Storm%E5%BA%94%E7%94%A8%E7%B3%BB%E5%88%97%E4%B9%8B%E2%80%94%E2%80%94%E9%9B%86%E6%88%90Kafka-Kafka-about%E4%BA%91%E5%BC%80%E5%8F%91&ari=1&dbv=2&drs=1&pcs=1349x643&pss=1349x826&cfv=0&cpl=4&chi=1&cce=true&cec=GBK&tlm=1457398129&ltu=http%3A%2F%2Fwww.aboutyun.com%2Fthread-8959-1-1.html&ltr=https%3A%2F%2Fwww.baidu.com%2Flink%3Furl%3D_xO7icZragL9hf-aNsDJYlSldtr6jX-XwmTCBdEbhAePgMtCBu4MBlJ-TbVXPG9D16VMGmoJV2c5fdsca_mJwK%26wd%3D%26eqid%3Dcd9e77af000103780000000256de1ff8&ecd=1&psr=1366x768&par=1366x728&pis=-1x-1&ccd=24&cja=false&cmi=6&col=zh-CN&cdo=-1&tcn=1457398130&qn=e881dfaff4e9adff&tt=1457398129395.242.646.648" width="120" height="240" align="center,center" vspace="0" hspace="0" marginwidth="0" marginheight="0" scrolling="no" frameborder="0" allowtransparency="true" style="word-wrap: break-word; border-width: 0px; border-style: initial; vertical-align: bottom; margin: 0px;"></iframe> 问题导读: Kafka集群中的Broker地址,有哪两种方法指定? TransactionalTridentKafkaSpout的作用是什么? 本地模式无法保存Offset该如何解决?
前言Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是作为其数据源的很好的选择。本文就如何集成Kafka进行介绍。
Kafka的基本介绍:什么是Kafka
准备工作 KafkaSpout其实网上已经有人写了,在github上开源了,不用我们自己造*。只是要注意版本问题: 0.7版本的Kafka,对应KafkaSpout可以使用Storm-contrib下面的例子 源码:https://github.com/nathanmarz/st ... /master/storm-kafka Maven依赖:https://clojars.org/storm/storm-kafka
0.8版本的Kafka在API上和底层Offset的处理方式上发生了重大变化,所以老的KafkaSpout不再适用,必须使用新的KafkaAPI 源码:https://github.com/wurstmeister/storm-kafka-0.8-plus
这里因为0.8版本的Kafka必然是将来主流,所以我就不介绍0.7 的了,使用方式基本上是类似的。
PS: 是人写的,就会有bug,何况是别人分享出来的。所以,遇到bug,还请去github上提交一个issue告诉作者修正。
2014/7/29 更新: wurstmeister/storm-kafka-0.8-plus 现在合并到Apache Storm了,在其external/storm-kakfa目录
Maven依赖直接更新成:
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-kafka</artifactId>
- <version>0.9.2-incubating</version>
- </dependency>
复制代码 但是storm似乎没有直接把external的包加载到classpath,所以使用时,还得手动把该jar包从external/storm-kafka/下拷到storm的lib目录。 当然,也可以在maven中加上<scope>compile</scope>,直接把该jar打到你项目一起。
使用KafkaSpout一个KafkaSpout只能去处理一个topic的内容,所以,它要求初始化时提供如下与topic相关信息:
- Kafka集群中的Broker地址 (IP+Port)
有两种方法指定:
1. 使用静态地址,即直接给定Kafka集群中所有Broker信息
- GlobalPartitionInformation info = new GlobalPartitionInformation();
- info.addPartition(0, new Broker("10.1.110.24",9092));
- info.addPartition(0, new Broker("10.1.110.21",9092));
- BrokerHosts brokerHosts = new StaticHosts(info);
复制代码 2. 从Zookeeper动态读取
- BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181");
复制代码
推荐使用这种方法,因为Kafka的Broker可能会动态的增减
- topic名字
- 当前spout的唯一标识Id (以下代称$spout_id)
- zookeeper上用于存储当前处理到哪个Offset了 (以下代称$zk_root)
- 当前topic中数据如何解码
了解Kafka的应该知道,Kafka中当前处理到哪的Offset是由客户端自己管理的。所以,后面两个的目的,其实是在zookeeper上建立一个 $zk_root/$spout_id 的节点,其值是一个map,存放了当前Spout处理的Offset的信息。
在Topology中加入Spout的代码:
- String topic = "test";
- String zkRoot = "kafkastorm";
- String spoutId = "myKafka";
-
- SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
- spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", new KafkaSpout(spoutConfig), spoutNum);
复制代码 其中TestMessageScheme就是告诉KafkaSpout如何去解码数据,生成Storm内部传递数据
- public class TestMessageScheme implements Scheme {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);
-
- @Override
- public List<Object> deserialize(byte[] bytes) {
- try {
- String msg = new String(bytes, "UTF-8");
- return new Values(msg);
- } catch (InvalidProtocolBufferException e) {
- LOGGER.error("Cannot parse the provided message!");
- }
-
- //TODO: what happend if returns null?
- return null;
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields("msg");
- }
-
- }
复制代码 这个解码方式是与Producer端生成时塞入数据的编码方式配套的。这里我Producer端塞入的是String的byte,所以这里也还原成String,定义输出为一个名叫"msg"的field。
后面就可以自己添加Bolt处理tuple中该field的数据了。
使用TransactionalTridentKafkaSpoutTransactionalTridentKafkaSpout是为事务性的Trident而用的。用法与KafkaSpout有所不同。
- TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, topic, spoutId);
- kafkaConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());
-
- TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
-
- TridentTopology topology = new TridentTopology();
- topology.newStream("test_str", kafkaSpout).shuffle().each(new Fields("msg", new PrintFunction());
复制代码 看到它并没有要求我们提供zkRoot,因为直接代码里面写死了…… -_-T 地址是 /transactional/<STREAM_NAME>/<Spout_Id>,在上面的例子中,就是 /transactional/test_str/myKafaka
常见问题1. 本地模式无法保存Offset KafkaSpout初始化时,会去取spoutConfig.zkServers 和 spoutConfig.zkPort 变量的值,而该值默认是没塞的,所以是空,那么它就会去取当前运行的Storm所配置的zookeeper地址和端口,而本地运行的Storm,是一个临时的zookeeper实例,并不会真正持久化。所以,每次关闭后,数据就没了。 本地模式,要显示的去配置
- spoutConfig.zkServers = new ArrayList<String>(){{
- add("10.1.110.20");
- add("10.1.110.21");
- add("10.1.110.24");
- }};
- spoutConfig.zkPort = 2181;
复制代码
- <del><dependency>
- <groupId>net.wurstmeister.storm</groupId>
- <artifactId>storm-kafka-0.8-plus</artifactId>
- <version>0.2.0</version>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </exclusion>
- </dependency></del>
复制代码
|