Flume needs the Kafka client libraries on its classpath before the custom sink can run. Copy the Kafka jars into Flume's lib directory:
root@m1: /home/hadoop # cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1: /home/hadoop # cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1: /home/hadoop # cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib
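A quick sanity check (this check is mine, not from the original walkthrough) that the three jars landed:

ls /home/hadoop/flume-1.5.0-bin/lib | grep -E 'kafka|scala-library|metrics-core'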
Next, implement a custom Flume sink that takes events from the channel and publishes them to Kafka:
package idoall.cloud.flume.sink;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {

    private static final Log logger = LogFactory.getLog(KafkaSink.class);
    private String topic;
    private Producer<String, String> producer;

    public void configure(Context context) {
        // Topic and broker list are hardcoded for this walkthrough.
        topic = "idoall_testTopic";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "idoall.cloud.kafka.Partitionertest");
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka");
        props.setProperty("num.partitions", "4"); // number of partitions
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        logger.info("KafkaSink initialized.");
    }

    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                tx.rollback();
                return Status.BACKOFF;
            }
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("Flume sent message to Kafka: " + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("Flume KafkaSinkException:", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
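Before the agent can load the sink, the class has to be compiled against the Flume and Kafka jars and packaged into a jar under Flume's lib directory. A minimal sketch, assuming the source sits under src/ (the paths and jar name here are mine, not from the original post); note the Partitionertest class referenced in configure() must be compiled and packaged the same way:

# compile against the jars already in Flume's lib/
javac -cp "/home/hadoop/flume-1.5.0-bin/lib/*" -d classes src/idoall/cloud/flume/sink/KafkaSink.java
# package the classes and drop the jar where the agent can find it
jar cf idoall-flume-sink.jar -C classes .
cp idoall-flume-sink.jar /home/hadoop/flume-1.5.0-bin/lib/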
Then create a Flume agent configuration that wires a syslog TCP source to the custom sink through a memory channel:
root@m1: /home/hadoop/flume-1.5.0-bin # vi /home/hadoop/flume-1.5.0-bin/conf/kafka.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = idoall.cloud.flume.sink.KafkaSink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Kafka broker (repeat on each broker node):
root@m1: /home/hadoop # /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties &
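With the brokers up, the idoall_testTopic topic the sink writes to can be created ahead of time if auto-creation is disabled. A sketch using the 0.8.1.1 tooling (the partition and replication counts here are illustrative, not from the original post):

/home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --create --zookeeper m1:2181 --replication-factor 1 --partitions 4 --topic idoall_testTopic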
Start the Flume agent with the new configuration:
root@m1: /home/hadoop # /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
# Only part of the log output is shown below
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink initialized.
14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/19 11:36:34 INFO node.Application: Starting Channel c1
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/19 11:36:34 INFO node.Application: Starting Sink k1
14/08/19 11:36:34 INFO node.Application: Starting Source r1
14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
Send a test message to the syslog source:
root@m1: /home/hadoop # echo "hello idoall.org syslog" | nc localhost 5140
After the message is sent, the agent log shows it being handed to Kafka:
14/08/19 11:38:05 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/19 11:38:05 INFO client.ClientUtils$: Fetching metadata from broker id:3,host:s2,port:9092 with correlation id 0 for 1 topic(s) Set(idoall_testTopic)
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to s2:9092 for producing
14/08/19 11:38:05 INFO producer.SyncProducer: Disconnecting from s2:9092
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to m1:9092 for producing
14/08/19 11:38:05 INFO sink.KafkaSink: Flume sent message to Kafka: hello idoall.org syslog
A Kafka console consumer on another node confirms that the message arrived:
root@s1: /home/hadoop # /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic flume-kafka-storm-001 --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-08-11 14:22:12,165] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [flume-kafka-storm-001,1] (kafka.server.ReplicaFetcherManager)
[2014-08-11 14:22:12,218] WARN [KafkaApi-3] Produce request with correlation id 2 from client on partition [flume-kafka-storm-001,1] failed due to Topic flume-kafka-storm-001 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-08-11 14:22:12,223] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 0 (kafka.log.Log)
[2014-08-11 14:22:12,250] INFO Created log for partition [flume-kafka-storm-001,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-08-11 14:22:12,267] WARN Partition [flume-kafka-storm-001,1] on broker 3: No checkpointed highwatermark is found for partition [flume-kafka-storm-001,1] (kafka.cluster.Partition)
[2014-08-11 14:22:12,375] INFO Closing socket connection to /192.168.1.50. (kafka.network.Processor)
hello idoall.org syslog
Now for the Storm side. The project's Maven POM pulls in the Kafka and Storm dependencies:
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>idoall.cloud</groupId>
  <artifactId>idoall.cloud</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>idoall.cloud</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <repositories>
    <repository>
      <id>github-releases</id>
      <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
    </repository>
    <repository>
      <id>clojars.org</id>
      <url>http://clojars.org/repo</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.sksamuel.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.0-beta1</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.14</version>
    </dependency>
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.9.0.1</version>
      <!-- keep storm out of the jar-with-dependencies -->
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
  </dependencies>
</project>
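The post doesn't show its exact build command; with this POM, something like the following should produce the project jar (building the self-contained idoall.cloud.jar submitted later would additionally need the assembly or shade plugin, which isn't shown here):

mvn clean package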
Implement a spout that consumes messages from Kafka and emits them into the topology:
package idoall.cloud.storm;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class KafkaSpouttest implements IRichSpout {

    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;

    public KafkaSpouttest() {
    }

    public KafkaSpouttest(String topic) {
        this.topic = topic;
    }

    public void nextTuple() {
        // Note: this tutorial implementation consumes inside activate()
        // rather than emitting from nextTuple(), so this method is empty.
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void ack(Object msgId) {
    }

    public void activate() {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, 1);
        System.out.println("*********Results********topic:" + topic);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // Blocks here, emitting one tuple per Kafka message.
        while (it.hasNext()) {
            String value = new String(it.next().message());
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
            Date curDate = new Date(System.currentTimeMillis()); // current time
            String str = formatter.format(curDate);
            System.out.println("Storm received message from Kafka ------->" + value);
            collector.emit(new Values(value, 1, str), value);
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
        // consumer group id
        props.put("group.id", "1");
        // Consumer offsets are stored in ZooKeeper and are not updated in
        // real time; this sets the commit interval.
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    public void close() {
    }

    public void deactivate() {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "id", "time"));
    }

    public Map<String, Object> getComponentConfiguration() {
        System.out.println("getComponentConfiguration called");
        // The topic is (somewhat unusually) set here as a side effect.
        topic = "idoall_testTopic";
        return null;
    }
}
Define the topology: the spout feeds two bolts that each append a tag to the message, and the whole thing runs in a LocalCluster for testing:
package idoall.cloud.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class KafkaTopologytest {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpouttest(""), 1);
        builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
        builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1", new Fields("word"));

        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 1);
        conf.put(Config.TOPOLOGY_DEBUG, true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());
        Utils.sleep(1000 * 60 * 5); // keep the local cluster alive for 5 minutes
        cluster.shutdown();
    }

    public static class Bolt1 extends BaseBasicBolt {

        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                int id = input.getInteger(1);
                String time = input.getString(2);
                msg = msg + "bolt1";
                System.out.println("First-pass processing-------[arg0]:" + msg + "---[arg1]:" + id + "---[arg2]:" + time + "------->" + msg);
                if (msg != null) {
                    collector.emit(new Values(msg));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static class Bolt2 extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String msg = tuple.getString(0);
            msg = msg + "bolt2";
            System.out.println("Second-pass processing---------->" + msg);
            collector.emit(new Values(msg, 1));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
}
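Because main() submits to a LocalCluster, the topology can be smoke-tested straight from Maven before touching the real cluster. A sketch (this invocation is mine, not from the original post):

mvn compile exec:java -Dexec.mainClass=idoall.cloud.storm.KafkaTopologytest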
To exercise the Kafka-to-Storm path on its own, publish a message with the console producer:
root@m2: /home/hadoop # /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-producer.sh --broker-list m1:9092 --sync --topic idoall_testTopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello welcome idoall.org
A console consumer confirms the message is in the topic:
root@s1: /home/hadoop # /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic idoall_testTopic --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello welcome idoall.org
In the locally running topology, the Storm log shows the message passing through both bolts:
# The output is long; only the important parts are shown:
*********Results********topic:idoall_testTopic
Storm received message from Kafka ------->hello welcome idoall.org
5268 [Thread-24-spout] INFO backtype.storm.daemon.task - Emitting: spout default [hello welcome idoall.org, 1, 2014-08-19 11:21:15 051]
First-pass processing-------[arg0]:hello welcome idoall.orgbolt1---[arg1]:1---[arg2]:2014-08-19 11:21:15 051------->hello welcome idoall.orgbolt1
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2000523200413433507=6673316475127546409}, [hello welcome idoall.org, 1, 2014-08-19 11:21:15 051]
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 default [hello welcome idoall.orgbolt1]
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2000523200413433507 4983764025617316501]
5269 [Thread-20-bolt2] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2000523200413433507=1852530874180384956}, [hello welcome idoall.orgbolt1]
Second-pass processing---------->hello welcome idoall.orgbolt1bolt2
5270 [Thread-20-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 default [hello welcome idoall.orgbolt1bolt2, 1]
To run the topology on a real cluster, Storm also needs the Kafka client and ZooKeeper jars on its classpath:
root@m1: /home/hadoop # cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1: /home/hadoop # cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1: /home/hadoop # cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1: /home/hadoop # cp /home/hadoop/zookeeper-3.4.5/dist-maven/zookeeper-3.4.5.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1: /home/hadoop # cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/zkclient-0.3.jar /home/hadoop/storm-0.9.2-incubating/lib
Start the Storm nimbus:
root@m1: /home/hadoop # /home/hadoop/storm-0.9.2-incubating/bin/storm nimbus &
Start a supervisor on the worker node:
root@s1: /home/hadoop # /home/hadoop/storm-0.9.2-incubating/bin/storm supervisor &
Start the Storm UI:
root@m1: /home/hadoop # /home/hadoop/storm-0.9.2-incubating/bin/storm ui &
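A quick check (mine, not from the original post) that the daemons came up. In Storm 0.9.x the UI daemon typically shows up in jps as "core", and the UI listens on port 8080 by default:

jps | grep -Ei 'nimbus|supervisor|core'
curl -s http://m1:8080 | head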
Package the project into a jar (here idoall.cloud.jar, built with its dependencies included), place it in the Storm directory, and submit the topology:
root@m1: /home/hadoop/storm-0.9.2-incubating # ll
total 25768
drwxr-xr-x 11 root   root       4096 Aug 19 11:53 ./
drwxr-xr-x 46 hadoop hadoop     4096 Aug 17 15:06 ../
drwxr-xr-x  2 root   root       4096 Aug  1 14:38 bin/
-rw-r--r--  1 502    staff     34239 Jun 13 08:46 CHANGELOG.md
drwxr-xr-x  2 root   root       4096 Aug  2 12:31 conf/
-rw-r--r--  1 502    staff       538 Mar 13 11:17 DISCLAIMER
drwxr-xr-x  3 502    staff      4096 May  6 03:13 examples/
drwxr-xr-x  3 root   root       4096 Aug  1 14:38 external/
-rw-r--r--  1 root   root   26252342 Aug 19 11:36 idoall.cloud.jar
drwxr-xr-x  3 root   root       4096 Aug  2 12:51 ldir/
drwxr-xr-x  2 root   root       4096 Aug 19 11:53 lib/
-rw-r--r--  1 502    staff     22822 Jun 12 04:07 LICENSE
drwxr-xr-x  2 root   root       4096 Aug  1 14:38 logback/
drwxr-xr-x  2 root   root       4096 Aug  1 15:07 logs/
-rw-r--r--  1 502    staff       981 Jun 11 01:10 NOTICE
drwxr-xr-x  5 root   root       4096 Aug  1 14:38 public/
-rw-r--r--  1 502    staff      7445 Jun 10 02:24 README.markdown
-rw-r--r--  1 502    staff        17 Jun 17 00:22 RELEASE
-rw-r--r--  1 502    staff      3581 May 30 00:20 SECURITY.md
root@m1: /home/hadoop/storm-0.9.2-incubating # /home/hadoop/storm-0.9.2-incubating/bin/storm jar idoall.cloud.jar idoall.cloud.storm.KafkaTopologytest
Send another message through Flume's syslog source:
root@m1: /home/hadoop # echo "flume->kafka->storm message" | nc localhost 5140
root@m1: /home/hadoop #
The Storm log shows the message traversing the whole Flume -> Kafka -> Storm pipeline:
# The output is long; only the important parts are shown:
Storm received message from Kafka ------->flume->kafka->storm message
174218 [Thread-16-spout] INFO backtype.storm.daemon.task - Emitting: spout default [flume->kafka->storm message, 1, 2014-08-19 12:06:39 360]
174220 [Thread-10-bolt1] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2345821945306343027=-7738131487327750388}, [flume->kafka->storm message, 1, 2014-08-19 12:06:39 360]
First-pass processing-------[arg0]:flume->kafka->storm messagebolt1---[arg1]:1---[arg2]:2014-08-19 12:06:39 360------->flume->kafka->storm messagebolt1
174221 [Thread-10-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 default [flume->kafka->storm messagebolt1]
174221 [Thread-10-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2345821945306343027 -2191137958679040397]
174222 [Thread-20-__acker] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: __ack_ack, id: {}, [-2345821945306343027 -2191137958679040397]
174222 [Thread-12-bolt2] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2345821945306343027=8433871885621516671}, [flume->kafka->storm messagebolt1]
Second-pass processing---------->flume->kafka->storm messagebolt1bolt2
174223 [Thread-12-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 default [flume->kafka->storm messagebolt1bolt2, 1]
174223 [Thread-12-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 __ack_ack [-2345821945306343027 8433871885621516671]
174224 [Thread-20-__acker] INFO backtype.storm.daemon.executor - Processing received message source: bolt2:4, stream: __ack_ack, id: {}, [-2345821945306343027 8433871885621516671]
174228 [Thread-16-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [-2345821945306343027 -7738131487327750388 6]
174228 [Thread-20-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: __ack_init, id: {}, [-2345821945306343027 -7738131487327750388 6]
174228 [Thread-20-__acker] INFO backtype.storm.daemon.task - Emitting direct: 6; __acker __ack_ack [-2345821945306343027]