Kafka实战解惑

时间:2022-02-10 16:51:20

目录

一、 kafka简介
二、 Kafka架构方案
三、 Kafka安装
四、 Kafka Client API
  4.1 Producers API
  4.2 Consumers API
  4.3 消息高可靠 At-Least-Once
  4.4 消息高可靠Consumer
  4.5 生产者、消费者总结
五、 Kafka运维
  5.1 Broker故障切换
  5.2 Broker动态扩容
  5.2.1 增加分区
  5.2.2 增加Broker Server
  5.3 Kafka配置优化
  5.4 数据清理
  5.4.1 数据删除
  5.4.2 数据压缩
  5.5 Kafka运行监控
六、Kafka其他组件](#c6)
  6.1 Kafka Connect
  6.2 Kafka Stream
  6.3 Kafka Camus
七、 Kafka典型应用场景
  7.1 ETL
八、 参考资料


一、Kafka简介

Kafka是LinkedIn使用scala开发的一个分布式消息系统,它以水平扩展能力和高吞吐率著称,被广泛用于日志处理、ETL等应用场景。Kafka具有以下主要特点:

  1. 同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
  2. 可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL以及实时应用程序。通过将数据持久化到硬盘以及replication防止数据丢失。
  3. 分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的,无需停机即可扩展机器。
  4. 消息被处理的状态由消费者同步到zookeeper而非broker server中,当broker server失效时,通过副本切换机制选择一个新的broker server,消费者从zookeeper中读取之前消费消息的位置,不会引起消息丢失。
  5. 支持online和offline的场景。

LinkedIn有个三人小组出来创业了—正是当时开发出Apache Kafka实时信息列队技术的团队成员,基于这项技术Jay Kreps带头创立了新公司Confluent。Confluent的产品围绕着Kafka做的,与Kafka相比,Confluent包含了更多的组件:

  1. Confluent Control Center(闭源)。管理和监控Kafka最全面的GUI驱动系统
  2. Confluent Kafka Connectors(开源)。连接SQL数据库/Hadoop/Hive
  3. Confluent Kafka Clients(开源)。对于其他编程语言,包括C/C++,Python
  4. Confluent Kafka REST Proxy(开源)。允许一些系统通过HTTP和Kafka之间发送和接收消息。
  5. Confluent Schema Registry(开源)。帮助确定每一个应用使用正确的schema当写数据或者读数据到Kafka中。

二、 Kafka架构方案

从物理结构上看,整个Kafka系统由消息生产者、消息消费者、消费存储服务器外加Zookeeper构成。其中消息生产者被称为Producer、消息消费者被称为Consumer、消息存储服务器被称为Broker。整个Kafka的架构方案非常简单,典型的无状态水平扩展架构,通过水平增加Broker实例实现系统的高吞吐率,而有状态的数据则存储到Zookeeper中。

Kafka采用Push-Pull模式,生产者发送消息时,可根据策略存储在Kafka集群的任意一台broker上,消费者通过定时轮询(非固定周期)的方式从Broker上取得消息。消息发送到哪一台服务器上,又从哪台服务器上获取消息,则是由逻辑结构解决的,或者说逻辑结构建立在物理结构基础上,对于生产者、消费者而言,只要了解逻辑结构就可以了。

从逻辑上讲,一个Kafka集群中包含若干个消息队列,每个消息队列都有自己的名称,在Kafka中消息队列的名称被称为Topic,为了实现系统的高吞吐率,每个消息队列被拆分成不同部分,即我们所说的分区(Partition),分区存储在不同的Broker中。生产者发送消息时可根据一定策略发送到不同的分区中,这类似于数据库的分库分表操作,同样消费者拉取消息时,也可以根据一定策略从某个分区中读取消息。就物理结构而言,每个分区就是broker上的一个文件,试想一下并发的对多个分布在不同broker上的文件进行读写,性能当然显著优于对单台broker上的文件进行读写,我们所说的Kfaka具有高吞吐率就是这个道理。

Kafak每个Topic的消息都存储在日志文件中,Kafka消息日志文件由一个索引文件和若干个具体的消息文件构成。每个消息文件都由起始消息编号构成,通过索引可以快速定位消息文件进行读写,由于消息是顺序写入文件中,所以读写效率非常高。在6块7200转的SATA RAID-5磁盘阵列的线性写速度差不多是600MB/s,但是随即写的速度却是100k/s,差了差不多6000倍。现代的操作系统都对次做了大量的优化,使用了 read-ahead 和 write-behind的技巧,读取的时候成块的预读取数据,写的时候将各种微小琐碎的逻辑写入组织合并成一次较大的物理写入,很多时候线性读写磁盘比随机读取内存都快。

与其他常见的消息队列不同,Kafka有一个叫做消费组的概念,多个消费者被逻辑上合并在一起叫做消费组。一个消息队列理论上可拥有无限个消费组,消费组是Kafka有别于其他消息队列的一个重要概念,同一个分区的消息只能被一个消费组内的某个消费者读取,但其他消费组内的消费者仍然可读取这个分区的消费。如下图所示整个Kafka消息队列由两个broker server构成,server1上包含两个分区p0、p3,server2上包含两个分区p1、p2。现在有两个消费组A、B,消费组A中包含两个消费者C1、C2,消费组B中包含4个消费者C3、C4、C5、C6。那么假定P0分区上有一条消息。Consumer Group A中的C1、C2其中之一会消费这条消息,Consumer Group B中的C3、C4、C5、C6其中之一也会消费这条消息,也就是说两个消费组A、B中的消费者都会同时消费这条消息,而组内只能有一个消费者消费这条消息。

我们所说的C1、C2只是一个逻辑上的划分就具体实现而言,C1、C2可以是一个进程内部的两个线程,也可以是两个独立的进程,对于C3、C4、C5、C6也是同样的道理。我们知道Kafka每个分区中的消息都是以顺序结构保存到文件中的,那么消费者每次从什么位置读取消息呢,奥秘就是每个消费者都保存offset到zookeeper中。

如前所述,Kafka是一个Push-Pull模式的消息队列,并且可以有多个生产者、多个消费者,那么这些生产者和消费者是如何协同工作的呢?首先我们来看生产者怎么确定把消费发送到哪个分区上。默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。

def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}

