Real-Time Ranking and Statistics with Spark Streaming Sliding Windows
1. Main flow
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("StockerRealRank"); //.setMaster("local[5]");
  val sc = new SparkContext(conf);
  val ssc = new StreamingContext(sc, Seconds(5));
  // Keep 2 days of data around for windowed queries
  ssc.remember(Minutes(60 * 48));
  val sqlContext = new HiveContext(sc);
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);

  // 1. Register UDFs
  val udf = UDFUtils();
  udf.registerUdf(sqlContext);

  // 2. Process the Kafka data
  val kafkaService = KakfaService();
  val urlClickLogPairsDStream = kafkaService.kafkaDStreamForStocker(ssc);

  // 3. Cache the data from Hive
  val cacheUtils = CacheUtils();
  cacheUtils.cacheStockInfoData(sqlContext);

  // 4. Aggregate over sliding windows
  val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
    (v1: Int, v2: Int) => { v1 + v2 }, Minutes(60 * 2), Seconds(25));

  // Second consumption of the Kafka data (daily window)
  val urlClickCountsDStreamByDay = urlClickLogPairsDStream.reduceByKeyAndWindow(
    (v1: Int, v2: Int) => { v1 + v2 }, Minutes(60 * 48), Seconds(35));

  // 5. Business logic
  urlClickCnt(urlClickCountsDStream, sqlContext);
  urlClickCntByDay(urlClickCountsDStreamByDay, sqlContext);

  // 6. Start the streaming job
  ssc.start();
  ssc.awaitTermination();
}
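For operational robustness, two standard Spark settings are worth considering when building the SparkConf above. The sketch below is not part of the original job and assumes Spark 1.5 or later.

// A minimal sketch (not in the original job): enable graceful shutdown and
// receiver/direct-stream backpressure via standard Spark configuration keys.
val conf = new SparkConf()
  .setAppName("StockerRealRank")
  .set("spark.streaming.stopGracefullyOnShutdown", "true") // finish in-flight batches before exiting
  .set("spark.streaming.backpressure.enabled", "true")     // throttle ingestion when batches fall behind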
2. Registering UDFs, implemented with a Scala companion object
import org.apache.spark.sql.hive.HiveContext
import java.util.regex.Pattern

/**
 * @author Administrator
 */
class UDFUtils {

  def registerUdf(sqlContext: HiveContext) {
    // String length and simple concatenation helpers
    sqlContext.udf.register("strLen", (str: String) => str.length())
    sqlContext.udf.register("concat", (str1: String, str2: String, str3: String) => str1 + str2 + str3)
    sqlContext.udf.register("concat4", (str1: String, str2: String, str3: String, str4: String) => str1 + str2 + str3 + str4)

    // Return the last substring of str that matches the given pattern
    sqlContext.udf.register("regexp_extract", (str: String, pattern: String) => {
      val matcher = Pattern.compile(pattern, 1).matcher(str)
      var res = ""
      while (matcher.find()) {
        res = matcher.group()
      }
      res
    })

    // Extract the host part of a URL
    sqlContext.udf.register("getHost", (url: String) => {
      var strURL = "";
      try {
        strURL = url.toString();
        if (strURL.contains("://") && (strURL.indexOf("://") < 6) && strURL.length() > (strURL.indexOf("://") + 4)) {
          strURL = strURL.substring(strURL.indexOf("://") + 3);
        }
        if (strURL.contains("/")) {
          strURL = strURL.substring(0, strURL.indexOf("/"));
        }
        if (strURL.contains(":")) {
          strURL = strURL.substring(0, strURL.indexOf(":"));
        }
      } catch {
        case e: Exception => println("registerUdf Exception")
      }
      strURL;
    })
  }
}

object UDFUtils {
  def apply() = new UDFUtils();
}
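Once registered, these UDFs can be called directly from SQL. A minimal usage sketch, assuming the cms_entity_tmp temp table from section 4 has already been registered:

// Count entities per host using the getHost UDF registered above
// (assumes the cms_entity_tmp temp table from section 4 is available).
val hostCntDF = sqlContext.sql(
  "select getHost(entity_url) as host, count(*) as cnt " +
  "from cms_entity_tmp group by getHost(entity_url) order by cnt desc")
hostCntDF.show()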
3. Kafka data processing
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import java.util.regex.Pattern

/**
 * @author Administrator
 */
class KakfaService {

  // Article-click stream: key = url + "\001" + article id, value = 1
  def kafkaDStream(ssc: StreamingContext): DStream[(String, Int)] = {
    val topics = Set("teststreaming");
    val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092";
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder");
    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics);
    // Matches a 9-digit article id (starting with 1) embedded in the URL
    val strReg = "(.*?[^0-9][1][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][^0-9]).*?";
    val urlClickLogPairsDStream = kafkaStream.map(_._2.split(" ")).filter(_.length >= 11)
      .filter(urlClickLog => {
        val url = urlClickLog(7)
        val matcher = Pattern.compile(strReg, 1).matcher(url)
        var res = ""
        while (matcher.find()) {
          res = matcher.group();
        }
        (!res.equals("") && (res.length() > 15) && !url.contains("blog") && !url.contains("bookmark")
          && !url.contains("tg.hexun.com") && !url.contains("yanbao"))
      }).map { urlClickLog =>
        {
          val url = urlClickLog(7);
          val matcher = Pattern.compile(strReg, 1).matcher(url);
          var res = "";
          while (matcher.find()) {
            res = matcher.group();
          }
          val artId = url.substring(res.length() - 10, res.length() - 1);
          (urlClickLog(7) + "\001" + artId, 1);
        }
      };
    return urlClickLogPairsDStream;
  }

  // Stock-click stream: key = datetime + "\001" + cookieid + "\001" + ip + "\001" + url + "\001" + stock id, value = 1
  def kafkaDStreamForStocker(ssc: StreamingContext): DStream[(String, Int)] = {
    val topics = Set("teststreaming");
    val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092";
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder");
    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics);
    // Matches a 6-digit stock code (first digit 0, 3 or 6, second digit 0) embedded in the URL
    val strReg = "(.*?[^0-9][0|3|6][0][0-9][0-9][0-9][0-9]).*?";
    val urlClickLogPairsDStream = kafkaStream.map(_._2.split(" ")).filter(_.length >= 11)
      .filter(urlClickLog => {
        val requesturl = urlClickLog(7)
        var pattern = Pattern.compile(strReg, 1)
        var matcher = pattern.matcher(requesturl)
        var res = ""
        while (matcher.find()) {
          res = matcher.group();
        }
        pattern = Pattern.compile("(.*?index.*?)", 1);
        matcher = pattern.matcher(requesturl);
        val isIndex = matcher.matches();
        pattern = Pattern.compile("(.*?fund.*?)", 1);
        matcher = pattern.matcher(requesturl);
        val isFund = matcher.matches();
        // Whitelist of stock-related page URLs
        val flag1 = requesturl.contains("stockdata.stock.hexun.com");
        val flag2 = requesturl.contains("http://q.m.hexun.com/stock");
        val flag3 = requesturl.contains("http://wap.hexun.com/2.0/stockinfo");
        val flag4 = requesturl.contains("http://corpwap.hexun.com/2.0/stockinfo");
        val flag5 = requesturl.contains("http://vol.stock.hexun.com");
        val flag6 = requesturl.contains("m.guba.hexun.com");
        val flag7 = requesturl.contains("http://guba.hexun.com");
        val flag12 = requesturl.contains("http://baidu.hexun.com/stock");
        val flag13 = requesturl.contains("http://yuce.stock.hexun.com/stock");
        val flag14 = requesturl.contains(",guba,");
        val flag15 = requesturl.contains("http://guba.hexun.com/d/");
        val flag16 = requesturl.contains("http://guba.hexun.com/t/");
        val length = requesturl.length();
        (!res.equals("") && !isIndex && !isFund
          && (flag1 || flag2 || flag3 || flag4 || flag5 || flag6 || flag12 || flag13
            || (flag7 && length == 38) || (flag7 && flag14 && (!flag15 && !flag16))))
      }).map { urlClickLog =>
        {
          val url = urlClickLog(7);
          val matcher = Pattern.compile(strReg, 1).matcher(url);
          var res = "";
          while (matcher.find()) {
            res = matcher.group();
          }
          val stockId = url.substring(res.length() - 6, res.length());
          (urlClickLog(0) + "\001" + urlClickLog(1) + "\001" + urlClickLog(3) + "\001" + urlClickLog(7) + "\001" + stockId, 1);
        }
      };
    return urlClickLogPairsDStream;
  }
}

object KakfaService {
  def apply() = new KakfaService();
}
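The stock-id extraction relies on the regex match plus a substring on the matched prefix. Below is a standalone sanity check of that logic; the sample URL is made up for illustration and is not from the original log data.

import java.util.regex.Pattern

// Standalone check of the stock-code extraction used above; the sample URL is illustrative only.
object StockIdRegexCheck {
  def main(args: Array[String]): Unit = {
    val strReg = "(.*?[^0-9][0|3|6][0][0-9][0-9][0-9][0-9]).*?"
    val url = "http://stockdata.stock.hexun.com/600570.shtml"
    val matcher = Pattern.compile(strReg, 1).matcher(url)
    var res = ""
    while (matcher.find()) {
      res = matcher.group() // keep the last match found
    }
    // The match ends right after the 6-digit code, so the code is the last 6 characters of the prefix
    val stockId = url.substring(res.length() - 6, res.length())
    println(stockId) // prints 600570
  }
}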
4. Caching Hive data and refreshing it periodically
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.storage.StorageLevel

/**
 * @author Administrator
 */
class CacheUtils {

  def cacheHiveData(sqlContext: HiveContext) {
    // Cache the CMS channel dimension table
    val cmsChannelDF = sqlContext.sql("select channel_name,channel_desc,channel_id from ods.cms_channel");
    cmsChannelDF.registerTempTable("cms_channel_tmp");
    cmsChannelDF.persist(StorageLevel.MEMORY_ONLY_2);
    cmsChannelDF.show();
    cacheEntity(sqlContext);
    // Refresh the entity cache periodically in a background thread
    new Thread(new Producer(sqlContext)).start();
  }

  def cacheEntity(sqlContext: HiveContext) {
    val sdf = new SimpleDateFormat("yyyyMMdd");
    val date = new Date();
    // Load roughly the last 3 years of CMS entities
    val lastWeek = new Date(date.getTime - 3 * 365 * 24 * 3600 * 1000l)
    val lastWeekFm = sdf.format(lastWeek)
    val lastCmsEnityDF = sqlContext.sql("select distinct entity_id,entity_desc,entity_url,entity_channel from stage.CMS_ENTITY_BY_DAY where day>='" + lastWeekFm + "' ");
    lastCmsEnityDF.unpersist();
    lastCmsEnityDF.persist(StorageLevel.MEMORY_AND_DISK);
    lastCmsEnityDF.registerTempTable("cms_entity_tmp");
    lastCmsEnityDF.show();
  }

  def cacheStockInfoData(sqlContext: HiveContext) {
    // Cache the stock code / abbreviation dimension table
    val stockInfoDF = sqlContext.sql("select code,abbr from dms.d_stock_info");
    stockInfoDF.registerTempTable("d_stock_info_tmp");
    stockInfoDF.persist(StorageLevel.MEMORY_ONLY_2);
    stockInfoDF.show();
    new Thread(new Producer(sqlContext)).start();
  }

  // Background thread that refreshes the cached entity data every 31 minutes
  class Producer(sqlContext: HiveContext) extends Runnable {
    override def run(): Unit = {
      while (true) {
        Thread.sleep(1000 * 60 * 31);
        cacheEntity(sqlContext);
      }
    }
  }
}

object CacheUtils {
  def apply() = new CacheUtils();
}
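Note that cacheEntity calls unpersist() on the freshly created DataFrame, so releasing the blocks cached by the previous refresh is left to Spark's own cleanup. A minimal sketch of one way to release them explicitly; prevCmsEntityDF and refreshEntityCache are hypothetical members, not part of the original class, and the snippet assumes the same imports as CacheUtils.

// Hypothetical variant (not in the original class): keep a handle to the previously
// cached DataFrame so each refresh can release it explicitly before re-registering.
private var prevCmsEntityDF: org.apache.spark.sql.DataFrame = null

def refreshEntityCache(sqlContext: HiveContext): Unit = {
  if (prevCmsEntityDF != null) {
    prevCmsEntityDF.unpersist() // drop the blocks cached by the previous refresh
  }
  val df = sqlContext.sql(
    "select distinct entity_id,entity_desc,entity_url,entity_channel from stage.CMS_ENTITY_BY_DAY")
  df.persist(StorageLevel.MEMORY_AND_DISK)
  df.registerTempTable("cms_entity_tmp") // re-registering under the same name replaces the old mapping
  prevCmsEntityDF = df
}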
5. Sliding-window processing: applying reduceByKeyAndWindow
// 4. Aggregate over sliding windows: 2-hour window, sliding every 25 seconds
val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => { v1 + v2 }, Minutes(60 * 2), Seconds(25));

// Second consumption of the Kafka data: 2-day window, sliding every 35 seconds
val urlClickCountsDStreamByDay = urlClickLogPairsDStream.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => { v1 + v2 }, Minutes(60 * 48), Seconds(35));
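With a 2-hour window and a 25-second slide, every slide re-reduces the full two hours of batches. Spark Streaming also provides an incremental form of reduceByKeyAndWindow that takes an inverse-reduce function, adding only the batches entering the window and subtracting those leaving it; it requires checkpointing. A sketch of how the first window above could be rewritten this way (the checkpoint path is a placeholder; this variant is not used in the original job):

// Incremental window aggregation (sketch): new batches are added and expired batches
// subtracted, instead of re-reducing the whole window on each slide. Requires checkpointing.
ssc.checkpoint("hdfs:///tmp/stocker-checkpoint") // placeholder path

val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => v1 + v2, // reduce: values entering the window
  (v1: Int, v2: Int) => v1 - v2, // inverse reduce: values leaving the window
  Minutes(60 * 2),
  Seconds(25));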
6. Business-logic processing
def urlClickCnt(urlClickCountsDStream: DStream[(String, Int)], sqlContext: HiveContext) {
  urlClickCountsDStream.foreachRDD(urlClickCountsRDD => {
    // Rebuild rows from the "\001"-delimited key produced in kafkaDStreamForStocker
    val urlClickCountRowRDD = urlClickCountsRDD.map(tuple => {
      val datetime = tuple._1.split("\001")(0) + " " + tuple._1.split("\001")(1);
      val cookieid = tuple._1.split("\001")(2);
      val url = tuple._1.split("\001")(3);
      val stockId = tuple._1.split("\001")(4);
      val click_count = tuple._2;
      Row(datetime, cookieid, url, stockId, click_count);
    });
    val structType = StructType(Array(
      StructField("datetime", StringType, true),
      StructField("cookieid", StringType, true),
      StructField("url", StringType, true),
      StructField("stockId", StringType, true),
      StructField("click_count", IntegerType, true)));
    val stockCountDF = sqlContext.createDataFrame(urlClickCountRowRDD, structType);
    stockCountDF.registerTempTable("stock_click_log");

    // Top 15 rows by click count, printed for inspection
    val sql = "select datetime,cookieid,url,stockId,click_count from stock_click_log order by click_count desc limit 15";
    val stockCntShowDF = sqlContext.sql(sql);
    stockCntShowDF.collect().foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3) + "," + x(4)));

    // Time boundaries: last hour, the hour before it, and the same hour yesterday
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    val date = new Date();
    val lastHour = new Date(date.getTime - 3600 * 1000l)
    val lastHourFm = sdf.format(lastHour)
    val last2Hour = new Date(date.getTime - 2 * 3600 * 1000l)
    val last2HourFm = sdf.format(last2Hour)
    val lastDay = new Date(date.getTime - 24 * 3600 * 1000l)
    val lastDayFm = sdf.format(lastDay)
    val lastDayS = new Date(date.getTime - 24 * 3600 * 1000l - 3600 * 1000l)
    val lastDayFmS = sdf.format(lastDayS)

    // Top 20 stocks in the last hour, with growth vs. the previous hour (LPcnt)
    // and vs. the same hour yesterday (SPLYcnt)
    val hoursql = "select concat4(abbr,'(',stockId,')'),url,clickcnt,LPcnt,SPLYcnt from(select t1.stockId as stockId,t1.url as url,t1.clickcnt as clickcnt,"
      .concat(" round((t1.clickcnt / (case when t2.clickcntyesday is null then 1 else t2.clickcntyesday end) - 1) * 100, 2) as LPcnt,")
      .concat(" round((t1.clickcnt / (case when t3.clickcntyesdayhour is null then 1 else t3.clickcntyesdayhour end) - 1) * 100, 2) as SPLYcnt")
      .concat(" from(select stockId,concat('http://stockdata.stock.hexun.com/', stockId,'.shtml') url,count(stockId) clickcnt")
      .concat(" from stock_click_log where datetime>='" + lastHourFm + "' ")
      .concat(" group by stockId")
      .concat(" order by clickcnt desc limit 20) t1")
      .concat(" left join (select stockId, count(stockId) clickcntyesday")
      .concat(" from stock_click_log ")
      .concat(" where datetime>='" + last2HourFm + "' and datetime<='" + lastHourFm + "' ")
      .concat(" group by stockId) t2")
      .concat(" on t1.stockId = t2.stockId ")
      .concat(" left join (select stockId, count(stockId) clickcntyesdayhour")
      .concat(" from stock_click_log ")
      .concat(" where datetime>='" + lastDayFmS + "' and datetime<='" + lastDayFm + "' ")
      .concat(" group by stockId) t3")
      .concat(" on t1.stockId = t3.stockId) tmp")
      .concat(" left join d_stock_info_tmp info on tmp.stockId=info.code");
    val stockCntHourDF = sqlContext.sql(hoursql);
    stockCntHourDF.show();
    stockCntHourDF.collect().foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3) + "," + x(4)));

    // Write the hourly ranking to the relational database via JDBC (type = "02")
    val jdbcUtils = JDBCUtils();
    val conn = jdbcUtils.getConn();
    val stattime = DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss");
    val sqlStockHour = "INSERT INTO stock_realtime_analysis_spark "
      .concat("(stockId,url,clickcnt,LPcnt,SPLYcnt,type,recordtime)")
      .concat(" VALUES(?,?,?,?,?,?,?)");
    var pstat = jdbcUtils.getPstat(conn, sqlStockHour);
    stockCntHourDF.collect().foreach(x => {
      pstat.setString(1, x.get(0) + "");
      pstat.setString(2, x.get(1) + "");
      pstat.setInt(3, (x.getLong(2) + "").toInt);
      pstat.setFloat(4, (x.getDouble(3) + "").toFloat);
      pstat.setFloat(5, (x.getDouble(4) + "").toFloat);
      pstat.setString(6, "02");
      pstat.setString(7, stattime);
      pstat.executeUpdate();
    });
    jdbcUtils.closeConn(conn, pstat);
  })
}

def urlClickCntByDay(urlClickCountsDStream: DStream[(String, Int)], sqlContext: HiveContext) {
  urlClickCountsDStream.foreachRDD(urlClickCountsRDD => {
    val urlClickCountRowRDD = urlClickCountsRDD.map(tuple => {
      val datetime = tuple._1.split("\001")(0) + " " + tuple._1.split("\001")(1);
      val cookieid = tuple._1.split("\001")(2);
      val url = tuple._1.split("\001")(3);
      val stockId = tuple._1.split("\001")(4);
      val click_count = tuple._2;
      Row(datetime, cookieid, url, stockId, click_count);
    });
    val structType = StructType(Array(
      StructField("datetime", StringType, true),
      StructField("cookieid", StringType, true),
      StructField("url", StringType, true),
      StructField("stockId", StringType, true),
      StructField("click_count", IntegerType, true)));
    val stockCountDF = sqlContext.createDataFrame(urlClickCountRowRDD, structType);
    stockCountDF.registerTempTable("stock_click_log");

    val sql = "select datetime,cookieid,url,stockId,click_count from stock_click_log order by click_count desc limit 15";
    val stockCntShowDF = sqlContext.sql(sql);
    stockCntShowDF.collect().foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3)));

    // Time boundaries: last 24 hours and the 24 hours before that
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    val date = new Date();
    val lastHour = new Date(date.getTime - 24 * 3600 * 1000l)
    val lastHourFm = sdf.format(lastHour)
    val last2Hour = new Date(date.getTime - 2 * 24 * 3600 * 1000l)
    val last2HourFm = sdf.format(last2Hour)
    val last2HourS = new Date(date.getTime - 24 * 3600 * 1000l - 3600 * 1000l)
    val last2HourFmS = sdf.format(last2HourS)

    // Top 20 stocks in the last 24 hours, with growth vs. the previous day (LPcnt)
    val hoursql = "select concat4(abbr,'(',stockId,')'),url,clickcnt,LPcnt from(select t1.stockId as stockId,t1.url as url,t1.clickcnt as clickcnt,"
      .concat(" round((t1.clickcnt / (case when t2.clickcntyesday is null then 1 else t2.clickcntyesday end) - 1) * 100, 2) as LPcnt")
      .concat(" from(select stockId,concat('http://stockdata.stock.hexun.com/', stockId,'.shtml') url,count(stockId) clickcnt")
      .concat(" from stock_click_log where datetime>='" + lastHourFm + "' ")
      .concat(" group by stockId")
      .concat(" order by clickcnt desc limit 20) t1")
      .concat(" left join (select stockId, count(stockId) clickcntyesday")
      .concat(" from stock_click_log ")
      .concat(" where datetime>='" + last2HourFm + "' and datetime<='" + lastHourFm + "' ")
      .concat(" group by stockId) t2")
      .concat(" on t1.stockId = t2.stockId ) tmp")
      .concat(" left join d_stock_info_tmp info on tmp.stockId=info.code");
    val stockCntHourDF = sqlContext.sql(hoursql);
    stockCntHourDF.show();
    stockCntHourDF.collect().foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3)));

    // Write the daily ranking to the relational database via JDBC (type = "01")
    val jdbcUtils = JDBCUtils();
    val conn = jdbcUtils.getConn();
    val stattime = DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss");
    val sqlStockHour = "INSERT INTO stock_realtime_analysis_spark "
      .concat("(stockId,url,clickcnt,LPcnt,type,recordtime)")
      .concat(" VALUES(?,?,?,?,?,?)");
    var pstat = jdbcUtils.getPstat(conn, sqlStockHour);
    stockCntHourDF.collect().foreach(x => {
      pstat.setString(1, x.get(0) + "");
      pstat.setString(2, x.get(1) + "");
      pstat.setInt(3, (x.getLong(2) + "").toInt);
      pstat.setFloat(4, (x.getDouble(3) + "").toFloat);
      pstat.setString(5, "01");
      pstat.setString(6, stattime);
      pstat.executeUpdate();
    });
    jdbcUtils.closeConn(conn, pstat);
  })
}
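The JDBCUtils helper used above is not shown in the post. Below is a minimal sketch of a helper that matches the calls made here (getConn, getPstat, closeConn); the driver class, JDBC URL, and credentials are placeholders, not taken from the original.

import java.sql.{Connection, DriverManager, PreparedStatement}

// Hypothetical JDBCUtils matching the calls used above; driver, URL and credentials are placeholders.
class JDBCUtils {
  def getConn(): Connection = {
    Class.forName("com.mysql.jdbc.Driver") // placeholder driver
    DriverManager.getConnection("jdbc:mysql://dbhost:3306/realtime", "user", "password")
  }

  def getPstat(conn: Connection, sql: String): PreparedStatement =
    conn.prepareStatement(sql)

  def closeConn(conn: Connection, pstat: PreparedStatement): Unit = {
    try {
      if (pstat != null) pstat.close()
    } finally {
      if (conn != null) conn.close()
    }
  }
}

object JDBCUtils {
  def apply() = new JDBCUtils()
}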