03-spark kafka

时间:2022-09-26 15:27:20

1、概念

  Kafka是一个开源的消息系统。由Scala编写,它具备以下特点:

    ①消息持久化: 为了从大数据中获取有价值的信息,任何信息的丢失都是负担不起的。使用Kafka时,message会被存储并且会被复制(zk备份)以防止数据丢失。

    ②高吞吐量: 设计是工作在普通的硬件设施上多个客户端能够每秒处理几百兆的数据量。

    ③分布式: Kafka Broker的中心化集群支持消息分区,而consumer采用分布式进行消费。

    ④多种Client支持: Kafka很容易与其它平台进行支持,例如:Java、.NET、PHP、Ruby、Python。

    ⑤实时: 消息由producer产生后立即对consumer可见。这个特性对于基于事件的系统是很关键的。

  Kafka使用场景

    (1)日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;

    (2)消息系统:解耦和生产者和消费者、缓存消息等;

    (3)用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,
        然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;

    (4)运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;

    (5)流式处理:比如sparkstreaming和storm;

2、组件

    Broker:安装kafka服务的那台机器就叫一个broker,Kafka集群包含一个或多个服务器(broker),
  每个broker的id 在集群中全局唯一,每个broker可以容纳多个topic。   注:一个机器上可以部署一个(但基本上都一个机器一个broker)或者多个Broker,这多个Broker连接到相同的ZooKeeper就组成了Kafka集群。
  注:Broker的英语翻译为中间者 Topic: 就是消息类别名,一个topic中通常放置一类消息。每个topic都有一个或者多个订阅者,也就是消息的消费者consumer。
   Producer将消息推送到topic,由订阅该topic的consumer从topic中拉取消息。   ①Topic 与broker   一个Broker上可以创建一个或者多个Topic。同一个topic可以在同一集群下的多个Broker中分布。   ②Partition log    1、消息格式    message:
   offset 偏移量
  MessageSize 消息大小
  data 内容   partition中的每条Message包含了这三个属性    2、分区   每个Topic都会有多个分区,分区不是Spark中的内存计算概念!,这里的分区(partition)是以文件的形式存储在文件系统! //对应的是相应的文件夹及其目录内容   比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录:
  page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,
  其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。    3、数据文件的分段(segment)    Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,
  第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,
  用二分查找就可以定位到该Message在哪个段中    4、数据文件索引    数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。
  为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。     总结:每个机器可以部署多个Broker,每个Broker都有多个Topic,每个Topic有很多个分区。    比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录:
  page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,
  其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。    比如page_visits-0文件目录中,会根据数据分段,有很多的log文件:
  0000000000000.log 0000000000000000.index
  0000000001234.log 0000000000001234.index
  0000000002468.log 0000000000002468.index
  0000000003936.log 0000000000003936.index   注意:①每当一个message被发布到一个topic上的一个partition,broker应会将该message追加到这个逻辑log文件的最后一个segment上。
  这些segments 会被flush到磁盘上。Flush时可以按照时间来进行,也可以按照message 数来执行。
  也就是说新来的数据会分配到最后一段上,然后进行Flush,这样保证了消息是有序的!不会发生今天的消息在昨天的消息之前!! ②当kafka保存时间超过7天或者这个分区内的文件达到一定大小,不管消息有没有被消费过,都会被删除。(可以设置)   ③Partition distribution(分布式分区)   日志分区是分布式的存在于一个kafka集群的多个broker上。每个partition会被复制多份存在于不同的broker上。这样做是为了容灾。
  具体会复制几份,会复制到哪些broker上,都是可以配置的。经过相关的复制策略后,每个topic在每个broker上会驻留一到多个partition。   注:因为分区实质是一个文件系统,很容易丢失(不恰当的命令),所以必须进行备份,备份是通过ZK实现的 Producer Producer作为消息的生产者,在生产完消息后需要将消息投送到指定的目的地(某个topic的某个partition)。
