同步生产消息到Kafka中
接下来,我们将编写Java程序,将1-100的数字消息写入到Kafka中。
导入Maven Kafka POM依赖
-
<!-- kafka客户端工具 -->
-
<dependency>
-
<groupId></groupId>
-
<artifactId>kafka-clients</artifactId>
-
<version>2.4.1</version>
-
</dependency>
创建KafkaProducerTest类。
可以参考以下方式来编写第一个Kafka示例程序
1.创建用于连接Kafka的Properties配置
2.创建一个生产者对象KafkaProducer
3.调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
4.再调用一个()方法等待响应
5.关闭生产者
参考代码:
-
-
import ;
-
import ;
-
import ;
-
-
import ;
-
import ;
-
import ;
-
-
public class KafkaProducerTest {
-
public static void main(String[] args) {
-
// 1. 创建用于连接Kafka的Properties配置
-
Properties props=new Properties();
-
("","192.168.2.3:9092");
-
("acks","all");
-
("","");
-
("","");
-
// 2. 创建一个生产者对象KafkaProducer
-
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
-
-
// 3. 调用send发送1-100消息到指定Topic test
-
for(int i = 0; i < 100; ++i) {
-
try {
-
// 获取返回值Future,该对象封装了返回值
-
Future<RecordMetadata> future = (new ProducerRecord<String, String>("test", null, i + ""));
-
// 调用一个()方法等待响应
-
();
-
} catch (InterruptedException e) {
-
();
-
} catch (ExecutionException e) {
-
();
-
}
-
}
-
-
// 5. 关闭生产者
-
();
-
}
-
}
从Kafka的topic中消费消息
从 test topic中,将消息都消费,并将记录的offset、key、value打印出来
创建KafkaConsumerTest类
1.创建Kafka消费者配置
-
Properties props = new Properties();
-
("", "192.168.2.3:9092");
-
("", "test");
-
("", "true");
-
("", "1000");
-
("", "");
-
("", "");
2.创建Kafka消费者
3.订阅要消费的主题
4.使用一个while循环,不断从Kafka的topic中拉取消息
5.将将记录(record)的offset、key、value都打印出来
参考代码
-
-
import ;
-
import ;
-
import ;
-
-
import ;
-
import ;
-
import ;
-
-
public class KafkaConsumerTest {
-
public static void main(String[] args) {
-
// 1. 创建用于连接Kafka的Properties配置
-
Properties props = new Properties();
-
("", "192.168.2.3:9092");
-
("", "test-group");
-
("", ()); // 设置键的反序列化器
-
("", ()); // 设置值的反序列化器
-
-
// 2. 创建一个消费者对象 KafkaConsumer
-
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-
-
// 3. 订阅主题
-
(("test"));
-
-
// 4. 拉取消息
-
try {
-
while (true) {
-
ConsumerRecords<String, String> records = ((100));
-
(record -> {
-
("offset = %d, key = %s, value = %s%n", (), (), ());
-
});
-
}
-
} catch (Exception e) {
-
();
-
} finally {
-
// 5. 关闭消费者
-
();
-
}
-
}
-
}
异步使用带有回调函数方法生产消息
如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。
需求:
- 在发送消息出现异常时,能够及时打印出异常信息
- 在发送消息成功时,打印Kafka的topic名字、分区id、offset
-
import ;
-
import ;
-
import ;
-
-
import ;
-
import ;
-
import ;
-
import ;
-
-
public class KafkaProducerTest {
-
public static void main(String[] args) {
-
// 1. 创建用于连接Kafka的Properties配置
-
Properties props = new Properties();
-
("", "192.168.2.3:9092");
-
("acks", "all");
-
("", "");
-
("", "");
-
-
// 2. 创建一个生产者对象KafkaProducer
-
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
-
-
// 3. 调用send发送1-100消息到指定Topic test
-
for (int i = 0; i < 100; ++i) {
-
// 一、同步方式
-
// 获取返回值Future,该对象封装了返回值
-
// Future<RecordMetadata> future = (new ProducerRecord<String, String>("test", null, i + ""));
-
// 调用一个()方法等待响应
-
// ();
-
-
// 二、带回调函数异步方式
-
(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
-
@Override
-
public void onCompletion(RecordMetadata metadata, Exception exception) {
-
if (exception != null) {
-
("发送消息出现异常");
-
} else {
-
String topic = ();
-
int partition = ();
-
long offset = ();
-
-
("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
-
}
-
}
-
});
-
}
-
-
-
// 5. 关闭生产者
-
();
-
}
-
}