这就保证了相同key的消息一定会被路由到相同的分区。如果你没有指定key,那么Kafka是如何确定这条消息去往哪个分区的呢?我们来看下面的代码:

if(key == null) {  // 如果没有指定key
val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有没有缓存的现成的分区Id
id match {
case Some(partitionId) =>
partitionId // 如果有的话直接使用这个分区Id就好了
case None => // 如果没有的话,
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分区的leader所在的broker
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 从中随机挑一个
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用
partitionId
}
}

可以看出,Kafka几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时)

接下来我们来看消费者如何获取消息。对于消费者Kafka提供的两种分配策略: range和roundrobin,由参数 partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,假设你有10个分区,P0 ~ P9,consumer线程数是3, C0 ~ C2,那么每个线程都分配哪些分区呢?

C0 消费分区 0, 1, 2, 3
C1 消费分区 4, 5, 6
C2 消费分区 7, 8, 9

为了保证高可靠,Kafka每个分区都有一定数量的副本,当故障发生时通过zookeeper选择其一作为领导者,Kafka采用同步复制机制,写leader完成后在写副本。如果某个副本写失败,则将这个副本从当前分区一致集合中摘除,后期根据一定策略在进行异步补偿,将不一致状态变为一致状态。极端情况下如果所有副本写入均失败,变为不一致状态,如果在变成一致状态前leader崩溃,那么消息才可能真正丢失,但极端情况很难出现,一旦出现这种极端情况,任何系统都无能为力了,所以我们说Kafka还是非常可靠的。

三、 Kafka安装

我们现在使用192.168.104.101、192.168.104.102两台Centos 6服务器安装Kafka。安装Kafka之前首先需要安装Zookeeper,为了简便起见我们采用单机伪分布式集群安装Zookeeper,将Zookeeper安装在192.168.104.101这台服务器上,并启动三个实例,组成高可高的Zookeeper集群。

接下来我们安装Kafka_2.11-0.10.0.1,因为我们有192.168.104.101、192.168.104.102两台服务器,因此我们可以构建一个完整的Kfaka集群。在192.168.104.101服务器上修改Kafka安装目录下的config/server. Properties文件,设置如下参数:
broker.id=0
listeners=PLAINTEXT://192.168.104.101:9092
advertised.listeners=PLAINTEXT://192.168.104.101:9092
zookeeper.connect=192.168.104.101:2181,192.168.104.101:2182,192.168.104.101:2183
对于192.168.104.102这台机器,我们将listeners、advertised.listeners中的ip地址改为192.168.104.102。经过上述设置我们可以在两个服务器上分别使用bin/Kafka-server-start.sh config/server.properties命令启动Kafka集群了。

