详解数据挖掘

时间:2024-03-15 15:56:19

        数据挖掘(Data Mining),又译为资料探勘、数据采矿,是数据库知识发现(Knowledge-Discovery in Databases,简称:KDD)中的一个步骤。数据挖掘主要是指从大量的数据中,通过算法自动搜索隐藏于其中的特殊关系型信息的过程。在技术层面上,数据挖掘涉及从大量的、不完全的、有噪声的、模糊的和随机的数据中,提取出隐含的、事先未知的但具有潜在价值和有用信息的过程。

        数据挖掘通常与计算机科学紧密相关,它利用统计、在线分析处理、情报检索、机器学习、专家系统(依靠过去的经验法则)和模式识别等多种方法来实现其目标。数据挖掘过程涵盖了从定义目标、获取数据、数据探索(包括数据质量分析和数据特征分析)到数据预处理(包括数据清洗、数据集成、数据变换和数据规约)等一系列步骤。

        数据挖掘目前是人工智能和数据库领域研究的热点问题,其应用领域广泛,包括情报检索、情报分析、模式识别等。通过高度自动化地分析企业的数据,数据挖掘能够做出归纳性的整理,从中挖掘出潜在的模式,从而帮助决策者调整市场策略,减少风险。

        数据挖掘技术融合了统计学、人工智能、模式识别、机器学习、数据库和可视化技术等多个领域的知识和方法,使得人们能够更有效地从海量数据中提取有价值的信息和知识。随着技术的不断发展,数据挖掘将在更多领域发挥重要作用,为决策提供更加科学、准确的支持。  

        而在大数据的比赛中数据挖掘占有非常中要的地位,但也是最为困难的模块,通常数据挖掘模块与能否进入国赛有很大的关联。        

        以下是相应的代码:

  1. 根据Hive的dwd库中相关表或MySQL中shtd_store中相关表(order_detail、sku_info),计算出与用户id为6708的用户所购买相同商品种类最多的前10位用户(只考虑他俩购买过多少种相同的商品,不考虑相同的商品买了多少次),将10位用户id进行输出,若与多个用户购买的商品种类相同,则输出结果按照用户id升序排序,输出格式如下,将结果截图粘贴至客户端桌面【Release\任务C提交结果.docx】中对应的任务序号下;

结果格式如下:

-------------------相同种类前10的id结果展示为:--------------------

1,2,901,4,5,21,32,91,14,52

package com.rmx.gz.spark.gz2.excavate

import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoder, StandardScaler, VectorAssembler}
import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{asc, avg, col, desc, udf}
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}

import java.util.Properties
import scala.+:

/**
 * 数据挖掘
 */
object Excavate {
  val mysqlUrl = "jdbc:mysql://bigdata1:3306/ds_pub?useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useUnicode=true"
  System.setProperty("HADOOP_USER_NAME", "root")
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("extraction")
      .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val mysqlProperties: Properties = new Properties()
    mysqlProperties.setProperty("user", "root")
    mysqlProperties.setProperty("password", "123456")
    val order_info = sparkSession.read.jdbc(mysqlUrl, "order_info", properties = mysqlProperties)

    val sku_info = sparkSession.read.jdbc(mysqlUrl, "sku_info", properties = mysqlProperties)

    val order_detail = sparkSession.read.jdbc(mysqlUrl, "order_detail", properties = mysqlProperties)

    val user_info = sparkSession.read.jdbc(url = mysqlUrl, table = "user_info", properties = mysqlProperties)

    /**
     * 宽表,普通join连接,剔除事实表中不存在现有维表中的记录
     */
    val width_table = order_info
      .join(order_detail, order_detail("order_id") === order_info("id"))
      .join(sku_info, sku_info("id") === order_detail("sku_id"))
      .join(user_info, order_info("user_id") === user_info("id"))

    /**
     * 与用户id为6708的用户所购买相同商品最多的前10位用户
     */
    // 用户8880购买过的商品(去重)
    val user8880BuySkuIds = width_table
      .filter(order_info("user_id") === 8880L)
      .select(order_detail("sku_id"))
      .dropDuplicates("sku_id")
      .collect()
      .map(row => row.getLong(0))
    // 与用户id为8880的用户所购买相同商品最多的前10位用户
    val top10User = width_table
        .filter(order_info("user_id") =!= 8880L && order_detail("sku_id").isin(user8880BuySkuIds: _*))
      .select(order_info("user_id"), order_detail("sku_id"))
      .dropDuplicates("user_id", "sku_id")
      .groupBy("user_id")
      .agg(functions.count("*").as("count"))
      .orderBy(desc("count"), asc("user_id"))
      .limit(10)
      .collect()
      .map(row => row.getLong(0))

    println("-------------------相同种类前10的id结果展示为:--------------------")
    println(top10User.mkString(","))

    /**
     * 商品表信息预处理
     */
    val sku_dimension_table = width_table
      .select(sku_info("*"),order_info("user_id"))
      .drop("sku_name", "sku_desc", "sku_default_img")
      .filter(col("id").isin(user8880BuySkuIds: _*) || col("user_id").isin(top10User: _*))
      .dropDuplicates( "id")


