Zeppelin: Developing a Custom Kafka Interpreter

Date: 2021-07-23 17:10:51

The general steps for developing a custom interpreter are covered in the companion post:

Zeppelin 自定义interpreter插件开发 (Zeppelin custom interpreter plugin development)

This article focuses on the custom Interpreter classes themselves:
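
Both classes extend Zeppelin's Interpreter base class and follow its lifecycle: open() builds the Kafka client from the interpreter properties, interpret() parses the paragraph text and does the work, cancel() stops a running paragraph, and close() releases resources.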

1. The producer interpreter, KafkaProducerInterpreter:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducerInterpreter extends Interpreter {

    private KafkaProducer<String, String> kafkaProducer;

    private Logger logger = LoggerFactory.getLogger(KafkaProducerInterpreter.class);

    public KafkaProducerInterpreter(Properties property) {
        super(property);
    }

    public void open() {
        // Producer configuration, read from the interpreter settings
        Properties producerProp = new Properties();
        producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getProperty("bootstrap.servers"));
        // Records headed for the same partition are batched together (Kafka default: 16 KB)
        producerProp.put(ProducerConfig.BATCH_SIZE_CONFIG, getProperty("batch.size"));
        // How long the producer waits for more records before sending a batch
        producerProp.put(ProducerConfig.LINGER_MS_CONFIG, getProperty("linger.ms"));
        // Total memory the producer may use to buffer records awaiting send (Kafka default: 32 MB)
        producerProp.put(ProducerConfig.BUFFER_MEMORY_CONFIG, getProperty("buffer.memory"));
        producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getProperty("key.serializer"));
        producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getProperty("value.serializer"));

        kafkaProducer = new KafkaProducer<String, String>(producerProp);
    }

    public void close() {
        // Flush pending records and release the producer's resources
        if (kafkaProducer != null) {
            kafkaProducer.close();
        }
    }

    public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
        // Supported paragraph formats:
        //   inline message: producer --topic topicName --Message:content
        //   from a file:    producer --topic topicName --File:path

        String[] parts = cmd.split("--");

        // All three parts must be present
        if (parts.length < 3) {
            String error = "Error Message Format\n";
            error += "producer --topic topicName --Message:content OR \n";
            error += "producer --topic topicName --File:path";
            return new InterpreterResult(InterpreterResult.Code.ERROR, error);
        }

        String topic = parts[1].substring("topic ".length()).trim(); // topic name
        String sendSource = parts[2].split(":")[0]; // Message or File

        if (parts[0].trim().equals("producer")) {
            // Inline message typed in the paragraph
            if (sendSource.equals("Message")) {
                String mes = parts[2].substring("Message:".length());
                kafkaProducer.send(new ProducerRecord<String, String>(topic, null, mes));
                return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Send Succeed: " + mes);

            // One record per line of a local file
            } else if (sendSource.equals("File")) {
                String path = parts[2].substring("File:".length());

                File file = new File(path);
                if (!file.exists()) {
                    return new InterpreterResult(InterpreterResult.Code.ERROR, "Path not exist: " + path);
                }

                // Read the file line by line and send each line as a record
                try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        kafkaProducer.send(new ProducerRecord<String, String>(topic, null, line));
                    }
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                    return new InterpreterResult(InterpreterResult.Code.ERROR, "Read failed: " + path);
                }

                return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Send Succeed: " + path);

            // Unknown source keyword
            } else {
                String error = "Error Message Source\n";
                error += "Send Source only support Message or File";
                return new InterpreterResult(InterpreterResult.Code.ERROR, error);
            }
        } else {
            return new InterpreterResult(InterpreterResult.Code.ERROR, "Producer can't parse " + parts[0].trim());
        }
    }

    public void cancel(InterpreterContext interpreterContext) {
        // Sends are fire-and-forget, so there is nothing to cancel
    }

    public FormType getFormType() {
        return FormType.SIMPLE;
    }

    public int getProgress(InterpreterContext interpreterContext) {
        return 0;
    }
}
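
Before wiring the class into Zeppelin, it can be smoke-tested from a plain main method. This is a minimal sketch, assuming a broker on localhost:9092 and a topic named test (both hypothetical), and relying on Zeppelin's Interpreter.getProperty resolving against the Properties handed to the constructor; the property values shown are just the Kafka client defaults:

import java.util.Properties;
import org.apache.zeppelin.interpreter.InterpreterResult;

public class ProducerInterpreterDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("batch.size", "16384");                 // Kafka default, 16 KB
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");           // Kafka default, 32 MB
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducerInterpreter producer = new KafkaProducerInterpreter(props);
        producer.open();
        // The producer's interpret() never touches the context, so null is tolerable for a smoke test
        InterpreterResult result = producer.interpret("producer --topic test --Message:hello", null);
        System.out.println(result.code()); // SUCCESS if the command parsed and the record was handed off
        producer.close();
    }
}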

2. The consumer interpreter, KafkaConsumerInterpreter:

import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerInterpreter extends Interpreter {

    private KafkaConsumer<String, String> kafkaConsumer;

    private Logger logger = LoggerFactory.getLogger(KafkaConsumerInterpreter.class);

    // Set by cancel()/close() from another thread, so it must be volatile
    private volatile boolean isTerminated;

    public KafkaConsumerInterpreter(Properties property) {
        super(property);
    }

    public void open() {
        // Consumer configuration, read from the interpreter settings
        Properties consumerProp = new Properties();
        consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getProperty("bootstrap.servers"));
        consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, getProperty("group.id"));
        consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getProperty("auto.commit.interval.ms"));
        consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getProperty("auto.offset.reset"));
        consumerProp.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getProperty("session.timeout.ms"));
        consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getProperty("key.deserializer"));
        consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getProperty("value.deserializer"));

        kafkaConsumer = new KafkaConsumer<String, String>(consumerProp);

        isTerminated = false;
    }

    public void close() {
        isTerminated = true;
    }

    public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
        // Reset the flag so the paragraph can be re-run after a cancel
        isTerminated = false;

        // Supported paragraph format:
        //   consumer --topic topicName
        String[] parts = cmd.split("--");

        if (parts.length == 2 && parts[0].trim().equals("consumer")) {
            String topic = parts[1].substring("topic ".length()).trim();

            // Subscribe once, then keep polling until the paragraph is cancelled
            kafkaConsumer.subscribe(Collections.singletonList(topic));
            while (!isTerminated) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                try {
                    for (ConsumerRecord<String, String> record : records) {
                        String mes = "Received Message:(" + record.key() + "," + record.value() + ") " +
                                "at partition " + record.partition() + ", offset " + record.offset();
                        // Stream each record to the notebook front end via the InterpreterOutput stream
                        interpreterContext.out().write(mes + "\n");
                    }
                    interpreterContext.out().flush();
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
            return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Stop Consuming");
        } else {
            String error = "Error Message Format\n";
            error += "consumer --topic topicName";
            return new InterpreterResult(InterpreterResult.Code.ERROR, error);
        }
    }

    public void cancel(InterpreterContext interpreterContext) {
        // Invoked when the user cancels the paragraph; ends the poll loop above
        isTerminated = true;
    }

    public FormType getFormType() {
        return FormType.NONE;
    }

    public int getProgress(InterpreterContext interpreterContext) {
        return 0;
    }
}
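
Once the plugin is registered, both interpreters are driven straight from notebook paragraphs. A hypothetical session, assuming the interpreter group is named kafka and the two interpreters are bound as producer and consumer (the binding names depend on your interpreter-setting.json):

%kafka.producer
producer --topic test --Message:hello world

%kafka.consumer
consumer --topic test

The consumer paragraph keeps streaming records to the front end until the user cancels it, which calls cancel() and flips isTerminated so the poll loop exits with "Stop Consuming".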