Sqoop 使用详解(内含对官方文档的解析)

时间:2020-12-05 08:29:05

Sqoop 是 Cloudera 公司创造的一个数据同步工具,现在已经完全开源了。

目前已经是 hadoop 生态环境中数据迁移的首选,另外还有 ali 开发的 DataX 属于同类型工具,由于社区的广泛使用和文档的健全,调研之后决定使用 Sqoop 来做我们之后数据同步的工具。

我们首先来看下 Sqoop 的工作流

Sqoop 使用详解(内含对官方文档的解析)

他将我们传统的关系型数据库 | 文件型数据库 | 企业数据仓库 同步到我们的 hadoop 生态集群中。

同时也可以将 hadoop 生态集群中的数据导回到传统的关系型数据库 | 文件型数据库 | 企业数据仓库中。

那么 Sqoop 如何抽取数据呢

Sqoop 使用详解(内含对官方文档的解析)

1. 首先 Sqoop 去 rdbms 抽取元数据。

2. 当拿到元数据之后将任务切成多个任务分给多个 map。

3. 然后再由每个 map 将自己的任务完成之后输出到文件。

Sqoop import Command:

先从最简单的任务开始

sqoop import\
--connect jdbc:mysql://10.66.38.125:3306/user_db \
--username cloudera \
--password secretkey \
--table department \
--target-dir /sqoopdata/departments \      # HDFS 的目标存储位置
--where "department_id = 1000" \         # 指定条件,只有达成这个条件的才会被 import 进来
-- m 1

就这个语句就可以将我们关系型数据库中的某个表 import 进 HDFS 的某个位置。

同样我们可以 import 某些字段进来生成文件

sqoop import \
--connect jdbc:mysql://localhost:3306/retry_db \
--username cloudera \
--password secret \
--table departments \
--columns "dept_id, name" \  # 指定需要的字段
--as-avrodatafile        # 指定存成 avro 数据文件

如果我们要 import 一个库里面的所有表可以使用

sqoop import-all-tables \
--connect jdbc:mysql://localhost:3306/retry_db \
--username cloudera \
--password secret \
--warehouse-dir /mydata # HDFS parent for table 这个会将所有这些表都放到 HDFS 这个文件夹下面

Sqoop import Command:

我们将数据从 Hadooop HDFS 导出向 RDBMS

sqoop export \
--connect jdbc:mysql://localhost:3306/retry_db \
--username cloudera \
--password departments \
--export-dir /sqoopdata/departments \    # HDFS source path for the export
--table departments

Sqoop Job:

Sqoop 提供一种能力,可以把我们经常会执行的任务存储成 jobs. 这些 jobs 可以在未来任何一个时间点被我们拿来使用。

sqoop job \
--create job_name \
--import \
--connect jdbc:mysql://localhost:3306/retry_db \
--username cloudera \
--password departments

常用姿势上面就介绍完了,当我们需要将 MySQL 数据同步到 Hive 去的时候如果表还没有创建我们只需要执行:

sudo-u hive sqoop import \
--connect jdbc:mysql://10.66.38.15:3306/user \      # 连接需要被同步的 MySQL
--username xxx \
--password xxx \
--table user \                         # 需要被同步的表
--delete-target-dir \                     # 之前有同步的文件已经存在删除掉- m \                             # 开一个 map 这个值得注意,不是每个 source 表都可以轻松被分为多个 map 的。如果你这里要填写非 的数最好去熟悉一些细节
--hive-import \                        
--hive-tableuser.user \
--create-hive-table \                     # 创建 hive 表
--hive-drop-import-delims                  # Drops \n, \r, and \ from string fields when importing to Hive.

如果是表已经创建好而需要全量同步数据执行:

sudo -u hive sqoop import\
--connect jdbc:mysql://10.66.38.125:16033/user \
--username xxx \
--password xxx \
--table user \
--delete-target-dir \
--hive-overwrite \          # 全量重写数据
- m 1 \
--hive-import \
--hive-table user.user \
--hive-drop-import-delims

同样的 Sqoop 还支持 Hive 的增量同步。但是基于 mapreduce 的全量同步速度也快得超出想象。实测在三机集群上(12核 | 32内存)机器上1分钟基本能完成对 20 个字段左右的表 250w 条记录的抽取。并且对目标数据库机器的压力不算大。是非常理想的大数据同步工具。

