建立 Kafka 消费类 ConsumerRunnable,实现 Runnable 接口:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.*; /**
* @Auther: lyl
* @Date: 2019/9/12 16:28
* @Description:
*/
@Slf4j
public class ConsumerRunnable implements Runnable { // 每个线程维护私有的KafkaConsumer实例
private final KafkaConsumer<String, String> consumer; public ConsumerRunnable(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
} @Override
public void run() {
try {
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(100); // 本例使用100ms作为获取超时时间
for (ConsumerRecord<String, String> record : records) {
// 这里面写处理消息的逻辑
String value = record.value();
if (value.startsWith("obj_vehicle_pass")) {
// System.out.println(value);
value = value.substring(17, value.length());
JSONObject parse = JSONObject.parseObject(value); }
} } catch (Exception e) {
log.error("kafka数据消费异常=============");
e.printStackTrace();
}
}
} catch (Exception e) {
log.error("初始化kafka异常=============");
e.printStackTrace();
}
} }
再编写一个类,用来初始化上面这个类,并通过线程启动:
import java.util.ArrayList;
import java.util.List; /**
* @Auther: lyl
* @Date: 2019/9/12 16:29
* @Description:
*/
/**
 * Builds a fixed number of {@link ConsumerRunnable} tasks that share one group id, topic
 * and broker list, and starts each of them on its own thread.
 */
public class ConsumerGroup {

    /** The consumer tasks created by the constructor, in creation order. */
    private List<ConsumerRunnable> consumers;

    /**
     * Creates {@code consumerNum} consumer tasks.
     *
     * @param consumerNum number of consumer tasks to create
     * @param groupId     consumer group id passed to every task
     * @param topic       topic passed to every task
     * @param brokerList  broker list passed to every task
     */
    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        int created = 0;
        while (created < consumerNum) {
            consumers.add(new ConsumerRunnable(brokerList, groupId, topic));
            created++;
        }
    }

    /** Starts one new thread per consumer task. */
    public void execute() {
        for (int idx = 0; idx < consumers.size(); idx++) {
            Thread worker = new Thread(consumers.get(idx));
            worker.start();
        }
    }
}
最后,在项目启动时先初始化 ConsumerGroup 这个类,再调用 execute() 方法,就能开始消费。