Real-Time Ranking and Statistics with Spark Streaming Sliding Windows

Date: 2021-12-04 20:48:13


1. Main flow

 def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("StockerRealRank"); //.setMaster("local[5]");
    val sc = new SparkContext(conf);
    val ssc = new StreamingContext(sc, Seconds(5));
    // Remember 2 days (48 hours) of batches so the windows below can be computed
    ssc.remember(Minutes(60 * 48));
    val sqlContext = new HiveContext(sc);

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);

    //1. Register UDFs
    val udf = UDFUtils();
    udf.registerUdf(sqlContext);

    //2. Consume and parse the Kafka click stream
    val kafkaService = KafkaService();
    val urlClickLogPairsDStream = kafkaService.kafkaDStreamForStocker(ssc);

    //3. Cache reference data from Hive
    val cacheUtils = CacheUtils();
    cacheUtils.cacheStockInfoData(sqlContext);

    //4. Sliding-window aggregation: 2-hour window, sliding every 25 seconds
    val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => {
        v1 + v2
      },
      Minutes(60 * 2),
      Seconds(25));

    // A second window over the same stream: 48-hour window, sliding every 35 seconds
    val urlClickCountsDStreamByDay = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => {
        v1 + v2
      },
      Minutes(60 * 48),
      Seconds(35));

    //5. Business logic: hourly and daily rankings
    urlClickCnt(urlClickCountsDStream, sqlContext);
    urlClickCntByDay(urlClickCountsDStreamByDay, sqlContext);

    //6. Start the streaming job
    ssc.start();
    ssc.awaitTermination();

  }
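
Note: with windows this long it can help to enable backpressure and graceful shutdown when building the SparkConf. A minimal sketch, assuming Spark 1.5+ (these settings are additions, not part of the original code):

val conf = new SparkConf()
  .setAppName("StockerRealRank")
  // Throttle Kafka ingestion when batches start falling behind (Spark 1.5+)
  .set("spark.streaming.backpressure.enabled", "true")
  // Let in-flight batches finish before the context stops on shutdown
  .set("spark.streaming.stopGracefullyOnShutdown", "true")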


2. Registering UDFs (implemented with a Scala companion object)

import org.apache.spark.sql.hive.HiveContext
import java.util.regex.Pattern

/**
 * Registers custom UDFs on the HiveContext. Note that "concat" and
 * "regexp_extract" shadow Hive's built-in functions of the same names
 * for queries issued through this context.
 */
class UDFUtils {
  def registerUdf(sqlContext: HiveContext) {
    sqlContext.udf.register("strLen", (str: String) => str.length())

    sqlContext.udf.register("concat", (str1: String, str2: String, str3: String) => str1 + str2 + str3)

    sqlContext.udf.register("concat4", (str1: String, str2: String, str3: String, str4: String) => str1 + str2 + str3 + str4)

    sqlContext.udf.register("regexp_extract", (str: String, pattern: String) => {
      val matcher = Pattern.compile(pattern, 1).matcher(str)
      var res = ""
      while (matcher.find()) {
        res = matcher.group()
      }
      res
    })

    sqlContext.udf.register("getHost", (url: String) => {
      var strURL = "";
      try {
        strURL = url.toString();
        if (strURL.contains("://") && (strURL.indexOf("://") < 6) && strURL.length() > (strURL.indexOf("://") + 4)) {
          strURL = strURL.substring(strURL.indexOf("://") + 3);
        }

        if (strURL.contains("/")) {
          strURL = strURL.substring(0, strURL.indexOf("/"));
        }

        if (strURL.contains(":")) {
          strURL = strURL.substring(0, strURL.indexOf(":"));
        }

      } catch {
        case e: Exception => println("getHost UDF exception: " + e.getMessage)
      }
      strURL;
    })

  }
}

object UDFUtils {
  def apply() = new UDFUtils();
}
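
Once registered, the UDFs can be used in any SQL statement issued through the same HiveContext. A quick sanity check (the table and column names are illustrative only):

// Assumes a temp table with a 'url' column has already been registered
sqlContext.sql("select getHost(url), strLen(url) from stock_click_log limit 5").show()
// e.g. getHost("http://stockdata.stock.hexun.com/600036.shtml") -> "stockdata.stock.hexun.com"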

3. Kafka stream processing

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import java.util.regex.Pattern

/**
 * Builds Kafka direct streams and maps raw click logs to (key, 1) pairs.
 */
class KafkaService {
  def kafkaDStream(ssc: StreamingContext): DStream[(String, Int)] = {
    val topics = Set("teststreaming");

    val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092";

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder");

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics);

    val strReg = "(.*?[^0-9][1][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][^0-9]).*?";
    val urlClickLogPairsDStream = kafkaStream.map(_._2.split(" ")).filter(_.length >= 11)
      .filter(urlClickLog => {
        val url = urlClickLog(7)
        val matcher = Pattern.compile(strReg, 1).matcher(url)
        var res = ""
        while (matcher.find()) {
          res = matcher.group();
        }
        (!res.equals("") && (res.length() > 15) && !url.contains("blog") && !url.contains("bookmark") && !url.contains("tg.hexun.com") && !url.contains("yanbao"))
      }).map { urlClickLog =>
        {
          val url = urlClickLog(7);
          val matcher = Pattern.compile(strReg, 1).matcher(url);
          var res = "";
          while (matcher.find()) {
            res = matcher.group();
          }
          // The 9-digit id sits just before the trailing non-digit of the match
          val artId = url.substring(res.length() - 10, res.length() - 1);
          (urlClickLog(7) + "\001" + artId, 1);
        }
      };
    return urlClickLogPairsDStream;
  }

  def kafkaDStreamForStocker(ssc: StreamingContext): DStream[(String, Int)] = {
    val topics = Set("teststreaming");
    val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092";
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder");

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics);
    val strReg = "(.*?[^0-9][0|3|6][0][0-9][0-9][0-9][0-9]).*?";

    val urlClickLogPairsDStream = kafkaStream.map(_._2.split(" ")).filter(_.length >= 11)
      .filter(urlClickLog => {
        val requesturl = urlClickLog(7)
        var pattern = Pattern.compile(strReg, 1)
        var matcher = pattern.matcher(requesturl)
        var res = ""
        while (matcher.find()) {
          res = matcher.group();
        }

        pattern = Pattern.compile("(.*?index.*?)", 1);
        matcher = pattern.matcher(requesturl);
        val isIndex = matcher.matches();

        pattern = Pattern.compile("(.*?fund.*?)", 1);
        matcher = pattern.matcher(requesturl);
        val isFund = matcher.matches();

        // Whitelist of known stock-detail hosts/paths
        val flag1 = requesturl.contains("stockdata.stock.hexun.com");
        val flag2 = requesturl.contains("http://q.m.hexun.com/stock");
        val flag3 = requesturl.contains("http://wap.hexun.com/2.0/stockinfo");
        val flag4 = requesturl.contains("http://corpwap.hexun.com/2.0/stockinfo");
        val flag5 = requesturl.contains("http://vol.stock.hexun.com");
        val flag6 = requesturl.contains("m.guba.hexun.com");
        val flag7 = requesturl.contains("http://guba.hexun.com");
        val flag12 = requesturl.contains("http://baidu.hexun.com/stock");
        val flag13 = requesturl.contains("http://yuce.stock.hexun.com/stock");
        val flag14 = requesturl.contains(",guba,");
        val flag15 = requesturl.contains("http://guba.hexun.com/d/");
        val flag16 = requesturl.contains("http://guba.hexun.com/t/");
        val length = requesturl.length();

        (!res.equals("") && !isIndex && !isFund && (flag1 || flag2 || flag3 || flag4 || flag5 || flag6 || flag12 || flag13 || (flag7 && length == 38) || (flag7 && flag14 && (!flag15 && !flag16))))
      }).map { urlClickLog =>
        {
          val url = urlClickLog(7);
          val matcher = Pattern.compile(strReg, 1).matcher(url);
          var res = "";
          while (matcher.find()) {
            res = matcher.group();
          }
          // Last 6 characters of the match are the stock code
          val stockId = url.substring(res.length() - 6, res.length());
          // Key layout: date \001 time \001 cookieid \001 url \001 stockId
          (urlClickLog(0) + "\001" + urlClickLog(1) + "\001" + urlClickLog(3) + "\001" + urlClickLog(7) + "\001" + stockId, 1);
        }
      };
    return urlClickLogPairsDStream;
  }
}

object KafkaService {
  def apply() = new KafkaService();
}
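
The stock-code pattern is easiest to verify in isolation. A small standalone check (the sample URL is made up):

import java.util.regex.Pattern

object RegexCheck {
  def main(args: Array[String]) {
    val strReg = "(.*?[^0-9][036]0[0-9]{4}).*?"
    val url = "http://stockdata.stock.hexun.com/600036.shtml" // hypothetical sample
    val matcher = Pattern.compile(strReg).matcher(url)
    var res = ""
    while (matcher.find()) {
      res = matcher.group() // prefix of the URL up to and including the 6-digit code
    }
    if (!res.isEmpty())
      println(url.substring(res.length() - 6, res.length())) // prints 600036
  }
}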

4. Caching Hive data, with periodic refresh

import java.text.SimpleDateFormat
import org.apache.spark.sql.hive.HiveContext
import java.util.Date
import org.apache.spark.storage.StorageLevel

/**
 * Caches Hive reference tables in memory and refreshes them on a schedule.
 */
class CacheUtils {
  def cacheHiveData(sqlContext: HiveContext) {
    val cmsChannelDF = sqlContext.sql("select channel_name,channel_desc,channel_id from ods.cms_channel");
    cmsChannelDF.registerTempTable("cms_channel_tmp");
    cmsChannelDF.persist(StorageLevel.MEMORY_ONLY_2);
    cmsChannelDF.show();

    cacheEntity(sqlContext);
    // Refresh the cached entity table periodically on a background thread
    new Thread(new CacheRefresher(sqlContext)).start();
  }

  def cacheEntity(sqlContext: HiveContext) {
    val sdf = new SimpleDateFormat("yyyyMMdd");
    val date = new Date();
    // Roughly three years back (3 * 365 days)
    val threeYearsAgo = new Date(date.getTime - 3 * 365 * 24 * 3600 * 1000L)
    val threeYearsAgoFm = sdf.format(threeYearsAgo)
    val lastCmsEntityDF = sqlContext.sql("select distinct entity_id,entity_desc,entity_url,entity_channel from stage.CMS_ENTITY_BY_DAY where day>='" + threeYearsAgoFm + "' ");
    // Note: unpersist() on a freshly created DataFrame is a no-op; the copy cached
    // by the previous refresh is not released here
    lastCmsEntityDF.unpersist();
    lastCmsEntityDF.persist(StorageLevel.MEMORY_AND_DISK);
    lastCmsEntityDF.registerTempTable("cms_entity_tmp");
    lastCmsEntityDF.show();
  }

  def cacheStockInfoData(sqlContext: HiveContext) {
    val stockInfoDF = sqlContext.sql("select code,abbr  from dms.d_stock_info");
    stockInfoDF.registerTempTable("d_stock_info_tmp");
    stockInfoDF.persist(StorageLevel.MEMORY_ONLY_2);
    stockInfoDF.show();

    new Thread(new CacheRefresher(sqlContext)).start();
  }

  // Background task: re-cache the entity table every 31 minutes
  class CacheRefresher(sqlContext: HiveContext) extends Runnable {
    override def run(): Unit = {
      while (true) {
        Thread.sleep(1000 * 60 * 31); // 31 minutes
        cacheEntity(sqlContext);
      }
    }
  }
}

object CacheUtils {
  def apply() = new CacheUtils();
}
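
The bare Thread with a sleep loop works, but a ScheduledExecutorService states the 31-minute period more directly. A sketch of the alternative, assuming it runs inside CacheUtils where cacheEntity and the HiveContext are in scope:

import java.util.concurrent.{Executors, TimeUnit}

// Re-cache the entity table every 31 minutes; the try/catch keeps one failed
// refresh from cancelling the schedule (scheduleAtFixedRate stops on an
// uncaught exception)
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    try { cacheEntity(sqlContext) }
    catch { case e: Exception => println("cacheEntity refresh failed: " + e.getMessage) }
  }
}, 31, 31, TimeUnit.MINUTES)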

5. Sliding-window processing: applying reduceByKeyAndWindow

//4. Sliding-window aggregation: 2-hour window, sliding every 25 seconds
    val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => {
        v1 + v2
      },
      Minutes(60 * 2),
      Seconds(25));

    // A second window over the same DStream: 48-hour window, sliding every 35 seconds
    val urlClickCountsDStreamByDay = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => {
        v1 + v2
      },
      Minutes(60 * 48),
      Seconds(35));
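
With a 48-hour window sliding every 35 seconds, this form recomputes the whole window on every slide. DStreams also offer an incremental variant of reduceByKeyAndWindow that adds the slice entering the window and subtracts the slice leaving it; it requires an inverse function and checkpointing. A sketch (the checkpoint path is a placeholder):

// Checkpointing is mandatory for the inverse-function form
ssc.checkpoint("hdfs:///tmp/stocker-checkpoint")

val urlClickCountsDStreamByDay = urlClickLogPairsDStream.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => v1 + v2, // counts entering the window
  (v1: Int, v2: Int) => v1 - v2, // counts leaving the window
  Minutes(60 * 48),
  Seconds(35));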

6. Business logic

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.commons.lang3.time.DateFormatUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.dstream.DStream

def urlClickCnt(urlClickCountsDStream: DStream[(String, Int)], sqlContext: HiveContext) {
    urlClickCountsDStream.foreachRDD(urlClickCountsRDD => {
      val urlClickCountRowRDD = urlClickCountsRDD.map(tuple => {
        // Key layout (from KafkaService): date \001 time \001 cookieid \001 url \001 stockId
        val fields = tuple._1.split("\001");
        val datetime = fields(0) + " " + fields(1);
        val cookieid = fields(2);
        val url = fields(3);
        val stockId = fields(4);
        val click_count = tuple._2;
        Row(datetime, cookieid, url, stockId, click_count);
      });

      val structType = StructType(Array(
        StructField("datetime", StringType, true),
        StructField("cookieid", StringType, true),
        StructField("url", StringType, true),
        StructField("stockId", StringType, true),
        StructField("click_count", IntegerType, true)));

      val stockCountDF = sqlContext.createDataFrame(urlClickCountRowRDD, structType);

      stockCountDF.registerTempTable("stock_click_log");

      val sql = "select datetime,cookieid,url,stockId,click_count from stock_click_log order by click_count desc limit 15";

      val stockCntShowDF = sqlContext.sql(sql);

      stockCntShowDF.collect().foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3) + "," + x(4)));

      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      val date = new Date();
      val lastHour = new Date(date.getTime - 3600 * 1000L)      // 1 hour ago
      val lastHourFm = sdf.format(lastHour)
      val last2Hour = new Date(date.getTime - 2 * 3600 * 1000L) // 2 hours ago
      val last2HourFm = sdf.format(last2Hour)

      val lastDay = new Date(date.getTime - 24 * 3600 * 1000L)  // 24 hours ago
      val lastDayFm = sdf.format(lastDay)
      val lastDayS = new Date(date.getTime - 25 * 3600 * 1000L) // 25 hours ago: start of the same hour yesterday
      val lastDayFmS = sdf.format(lastDayS)

      // Top 20 stocks by clicks in the last hour, with percentage change vs. the
      // previous hour (LPcnt) and vs. the same hour yesterday (SPLYcnt)
      val hoursql = "select concat4(abbr,'(',stockId,')'),url,clickcnt,LPcnt,SPLYcnt  from(select t1.stockId as stockId,t1.url as url,t1.clickcnt as clickcnt,"
        .concat(" round((t1.clickcnt / (case when t2.clickcntyesday is null then 1 else t2.clickcntyesday end) - 1) * 100, 2) as LPcnt,")
        .concat(" round((t1.clickcnt / (case when t3.clickcntyesdayhour is null then 1 else t3.clickcntyesdayhour end) - 1) * 100, 2) as SPLYcnt")
        .concat(" from(select stockId,concat('http://stockdata.stock.hexun.com/', stockId,'.shtml') url,count(stockId) clickcnt")
        .concat(" from stock_click_log where datetime>='" + lastHourFm + "' ")
        .concat(" group by stockId")
        .concat(" order by clickcnt desc limit 20) t1")
        .concat(" left join (select stockId, count(stockId) clickcntyesday")
        .concat(" from stock_click_log ")
        .concat(" where datetime>='" + last2HourFm + "'  and datetime<='" + lastHourFm + "' ")
        .concat(" group by stockId) t2")
        .concat(" on t1.stockId = t2.stockId ")
        .concat(" left join (select stockId, count(stockId) clickcntyesdayhour")
        .concat(" from stock_click_log ")
        .concat(" where datetime>='" + lastDayFmS + "'  and datetime<='" + lastDayFm + "' ")
        .concat(" group by stockId) t3")
        .concat(" on t1.stockId = t3.stockId) tmp")
        .concat(" left join d_stock_info_tmp info on tmp.stockId=info.code");

      val stockCntHourDF = sqlContext.sql(hoursql);
      stockCntHourDF.show();
      // Collect once (at most 20 rows, capped by the SQL limit) and reuse the
      // result for both logging and the JDBC writes below
      val hourRows = stockCntHourDF.collect();
      hourRows.foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3) + "," + x(4)));

      val jdbcUtils = JDBCUtils();
      val conn = jdbcUtils.getConn();
      val stattime = DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss");

      val sqlStockHour = "INSERT INTO stock_realtime_analysis_spark "
        .concat("(stockId,url,clickcnt,LPcnt,SPLYcnt,type,recordtime)")
        .concat(" VALUES(?,?,?,?,?,?,?)");
      val pstat = jdbcUtils.getPstat(conn, sqlStockHour);
      hourRows.foreach(x => {
        pstat.setString(1, x.get(0) + "");
        pstat.setString(2, x.get(1) + "");
        pstat.setInt(3, (x.getLong(2) + "").toInt);
        pstat.setFloat(4, (x.getDouble(3) + "").toFloat);
        pstat.setFloat(5, (x.getDouble(4) + "").toFloat);
        pstat.setString(6, "02"); // type 02: hourly ranking
        pstat.setString(7, stattime);
        pstat.executeUpdate();
      });
      jdbcUtils.closeConn(conn, pstat);

    })
  }

  def urlClickCntByDay(urlClickCountsDStream: DStream[(String, Int)], sqlContext: HiveContext) {
    urlClickCountsDStream.foreachRDD(urlClickCountsRDD => {
      val urlClickCountRowRDD = urlClickCountsRDD.map(tuple => {
        // Same key layout as above: date \001 time \001 cookieid \001 url \001 stockId
        val fields = tuple._1.split("\001");
        val datetime = fields(0) + " " + fields(1);
        val cookieid = fields(2);
        val url = fields(3);
        val stockId = fields(4);
        val click_count = tuple._2;
        Row(datetime, cookieid, url, stockId, click_count);
      });

      val structType = StructType(Array(
        StructField("datetime", StringType, true),
        StructField("cookieid", StringType, true),
        StructField("url", StringType, true),
        StructField("stockId", StringType, true),
        StructField("click_count", IntegerType, true)));

      val stockCountDF = sqlContext.createDataFrame(urlClickCountRowRDD, structType);

      stockCountDF.registerTempTable("stock_click_log");

      val sql = "select datetime,cookieid,url,stockId,click_count from stock_click_log order by click_count desc limit 15";

      val stockCntShowDF = sqlContext.sql(sql);

      stockCntShowDF.collect().foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3) + "," + x(4)));

      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      val date = new Date();
      val lastDay = new Date(date.getTime - 24 * 3600 * 1000L)      // 24 hours ago
      val lastDayFm = sdf.format(lastDay)
      val last2Day = new Date(date.getTime - 2 * 24 * 3600 * 1000L) // 48 hours ago
      val last2DayFm = sdf.format(last2Day)

      // Top 20 stocks by clicks in the last 24 hours, with percentage change vs.
      // the previous 24-hour period (LPcnt)
      val daysql = "select concat4(abbr,'(',stockId,')'),url,clickcnt,LPcnt  from(select t1.stockId as stockId,t1.url as url,t1.clickcnt as clickcnt,"
        .concat(" round((t1.clickcnt / (case when t2.clickcntyesday is null then 1 else t2.clickcntyesday end) - 1) * 100, 2) as LPcnt")
        .concat(" from(select stockId,concat('http://stockdata.stock.hexun.com/', stockId,'.shtml') url,count(stockId) clickcnt")
        .concat(" from stock_click_log where datetime>='" + lastDayFm + "' ")
        .concat(" group by stockId")
        .concat(" order by clickcnt desc limit 20) t1")
        .concat(" left join (select stockId, count(stockId) clickcntyesday")
        .concat(" from stock_click_log ")
        .concat(" where datetime>='" + last2DayFm + "'  and datetime<='" + lastDayFm + "' ")
        .concat(" group by stockId) t2")
        .concat(" on t1.stockId = t2.stockId ) tmp")
        .concat(" left join d_stock_info_tmp info on tmp.stockId=info.code");

      val stockCntDayDF = sqlContext.sql(daysql);
      stockCntDayDF.show();
      // Collect once and reuse for logging and the JDBC writes below
      val dayRows = stockCntDayDF.collect();
      dayRows.foreach(x => println(x(0) + "," + x(1) + "," + x(2) + "," + x(3)));

      val jdbcUtils = JDBCUtils();
      val conn = jdbcUtils.getConn();
      val stattime = DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss");

      val sqlStockDay = "INSERT INTO stock_realtime_analysis_spark "
        .concat("(stockId,url,clickcnt,LPcnt,type,recordtime)")
        .concat(" VALUES(?,?,?,?,?,?)");
      val pstat = jdbcUtils.getPstat(conn, sqlStockDay);
      dayRows.foreach(x => {
        pstat.setString(1, x.get(0) + "");
        pstat.setString(2, x.get(1) + "");
        pstat.setInt(3, (x.getLong(2) + "").toInt);
        pstat.setFloat(4, (x.getDouble(3) + "").toFloat);
        pstat.setString(5, "01"); // type 01: daily ranking
        pstat.setString(6, stattime);
        pstat.executeUpdate();
      });
      jdbcUtils.closeConn(conn, pstat);

    })
  }
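
Note: JDBCUtils is called above but its source is not part of the post. A minimal sketch consistent with how it is used; the driver class, URL, and credentials are placeholders:

import java.sql.{Connection, DriverManager, PreparedStatement}

class JDBCUtils {
  // Placeholder connection settings; the real values are not in the original post
  val url = "jdbc:mysql://dbhost:3306/stockdb?useUnicode=true&characterEncoding=utf8"
  val user = "user"
  val password = "password"

  def getConn(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection(url, user, password)
  }

  def getPstat(conn: Connection, sql: String): PreparedStatement =
    conn.prepareStatement(sql)

  def closeConn(conn: Connection, pstat: PreparedStatement) {
    try {
      if (pstat != null) pstat.close();
      if (conn != null) conn.close();
    } catch {
      case e: Exception => println("closeConn Exception: " + e.getMessage)
    }
  }
}

object JDBCUtils {
  def apply() = new JDBCUtils();
}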