The spark-submit command is as follows:
/usr/bigdata/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--class in.appcook.test.TestSpark2 \
--master yarn \
--deploy-mode cluster \
--executor-memory 2G \
--num-executors 2 \
--files /data/apps/config.properties \
/data/apps/test-1-jar-with-dependencies.jar
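
With --files, YARN localizes /data/apps/config.properties into the working directory of every container (the executors, and in cluster mode the driver as well), which is why the code below can open it by its bare file name. If you would rather not depend on the working directory, SparkFiles.get resolves the absolute path of a shipped file on the executors. A minimal sketch; the loadShippedProperties helper is illustrative and not part of the original code:

import java.io.FileInputStream
import java.util.Properties
import org.apache.spark.SparkFiles

// Hypothetical helper: resolve a file distributed via --files (or SparkContext.addFile)
// to its absolute path inside the container, then load it as java.util.Properties.
def loadShippedProperties(fileName: String): Properties = {
  val path = SparkFiles.get(fileName) // e.g. fileName = "config.properties"
  val props = new Properties()
  val in = new FileInputStream(path)
  try props.load(in) finally in.close()
  props
}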
The configuration file can be read in all three of the places shown below: the driver's main method, inside foreachPartition on an executor (logged as "place 1"), and inside the per-record foreach (logged as "place 2").
import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// LogUtil is the post author's logging helper; its import is omitted in the original.
object TestSpark2 {
  def handler(ssc: StreamingContext, kafkaZkQuorum: String, group: String, kafkaTopics: Map[String, Int]) = {
    // Create a receiver-based Kafka stream (createStream goes through ZooKeeper;
    // it is not the direct-stream API)
    val kafkaStream = KafkaUtils.createStream(ssc, kafkaZkQuorum, group, kafkaTopics)
    kafkaStream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Place 1: inside foreachPartition, running on an executor
        val filePath = "config.properties"
        LogUtil.info(filePath)
        val props = new Properties()
        props.load(new FileInputStream(filePath))
        LogUtil.info("place 1")
        props.keySet().toArray().foreach { key =>
          LogUtil.info(key + "\tplace 1\t" + props.getProperty(key.toString()))
        }
        partition.foreach { record =>
          LogUtil.info(record)
          // Place 2: inside the per-record foreach, also on an executor
          val filePath1 = "config.properties"
          LogUtil.info(filePath1)
          val props1 = new Properties()
          props1.load(new FileInputStream(filePath1))
          LogUtil.info("place 2")
          props1.keySet().toArray().foreach { key =>
            LogUtil.info(key + "\tplace 2\t" + props1.getProperty(key.toString()))
          }
        }
      }
    }
  }
  def main(args: Array[String]): Unit = {
    // Place 3: the driver's main method. In cluster mode the driver also runs
    // in a YARN container whose working directory contains the shipped file.
    // No command-line arguments are used; the file name is fixed.
    val filePath = "config.properties"
    LogUtil.info(filePath)
    val props = new Properties()
    props.load(new FileInputStream(filePath))
    props.keySet().toArray().foreach { key =>
      LogUtil.info(key + "\t" + props.getProperty(key.toString()))
    }
    val kafkaZkQuorum = props.getProperty("zookeeper.quorum")
    val group = props.getProperty("kafka.group.id.test")
    val topics = props.getProperty("kafka.topics.test")
    val numThreads = props.getProperty("num.threads.test").toInt
    val timeDuration = props.getProperty("spark.time.duration").toInt
    val checkpointDir = "hdfs:/tmp/test_group" + topics.split(",").mkString("-")
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val appName = "EventETL" + "_" + topics.split(",").mkString("-")
    def createStreamingContext(kafkaZkQuorum: String, group: String, kafkaTopics: Map[String, Int]): StreamingContext = {
      val conf = new SparkConf()
      conf.setAppName(appName)
      val ssc = new StreamingContext(conf, Seconds(timeDuration))
      handler(ssc, kafkaZkQuorum, group, kafkaTopics)
      // Enable checkpointing for recovery; note that while this stays commented
      // out, getOrCreate below will always build a fresh context.
      //ssc.checkpoint(checkpointDir)
      ssc
    }
    // If this is not the first start, recover the context from checkpointDir
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
      createStreamingContext(kafkaZkQuorum, group, topicMap)
    })
    try {
      ssc.start()
      ssc.awaitTermination()
    } finally {
      ssc.stop()
    }
  }
}
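
For reference, these are the keys that TestSpark2 reads from config.properties; the values below are placeholders, not from the original post:

zookeeper.quorum=zk1:2181,zk2:2181,zk3:2181
kafka.group.id.test=EventETL_test_group
kafka.topics.test=test_topic
num.threads.test=1
spark.time.duration=3

Note that handler re-opens config.properties once per partition ("place 1") and again for every record ("place 2"). That works because --files gives every container its own local copy, but loading the properties once on the driver and broadcasting them to the executors would avoid the repeated file I/O per record.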