kafka java使用

时间:2022-07-24 18:59:15

首先添加maven依赖

  Kafka

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.2</version>
    </dependency>

  Spring

    <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context</artifactId>
          <version>4.1.4.RELEASE</version>
    </dependency>
    <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-beans</artifactId>
         <version>4.1.4.RELEASE</version>
    </dependency>
    <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-core</artifactId>
         <version>4.1.4.RELEASE</version>
    </dependency>

  Kafka API

    <dependency>
      <groupId>com.zc</groupId>
      <artifactId>kafka-api-core</artifactId>
      <version>0.1</version>
    </dependency>

kafka-api-core-0.1.jar下载地址:http://files.cnblogs.com/files/zcjy/kafka-api-core-0.1.zip

使用kafka java api有两种方式:

第一种:源码方式

  Consumer:

        // Collect the consumer settings: ZooKeeper ensemble, group, tuning values and topic.
        KafkaConsumerOptions consumerOptions = new KafkaConsumerOptions();
        consumerOptions.setConnectionZk("localhost.zk1:2181,localhost.zk2:2181,localhost.zk3:2181"); // connect to ZooKeeper
        consumerOptions.setGroupName("group_zc"); // set the consumer group
        consumerOptions.setZkSessionTimeout("4000");
        consumerOptions.setZkSyncTime("2000");
        consumerOptions.setAutoCommitInterval("1000");
        consumerOptions.setRebalanceBackOff("4000");
        consumerOptions.setRebalanceMaxRetries("11");
        consumerOptions.setAutoOffsetReset("smallest");
        consumerOptions.setTopicName("testTopic"); // set the topic

        // Build the factory from the options and initialize it before handing it to the consumer.
        KafkaConsumerFactory consumerFactory = new KafkaConsumerFactory(consumerOptions);
        consumerFactory.initialize();

        // Give the consumer a handler whose execute() prints the message it receives.
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerFactory);
        kafkaConsumer.start(new BaseConsumerHandler() {
            public void execute(String message) {
                System.out.println("message : " + message);
            }
        });

  Producer:

     // Collect the producer settings: broker list and target topic.
     KafkaProduceOptions produceOptions = new KafkaProduceOptions();
     produceOptions.setMetadataBrokerList("localhost.kafka1:9092,localhost.kafka2:9092,localhost.kafka3:9092"); // connect to the Kafka cluster
     produceOptions.setTopicName("testTopic"); // set the topic

     // NOTE(review): the Spring configuration in this document declares init-method="initialize"
     // for KafkaProduceFactory; confirm whether initialize() must also be called here.
     KafkaProduceFactory produceFactory = new KafkaProduceFactory(produceOptions);
     produceFactory.sendMessage("test kafka at " + new Date());

第二种:注解方式(通过Spring XML配置bean)

  Consumer:

    配置(spring-kafka-consumer.xml):

      <!-- Consumer settings; property names mirror the setters used in the source-mode example. -->
      <bean id="kafkaConsumerOptions" class="com.zc.kafka.api.core.factory.KafkaConsumerOptions">
            <!-- Lists three distinct ZooKeeper hosts (zk1, zk2, zk3); the original repeated zk2. -->
            <property name="connectionZk" value="localhost.zk1:2181,localhost.zk2:2181,localhost.zk3:2181" />
            <property name="groupName" value="test_kafka" />
            <property name="zkSessionTimeout" value="4000" />
            <property name="zkSyncTime" value="1000" />
            <property name="autoCommitInterval" value="2000" />
            <property name="rebalanceBackOff" value="2000" />
            <property name="rebalanceMaxRetries" value="11" />
            <property name="autoOffsetReset" value="smallest" />
            <property name="topicName" value="testTopic" />
      </bean>

      <!-- Factory built from the options above; Spring invokes initialize() once the bean is constructed. -->
      <bean id="kafkaConsumerFactory" class="com.zc.kafka.api.core.factory.KafkaConsumerFactory"
         init-method="initialize">
            <constructor-arg ref="kafkaConsumerOptions" />
      </bean>

      <!-- Consumer bean that the Java code looks up by type (KafkaConsumer.class). -->
      <bean id="kafkaConsumer" class="com.zc.kafka.api.core.consumer.KafkaConsumer">
            <constructor-arg ref="kafkaConsumerFactory" />
      </bean>

    代码:

      // Load the consumer bean definitions from the classpath and start the context.
      ClassPathXmlApplicationContext appContext =
                  new ClassPathXmlApplicationContext("spring-kafka-consumer.xml");
      appContext.start();

      // Look up the configured consumer bean by type.
      KafkaConsumer kafkaConsumer = appContext.getBean(KafkaConsumer.class);

      // Handler whose execute() prints the message it receives.
      BaseConsumerHandler printingHandler = new BaseConsumerHandler() {
            public void execute(String message) {
                  System.out.println("message : " + message);
            }
      };
      kafkaConsumer.start(printingHandler);

  

  Producer:

    配置(spring-kafka-produce.xml):

      <!-- Producer settings. The id must be "kafkaProduceOptions" because the factory bean below
           references it by that name; the original id "kafkaConsumerOptions" left the reference unresolved. -->
      <bean id="kafkaProduceOptions" class="com.zc.kafka.api.core.factory.KafkaProduceOptions">
            <property name="metadataBrokerList" value="localhost.kafka1:9092,localhost.kafka2:9092,localhost.kafka3:9092" />
            <property name="topicName" value="testTopic" />
      </bean>

      <!-- Factory built from the options above; Spring invokes initialize() once the bean is constructed. -->
      <bean id="kafkaProduceFactory" class="com.zc.kafka.api.core.factory.KafkaProduceFactory"
        init-method="initialize">
            <constructor-arg ref="kafkaProduceOptions" />
      </bean>

      <!-- Producer bean that the Java code looks up by type (KafkaProduce.class). -->
      <bean id="kafkaProduce" class="com.zc.kafka.api.core.produce.KafkaProduce">
            <constructor-arg ref="kafkaProduceFactory" />
      </bean>

    代码:        

      // Load the producer bean definitions from the classpath and start the context.
      ClassPathXmlApplicationContext appContext =
                  new ClassPathXmlApplicationContext("spring-kafka-produce.xml");
      appContext.start();

      // Look up the configured producer bean by type and send one timestamped test message.
      KafkaProduce kafkaProduce = appContext.getBean(KafkaProduce.class);
      String payload = "test send produce message " + new Date();
      kafkaProduce.send(payload);