Real-Time Ranking and Statistics with Spark Streaming Sliding Windows
1. Main flow
The driver builds a StreamingContext with a 5-second batch interval, registers the UDFs, caches stock reference data from Hive, derives two windowed counts from the same Kafka pair DStream (a 2-hour window sliding every 25 seconds and a 48-hour window sliding every 35 seconds), and hands each to its ranking routine.
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("StockerRealRank"); //.setMaster("local[5]");
  val sc = new SparkContext(conf);
  val ssc = new StreamingContext(sc, Seconds(5));
  // keep 2 days of data around for the window operations
  ssc.remember(Minutes(60 * 48));
  val sqlContext = new HiveContext(sc);
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
  // 1. register UDFs
  val udf = UDFUtils();
  udf.registerUdf(sqlContext);
  // 2. build the Kafka DStream
  val kafkaService = KakfaService();
  val urlClickLogPairsDStream = kafkaService.kafkaDStreamForStocker(ssc);
  // 3. cache reference data from Hive
  val cacheUtils = CacheUtils();
  cacheUtils.cacheStockInfoData(sqlContext);
  // 4. windowed aggregation: 2-hour window, sliding every 25 seconds
  val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
    (v1: Int, v2: Int) => {
      v1 + v2
    },
    Minutes(60 * 2),
    Seconds(25));
  // second consumption of the same DStream: 48-hour window, sliding every 35 seconds
  val urlClickCountsDStreamByDay = urlClickLogPairsDStream.reduceByKeyAndWindow(
    (v1: Int, v2: Int) => {
      v1 + v2
    },
    Minutes(60 * 48),
    Seconds(35));
  // 5. business logic
  urlClickCnt(urlClickCountsDStream, sqlContext);
  urlClickCntByDay(urlClickCountsDStreamByDay, sqlContext);
  // 6. start the streaming job
  ssc.start();
  ssc.awaitTermination();
}
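For reference, the main method above is assumed to sit inside a driver object together with the imports it uses; a minimal skeleton (the object name is only illustrative, taken from the appName):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object StockerRealRank { // illustrative name
  def main(args: Array[String]) {
    // ... body as shown above ...
  }
}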
2. Registering UDFs, implemented with a Scala companion object
UDFUtils registers the string helpers (strLen, concat, concat4, regexp_extract, getHost) used by the SQL in section 6; the companion object's apply() provides the UDFUtils() construction syntax used in main.
import org.apache.spark.sql.hive.HiveContext
import java.util.regex.Pattern

/**
 * Registers string helper UDFs on the HiveContext.
 */
class UDFUtils {
  def registerUdf(sqlContext: HiveContext) {
    sqlContext.udf.register("strLen", (str: String) => str.length())
    sqlContext.udf.register("concat", (str1: String, str2: String, str3: String) => str1 + str2 + str3)
    sqlContext.udf.register("concat4", (str1: String, str2: String, str3: String, str4: String) => str1 + str2 + str3 + str4)
    // returns the last match of pattern in str, or "" when there is no match
    sqlContext.udf.register("regexp_extract", (str: String, pattern: String) => {
      val matcher = Pattern.compile(pattern, 1).matcher(str)
      var res = ""
      while (matcher.find()) {
        res = matcher.group()
      }
      res
    })
    // strips protocol, path and port from a URL, leaving only the host name
    sqlContext.udf.register("getHost", (url: String) => {
      var strURL = "";
      try {
        strURL = url.toString();
        if (strURL.contains("://") && (strURL.indexOf("://") < 6) && strURL.length() > (strURL.indexOf("://") + 4)) {
          strURL = strURL.substring(strURL.indexOf("://") + 3);
        }
        if (strURL.contains("/")) {
          strURL = strURL.substring(0, strURL.indexOf("/"));
        }
        if (strURL.contains(":")) {
          strURL = strURL.substring(0, strURL.indexOf(":"));
        }
      } catch {
        case e: Exception => println("registerUdf Exception")
      }
      strURL;
    })
  }
}

object UDFUtils {
  def apply() = new UDFUtils();
}
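Once registerUdf has run, the helpers can be called straight from Spark SQL. A minimal sanity check, assuming the d_stock_info_tmp temp table registered by CacheUtils in section 4 is available (the query itself is only illustrative):

// getHost should strip the URL down to its host; concat4 builds the "abbr(code)" display name
sqlContext.sql(
  "select getHost(concat('http://stockdata.stock.hexun.com/', code, '.shtml')), " +
    "concat4(abbr, '(', code, ')') from d_stock_info_tmp limit 5").show()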
3. Kafka data processing
KakfaService creates direct Kafka streams, splits each space-separated access-log line, keeps only article/stock page views selected by regex and URL whitelist, and emits (key, 1) pairs whose key carries the fields needed downstream (URL plus article id, or date, time, cookieid and URL plus stock code).
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import java.util.regex.Pattern

/**
 * Builds the Kafka direct streams and turns raw access-log lines into (key, 1) pairs.
 */
class KakfaService {
  // article pages: keyed on url + 9-digit article id
  def kafkaDStream(ssc: StreamingContext): DStream[(String, Int)] = {
    val topics = Set("teststreaming");
    val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092";
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder");
    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics);
    // a 9-digit article id starting with 1, delimited by non-digits
    val strReg = "(.*?[^0-9][1][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][^0-9]).*?";
    val urlClickLogPairsDStream = kafkaStream.map(_._2.split(" ")).filter(_.length >= 11)
      .filter(urlClickLog => {
        val url = urlClickLog(7)
        val matcher = Pattern.compile(strReg, 1).matcher(url)
        var res = ""
        while (matcher.find()) {
          res = matcher.group()
        }
        (!res.equals("") && (res.length() > 15) && !url.contains("blog") && !url.contains("bookmark") && !url.contains("tg.hexun.com") && !url.contains("yanbao"))
      }).map { urlClickLog =>
        {
          val url = urlClickLog(7);
          val matcher = Pattern.compile(strReg, 1).matcher(url);
          var res = "";
          while (matcher.find()) {
            res = matcher.group();
          }
          val artId = url.substring(res.length() - 10, res.length() - 1);
          (urlClickLog(7) + "\001" + artId, 1);
        }
      };
    return urlClickLogPairsDStream;
  }

  // stock pages: keyed on date + time + cookieid + url + 6-digit stock code
  def kafkaDStreamForStocker(ssc: StreamingContext): DStream[(String, Int)] = {
    val topics = Set("teststreaming");
    val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092";
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder");
    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics);
    // a six-digit stock code whose first two digits are 00, 30 or 60 (the '|' inside [] is literal)
    val strReg = "(.*?[^0-9][0|3|6][0][0-9][0-9][0-9][0-9]).*?";
    val urlClickLogPairsDStream = kafkaStream.map(_._2.split(" ")).filter(_.length >= 11)
      .filter(urlClickLog => {
        val requesturl = urlClickLog(7)
        var pattern = Pattern.compile(strReg, 1)
        var matcher = pattern.matcher(requesturl)
        var res = ""
        while (matcher.find()) {
          res = matcher.group();
        }
        // exclude index and fund pages
        pattern = Pattern.compile("(.*?index.*?)", 1);
        matcher = pattern.matcher(requesturl);
        val isIndex = matcher.matches();
        pattern = Pattern.compile("(.*?fund.*?)", 1);
        matcher = pattern.matcher(requesturl);
        val isFund = matcher.matches();
        // whitelist of stock-related hosts and paths
        val flag1 = requesturl.contains("stockdata.stock.hexun.com");
        val flag2 = requesturl.contains("http://q.m.hexun.com/stock");
        val flag3 = requesturl.contains("http://wap.hexun.com/2.0/stockinfo");
        val flag4 = requesturl.contains("http://corpwap.hexun.com/2.0/stockinfo");
        val flag5 = requesturl.contains("http://vol.stock.hexun.com");
        val flag6 = requesturl.contains("m.guba.hexun.com");
        val flag7 = requesturl.contains("http://guba.hexun.com");
        val flag12 = requesturl.contains("http://baidu.hexun.com/stock");
        val flag13 = requesturl.contains("http://yuce.stock.hexun.com/stock");
        val flag14 = requesturl.contains(",guba,");
        val flag15 = requesturl.contains("http://guba.hexun.com/d/");
        val flag16 = requesturl.contains("http://guba.hexun.com/t/");
        val length = requesturl.length();
        (!res.equals("") && !isIndex && !isFund && (flag1 || flag2 || flag3 || flag4 || flag5 || flag6 || flag12 || flag13 || (flag7 && length == 38) || (flag7 && flag14 && (!flag15 && !flag16))))
      }).map { urlClickLog =>
        {
          val url = urlClickLog(7);
          val matcher = Pattern.compile(strReg, 1).matcher(url);
          var res = "";
          while (matcher.find()) {
            res = matcher.group();
          }
          // the stock code is the last six characters of the matched prefix
          val stockId = url.substring(res.length() - 6, res.length());
          (urlClickLog(0) + "\001" + urlClickLog(1) + "\001" + urlClickLog(3) + "\001" + urlClickLog(7) + "\001" + stockId, 1);
        }
      };
    return urlClickLogPairsDStream;
  }
}

object KakfaService {
  def apply() = new KakfaService();
}
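The stock id extracted in kafkaDStreamForStocker is simply the last six characters of the regex match. A quick standalone check of that logic, using an illustrative URL:

val strReg = "(.*?[^0-9][0|3|6][0][0-9][0-9][0-9][0-9]).*?"
val url = "http://stockdata.stock.hexun.com/600036.shtml" // illustrative URL
val matcher = java.util.regex.Pattern.compile(strReg, 1).matcher(url)
var res = ""
while (matcher.find()) {
  res = matcher.group() // "http://stockdata.stock.hexun.com/600036"
}
val stockId = url.substring(res.length - 6, res.length) // "600036"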
4. Caching Hive data and refreshing it on a schedule
CacheUtils loads reference tables from Hive, registers them as temp tables for the streaming SQL, and starts a background thread that re-caches the CMS entity table every 31 minutes.
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.storage.StorageLevel

/**
 * Caches Hive reference tables as Spark SQL temp tables and refreshes them periodically.
 */
class CacheUtils {
  def cacheHiveData(sqlContext: HiveContext) {
    val cmsChannelDF = sqlContext.sql("select channel_name,channel_desc,channel_id from ods.cms_channel");
    cmsChannelDF.registerTempTable("cms_channel_tmp");
    cmsChannelDF.persist(StorageLevel.MEMORY_ONLY_2);
    cmsChannelDF.show(); // action to materialize the cache
    cacheEntity(sqlContext);
    new Thread(new Producer(sqlContext)).start();
  }

  def cacheEntity(sqlContext: HiveContext) {
    val sdf = new SimpleDateFormat("yyyyMMdd");
    val date = new Date();
    // despite the variable name, this looks back three years
    val lastWeek = new Date(date.getTime - 3 * 365 * 24 * 3600 * 1000l)
    val lastWeekFm = sdf.format(lastWeek)
    val lastCmsEnityDF = sqlContext.sql("select distinct entity_id,entity_desc,entity_url,entity_channel from stage.CMS_ENTITY_BY_DAY where day>='" + lastWeekFm + "' ");
    lastCmsEnityDF.unpersist();
    lastCmsEnityDF.persist(StorageLevel.MEMORY_AND_DISK);
    lastCmsEnityDF.registerTempTable("cms_entity_tmp");
    lastCmsEnityDF.show(); // action to materialize the cache
  }

  def cacheStockInfoData(sqlContext: HiveContext) {
    val stockInfoDF = sqlContext.sql("select code,abbr from dms.d_stock_info");
    stockInfoDF.registerTempTable("d_stock_info_tmp");
    stockInfoDF.persist(StorageLevel.MEMORY_ONLY_2);
    stockInfoDF.show(); // action to materialize the cache
    new Thread(new Producer(sqlContext)).start();
  }

  // background thread that refreshes the cached entity table every 31 minutes
  class Producer(sqlContext: HiveContext) extends Runnable {
    override def run(): Unit = {
      while (true) {
        Thread.sleep(1000 * 60 * 31);
        cacheEntity(sqlContext);
      }
    }
  }
}

object CacheUtils {
  def apply() = new CacheUtils();
}
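One thing to note: cacheEntity builds a brand-new DataFrame on every refresh, so its unpersist() call releases nothing, and the copy cached 31 minutes earlier is never explicitly freed. A sketch of an alternative refresh that releases the previous cache through the SQLContext table-cache API (not the original author's code; Try guards the first run, when nothing is cached yet):

import scala.util.Try

def refreshEntity(sqlContext: HiveContext, dayFrom: String): Unit = {
  Try(sqlContext.uncacheTable("cms_entity_tmp")) // ignore failure if not cached yet
  val df = sqlContext.sql(
    "select distinct entity_id,entity_desc,entity_url,entity_channel " +
      "from stage.CMS_ENTITY_BY_DAY where day>='" + dayFrom + "'")
  df.registerTempTable("cms_entity_tmp")
  sqlContext.cacheTable("cms_entity_tmp")
  df.count() // action to materialize the cache
}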
5. Sliding-window processing with reduceByKeyAndWindow
Both counts are built from the same pair DStream: a 2-hour window sliding every 25 seconds for the hourly ranking, and a 48-hour window sliding every 35 seconds for the daily ranking. Window length and slide interval must both be multiples of the 5-second batch interval, which these values satisfy.
// 4. windowed aggregation: 2-hour window, sliding every 25 seconds
val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => {
    v1 + v2
  },
  Minutes(60 * 2),
  Seconds(25));

// second consumption of the same DStream: 48-hour window, sliding every 35 seconds
val urlClickCountsDStreamByDay = urlClickLogPairsDStream.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => {
    v1 + v2
  },
  Minutes(60 * 48),
  Seconds(35));
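With a 48-hour window sliding every 35 seconds, this plain form of reduceByKeyAndWindow re-reduces the whole window on every slide. Spark Streaming also offers an incremental variant that adds the batches entering the window and subtracts the ones leaving it; it requires checkpointing. A sketch against the same pair DStream (the checkpoint path is illustrative):

ssc.checkpoint("hdfs:///tmp/stocker_checkpoint") // illustrative path
val urlClickCountsDStreamInc = urlClickLogPairsDStream.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => v1 + v2, // add counts entering the window
  (v1: Int, v2: Int) => v1 - v2, // subtract counts leaving the window
  Minutes(60 * 48),
  Seconds(35));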
6. Business logic
Each batch of a windowed DStream is converted to a DataFrame, registered as stock_click_log, ranked with Spark SQL (joined against the cached d_stock_info_tmp table for the stock name), and the top rows are written to MySQL over JDBC.
def urlClickCnt(urlClickCountsDStream: DStream[(String, Int)], sqlContext: HiveContext) {
  urlClickCountsDStream.foreachRDD(urlClickCountsRDD => {
    // key layout: date \001 time \001 cookieid \001 url \001 stockId
    val urlClickCountRowRDD = urlClickCountsRDD.map(tuple => {
      val datetime = tuple._1.split("\001")(0) + " " + tuple._1.split("\001")(1);
      val cookieid = tuple._1.split("\001")(2);
      val url = tuple._1.split("\001")(3);
      val stockId = tuple._1.split("\001")(4);
      val click_count = tuple._2;
      Row(datetime, cookieid, url, stockId, click_count);
    });
    val structType = StructType(Array(
      StructField("datetime", StringType, true),
      StructField("cookieid", StringType, true),
      StructField("url", StringType, true),
      StructField("stockId", StringType, true),
      StructField("click_count", IntegerType, true)));
    val stockCountDF = sqlContext.createDataFrame(urlClickCountRowRDD, structType);
    stockCountDF.registerTempTable("stock_click_log");
    // top 15 raw records, printed for inspection only
    val sql = "select datetime,cookieid,url,stockId,click_count from stock_click_log order by click_count desc limit 15";
    val stockCntShowDF = sqlContext.sql(sql);
    stockCntShowDF.collect().foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3) + "," + x(4)));
    // time boundaries: the last hour, the hour before it, and the same hour yesterday
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    val date = new Date();
    val lastHour = new Date(date.getTime - 3600 * 1000l)
    val lastHourFm = sdf.format(lastHour)
    val last2Hour = new Date(date.getTime - 2 * 3600 * 1000l)
    val last2HourFm = sdf.format(last2Hour)
    val lastDay = new Date(date.getTime - 24 * 3600 * 1000l)
    val lastDayFm = sdf.format(lastDay)
    val lastDayS = new Date(date.getTime - 24 * 3600 * 1000l - 3600 * 1000l)
    val lastDayFmS = sdf.format(lastDayS)
    // top 20 stocks in the last hour, with hour-over-hour (LPcnt) and
    // same-hour-yesterday (SPLYcnt) percentage changes, joined to stock names
    val hoursql = "select concat4(abbr,'(',stockId,')'),url,clickcnt,LPcnt,SPLYcnt from(select t1.stockId as stockId,t1.url as url,t1.clickcnt as clickcnt,"
      .concat(" round((t1.clickcnt / (case when t2.clickcntyesday is null then 1 else t2.clickcntyesday end) - 1) * 100, 2) as LPcnt,")
      .concat(" round((t1.clickcnt / (case when t3.clickcntyesdayhour is null then 1 else t3.clickcntyesdayhour end) - 1) * 100, 2) as SPLYcnt")
      .concat(" from(select stockId,concat('http://stockdata.stock.hexun.com/', stockId,'.shtml') url,count(stockId) clickcnt")
      .concat(" from stock_click_log where datetime>='" + lastHourFm + "' ")
      .concat(" group by stockId")
      .concat(" order by clickcnt desc limit 20) t1")
      .concat(" left join (select stockId, count(stockId) clickcntyesday")
      .concat(" from stock_click_log ")
      .concat(" where datetime>='" + last2HourFm + "' and datetime<='" + lastHourFm + "' ")
      .concat(" group by stockId) t2")
      .concat(" on t1.stockId = t2.stockId ")
      .concat(" left join (select stockId, count(stockId) clickcntyesdayhour")
      .concat(" from stock_click_log ")
      .concat(" where datetime>='" + lastDayFmS + "' and datetime<='" + lastDayFm + "' ")
      .concat(" group by stockId) t3")
      .concat(" on t1.stockId = t3.stockId) tmp")
      .concat(" left join d_stock_info_tmp info on tmp.stockId=info.code");
    val stockCntHourDF = sqlContext.sql(hoursql);
    stockCntHourDF.show();
    stockCntHourDF.collect().foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3) + "," + x(4)));
    // write the ranked rows to MySQL
    val jdbcUtils = JDBCUtils();
    val conn = jdbcUtils.getConn();
    val stattime = DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss");
    val sqlStockHour = "INSERT INTO stock_realtime_analysis_spark "
      .concat("(stockId,url,clickcnt,LPcnt,SPLYcnt,type,recordtime)")
      .concat(" VALUES(?,?,?,?,?,?,?)");
    var pstat = jdbcUtils.getPstat(conn, sqlStockHour);
    stockCntHourDF.collect().foreach(x => {
      pstat.setString(1, x.get(0) + "");
      pstat.setString(2, x.get(1) + "");
      pstat.setInt(3, (x.getLong(2) + "").toInt);
      pstat.setFloat(4, (x.getDouble(3) + "").toFloat);
      pstat.setFloat(5, (x.getDouble(4) + "").toFloat);
      pstat.setString(6, "02");
      pstat.setString(7, stattime);
      pstat.executeUpdate();
    });
    jdbcUtils.closeConn(conn, pstat);
  })
}
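LPcnt and SPLYcnt are plain percentage changes, round((current / previous - 1) * 100, 2), with the previous count defaulting to 1 when the stock had no clicks in the comparison window. For example, with illustrative counts of 150 clicks in the last hour against 120 in the hour before:

// (150 / 120 - 1) * 100 = 25.0, i.e. clicks are up 25% hour over hour
val lpcnt = math.round((150.0 / 120.0 - 1) * 100 * 100) / 100.0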
def urlClickCntByDay(urlClickCountsDStream: DStream[(String, Int)], sqlContext: HiveContext) {
  urlClickCountsDStream.foreachRDD(urlClickCountsRDD => {
    // key layout: date \001 time \001 cookieid \001 url \001 stockId
    val urlClickCountRowRDD = urlClickCountsRDD.map(tuple => {
      val datetime = tuple._1.split("\001")(0) + " " + tuple._1.split("\001")(1);
      val cookieid = tuple._1.split("\001")(2);
      val url = tuple._1.split("\001")(3);
      val stockId = tuple._1.split("\001")(4);
      val click_count = tuple._2;
      Row(datetime, cookieid, url, stockId, click_count);
    });
    val structType = StructType(Array(
      StructField("datetime", StringType, true),
      StructField("cookieid", StringType, true),
      StructField("url", StringType, true),
      StructField("stockId", StringType, true),
      StructField("click_count", IntegerType, true)));
    val stockCountDF = sqlContext.createDataFrame(urlClickCountRowRDD, structType);
    stockCountDF.registerTempTable("stock_click_log");
    // top 15 raw records, printed for inspection only
    val sql = "select datetime,cookieid,url,stockId,click_count from stock_click_log order by click_count desc limit 15";
    val stockCntShowDF = sqlContext.sql(sql);
    stockCntShowDF.collect().foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3) + "," + x(4)));
    // time boundaries: despite the variable names, these are 24-hour and 48-hour offsets
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    val date = new Date();
    val lastHour = new Date(date.getTime - 24 * 3600 * 1000l)
    val lastHourFm = sdf.format(lastHour)
    val last2Hour = new Date(date.getTime - 2 * 24 * 3600 * 1000l)
    val last2HourFm = sdf.format(last2Hour)
    val last2HourS = new Date(date.getTime - 24 * 3600 * 1000l - 3600 * 1000l)
    val last2HourFmS = sdf.format(last2HourS)
    // top 20 stocks in the last 24 hours, with day-over-day (LPcnt) percentage change
    val hoursql = "select concat4(abbr,'(',stockId,')'),url,clickcnt,LPcnt from(select t1.stockId as stockId,t1.url as url,t1.clickcnt as clickcnt,"
      .concat(" round((t1.clickcnt / (case when t2.clickcntyesday is null then 1 else t2.clickcntyesday end) - 1) * 100, 2) as LPcnt")
      .concat(" from(select stockId,concat('http://stockdata.stock.hexun.com/', stockId,'.shtml') url,count(stockId) clickcnt")
      .concat(" from stock_click_log where datetime>='" + lastHourFm + "' ")
      .concat(" group by stockId")
      .concat(" order by clickcnt desc limit 20) t1")
      .concat(" left join (select stockId, count(stockId) clickcntyesday")
      .concat(" from stock_click_log ")
      .concat(" where datetime>='" + last2HourFm + "' and datetime<='" + lastHourFm + "' ")
      .concat(" group by stockId) t2")
      .concat(" on t1.stockId = t2.stockId ) tmp")
      .concat(" left join d_stock_info_tmp info on tmp.stockId=info.code");
    val stockCntHourDF = sqlContext.sql(hoursql);
    stockCntHourDF.show();
    stockCntHourDF.collect().foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3)));
    // write the ranked rows to MySQL
    val jdbcUtils = JDBCUtils();
    val conn = jdbcUtils.getConn();
    val stattime = DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss");
    val sqlStockHour = "INSERT INTO stock_realtime_analysis_spark "
      .concat("(stockId,url,clickcnt,LPcnt,type,recordtime)")
      .concat(" VALUES(?,?,?,?,?,?)");
    var pstat = jdbcUtils.getPstat(conn, sqlStockHour);
    stockCntHourDF.collect().foreach(x => {
      pstat.setString(1, x.get(0) + "");
      pstat.setString(2, x.get(1) + "");
      pstat.setInt(3, (x.getLong(2) + "").toInt);
      pstat.setFloat(4, (x.getDouble(3) + "").toFloat);
      pstat.setString(5, "01");
      pstat.setString(6, stattime);
      pstat.executeUpdate();
    });
    jdbcUtils.closeConn(conn, pstat);
  })
}
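JDBCUtils is a project-local helper that the post does not show. A minimal sketch of the interface urlClickCnt and urlClickCntByDay assume, written in the same companion-object style (the JDBC URL, user and password are placeholders):

import java.sql.{Connection, DriverManager, PreparedStatement}

class JDBCUtils {
  // placeholder connection settings for the MySQL sink
  private val url = "jdbc:mysql://dbhost:3306/stock"
  private val user = "user"
  private val password = "password"

  def getConn(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection(url, user, password)
  }

  def getPstat(conn: Connection, sql: String): PreparedStatement =
    conn.prepareStatement(sql)

  def closeConn(conn: Connection, pstat: PreparedStatement): Unit = {
    if (pstat != null) pstat.close()
    if (conn != null) conn.close()
  }
}

object JDBCUtils {
  def apply() = new JDBCUtils()
}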