How to extract each JSONObject from a JSONArray and save it to Cassandra in Spark Streaming

Time: 2021-02-21 20:48:58

I'm trying to consume Kafka streaming data in Spark Streaming. Each message is a JSONArray that contains several JSONObjects.

I want to load each JSONObject into a DataFrame and, after mapping it against another table, save the result to a Cassandra table.

I tried creating a DataFrame for each JSONObject, but creating the DataFrame inside stream.foreachRDD throws a NullPointerException. Is that because Spark doesn't support nested RDDs? If so, how do I save the JSONObjects to Cassandra?

The data format is as follows:

[  
   {  
      "temperature":"21.8",
      "humidity":"65.6",
      "creatime":"2016-11-14 13:50:24",
      "id":"3303136",
      "msgtype":"th",
      "sensorID":"001"
   },
   {  
      "temperature":"23.1",
      "humidity":"60.6",
      "creatime":"2016-11-14 13:50:24",
      "id":"3303137",
      "msgtype":"th",
      "sensorID":"002"
   }
]

My code:

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.mapper.DefaultColumnMapper
import com.datastax.spark.connector._

import org.apache.spark.SparkConf
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import net.sf.json.JSONObject
import net.sf.json.JSONArray

object getkafkadata {

  def main(args: Array[String]) {

    val cassandraHostIP = "10.2.1.67"
    val keyspaceToGet = "iot_test"

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("PageViewStream")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.cassandra.connection.host", cassandraHostIP)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val sqc = new SQLContext(sc)

    val sqlContext = SQLContextSingleton.getInstance(sc)
    import sqlContext.implicits._

    val cc = new CassandraSQLContext(sc)
    cc.setKeyspace(keyspaceToGet)

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "10.2.1.67:6667",
      "group.id" -> "a13",
      "auto.offset.reset" -> "smallest")

    val topics = Set("test1208")
    println("kafkaParams=", kafkaParams, "topics=", topics)

    val offsetsList = 0
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    println("Line3 good!")

    println("Start to parse json...")

    val datas = stream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(line => {
          val event = JSONArray.fromObject(line._2)
          for (n <- 0 to event.size() - 1) {
            val eventobj = event.getJSONObject(n)

            println("======= Message =======")
            println(eventobj.toString())

            //data lost exception handling
            var sensorID = "no_data"
            var humidity = "0"
            var temperature = "0"
            var msgtype = "no_data"
            var creatime = "0"
            var id = "no_data"

            if (eventobj.has("sensorID"))
              sensorID = eventobj.getString("sensorID")
            if (eventobj.has("humidity"))
              humidity = eventobj.getString("humidity")
            if (eventobj.has("temperature"))
              temperature = eventobj.getString("temperature")
            if (eventobj.has("msgtype"))
              msgtype = eventobj.getString("msgtype")
            if (eventobj.has("creatime"))
              creatime = eventobj.getString("creatime")
            if (eventobj.has("id"))
              id = eventobj.getString("id")

            var df = cc.createDataFrame(Seq(
              (sensorID, humidity, temperature, msgtype, creatime, id)))
              .toDF("sensorID", "humidity", "temperature", "msgtype", "creatime", "id")

            println("==========df create done=========")
            df.show()

          }
        })
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

Exception message:

16/12/12 09:28:35 ERROR JobScheduler: Error running job streaming job 1481506110000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
    at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
    at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:573)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:432)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(getkafkadata.scala:109)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:78)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:76)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:76)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:75)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at com.test.spark.mapping.getkafkadata$$anonfun$1.apply(getkafkadata.scala:75)
    at com.test.spark.mapping.getkafkadata$$anonfun$1.apply(getkafkadata.scala:74)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
    at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:573)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:432)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(getkafkadata.scala:109)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:78)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:76)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:76)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:75)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    ... 3 more
16/12/12 09:28:35 INFO DAGScheduler: ResultStage 1 (foreachPartition at getkafkadata.scala:75) finished in 0.063 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
    at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
    at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:573)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:432)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(getkafkadata.scala:109)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:78)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:76)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:76)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:75)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at com.test.spark.mapping.getkafkadata$$anonfun$1.apply(getkafkadata.scala:75)
    at com.test.spark.mapping.getkafkadata$$anonfun$1.apply(getkafkadata.scala:74)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
    at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:573)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:432)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(getkafkadata.scala:109)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:78)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:76)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:76)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:75)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    ... 3 more
16/12/12 09:28:35 INFO DAGScheduler: Job 1 finished: foreachPartition at getkafkadata.scala:75, took 0.098511 s

1 Answer

#1

It's not possible to create a DataFrame inside an RDD closure. DataFrame operations do not make sense at the executor level: the closure runs on the executors, while the SQL context that builds DataFrames lives on the driver.
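One plausible reading of the stack trace (an assumption, not something confirmed in this thread) is that the context object captured by the closure is serialized and shipped to the executor, and that driver-only state marked as transient comes back as null there. The sketch below illustrates only that general mechanism with plain Java serialization; `FakeConf`, `TransientDemo` and `roundTrip` are hypothetical names and are not Spark classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.{HashMap => JHashMap, Map => JMap}

// Hypothetical stand-in for a driver-side configuration object; not a Spark class.
class FakeConf extends Serializable {
  // Transient fields are skipped by Java serialization and are null after deserialization.
  @transient val settings: JMap[String, String] = new JHashMap[String, String]()

  def getConf(key: String): String = settings.get(key)
}

object TransientDemo {
  // Serialize and deserialize a value, roughly what happens to objects captured by a task closure.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onDriver = new FakeConf
    val onExecutor = roundTrip(onDriver)
    // The original still has its map; the deserialized copy does not.
    println(s"driver settings is null: ${onDriver.settings == null}")
    println(s"executor settings is null: ${onExecutor.settings == null}")
    // Calling onExecutor.getConf(...) at this point would throw a NullPointerException.
  }
}
```

If this reading is right, it would explain why the failure surfaces deep inside a configuration lookup rather than in the question's own code.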

The fix is to transform the data in the RDD into the desired format on the executors and perform the DataFrame operations at the driver level.

For example, the partial code here illustrates the structural changes. Note how the RDD data is transformed first and only then converted into a DataFrame on the driver.

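// Assumes a top-level case class (defined outside main), for example:
// case class Record(sensorID: String, humidity: String, temperature: String, msgtype: String, creatime: String, id: String)
stream.foreachRDD { rdd =>
  // Executors: parse each Kafka message (a JSON array) into plain Record values
  val parsedData = rdd.flatMap { record =>
    val events = JSONArray.fromObject(record._2)
    (0 until events.size()).map { n =>
      val json = events.getJSONObject(n)
      // Fall back to a default when a field is missing, as in the question's code
      def field(key: String, default: String) = if (json.has(key)) json.getString(key) else default
      Record(field("sensorID", "no_data"), field("humidity", "0"), field("temperature", "0"),
        field("msgtype", "no_data"), field("creatime", "0"), field("id", "no_data"))
    }
  }
  // Driver: build the DataFrame from the parsed RDD and write it to Cassandra
  val df = cc.createDataFrame(parsedData)
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "sensordata", "keyspace" -> "iot"))
    .mode(SaveMode.Append) // append each batch to the existing table
    .save()
}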
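The parse + transform step can be sketched on its own, without Spark or a JSON library. This is only an illustration under stated assumptions: each JSON object is assumed to be already decoded into a `Map[String, String]`, and `SensorRecord`, `fromFields` and `ParseSketch.flatten` are hypothetical names. The defaults mirror the placeholder values used in the question's code.

```scala
// Hypothetical record type mirroring the six fields in the question's sample data.
case class SensorRecord(
  sensorID: String,
  humidity: String,
  temperature: String,
  msgtype: String,
  creatime: String,
  id: String
)

object SensorRecord {
  // Build a record from already-decoded key/value pairs, using the same
  // placeholder defaults as the question when a field is missing.
  def fromFields(fields: Map[String, String]): SensorRecord =
    SensorRecord(
      sensorID = fields.getOrElse("sensorID", "no_data"),
      humidity = fields.getOrElse("humidity", "0"),
      temperature = fields.getOrElse("temperature", "0"),
      msgtype = fields.getOrElse("msgtype", "no_data"),
      creatime = fields.getOrElse("creatime", "0"),
      id = fields.getOrElse("id", "no_data")
    )
}

object ParseSketch {
  // Each message carries several objects; flatMap turns the nested
  // structure into one flat sequence of records.
  def flatten(messages: Seq[Seq[Map[String, String]]]): Seq[SensorRecord] =
    messages.flatMap(objects => objects.map(SensorRecord.fromFields))

  def main(args: Array[String]): Unit = {
    val messages = Seq(
      Seq(
        Map("sensorID" -> "001", "temperature" -> "21.8", "humidity" -> "65.6"),
        Map("sensorID" -> "002", "temperature" -> "23.1")
      )
    )
    flatten(messages).foreach(println)
  }
}
```

Because the result is a flat collection of case-class instances, the same shape produced by an RDD `flatMap` can be handed to `createDataFrame` on the driver.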
