Spark 2.3.0操作MySQL

时间:2021-04-14 13:46:06

代码

读取MySQL数据表转化为DataFrame

包含三种方法:

前两种方法通过spark.read()返回一个DataFrameReader,然后format("jdbc"),设定读取格式为jdbc,配置连接信息,最后通过load()加载DataFrame。两种不同方法不同在于配置信息的方式不同。

后一种方法也通过spark.read()返回一个DataFrameReader,但直接通过jdbc()来返回DataFrame。

首先先设定一些参数如下,根据具体情况而定。

private static String driver = "com.mysql.jdbc.Driver";  
private String url = "jdbc:mysql://hostname:3306";  //hostname: mysql所在的主机名或ip地址  
private String db = "dbname";  //dbname: 数据库名  
private String user = "username"; //username: 数据库用户名  
private String pwd = "password"; //password: 数据库密码;若无密码,则为""  
private SparkSession spark = SparkSession.builder()
            .appName("Spark")
            .master("mode")
            .getOrCreate();

方法1:通过option配置连接信息

每个option配置一个参数。

public void readTableBySparkOption(String tablename) {
        Dataset<Row> jdbcDF = spark.read()
                .format("jdbc")
                .option("url", url)
                .option("dbtable", db + "." + tablename)
                .option("user", user)
                .option("password", pwd)
                .load();
        jdbcDF.show();
        spark.stop();
    }

方法2:通过options配置连接信息

创建一个包含配置信息的Map对象,作为参数传入options,一次配置多个参数。

public void readTableBySparkOptions(String tablename) {
        Map<String, String> map = new HashMap<String, String>(){{
            put("driver", driver);
            put("url", url);
            put("dbtable", db + "." + tablename);
            put("user", user);
            put("password", pwd);
        }};
        Dataset<Row> jdbcDF = spark.read().format("jdbc").options(map).load();
        jdbcDF.show();
    }

方法3:通过Properties配置连接信息

创建一个Properties对象,保存配置信息,作为jdbc()的参数。

public void readTableBySparkProperty(String tablename) {
        Properties connectedProperties = new Properties();
        connectedProperties.put("user", user);
        connectedProperties.put("password", pwd);
        connectedProperties.put("customSchema", "id STRING, name STRING");  //用来具体化表结构,去掉不影响程序执行
        Dataset<Row> jdbcDF2 = spark.read()
                .jdbc(url, db + "." + tablename, connectedProperties);
        jdbcDF2.show();
    }

jdbc()读取方式分很多种,上述方式用来读取整张表,但是,如果只想读取部分数据可以通过如下方式。

Dataset<Row> jdbcDF2 = spark.read()
                .jdbc(url, "(select id from testdb.info) t", connectedProperties);

将DataFrame中的数据存入MySQL表

首先通过的反射的方式创建DataFrame。

假设一个info.txt中的数据为如下所示,第一列为id号,第二列为name:

0001,Alice
0002,Bob

读取info.txt,转化为RDD,然后再将每条数据转化为一个Info对象,变成JavaRDD<Info>,最后通过createDataFrame的方式将RDD转换成DataFrame。不过Spark的Java源码中实际上没有DataFrame这个类,都是统一用Dataset<Row>表示DataFrame。

public Dataset<Row> createDF() {
        JavaRDD<Info> infoRDD = spark.read()
                .textFile("info.txt")
                .javaRDD()
                .map(line -> {
                    String[] values = line.split(",");
                    Info info = new Info(values[0], values[1]);
                    return info;
                });
        Dataset<Row> infoDF = spark.createDataFrame(infoRDD, Info.class);
        infoDF.printSchema();
        return infoDF;
    }

上述代码中Info类由自己根据表的结构进行定义。如下所示,可将Info定义成一个static标识的内部类,该类应用了Serializable的接口。但是,值得注意的是,必须要为所有列对应的字段(比如id,name)定义相应的get和set方法。否则,将无法完成反射。

public static class Info implements Serializable {
        private String id;
        private String name;

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        Info(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

至此,可以通过createDF来得到DataFrame了,接下来将其导入MySQL的表中。存入数据的信息配置方式与读取数据时相同,在此不再详述。仅以两种方式为例说明。

方法1:通过option配置连接信息

以option()的方式为例,代码如下所示。mode()用来配置存储模式SaveMode。

public void saveDataBySparkOption(String tablename) {
        Dataset<Row> df = createDF();
        df.write().mode(SaveMode.Append).format("jdbc")
                .option("url", url)
                .option("dbtable", db + "." + tablename)
                .option("user", user)
                .option("password", pwd)
                .save();
    }

SaveMode有四种:

SaveMode
SaveMode.ErrorIfExists 默认模式。若表不存在,创建新表,存入数据;若表存在,报异常。
SaveMode.Append 若表不存在,创建新表,存入数据,若表或数据存在,新数据附加在表中,不会覆盖原数据。
SaveMode.Overwrite 若表已存在,则删除该表,重新建表,存入数据。
SaveMode.Ignore 若表不存在,创建新表,存入数据;若数据存在,则不再保存DataFrame的数据。

方法2:通过properties配置连接信息

注意,通过配置createtableColumnTypes参数可以定义表的具体结构。

public void saveDataBySparkProperty(String tablename) {
        Dataset<Row> df = createDF();
        Properties connectedProperties = new Properties();
        connectedProperties.put("user", user);
        connectedProperties.put("password", pwd);
        df.write()
                .option("createTableColumnTypes", "id char(20), name char(30)")
                .jdbc(url, db + "." + tablename, connectedProperties);
    }

参数配置:

用于read的参数

  • partitionColumn:必须是表中的数字列。
  • lowerBoundupperBound:仅用于决定分区的步幅,而不是用于过滤表中的行。表中的所有行将被分割并返回。
  • fetchsize:JDBC提取大小,用于确定每次获取的行数。这可以帮助JDBC驱动程序调优性能,默认的值通常很小,Oracle默认每次提取10行。

用于write的参数

  • batchsize:JDBC批量大小,用于确定每次insert的行数。这可以帮助JDBC驱动程序调优性能。默认为1000。
  • isolationLevel:事务隔离级别,适用于当前连接。它可以是一个NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,缺省值为标准事务隔离级别READ_UNCOMMITTED。
  • truncate:默认为false。当SaveMode.Overwrite启用时,此选项会truncate在MySQL中的表,而不是删除再重建表。这样效率更高,并且可以防止表元数据(例如,索引)被删除。但是,在某些情况下该值不起作用,比如当新数据表结构与旧表不同时。
  • createTableOptions:此选项允许在创建表(例如CREATE TABLE t (name string) ENGINE=InnoDB.)时设置特定的数据库表和分区选项。

参考文献:

http://blog.sina.com.cn/s/blog_45922fd70102x8yk.html

https://www.cnblogs.com/wwxbi/p/6978774.html

https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes