Kafka Consumer: Reading Data from Kafka
Recently I needed to consume real-time data from Kafka and write each record's key to a file, so the keys could be compared against the original points on the publishing side and we could tell whether any data was lost in transit.
Straight to the code. In our company's architecture there are multiple topics on Kafka; this code consumes one specified topic at a time. Each topic has 3 partitions, so consumption is done with multiple threads, and duplicate keys are filtered out while reading, because the source pushes about 200,000 points (possibly within one second or a few seconds). At the time I simply used a HashMap for the filtering.
1. ConsumerGroup
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class ConsumerGroup {

    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList,
                         HashMap<String, String> points) {
        consumers = new ArrayList<>(consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic, points);
            consumers.add(consumerThread);
        }
    }

    public void execute() {
        for (ConsumerRunnable task : consumers) {
            new Thread(task).start();
        }
    }
}
2. ConsumerRunnable
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;

public class ConsumerRunnable implements Runnable {

    // Each thread maintains its own private KafkaConsumer instance
    private final KafkaConsumer<String, String> consumer;

    // De-duplication map shared by all consumer threads
    HashMap<String, String> points = new HashMap<>();

    public ConsumerRunnable(String brokerList, String groupId, String topic, HashMap<String, String> nodepoint) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");        // this example uses automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));       // automatic partition assignment
        points = nodepoint;
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                // System.out.printf("Partition = %s , offset = %d, key = %s, value=%s",
                //         record.partition(), record.offset(), record.key(), record.value());
                // Each record value is a JSON object whose "list" array holds the points
                JsonParser parse = new JsonParser();
                JsonObject jsonObject = (JsonObject) parse.parse(record.value());
                JsonArray jsonArray = jsonObject.get("list").getAsJsonArray();
                for (int i = 0; i < jsonArray.size(); i++) {
                    JsonObject subject = jsonArray.get(i).getAsJsonObject();
                    String cedian = subject.get("id").getAsString().trim();
                    // Write each key only the first time it is seen
                    if (!points.containsKey(cedian)) {
                        points.put(cedian, cedian);
                        WriterDataFile.writeData(cedian);
                    }
                }
            }
        }
    }
}
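A note on thread safety: all three consumer threads share the same HashMap for de-duplication, and HashMap is not safe for concurrent writes, so under heavy load a key could occasionally be written twice or the map could be corrupted. Below is a minimal sketch of a thread-safe alternative using ConcurrentHashMap.putIfAbsent; the class and method names are illustrative, not part of the original code.

import java.util.concurrent.ConcurrentHashMap;

// Hypothetical thread-safe key filter: putIfAbsent is atomic, so only the
// first thread to see a given key gets to write it to the file.
public class PointFilter {

    private final ConcurrentHashMap<String, String> points = new ConcurrentHashMap<>();

    // Returns true only for the first occurrence of the key
    public boolean markIfNew(String key) {
        return points.putIfAbsent(key, key) == null;
    }
}

With such a filter, the check in ConsumerRunnable would become if (filter.markIfNew(cedian)) { WriterDataFile.writeData(cedian); }, with one PointFilter instance shared by all consumer threads.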
3. ConsumerTest
import java.util.HashMap;

public class ConsumerTest {

    public static void main(String[] args) {
        String brokerList = "172.16.10.22:9092,172.16.10.23:9092,172.16.10.21:9092";
        String groupId = "test20190722";
        String topic = "SDFD";
        int consumerNum = 3;

        HashMap<String, String> points = new HashMap<>();
        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList, points);
        consumerGroup.execute();
    }
}
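consumerNum is hard-coded to 3 here to match the topic's 3 partitions; if the partition count ever changes, extra threads would sit idle or some partitions would lag behind a single thread. As a sketch (my own helper, not part of the original code), the partition count can be queried at startup with KafkaConsumer.partitionsFor and used to size the group:

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

// Hypothetical helper that asks the cluster how many partitions a topic has,
// so consumerNum can follow the cluster instead of being hard-coded.
public class PartitionCount {

    public static int of(String brokerList, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(props)) {
            return probe.partitionsFor(topic).size();
        }
    }
}

main() could then set int consumerNum = PartitionCount.of(brokerList, topic); instead of using the literal 3.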
4. WriterDataFile
import java.io.*;

public class WriterDataFile {

    private static String path = "E:\\kafkadata_SDFD.txt";

    // Append one key per line; the file is opened and closed on every call
    public static void writeData(String strvalue) {
        FileWriter fw;
        try {
            fw = new FileWriter(path, true);
            BufferedWriter bw = new BufferedWriter(fw);
            bw.write(strvalue + "\r\n");
            bw.flush();
            bw.close();
            fw.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
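With roughly 200,000 keys arriving within a second or a few seconds, opening and closing the file for every single key becomes the bottleneck, and writeData is also called from several consumer threads at once. A sketch of an alternative that opens one BufferedWriter lazily and synchronizes access (the class name is mine; the path is the same one used above):

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

// Hypothetical replacement for WriterDataFile: one shared writer, opened once,
// with synchronized access because several consumer threads call it concurrently.
public class KeyFileWriter {

    private static final String PATH = "E:\\kafkadata_SDFD.txt";
    private static BufferedWriter writer;

    public static synchronized void writeData(String strvalue) {
        try {
            if (writer == null) {
                writer = new BufferedWriter(new FileWriter(PATH, true));
            }
            writer.write(strvalue + "\r\n");
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

The call sites in ConsumerRunnable would stay the same apart from the class name.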
This is all basic code, written without time to tidy it up; please bear with anything that looks rough.
Writing it all down was not easy...