消息中间件-Kafka3-kafkaJavaClient小例
- Kafak Java Client
private static final String KAFKA_TOPIC = "kafak-test";
private static String bootstrapServers = "localhost:9092";
private static AdminClient client = null;
static {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
client = AdminClient.create(config);
}
在pom.xml 添加kafka client依赖
- 客户端创建主题
@Test
public void createTopic() {
try {
NewTopic topic = new NewTopic(KAFKA_TOPIC, 1, (short) 1);
// 提交创建topic请求
client.createTopics(Collections.singleton(topic)).all().get();
System.out.println("Topic created successfully");
}
catch (Exception e) {
e.printStackTrace();
}
finally {
if (client != null) client.close();
}
}
- 客户端获取主题
@Test
public void fetchTopics() {
try {
ListTopicsResult result = client.listTopics();
KafkaFuture<Set<String>> set = result.names();
System.out.println(set.get());
}
catch (Exception e) {
e.printStackTrace();
}
finally {
if (client != null) client.close();
}
}
- 客户端生产者发送消息
@Test
public void produceMsg() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
try {
// 创建生产者实例
producer = new KafkaProducer<>(props);
// 发送消息
producer.send(new ProducerRecord<>(KAFKA_TOPIC, "MSG-1005","Hello, 1005!"), (metadata, exception) ->{
if (exception == null) {
System.out.println("消息发送成功,主题:" + metadata.topic() + ", 分区:" + metadata.partition());
}
else {
exception.printStackTrace();
}
});
}
catch (Exception e) {
e.printStackTrace();
}
finally {
// 关闭生成者
if (producer != null) producer.close();
}
}
- 客户端消费者消费消息
@Test
public void consumeMsg() {
// 配置消费者
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "kafka-consumer-group-001");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = null;
try {
// 创建消费者实例
consumer = new KafkaConsumer<>(properties);
// 订阅主题
consumer.subscribe(Arrays.asList(KAFKA_TOPIC));
// 轮询消费消息
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100); // 每100ms执行一次
for (ConsumerRecord record : records) {
System.out.printf("Offset: %d, Key: %s, Value: %s\n", record.offset(), record.key(), record.value());
}
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
if (consumer != null) consumer.close();
}
}
后续将使用这些测试小例来调试Kafka源码,当然也可以执行Kafka自带的可执行脚本与kafka交互,进行源码分析,只是通过java代码的方式更加直观。