When using checkpoint-based failure recovery, all of the business logic must be placed inside the functionToCreateContext function; only then can the data in the checkpoint directory be recovered when the job restarts.
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
//org.spark.streaming.checkpoint.recovery.MyRecoverableNetworkWordCount
object RecoverableKafkaDirectWordCount {
// var checkpointDirectory = "hdfs:///user/spark/streaming_checkpoint"
val formatDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") // note: SimpleDateFormat is not thread-safe; shared here only for brevity
// Put all of the business logic inside this method
def createContext(topic: String, checkpointDirectory: String): StreamingContext = {
// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
println("------------Creating new context------------")
val sparkConf = new SparkConf().setAppName("RecoverableKafkaDirectWordCount.scala")
// Create the context with a 2 second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint(checkpointDirectory)
// Create a direct Kafka stream and read the messages from the given topic
// With the receiver-based kafka + zookeeper approach, the offsets consumed by a groupId's
// consumers are committed to ZK as messages are consumed, and the consumers resume from
// those offsets on the next start. The consumer configuration (or the ConsumerConfig class
// parameters) has an "autooffset.reset" setting ("auto.offset.reset" in Kafka 0.8) with two
// legal values, "largest"/"smallest", defaulting to "largest". It tells a consumer in this
// groupId where to start when ZK holds no offset for it (e.g. a new groupId, or the ZK data
// was wiped): "largest" means start from the largest offset (the newest messages), "smallest"
// means start from the smallest offset, i.e. consume every message from the beginning of the topic.
val kafkaParams = Map(
"metadata.broker.list" -> " xxx:9092" ,
// "auto.offset.reset" -> "smallest",
"group.id" -> "streaming-group");
val topics = Set(topic)
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics)
val logger = LogFactory.getLog("RecoverableKafkaDirectWordCount.scala")
var batchDur = 4
var windowDuration = 20
var parallelism = 20
// kafkaStream.print()
// Append a timestamp to each Kafka message value
val forAppDStream = kafkaStream.map(_._2).map(x => x + "-" + formatDate.format(new Date()))
forAppDStream.print(2)
forAppDStream.foreachRDD(locusRDD => {
locusRDD.foreachPartition(
locusPoints => {
val tableName = "test:user"
val myConf = HBaseConfiguration.create()
myConf.set("hbase.zookeeper.quorum", " ")
myConf.set("hbase.zookeeper.property.clientPort", "21810")
val myTable = new HTable(myConf, TableName.valueOf(tableName))
myTable.setAutoFlush(false, false)
myTable.setWriteBufferSize(3 * 1024 * 1024)
import collection.JavaConversions._
var putList = scala.collection.immutable.List[Put]() // buffer Puts and flush to HBase in batches of 200
while (locusPoints.hasNext) {
val name = locusPoints.next();
val p = new Put(Bytes.toBytes(name));
p.addColumn("f1".getBytes, "gps_time".getBytes, Bytes.toBytes(formatDate.format(new Date())));
putList = putList.::(p)
if (putList.size % 200 == 0) {
myTable.put(putList)
putList = scala.collection.immutable.List[Put]()
}
}
myTable.put(putList)
myTable.flushCommits()
myTable.close()
})
})
// Return the fully assembled StreamingContext so getOrCreate can checkpoint and recover it
ssc
}
def main(args: Array[String]) {
if (args.length != 2) {
System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
System.err.println(
"""
|Usage: RecoverableKafkaDirectWordCount <topic> <checkpoint-directory>
|  <topic> is the Kafka topic that Spark Streaming reads from.
|  <checkpoint-directory> is a directory on an HDFS-compatible file system to which
|  the checkpoint data is written.
|
|In local mode, <master> should be 'local[n]' with n > 1
|<checkpoint-directory> must be an absolute path
""".stripMargin
)
System.exit(1)
}
val Array(topic, checkpointDirectory) = args
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => {
createContext(topic, checkpointDirectory)
})
ssc.start()
ssc.awaitTermination()
}
}
I think you have to put your streaming-related logic into the function functionToCreateContext; you could refer to the related Spark Streaming example RecoverableNetworkWordCount to change your code.
Also refer to this link -> https://issues.apache.org/jira/browse/SPARK-6770
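For reference, the core of that RecoverableNetworkWordCount pattern boils down to the minimal sketch below (the object name, the socket source on localhost:9999, and the checkpoint argument are only illustrative placeholders, not part of the code above): every DStream definition and output operation lives inside the function handed to StreamingContext.getOrCreate, so a restarted driver can rebuild the whole job from the checkpoint directory.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MinimalRecoverablePattern {
  // All DStream creation, transformations and output operations go in here.
  def createContext(checkpointDirectory: String): StreamingContext = {
    val conf = new SparkConf().setAppName("MinimalRecoverablePattern")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

    // Illustrative business logic: count the lines arriving on a socket.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDirectory = args(0)
    // First run: createContext builds a fresh context.
    // Restart: the context, including the DStream graph above, is recovered from the checkpoint.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}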