Spark Streaming DirectKafka Count Example

Date: 2022-05-16 20:50:13

The DirectKafka word count is much like the receiver-based Kafka word count, except that it uses the Direct approach. The Direct approach reads from Kafka with the low-level consumer API instead of a receiver, and the main difference in the code is the call to createDirectStream.
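For contrast, here is a minimal sketch of the receiver-based variant. The ZooKeeper address, consumer group id, and topic name are placeholders; the point is that createStream goes through ZooKeeper and the high-level consumer, while createDirectStream below talks to the brokers directly:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("ReceiverKafkaWordCount"), Seconds(60))
    // Receiver-based stream: offsets are committed to ZooKeeper by the
    // high-level consumer rather than computed per batch by Spark
    val messages = KafkaUtils.createStream(
      ssc,
      "zk01:2181",            // placeholder ZooKeeper quorum
      "wordcount-group",      // placeholder consumer group id
      Map("test" -> 1))       // topic -> number of receiver threads
    messages.map(_._2).flatMap(_.split(","))
      .map((_, 1L)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}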

The full word count code is as follows:

package com.hw.streaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(s"""
                            |Usage: DirectKafkaWordCount <brokers> <topics>
                            |  <brokers> is a list of one or more Kafka brokers
                            |  <topics> is a list of one or more kafka topics to consume from
                            |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with a 60 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
//    "smallest" starts from the earliest offset, like --from-beginning
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
      "auto.offset.reset"->"smallest"
    )
//    Create the direct DStream
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the message values, take the second comma-separated field of each
    // record as the word, count the words and print
    val lines = messages.map(_._2)
    val words = lines.map(_.split(",")(1))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the streaming computation and block until termination
    ssc.start()
    ssc.awaitTermination()

  }

}
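Because the Direct approach computes the offset range of each batch itself instead of relying on ZooKeeper, the application can inspect those offsets, for example to store them for recovery. A minimal sketch that could be added right after createDirectStream above (the println is only illustrative):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // The RDDs of the direct stream implement HasOffsetRanges, so each
    // batch can report exactly which Kafka offsets it covers
    messages.foreachRDD { rdd =>
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { o =>
        println(s"topic=${o.topic} partition=${o.partition} " +
          s"from=${o.fromOffset} until=${o.untilOffset}")
      }
    }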

For starting the related Flume and Kafka services, see:

https://www.cnblogs.com/hanwen1014/p/11260456.html