Kafka简介及使用PHP处理Kafka消息
Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。
Kafka的特点:
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。【据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)】
- 支持Kafka Server间的消息分区,同时保证每个Partition内的消息顺序传输。
- 分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
- 消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
- 同时支持离线数据处理和实时数据处理。
Kafka的架构:
kafka架构图
Kafka的整体架构非常简单,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。
Kafka基本概念:
- Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。
- Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
- Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
- Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
- Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
- Broker:缓存代理,Kafa集群中的一台或多台服务器统称为broker。
Kafka消息发送的流程:
Kafka消息发送
下面是PHP生产、消费Kafka消息的例子(假设已经配置好Kafka):
1.从zookeeper源码src/c/src安装zookeeper c client
1
2
3
|
cd
zookeeper
-
3.4.8
/
src
/
c
.
/
configure
make
&&
make
install
|
2.编译php libzookper扩展
1
2
3
4
5
|
git
clone
https
:
/
/
github
.com
/
Timandes
/
libzookeeper
.git
cd
libzookeeper
phpize
.
/
configure
--
with
-
libzookeeper
=
/
usr
/
local
/
bin
/
cli_mt
make
&&
make
install
|
3.编译php zookeeper扩展
1
2
3
4
5
|
git
clone
https
:
/
/
github
.com
/
andreiz
/
php
-
zookeeper
.git
cd
php
-
zookeeper
phpize
.
/
configure
make
&&
make
install
|
4.修改php.ini配置,添加libzookeeper和php-zookeeper扩展
1
2
|
extension
=
libzookeeper
.so
extension
=
zookeeper
.so
|
PHP处理Kafka消息:
1.启动zookeeper和kafka
1
2
|
kafka_2
.
11
-
0.10.0.0
/
bin
/
zookeeper
-
server
-
start
.sh
--
daemon
kafka_2
.
11
-
0.10.0.0
/
config
/
zookeeper
.properties
kafka_2
.
11
-
0.10.0.0
/
bin
/
kafka
-
server
-
start
.sh
kafka_2
.
11
-
0.10.0.0
/
config
/
server
.properties
|
2.创建由2个partition组成的、名为testtopic的topic
1
|
kafka_2
.
11
-
0.10.0.0
/
bin
/
kafka
-
topics
.sh
--
create
--
zookeeper
localhost
:
2181
--
replication
-
factor
1
--
partitions
2
--
topic
testtopic
|
3.composer安装nmred/kafka-php
1
|
composer
require
"nmred/kafka-php"
|
4.producer.php代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
<?php
require_once
(
'./vendor/autoload.php'
)
;
$produce
=
\
Kafka
\
Produce::
getInstance
(
'localhost:2181'
,
3000
)
;
$produce
->
setRequireAck
(
-
1
)
;
$topicName
=
'testtopic'
;
//获取到topic下可用的partitions
$partitions
=
$produce
->
getAvailablePartitions
(
$topicName
)
;
$partitionCount
=
count
(
$partitions
)
;
$count
=
1
;
while
(
true
)
{
$message
=
json_encode
(
array
(
'uid'
=
>
$count
,
'age'
=
>
$count
%
100
,
'datetime'
=
>
date
(
'Y-m-d H:i:s'
)
)
)
;
//发送消息到不同的partition
$partitionId
=
$count
%
$partitionCount
;
$produce
->
setMessages
(
'testtopic'
,
$partitionId
,
array
(
$message
)
)
;
$result
=
$produce
->
send
(
)
;
var_dump
(
$result
)
;
$count
++
;
echo
"producer sleeping\n"
;
sleep
(
1
)
;
}
|
5.consumer.php代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
<?php
require_once
(
'./vendor/autoload.php'
)
;
//获取需要处理的partitionId
$partitionId
=
isset
(
$argv
[
1
]
)
?
intval
(
$argv
[
1
]
)
:
0
;
$consumer
=
\
Kafka
\
Consumer::
getInstance
(
'localhost:2181'
)
;
$consumer
->
setGroup
(
'test-consumer-group'
)
;
$consumer
->
setPartition
(
'testtopic'
,
$partitionId
)
;
$consumer
->
setFromOffset
(
true
)
;
$consumer
->
setMaxBytes
(
102400
)
;
while
(
true
)
{
$topic
=
$consumer
->
fetch
(
)
;
foreach
(
$topic
as
$topicName
=
>
$partition
)
{
foreach
(
$partition
as
$partId
=
>
$messageSet
)
{
foreach
(
$messageSet
as
$message
)
{
var_dump
(
$message
)
;
}
}
}
echo
"consumer sleeping\n"
;
sleep
(
1
)
;
}
|
6.运行php代码
在3个终端界面分别运行
1
2
3
|
php
producer
.
php
php
consumer
.
php
0
php
consumer
.
php
1
|
7.结果
两个consumer脚本依次收到producer发送的消息
php-kafka-consumer-output