    val assemblerPrice = new VectorAssembler().setInputCols(Array("price")).setOutputCol("vector_price")
    val scalerPrice = new StandardScaler().setInputCol("vector_price").setOutputCol("standard_price")
    val assemblerWeight = new VectorAssembler().setInputCols(Array("weight")).setOutputCol("vector_weight")
    val scalerWeight = new StandardScaler().setInputCol("vector_weight").setOutputCol("standard_weight")
    val oneHotEncoder = new OneHotEncoder()
      .setInputCols(Array("spu_id", "tm_id", "category3_id"))
      .setOutputCols(Array("one_hot_spu_id", "one_hot_tm_id", "one_hot_category3_id"))
      .setDropLast(false)

    // 处理管道
    val pipeline = new Pipeline()

    val pretreatment_sku_info = pipeline
      .setStages(Array(assemblerPrice, assemblerWeight, scalerPrice, scalerWeight, oneHotEncoder))
      .fit(sku_dimension_table)
      .transform(sku_dimension_table)
    // 稀疏向量转值数组
    val sparseVectorToArray = udf { x: SparseVector => x.toArray }
    // 稠密向量转数组
    val denseVectorToArray = udf { x: DenseVector => x.toArray }

    import sparkSession.implicits._

    val task2Show = pretreatment_sku_info
      .orderBy("id")
      .limit(1)
      .select(
        col("id").cast(DoubleType) +:
          denseVectorToArray(col("standard_price"))(0).as("price") +:
          denseVectorToArray(col("standard_weight"))(0).as("weight") +:
          ((0 until 12).map((i: Int) => sparseVectorToArray(col("one_hot_spu_id"))(i) as s"spu_id#${i + 1}") ++
            (0 until 7).map((i: Int) => sparseVectorToArray(col("one_hot_tm_id"))(i) as s"tm_id#${i + 1}") ++
            (0 until 803).map((i: Int) => sparseVectorToArray(col("one_hot_category3_id"))(i) as s"category3_id#${i + 1}")): _*
      )

    val elements = task2Show
      .limit(1)
      .select(task2Show.columns.slice(0, 10).map(col(_: String)): _*)
      .collect()
      .head
    println("--------------------第一条数据前10列结果展示为:---------------------")
    println(elements.mkString(","))

    /**
     * 推荐
     */
    val cosineSimilarity: UserDefinedFunction = udf((v1: SparseVector, v2: SparseVector) => {
      //分子
      val member: Double = v2.toArray.zip(v1.toArray).map((num: (Double, Double)) => num._1 * num._2).sum
      // 分母
      val temp1: Double = math.sqrt(v2.toArray.map((num: Double) => math.pow(num, 2)).sum)
      val temp2: Double = math.sqrt(v1.toArray.map((num: Double) => math.pow(num, 2)).sum)
      val deno: Double = temp1 * temp2
      //余弦
      member / deno
    })
    val sku_vector: DataFrame = new VectorAssembler()
      .setInputCols(Array("standard_price", "standard_weight", "one_hot_spu_id", "one_hot_tm_id", "one_hot_category3_id"))
      .setOutputCol("vector")
      .transform(pretreatment_sku_info)
      .select("id", "vector")
    val user_8880_sku_info = sku_vector
      .filter(col("id").isin(user8880BuySkuIds: _*))
      .select(col("vector").as("v1"))

    val result = sku_vector
      .filter(!col("id").isin(user8880BuySkuIds: _*))
      .crossJoin(user_8880_sku_info)
      .withColumn("cosineSimilarity", cosineSimilarity(col("vector"), col("v1")))
      .groupBy("id")
      .agg(avg("cosineSimilarity") as "cosineSimilarity")
      .orderBy(desc("cosineSimilarity"))


    val tuples: Array[(String, String)] = result
      .collect()
      .map((r: Row) => (r(0).toString, r(1).toString))
      .slice(0, 6)
    println("------------------------推荐Top5结果如下------------------------")
    for (i <- tuples.indices) {
      println(s"相似度top${i + 1}(商品id:${tuples(i)._1},平均相似度:${tuples(i)._2})")
    }

  }
}

        近年来,数据挖掘引起了信息产业界的极大关注,其主要原因是存在大量数据,可以广泛使用,并且迫切需要将这些数据转换成有用的信息和知识。获取的信息和知识可以广泛用于各种应用,包括商务管理、生产控制、市场分析、工程设计和科学探索等。数据挖掘利用了来自如下一些领域的思想:①来自统计学的抽样、估计和假设检验;②人工智能、模式识别和机器学习的搜索算法建模技术学习理论。数据挖掘也迅速地接纳了来自其他领域的思想,这些领域包括最优化、进化计算信息论信号处理、可视化和信息检索。一些其他领域也起到重要的支撑作用。特别地,需要数据库系统提供有效的存储、索引和查询处理支持。源于高性能(并行)计算的技术在处理海量数据集方面常常是重要的。分布式技术也能帮助处理海量数据,并且当数据不能集中到一起处理时更是至关重要。

数据挖掘的系统结构