Sqoop 的配置参数非常之多,在使用的时候建议先撸一遍文档(文档不长大概撸一次 2 3 个小时左右),找到自己需要注意的地方和更多适合自己的功能在使用的时候就能避免踩坑。比如上面使用的   hive-drop-import-delims 参数的使用就是还没看完文档就使用造成的坑。我们数据库中有字段没有过滤 \n 。有个用户的名字被误操作使用 \n 开头导致 hive 误以为遇到了换行符,该数据不仅错乱而且后面字段全部被置为 NULL。要避免这种问题一方面要对这个使用链上各个组件有所了解,更是应该读一读文档可以最大程度的避免踩坑。

----------------------------------------------------------分割线----------------------------------------------------------

下面将纪录一下我全量阅读 Sqoop 文档觉得需要纪录的一些东西。

7.2. 语法(Syntax)

首先我们上面看到命令 Sqoop Command 这个 Command 其实是指定 Sqoop 使用哪种 Tool 。

$ sqoop help
usage: sqoop COMMAND [ARGS] Available commands:
codegen Generate code to interact with database records
create-hive-table Import a table definition into Hive
eval Evaluate a SQL statement and display the results
export Export an HDFS directory to a database table
help List available commands
import Import a table from a database to HDFS
import-all-tables Import tables from a database to HDFS
import-mainframe Import mainframe datasets to HDFS
list-databases List available databases on a server
list-tables List available tables in a database
version Display version information See 'sqoop help COMMAND' for information on a specific command.

可以看到我上面举例的所有内容都只是简单的使用到了 export 和 import 还有 import-all-tables  工具。 还有非常多的工具没有使用到。

因为 sqoop 是依赖 hadoop 生态的关系,所以也有响应的查找链,因为使用了 CDH 大礼包,所以我只是简单的安装了一下,相关的依赖都已经被配置好了包括 path

lrwxrwxrwx  root root  Nov  : /usr/bin/sqoop -> /etc/alternatives/sqoop

7.2.1 连接数据库服务器(Connecting to a Database Server)

下面我们在使用 import tool 的时候遵循这个原则:

sqoop import (generic-args) (import-args)
sqoop-import (generic-args) (import-args)
While the Hadoop generic arguments must precede any import arguments, you can type the import arguments in any order with respect to one another.

当我们在写语句的时候应该首先使用了 generic-args 参数可以是以下的参数。

Argument Description
--connect <jdbc-uri> Specify JDBC connect string
--connection-manager <class-name> Specify connection manager class to use
--driver <class-name> Manually specify JDBC driver class to use
--hadoop-mapred-home <dir> Override $HADOOP_MAPRED_HOME
--help Print usage instructions
--password-file Set path for a file containing the authentication password
-P Read password from console
--password <password> Set authentication password
--username <username> Set authentication username
--verbose Print more information while working
--connection-param-file <filename> Optional properties file that provides connection parameters
--relaxed-isolation Set connection transaction isolation to read uncommitted for the mappers.

后面的 import args 可选项就非常丰富。

比如可以导入校验使用的 class 删除控制参数啥的。

Argument Description
--validate Enable validation of data copied, supports single table copy only.
--validator <class-name> Specify validator class to use.
--validation-threshold <class-name> Specify validation threshold class to use.
--validation-failurehandler <class-name> Specify validation failure handler class to use.
Argument Description
--append Append data to an existing dataset in HDFS
--as-avrodatafile Imports data to Avro Data Files
--as-sequencefile Imports data to SequenceFiles
--as-textfile Imports data as plain text (default)
--as-parquetfile Imports data to Parquet Files
--boundary-query <statement> Boundary query to use for creating splits
--columns <col,col,col…> Columns to import from table
--delete-target-dir Delete the import target directory if it exists
--direct Use direct connector if exists for the database
--fetch-size <n> Number of entries to read from database at once.
--inline-lob-limit <n> Set the maximum size for an inline LOB
-m,--num-mappers <n> Use n map tasks to import in parallel
-e,--query <statement> Import the results of statement.
--split-by <column-name> Column of the table used to split work units. Cannot be used with --autoreset-to-one-mapper option.
--split-limit <n> Upper Limit for each split size. This only applies to Integer and Date columns. For date or timestamp fields it is calculated in seconds.
--autoreset-to-one-mapper Import should use one mapper if a table has no primary key and no split-by column is provided. Cannot be used with --split-by <col> option.
--table <table-name> Table to read
--target-dir <dir> HDFS destination dir
--temporary-rootdir <dir> HDFS directory for temporary files created during import (overrides default "_sqoop")
--warehouse-dir <dir> HDFS parent for table destination
--where <where clause> WHERE clause to use during import
-z,--compress Enable compression
--compression-codec <c> Use Hadoop codec (default gzip)
--null-string <null-string> The string to be written for a null value for string columns
--null-non-string <null-string> The string to be written for a null value for non-string columns

