(I) Installation and Configuration
Packages to install
- kafka_2.10-0.8.1.1.tar
- jdk1.7.0_51.tar
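1.1 Install the JDK
Skip this step if the JDK is already installed; otherwise see: http://blog.csdn.net/ouyang111222/article/details/50344135
1.2 Install Kafka
1.2.1 Unpack Kafka
My cluster has three machines, with IPs ip1, ip2 and ip3. Copy kafka_2.10-0.8.1.1.tar to the /apps/svr/ directory on each machine and unpack it there:
tar -xf kafka_2.10-0.8.1.1.tar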
1.2.2 Configure config/server.properties
(1) broker.id
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=
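broker.id must be an integer that is unique within the cluster; here the ids are assigned incrementally starting from 0 (0, 1, 2, 3, 4, ...). The mapping for the three machines in this article is:
ip1---->0
ip2---->1
ip3---->2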
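The id assignment above can be expressed as a small Java helper: hosts are taken in a fixed order and each one gets the next integer, so the ids are unique by construction. This is a minimal sketch; the class name BrokerIds is hypothetical and the host list is the one used in this article.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: gives each broker host a unique, incrementing broker.id
final class BrokerIds {

    // Assigns 0, 1, 2, ... in list order and rejects a host that is listed twice
    static Map<String, Integer> assign(List<String> hosts) {
        Map<String, Integer> ids = new LinkedHashMap<String, Integer>();
        for (String host : hosts) {
            if (ids.containsKey(host)) {
                throw new IllegalArgumentException("duplicate host: " + host);
            }
            ids.put(host, ids.size()); // next id = number of hosts already assigned
        }
        return ids;
    }

    public static void main(String[] args) {
        Map<String, Integer> ids = assign(Arrays.asList("ip1", "ip2", "ip3"));
        for (Map.Entry<String, Integer> e : ids.entrySet()) {
            System.out.println(e.getKey() + " ----> broker.id=" + e.getValue());
        }
    }
}
```

Running main prints the same mapping as above (ip1 ----> broker.id=0 and so on), ready to copy into each machine's server.properties.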
(2) port
# The port the socket server listens on
port=9092
The default is 9092.
(3) host.name
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=ip1
Set host.name to the IP of the machine the broker runs on (ip1 on ip1, ip2 on ip2, and so on).
(4) zookeeper.connect
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
This is the list of ZooKeeper servers. Since this cluster uses the ZooKeeper bundled with Kafka on the same three machines, ip1:2181,ip2:2181,ip3:2181 is all that is needed.
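For reference, a zookeeper.connect value is a comma-separated list of host:port pairs with an optional chroot path appended once at the end. The hypothetical ZkConnectString class below splits such a value into its parts; the /kafka chroot in main is only an example, since this setup does not use a chroot.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a zookeeper.connect value into host:port pairs and an optional chroot
final class ZkConnectString {
    final List<String> servers = new ArrayList<String>();
    final String chroot; // null when the value has no chroot suffix

    ZkConnectString(String value) {
        int slash = value.indexOf('/');
        String hostPart = slash >= 0 ? value.substring(0, slash) : value;
        chroot = slash >= 0 ? value.substring(slash) : null;
        for (String server : hostPart.split(",")) {
            String s = server.trim();
            int colon = s.lastIndexOf(':');
            if (colon <= 0 || colon == s.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got '" + s + "'");
            }
            Integer.parseInt(s.substring(colon + 1)); // throws NumberFormatException for a non-numeric port
            servers.add(s);
        }
    }

    public static void main(String[] args) {
        ZkConnectString zk = new ZkConnectString("ip1:2181,ip2:2181,ip3:2181/kafka");
        System.out.println(zk.servers + " chroot=" + zk.chroot);
    }
}
```

main prints [ip1:2181, ip2:2181, ip3:2181] chroot=/kafka; with the value used in this article the chroot is simply null.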
(5) num.partitions
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=10
The default number of partitions per topic is set to 10 here.
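To see why more partitions allow more parallelism, here is a sketch of hash-based partition selection for keyed messages: the same key always lands on the same partition, and different keys spread across all of them. It assumes a scheme like the 0.8 producer's default partitioner (a non-negative key hash modulo the partition count); KeyPartitioner is a hypothetical name and this is an illustration, not Kafka's exact code.

```java
// Hypothetical sketch: pick a partition for a keyed message by hashing the key
final class KeyPartitioner {

    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so that % never yields a negative partition number
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10; // num.partitions=10 from the config above
        for (int key = 0; key < 12; key++) {
            System.out.println("key " + key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

This only matters for keyed messages; the Java producer later in this article sends messages without a key, so its partition choice does not depend on a key hash.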
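There are many other parameters, which are not described one by one here; the remaining settings used in this setup are: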
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# A comma separated list of directories under which to store log files
log.dirs=/platform1/kafka-logs
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
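Because each broker has its own copy of this file, a quick check across all copies catches the most common copy-paste mistake, a repeated or empty broker.id. A minimal sketch with java.util.Properties; ServerPropertiesCheck is a hypothetical name, and the keys it requires are simply the ones configured in this article, not Kafka's full list.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical consistency check across the server.properties files of one cluster
final class ServerPropertiesCheck {
    // Keys this article sets on every broker (an assumption, not Kafka's required list)
    static final String[] REQUIRED = {"broker.id", "port", "host.name", "zookeeper.connect", "log.dirs"};

    // Returns human-readable problems; an empty list means nothing was found
    static List<String> check(List<String> fileContents) {
        List<String> problems = new ArrayList<String>();
        Set<String> seenIds = new HashSet<String>();
        for (int i = 0; i < fileContents.size(); i++) {
            Properties p = new Properties();
            try {
                p.load(new StringReader(fileContents.get(i)));
            } catch (IOException e) {
                throw new IllegalArgumentException("cannot parse file " + i, e);
            }
            for (String key : REQUIRED) {
                String value = p.getProperty(key);
                if (value == null || value.trim().isEmpty()) {
                    problems.add("file " + i + ": missing " + key); // also catches an empty "broker.id="
                }
            }
            String id = p.getProperty("broker.id", "").trim();
            if (!id.isEmpty() && !seenIds.add(id)) {
                problems.add("file " + i + ": duplicate broker.id " + id);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String ip1 = "broker.id=0\nport=9092\nhost.name=ip1\n"
                + "zookeeper.connect=ip1:2181,ip2:2181,ip3:2181\nlog.dirs=/platform1/kafka-logs\n";
        String ip2 = ip1.replace("host.name=ip1", "host.name=ip2"); // forgot to change broker.id
        System.out.println(check(Arrays.asList(ip1, ip2)));
    }
}
```

main simulates copying ip1's file to ip2 and forgetting to change broker.id, and prints [file 1: duplicate broker.id 0].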
1.2.3 Configure config/zookeeper.properties
Add the three ZooKeeper servers to the file.
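The exact server lines used are not shown here. A three-node ensemble is usually described with entries of the form server.N=host:peerPort:leaderElectionPort, and replicated mode also needs initLimit and syncLimit. The hypothetical generator below writes such a file; the ports 2888/3888 and the tickTime/initLimit/syncLimit values are assumptions taken from ZooKeeper's usual sample configuration, not from this article.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical generator for a three-node zookeeper.properties
final class ZkEnsembleConfig {

    static String render(List<String> hosts, String dataDir) {
        StringBuilder sb = new StringBuilder();
        sb.append("dataDir=").append(dataDir).append('\n');
        sb.append("clientPort=2181\n");
        // Timing values assumed from ZooKeeper's sample config; tune as needed
        sb.append("tickTime=2000\n");
        sb.append("initLimit=10\n");
        sb.append("syncLimit=5\n");
        for (int i = 0; i < hosts.size(); i++) {
            // server.N: N must match the number stored in that host's myid file
            sb.append("server.").append(i + 1).append('=')
              .append(hosts.get(i)).append(":2888:3888\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(Arrays.asList("ip1", "ip2", "ip3"), "/home/zookeeper"));
    }
}
```

The same file is copied to all three machines; only the myid file differs per machine.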
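At first I forgot to configure anything else and started ZooKeeper right away:
bin/zookeeper-server-start.sh config/zookeeper.properties
It failed with an error.
Then I remembered that zookeeper.properties contains a dataDir setting, and the directory it points to, /home/zookeeper, has to be created:
dataDir=/home/zookeeper
So create it:
mkdir -p /home/zookeeper
Running it again still produced an error:
the myid file did not contain this machine's id. In zookeeper.properties I had just defined server.1, server.2 and server.3, and each machine needs a file named myid in its dataDir (here /home/zookeeper/myid) holding the matching number:
ip1 is server.1, so its myid file contains 1; ip2 is server.2, so its myid file contains 2; ip3 is server.3, so its myid file contains 3. For example, on ip1: echo 1 > /home/zookeeper/myid
After that, ZooKeeper started normally.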
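The dataDir and myid steps above can be scripted. A minimal Java sketch, assuming the dataDir /home/zookeeper used here and that the caller passes this machine's N from server.N; MyIdWriter is a hypothetical name. ZooKeeper's admin guide expects the id to be between 1 and 255, which the sketch enforces.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: prepares a ZooKeeper dataDir and writes this node's myid file
final class MyIdWriter {

    static Path writeMyId(Path dataDir, int serverNumber) throws IOException {
        if (serverNumber < 1 || serverNumber > 255) {
            throw new IllegalArgumentException("myid must be between 1 and 255");
        }
        Files.createDirectories(dataDir); // same effect as mkdir -p
        Path myid = dataDir.resolve("myid");
        // The file holds only the number N from server.N
        Files.write(myid, (serverNumber + "\n").getBytes(StandardCharsets.US_ASCII));
        return myid;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java MyIdWriter <N>   (1 on ip1, 2 on ip2, 3 on ip3)");
            return;
        }
        Path written = writeMyId(Paths.get("/home/zookeeper"), Integer.parseInt(args[0]));
        System.out.println("wrote " + written);
    }
}
```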
1.2.4 Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
Note: start ZooKeeper on every machine!
1.2.5 Start Kafka
Run this from Kafka's bin directory on every broker:
./kafka-server-start.sh ../config/server.properties &
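Since both services are started in the background, it helps to confirm that their ports are actually listening before moving on. A minimal sketch using a plain TCP connect with a timeout; it only shows that something accepts connections on the port, not that the broker is healthy. PortCheck is a hypothetical name; the hosts and ports (2181 for ZooKeeper, 9092 for Kafka) are the ones from this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical check: does anything accept TCP connections on host:port?
final class PortCheck {

    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host name not resolvable
        }
    }

    public static void main(String[] args) {
        String[] hosts = {"ip1", "ip2", "ip3"};
        for (String host : hosts) {
            System.out.println(host + ": zookeeper 2181 " + isListening(host, 2181, 2000)
                    + ", kafka 9092 " + isListening(host, 9092, 2000));
        }
    }
}
```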
(II) Testing Kafka
2.1 Create a topic, list topics, and show topic details
Create a topic:
bin/kafka-topics.sh --zookeeper ip1:2181,ip2:2181,ip3:2181 --topic mytopic --replication-factor 1 --partitions 1 --create
List topics:
bin/kafka-topics.sh --zookeeper ip1:2181,ip2:2181,ip3:2181 --list
Show details (of all topics, since no --topic is given):
bin/kafka-topics.sh --describe --zookeeper ip1:2181,ip2:2181,ip3:2181
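If you want to drive these commands from Java, for example from a setup tool, ProcessBuilder can run the same script with the same arguments. A sketch under stated assumptions: Kafka is taken to be unpacked at /apps/svr/kafka_2.10-0.8.1.1 (following 1.2.1), and TopicCommands with its methods is hypothetical; the flags are exactly the ones used above.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical wrapper that runs bin/kafka-topics.sh with the arguments used above
final class TopicCommands {
    static final String ZK = "ip1:2181,ip2:2181,ip3:2181";

    static List<String> createArgs(String topic, int replicationFactor, int partitions) {
        return Arrays.asList("--zookeeper", ZK, "--topic", topic,
                "--replication-factor", String.valueOf(replicationFactor),
                "--partitions", String.valueOf(partitions), "--create");
    }

    static List<String> listArgs() {
        return Arrays.asList("--zookeeper", ZK, "--list");
    }

    static List<String> describeArgs() {
        return Arrays.asList("--describe", "--zookeeper", ZK);
    }

    // Runs the script with kafkaHome as working directory; its output goes to this console
    static int run(File kafkaHome, List<String> args) throws IOException, InterruptedException {
        List<String> command = new ArrayList<String>();
        command.add(new File(kafkaHome, "bin/kafka-topics.sh").getAbsolutePath());
        command.addAll(args);
        Process process = new ProcessBuilder(command).directory(kafkaHome).inheritIO().start();
        return process.waitFor(); // exit code of the script
    }

    public static void main(String[] args) throws Exception {
        File kafkaHome = new File("/apps/svr/kafka_2.10-0.8.1.1"); // assumed unpack location
        run(kafkaHome, createArgs("mytopic", 1, 1));
        run(kafkaHome, listArgs());
        run(kafkaHome, describeArgs());
    }
}
```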
2.2 Single-machine connectivity test
Produce data:
./kafka-console-producer.sh --broker-list ip1:9092 --topic mytopic
After it starts, the following log is printed:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
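Cause: SLF4J cannot find a binding on the classpath (here slf4j-nop-1.7.6.jar is missing), so it falls back to a no-operation logger. This is only a warning and the producer still runs; adding a binding jar makes it go away.
Solution:
Download slf4j-1.7.6.zip:
wget http://www.slf4j.org/dist/slf4j-1.7.6.zip
Unzip it:
unzip slf4j-1.7.6.zip
Copy slf4j-nop-1.7.6.jar into Kafka's libs directory.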
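To confirm whether a binding is visible before and after copying the jar, you can try to load the class named in the warning, org.slf4j.impl.StaticLoggerBinder, which SLF4J 1.7 bindings such as slf4j-nop provide. A minimal sketch; Slf4jBindingCheck is a hypothetical name.

```java
// Hypothetical check: is an SLF4J 1.x binding visible on the current classpath?
final class Slf4jBindingCheck {

    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only look the class up, do not run its static initializers
            Class.forName(className, false, Slf4jBindingCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String binder = "org.slf4j.impl.StaticLoggerBinder"; // class named in the warning
        System.out.println(binder + (isOnClasspath(binder)
                ? " found"
                : " missing: add a binding jar such as slf4j-nop"));
    }
}
```

For example, after compiling it in the Kafka directory: java -cp "libs/*:." Slf4jBindingCheck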
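Run the producer again and type a few messages; each line you enter is sent as one message.
Consume the messages: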
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
2.3 Distributed connectivity test
The producer runs on server1, whose IP is ip1.
The consumer runs on server2, whose IP is ip2.
On server1, produce data:
./kafka-console-producer.sh --broker-list ip1:9092 --topic mytopic
On server2, consume the messages:
./kafka-console-consumer.sh --zookeeper ip1:2181 --topic mytopic --from-beginning
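2.4 Producing and consuming with Java
Producer code (Kafka 0.8 producer API; compile and run it with the jars from Kafka's libs directory on the classpath):
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

// Sends "message: 0", "message: 1", ... to the given topic, one message per second
public class kafkaProducer extends Thread {
    private final String topic;

    public kafkaProducer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer<Integer, String> producer = createProducer();
        int i = 0;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // No key is set, so the partition is not chosen by key hashing
                producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop sending
        } finally {
            producer.close();
        }
    }

    private Producer<Integer, String> createProducer() {
        Properties properties = new Properties();
        // Encode message values as strings
        properties.put("serializer.class", StringEncoder.class.getName());
        // Brokers used to bootstrap topic metadata
        properties.put("metadata.broker.list", "ip1:9092,ip2:9092,ip3:9092");
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaProducer("mytopic").start();
    }
}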
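Consumer code:
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

// Reads messages from the given topic with the Kafka 0.8 high-level consumer and prints them
public class kafkaConsumer extends Thread {
    private final String topic;

    public kafkaConsumer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConsumer();
        try {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1); // number of streams (consumer threads) to open for this topic
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); // the single stream requested above
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            while (iterator.hasNext()) { // blocks until the next message arrives
                // The producer used StringEncoder, which encodes as UTF-8 by default
                String message = new String(iterator.next().message(), StandardCharsets.UTF_8);
                System.out.println("Received: " + message);
            }
        } finally {
            consumer.shutdown();
        }
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "ip1:2181,ip2:2181,ip3:2181"); // ZooKeeper ensemble
        // Consumer group: consumers sharing a group.id split the topic's partitions,
        // while each distinct group receives every message
        properties.put("group.id", "group1");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaConsumer("mytopic").start(); // the topic mytopic created earlier in the cluster
    }
}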
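How consumers in one group (the group.id setting above) share a topic's partitions can be sketched with range assignment: partitions and consumers are sorted, each consumer gets a contiguous block, and the first few consumers get one extra partition when the split is uneven. My understanding is that the 0.8 high-level consumer rebalances this way, but treat the code as an illustration rather than Kafka's implementation; RangeAssignment and the consumer ids are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of range assignment within one consumer group
final class RangeAssignment {

    // Returns consumerId -> partition numbers; every partition goes to exactly one consumer
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumerIds) {
        if (consumerIds.isEmpty()) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<String> consumers = new ArrayList<String>(consumerIds);
        Collections.sort(consumers);
        int perConsumer = numPartitions / consumers.size();
        int withExtra = numPartitions % consumers.size();
        Map<String, List<Integer>> result = new LinkedHashMap<String, List<Integer>>();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            // The first 'withExtra' consumers take one more partition than the rest
            int count = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<Integer>();
            for (int k = 0; k < count; k++) {
                parts.add(next++);
            }
            result.put(consumers.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 partitions (num.partitions=10) shared by three consumers of group1
        System.out.println(assign(10, Arrays.asList("group1-c0", "group1-c1", "group1-c2")));
    }
}
```

main prints {group1-c0=[0, 1, 2, 3], group1-c1=[4, 5, 6], group1-c2=[7, 8, 9]}. A consumer in a different group, say group2, would get its own assignment covering all ten partitions, which is why each group sees every message; consumers beyond the partition count receive nothing and sit idle.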
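(III) Common Errors
Error 1
Sending data failed with the following error:
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
Solution:
server.properties contains advertised.host.name or advertised.listeners (which one depends on the Kafka version). Its value is the address that producers and consumers are told to connect to. If it is not set, host.name is used instead; if host.name is not set either, the broker falls back to the machine's own hostname, which often resolves to localhost or to a name the clients cannot reach. Set it to an address the clients can reach, for example advertised.host.name=ip1 on ip1.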
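The fallback order described above can be sketched as follows. It models the 0.8-style properties only (advertised.host.name, then host.name, then the machine's canonical hostname); newer versions use advertised.listeners, which this sketch does not cover. AdvertisedHost is a hypothetical name, and the code is an illustration of the behavior, not Kafka's source.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;

// Hypothetical model of the host name a 0.8-style broker hands out to clients
final class AdvertisedHost {

    static String resolve(Properties serverProps) throws UnknownHostException {
        // advertised.host.name wins; otherwise fall back to host.name
        String advertised = serverProps.getProperty("advertised.host.name",
                serverProps.getProperty("host.name"));
        if (advertised == null || advertised.trim().isEmpty()) {
            // Neither is set: the machine's own canonical name, which clients may not reach
            return InetAddress.getLocalHost().getCanonicalHostName();
        }
        return advertised;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        String path = args.length > 0 ? args[0] : "config/server.properties";
        try (InputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        System.out.println("clients will be told to connect to host: " + resolve(props));
    }
}
```

Run against each broker's server.properties, it shows which host name that broker would hand out; anything other than an address the clients can reach points to this error.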