基于spark的关系型数据库到HDFS的数据导入

时间:2022-01-13 02:57:38

package com.shenyuchong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
public class App 
{
    /**
     * 作用:
     *         将关系型数据库数据导入hdfs(sql方式)
     *         支持mysql和oracle
     *         支持覆盖和追加模式
     *         支持增量导入(取checkColumn字段的最大值)
     *         支持去重导入(数据源主键inKey,hdfs表主键outKey,多字段使用concat函数(以实际数据源字段连接函数为准))
     */
    public static String ip = "127.0.0.1";
    public static String port = "3306";
    public static String baseType = "mysql";
    public static String inBase = "in_base";
    public static String userName = "un";
    public static String password = "pas";
    public static String sql = "select 1";
    public static String hdfs="hdfs://127.0.0.1:9000";
    public static String outBase = "base";
    public static String outTable = "table";
    public static String noticeUrl="http://127.0.0.1:6009/schedule/schedule/donothing";
    public static String writeMode = "append";
    public static String checkColumn = "";
    public static String inKey = "";
    public static String outKey = "";

    public static void main( String[] args )
    {
        for (int i = 0; i < args.length-1; i  ) {
            if (args[i].equals("-ip"))                     ip=args[i   1];         //数据源地址
            if (args[i].equals("-port"))                   port=args[i   1];       //数据源端口
            if (args[i].equals("-base_type"))              baseType=args[i   1];   //数据源类型
            if (args[i].equals("-in_base"))                inBase = args[i   1];   //数据源数据库名称
            if (args[i].equals("-in_key"))                 inKey = args[i   1];    //数据源主键
            if (args[i].equals("-out_key"))                outKey = args[i   1];   //HDFS表主键
            if (args[i].equals("-user_name"))              userName=args[i   1];   //数据源用户名
            if (args[i].equals("-password"))               password=args[i   1];   //数据源密码
            if (args[i].equals("-sql"))                    sql=args[i   1];        //导出语句(普通查询语句)
            if (args[i].equals("-hdfs"))                   hdfs=args[i   1];       //HDFS地址
            if (args[i].equals("-out_base"))               outBase=args[i   1];    //输出数据库名
            if (args[i].equals("-out_table"))              outTable=args[i   1];   //输出表名
            if (args[i].equals("-notice_url"))             noticeUrl=args[i   1];  //完成通知地址
            if (args[i].equals("-write_mode"))             writeMode=args[i   1];  //写入模式:overwrite|append
            if (args[i].equals("-check_column"))           checkColumn=args[i   1];//增量追加检查字段

        }
        /**
         * 必要的临时变量
         */
        SparkSession spark = SparkSession.builder().getOrCreate();
        String tmpTable = outBase "_" outTable;
        String condition = "";
        String driver = "";
        String url = "";
        /**
         * 根据数据源类型加载驱动
         */
        if ("mysql".equals(baseType.toLowerCase())) {
            driver = "com.mysql.cj.jdbc.Driver";
            url = "jdbc:mysql://"   ip   ":"   port   "/"   outBase  "?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false";
        } else if ("oracle".equals(baseType.toLowerCase())) {
            driver = "oracle.jdbc.driver.OracleDriver";
            url = "jdbc:oracle:thin:@"   ip   ":"   port   ":"   outBase;
        }
        /**
         * 写入模式:追加|覆盖
         */
        SaveMode saveMode = SaveMode.Append;
        if("overwrite".equals(writeMode))
            saveMode = SaveMode.Overwrite;

        String outSql = "select * from rdbTmpTable ";
        
        try {
            FileSystem fs = FileSystem.get(new URI(hdfs), new Configuration(), "root");
            /**
             * 检查给定库表的路径是否存在
             * 若存在则注册该路径到临时表
             * 表存在条件下checkColumn增量检查字段和inKey、outKey主键才起效,并拼装到导出语句
             */
            if(fs.exists(new Path("/user/" outBase "/" outTable))&&fs.exists(new Path("/user/" outBase "/" outTable "/_SUCCESS"))){
                spark.read().parquet(hdfs "/user/" outBase "/" outTable "/*").createOrReplaceTempView(outBase "_" outTable);
                /**
                 * 增量检查字段拼装
                 */
                if (checkColumn != null && !"".equals(checkColumn)) {
                    String lastValue = spark.sql("select max(" checkColumn ") from " outBase "_" outTable).collectAsList().get(0).get(0).toString();
                    condition = " where "   checkColumn   " >‘"   lastValue   "‘";
                }
                /**
                 * 加载远程数据源并注册临时表
                 */
                spark.read().format("jdbc").option("driver", driver).option("url", url)
                        .option("user", userName).option("password",password)
                        .option("dbtable", "(select * from ("  sql  ") tmp_table1 "   condition  ")  tmp_table2 ")
                        .load().registerTempTable("rdbTmpTable");
                /**
                 * 若inKey、outKey都不为空,添加主键约束
                 */
                if(!"".equals(inKey)&&!"".equals(outKey))
                    outSql = "select * from rdbTmpTable where " inKey " not in ( select " outKey " from " tmpTable ")";
            }
            /**
             * 打印
             */
            spark.sql("select * from rdbTmpTable").show();
            spark.sql("select " outKey " from " tmpTable).show();
            spark.sql(outSql).show();
            /**
             * 将数据写入hdfs
             */
            spark.sql(outSql).write().format("parquet").mode(saveMode).save(hdfs "/user/" outBase "/" outTable);
        } catch (Exception e) {
            e.printStackTrace();
        }
        /**
         * 通知后续服务直到后续服务接受了请求
         */
        boolean noticed=false;
        try {
            while(!noticed){
                Thread.sleep(2000);
                noticed = connectSuccess(noticeUrl);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        spark.log().info("---------------:成功!!");
    }
    /**
     * 根据地址请求服务,请求成功则返回true
     */
    public static boolean connectSuccess(String path){
        URL url;
        try {
            url = new URL(noticeUrl);
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            if(con.getResponseCode()==200) return true;
        } catch (Exception e) {
            return false;
        }
        return false;
    }
}

maven打包后使用:

sh /opt/apps/spark/bin/spark-submit  --name mysql2hdfs --class com.shenyuchong.App --master spark://127.0.0.1:7077  --deploy-mode client  --executor-memory 8G --total-executor-cores 4  /opt/apps/schedule/sparkrdbms2hdfs-2.0.jar -ip 127.0.0.1 -port 3306 -base_type mysql  -user_name root -password root -base_type mysql -out_base od -out_table table1 -hdfs hdfs://127.0.0.1:9000  -in_key "concat(id,datetime)" -out_key "concat(id,datetime)" -in_base ulanqab  -sql "select t.* from table1 t where datetime >=CONCAT(DATE_ADD(CURDATE(),INTERVAL 1 DAY),' ','00:00:00') and datetime <=CONCAT(DATE_ADD(CURDATE(),INTERVAL 2 DAY),' ','23:00:00')  " -notice_url http://127.0.0.1:6009/schedule/schedule/donothing

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Builds sparkrdbms2hdfs as a single runnable jar (jar-with-dependencies) for spark-submit. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>gbd</groupId>
    <artifactId>sparkrdbms2hdfs</artifactId>
    <version>2.0</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <!-- All dependencies are "provided": they are needed to compile but are not bundled into the
         assembled jar, so the JDBC drivers and Spark must be on the cluster's classpath at run time. -->
    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version><!--$NO-MVN-MAN-VER$ -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>12.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <!-- Keep the plain artifact name (no "-jar-with-dependencies" suffix). -->
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.shenyuchong.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <!-- "single" is the goal intended for lifecycle bindings; the old
                                 "assembly" goal is deprecated and absent from plugin 3.x. -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>