Producer可以根据指定选择partition的算法或者是随机方式来选择发布消息到哪个partition。 Consumer Group 既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。
组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。 注:比如某个group下有20个consumer,它订阅了2个具有100个分区的topic,那么就是通过这20个consumer来一起消费这个2个topic,每个consumer消费10个分区 注:①每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定groupname则属于默认的group)。 ②每个消息分区只能被consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。 ③每个consumer group可以订阅多个topic ④消费者群组里的消费者数量如果超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。

3、两种消息队列模式

  (1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

     点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户
     端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。

  (2)发布/订阅模式(一对多,数据生产后,推送给所有订阅者)

    发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监
    听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态

  注:如果要实现传统消息系统的“队列”模型,可以让每个消费者都拥有相同的消费组名称,这样消息就会负责均衡到所有的消费者;

  注:如果要实现“发布-订阅”模型,则每个消费者的消费者组名称都不相同,这样每条消息就会广播给所有的消费者。

  其实不管是那种模式,都不是主动给consumer group发数据,都是consumer group中的consumer自己进行拉取数据,然后consumer记录自己读取的offset,
  zk定时将这些offset进行保存,防止consumer宕机,新接手的consumer将按照zk上的offset进行继续拉数据

4、Kafka其他概念

  ①一个典型的kafka集群中包含:

    若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),
    若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),
    若干consumer group,
    一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。(自动负载均衡)

  ②负载均衡(Rebalance)

    rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。

    比如某个group下有20个consumer,它订阅了一个具有100个分区的topic。正常情况下,Kafka平均会为每个consumer分配5个分区。这个分配的过程就叫rebalance。

    触发的条件:

      ①组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了——这两者的区别后面会谈到)
      ②订阅主题数发生变更——这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
      ③订阅主题的分区数发生变更

      注:组成员、订阅的主题数、订阅主题数的分区数发生变化都会触发负载均衡,负载均衡是有ZK实现的。

  ③分区
    1、原因 :
      (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;

          注:如果存储不够,那么加机器,然后对topic进行分区,新分的区在新节点上,实现了高扩展。

      (2)可以提高并发,因为可以以Partition为单位读写了。

         注:因为之前是Topic是,生产者写入的时候很拥挤,但是分区之后,生产者可以去分区上写,提高了并发。

    2、分区的原则

      当消息写入时:

        (1)指定了patition,则直接使用;
        (2)未指定patition但指定key,通过对key的value进行hash出一个patition
        (3)patition和key都未指定,使用轮询选出一个patition。

    3、有序

      消费者读取一个分区消息的顺序和生产者写入到这个分区的顺序是一致的。比如,生产者写入“hello”和“Kafka”两条消息到分区P1,
      则消费者读取到的顺序也一定是“hello”和“Kafka”。

      但如果有多个分区,消息落入不同的分区,也就导致可能没有顺序,此时只能设置一个分区。缺点就是只能有一个消费者。(此时的一个指的是只能有一个消费者
      进行这个分区的处理,如果数据过大,并不能很多的处理!而不是只能有一个消费者订阅)

      一般来说,只需要保证每个分区的有序性,再对消息假设键来保证相同键的所有消息落入同一分区,就可以满足绝大多数的应用。

      注: ①如果producer往特定的partition发送消息时,会按照先后顺序存储,也就是说如果发送顺序是message1、message2、message3。
          那么这三个消息在partition log中的记录的顺序也是message1、message2、message3。

         ②consumer也是有序的浏览log中的记录。不会发生今天的消息在昨天的消息中出现

5、副本(Replication)

    ①Kafka每个主题的多个分区日志分布式地存储在Kafka集群上,同时为了故障容错,每个分区都会以副本的方式复制到多个消息代理节点上。
     其中一个节点会作为主副本(Leader),其他节点作为备份副本(Follower,也叫作从副本。一般都是一个Leader和两个fllower)。

    ②主副本会负责所有的客户端写操作,备份副本仅仅从主副本同步数据,和读数据。

    ③当主副本出现故障时,备份副本中的一个副本会被选择为新的主副本。

    ④因为每个分区的副本中只有主副本接受写,所以每个服务器端都会作为某些分区的主副本,以及另外一些分区的备份副本。
     保证 Producer 和 Consumer 错开访问 Broker,避免访问单个 Broker造成过度的IO压力

     这样Kafka集群的所有服务端整体上对客户端是负载均衡的!!!

    注:杜绝了某些服务器上的所有分区都不是Leader ,而不参与写文件!而某些服务器上分区的Leader很多,参与写文件太多!!

    注:如果一个topic指定了replication factor(副本块)为N,那么就允许有N-1个Broker出错。(高容错)

6、集群(了解)

  producer集群:

    1、生产者负责获取数据,比如Flume、自定义数据采集脚本。生产者会监控某一个目录并将数据发送给Kafka
    2、生产者集群是由多个进程组成,一个生产者可以作为一个独立的进程
    3、多个生产者发送的数据时可以放到同一个topic的同一个分区的
    4、一个生产者生产的数据可以放到多个topic里
    5、单个生产者具有数据分发的能力

  Kafka集群:

    1、Kafka集群可以保存多种数据类型的数据,一种数据可以保存到一个topic,一个kafka集群可以保存多个topic
    2、每个topic里面可以创建多个分区,分区的数量是在创建topic时指定的
    3、每个分区的数据由多个Segment组成,一个Segement有index和data文件组成
    4、一个topic的分区数据可以有多个备份,备份数据也是在创建topic时指定,原始数据和备份数据不可以再同一节点上

  consumer集群:

    1、消费者负责拉取数据进行消费,比如SparkStreaming实时的获取的kfaka的数据进行计算
    2、一个ConsumerGroup称为Consumer集群
    3、新增或减少ConsumerGroup组成员会触发负载均衡
    4、ConsumerGroup可以消费一个或多个分区的数据,每个消息分区只能被组中的一个Consumer所消费

7、写入流程

    1)producer先从zookeeper的 “/brokers/…/state”节点找到该partition的leader

    2)producer将消息发送给该leader

    3)leader将消息写入本地log

    4)followers从leader pull消息,写入本地log后向leader发送ACK

    5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK //这里选择的消息同步机制是-1

