flume+kafka+storm

Posted: 2022-09-26 23:25:53

CentOS 6.6 + JDK 1.7

Flume 1.4 + Kafka 0.8.1.1 (Scala 2.10) + Storm 0.9.3

ZooKeeper 3.4.6


Cluster:

192.168.80.133 x01

192.168.80.134 x02


1. Set the hostname and the hosts file on both machines

。。。
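
The details were omitted in the original; the following is a minimal sketch of what this step involves. The hostnames and IPs come from the cluster list above, while the exact commands are just a common CentOS 6 approach, not taken from the original post.

# On 192.168.80.133
hostname x01                                                  # takes effect immediately
sed -i 's/^HOSTNAME=.*/HOSTNAME=x01/' /etc/sysconfig/network  # persists across reboots

# On 192.168.80.134
hostname x02
sed -i 's/^HOSTNAME=.*/HOSTNAME=x02/' /etc/sysconfig/network

# On both machines, append the cluster mapping to /etc/hosts
cat >> /etc/hosts <<EOF
192.168.80.133 x01
192.168.80.134 x02
EOF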


2. Install the JDK on both machines and set the environment variables
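
A sketch of the environment variables for this step, assuming the JDK is unpacked to /usr/java/jdk1.7.0_67 (the install path is an assumption, not from the original):

# Append to /etc/profile on both machines, then run: source /etc/profile
export JAVA_HOME=/usr/java/jdk1.7.0_67   # assumed install path
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

# Verify
java -version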


3. Download and install ZooKeeper and set the environment variables

# example sakes.
dataDir=/data/zookeeper/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=x01:2888:3888
server.2=x02:2888:3888

zkServer.sh start
zkServer.sh status
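
For the server.1/server.2 entries above to work, each node also needs a myid file under dataDir before zkServer.sh start is run; a minimal sketch using the dataDir configured above:

# On x01
mkdir -p /data/zookeeper/data
echo 1 > /data/zookeeper/data/myid

# On x02
mkdir -p /data/zookeeper/data
echo 2 > /data/zookeeper/data/myid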


4. Download and install Flume

http://www.cnblogs.com/admln/p/flume.html

5. Download and install Kafka

http://www.cnblogs.com/admln/p/kafka-install.html
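
The tests below assume a topic named test already exists. If it does not, it can be created with Kafka's topic tool; the replication factor and partition count here are assumptions for this small two-node setup:

# Run from the Kafka installation directory
bin/kafka-topics.sh --create --zookeeper x01:2181 \
    --replication-factor 1 --partitions 1 --topic test

# Confirm the topic exists
bin/kafka-topics.sh --list --zookeeper x01:2181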

6. Integrate Flume and Kafka

Download the integration plugin flumeng-kafka-plugin: https://github.com/beyondj2ee/flumeng-kafka-plugin

Extract flume-conf.properties from the plugin, modify it, and save it as kafka.conf in Flume's conf directory:

############################################
# producer config
############################################

# agent section
producer.sources = s
producer.channels = c
producer.sinks = r

# source section
producer.sources.s.type = spooldir
producer.sources.s.spoolDir=/home/hadoop/testFlume
producer.sources.s.fileHeader=false
producer.sources.s.channels = c

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=x01:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test

# Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000

Copy the jar files from the plugin into Flume's lib directory.
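
The original does not list the exact jars; a sketch of what this copy typically looks like for this plugin. The directory and jar names below are assumptions based on the plugin's repository and the Kafka build used above, so adjust them to whatever is actually present:

# Run from the directory containing the plugin checkout and the Kafka/Flume installs
cp flumeng-kafka-plugin/package/*.jar apache-flume-1.4.0-bin/lib/           # the plugin jar(s)
cp kafka_2.10-0.8.1.1/libs/kafka_2.10-0.8.1.1.jar apache-flume-1.4.0-bin/lib/
cp kafka_2.10-0.8.1.1/libs/scala-library-2.10.*.jar apache-flume-1.4.0-bin/lib/
cp kafka_2.10-0.8.1.1/libs/metrics-core-2.2.*.jar apache-flume-1.4.0-bin/lib/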

Put a file into /home/hadoop/testFlume, and start a console consumer in Kafka to verify:

bin/flume-ng agent -n producer -c conf -f conf/kafka.conf -Dflume.root.logger=DEBUG,console
bin/kafka-console-consumer.sh --zookeeper x01:2181 --topic test --from-beginning
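
To drive the test, drop a small file into the spooling directory; the agent ships each line to the test topic, and the console consumer should print it (the file name and contents are arbitrary):

echo "hello,flume,kafka,storm" > /home/hadoop/testFlume/test-$(date +%s).log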


The test succeeded.

Note: if Flume is used to transport Chinese text, the files should be encoded as UTF-8; otherwise the text is easily garbled, which can cause Flume to die.
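
A quick way to check and, if needed, convert a file's encoding before dropping it into the spooling directory (assuming the source file is GBK, which is common on Chinese Windows systems):

file -i input.txt                                # shows the detected charset
iconv -f GBK -t UTF-8 input.txt -o output.txt    # convert, assuming the source is GBK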

7. Install Storm

http://www.cnblogs.com/admln/p/storm-install.html
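
After installation, the Storm daemons need to be running before the topology is submitted in step 8. A minimal sketch; which host runs nimbus and the UI versus the supervisors is an assumption for this two-node cluster:

# On x01 (nimbus + UI)
bin/storm nimbus > /dev/null 2>&1 &
bin/storm ui > /dev/null 2>&1 &

# On x01 and x02 (workers)
bin/storm supervisor > /dev/null 2>&1 &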


8. Integrate Storm and Kafka

Copy some of Kafka's jar files into Storm's lib directory:

cp kafka_2.10-0.8.1.1/libs/kafka_2.10-0.8.1.1.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/scala-library-2.10.*.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/metrics-core-2.2.*.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/snappy-java-1.0.*.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/zkclient-0.3.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/log4j-1.2.*.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/slf4j-api-1.7.*.jar apache-storm-0.9.3/lib/
cp kafka_2.10-0.8.1.1/libs/jopt-simple-3.2.jar apache-storm-0.9.3/lib/

Copy ZooKeeper's zookeeper-3.4.6.jar into Storm's lib directory:

cp zookeeper-3.4.6/zookeeper-3.4.6.jar apache-storm-0.9.3/lib/

Write a Storm program to test the integration.

pom.xml

<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
      </exclusion>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>
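
To produce the flume-kafka-storm.jar submitted in the last step, the project can be packaged with Maven. Since the Kafka and ZooKeeper jars were copied into Storm's lib directory above, a plain (unshaded) jar is enough; the built jar's name depends on the artifactId/version in the full pom, which the original does not show, so the rename below is an assumption:

mvn clean package
cp target/flume_kafka_storm-0.0.1-SNAPSHOT.jar flume-kafka-storm.jar   # assumed artifact name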

spout

package org.admln.flume_kafka_storm;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class KafkaSpout extends BaseRichSpout {

    private static final long serialVersionUID = -9174998944310422274L;

    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;

    public KafkaSpout() {}

    public KafkaSpout(String topic) {
        this.topic = topic;
    }

    public void nextTuple() {
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void ack(Object msgId) {
    }

    public void activate() {
        // Create a high-level Kafka consumer and pull messages from the topic
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, 1);

        System.out.println("*********Results********topic:" + topic);

        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String value = new String(it.next().message());
            System.out.println("Storm received a message from Kafka -------> " + value);
            // Emit the message, anchored with the message itself as the message id
            collector.emit(new Values(value), value);
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", "x01:2181,x02:2181");
        // Consumer group id
        props.put("group.id", "1");
        // Kafka consumer offsets are stored in ZooKeeper and are not updated in real time,
        // so a commit interval is required
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    public void close() {
    }

    public void deactivate() {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public Map<String, Object> getComponentConfiguration() {
        System.out.println("getComponentConfiguration was called");
        topic = "test";
        return null;
    }
}

bolt(wordsplitter)

package org.admln.flume_kafka_storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class KafkaWordSplitterBolt extends BaseRichBolt {

    private static final long serialVersionUID = 886149197481637894L;

    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] words = line.split(",");
        for (String word : words) {
            // Besides the list of values sent to the next bolt, the input tuple itself is
            // passed to emit (anchoring), so that failures can be recovered through ack/fail
            collector.emit(input, new Values(word, 1));
        }
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

bolt(wordcount)

package org.admln.flume_kafka_storm;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class KafkaWordCounterBolt extends BaseRichBolt {

    private static final long serialVersionUID = 886149197481637894L;

    private OutputCollector collector;
    private Map<String, AtomicInteger> counterMap;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counterMap = new HashMap<String, AtomicInteger>();
    }

    public void execute(Tuple input) {
        String word = input.getString(0);
        int count = input.getInteger(1);
        AtomicInteger ai = this.counterMap.get(word);
        if (ai == null) {
            ai = new AtomicInteger();
            this.counterMap.put(word, ai);
        }
        ai.addAndGet(count);
        collector.ack(input);
    }

    public void cleanup() {
        // Print the accumulated counts when the topology shuts down (local mode only)
        Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<String, AtomicInteger> entry = iter.next();
            System.out.println(entry.getKey() + "\t:\t" + entry.getValue().get());
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

topology

package org.admln.flume_kafka_storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class KafkaTopology {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout("test"), 1);
        // "bolt1" is the id of this bolt in the topology graph.
        // The 2 is the number of threads to run it with; if omitted, one thread is used.
        builder.setBolt("bolt1", new KafkaWordSplitterBolt(), 2)
                .shuffleGrouping("spout");
        builder.setBolt("bolt2", new KafkaWordCounterBolt(), 2).fieldsGrouping(
                "bolt1", new Fields("word"));
        String name = KafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            Config conf = new Config();
            // Specify the nimbus host via the first argument
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(2);
            StormSubmitter.submitTopologyWithProgressBar(name, conf,
                    builder.createTopology());
        } else {
            // No arguments: run in a local in-process cluster
            Map conf = new HashMap();
            conf.put(Config.TOPOLOGY_WORKERS, 1);
            conf.put(Config.TOPOLOGY_DEBUG, true);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("my-flume-kafka-storm-topology-integration",
                    conf, builder.createTopology());
        }
    }
}

The topology can be run locally in Eclipse or submitted to the cluster.

On the cluster:

bin/storm jar flume-kafka-storm.jar org.admln.flume_kafka_storm.KafkaTopology x01
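
Once the topology is running on the cluster, it can be exercised the same way as the Flume/Kafka test: drop another file into the spooling directory and watch a worker log. The log path is Storm's default layout, and the worker port (6700) is an assumption about which slot the worker landed on:

echo "storm,kafka,flume,storm" > /home/hadoop/testFlume/cluster-test-$(date +%s).log

# On the supervisor node that got the worker, e.g. x02
tail -f apache-storm-0.9.3/logs/worker-6700.log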
