044 hive与mysql两种数据源之间的join

时间:2023-03-08 20:08:27

  这篇文章是基于上一篇文章的续集

一:需求

1.图形表示

  044 hive与mysql两种数据源之间的join

二:程序

1.程序、

 package com.scala.it

 import java.util.Properties

 import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext} object HiveToMysql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("hive-yo-mysql")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = new HiveContext(sc)
val (url, username, password) = ("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/hadoop09", "root", "123456")
val props = new Properties()
props.put("user", username)
props.put("password", password) // ==================================
// 第一步:同步hive的dept表到mysql中
sqlContext
.read
.table("hadoop09.dept") // database.tablename
.write
.mode(SaveMode.Overwrite) // 存在覆盖
.jdbc(url, "mysql_dept", props) // 第二步:hive表和mysql表进行数据join操作 ==> 采用HQL语句实现
// 2.1 将mysql的数据注册成为临时表
sqlContext
.read
.jdbc(url, "mysql_dept", props)
.registerTempTable("temp_mysql_dept") // 临时表中不要出现"." // 第三步数据join
sqlContext.sql(
"""
|SELECT a.*,b.dname,b.loc
|FROM hadoop09.emp a join temp_mysql_dept b on a.deptno = b.deptno
""".stripMargin)
.write
.format("org.apache.spark.sql.execution.datasources.parquet")
.mode(SaveMode.Overwrite)
.save("/spark/join/parquet") // 检测数据是否join成功
sqlContext
.read
.format("parquet")
.load("/spark/join/parquet")
.show() }
}

2.效果

  044 hive与mysql两种数据源之间的join

三:知识点

1.format

  可以写包名。

  044 hive与mysql两种数据源之间的join