一 背景
Spark社区为Spark Streaming提供了很多数据源接口,但是有些比较偏的数据源没有覆盖,由于公司技术栈选择,用了阿里云的MQ服务ONS,要做实时需求,要自己编写Receiver
二 技术实现
1.官网的例子已经比较详细,但是进入实践还需要慢慢调试,官方文档。
2.实现代码,由三部分组成,receiver,inputstream,util
3.receiver代码
import java.io.Serializable import java.util.Properties import com.aliyun.openservices.ons.api._ import com.aliyun.openservices.ons.api.impl.ONSFactoryImpl import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver class OnsReceiver( cid: String, accessKey: String, secretKey: String, addr: String, topic: String, tag: String, func: Message => Array[Byte]) extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) with Serializable with Logging { receiver => private var consumer: Consumer = null private var workerThread: Thread = null override def onStart(): Unit = { workerThread = new Thread(new Runnable { override def run(): Unit = { val properties = new Properties properties.put(PropertyKeyConst.ConsumerId, cid) properties.put(PropertyKeyConst.AccessKey, accessKey) properties.put(PropertyKeyConst.SecretKey, secretKey) properties.put(PropertyKeyConst.ONSAddr, addr) properties.put(PropertyKeyConst.MessageModel, "CLUSTERING") properties.put(PropertyKeyConst.ConsumeThreadNums, "50") val onsFactoryImpl = new ONSFactoryImpl consumer = onsFactoryImpl.createConsumer(properties) consumer.subscribe(topic, tag, new MessageListener() { override def consume(message: Message, context: ConsumeContext): Action = { try { receiver.store(func(message)) Action.CommitMessage } catch { case e: Throwable => e.printStackTrace() Action.ReconsumeLater } } }) consumer.start() } }) workerThread.setName(s"Aliyun ONS Receiver $streamId") workerThread.setDaemon(true) workerThread.start() } override def onStop(): Unit = { if (workerThread != null) { if (consumer != null) { consumer.shutdown() } workerThread.join() workerThread = null logInfo(s"Stopped receiver for streamId $streamId") } } }
input代码
import com.aliyun.openservices.ons.api.Message import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.receiver.Receiver class OnsInputDStream( @transient _ssc: StreamingContext, cid: String, topic: String, tag: String, accessKey: String, secretKey: String, addr:String, func: Message => Array[Byte] ) extends ReceiverInputDStream[Array[Byte]](_ssc) { override def getReceiver(): Receiver[Array[Byte]] = { new OnsReceiver(cid,accessKey,secretKey,addr,topic,tag,func) } }
util代码
import com.aliyun.openservices.ons.api.Message import org.apache.spark.annotation.Experimental import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object OnsUtils { @Experimental def createStream( ssc: StreamingContext, cid: String, topic: String, tag: String, accessKey: String, secretKey: String, addr: String, func: Message => Array[Byte]): ReceiverInputDStream[Array[Byte]] = { new OnsInputDStream(ssc, cid, topic, tag, accessKey, secretKey, addr, func) } @Experimental def createStreams( ssc: StreamingContext, consumerIdTopicTags: Array[(String, String, String)], accessKey: String, secretKey: String, addr: String, func: Message => Array[Byte]): DStream[Array[Byte]] = { val invalidTuples1 = consumerIdTopicTags.groupBy(e => (e._1, e._2)).filter(e => e._2.length > 1) val invalidTuples2 = consumerIdTopicTags.map(e => (e._1, e._2)).groupBy(e => e._1).filter(e => e._2.length > 1) if (invalidTuples1.size > 1 || invalidTuples2.size > 1) { throw new RuntimeException("Inconsistent consumer subscription.") } else { ssc.union(consumerIdTopicTags.map({ case (consumerId, topic, tags) => createStream(ssc, consumerId, topic, tags, accessKey, secretKey, addr, func) })) } } }
三 调用
val stream = (0 until 3).map(i => { OnsUtils.createStream(ssc, "CID", "BI_CALL", "call_log_ons", config.getString("ons.access_key"), config.getString("ons.sercet_key"), config.getString("ons.ons_addr"), func) }) val unionStream = ssc.union(stream).foreachRDD(...)
stream可以决定设置多少个receiver,这个数量必须小于等于spark on yarn的num-executors,内存默认占用executors的内存的一半。