Spark Streaming自定义Receiver

时间:2021-08-22 20:55:45

一 背景

Spark社区为Spark Streaming提供了很多数据源接口,但是有些比较偏的数据源没有覆盖,由于公司技术栈选择,用了阿里云的MQ服务ONS,要做实时需求,要自己编写Receiver

二 技术实现

1.官网的例子已经比较详细,但是进入实践还需要慢慢调试,官方文档

2.实现代码,由三部分组成,receiver,inputstream,util

3.receiver代码

import java.io.Serializable
import java.util.Properties

import com.aliyun.openservices.ons.api._
import com.aliyun.openservices.ons.api.impl.ONSFactoryImpl
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class OnsReceiver(
    cid: String,
    accessKey: String,
    secretKey: String,
    addr: String,
    topic: String,
    tag: String,
    func: Message => Array[Byte])
  extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) with Serializable with Logging {
  receiver =>

  private var consumer: Consumer = null
  private var workerThread: Thread = null

  override def onStart(): Unit = {
    workerThread = new Thread(new Runnable {
      override def run(): Unit = {
        val properties = new Properties
        properties.put(PropertyKeyConst.ConsumerId, cid)
        properties.put(PropertyKeyConst.AccessKey, accessKey)
        properties.put(PropertyKeyConst.SecretKey, secretKey)
        properties.put(PropertyKeyConst.ONSAddr, addr)
        properties.put(PropertyKeyConst.MessageModel, "CLUSTERING")
        properties.put(PropertyKeyConst.ConsumeThreadNums, "50")
        val onsFactoryImpl = new ONSFactoryImpl
        consumer = onsFactoryImpl.createConsumer(properties)
        consumer.subscribe(topic, tag, new MessageListener() {
          override def consume(message: Message, context: ConsumeContext): Action = {
            try {
              receiver.store(func(message))
              Action.CommitMessage
            } catch {
              case e: Throwable => e.printStackTrace()
                Action.ReconsumeLater
            }
          }
        })
        consumer.start()
      }
    })
    workerThread.setName(s"Aliyun ONS Receiver $streamId")
    workerThread.setDaemon(true)
    workerThread.start()
  }

  override def onStop(): Unit = {
    if (workerThread != null) {
      if (consumer != null) {
        consumer.shutdown()
      }

      workerThread.join()
      workerThread = null
      logInfo(s"Stopped receiver for streamId $streamId")
    }
  }
}

input代码

import com.aliyun.openservices.ons.api.Message
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class OnsInputDStream(
    @transient _ssc: StreamingContext,
    cid: String,
    topic: String,
    tag: String,
    accessKey: String,
    secretKey: String,
    addr:String,
    func: Message => Array[Byte]
  ) extends ReceiverInputDStream[Array[Byte]](_ssc) {

  override def getReceiver(): Receiver[Array[Byte]] = {
    new OnsReceiver(cid,accessKey,secretKey,addr,topic,tag,func)
  }

}

util代码

import com.aliyun.openservices.ons.api.Message
import org.apache.spark.annotation.Experimental
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object OnsUtils {
  @Experimental
  def createStream(
                    ssc: StreamingContext,
                    cid: String,
                    topic: String,
                    tag: String,
                    accessKey: String,
                    secretKey: String,
                    addr: String,
                    func: Message => Array[Byte]): ReceiverInputDStream[Array[Byte]] = {
    new OnsInputDStream(ssc, cid, topic, tag, accessKey, secretKey, addr, func)
  }

  @Experimental
  def createStreams(
                     ssc: StreamingContext,
                     consumerIdTopicTags: Array[(String, String, String)],
                     accessKey: String,
                     secretKey: String,
                     addr: String,
                     func: Message => Array[Byte]): DStream[Array[Byte]] = {
    val invalidTuples1 = consumerIdTopicTags.groupBy(e => (e._1, e._2)).filter(e => e._2.length > 1)
    val invalidTuples2 = consumerIdTopicTags.map(e => (e._1, e._2)).groupBy(e => e._1).filter(e => e._2.length > 1)
    if (invalidTuples1.size > 1 || invalidTuples2.size > 1) {
      throw new RuntimeException("Inconsistent consumer subscription.")
    } else {
      ssc.union(consumerIdTopicTags.map({
        case (consumerId, topic, tags) =>
          createStream(ssc, consumerId, topic, tags, accessKey, secretKey, addr, func)
      }))
    }
  }

}

 

三 调用

val stream = (0 until 3).map(i => {
      OnsUtils.createStream(ssc,
        "CID",
        "BI_CALL",
        "call_log_ons",
        config.getString("ons.access_key"),
        config.getString("ons.sercet_key"),
        config.getString("ons.ons_addr"),
        func)
    })
    val unionStream = ssc.union(stream).foreachRDD(...)

stream可以决定设置多少个receiver,这个数量必须小于等于spark on yarn的num-executors,内存默认占用executors的内存的一半。