SparkException: org.apache.spark.streaming.dstream.MappedDStream has not been initialized

Time: 2022-02-10 20:55:09


When using checkpoint-based fault recovery, all of the business logic must be placed inside the functionToCreateContext function; otherwise the data in the checkpoint directory cannot be recovered.
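The pattern is easiest to see in a stripped-down sketch. This is only an illustration, not the full job below: the socketTextStream source, the localhost:9999 address, the CheckpointPatternSketch name and the checkpoint path are placeholders made up for the example. The point is that every DStream, including all transformations and output operations, is created inside the factory function handed to StreamingContext.getOrCreate; any DStream built outside that factory cannot be restored and produces the "has not been initialized" error when the job restarts from the checkpoint.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointPatternSketch {

  // Illustrative checkpoint path; in the real job this comes from the program arguments.
  val checkpointDirectory = "hdfs:///tmp/streaming_checkpoint"

  // The factory builds the StreamingContext AND the complete DStream pipeline.
  def createContext(): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("CheckpointPatternSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

    // All DStreams are created here. A DStream built outside this factory
    // cannot be restored and triggers "MappedDStream has not been initialized"
    // when the job restarts from the checkpoint.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.map(line => line + "-" + System.currentTimeMillis()).print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    // Cold start: the factory runs. Restart: the whole pipeline is rebuilt from
    // the checkpoint and the factory is skipped entirely.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}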


import java.text.SimpleDateFormat
import java.util.Date


import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils


import kafka.serializer.StringDecoder

 
//org.spark.streaming.checkpoint.recovery.MyRecoverableNetworkWordCount
object RecoverableKafkaDirectWordCount {


//  var checkpointDirectory = "hdfs:///user/spark/streaming_checkpoint"
  val formatDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

// Put all of the business logic inside this method
  def createContext(topic: String, checkpointDirectory: String): StreamingContext = {


    // If you do not see this printed, that means the StreamingContext has been loaded  
    // from the new checkpoint  
    println("------------Creating new context------------")  
    val sparkConf = new SparkConf().setAppName("RecoverableKafkaDirectWordCount.scala")  
    // Create the context with a 2 second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))  
    ssc.checkpoint(checkpointDirectory)  
  
    // Create a direct Kafka stream for the given topic and process its messages

    // With Kafka + ZooKeeper, each time messages are consumed the consumer commits the current
    // groupId's offsets to ZK; when the consumer restarts, it resumes consuming from those offsets.
    //
    // The consumer configuration (or the ConsumerConfig parameters) has an "autooffset.reset"
    // setting (named "auto.offset.reset" in Kafka 0.8) with two legal values, "largest" and
    // "smallest" (default "largest"). It controls where a consumer of this groupId starts when
    // no offset exists in ZK (e.g. a brand-new groupId, or the ZK data was cleared):
    // "largest" means start from the largest offset (the newest messages), "smallest" means
    // start from the smallest offset, i.e. consume the whole topic from the beginning.

    val kafkaParams = Map(

      "metadata.broker.list" -> " xxx:9092" ,


      //      "auto.offset.reset" -> "largest",
      //      "auto.offset.reset" -> "smallest",
      "group.id" -> "streaming-group");
    val topics = Set(topic)
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics)
    val logger = LogFactory.getLog("RecoverableKafkaDirectWordCount.scala")
    
    var batchDur = 4
    var windowDuration = 20
    var parallelism = 20
     
    //  kafkaStream.print()
    // Take the message value and append the current processing time.
    val forAppDStream = kafkaStream.map(_._2).map(x => x + "-" + formatDate.format(new Date()))
    forAppDStream.print(2)
    forAppDStream.foreachRDD(locusRDD => {
     
      locusRDD.foreachPartition(
        locusPoints => {
          val tableName = "test:user"
          // One HTable per partition; buffer writes client-side and flush manually.
          val myConf = HBaseConfiguration.create()
          myConf.set("hbase.zookeeper.quorum", " ")
          myConf.set("hbase.zookeeper.property.clientPort", "21810")
          val myTable = new HTable(myConf, TableName.valueOf(tableName))
          myTable.setAutoFlush(false, false)
          myTable.setWriteBufferSize(3 * 1024 * 1024)
          // JavaConversions provides the implicit Scala List -> java.util.List conversion for put().
          import collection.JavaConversions._
          var putList = scala.collection.immutable.List[Put]()


          // Buffer Puts and write them to HBase in batches of 200.
          while (locusPoints.hasNext) {
            val name = locusPoints.next()
            val p = new Put(Bytes.toBytes(name))
            p.addColumn("f1".getBytes, "gps_time".getBytes, Bytes.toBytes(formatDate.format(new Date())))
            putList = p :: putList
            if (putList.size >= 200) {
              myTable.put(putList)
              putList = scala.collection.immutable.List[Put]()
            }
          }
          // Write any remaining Puts.
          if (putList.nonEmpty) {
            myTable.put(putList)
          }


          myTable.flushCommits()  
          myTable.close()
        })
    })


    // Return the fully constructed StreamingContext so getOrCreate can checkpoint and restore it
    ssc
  }  
  
  def main(args: Array[String]) {  
    if (args.length != 2) {  
      System.err.println("You arguments were " + args.mkString("[", ", ", "]"))  
      System.err.println(  
        """  
          |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>  
          |     . <hostname> and <port> describe the TCP server that Spark  
          |     Streaming would connect to receive data. <checkpoint-directory> directory to  
          |     HDFS-compatible file system which checkpoint data <output-file> file to which the  
          |     word counts will be appended  
          |  
          |In local mode, <master> should be 'local[n]' with n > 1  
          |Both <checkpoint-directory> and <output-file> must be absolute paths  
        """.stripMargin  
      )  
      System.exit(1)  
    }  
  
    val Array(topic, checkpointDirectory) = args
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,  
      () => {  
        createContext(topic, checkpointDirectory)  
      })  
    ssc.start()  
    ssc.awaitTermination()  
  }  
}



I think you have to put your streaming-related logic into the function functionToCreateContext; you could refer to the related Spark Streaming example RecoverableNetworkWordCount to change your code.



Refer to this link: https://issues.apache.org/jira/browse/SPARK-6770