Spark Streaming is usually used to consume data from Kafka and write the results out to some other store. In this project, however, we needed to read from one Kafka topic, process the data with Spark Streaming, and write it back to a different Kafka topic; this post records how we implemented that.
Environment:
Spark: 1.6.1
streaming-kafka: spark-streaming-kafka_2.10, 1.6.1
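For reference, the Kafka integration dependency can be declared roughly like this (assuming a Maven build; adapt the coordinates to sbt or Gradle as needed):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.1</version>
</dependency>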
In this example there is a singleton Kafka producer on each executor, and messages are written out locally and in parallel on every executor. If that still does not meet your throughput requirements, you can go further and keep a Kafka producer pool on each executor, so that on top of the parallelism across executors you also get concurrent output within each executor (a sketch of such a pool is given at the end of this post).
Core code
package org.frey.example;

import com.google.common.collect.Lists;
import kafka.producer.KeyedMessage;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

/**
 * KafkaOutput
 *
 * @author FREY
 * @date 2016/4/28
 */
public class KafkaOutput {

    static final String KAFKA_CONSUMER_GROUP_ID = "streaming-kafka-output";

    public static void main(String[] args) {
        String topic = "origin-topic";
        HashSet<String> topicSet = new HashSet<String>();
        topicSet.add(topic);

        final HashMap<String, String> kafkaParam = new HashMap<String, String>();
        kafkaParam.put("metadata.broker.list", "broker_001:9092,broker_002:9092");
        kafkaParam.put("group.id", KAFKA_CONSUMER_GROUP_ID);

        // initialize the JavaStreamingContext with a 5-second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("streaming-kafka-output-app");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5 * 1000));

        // broadcast the broker list
        final Broadcast<String> brokerListBroadcast = jssc.sparkContext().broadcast("broker_001:9092,broker_002:9092");
        // broadcast the destination topic
        final Broadcast<String> topicBroadcast = jssc.sparkContext().broadcast("dest-topic");

        // <key, message> pair stream
        JavaPairDStream<String, String> kafkaPairDStream = KafkaUtils
            .createDirectStream(jssc, String.class, String.class, StringDecoder.class,
                StringDecoder.class, kafkaParam, topicSet);

        // message stream
        JavaDStream<String> message = kafkaPairDStream.map(
            new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> v1) throws Exception {
                    return v1._2();
                }
            });

        /*
        ..............
        other business logic
        .............
        */

        // write the results back to Kafka
        message.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            public void call(final JavaRDD<String> v1) throws Exception {
                v1.foreachPartition(new VoidFunction<Iterator<String>>() {
                    public void call(Iterator<String> stringIterator) throws Exception {
                        // get the singleton Kafka producer
                        // (a singleton per executor JVM, so the job has as many producers as
                        // executors, all writing in parallel for good throughput)
                        KafkaProducer kafkaProducer = KafkaProducer.getInstance(brokerListBroadcast.getValue());

                        // batch send (recommended)
                        List<KeyedMessage<String, String>> messageList = Lists.newArrayList();
                        while (stringIterator.hasNext()) {
                            messageList.add(new KeyedMessage<String, String>(topicBroadcast.getValue(), stringIterator.next()));
                        }
                        kafkaProducer.send(messageList);

                        // send one message at a time
                        /*
                        while (stringIterator.hasNext()) {
                            kafkaProducer.send(new KeyedMessage<String, String>(topicBroadcast.getValue(), stringIterator.next()));
                        }
                        */
                    }
                });
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
Kafka producer
package org.frey.example;

import com.google.common.base.Preconditions;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.List;
import java.util.Properties;

/**
 * KafkaProducer
 *
 * @author FREY
 * @date 2016/4/28
 */
public class KafkaProducer implements Serializable {

    public static final String METADATA_BROKER_LIST_KEY = "metadata.broker.list";
    public static final String SERIALIZER_CLASS_KEY = "serializer.class";
    public static final String SERIALIZER_CLASS_VALUE = "kafka.serializer.StringEncoder";

    private static KafkaProducer instance = null;

    private Producer<String, String> producer;

    private KafkaProducer(String brokerList) {
        Preconditions.checkArgument(StringUtils.isNotBlank(brokerList), "kafka brokerList is blank...");

        // set properties
        Properties properties = new Properties();
        properties.put(METADATA_BROKER_LIST_KEY, brokerList);
        properties.put(SERIALIZER_CLASS_KEY, SERIALIZER_CLASS_VALUE);
        // enable gzip compression; "compression.codec" is the key the old producer API
        // actually reads ("kafka.message.CompressionCodec" is not a valid producer property)
        properties.put("compression.codec", "gzip");
        properties.put("client.id", "streaming-kafka-output");

        ProducerConfig producerConfig = new ProducerConfig(properties);
        this.producer = new Producer<String, String>(producerConfig);
    }

    public static synchronized KafkaProducer getInstance(String brokerList) {
        if (instance == null) {
            instance = new KafkaProducer(brokerList);
            System.out.println("init kafka producer...");
        }
        return instance;
    }

    // send a single message
    public void send(KeyedMessage<String, String> keyedMessage) {
        producer.send(keyedMessage);
    }

    // batch send
    public void send(List<KeyedMessage<String, String>> keyedMessageList) {
        producer.send(keyedMessageList);
    }

    public void shutdown() {
        producer.close();
    }
}
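As mentioned at the top of the post, a producer pool is the natural next step when one producer per executor is not fast enough. The class below is only a minimal sketch of that idea (KafkaProducerPool and its poolSize parameter are hypothetical, not part of the original project): it holds a fixed number of producers per executor JVM and hands them out round-robin, so multiple partition tasks running concurrently on the same executor do not serialize on a single producer instance.

package org.frey.example;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

/**
 * KafkaProducerPool - hypothetical sketch of a per-executor producer pool.
 */
public class KafkaProducerPool {

    private static KafkaProducerPool instance = null;

    private final Producer<String, String>[] producers;
    private final AtomicLong counter = new AtomicLong(0);

    @SuppressWarnings("unchecked")
    private KafkaProducerPool(String brokerList, int poolSize) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", brokerList);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(properties);

        producers = new Producer[poolSize];
        for (int i = 0; i < poolSize; i++) {
            producers[i] = new Producer<String, String>(config);
        }
    }

    // one pool per executor JVM, same pattern as KafkaProducer.getInstance
    public static synchronized KafkaProducerPool getInstance(String brokerList, int poolSize) {
        if (instance == null) {
            instance = new KafkaProducerPool(brokerList, poolSize);
        }
        return instance;
    }

    // round-robin over the pooled producers
    private Producer<String, String> next() {
        // the counter only wraps after Long.MAX_VALUE sends; fine for a sketch
        return producers[(int) (counter.getAndIncrement() % producers.length)];
    }

    public void send(List<KeyedMessage<String, String>> keyedMessageList) {
        next().send(keyedMessageList);
    }
}

Inside foreachPartition, the call would then become KafkaProducerPool.getInstance(brokerListBroadcast.getValue(), 4).send(messageList) instead of going through the single KafkaProducer.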