在Spark Streaming中,数据抽象是DStream(离散数据流)。底层是靠封装RDD实现,而Spark Mllib是早期的机器学习库,主要也是基于RDD抽象数据集实现的算法。因此在Spark Streaming上想要使用Spark Mllib首先就要获取到DStream对应的RDD,而DStream中可以获取到RDD的方法有如下:
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit,displayInnerRDDOps: Boolean): Unit def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]具体方法实现参考源码,观察方法签名我们可以得知foreachRDD是不可以的,因为foreachRDD返回值类型Unit,获取不到结果(但是通过在foreachRDD方法中实现预测结果输出外部存储的话,也是可以实现的)。所以方便实现的就是transform算子,需要一个RDD=>RDD的函数,我们就可以将这个函数定义为预测函数,然后传入(识别模型我们可以通过在加载方式加载离线模型)
具体实现伪代码如下:
import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkContext, SparkConf} /** * Created by Daxin on 2017/8/9. * 本程序主要想要说明如何在Spark Streaming上使用Spark Mllib */ object SparkStreamingMl { def main(args: Array[String]) { val conf = new SparkConf() conf.setAppName("socketStream") //基于Reciver模式,所以线程数目需要大于1,否则只能接受数据无法处理数据 conf.setMaster("local[*]") //如果设置conf.setMaster("local[1]")的话,将会没有线程负责计算 val sc = new SparkContext(conf) sc.setLogLevel("ERROR") val ssc = new StreamingContext(sc, Seconds(2)) ssc.checkpoint("") //TODO 最终创建一个SocketInputDStream返回 val line = ssc.socketTextStream("node", 9999) /** * 模拟识别的预测逻辑 * * @param rdd * @tparam T * @return */ def predict[T](rdd: RDD[T]): RDD[T] = { //省略预测的逻辑 rdd } /** * 直接定义函数 */ val predictFunc = (rdd: RDD[String]) => { rdd } val func = predict[String] _ //方法转函数,此处主要想复习一下Scala知识点 line.transform(func(_)).print() ssc.start() ssc.awaitTermination() } }
/** * Allows the execution of relational queries, including those expressed in SQL using Spark. * * @groupname dataType Data types * @groupdesc Spark SQL data types. * @groupprio dataType -3 * @groupname field Field * @groupprio field -2 * @groupname row Row * @groupprio row -1 */ package object sql { /** * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting * with the query planner and is not designed to be stable across spark releases. Developers * writing libraries should instead consider using the stable APIs provided in * [[org.apache.spark.sql.sources]] */ @DeveloperApi @InterfaceStability.Unstable type Strategy = SparkStrategy type DataFrame = Dataset[Row] }