四、 Kafka Client API

如前所述Kafka是一个消息队列,生产者发送消息到Kafka,消费者从Kafka中拉取消息,因此Kafka提供生产者、消费者两类API供程序开发使用。我们先来看一个生产者、消费者的简单例子,了解一下Kafka Client API的基本用法,而后在深入了解Kafka Client API的细节。

4.1 Producers API

package com.Kafka.sample.newapi;

import org.apache.Kafka.clients.producer.Callback;
import org.apache.Kafka.clients.producer.KafkaProducer;
import org.apache.Kafka.clients.producer.ProducerRecord;
import org.apache.Kafka.clients.producer.RecordMetadata; import java.util.Properties; public class Producer {
public void run() throws InterruptedException {
KafkaProducer<String, String> producer = getProducer(); int i = 0; while (true) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(ClientConfig.TOPICS, String.valueOf(i), "This is message: " + i);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
}
}
}); i++; Thread.sleep(1000);
}
} private KafkaProducer<String, String> getProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", ClientConfig.BOOTSTRAP_SERVERS);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.Kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.Kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kp = new KafkaProducer<String, String>(props); return kp;
} public static void main(String[] args) {
Producer producer = new Producer(); try {
producer.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}

Kafka 0.82版之后,提供新的API,对于生产者的API来讲,使用逻辑比较简单,推荐使用新API向Kafka发送消息。向Kafka发送消息时首先需要构建一个KafkaProducer对象,并设置发送消息的一些参数。Producer端的常用配置有:

bootstrap.servers:Kafka集群连接串,可以由多个host:port组成
acks:broker消息确认的模式,有三种:
0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认
1:由Leader确认,Leader接收到消息后会立即返回确认信息
all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息。
我们可以根据消息的重要程度,设置不同的确认模式。默认为1
retries:发送失败时Producer端的重试次数,默认为0
batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节。
linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,
以积累更多的消息打包发送,达到节省网络资源的目的。默认为0。
key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定。
buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,
那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)。

相比起Producers API的便宜使用,Consumer API的使用要复杂很多,核心问题就是如何高可靠的处理消息,保证消息不丢失。Kafka为了保证消息不丢失能被消费者成功的处理,在消费者处理消息成功后需要向Kafka发送确认确认消息被成功的消费。

package com.Kafka.sample.newapi;

import org.apache.Kafka.clients.consumer.ConsumerRecord;
import org.apache.Kafka.clients.consumer.ConsumerRecords;
import org.apache.Kafka.clients.consumer.KafkaConsumer; import java.util.Arrays;
import java.util.Properties; public class Consumer {
public void run() {
KafkaConsumer<String, String> consumer = getConsumer();
consumer.subscribe(Arrays.asList(ClientConfig.TOPICS)); while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record : records) {
System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
}
}
} private KafkaConsumer<String, String> getConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", ClientConfig.BOOTSTRAP_SERVERS);
props.put("group.id", "1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kc = new KafkaConsumer<String, String>(props); return kc;
} public static void main(String[] args) throws Exception{
Consumer consumer = new Consumer();
consumer.run();
}
}

上面的代码很容易看懂,但props.put("auto.commit.interval.ms", "1000")需要特殊说明一下。

4.3 消息高可靠 At-Least-Once

网上各种文章经常谈到Kafka丢消息问题,那么Kakfa真的不可靠,只能用在允许有一定错误的系统中吗?这个问题还得从Kaka的设计初衷来看。

