Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。
bin/spark-submit --class Streaming /home/wx/Stream.jar
hadoop fs -put /home/wx/123.txt /user/wx/
文本123.txt
NOTICE:07-26 logId[0072] NOTICE:07-26 logId[0073] NOTICE:07-26 logId[0074] NOTICE:07-26 logId[0075] NOTICE:07-26 logId[0076]
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.sql.SparkSession object Streaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("RegexpExtract") val ssc = new StreamingContext(conf, Seconds(1)) println("hello world") val lines = ssc.textFileStream("hdfs://name-ha/user/wx/") val ds = lines.flatMap(_.split("\n")) ds.print() ds.foreachRDD { rdd => // Get the singleton instance of SparkSession val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("str_col") // Create a temporary view wordsDataFrame.createOrReplaceTempView("df") // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = spark.sql(raw""" select str_col, regexp_extract(str_col,"NOTICE:\\d{2}",0) notice, regexp_extract(str_col,"logId\\[(.*?)\\]",0) logId from df""") wordCountsDataFrame.show(false) } ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate } }
执行结果
hello world ------------------------------------------- Time: 1501501752000 ms ------------------------------------------- NOTICE:07-26 logId[0072] NOTICE:07-26 logId[0073] NOTICE:07-26 logId[0074] NOTICE:07-26 logId[0075] NOTICE:07-26 logId[0076] +------------------------+---------+-----------+ |str_col |notice |logId | +------------------------+---------+-----------+ |NOTICE:07-26 logId[0072]|NOTICE:07|logId[0072]| |NOTICE:07-26 logId[0073]|NOTICE:07|logId[0073]| |NOTICE:07-26 logId[0074]|NOTICE:07|logId[0074]| |NOTICE:07-26 logId[0075]|NOTICE:07|logId[0075]| |NOTICE:07-26 logId[0076]|NOTICE:07|logId[0076]| +------------------------+---------+-----------+ ------------------------------------------- Time: 1501501770000 ms -------------------------------------------