代码
读取MySQL数据表转化为DataFrame
包含三种方法:
前两种方法通过spark.read()返回一个DataFrameReader,然后format("jdbc"),设定读取格式为jdbc,配置连接信息,最后通过load()加载DataFrame。两种不同方法不同在于配置信息的方式不同。
后一种方法也通过spark.read()返回一个DataFrameReader,但直接通过jdbc()来返回DataFrame。
首先先设定一些参数如下,根据具体情况而定。
private static String driver = "com.mysql.jdbc.Driver"; private String url = "jdbc:mysql://hostname:3306"; //hostname: mysql所在的主机名或ip地址 private String db = "dbname"; //dbname: 数据库名 private String user = "username"; //username: 数据库用户名 private String pwd = "password"; //password: 数据库密码;若无密码,则为""
private SparkSession spark = SparkSession.builder() .appName("Spark") .master("mode") .getOrCreate();
方法1:通过option配置连接信息
每个option配置一个参数。
public void readTableBySparkOption(String tablename) { Dataset<Row> jdbcDF = spark.read() .format("jdbc") .option("url", url) .option("dbtable", db + "." + tablename) .option("user", user) .option("password", pwd) .load(); jdbcDF.show(); spark.stop(); }
方法2:通过options配置连接信息
创建一个包含配置信息的Map对象,作为参数传入options,一次配置多个参数。
public void readTableBySparkOptions(String tablename) { Map<String, String> map = new HashMap<String, String>(){{ put("driver", driver); put("url", url); put("dbtable", db + "." + tablename); put("user", user); put("password", pwd); }}; Dataset<Row> jdbcDF = spark.read().format("jdbc").options(map).load(); jdbcDF.show(); }
方法3:通过Properties配置连接信息
创建一个Properties对象,保存配置信息,作为jdbc()的参数。
public void readTableBySparkProperty(String tablename) { Properties connectedProperties = new Properties(); connectedProperties.put("user", user); connectedProperties.put("password", pwd); connectedProperties.put("customSchema", "id STRING, name STRING"); //用来具体化表结构,去掉不影响程序执行 Dataset<Row> jdbcDF2 = spark.read() .jdbc(url, db + "." + tablename, connectedProperties); jdbcDF2.show(); }
jdbc()读取方式分很多种,上述方式用来读取整张表,但是,如果只想读取部分数据可以通过如下方式。
Dataset<Row> jdbcDF2 = spark.read() .jdbc(url, "(select id from testdb.info) t", connectedProperties);
将DataFrame中的数据存入MySQL表
首先通过的反射的方式创建DataFrame。
假设一个info.txt中的数据为如下所示,第一列为id号,第二列为name:
0001,Alice 0002,Bob
读取info.txt,转化为RDD,然后再将每条数据转化为一个Info对象,变成JavaRDD<Info>,最后通过createDataFrame的方式将RDD转换成DataFrame。不过Spark的Java源码中实际上没有DataFrame这个类,都是统一用Dataset<Row>表示DataFrame。
public Dataset<Row> createDF() { JavaRDD<Info> infoRDD = spark.read() .textFile("info.txt") .javaRDD() .map(line -> { String[] values = line.split(","); Info info = new Info(values[0], values[1]); return info; }); Dataset<Row> infoDF = spark.createDataFrame(infoRDD, Info.class); infoDF.printSchema(); return infoDF; }
上述代码中Info类由自己根据表的结构进行定义。如下所示,可将Info定义成一个static标识的内部类,该类应用了Serializable的接口。但是,值得注意的是,必须要为所有列对应的字段(比如id,name)定义相应的get和set方法。否则,将无法完成反射。
public static class Info implements Serializable { private String id; private String name; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } Info(String id, String name) { this.id = id; this.name = name; } }
至此,可以通过createDF来得到DataFrame了,接下来将其导入MySQL的表中。存入数据的信息配置方式与读取数据时相同,在此不再详述。仅以两种方式为例说明。
方法1:通过option配置连接信息
以option()的方式为例,代码如下所示。mode()用来配置存储模式SaveMode。
public void saveDataBySparkOption(String tablename) { Dataset<Row> df = createDF(); df.write().mode(SaveMode.Append).format("jdbc") .option("url", url) .option("dbtable", db + "." + tablename) .option("user", user) .option("password", pwd) .save(); }
SaveMode有四种:
SaveMode.ErrorIfExists | 默认模式。若表不存在,创建新表,存入数据;若表存在,报异常。 |
SaveMode.Append | 若表不存在,创建新表,存入数据,若表或数据存在,新数据附加在表中,不会覆盖原数据。 |
SaveMode.Overwrite | 若表已存在,则删除该表,重新建表,存入数据。 |
SaveMode.Ignore | 若表不存在,创建新表,存入数据;若数据存在,则不再保存DataFrame的数据。 |
方法2:通过properties配置连接信息
注意,通过配置createtableColumnTypes参数可以定义表的具体结构。
public void saveDataBySparkProperty(String tablename) { Dataset<Row> df = createDF(); Properties connectedProperties = new Properties(); connectedProperties.put("user", user); connectedProperties.put("password", pwd); df.write() .option("createTableColumnTypes", "id char(20), name char(30)") .jdbc(url, db + "." + tablename, connectedProperties); }
参数配置:
用于read的参数
- partitionColumn:必须是表中的数字列。
- lowerBound和upperBound:仅用于决定分区的步幅,而不是用于过滤表中的行。表中的所有行将被分割并返回。
- fetchsize:JDBC提取大小,用于确定每次获取的行数。这可以帮助JDBC驱动程序调优性能,默认的值通常很小,Oracle默认每次提取10行。
用于write的参数
- batchsize:JDBC批量大小,用于确定每次insert的行数。这可以帮助JDBC驱动程序调优性能。默认为1000。
- isolationLevel:事务隔离级别,适用于当前连接。它可以是一个NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,缺省值为标准事务隔离级别READ_UNCOMMITTED。
- truncate:默认为false。当SaveMode.Overwrite启用时,此选项会truncate在MySQL中的表,而不是删除再重建表。这样效率更高,并且可以防止表元数据(例如,索引)被删除。但是,在某些情况下该值不起作用,比如当新数据表结构与旧表不同时。
- createTableOptions:此选项允许在创建表(例如CREATE TABLE t (name string) ENGINE=InnoDB.)时设置特定的数据库表和分区选项。
参考文献:
http://blog.sina.com.cn/s/blog_45922fd70102x8yk.html
https://www.cnblogs.com/wwxbi/p/6978774.html
https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes