Setting Up a Kafka Cluster on CentOS
Step 1
First download kafka_2.10-0.9.0.0.tgz from the official site, extract it, and enter the installation directory. You can also choose your own path; here the extracted directory is renamed to kafka.
tar -xzf kafka_2.10-0.9.0.0.tgz
mv kafka_2.10-0.9.0.0 kafka
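If you would rather fetch the tarball from the command line, a sketch (the Apache archive URL is an assumption based on the archive's usual layout, so verify it before use):

# download Kafka 0.9.0.0 (Scala 2.10 build) from the Apache archive
wget https://archive.apache.org/dist/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz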
Step 2
1. The ZooKeeper cluster is set up on hadoop0, hadoop1, and hadoop2.
2. Open the hosts file under /etc and append the following at the end (adjust the IPs and hostnames to your own environment):
vi /etc/hosts

192.168.163.128 hadoop0
192.168.163.129 hadoop1
192.168.163.130 hadoop2
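As a quick sanity check, it is worth confirming that each hostname resolves from every machine, for example:

# run on each node; all three names should answer
ping -c 1 hadoop0
ping -c 1 hadoop1
ping -c 1 hadoop2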
Step 3
Start the ZooKeeper service (ZooKeeper must be started on every machine):
[root@hadoop0 bin]# zkServer.sh start
[root@hadoop1 bin]# zkServer.sh start
[root@hadoop2 bin]# zkServer.sh start
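zkServer.sh also has a status subcommand that confirms the ensemble formed correctly; on a healthy three-node cluster one node reports leader and the other two report follower:

[root@hadoop0 bin]# zkServer.sh status
# expect "Mode: leader" on one node and "Mode: follower" on the other two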
Step 4
Configure Kafka
1. Open the server.properties file under the config directory of the Kafka installation and modify it as follows:
# broker.id must also be changed in server.properties on the other two machines;
# no two of the three machines may share the same broker.id
broker.id=0
# if the brokers run on the same machine, give each a different port
port=9092
host.name=hadoop0
log.dirs=/usr/softinstall/kafka/kafka-logs-0
# hostnames and ports of the ZooKeeper ensemble
zookeeper.connect=hadoop0:2181,hadoop1:2181,hadoop2:2181
2. Modify the producer.properties file. The changed entries are:
metadata.broker.list=hadoop0:9092,hadoop1:9092,hadoop2:9092
producer.type=sync

For reference, the complete producer.properties after these changes:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=hadoop0:9092,hadoop1:9092,hadoop2:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none

# message encoder
serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression
#compressed.topics=

############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.buffering.max.ms=

# the maximum size of the blocking queue for buffering on the producer
#queue.buffering.max.messages=

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=

# the number of messages batched at the producer
#batch.num.messages=
3. Modify the consumer.properties file. The changed entry is:
zookeeper.connect=hadoop0:2181,hadoop1:2181,hadoop2:2181

For reference, the complete consumer.properties after this change:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.consumer.ConsumerConfig for more details

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=hadoop0:2181,hadoop1:2181,hadoop2:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000
4. Copy the Kafka directory to the other two machines:

scp -r kafka hadoop1:/usr/softinstall/
scp -r kafka hadoop2:/usr/softinstall/
5. On hadoop1 and hadoop2, adjust broker.id, host.name, and log.dirs in server.properties accordingly:

hadoop1:

broker.id=1
# if the brokers run on the same machine, give each a different port
port=9092
host.name=hadoop1
log.dirs=/usr/softinstall/kafka/kafka-logs-1

hadoop2:

broker.id=2
# if the brokers run on the same machine, give each a different port
port=9092
host.name=hadoop2
log.dirs=/usr/softinstall/kafka/kafka-logs-2
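Rather than editing by hand, the per-host changes can also be scripted; a sketch for hadoop1, assuming the install path used above:

# run on hadoop1; hadoop2 is analogous with 2 in place of 1
sed -i 's/^broker.id=0/broker.id=1/; s/^host.name=hadoop0/host.name=hadoop1/; s/kafka-logs-0/kafka-logs-1/' /usr/softinstall/kafka/config/server.properties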
6. Start the Kafka service on hadoop0, hadoop1, and hadoop2:
bin/kafka-server-start.sh config/server.properties &
[root@hadoop0 kafka]# jps
3555 QuorumPeerMain
6410 Kafka
6462 Jps
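Besides jps, you can confirm that all three brokers registered themselves in ZooKeeper with the zookeeper-shell tool that ships with Kafka:

[root@hadoop0 kafka]# bin/zookeeper-shell.sh hadoop0:2181
ls /brokers/ids
# a healthy cluster lists all three broker ids: [0, 1, 2]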
Step 5: Create a Topic
bin/kafka-topics.sh --create --zookeeper hadoop0:2181,hadoop1:2181,hadoop2:2181 --replication-factor 3 --partitions 6 --topic mytest

Note that the replication factor cannot exceed the number of brokers.
List the existing topics with the following command:
[root@hadoop0 kafka]# bin/kafka-topics.sh --list --zookeeper hadoop0:2181,hadoop1:2181,hadoop2:2181
my-replicated-topic
my-topic
mytopic1
test
test1 - marked for deletion
test2 - marked for deletion
topic1
The topic's details can be inspected with:
bin/kafka-topics.sh --describe --zookeeper hadoop0:2181,hadoop1:2181,hadoop2:2181 --topic mytest
Topic:mytest    PartitionCount:6    ReplicationFactor:3    Configs:
    Topic: mytest    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: mytest    Partition: 1    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: mytest    Partition: 2    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: mytest    Partition: 3    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: mytest    Partition: 4    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: mytest    Partition: 5    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0

A few words on this output: the first line summarizes all partitions, and each following line describes one partition, so a topic created with six partitions gets six such lines. Leader is the broker currently serving reads and writes for that partition, Replicas lists the brokers holding a copy, and Isr is the subset of replicas currently in sync with the leader.
Step 6: Produce and Consume Messages
Start a console producer on hadoop0 and send some messages (any of the three machines would do):
bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic mytest
this is a message
this is the second message
Start a console consumer on hadoop2 (the messages published by the producer appear in this terminal window):
[root@hadoop2 kafka]# bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic mytest --from-beginning
this is a message
this is the second message
The Kafka cluster is now up and ready to serve as a Kafka server.
Setting Up the Kafka Java Client
Step 1: Create a Maven project and add the Kafka dependency (plus commons-io for file operations).
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>
<!-- for file operations -->
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.4</version>
</dependency>
The producer, ProducerTest, scans d:\test for .txt files and sends each line to the my-replicated-topic topic:

package com.east.kafka;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Properties;

import org.apache.commons.io.FileUtils;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerTest {

    public static void main(String[] args) throws IOException, InterruptedException {
        Properties props = new Properties();
        props.put("zookeeper.connect", "hadoop0:2181,hadoop1:2181,hadoop2:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "hadoop0:9092");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        while (true) {
            // read the .txt files under d:\test
            Collection<File> listFiles = FileUtils.listFiles(new File("d:\\test"), new String[] { "txt" }, true);
            for (File file : listFiles) {
                // read every line of the file
                List<String> lines = FileUtils.readLines(file);
                for (String line : lines) {
                    Date date = new Date();
                    System.out.println(date + " send:" + line);
                    KeyedMessage<String, String> message = new KeyedMessage<String, String>("my-replicated-topic", line);
                    // the producer sends the message
                    producer.send(message);
                }
                // rename the file so it is not sent again
                FileUtils.moveFile(file, new File(file.getAbsolutePath() + System.currentTimeMillis()));
                Thread.sleep(6000);
            }
        }
    }
}
The consumer, ConsumerTest, subscribes to my-replicated-topic and prints every message it receives:

package com.east.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerTest extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public static void main(String[] args) {
        ConsumerTest consumerThread = new ConsumerTest("my-replicated-topic");
        consumerThread.start();
    }

    public ConsumerTest(String topic) {
        System.out.println(topic);
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "hadoop0:2181,hadoop1:2181,hadoop2:2181");
        props.put("group.id", "0");
        props.put("zookeeper.session.timeout.ms", "400000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        // one stream for our single topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> next = it.next();
            // decode the raw bytes into a string before printing
            String message = new String(next.message());
            System.out.println("receive:" + message);
        }
    }
}
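One way to run the demo is with the exec-maven-plugin (an assumption; any way of launching the two main classes works). Start the consumer first, then the producer:

# terminal 1: start the consumer
mvn exec:java -Dexec.mainClass=com.east.kafka.ConsumerTest
# terminal 2: start the producer (it watches d:\test for .txt files)
mvn exec:java -Dexec.mainClass=com.east.kafka.ProducerTest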
Known issue: deleting a Kafka topic only marks it for deletion (see http://blog.csdn.net/wind520/article/details/48710043).

Deleting a topic reports "marked for deletion":
[root@logSer config]# kafka-topics.sh --delete --zookeeper localhost:2181 --topic test-group
Topic test-group is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@logSer config]# kafka-topics.sh --list --zookeeper localhost:2181
test-group - marked for deletion
test-topic
test-user-001
The topic is not actually deleted. To delete it for real, set delete.topic.enable=true in server.properties, which lives in the kafka/config directory:
[root@logSer config]# vi server.properties
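A minimal sketch of the full procedure (the restart commands assume the layout used earlier in this article; on a multi-broker cluster, repeat on every broker):

# add to server.properties on every broker
delete.topic.enable=true

# restart the broker so the setting takes effect
bin/kafka-server-stop.sh
bin/kafka-server-start.sh config/server.properties &

# re-issue the delete; the topic is now actually removed
kafka-topics.sh --delete --zookeeper localhost:2181 --topic test-group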