Sqoop通过--split-by指定切分的字段,--m设置mapper的数量。通过这两个参数分解生成m个where子句,进行分段查询。因此sqoop的split可以理解为where子句的切分。
第一步,获取切分字段的MIN()和MAX()
为了根据mapper的个数切分table,sqoop首先会执行一个sql,用于获取table中该字段的最小值和最大值,源码片段为org.apache.sqoop.mapreduce.DataDrivenImportJob 224行
,大体为:
private String buildBoundaryQuery(String col, String query) {
....
return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
+ "FROM (" + query + ") AS " + alias;
}
获取到最大值和最小值,就可以根据不同的字段类型进行切分。
第二步,根据MIN和MAX不同的类型采用不同的切分方式
支持有Date,Text,Float,Integer,Boolean,NText,BigDecimal等等。
数字都是一个套路,就是
步长=(最大值-最小值)/mapper个数
,生成的区间为
[最小值,最小值+步长)
[最小值+2*步长,最小值+3*步长)
...
[最大值-步长,最大值]
可以参考下面的代码片段org.apache.sqoop.mapreduce.db.FloatSplitter 43行
:
List<InputSplit> splits = new ArrayList<InputSplit>();
...
int numSplits = ConfigurationHelper.getConfNumMaps(conf);
double splitSize = (maxVal - minVal) / (double) numSplits;
...
double curLower = minVal;
double curUpper = curLower + splitSize;
while (curUpper < maxVal) {
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
lowClausePrefix + Double.toString(curLower),
highClausePrefix + Double.toString(curUpper)));
curLower = curUpper;
curUpper += splitSize;
}
这样最后每个mapper会执行自己的sql语句,比如第一个mapper执行:
select * from t where splitcol >= min and splitcol < min+splitsize
第二个mapper又会执行
select * from t where splitcol >= min+splitsize and splitcol < min+2*splitsize
其他字段类型
对于日期,会转变成时间戳,同样采用数字这种套路。
复杂的是字符串这种类型,最简单的方式就是m小于26的时候,比如2,那么按照开头字母就可以切分,[A,M),[M,Z].但是对于hello,helaa这种就只能到第四个字母才能切分了。因此字符串采用的算法是下面这种:
The algorithm used is as follows:
Since there are 2**16 unicode characters, we interpret characters as digits in base 65536. Given a string 's' containing characters s_0, s_1.. s_n, we interpret the string as the number: 0.s_0 s_1 s_2.. s_n in base 65536. Having mapped the low and high strings into floating-point values, we then use the BigDecimalSplitter to establish the even split points, then map the resulting floating point values back into strings.
实在看不懂英文!等再细致研究下在分享。