Calling HiveContext from Scala Spark

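The helper below takes an RDD of model-replay records, converts it to a DataFrame through a HiveContext, and writes it into a date-partitioned ORC table in Hive: it registers the RDD as a temporary table, creates the target table if it does not exist, then runs an insert ... select into the target partition.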
import org.apache.spark.rdd.RDD

// Assumed record shape; the original post does not show the case class.
case class ModelReplay(order_id: Long, pre: Double)
def save(data: RDD[ModelReplay], modelKey: String, dt: String): Unit = {

  val tableName = s"tmp.model_replay_${modelKey}_di"

  val hiveContext = new org.apache.spark.sql.hive.HiveContext(data.sparkContext)

  import hiveContext.implicits._
  // Expose the RDD as a temporary table so it can be queried with SQL.
  data.toDF().registerTempTable("result_table")

  val sql_create_table = s"create table if not exists ${tableName} ( " +
    s"order_id bigint, " +
    s"pctr double " +
    s") partitioned by (dt string comment '') stored as orc"

  // Hive resolves insert ... select columns by position, so the DataFrame's
  // `pre` column lands in the table's `pctr` column.
  val sql_insert = s"insert into ${tableName} partition(dt='${dt}') " +
    s"select order_id, pre from result_table"

  println("sql_create_table=" + sql_create_table)
  // Create the table
  hiveContext.sql(sql_create_table)

  println("sql_insert=" + sql_insert)
  // Insert the data
  hiveContext.sql(sql_insert)
}
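A minimal sketch of how the helper might be invoked, assuming an existing SparkContext named sc and the ModelReplay shape defined above (both are assumptions, not shown in the original):

// Hypothetical driver code: build a small RDD of replay records and save it
// under partition dt=2023-12-15. `sc` is an already-constructed SparkContext.
val replays = sc.parallelize(Seq(
  ModelReplay(1001L, 0.123),
  ModelReplay(1002L, 0.456)
))
save(replays, modelKey = "ctr_v1", dt = "2023-12-15")
// Rows are appended to tmp.model_replay_ctr_v1_di, partition dt='2023-12-15'.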
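Note that HiveContext is deprecated since Spark 2.0; the same flow works through a SparkSession with Hive support enabled. A sketch of the modern equivalent, reusing the data, sql_create_table, and sql_insert values from the function above:

// Spark 2.x+ equivalent of the HiveContext setup.
val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("model-replay-save")  // app name is illustrative
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// createOrReplaceTempView replaces the deprecated registerTempTable.
data.toDF().createOrReplaceTempView("result_table")
spark.sql(sql_create_table)
spark.sql(sql_insert)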