import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Set the application name; it is shown in the monitoring UI while the job runs.
// With no master set, the program runs on the Spark cluster.
val conf = new SparkConf().setAppName("checkoutresult")
// conf.setMaster("local[4]") // uncomment to run locally for testing

val ssc = new StreamingContext(conf, Seconds(300))

// Silence the noisy framework loggers.
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
ssc.sparkContext.setLogLevel("OFF")

// Set the checkpoint directory.
ssc.checkpoint("/checkoutresult/checkpoint")

val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// Kafka configuration
val topics = Set("your_topic")
// Test-environment Kafka brokers
val brokers = "192.168.2.6:9292,192.168.2.7:9292,192.168.2.8:9292"
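// `kafkaParams` is referenced below but never defined in the snippet; a minimal
// definition for the Kafka 0.8 direct-stream API, assuming no extra consumer options:
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)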
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
// Parse each Kafka message into "key##cookieid##siteid##country".
val events = kafkaStream.flatMap(line => {
  val obj = JSON.parseObject(line._2).getJSONObject("o") // parse the payload once instead of once per field
  val key     = obj.getJSONObject("i").getString("key")     // event name
  val cid     = obj.getJSONObject("c").getString("kid")     // cookieid
  val siteid  = obj.getJSONObject("c").getBigInteger("tid") // siteid
  val country = obj.getJSONObject("l").getString("cc")      // country code
  Some(key + "##" + cid + "##" + siteid + "##" + country)
})
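// A hypothetical event illustrating the JSON layout the parsing above assumes
// (field paths are taken from the code; values are placeholders):
// {"o": {"i": {"key": "[CLY]_view"},
//        "c": {"kid": "cookie-123", "tid": 1},
//        "l": {"cc": "US"}}}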
// Page-view (PV) events
val splitRdd = events.map { t =>
  val x = t.split("##")
  (x(0), x(1), x(2), x(3))
}.filter { case (key, _, _, _) => key == "[CLY]_view" }
// Checkout events
val checkout = events.map { t =>
  val x = t.split("##")
  (x(0), x(1), x(2), x(3))
}.filter { case (key, _, _, _) => key == "key_checkout_result" }
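// The original jumps from `checkout` straight to a map over `adInfo` without
// showing the join that produces it. A minimal reconstruction, assuming both
// streams are keyed by cookieid##siteid##country and combined with
// leftOuterJoin (so adInfo._2._2 is an Option, matching the getOrElse below):
val pvPairs = splitRdd.map { case (_, cid, sid, cc) => (cid + "##" + sid + "##" + cc, 1) }
val checkoutPairs = checkout.map { case (_, cid, sid, cc) => (cid + "##" + sid + "##" + cc, 1) }
val result = pvPairs.leftOuterJoin(checkoutPairs).map(adInfo => {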
  val cookieid = adInfo._1.split("##")(0)
  val siteid = adInfo._1.split("##")(1)
  val country = adInfo._1.split("##")(2)
  // 1 if this cookie produced a checkout event in the batch, otherwise 0
  val is_checkout = adInfo._2._2.getOrElse(0)
  (cookieid, siteid, country, is_checkout)
})
result.foreachRDD(rdd => {
  val sqlContext = SQLContextSingleton1.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  // Map each tuple to the OrderConv case class and register a temp table.
  val OrderConvFrame = rdd.map(w => OrderConv(w._1, w._2.toInt, w._3, w._4)).toDF()
  OrderConvFrame.registerTempTable("OrderConv")
  val dt = dateFormat.format(new Date())
  // Per (country, siteid): distinct-cookie UV and distinct-cookie checkout UV,
  // stamped with the batch time. Output columns: time, country, siteid, uv, checkoutuv.
  val logCountsDataFrame = sqlContext.sql(
    "select '" + dt + "' as time, country, siteid, " +
      "count(distinct cookieid) as uv, " +
      "count(distinct case when is_checkout = 1 then cookieid else null end) as checkoutuv " +
      "from OrderConv group by country, siteid")
  // logCountsDataFrame.show() // uncomment to inspect the query result
  // JDBC credentials (redacted)
  val prop = new java.util.Properties
  prop.setProperty("user", "xxxx")
  prop.setProperty("password", "xxxxx")
  // Write the batch result to MySQL via DataFrameWriter; the table is created if it does not exist.
  logCountsDataFrame.write.mode(SaveMode.Append)
    .jdbc("jdbc:mysql://192.168.1.6:3306/xxx?autoReconnect=true&useSSL=false", "pv1018", prop)
})

ssc.start()
ssc.awaitTermination()
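// OrderConv and SQLContextSingleton1 are used above but not defined in the
// snippet. A minimal sketch consistent with how they are called (the exact
// originals are not shown in the source):
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

case class OrderConv(cookieid: String, siteid: Int, country: String, is_checkout: Int)

// Lazily instantiated singleton SQLContext, the usual pattern for creating
// DataFrames inside foreachRDD.
object SQLContextSingleton1 {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}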