Spark Streaming之dataset实例

时间:2022-07-24 20:49:05

  Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。

  bin/spark-submit --class Streaming /home/wx/Stream.jar
  hadoop fs -put /home/wx/123.txt /user/wx/

文本123.txt

NOTICE:07-26 logId[0072]
NOTICE:07-26 logId[0073]
NOTICE:07-26 logId[0074]
NOTICE:07-26 logId[0075]
NOTICE:07-26 logId[0076]

 

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SparkSession

object Streaming {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("RegexpExtract")
    val ssc = new StreamingContext(conf, Seconds(1))

    println("hello world")

    val lines = ssc.textFileStream("hdfs://name-ha/user/wx/")

    val ds = lines.flatMap(_.split("\n"))

    ds.print()

    ds.foreachRDD { rdd =>

      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      // Convert RDD[String] to DataFrame
      val wordsDataFrame = rdd.toDF("str_col")

      // Create a temporary view
      wordsDataFrame.createOrReplaceTempView("df")

      // Do word count on DataFrame using SQL and print it
      val wordCountsDataFrame =
        spark.sql(raw"""
          select str_col,
          regexp_extract(str_col,"NOTICE:\\d{2}",0) notice,
          regexp_extract(str_col,"logId\\[(.*?)\\]",0) logId 
          from df""")
      wordCountsDataFrame.show(false)
    }

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}

 

执行结果

hello world
-------------------------------------------
Time: 1501501752000 ms
-------------------------------------------

NOTICE:07-26 logId[0072]
NOTICE:07-26 logId[0073]
NOTICE:07-26 logId[0074]
NOTICE:07-26 logId[0075]
NOTICE:07-26 logId[0076]

+------------------------+---------+-----------+
|str_col                 |notice   |logId      |
+------------------------+---------+-----------+
|NOTICE:07-26 logId[0072]|NOTICE:07|logId[0072]|
|NOTICE:07-26 logId[0073]|NOTICE:07|logId[0073]|
|NOTICE:07-26 logId[0074]|NOTICE:07|logId[0074]|
|NOTICE:07-26 logId[0075]|NOTICE:07|logId[0075]|
|NOTICE:07-26 logId[0076]|NOTICE:07|logId[0076]|
+------------------------+---------+-----------+

-------------------------------------------
Time: 1501501770000 ms
-------------------------------------------