kafka_2.9.2-0.8.1.1 distributed cluster setup and client development example

Date: 2021-10-15 09:09:21

Prepare three virtual machines running RHEL 6.4 Server.

1) Configure /etc/hosts on every machine:
$ cat /etc/hosts
    # zookeeper hostnames:
    192.168.8.182   zk1
    192.168.8.183   zk2
    192.168.8.184   zk3
2) Install the JDK, ZooKeeper, and Kafka on every machine, then configure the environment:
$ vi /etc/profile
    # jdk, zookeeper, kafka
    export KAFKA_HOME=/usr/local/lib/kafka/kafka_2.9.2-0.8.1.1
    export ZK_HOME=/usr/local/lib/zookeeper/zookeeper-3.4.6
    export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
    export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$KAFKA_HOME/bin:$ZK_HOME/bin:$PATH
3) On every machine, run:
$ source /etc/profile
$ mkdir -p /var/lib/zookeeper
$ cd $ZK_HOME/conf
$ cp zoo_sample.cfg zoo.cfg
$ vi zoo.cfg
    dataDir=/var/lib/zookeeper

    # the port at which the clients will connect
    clientPort=2181

    # zookeeper cluster
    server.1=zk1:2888:3888
    server.2=zk2:2888:3888
    server.3=zk3:2888:3888
4) Generate the myid file on each machine:
zk1:
$ echo "1" > /var/lib/zookeeper/myid
zk2:
$ echo "2" > /var/lib/zookeeper/myid
zk3:
$ echo "3" > /var/lib/zookeeper/myid

5) On every machine, run setup and disable the firewall:
Firewall:
[   ] enabled

6) On every machine, start ZooKeeper:
$ zkServer.sh start
Check its status:
$ zkServer.sh status
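The myid values written in step 4 must match the server.N entries in zoo.cfg, or the ensemble will not form. As a sketch, the id can be derived from zoo.cfg instead of hard-coded; the helper name below is made up for illustration:

```shell
# Hypothetical helper: print the N from the "server.N=<host>:2888:3888"
# line in zoo.cfg that matches the given hostname, so myid never drifts
# out of sync with the cluster configuration.
myid_for_host() {
  cfg="$1"; host="$2"
  sed -n "s/^server\.\([0-9][0-9]*\)=${host}:.*/\1/p" "$cfg"
}
```

On each node one could then run `myid_for_host "$ZK_HOME/conf/zoo.cfg" "$(hostname)" > /var/lib/zookeeper/myid` instead of the three manual echo commands.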
1) Download Kafka:
    $ wget http://apache.fayea.com/apache-mirror/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
For installation and configuration, see the previous article:
http://blog.csdn.net/ubuntu64fan/article/details/26678877
2) Configure $KAFKA_HOME/config/server.properties
We install three brokers, one on each VM: zk1, zk2, zk3.
zk1:
$ vi /etc/sysconfig/network
    NETWORKING=yes
    HOSTNAME=zk1
$ vi $KAFKA_HOME/config/server.properties
    broker.id=0
    port=9092
    host.name=zk1
    advertised.host.name=zk1
    ...
    num.partitions=2
    ...
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zk2:
$ vi /etc/sysconfig/network
    NETWORKING=yes
    HOSTNAME=zk2
$ vi $KAFKA_HOME/config/server.properties
    broker.id=1
    port=9092
    host.name=zk2
    advertised.host.name=zk2
    ...
    num.partitions=2
    ...
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zk3:
$ vi /etc/sysconfig/network
    NETWORKING=yes
    HOSTNAME=zk3
$ vi $KAFKA_HOME/config/server.properties
    broker.id=2
    port=9092
    host.name=zk3
    advertised.host.name=zk3
    ...
    num.partitions=2
    ...
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
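The three broker configs differ only in broker.id and the host names. Under the naming scheme used here (zk1..zk3), the id can be derived from the hostname; this zkN-to-(N-1) mapping is an assumption of this particular setup, not anything Kafka requires:

```shell
# Sketch: map hostname to broker.id for this cluster's naming convention.
# zk1 -> 0, zk2 -> 1, zk3 -> 2 (assumes hostnames of the form zkN).
broker_id_for() {
  echo $(( ${1#zk} - 1 ))
}
```

One could then patch a shared template on each node, e.g. `sed -i "s/^broker.id=.*/broker.id=$(broker_id_for "$(hostname)")/" $KAFKA_HOME/config/server.properties` (GNU sed's -i assumed).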
3) Start the ZooKeeper service; run on each of zk1, zk2, zk3:
$ zkServer.sh start

4) Start the Kafka service; run on each of zk1, zk2, zk3:
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

5) Create a topic (replication-factor = number of brokers):
$ kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181

6) On zk2, open a terminal and send messages to Kafka (zk2 plays the producer):
$ kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test
Type in the producer terminal: Hello Kafka
7) On zk3, open a terminal to display the consumed messages (zk3 plays the consumer):
$ kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
The consumer terminal prints: Hello Kafka
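Once the round trip above works, the topic's replica health can be checked with `kafka-topics.sh --describe`. As a sketch, the helper below flags any partition whose Isr (in-sync replica) list is shorter than its Replicas list; the "Replicas: ... Isr: ..." line format assumed here is what the 0.8.x console tools print, so adjust if your output differs:

```shell
# Sketch: read `kafka-topics.sh --describe` output on stdin and flag any
# partition whose in-sync replica set has fallen behind its replica set.
under_replicated() {
  awk '/Partition:/ {
    rep = $0; sub(/.*Replicas:[ \t]*/, "", rep); sub(/[ \t].*/, "", rep)
    isr = $0; sub(/.*Isr:[ \t]*/, "", isr)
    # compare list lengths: fewer ISR entries than replicas means lag
    if (split(isr, b, ",") < split(rep, a, ",")) { print "behind: " $0; bad = 1 }
  } END { exit bad }'
}
```

Usage against the cluster built above: `kafka-topics.sh --describe --topic test --zookeeper zk1:2181 | under_replicated`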
Project development

The project is built with Maven, and it has to be said that the Kafka Java client is painful to work with; setting up the build runs into plenty of trouble. The pom.xml below is a recommended starting point, and the versions of its dependencies must be kept mutually consistent. If the kafka client version does not match the kafka server version, you will see all sorts of exceptions, such as "broker id not exists", because the client-server protocol changed when Kafka moved from 0.7 to 0.8. (Note that the "2.8.2" in the artifact name kafka_2.8.2 is the Scala version, not the Kafka version.)

<dependencies>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.14</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.8.2</artifactId>
        <version>0.8.0</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>
</dependencies>
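The client/server version mismatch warned about above can be checked mechanically. As a sketch, this hypothetical helper extracts the Kafka version from a server jar name such as kafka_2.8.2-0.8.0.jar (the part after "kafka_" is the Scala version, the suffix is the Kafka version), so the pom's <version> can be compared against what actually runs on the brokers:

```shell
# Hypothetical helper: kafka_2.8.2-0.8.0.jar -> 0.8.0
kafka_version_of() {
  basename "$1" .jar | sed 's/^kafka_[0-9.]*-//'
}
```

On a broker one could run `for j in "$KAFKA_HOME"/libs/kafka_*.jar; do kafka_version_of "$j"; done` and compare the result with the pom.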

Producer-side code
    1) kafka-producer.properties: place this file under /resources (the name matches the default `location` in the code below)

#partitioner.class=
## The broker list may be a subset of the kafka servers, since the producer
## only uses it to fetch metadata. Although every broker can serve metadata,
## it is still advisable to list all brokers here.
## This value can also be injected via spring.
##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
## synchronous here; async is recommended
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## only effective when producer.type=async
#batch.num.messages=100

2) KafkaProducerClient.java sample code

package com.test.kafka;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducerClient {

    private Producer<String, String> inner;
    private String brokerList; // for metadata discovery, spring setter
    private String location = "kafka-producer.properties"; // spring setter
    private String defaultTopic; // spring setter

    public void setBrokerList(String brokerList) {
        this.brokerList = brokerList;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public KafkaProducerClient() {}

    public void init() throws Exception {
        Properties properties = new Properties();
        properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
        if (brokerList != null) {
            properties.put("metadata.broker.list", brokerList);
        }
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String message) {
        send(defaultTopic, message);
    }

    public void send(Collection<String> messages) {
        send(defaultTopic, messages);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null || messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        int i = 0;
        for (String entry : messages) {
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, entry);
            kms.add(km);
            i++;
            // flush in batches of 20 to bound memory usage
            if (i % 20 == 0) {
                inner.send(kms);
                kms.clear();
            }
        }
        if (!kms.isEmpty()) {
            inner.send(kms);
        }
    }

    public void close() {
        if (inner != null) {
            inner.close();
        }
    }

    public static void main(String[] args) {
        KafkaProducerClient producer = null;
        try {
            producer = new KafkaProducerClient();
            //producer.setBrokerList("");
            producer.init(); // create the underlying producer before sending
            int i = 0;
            while (true) {
                producer.send("test-topic", "this is a sample" + i);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}

Consumer side
     1) kafka-consumer.properties: place this file under /resources (the name matches the default `location` in the code below)

## This value can be set here or injected via spring
##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
# consumer group id
group.id=test-group
# consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=true
auto.commit.interval.ms=60000

2) KafkaConsumerClient.java sample code

package com.test.kafka;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class KafkaConsumerClient {

    private String groupid;   // can be set by spring
    private String zkConnect; // can be set by spring
    private String location = "kafka-consumer.properties"; // properties file location
    private String topic;
    private int partitionsNum = 1;
    private MessageExecutor executor; // message listener
    private ExecutorService threadPool;
    private ConsumerConnector connector;
    private Charset charset = Charset.forName("utf8");

    public void setGroupid(String groupid) {
        this.groupid = groupid;
    }

    public void setZkConnect(String zkConnect) {
        this.zkConnect = zkConnect;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void setPartitionsNum(int partitionsNum) {
        this.partitionsNum = partitionsNum;
    }

    public void setExecutor(MessageExecutor executor) {
        this.executor = executor;
    }

    public KafkaConsumerClient() {}

    // init consumer: open the connection and start the listener threads
    public void init() throws Exception {
        if (executor == null) {
            throw new RuntimeException("KafkaConsumer: executor can't be null!");
        }
        Properties properties = new Properties();
        properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
        if (groupid != null) {
            properties.put("group.id", groupid);
        }
        if (zkConnect != null) {
            properties.put("zookeeper.connect", zkConnect);
        }
        ConsumerConfig config = new ConsumerConfig(properties);
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum * 2);
        // start one runner per stream
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        try {
            threadPool.shutdownNow();
        } catch (Exception e) {
            //
        } finally {
            connector.shutdown();
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                // connector.commitOffsets(); // commit offsets manually when auto.commit.enable=false
                MessageAndMetadata<byte[], byte[]> item = it.next();
                try {
                    executor.execute(new String(item.message(), charset)); // UTF-8; mind exceptions thrown here
                } catch (Exception e) {
                    //
                }
            }
        }

        public String getContent(Message message) {
            ByteBuffer buffer = message.payload();
            if (buffer.remaining() == 0) {
                return null;
            }
            CharBuffer charBuffer = charset.decode(buffer);
            return charBuffer.toString();
        }
    }

    public static interface MessageExecutor {
        public void execute(String message);
    }

    public static void main(String[] args) {
        KafkaConsumerClient consumer = null;
        try {
            MessageExecutor executor = new MessageExecutor() {
                public void execute(String message) {
                    System.out.println(message);
                }
            };
            consumer = new KafkaConsumerClient();
            consumer.setTopic("test-topic");
            consumer.setPartitionsNum(2);
            consumer.setExecutor(executor);
            consumer.init();
            Thread.sleep(60 * 1000L); // keep the process alive so the listener threads can consume
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}

Note that the KafkaConsumerClient class above pays little attention to error handling; in particular, you must decide what should happen when MessageExecutor.execute() throws an exception.
    When testing, start the consumer first and then the producer, so you can watch the newest messages arrive in real time.