kafka入门2:java 创建及删除 topic

时间:2021-03-09 08:26:46

1.pom

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
  </dependency>

 

2.KafkaTopicBean

public class KafkaTopicBean {
    
    private String topicName;       // topic 名称
    private Integer partition;      // partition 分区数量
    private Integer replication;    // replication 副本数量
    private String descrbe;  
    
    public String getTopicName() {  
        return topicName;  
    }  
  
    public void setTopicName(String topicName) {  
        this.topicName = topicName;  
    }  
  
    public Integer getPartition() {  
        return partition;  
    }  
  
    public void setPartition(Integer partition) {  
        this.partition = partition;  
    }  
  
    public Integer getReplication() {  
        return replication;  
    }  
  
    public void setReplication(Integer replication) {  
        this.replication = replication;  
    }  
  
    public String getDescrbe() {  
        return descrbe;  
    }  
  
    public void setDescrbe(String descrbe) {  
        this.descrbe = descrbe;  
    }  
  
    @Override  
    public String toString() {  
        return "KafkaTopicBean [topicName=" + topicName + ", partition=" + partition  
                + ", replication=" + replication + ", descrbe=" + descrbe +"]";  
    }  

}

 

3.KafkaUtil

import java.util.Properties;
import org.apache.kafka.common.security.JaasUtils;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;

public class KafkaUtil {
    
     public static void createKafaTopic(String ZkStr,KafkaTopicBean topic) {  
         ZkUtils zkUtils = ZkUtils.
                 apply(ZkStr, 30000, 30000,JaasUtils.isZkSecurityEnabled()); 
         
        AdminUtils.createTopic(zkUtils, topic.getTopicName(),  topic.getPartition(), 
                topic.getReplication(),  new Properties(), new RackAwareMode.Enforced$());  
        zkUtils.close();
    }
     
     public static void deleteKafaTopic(String ZkStr,KafkaTopicBean topic) {  
         ZkUtils zkUtils = ZkUtils.
                 apply(ZkStr, 30000, 30000,JaasUtils.isZkSecurityEnabled()); 
         
        AdminUtils.deleteTopic(zkUtils, topic.getTopicName());  
        zkUtils.close();
    }

}

 

4.调用方式

    public static void main(String[] args) {
        
        //zookeeper地址:端口号
        String ZkStr = "912.168.0.1:2181";    
        
        //topic对象
        KafkaTopicBean topic = new KafkaTopicBean();    
        topic.setTopicName("testTopic");  //topic名称        
        topic.setPartition(1);            //分区数量设置为1
        topic.setReplication(1);         //副本数量设置为1
        
        //创建topic
        KafkaUtil.createKafaTopic(ZkStr,topic);
        //删除topic
        KafkaUtil.deleteKafaTopic(ZkStr,topic);

    }