虽然spark streaming定义了常用的Receiver,但有时候还是需要自定义自己的Receiver的。对于自定义的Receiver,只需要实现spark streaming的Receiver抽象类即可。而Receiver的实现只需要简单地实现两个方法:
1、onStart():接收数据。
2、onStop():停止接收数据。
一般onStart()不应该阻塞,应该启动一个新的线程复杂数据接收。而onStop()方法负责确保这些接收数据的线程是停止的,在
Receiver
被关闭时调用了,可以做一些 close 工作。负责接收数据的线程可以通过isStopped()来判断是否要停止数据接收。
对于接收到的数据,需要存储到spark 框架里,使用的是store()方法。Receiver抽象类提供了4种store()方法,分别可用于存储:
1、
单条小数据
2、数组形式的块数据
3、ByteBuffer 形式的块数据
4、iterator 形式的块数据
这4种的store()方法的实现都是直接将数据传递给 ReceiverSupervisor 来进行存储的。所以要自定义一个 Receiver,只要在 onStart() 里创建数据接收的线程,并在接收到数据时 store() 到 Spark Streamimg 框架就可以了。
下面代码是一个基于xmemcached协议消息队列的Spark Streaming 接收器:
import Fqueue.FqueueReceiver import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver class FqueueStreamingReceiver(val address: String, val connectionPoolSize: Int, val timeOut: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { private var receiver: Option[FqueueReceiver] = None def onStart() { new Thread("Socket Receiver") { override def run() { receive() } }.start() } def onStop(): Unit = { receiver foreach { _.stop() } } private def receive(): Unit = { val fqueueReceiver = new FqueueReceiver(address, connectionPoolSize, timeOut) receiver = Some(fqueueReceiver) receiver foreach { _.connect() } try { var stop = false while (!isStopped() && !stop) { val data = fqueueReceiver.deQueue("track_BOdao2015*") data match { case Some(str) => store(str) case None => Thread.sleep(1000)//stop = true } } receiver foreach { _.stop() } } catch { case e: Exception => println("get data from fqueue err! pleace sure the server is live") println(e.getMessage) println(e.getStackTraceString) receiver foreach { _.stop() } } } }
在自定义了spark streaming的Receiver后,可以在应用中使用:
def main(args: Array[String]) { new Thread("fqueue sender") { override def run() { sendData() } }.start() val config = new SparkConf().setAppName("testfqueue").setMaster("local[2]") val ssc = new StreamingContext(config, Seconds(5)) val lines = ssc.receiverStream(new FqueueStreamingReceiver("localhost:18740", 4, 4000)) lines.print() ssc.start() ssc.awaitTermination() }
链接:
http://spark.apache.org/docs/latest/streaming-custom-receivers.html
喜欢的话github项目上送个星星(⊙o⊙)哦........您的star是我的动力!