Kafka最初是被LinkedIn设计用来处理log的分布式消息系统,因此它的着眼点不在数据的安全性(log偶尔丢几条无所谓),换句话说Kafka并不能完全保证数据不丢失。尽管Kafka官网声称能够保证at-least-once,但如果consumer进程数小于partition_num,这个结论不一定成立。考虑这样一个case,partiton_num=2,启动一个consumer进程订阅这个topic,对应的,stream_num设为2,也就是说启两个线程并行处理message。如果auto.commit.enable=true,当consumer fetch了一些数据但还没有完全处理掉的时候,刚好到commit interval出发了提交offset操作,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。如果auto.commit.enable=false,假设consumer的两个fetcher各自拿了一条数据,并且由两个线程同时处理,这时线程t1处理完partition1的数据,手动提交offset,这里需要着重说明的是,当手动执行commit的时候,实际上是对这个consumer进程所占有的所有partition进行commit,Kafka暂时还没有提供更细粒度的commit方式,也就是说,即使t2没有处理完partition2的数据,offset也被t1提交掉了。如果这时consumer crash掉,t2正在处理的这条数据就丢失了。如果希望能够严格的不丢数据,解决办法有两个:

  1. 手动commit offset,并针对partition_num启同样数目的consumer进程,这样就能保证一个consumer进程占有一个partition,commit offset的时候不会影响别的partition的offset。但这个方法比较局限,因为partition和consumer进程的数目必须严格对应。
  2. 另一个方法同样需要手动commit offset,另外在consumer端再将所有fetch到的数据缓存到queue里,当把queue里所有的数据处理完之后,再批量提交offset,这样就能保证只有处理完的数据才被commit。当然这只是基本思路,实际上操作起来不是这么简单,具体做法以后我再另开一篇。

4.4 消息高可靠Consumer

