DStream checkpointing has been enabled but the DStreams with their functions are not serializable

Date: 2023-01-16 20:49:21

I want to send a DStream to Kafka, but it still doesn't work.

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

searchWordCountsDStream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Create the producer inside foreachPartition so it is built on the
    // executor rather than shipped from the driver.
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outbroker)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    partitionOfRecords.foreach { case (x: String, y: String) =>
      println(x)
      // Sends the word x as the record value with a null key; the count y
      // is currently unused.
      val message = new ProducerRecord[String, String](outtopic, null, x)
      producer.send(message)
    }
    producer.close()
  }
}

Here is some of the error output:

16/10/31 14:44:15 ERROR StreamingContext: Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
spider.app.job.MeetMonitor
Serialization stack:
  - object not serializable (class: spider.app.job.MeetMonitor, value: spider.app.job.MeetMonitor@433c6abb)
  - field (class: spider.app.job.MeetMonitor$$anonfun$createContext$2, name: $outer, type: class spider.app.job.MeetMonitor)
  - object (class spider.app.job.MeetMonitor$$anonfun$createContext$2, )
  - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
  - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, )
  - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
  - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@3ac3f6f)
  - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
  - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files
])
  - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
  - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@6f9c5048)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 16)
  - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
  - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@6f9c5048, org.apache.spark.streaming.dstream.ForEachDStream@3ac3f6f))
  - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
  - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files
])
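
Reading the serialization stack: the anonymous function that createContext passes to foreachRDD keeps a $outer reference to the enclosing spider.app.job.MeetMonitor instance, and once checkpointing is enabled Spark must serialize the whole DStream graph, closures included. One way to break that reference, sketched below (the KafkaWriter object is hypothetical, not part of the original code), is to move the per-partition logic into a top-level object, which is never captured by the closure:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Hypothetical top-level helper. A Scala object lives once per JVM and is
// never pulled into the serialized closure, so foreachRDD no longer drags
// the enclosing class along via $outer.
object KafkaWriter {
  def writePartition(records: Iterator[(String, String)],
                     brokers: String,
                     topic: String): Unit = {
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try {
      records.foreach { case (word, count) =>
        // Word as the record key, count as the value.
        producer.send(new ProducerRecord[String, String](topic, word, count))
      }
    } finally {
      producer.close()
    }
  }
}

The streaming code would then call something like rdd.foreachPartition(it => KafkaWriter.writePartition(it, broker, topic)), where broker and topic are local vals rather than fields of MeetMonitor, so the lambda captures only two strings.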

1 solution

#1

I encountered the same problem and found an answer here:

https://forums.databricks.com/questions/382/why-is-my-spark-streaming-application-throwing-a-n.html

It seems that using checkpointing together with foreachRDD causes the problem. After removing the checkpoint from my code, everything works fine.

P.S. I just wanted to comment, but I do not have enough reputation to do so.
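
If checkpointing needs to stay enabled (for example, for stateful operators or driver recovery), the closure fix sketched under the stack trace above is the usual alternative. A further refinement, again only a sketch and not from the linked thread, is to reuse one producer per executor JVM instead of opening and closing one for every partition of every batch:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

// Hypothetical per-executor producer cache: created lazily on first use in
// each executor JVM and reused across batches.
object ProducerHolder {
  @volatile private var producer: KafkaProducer[String, String] = _

  def get(brokers: String): KafkaProducer[String, String] = {
    if (producer == null) synchronized {
      if (producer == null) {
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        producer = new KafkaProducer[String, String](props)
        // Flush and release the connection when the executor JVM exits.
        sys.addShutdownHook(producer.close())
      }
    }
    producer
  }
}

With this holder, the per-partition loop shrinks to ProducerHolder.get(brokers).send(...), and connection setup cost is paid once per executor instead of once per partition per batch.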
