在用PySpark操作HBase时默认是scan
操作,通常情况下我们希望加上rowkey
指定范围,即只获取一部分数据参加运算。翻遍了spark的python相关文档,搜遍了google
和*
也没有具体的解决方案。既然java和scala都支持,python肯定也支持的。
翻了一下hbase源码
setConf
方法里原来是根据特定的字符串对scan
进行配置,那么在Python里对conf就可以进行相应的设置,这些设置主要包括:
hbase.mapreduce.scan.row.start
hbase.mapreduce.scan.row.stop
hbase.mapreduce.scan.column.family
hbase.mapreduce.scan.columns
hbase.mapreduce.scan.timestamp
hbase.mapreduce.scan.timerange.start
hbase.mapreduce.scan.timerange.end
hbase.mapreduce.scan.maxversions
hbase.mapreduce.scan.cacheblocks
hbase.mapreduce.scan.cachedrows
hbase.mapreduce.scan.batchsize
首先创建测试表
hbase> create 'test', 'f1'
hbase> put 'test', 'row1', 'f1', 'value1'
hbase> put 'test', 'row2', 'f1', 'value2'
hbase> put 'test', 'row3', 'f1', 'value3'
hbase> put 'test', 'row4', 'f1', 'value4'
然后,设置scan范围的示例代码如下
sc = SparkContext(appName=settings.APP_NAME)
conf = {
"hbase.zookeeper.quorum": settings.HBASE_HOST,
"hbase.mapreduce.inputtable": "test",
"hbase.mapreduce.scan.row.start": "row2"
}
rdd = sc.newAPIHadoopRDD(
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter="org.valux.converters.ImmutableBytesWritableToStringConverter",
valueConverter="org.valux.converters.HBaseResultToStringConverter",
conf=conf)
result = rdd.collect()
for (k, v) in result
print k, v
org.valux.converters.ImmutableBytesWritableToStringConverter
org.valux.converters.HBaseResultToStringConverter
是我自己实现的两个转换类,也可以用spark默认自带的converter,具体可以参考hbase_inputformat.py
,不过提交时请带上相应的jar包