public class ManualOffsetConsumer {
private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class); public ManualOffsetConsumer() {
// TODO Auto-generated constructor stub
} public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
//props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092;172.16.49.173:9093");
//设置brokerServer(Kafka)ip地址
props.put("bootstrap.servers", "172.16.49.173:9092");
//设置consumer group name
props.put("group.id","manual_g1"); props.put("enable.auto.commit", "false"); //设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
//如果采用latest,消费者只能得道其启动后,生产者生产的消息
props.put("auto.offset.reset", "earliest");
//
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
consumer.subscribe(Arrays.asList("producer_test")); final int minBatchSize = 5; //批量提交数量
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
LOG.info("consumer message values is "+record.value()+" and the offset is "+ record.offset());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
LOG.info("now commit offset");
consumer.commitSync();
buffer.clear();
}
}
}
}
上面例子中我们将自动提交改为手动提交,如果取得消息后,因为某种原因没有进行提交,那么消息仍然保持在Kafka中,可以重复拉取之前没有确认的消息,保证消息不会丢失,但有可能重复处理相同的消息,消费者接收到重复消息后应该通过业务逻辑保证重复消息不会带来额外影响,这就是Kafka所说的At-Least-Once。上面的这种读取消息的方法是单线程的,除此之外还可以用多线程方法读取消息,每个线程从指定的分区中读取消息。
public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
//props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092;172.16.49.173:9093");
//设置brokerServer(Kafka)ip地址
props.put("bootstrap.servers", "172.16.49.173:9092");
//设置consumer group name
props.put("group.id","manual_g2"); props.put("enable.auto.commit", "false"); //设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
//如果采用latest,消费者只能得道其启动后,生产者生产的消息
props.put("auto.offset.reset", "earliest");
//
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
consumer.subscribe(Arrays.asList("producer_test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
LOG.info("now consumer the message it's offset is :"+record.offset() + " and the value is :" + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
LOG.info("now commit the partition[ "+partition.partition()+"] offset");
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}

我们还可以进一步让消费者消费某个分区的消息。

    public static void main(String[] args) {
Properties props = new Properties();
//设置brokerServer(Kafka)ip地址
props.put("bootstrap.servers", "172.16.49.173:9092");
//设置consumer group name
props.put("group.id", "manual_g4");
//设置自动提交偏移量(offset),由auto.commit.interval.ms控制提交频率
props.put("enable.auto.commit", "true");
//偏移量(offset)提交频率
props.put("auto.commit.interval.ms", "1000");
//设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
//如果采用latest,消费者只能得道其启动后,生产者生产的消息
props.put("auto.offset.reset", "earliest");
//
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
TopicPartition partition0 = new TopicPartition("producer_test", 0);
TopicPartition partition1 = new TopicPartition("producer_test", 1);
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
consumer.assign(Arrays.asList(partition0, partition1));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s \r\n", record.offset(), record.key(), record.value());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
4.5 生产者、消费者总结
  1. 如果consumer比partition多,是浪费,因为Kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数。
  2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目。
  3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,Kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同。
  4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
  5. High-level接口中获取不到数据的时候是会block的。

五、 Kafka运维

5.1 Broker故障切换

我们在192.168.104.101、192.168.104.102两台服务器上启动Kafka组成一个集群,现在我们观察一下topic t1的情况。

bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1
Topic:t1 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: t1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1

我们看到t1由两个分区组成,分布于Leader 0、1两个服务器上。下面我们运行消费者程序,同时运行生产者程序,我们向topic t1发送111、222、333、444、555、666的数据。

bash-4.1# ./Kafka-console-producer.sh --broker-list 192.168.104.101:9092, 192.168.104.102:9092 --topic t1
111
222
333
444
555
666

接下来我们观察一下消费者接收消息的情况。

fetched from partition 0, offset: 3, message:
fetched from partition 1, offset: 4, message: 111
fetched from partition 0, offset: 4, message: 222
fetched from partition 1, offset: 5, message: 333
fetched from partition 0, offset: 5, message: 444
fetched from partition 1, offset: 6, message: 555
fetched from partition 0, offset: 6, message: 666

可以看到消息被非常均匀的发送到两个分区,消费者从两个分区中拉取了消息。为了模拟故障,我们手工kill 101上的Kafka进程。这时我们在观察Kafka的分区情况,对照之前的结果,我们发现两个分区的Leader都变为1了,说明Kafka启用了副本机制进行故障切换。

./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1
Topic:t1 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: t1 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1

我们继续向分区发送888, 999,消费者仍然能够接收到发送的消息,而不受故障进度的影响。逻辑上看消费者只是读取分区上的消息,与具体的服务器没关系。

fetched from partition 1, offset: 7, message: 999
fetched from partition 0, offset: 7, message: 888

5.2 Broker动态扩容

5.2.1 增加分区

我们为已经创建的包含两个分区的Topic在添加一个分区。

Kafka-topics.sh --zookeeper 192.168.104.101:2181 --alter --topic t1 --partitions 3

我们观察一下增加分区后的结果:

bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1

Topic:t1    PartitionCount:3    ReplicationFactor:2    Configs:
Topic: t1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 1,0
Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: t1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1

接下来,我们使用生产者程序发送数据,过了一段时间后发现生产者程序已经可以向新增分区写入数据了。说明分区的增减对正在运行的应用程序(生产者、消费者)没有影响, 生产者、消费者都不需要重新启动。

5.2.2 增加Broker Server

待补充

5.3 Kafka配置优化

待补充

5.4 数据清理

5.4.1 数据删除

消息被kafka存储后,针对过期消息,可以通过设置策略(log.cleanup.policy=delete)进行删除。除了在kafka中做默认设置外,也可以再 topic创建时指定参数,这样将会覆盖kafka的默认设置,触发删除动作有两种条件:

  1. 清理超过指定时间消息,通过log.retention.hours设置过期时间。
  2. 清理大小超过预定设置消息,通过log.retention.bytes进行设置。

5.4.2 数据压缩

kafka还可以进行数据压缩,设置log.cleanup.policy=compact只保留每个key最后一个版本的数据。

5.5 Kafka运行监控

目前Kafka有三个常用的监控系统: Kafka Web Conslole、Kafka Manager、KafkaOffsetMonitor,这三个系统或多或少都有些问题,不是特别完善,推荐使用KafkaOffsetMonitor。

六、Kafka其他组件

6.1 Kafka Connect

Kafka 0.9+增加了一个新的特性 Kafka Connect ,可以更方便的创建和管理数据流管道。它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统。Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。而导出工作则是将数据从Kafka Topic中导出到其它数据存储系统、查询系统或者离线分析系统等,比如数据库、 Elastic Search 、 Apache Ignite 等。

Kafka Connect特性包括:

  • Kafka connector通用框架,提供统一的集成API
  • 同时支持分布式模式和单机模式
  • REST 接口,用来查看和管理Kafka connectors
  • 自动化的offset管理,开发人员不必担心错误处理的影响
  • 分布式、可扩展
  • 流/批处理集成

当前Kafka Connect支持两种分发担保:at least once (至少一次) 和 at most once(至多一次),exactly once将在未来支持,当前已有的Connectors包括:

Connector Name Owner Status
HDFS confluent-platform@googlegroups.com Confluentsupported
JDBC confluent-platform@googlegroups.com Confluentsupported
Debezium - CDC Sources debezium@gmail.com Community project
MongoDB Source a.patelli@reply.de a.topchyan@reply.de In progress
MQTT Source tomasz.pietrzak@evok.ly Community project
MySQL Binlog Source wushujames@gmail.com In progress
Twitter Source rollulus@xs4all.nl In progress
Cassandra Sink Cassandra Sink Community project
Elastic Search Sink ksenji@gmail.com Community project
Elastic Search Sink hannes.stockner@gmail.com In progress
Elastic Search Sink a.patelli@reply.de a.topchyan@reply.de In progress

我们来看一个使用Kafka Connect从一个文件读取数据在传输到另一个文件的例子。

  • 首先在192.168.104.101、192.168.104.102两台服务器上启动Kafka。
  • 在192.168.104.102服务器的Kafka安装目录上,修改connect-standalone.properties文件:
bootstrap.servers=192.168.104.101:9092, 192.168.104.102:9092
key.converter=org.apache.Kafka.connect.storage.StringConverter
value.converter=org.apache.Kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false 修改connect-file-source.properties文件:
file=/root/data.txt
topic=t1 修改connect-file-sink.properties文件:
file=/root/output.txt
topics=t1
  • 在192.168.104.102服务器上启动Kafka-connect
    bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • 向/root/data.txt中写入数据,echo “Kafka connect”>> data.txt,可以观察”Kafka connect”被写入到/root/output.txt文件中。

6.2 Kafka Stream

Kafka Streams是一套类库,嵌入到java应用程序中,它使得Apache Kafka可以拥有流处理的能力,通过使用Kafka Stream API进行业务逻辑处理最后写回Kakfa或者其他系统中。Kafka Stream中有几个重要的流处理概念:严格区分Event time和Process Time、支持窗口函数、应用状态管理。开发者使用Kafka Stream的门槛非常低,比如单机进行一些小数据量的功能验证而不需要在其他机器上启动一些服务(比如在Storm运行Topology需要启动Nimbus和Supervisor,当然也支持Local Mode),Kafka Stream的并发模型可以对单应用多实例进行负载均衡。有了Kafka Stream可以在很多场景下代替Storm、Spark Streaming减少技术复杂度。目前Kafka Stream仍然处于开发阶段,不建议生产环境使用,所以期待正式版发布吧。

6.3 Kafka Camus

Camus是Linkedin开源的一个从Kafka到HDFS的数据管道,本质上上Camus是一个运行在Hadoop中的MapReduce程序,调用一些Camus提供的API从Kafka中读取数据然后写入HDFS。Camus2015年已经停止维护了,gobblin是后续产品,camus功能是是Gobblin的一个子集,通过执行MapReduce任务实现从Kafka读取数据到HDFS,而gobblin是一个通用的数据提取框架,可以将各种来源的数据同步到HDFS上,包括数据库、FTP、Kafka等。

七、 Kafka典型应用场景

Kafka作为一个消息中间件,最长应用的场景是将数据进行加工后从源系统移动到目的系统,也就是所谓的ETL过程,ETL是一个数据从源头到目的地的移动过程,当然其中也伴随数据清洗。通常数据源头是应用程序所输出的消息、日志、生产数据库数据。应用程序输出消息通常由应用程序主动控制写入Kfaka的行为,而从日志、生产数据库到Kfaka通常由第三方独立应用处理。从日志到Kfaka典型的技术方案如ELK,从生产数据库到Kafka通常可采用如下三种方式:

  • 通过时间戳方式记录数据变更并写入kafka,如使用kettle等ETL工具。
  • 通过触发器方式记录数据变更并写入kafka,如使用kettle等ETL工具。
  • 通过数据库特有特性记录数变更并写入kafka,如Oracle GoldenGate,MySQL Binlog,Postgre SQL Wal,MongoDB Oplog,CouchDB Changes Feed,值得一提的是PostgreSQL 9.4后的Bottled Water是一个非常好用的方案,将PostgreSQL数据同步到Kfaka中。

数据通过Kafka移动到Hadoop通常有如下方案:

  • Kafka -> Flume -> Hadoop Hdfs
  • Kafka -> Gobblin -> Hadoop Hdfs
  • Kafka -> Kafka Hadoop Loader -> Hadoop Hdfs
  • Kafka -> KaBoom -> Hadoop Hdfs
  • Kafka -> Kafka Connect -> Hadoop Hdfs
  • Kafka -> Storm\Spark Streaming -> Hadoop Hdfs

从目前看这些方法都是常用的成熟方案,很多技术也在被一线互联网公司所使用,比如京东内部在使用Gobblin将数据从Kafka同步到Hdfs中,但从长远看Kafka Connect则是最佳方案,毕竟是官方标准出品而且Kafka Connect还在快速的发展。

八、 参考资料