本文实例讲述了php测试kafka项目。分享给大家供大家参考,具体如下:
概述
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为*开源项目。
主要应用场景是:日志收集系统和消息系统。
安装kafka-php项目依赖
1
|
composer require nmred/kafka-php
|
produce.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
<?php
require './vendor/autoload.php' ;
date_default_timezone_set( 'PRC' );
$config = \Kafka\ProducerConfig::getInstance();
$config ->setMetadataRefreshIntervalMs(10000);
$config ->setMetadataBrokerList( '127.0.0.1:9092' );
$config ->setBrokerVersion( '0.10.2.1' );
$config ->setRequiredAck(1);
$config ->setIsAsyn(false);
$config ->setProduceInterval(500);
$producer = new \Kafka\Producer( function () {
$t = time();
return array (
array (
'topic' => 'test' ,
'value' => $t ,
'key' => $t ,
),
);
});
$producer ->success( function ( $result ) {
var_export( $result );
});
$producer ->error( function ( $errorCode ) {
var_dump( 'error' , $errorCode );
});
$producer ->send();
|
consumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
|
<?php
require './vendor/autoload.php' ;
date_default_timezone_set( 'PRC' );
$config = \Kafka\ConsumerConfig::getInstance();
$config ->setMetadataRefreshIntervalMs(10000);
$config ->setMetadataBrokerList( '127.0.0.1:9092' );
$config ->setGroupId( 'test' );
$config ->setBrokerVersion( '0.10.2.1' );
$config ->setTopics( array ( 'test' ));
$consumer = new \Kafka\Consumer();
$consumer ->start( function ( $topic , $part , $message ) {
var_dump( $message );
});
|
测试生产者
1
|
php produce.php
|
测试消费者
1
|
php consumer.php
|
希望本文所述对大家PHP程序设计有所帮助。
原文链接:https://my.oschina.net/qiongtaoli/blog/903889