Saving Scala Spark Streaming data to MongoDB

Time: 2022-04-28 20:52:49

Here's my simplified Apache Spark Streaming code, which gets input via Kafka streams, combines them, prints them, and saves them to a file. But now I want the incoming stream of data to be saved in MongoDB.

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setMaster("local[*]")
                          .setAppName("StreamingDataToMongoDB")
                          .set("spark.streaming.concurrentJobs", "2")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sc, Seconds(1))

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topicName1 = Set("KafkaSimple")
val topicName2 = Set("SimpleKafka")

// One direct stream per topic; each record is a (key, value) pair
val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicName1)
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicName2)

// Keep only the message values
val lines1 = stream1.map(_._2)
val lines2 = stream2.map(_._2)

val allThelines = lines1.union(lines2)
allThelines.print()
allThelines.repartition(1).saveAsTextFiles("File", "AllTheLinesCombined")

ssc.start()
ssc.awaitTermination()

I have tried the Stratio Spark-MongoDB library and some other resources, but still no success. Could someone please help me proceed or point me to a useful working resource/tutorial? Cheers :)

2 solutions

#1



If you want to write out to a format which isn't directly supported on DStreams, you can use foreachRDD to write out each batch one-by-one using the RDD-based API for Mongo.

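For example, here is a minimal sketch of that pattern using the plain mongo-java-driver rather than a Spark-specific connector. It assumes the driver is on the classpath and a mongod is listening on localhost:27017; the mydb database and lines collection are placeholder names:

import com.mongodb.MongoClient
import org.bson.Document

allThelines.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Create the client on the executor: MongoClient is not serializable,
    // so it cannot be captured from the driver.
    val client = new MongoClient("localhost", 27017)
    try {
      val collection = client.getDatabase("mydb").getCollection("lines")
      partition.foreach { line =>
        collection.insertOne(new Document("value", line))
      }
    } finally {
      client.close()
    }
  }
}

The official MongoDB Spark Connector exposes a similar RDD-based entry point (MongoSpark.save), which can replace the hand-rolled driver code once the connector dependency and its spark.mongodb.output.uri setting are in place.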

#2



lines1.foreachRDD { rdd =>
  if (rdd.isEmpty()) {
    // Runs on the driver once per empty batch
    println("Got no data in this window")
  } else {
    rdd.foreach { data =>
      if (data != null) {
        // Save data here
      }
    }
  }
}

Do the same for lines2.

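Alternatively, since the question's code already unions the two streams into allThelines, the same block can be applied once to the unioned DStream rather than duplicated per stream:

allThelines.foreachRDD { rdd =>
  // identical save logic as above
}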