8、读数据模型

    Kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。
    如下图所示,有两个消费者(不同消费者组)拉取同一个主题的消息,消费者A的消费进度是3,消费者B的消费进度是6。
    消费者拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的。

    这种由消费者控制偏移量的优点是:

    消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。

9、kafka消息与同步机制(delivery guarantee)

    kafka0.8以后不区分异步和同步,都是异步发送。

    同步交互:指发送一个请求,需要等待返回,然后才能够发送下一个请求,有个等待过程;
异步交互:指发送一个请求,不需要等待返回,随时可以再发送下一个请求,即不需要等待。
区别:一个需要等待,一个不需要等待,在部分情况下,我们的项目开发中都会优先选择不需要等待的异步交互方式。 producer与consumer有可能的delivery guarantee: At most once 消息可能会丢,但绝不会重复传输 (如果在ack超时或返回错误时producer不重试,则该消息可能最终不会写入Kafka,因此不会传递给consumer。
在大多数情况下,这样做是为了避免重复的可能性,
业务上必须接收数据传递可能的丢失。) At least one 消息绝不会丢,但可能会重复传输 (如果producer收到来自Kafka broker的确认(ack)或者acks = all,则表示该消息已经写入到Kafka。
但如果producer ack超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入Kafka。
如果broker在发送Ack之前失败(超时),但在消息成功写入Kafka之后,此重试将
导致该消息被写入两次,因此消息会被不止一次地传递给最终consumer,这种策略可能导致重复的工作和不
正确的结果。) Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的 该语义是最理想的,但也难以实现,这是因为它需要消息系统本身与生产和消费消息的应用程序进行协作 ①Producer 可以通过request.required.acks参数的设置来进行调整: 0 ,相当于异步发送,消息发送完毕即offset增加,继续生产;相当于At most once 1,leader收到leader replica 对一个消息的接受ack才增加offset(消费者在队列中能看见这条消息),然后继续生产;     -1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产 当producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果producer发送数据给broker后,
遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经commit。就会重复发送。所以最多通过设置参数,能达到At least one ②Consumer 出现的原因主要是处理消息和提交偏移量的顺序不同!! 1、读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,
这就对应于At most once 2、读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,
实际上该消息已经被处理过了。这就对应于At least once(默认) 3、如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,
会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数据后可能把数据放到HDFS,
如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。
(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中) 注:消息的不同步,导致生产者消息丢失、重复传输,最多只能做到消息不丢,但重复传输 注:消费者处理消息和提交偏移量的顺序不同,导致消息重复消费、或者不消费
—————————————————————————————————————————————————————————————————————— 所以会产生以下的问题: ①重复消费: 场景:比如直连,开始一段逻辑,然后中途连接数据库,最后一段逻辑。
如果错误发生在最后一段逻辑,但是数据库连接没失败,也进行了操作,会导致这次的任务重新执行! 解决:
①幂等
②事务,将整个流程设为一个事务 比如用户对订单付款之后,生成了一个付款成功的消息,发送给了订单系统,订单
系统接收到消息之后,将订单状态为已付款,后来,订单系统又收到了一个发货成功的消息,再将订单状态
更新为已发货,但是由于网络或者是系统的原因,订单系统再次收到了之前的付款成功的消息,也就是消息
重复了,这个在现象在实际应用中也经常出现。订单系统的处理是,查询数据库,发现这个订单状态为已发
货,然后不再更改订单状态。这时候,我们可以说订单处理消息的接口是幂等的,如果订单再次将状态更新
为已付款,接口就是非幂等的 ②消息丢失 场景:(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
(2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失; 同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;
异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;(有时候写数据过多,内存溢出也可能导致消息丢失) 解决: 首先对kafka进行限速(防止缓冲区满), 其次启用重试机制,重试间隔时间设置长一些,
最后Kafka设置acks=all,即需要相应的所有处于ISR的分区(所有副本)都确认收到该消息后,才算发送成功 ③消息重复(和重复消费不同) 针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。

10、kafka中zookeeper的作用

    1、每个分区leader 选举 和 follower 信息同步 //follower的信息是只读的!

    2、管理Broker、Topic元数据:

      Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。
      在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:

      /brokers/ids

      在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,
      由专门的节点来记录,如:

      /borkers/topics

    3、消费者负载均衡

      rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。

      比如某个group下有20个consumer,它订阅了一个具有100个分区的topic。正常情况下,Kafka平均会为每个consumer分配5个分区。
      如果多了20个consumer,那么会触发负载均衡,导致所有的consumer重新分配哪个consumer去消费哪些分区。这个过程叫rebalance

      触发的条件:

        ①组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了——这两者的区别后面会谈到)
        ②订阅主题数发生变更——这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
        ③订阅主题的分区数发生变更

      注:组成员、订阅的主题数、订阅主题数的分区数发生变化

    4、消费进度Offset 记录

      consumer group可以订阅多个topic,然后每个topic包含N个partition。然后通过rebalance(负载均衡)实现consumer group中的每个consumer共同消费
      这些topic的所有partition。

      在消费者对指定消息分区进行消息消费的过程中,会定时提交分区消息的消费进度Offset到Zookeeper上
      以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。

      (也就是说,zookeeper会以组为单位,记录每个consumer group所订阅的所有topic的所有分区的所有消息进度offset,在进行rebalance时,能够更好的进行consumer去消费哪个partition)

      注:当读数据的记录偏移量为10,每过3s,消费者提交一次,zookeeper进行记录,如果在2s时宕机,此时已经读到了15,下一台接手的机器会从10开始读。