"java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext" when executing Spark Streaming

Time: 2021-02-07 20:52:04

When I execute a Spark Streaming application on YARN, I keep receiving the following error:

Why does this error happen, and how can I solve it? Any suggestion will help, thank you~

    15/05/07 11:11:50 INFO dstream.StateDStream: Marking RDD 2364 for time 1430968310000 ms for checkpointing
    15/05/07 11:11:50 INFO scheduler.JobScheduler: Added jobs for time 1430968310000 ms
    15/05/07 11:11:50 INFO scheduler.JobGenerator: Checkpointing graph for time 1430968310000 ms
    15/05/07 11:11:50 INFO streaming.DStreamGraph: Updating checkpoint data for time 1430968310000 ms
    15/05/07 11:11:50 INFO streaming.DStreamGraph: Updated checkpoint data for time 1430968310000 ms
    15/05/07 11:11:50 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
    java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

The Spark Streaming application code is as follows; I execute it in spark-shell:

    import kafka.cluster.Cluster
    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Duration, StreamingContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext._

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      Some(0)
    }

    val ssc = new StreamingContext(sc, new Duration(5000))
    ssc.checkpoint(".")

    val lines = KafkaUtils.createStream(ssc, "10.1.10.21:2181", "kafka_spark_streaming", Map("hello_test" -> 3))

    val uuidDstream = lines.transform(rdd => rdd.map(_._2)).map(x => (x, 1)).updateStateByKey[Int](updateFunc)
    uuidDstream.count().print()

    ssc.start()
    ssc.awaitTermination()

1 Solution

#1


The reference to val updateFunc inside the closure passed to updateStateByKey pulls the rest of the enclosing instance into the closure, taking the StreamingContext with it.
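
Why that capture happens is easier to see outside Spark. Here is a minimal self-contained sketch of the same mechanism; FakeContext and Enclosing are hypothetical stand-ins for the real StreamingContext and the wrapper class spark-shell generates around your top-level vals:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Stand-in for StreamingContext: any non-serializable field type.
    class FakeContext

    // spark-shell compiles top-level vals into fields of a generated
    // wrapper instance, conceptually like this class.
    class Enclosing extends Serializable {
      val ssc = new FakeContext // plays the role of the StreamingContext
      val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(0)
    }

    val enclosing = new Enclosing
    // A closure that merely references enclosing.updateFunc captures the
    // whole Enclosing instance; serializing it reaches the FakeContext
    // field and fails, just as checkpointing fails on ssc above.
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(() => enclosing.updateFunc(Seq(1), None))
    // => java.io.NotSerializableException: FakeContext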

Two options:

  • Quick fix: declare the streaming context transient => @transient val ssc = ... It is also a good idea to annotate the DStream declarations as @transient (see the sketch after this list).

  • A better fix: put your functions in a separate object.
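
For the quick fix, a sketch against the code from the question (same spark-shell session assumed; @transient is standard Scala and simply excludes a field from Java serialization):

    // @transient excludes these fields from Java serialization, so the
    // checkpoint writer no longer tries to serialize the StreamingContext.
    @transient val ssc = new StreamingContext(sc, new Duration(5000))
    ssc.checkpoint(".")

    @transient val lines = KafkaUtils.createStream(ssc, "10.1.10.21:2181", "kafka_spark_streaming", Map("hello_test" -> 3))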

For the better fix, like this:

    case object TransformFunctions {
      // the update function from the question, now in a standalone object
      val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(0)
    }
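
With the function in its own top-level object, the call site from the question changes only in where updateFunc comes from; the closure no longer references the REPL wrapper instance that holds ssc:

    val uuidDstream = lines.transform(rdd => rdd.map(_._2))
      .map(x => (x, 1))
      .updateStateByKey[Int](TransformFunctions.updateFunc)
    uuidDstream.count().print()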