7.2.3 *的表查询导入(Free-form Query Imports)

包括支持 free-form query .使用 --query 参数然后写一个 sql 来过滤自己想要 import 的数据 just like

$ sqoop import \
--query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' \
--split-by a.id --target-dir /user/foo/joinresults

这个使用方法必须要使用 --target-dir

7.2.4 掌控并行处理(Controlling Parallelism)

如果需要控制并行操作普遍使用的是 -m 参数,--num-mapers参数。我们可以显示的指定使用的用来并行分配的键,使用例如 --split-by employee_id 达到目标。

如果说我们没有使用 --split-by 参数主键也不是 int 型,可能会导致指定 -m 大于 1 的时候出问题。因为程序没有办法知道应该根据哪个键来分配 map 任务。

另外我们可以使用 --autoreset-to-one-mapper 选项 --autoreset-to-one-mapper Import should use one mapper if a table has no primary key and no split-by column is provided. Cannot be used with --split-by <col> option.

7.2.5 掌控分布式缓存(Controlling Distributed Cache)

使用 Oozie 调起 Sqoop job 执行任务的时候要注意一个 Controlling Distributed Cache 的问题。在第一个Sqoop作业期间,Oozie只会在每个工作节点上对Sqoop依赖项进行一次本地化,并会在工作节点上重用jar来执行子节点作业。在Oozie启动Sqoop命令时使用option - skip-dist-cache,可以跳过Sqoop将依赖项复制到作业缓存并保存大量I/O的步骤。达到优化的目的。

7.2.6 掌控导入过程(Controlling the Import Process)

在控制导入的过程中也有很多优化的地方可以做,例如我们在对关系行数据库 MySQL 进行导入的时候,可以通过使用关键字 --direct 加速导入的速度。他的原理是默认情况下我们会使用 JDBC 对数据库进行连接,但是有一些数据库提供了更高性能可以指定数据库进行转移的工具。比如 MySQL 提供的 MySQL 提供的工具 mysqldump 使用 --direct 参数就可以尝试让 Sqoop 使用这种方式去导出数据,可能会得到更高的效能。当我们在使用 --direct option 的时候还可以传递一些潜在的参数给这个命令类似于这样 将命令跟在 -- 后面

$ sqoop import --connect jdbc:mysql://server.foo.com/db --table bar \
--direct -- --default-character-set=latin1

就可以将后面的 --default-character-set=latin1 传递给 mysqldump 。

在 import 表的时候有两个指定路径的参数是冲突的  --warehouse-dir 和 --target-dir 都用于指定将生成的表放到指定的这个目录下面。他们俩是冲突的,指定其中一个 option 即可。

在默认情况下 import 这个工具都会将表导到一个新的路径下面。如果路径下面已经有相同名字的文件存在了,将会被拒绝导入。

如果使用 --append 参数 Sqoop将会将文件导入到临时的文件目录,然后重命名该文件成不与目标文件夹里面名字冲突的名字。

7.2.7 掌控事务隔离级别(Controlling transaction isolation)

Sqoop 提供读取数据库 read-uncommitted 事务的能力,只需要带上参数 --relaxed-isolation 即可。这个操作真是非常骚啊,一般应该不会用到而且也不是所有数据库都支持,比如官方文档说 ORACLE 就是不支持的。

