企业级消息队列(Kafka)
- Kafka是什么? 消息队列
- 为什么要有消息队列? 解耦、异构、并行
- Kafka数据生成方式
- producer-->kafka---->保存到本地
- consumer---主动拉取数据
- Kafka核心概念
- producer(生产者)
- 消息不丢失
- 数据分发策略
- kafka brokers
- topic(主题):一类消息,比如用户信息、商品信息、订单信息
- partition
- 为什么要有partition? 数据太大,单个节点无法存储。
- partition的物理形态? 文件夹
- partition的数量设计?
- 如果集群很小(10),将partition设置为broker的数量
- 如果集群大于10台,就要根据数据量来设计
- replication
- 为什么要有replication? 保证数据安全,数据容错的考虑。
- 设置几个副本?
- 数据的越重要,副本数可以设的越高。但是,数据冗余高,ack时间越长,效率越低。
- 平常设置2个。
- segment(段)
- 为什么要分段? 如果文件巨大,删除麻烦、查找麻烦。
- 分段大小,1G。这个可以设置。
- 删除数据的过期时间,168小时,等于7天。注意,在规划Kafka集群的是,要考虑数据存储几天的。Kafka集群的数量,建议3-5台。 24T * 5 = 120T
- segment物理形态:有个log文件和index索引文件。
- log 存放的原始数据
- index 存放的是offset和物理地址值
- 数据查找中会有二分查找法(掌握)
- 为什么要分段? 如果文件巨大,删除麻烦、查找麻烦。
- pagecache
- 现代操作系统提供缓存技术,Kafka将当前生产的数据保存在缓存中。
- 由于生产和消费时间差很小,所以消费者消费数据的时候,基本上都是从内存中获取数据。
- sendfile
- 作为消费者,想消费历史的数据。senfile技术不经过应用,在操作系统层面,读取完数据之后,直接输出到网卡。
- partition isr
- 如果partition有多个副本,需要选择一个leader出来,负责数据的读写读写读写操作。
- 这个leader有可能挂掉,因为压力大。
- 如果使用投票机制,会有相对来说比较大时间消费,时刻准备好备胎。
- 满足什么样条件才能isr成员?同步leader的数据,在某个时间阈值和数量阈值内。不满足条件就踢出ISR。
- partition leader
- leader是针对partition的描述。
- 负责数据的读写。
- consumer
- consumerGroup,消费数据都是以消费组的形式 出现的。
- 消费组中的成员,消费数据是互不干扰的。当有一个消费者挂了之后,会在确定消费者无法重新消费后,触发负载均衡。
- 两个不同的消费组,消费同一个topic的数据,都是完整。注意:在实际开发过程,要将自己的消费组设计成唯一的。
- consumer消费offset管理。
- 0.8版本,offset是有zookeeper进行管理的。
- 0.8+,可以选择使用kafka的consumer_offset的topic进行管理。
- producer(生产者)
- Kafka常见问题?
- Kafka为什么那么快? pageche、sendfile
- Kafka消费不丢失机制? producer、broker、consumer。
- Kafka消费数据全局有序? 单个partition是有序,全局有序违背设计的初衷。
流式计算框架(Storm)
- 流式计算框架的组成:一般flume+kafka+storm+redis,每个组件都可以被替换掉。
- Storm是什么? 流式计算框架,一旦启动,永不停止。
- Storm架构是什么?
- client:用来创建一个stormtopology,然后将stormtopology序列化之后,通过rpc的提交到nimbus。
- nimbus:发布rpc的服务,接受client的任务提交,对任务进行校验。将任务放入任务队列中(阻塞队列)。后台线程读取队列,获得任务信息,进行任务分配。
- 获取当前集群中,空闲的worker资源。
- 获取当前任务需要多少个Task。Task数量就是所有component(组件)的并行数量加上每个worker会启动的一个ackerBolt之和。
- 将任务信息保存到zookeeper。
- zookeeper:zookeeper保存任务信息及节点各种其他信息。
- supervisor:通过watch机制,得到任务信息,然后启动属于自己的worker。
- worker:被supervisor启动,负责具体的任务执行。
- Task:本质上是个线程,是个executor。分为三种类型 SpoutTask、BoltTask、AckerTask。
- Storm的编程模型?
- Spout extends baseRichSpout
- open :初始化方法
- nextTuple :有个while一直调用该方法,调用一次发送一次数据。
- field:声明输出的字段名称和数量
- bolt1 extends baseRichBolt (手动ack)
- prepare:初始化方法
- execute: 执行方法
- field:声明输出的字段名称和数量
- bolt2 extends baseBasicBolt(自动ack)
- execute: 执行方法
- field:声明输出的字段名称和数量
- 驱动类:topologybuilder
- 运行模式:本地模式、集群模式
- Spout extends baseRichSpout
- Storm组件的并行度怎么设置?
- Spout是根据上游kafka的topic的分区数量设置。
- Bolt1是根据Spout发送的数据/bolt1处理每条数据量(单位时间 1S)
- Bolt2是根据Spout发送的数据/bolt1处理每条数据量(单位时间 1S)
- Spout的worker的数量怎么设置?
- 根据所有组件的并行度之和,进行设置。
- 可以一个worker有两个Spout或者多个Spout。
- 如果Spout下游不同层级的所有的bolt的数量很多情况下,运算压力,可以考虑worker和spout数量保持一致。
- 压力更大,只能修改partition的数量。
- 如果修改不了partition数量,只能曾加worker数,任由数据充斥在网络中。
- Spout上下游衔接策略(StreamGrouping)
- localorshuffle 分组策略是任何时候的第一选择。
- fieldGroup 字段分组。
Storm的原理
- 任务提交流程
- 用来创建一个stormtopology,然后将stormtopology序列化之后,通过rpc的提交到nimbus。
- 扩展:RPC框架,动态+反射技术+网络通信技术。
- 集群启动流程
- Java系统流程: Java -jar、Java -server、Java -client
- 手动启动:nimbus、supervisor
- 自动启动:supervisor根据任务信息启动worker。
- 任务执行流程
- 与nimbus、supervisor没有半毛钱关系,都在worker。
- SpoutTask.open() 一般用来打开外部的数据源
- while 方式调用 nextTuple方法。发送数据,需要考虑数据的分组策略。
- 发送数据都是发送Tuple,会携带当前Tuple要发送给哪个taskid。
- 然后根据task分配信息,得到taskid所在的worker。
- 通过网络请求将tuple发送给远端的worker。
- 远端的worker有个接受的线程,根据taskid找到对应的Bolt的输入队列(无锁队列,每秒处理600万订单),将tuple放到bolt的输入中。
- 每个Task都是一个线程,后台不停的消费输入队列的内容。消费到消息之后,会调用bolt的execute方法,将数据传入给bolt。
- Bolt的execute方法接收到了Tuple,经过一顿处理。然后向下游发送数据Tuple。
- 发送数据时,会根据下游的分组策略。比如:localorshuffle。
- 如果是localorshuffle方式,直接找到当前worker中的对应Task,进行分发。将数据放入响应bolt的输入队列。
- 每个Task都是一个线程,后台不停的消费输入队列的内容。消费到消息之后,会调用bolt的execute方法,将数据传入给bolt。
- 如此循环。
- 消息不丢失机制
- 如何开启消息不丢失机制?
- spout端发送数据的时候,要加上messageid.
- spout要重写ack和fail方法。
- 在topologybuiler的config文件中设置setNumAckers的数量大于1.默认是1
- 在下游个每个层级bolt上,需要增加锚点。
- 现象是什么?
- 当消息处理成功,会调用ack方法。
- 当消息处理失败
- 超时处理,默认30S,会调用faile方法,并传入messageid。
- 真的异常了,消息重发。 消息重发,需要手动设置。
- 消息重发,最好是在spout发送tuple的时候,将tuple本身当做messageid传入,失败后,直接发送messageid
- 实现机制?异或机制。
- 相同为0,不同为1。
- 需要上游发送时候的时候,发一个状态。并且需要下游处理完数据之后,发送一个状态。这两个状态值是一样的。
- 每个层级都会产生新的锚点id。64位长整型。
- 如何开启消息不丢失机制?