使用spark将MySQL数据导入hive

时间:2025-03-23 08:28:35
object spark_from_mysql_to_hive {
def main(args: Array[String]): Unit = {
val spark: SparkSession = ().master(“local[*]”).enableHiveSupport()
.config(“”, “100”)
.config(“”, “100”)
.config(“”, “thrift://ip:9083”)
//由于 Hive 和 SparkSQL 在 Decimal 类型上使用了不同的转换方式写入 Parquet,
// 导致 Hive 无法正确读取 SparkSQL 所导入的数据。对于已有的使用 SparkSQL 导入的数据,
// 如果有被 Hive/Impala 使用的需求,建议加上 =true,重新导入数据。
.config(“”, true)
.appName(“mysql_to_hive”).getOrCreate();
import ._
/**
* 使用临时表,建hive表,用insert into语句插入
* 直接用saveAsTable生成hive表,之前要执行
*/
val jdbcDF: DataFrame = (“jdbc”)
.option(“url”,“jdbc:mysql://ip:3306/finance? characterEncoding=utf-8&serverTimezone=UTC&useSSL=false”)
.option(“driver”,“”)
.option(“dbtable”,“base_store”)
.option(“user”,“root”)
.option(“password”,“d0sD6Ffs7sGkHDF8mPnHJ2cl”)
.load();
// (“create table finance.base_store( id int, student String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t’;”);
();
(“temp”);
println("+"*500);
(“select id,store_brand from temp”).show();
(“use finance”);
// (“set =false”);
(“drop table if exists base_store”);
(“CREATE TABLE if not exists finance.base_store(id int,store_brand String)ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’”);
println(“开始导入”);
// (“insert into finance.base_store select id,store_brand from temp”);
// 在saveAsTable之前要执行
val df = (“temp”);
// (“”, true);
().saveAsTable(“hive_records”);
(“select * from hive_records”).show();
// (“hive”).saveAsTable(“base_store”);
// ().saveAsTable(“base_store”);
// (“Overwrite”).saveAsTable(“base_store”);