7.2.8 掌控 mapping 时候的字段类型(Controlling type mapping

可以对指定同步的表进行 schema 的映射转换,并且可以指定通过 java 或者 hive 类型的转换。例如:

Argument    Description
--map-column-java <mapping> Override mapping from SQL to Java type for configured columns.
--map-column-hive <mapping> Override mapping from SQL to Hive type for configured columns. Sqoop is expecting comma separated list of mapping in form <name of column>=<new type>. For example: $ sqoop import ... --map-column-java id=String,value=Integer

另外需要注意的是  --map-column-hive 使用该参数需要使用 urlencode 对转换 key value 进行转换。例如

use DECIMAL(%2C%) instead of DECIMAL(, )

如果转换不正确,Sqoop 会 raise exception

7.2.10 增量更新(Incremental Imports)

关于使用 Sqoop 进行增量更新处理, Sqoop 提供了三个字段来处理增量更新相关的内容

Argument Description
--check-column (col) Specifies the column to be examined when determining which rows to import. (the column should not be of type CHAR/NCHAR/VARCHAR/VARNCHAR/ LONGVARCHAR/LONGNVARCHAR)
--incremental (mode) Specifies how Sqoop determines which rows are new. Legal values for mode include append and lastmodified.
--last-value (value) Specifies the maximum value of the check column from the previous import.

Sqoop 本身支持两种不同的方式进行增量更新,分别是 append 和 lastmodified 我们使用 --incremental 参数去指定要使用的增量更新类型。

增量更新的文章有很多基本上建立在两个基础上。(之前的数据如果被 update 没有办法通过这两种增量更新机制被更新)

1. 可以提供类似于自增 id 这样的字段,并且小于这个点的字段可以从上次这个点位继续往后增加。使用 --last-value 需要注意的是可以使用 Sqoop job 在第一次指定了开始的 last-value 值之后 Sqoop 会保存下来这次执行完之后 last-value 值的节点,下次执行的时候会基于这个继续执行。

2. 可以提供一个最后修改的字段,例如 update_time 这样的字段,所有大于这个 update_time 时间的字段将在下个节点被增量追加到后面。--check-column update_time

7.2.11 文档格式化(File Formats)

我们通常导入两种格式的文件形式,一种是 textfile 也是默认类型。还有一种是 SequenceFiles

我们可以通过指定 --as-textfile 参数显示指定使用 textfile 导入。textfile 又称 delimited text 在非二进制数据情况下非常通用,而且很容易支持类似于像 Hive 这种数据库表的生成。

SequenceFiels 是一种二进制格式用于往自定义的记录指定的 data types 中存储独立的记录。这些 data types 表现为 java 的类。

另外我们也可以使用表协议 比如我们可以使用 Apache Avro。

默认情况下 Sqoop 不会帮我们压缩文件使用 -z 或者 --compress 参数或者使用其他压缩参数比如 --compression-codec 对 SequenceFile text 或者 Avro 文件进行压缩。

7.2.12. 大的对象的处理(Large Objects)

Sqoop 对 blob 和 clob  columns 都有特别的处理方式。他们尽量不要像常规字段这样全部 load 进内存进行操作。而是使用流式的方法来进行处理,并且和其他数据进行内联存储。(这一块我完全没有看懂是什么意思,水平不够可以自行前往官方文档查看。。。。。。)

Table 6. Output line formatting arguments:

Argument Description
--enclosed-by <char> Sets a required field enclosing character
--escaped-by <char> Sets the escape character
--fields-terminated-by <char> Sets the field separator character
--lines-terminated-by <char> Sets the end-of-line character
--mysql-delimiters Uses MySQL’s default delimiter set: fields: , lines: \n escaped-by: \ optionally-enclosed-by: '
--optionally-enclosed-by <char> Sets a field enclosing character

默认情况下 Sqoop 会使用逗号 comma(,) 来作为字段之间的分隔符,使用换行符 \n 来区别每一条记录。

Sqoop 官方文档推荐我们使用 unambiguous 也就是显示清晰的去指定字段分隔符和行分隔符。比如直接使用 --mysql-delimiters

下面的叙述我想了很久想翻译成中文我都觉得不是很直接 所以还是直接贴文档吧。

If unambiguous delimiters cannot be presented, then use enclosing and escaping characters. The combination of (optional) enclosing and escaping characters will allow unambiguous parsing of lines. For example, suppose one column of a dataset contained the following values:

Some string, with a comma.
Another "string with quotes"

The following arguments would provide delimiters which can be unambiguously parsed:

$ sqoop import --fields-terminated-by , --escaped-by \\ --enclosed-by '\"' ...

(Note that to prevent the shell from mangling the enclosing character, we have enclosed that argument itself in single-quotes.)

The result of the above arguments applied to the above dataset would be:

"Some string, with a comma.","1","2","3"...
"Another \"string with quotes\"","4","5","6"...

Here the imported strings are shown in the context of additional columns ("1","2","3", etc.) to demonstrate the full effect of enclosing and escaping. The enclosing character is only strictly necessary when delimiter characters appear in the imported text. The enclosing character can therefore be specified as optional:

$ sqoop import --optionally-enclosed-by '\"' (the rest as above)...

Which would result in the following import:

"Some string, with a comma.",1,2,3...
"Another \"string with quotes\"",4,5,6...
Sqoop 使用详解(内含对官方文档的解析) Note

Even though Hive supports escaping characters, it does not handle escaping of new-line character. Also, it does not support the notion of enclosing characters that may include field delimiters in the enclosed string. It is therefore recommended that you choose unambiguous field and record-terminating delimiters without the help of escaping and enclosing characters when working with Hive; this is due to limitations of Hive’s input parsing abilities.

The --mysql-delimiters argument is a shorthand argument which uses the default delimiters for the mysqldump program. If you use the mysqldump delimiters in conjunction with a direct-mode import (with --direct), very fast imports can be achieved.

While the choice of delimiters is most important for a text-mode import, it is still relevant if you import to SequenceFiles with --as-sequencefile. The generated class' toString() method will use the delimiters you specify, so subsequent formatting of the output data will rely on the delimiters you choose.

Table 8. Hive arguments:

Argument Description
--hive-home <dir> Override $HIVE_HOME
--hive-import Import tables into Hive (Uses Hive’s default delimiters if none are set.)
--hive-overwrite Overwrite existing data in the Hive table.
--create-hive-table If set, then the job will fail if the target hive
  table exists. By default this property is false.
--hive-table <table-name> Sets the table name to use when importing to Hive.
--hive-drop-import-delims Drops \n\r, and \01 from string fields when importing to Hive.
--hive-delims-replacement Replace \n\r, and \01 from string fields with user defined string when importing to Hive.
--hive-partition-key Name of a hive field to partition are sharded on
--hive-partition-value <v> String-value that serves as partition key for this imported into hive in this job.
--map-column-hive <map> Override default mapping from SQL type to Hive type for configured columns. If specify commas in this argument, use URL encoded keys and values, for example, use DECIMAL(1%2C%201) instead of DECIMAL(1, 1).

我们想要使用 Sqoop 抽取 RDBMS 的数据到 Hive 可能是再常见不过的情形了,所以这一部分很重要也可能是我们最常使用的部分。

7.2.13 导入数据到 Hive (Importing Data Into Hive)

Sqoop 抽取 RDBMS 的数据到 Hive 会先将数据抽取出来在 HDFS 上的指定路径上放一下。如果指定路径上已经有文件,但是 Hive 里面却没有你的表你还需要指定 --delete-target-dir 来删除 HDFS 的文件,然后重新上传一份。当上传到 HDFS 结束之后,会使用 Hive 的命令 LOAD DATA INPATH 将文件移动到 Hive 的 warehouse 目录下面如果指定了 Hive 表的创建表参数会直接创建 Hive 表并且映射上数据。

如果表已经存在了 可以使用 --hive-overwrite 将数据直接覆盖。虽然Hive支持转义字符,但它不处理换行字符。此外,它不支持在封闭字符串中包含字段分隔符的封闭字符的概念。因此,在使用Hive时,建议您选择明确的字段和记录终止分隔符,而无需转义和包围字符;这是由于Hive的输入解析能力的限制。如果您在将数据导入到Hive时使用了--escapby,--enclosed-by, or -optionally-enclosed-by, Sqoop将打印一条警告消息。

Hive 默认会使用 \n 分割行,使用\01 分割字段。如果说我们的数据里面有这些字段就可能会有冲突,我们需要使用 --hive-drop-import-delims 把这些都替换掉。上面表可以参照这个 option 的意义。另外也可以使用 --hive-delims-replacement 将冲突的字段给替换掉。

另外还有一个值得注意的地方 Hive 表默认将从 RDBMS 里面抽取出来的 NULL value 数据转换成 null string 。这个在使用的时候就会出现问题,因为之前是一个空,现在却变成了一个 null 字符串。所以我们需要处理一下, Hive在自己的体系里面使用 \N 来表示 NULL 我们使用 --null-string 和 --null-non-string 参数处理 import job 使用 --input-null-string 和 --input-null-non-string  处理 export job 。举个