Kafka集群搭建

时间:2022-01-06 16:46:46
Kafka介绍

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。

  • 支持通过kafka服务器和消费机集群来分区消息。

  • 支持Hadoop并行数据加载。

Kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。

kafka是用Scala编写,用scalac编译器把源文件编译成Java的class文件(即在JVM上运行的字节码),因此Scala是基于JVM的语言,所以使用Kafka需要机器上游JVM支持,本文使用jdk版本为jdk-7u75.

以3台为例,如果是一个Follower宕机,还有2台服务器提供访问,因为Zookeeper上的数据是有多个副本的,数据并不会丢失,如果是一个Leader宕机,Zookeeper会选举出新的Leader。为什么是奇数台,如果Zookeeper 集群是3台,允许宕机1台,如果是4台,同样是允许1台宕机,因为选举算法要求”超过半数“,所以多出的一台没有意义。

配置步骤

Kafka自带了zookeeper,但是一般集群都会有zk,因此使用集群已有的zookeeper。本次不使用Kafka自带的zookeeper。

  1. 下载kafka_2.11-0.10.0.0.tgz
  2. 配置文件 server.properties
############################# Server Basics #############################
# 唯一标识一个broker.
broker.id=1
############################# Socket Server Settings #############################
#绑定服务监听的地址和端口,要填写hostname -i 出来的地址,否则可能会绑定到127.0.0.1,producer可能会发不出消息
listeners=PLAINTEXT://192.168.23.139:9092
#broker对producers和consumers服务的地址和端口,如果没有配置,使用listeners的配置,本文没有配置该项
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 处理网络请求的线程数
num.network.threads=3
# 处理磁盘I/O的线程数
num.io.threads=8
# socket server的发送buffer大小 (SO_SNDBUF)
socket.send.buffer.bytes=102400
# socket server的接收buffer大小 (SO_RCVBUF)
socket.receive.buffer.bytes=102400
#一个请求的最大size,用来保护防止oom
socket.request.max.bytes=104857600
############################# Log Basics #############################
#存放日志和消息的目录,可以是用逗号分开的目录,同样不推荐使用/tmp
log.dirs=/usr/local/kafka_2.11-2.3.0/logs
#每个topic默认partitions的数量,数量较大表示消费者可以有更大的并行度。
num.partitions=2
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
#日志的过期时间,超过后被删除,单位小时
log.retention.hours=168
#一个日志文件最大大小,超过会新建一个文件
log.segment.bytes=1073741824
#根据过期策略检查过期文件的时间间隔,单位毫秒
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#Zookeeper的连接配置,用逗号隔开,也可以用192.168.23.139:2181/kakfa这样的方式指定kafka数据在zk中的根目录
zookeeper.connect=192.168.23.139:2181,192.168.23.140:2181,192.168.23.141:2181
# 连接zk的超时时间
zookeeper.connection.timeout.ms=6000

主要配置文件为server.properties,对于producer和consumer分别有producer.properties和consumer.properties,但是一般不需要单独配置,可以从server.properties中读取。

启动各个节点

启动各节点,分发此配置文件,修改broker.id和listeners地址,建立相应的目录。

[root@server0 kafka_2.11-2.3.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
[root@server1 kafka_2.11-2.3.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
[root@server2 kafka_2.11-2.3.0]# ./bin/kafka-server-start.sh -daemon config/server.properties

-daemon放在后台运行。

验证是否成功
  1. 创建一个topic名为my-test
[root@server0 kafka_2.11-2.3.0]# bin/kafka-topics.sh --create --zookeeper 172.23.8.144:2181 --replication-factor 3 --partitions 1 --topic my-test
Created topic "my-test".
  1. 发送消息,ctrl+c终止
[root@server0 kafka_2.11-2.3.0]# bin/kafka-console-producer.sh --broker-list 172.23.8.144:9092 --topic my-test
今天是个好日子
hello
  1. 另一台机器消费
[root@server0 kafka_2.11-2.3.0]# bin/kafka-console-consumer.sh --zookeeper slave3:2181 --from-beginning --topic my-test
今天是个好日子
hello

继续发送消息则在消费者终端会一直出现新产生的消息。至此,kafka集群搭建成功。

Kafka HelloWorld

在kafka的手册中给出了java版的producer和cousumer的代码示例.
修改下地址,逗号隔开,该地址是集群的子集,用来探测集群。

  1. Producer代码示例
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers",
"192.168.23.139:9092,192.168.23.140:9092,192.168.23.141:9092");//该地址是集群的子集,用来探测集群。
props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
props.put("retries", 3);// 请求失败重试的次数
props.put("batch.size", 16384);// batch的大小
props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");// 序列化的方式,
// ByteArraySerializer或者StringSerializer
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10000; i++) {
// 三个参数分别为topic, key,value,send()是异步的,添加到缓冲区立即返回,更高效。
producer.send(new ProducerRecord<String, String>("my-topic",
Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
  1. Consumer代码示例
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers",
"192.168.23.139:9092,192.168.23.140:9092,192.168.23.141:9092");// 该地址是集群的子集,用来探测集群。
props.put("group.id", "test");// cousumer的分组id
props.put("enable.auto.commit", "true");// 自动提交offsets
props.put("auto.commit.interval.ms", "1000");// 每隔1s,自动提交offsets
props.put("session.timeout.ms", "30000");// Consumer向集群发送自己的心跳,超时则认为Consumer已经死了,kafka会把它的分区分配给其他进程
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));// 订阅的topic,可以多个
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s",
record.offset(), record.key(), record.value());
System.out.println();
}
}
}
}
  1. 分别运行即可。看到comsumer打印出消息日志。