Reading configuration from a properties file in Spark

Date: 2022-03-18 04:39:05

The job is submitted as follows; note the --files option, which ships the configuration file alongside the application:

/usr/bigdata/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--class in.appcook.test.TestSpark \
--master yarn \
--deploy-mode cluster \
--executor-memory 2G \
--num-executors 2 \
--files /data/apps/config.properties \
/data/apps/test-1-jar-with-dependencies.jar
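On YARN, --files copies config.properties into the working directory of each executor container (and of the driver container as well in cluster mode), which is why the code below can open it by its bare file name. As an alternative, here is a minimal sketch using SparkFiles.get, the public Spark API for resolving the absolute path of a shipped file on an executor; it is not the method used in this post, and the names ConfigLoader and loadDistributedProps are made up for the example:

import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.SparkFiles

// Sketch: load a Properties file distributed with --files (or sc.addFile).
// SparkFiles.get resolves the file's absolute local path on the executor.
object ConfigLoader {
  def loadDistributedProps(fileName: String): Properties = {
    val props = new Properties()
    val in = new FileInputStream(SparkFiles.get(fileName))
    try props.load(in) finally in.close()
    props
  }
}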

With this submit command, the configuration file can be read by its bare name in all three of the following places: inside foreachPartition on the executors, inside the per-record foreach, and in main on the driver.

package in.appcook.test

import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object TestSpark {

  def handler(ssc: StreamingContext, kafkaZkQuorum: String, group: String, kafkaTopics: Map[String, Int]) = {
    // Create a receiver-based Kafka stream (createStream; the direct API would be createDirectStream)
    val kafkaStream = KafkaUtils.createStream(ssc, kafkaZkQuorum, group, kafkaTopics)

    kafkaStream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>

        // Place 1: inside foreachPartition, running on the executor
        val filePath = "config.properties"
        LogUtil.info(filePath)
        val props = new Properties()
        props.load(new FileInputStream(filePath))

        LogUtil.info("place 1")
        props.keySet().toArray().foreach { k =>
          LogUtil.info(k + "\t(1) " + props.getProperty(k.toString()))
        }

        partition.foreach { record =>
          LogUtil.info(record)

          // Place 2: inside the per-record foreach, also on the executor
          val filePath1 = "config.properties"
          LogUtil.info(filePath1)
          val props1 = new Properties()
          props1.load(new FileInputStream(filePath1))

          LogUtil.info("place 2")
          props1.keySet().toArray().foreach { k =>
            LogUtil.info(k + "\t(2) " + props1.getProperty(k.toString()))
          }
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {

    var kafkaZkQuorum = ""
    var group = "EventETL_test_group"
    var topics = ""
    var numThreads = 1
    var timeDuration = 3

    var checkpointDir = "/Users/test/sparktemp"

    // Place 3: in main, running on the driver.
    // The file name is fixed; --files ships it next to the application.
    val filePath = "config.properties"
    LogUtil.info(filePath)
    val props = new Properties()
    props.load(new FileInputStream(filePath))

    props.keySet().toArray().foreach { k =>
      LogUtil.info(k + "\t" + props.getProperty(k.toString()))
    }

    kafkaZkQuorum = props.getProperty("zookeeper.quorum")
    group = props.getProperty("kafka.group.id.test")
    topics = props.getProperty("kafka.topics.test")
    numThreads = props.getProperty("num.threads.test").toInt
    timeDuration = props.getProperty("spark.time.duration").toInt

    checkpointDir = "hdfs:///tmp/test_group_" + topics.split(",").mkString("-")

    val topicMap = topics.split(",").map((_, numThreads)).toMap

    val appName = "EventETL" + "_" + topics.split(",").mkString("-")

    def createStreamingContext(kafkaZkQuorum: String, group: String, kafkaTopics: Map[String, Int]): StreamingContext = {
      val conf = new SparkConf()
      conf.setAppName(appName)

      val ssc = new StreamingContext(conf, Seconds(timeDuration))
      handler(ssc, kafkaZkQuorum, group, kafkaTopics)

      // checkpoint for recovery
      //ssc.checkpoint(checkpointDir)
      ssc
    }

    // If this is not the first start, recover the context from checkpointDir
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
      createStreamingContext(kafkaZkQuorum, group, topicMap)
    })

    try {
      ssc.start()
      ssc.awaitTermination()
    } finally {
      ssc.stop()
    }
  }
}
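For reference, the keys read in main suggest a config.properties of the following shape. This is a hypothetical sketch; every value below is a placeholder, not taken from the original post:

# hypothetical example config.properties; all values are placeholders
zookeeper.quorum=zk1:2181,zk2:2181,zk3:2181
kafka.group.id.test=EventETL_test_group
kafka.topics.test=topic_a,topic_b
num.threads.test=1
spark.time.duration=3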