Writing Spark Streaming output to Kafka

Date: 2021-05-14 20:49:23

Spark Streaming is usually used to consume data from Kafka and write it to some other store. In this project, however, we needed to read from one Kafka topic, process the data with Spark Streaming, and then write the results back to a different Kafka topic. This post records the implementation.

Environment:

spark: 1.6.1
streaming-kafka: spark-streaming-kafka_2.10, 1.6.1

In this example there is one singleton Kafka producer per executor, and each executor sends its messages out locally, in parallel with the other executors. The producer is obtained inside foreachPartition rather than on the driver, because Kafka producers are not serializable and cannot be shipped inside the closure. If this throughput is still not enough, you can go further and use a Kafka producer pool on each executor, giving parallel output across executors and concurrent output within each executor; a sketch of such a pool follows.
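The pooled variant was not implemented in this project; below is a minimal sketch of what a per-executor pool could look like, assuming the same old Scala producer API (kafka.javaapi.producer.Producer) used in the rest of this post. The class name KafkaProducerPool, the fixed pool size, and the round-robin hand-out are illustrative choices, not a tested implementation.

package org.frey.example;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical per-executor producer pool (a sketch, not part of the original
 * project): holds a fixed number of underlying producers and hands them out
 * round-robin, so several partitions on one executor can send concurrently.
 */
public class KafkaProducerPool {

    private static KafkaProducerPool instance = null;

    private final List<Producer<String, String>> producers;
    private final AtomicInteger next = new AtomicInteger(0);

    private KafkaProducerPool(String brokerList, int size) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", brokerList);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        producers = new ArrayList<Producer<String, String>>(size);
        for (int i = 0; i < size; i++) {
            producers.add(new Producer<String, String>(new ProducerConfig(properties)));
        }
    }

    // one pool per executor JVM, same pattern as KafkaProducer.getInstance below
    public static synchronized KafkaProducerPool getInstance(String brokerList, int size) {
        if (instance == null) {
            instance = new KafkaProducerPool(brokerList, size);
        }
        return instance;
    }

    // round-robin over the pooled producers; masking keeps the index non-negative
    public Producer<String, String> borrow() {
        return producers.get((next.getAndIncrement() & Integer.MAX_VALUE) % producers.size());
    }
}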

Core code

package org.frey.example;

import com.google.common.collect.Lists;
import kafka.producer.KeyedMessage;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

/**
 * KafkaOutput
 *
 * @author FREY
 * @date 2016/4/28
 */
public class KafkaOutput {

    static final String KAFKA_CONSUMER_GROUP_ID = "streaming-kafka-output";

    public static void main(String[] args) {

        String topic = "origin-topic";

        HashSet<String> topicSet = new HashSet<String>();
        topicSet.add(topic);

        final HashMap<String, String> kafkaParam = new HashMap<String, String>();
        kafkaParam.put("metadata.broker.list", "broker_001:9092,broker_002:9092");
        kafkaParam.put("group.id", KAFKA_CONSUMER_GROUP_ID);

        // initialize the JavaStreamingContext with 5-second batches
        SparkConf sparkConf = new SparkConf().setAppName("streaming-kafka-output-app");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5 * 1000));

        // broadcast the broker list
        final Broadcast<String> brokerListBroadcast = jssc.sparkContext().broadcast("broker_001:9092,broker_002:9092");
        // broadcast the destination topic
        final Broadcast<String> topicBroadcast = jssc.sparkContext().broadcast("dest-topic");

        // <key, message> pair stream read directly from Kafka
        JavaPairDStream<String, String> kafkaPairDStream = KafkaUtils
                .createDirectStream(jssc, String.class, String.class, StringDecoder.class,
                        StringDecoder.class, kafkaParam, topicSet);

        // message stream: keep only the value of each pair
        JavaDStream<String> message = kafkaPairDStream.map(
                new Function<Tuple2<String, String>, String>() {
                    public String call(Tuple2<String, String> v1) throws Exception {
                        return v1._2();
                    }
                });

        /*
        ..............
        other business logic
        .............
        */

        // write the results back to Kafka
        message.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            public void call(final JavaRDD<String> v1) throws Exception {

                v1.foreachPartition(new VoidFunction<Iterator<String>>() {

                    public void call(Iterator<String> stringIterator) throws Exception {

                        // get the singleton Kafka producer
                        // (one singleton per executor, so the job has as many producers as
                        // executors and they all send in parallel, which performs well)
                        KafkaProducer kafkaProducer = KafkaProducer.getInstance(brokerListBroadcast.getValue());

                        // batch send (recommended)
                        List<KeyedMessage<String, String>> messageList = Lists.newArrayList();
                        while (stringIterator.hasNext()) {
                            messageList.add(new KeyedMessage<String, String>(topicBroadcast.getValue(), stringIterator.next()));
                        }
                        kafkaProducer.send(messageList);

                        // send one message at a time
                        /*
                        while (stringIterator.hasNext()) {
                            kafkaProducer.send(new KeyedMessage<String, String>(topicBroadcast.getValue(), stringIterator.next()));
                        }
                        */
                    }
                });
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}

Kafka producer

package org.frey.example;

import com.google.common.base.Preconditions;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.List;
import java.util.Properties;

/**
 * KafkaProducer
 *
 * @author FREY
 * @date 2016/4/28
 */
public class KafkaProducer implements Serializable {

    public static final String METADATA_BROKER_LIST_KEY = "metadata.broker.list";
    public static final String SERIALIZER_CLASS_KEY = "serializer.class";
    public static final String SERIALIZER_CLASS_VALUE = "kafka.serializer.StringEncoder";
    public static final String COMPRESSION_CODEC_KEY = "compression.codec";

    private static KafkaProducer instance = null;

    private Producer<String, String> producer;

    private KafkaProducer(String brokerList) {

        Preconditions.checkArgument(StringUtils.isNotBlank(brokerList), "kafka brokerList is blank...");

        // set producer properties
        Properties properties = new Properties();
        properties.put(METADATA_BROKER_LIST_KEY, brokerList);
        properties.put(SERIALIZER_CLASS_KEY, SERIALIZER_CLASS_VALUE);
        // 1 = gzip compression
        properties.put(COMPRESSION_CODEC_KEY, "1");
        properties.put("client.id", "streaming-kafka-output");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        this.producer = new Producer<String, String>(producerConfig);
    }

    public static synchronized KafkaProducer getInstance(String brokerList) {
        if (instance == null) {
            instance = new KafkaProducer(brokerList);
            System.out.println("initializing kafka producer...");
        }
        return instance;
    }

    // send a single message
    public void send(KeyedMessage<String, String> keyedMessage) {
        producer.send(keyedMessage);
    }

    // send a batch of messages
    public void send(List<KeyedMessage<String, String>> keyedMessageList) {
        producer.send(keyedMessageList);
    }

    public void shutdown() {
        producer.close();
    }
}
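
As a quick sanity check, the wrapper can also be exercised on its own, outside Spark. A minimal sketch follows; the class name KafkaProducerDemo is hypothetical, and the broker addresses and topic are the placeholders reused from the example above.

package org.frey.example;

import com.google.common.collect.Lists;
import kafka.producer.KeyedMessage;

public class KafkaProducerDemo {

    public static void main(String[] args) {
        // placeholder broker list, matching the streaming example
        KafkaProducer producer = KafkaProducer.getInstance("broker_001:9092,broker_002:9092");

        // single message
        producer.send(new KeyedMessage<String, String>("dest-topic", "hello kafka"));

        // batch of messages (the variant recommended inside foreachPartition)
        producer.send(Lists.newArrayList(
                new KeyedMessage<String, String>("dest-topic", "message-1"),
                new KeyedMessage<String, String>("dest-topic", "message-2")));

        producer.shutdown();
    }
}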