HBase1.2官方文档——HBase and MapReduce

时间:2021-11-14 05:41:56

HBase 和 MapReduce

Apache MapReduce是一个用来分析海量数据的软件框架,也是Apache Hadoop最常用的框架。MapReduce本身超出了这个文档的范围。MapReduce2(MR2)现在是YARN的一部分。

http://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html开始学习MapReduce的知识。

本章讨论了在HBase中使用MapReduce的具体配置步骤。此外,它还讨论了HBase和MapReduce作业之间的其他交互和问题。最终讨论Cascading,一个MapReduce的alternative API

HBase1.2官方文档——HBase and MapReduce
mapred    mapreduce

在HBase中有两个mapreduce包,就像mapreduce本身一样:org.apache.hadoop.hbase.mapred 和 org.apache.hadoop.hbase.mapreduce。前者对应旧的API,后者对应新的API。后者有更多的功能,不过您通常可以在较旧的包中找到相应的功能。选择与MapReduce部署相匹配的包。当有疑问或开始时,选择org.apache.hadoop.hbase.mapreduce。下面的内容中,我们引用o.a.h.h.mapreduce,但如果你用的是mapred,你可以换成o.a.h.h.mapred。

 

 

 

 

 

46. HBase, MapReduce, 和 CLASSPATH

默认情况下,部署到MapReduce集群的MapReduce作业不能访问$HBASE_CONF_DIR下的HBase配置文件或HBase类。

为MapReduce作业提供所需的访问权限,你可以添加 hbase-site.xml 到 $HADOOP_HOME/conf ,添加HBase jars 到$HADOOP_HOME/lib路径。然后你需要在集群中复制这些更改,或者你可以编辑$HADOOP_HOME/conf/hadoop-env.sh ,把hbase-site.xml和HBase jars加到HADOOP_CLASSPATH变量中。然而,不推荐使用这种方法,因为它会使用HBase引用污染您的Hadoop安装。它还要求您在Hadoop能够使用HBase数据之前重新启动Hadoop集群。

推荐的方法是让HBase添加它本身的依赖性jar,并使用HADOOP_CLASSPATH或-libjars。

自从HBase 0.90开始,HBase将其依赖jar添加到作业配置本身。这些依赖包只需要在本地CLASSPATH可用。下面的例子运行了HBase自带的运行在usertable表上的MapReduce作业RowCounter。如果你没有在命令中设置预期的环境变量(由$符号和花括号括起来的部分),您可以使用实际的系统路径。一定要使用正确版本的HBase JAR,以供您的系统使用。反引号(符号`)导致shell执行子命令,将hbase classpath的输出(用于转储HBase CLASSPATH的命令)设置为HADOOP_CLASSPATH。本例假设您使用了一个与bash兼容的shell。

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath`
$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-VERSION.jar rowcounter usertable

当这个命令在内部运行时,HBase JAR会找到它需要为ZooKeeper提供的依赖、Guava和其他依赖于已传递的HADOOP_CLASSPATH的依赖项,并将JAR添加到MapReduce作业配置中。查看TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)中的源代码,看看是怎么做的。

命令hbase mapredcp可以帮你转储MapReduce需要的CLASSPATH条目,这些条目和TableMapReduceUtil#addDependencyJars添加的jars相同。你可以把它们和HBase conf路径一起添加到HADOOP_CLASSPATH中。对于不用把它们的依赖打包或调用TableMapReduceUtil#addDependencyJars的作业,需要用以下的命令结构:

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(${HBASE_HOME}/bin/hbase mapredcp | tr ':' ',') ...
HBase1.2官方文档——HBase and MapReduce

如果您从构建目录中运行HBase而不是安装位置,那么这个示例可能无法工作。你可能会看到如下的错误:

java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper

如果发生这种情况,请尝试如下修改命令,以使命令使用来自构建环境中target/ 路径下的HBase JARs。

$ HADOOP_CLASSPATH=${HBASE_BUILD_HOME}/hbase-server/target/hbase-server-VERSION-SNAPSHOT.jar:`${HBASE_BUILD_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_BUILD_HOME}/hbase-server/target/hbase-server-VERSION-SNAPSHOT.jar rowcounter usertable

 

HBase1.2官方文档——HBase and MapReduce
注意HBase 0.96.1和0.98.4之间的MapReduce用户

一些使用HBase的MapReduce作业启动失败。症状是类似于以下的一个异常:

Exception in thread "main" java.lang.IllegalAccessError: class
    com.google.protobuf.ZeroCopyLiteralByteString cannot access its superclass
    com.google.protobuf.LiteralByteString
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at
    org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:818)
    at
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:433)
    at
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:186)
    at
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:147)
    at
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:270)
    at
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:100)
...

这是由HBASE-9867引入的优化引起的,它无意中引入了类加载器的依赖。

使用了-libjars选项和"胖jar"的作业都会受到影响,胖jar就是把依赖打包到内嵌的lib文件夹下的程序jar包。

为了满足新的类加载器需求,hbase-protocol.jar必须包含在Hadoop的classpath中。参照HBase, MapReduce, and the CLASSPATH以获取当前classpath错误的解决方案。以下提及的内容是有历史原因的。

可以通过在Hadoop lib路径下包含对hbase-protocol.jar的引用来解决整个系统的问题,可以通过一个符号链接或将jar复制到新的位置。

这也可以通过在提及每个作业时把它包含在HADOOP CLASSPATH环境变量中来实现。当启动已打包它们的依赖项的作业包时,下面的三个工作启动命令都满足这个需求:

 
$ HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
$ HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
$ HADOOP_CLASSPATH=$(hbase classpath) hadoop jar MyJob.jar MyJobMainClass

对于那些不打包它们的依赖项的jar,下面的命令结构是必需的:

$ HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(hbase mapredcp | tr ':' ',') ...

参见 HBASE-10304 获取关于这个问题更多的讨论。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  


47. MapReduce Scan 缓存

TableMapReduceUtil现在恢复了在扫描对象上通过输入设置扫描缓存(在将结果返回给客户端之前缓存的行数)的选项。

这个功能在HBase 0.95因为一个bug(HBASE-11558)没有被实现,这个bug在HBase 0.98.5 和 0.96.3被修正。选择扫描器缓存的优先顺序如下:

  1. 在扫描对象上设置缓存。

  2. 通过给配置选项hbase.client.scanner.cachinghbase.client.scanner.caching指定值设置,这个值可以手动在 hbase-site.xml 或方法 TableMapReduceUtil.setScannerCaching()设置。

  3. HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING的默认值是100.

优化缓存设置是在客户端等待结果的时间和客户端需要接收的结果集之间的一种平衡。如果缓存设置太大,客户端可能会等待很长时间,或者请求甚至超时。如果设置太小,则扫描结果需要分片返回。

如果你把扫描看作是铲子,一个更大的缓存设置类似于一个更大的铲,一个较小的缓存设置相当于铲取更多的次数来装满这个桶。

上面提到的优先级列表允许您设置一个合理的默认值,并对特定的操作复写这个值。

查看Scan的API文档获取更多细节。

48. HBase 附带的MapReduce Jobs

HBase JAR也可以作为一些附带的MapReduce作业的Driver。要学习附带的MapReduce作业,运行以下命令。

$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar
An example program must be given as the first argument.
Valid program names are:
  copytable: Export a table from local cluster to peer cluster
  completebulkload: Complete a bulk data load.
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table

每一个合法的程序名都是附带的MapReduce作业。要运行这些作业中的一个,按以下示例建模您的命令

$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar rowcounter myTable

49. 作为MapReduce作业数据源和数据接收点的HBase

对于MapReduce作业来说,HBase可以作为数据源——TableInputFormat,和数据接收点——TableOutputFormat 或者 MultiTableOutputFormat

编写MapReduce作业读取或写入HBase,推荐使用TableMapper 或 TableReducer的子类。参阅IdentityTableMapper 和 IdentityTableReducer了解基本使用。

要了解更多的例子,参见 RowCounter或者 org.apache.hadoop.hbase.mapreduce.TestTableMapReduce 单元测试。

如果你运行使用HBase作为源或接收点的MapReduce作业,则需要在配置中指定源和接收点的表名和列名。

当你从HBase中读取数据,TableInputFormat需要HBase中的Region列表,并生成一个map,这个map可以是map-per-regionmapreduce.job.maps map,取其中较小的。

如果你的作业只有两个map,提高mapreduce.job.maps的值,这个数值要比Region的个数多。

Maps会运行在相邻的TaskTracker/NodeManager,如果你在每个节点运行了一个TaskTracer/NodeManager 和 RegionServer。

当写数据到HBase时,避免Reduce步骤并从map中把数据写入HBase可能是正确的。当你的作业不需要对Map发送出的数据做排序时,这个办法是可行的。

在Insert操作中,HBase会'排序',所以没必要进行两次排序(在MapReduce集群中混洗数据)除非你需要这样做。

如果您不需要Reduce操作,那么你的Map可能会发出在作业结束时处理的记录的计数作为报告,或者将减少的数量设置为0,并使用TableOutputFormat。

如果在你的情景下运行Reduce步骤是有意义的,那么你应该使用多个reducer,这样负载就会分布在HBase集群中。

一个新的HBase partitioner——HRegionPartitioner,可以运行和已存在的Region数目一样多的reducer。当你的表很大,你的上传不会在完成的时候很大程度上改变已有Region的数量,HRegionPartitioner是合适的。否则就使用默认的partitioner。

50. 在Bulk Import时直接写HFiles

如果您正在导入一个新表,您可以绕过HBase API,直接将内容写到文件系统中,格式化为HBase数据文件(HFiles)。你的导入会运行得更快,也许会快一个数量级。参见Bulk Loading以了解更多这个机制如何工作。

51. RowCounter 例子

自带的RowCounter MapReduce作业使用TableInputFormat,计算指定表中所有行的行数。要运行它,使用以下的命令:

$ ./bin/hadoop jar hbase-X.X.X.jar

它将调用HBase MapReduce Driver类。从提供的作业选项中,选择rowcounter。这将把rowcounter的使用建议打印到标准输出。指定表名,要计数的列,和输出路径。

如果出现classpath错误,参见HBase, MapReduce, and the CLASSPATH

52. Map-Task Splitting

52.1. 默认的HBase MapReduce Splitter

TableInputFormat在一个MapReduce作业中用来作为一个HBase表的数据源时,它的splitter将创建为表中的每一个Region一个map任务。

因此,如果表中有100个Region,作业将会有100个map任务,不论在Scan操作中选择了有多少列族。

52.2. 自定义 Splitters

要自定义splitter,参见TableInputFormatBase中的getSplits方法。这就是分配map-task逻辑的所在。

53. HBase MapReduce实例

53.1. HBase MapReduce 读实例

下面是一个以只读方式使用HBase作为MapReduce源的示例。特别指出,有一个Mapper实例,但没有Reducer,而且没有数据从Mapper中发送出来。定义这个作业如下:

Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);     // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs
...

TableMapReduceUtil.initTableMapperJob(
  tableName,        // input HBase table name
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper
  null,             // mapper output key
  null,             // mapper output value
  job);
job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

…​这个Mapper 实例将扩展 TableMapper…​

public static class MyMapper extends TableMapper<Text, Text> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
    // process data for the row from the Result instance.
   }
}

53.2. HBase MapReduce Read/Write Example

这个例子是使用HBase作为MapReduce的数据源和数据接收点。这个例子简单地把一个表的数据拷贝到另一个表中。

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,      // input table
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,      // output table
  null,             // reducer class
  job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}

需要解释的是TableMapReduceUtil是做什么的,特别是在Reducer中。TableOutputFormat被用作OutputFormat类,几个参数要在配置中设置(如TableOutputFormat.OUTPUT_TABLE),也要设置Reducer输出的key(ImmutableBytesWritable)可以和Reducer输出的Value(Writable)。这些可以在作业和配置中由程序员设置,而TableMapReduceUtil尝试使这些更简单。

下面是一个Mapper的例子,生成了一个Put以及匹配输入的Result并发送它。注意:这是CopyTable要做的事情:

public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put>  {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // this example is just copying the data from the source table...
      context.write(row, resultToPut(row,value));
    }

    private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
      Put put = new Put(key.get());
      for (KeyValue kv : result.raw()) {
        put.add(kv);
      }
      return put;
    }
}

实际上,没有一个Reducer的步骤,所以TableOutputFormat将Put发送到了目的表。

这只是一个例子,开发者可以选择不使用TableOutputFormat,自己去连接目标表

53.3. HBase MapReduce 多表读/写 的例子

TODO: example for MultiTableOutputFormat.

53.4. HBase MapReduce 汇总到HBase表的实例

以下的例子使用HBase作为MapReduce的数据源和数据接收点以总结数据。这个例子计算的是一个表中的每一个值的个数,并且把个数值输出到另一个表里。

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,        // output table
  MyTableReducer.class,    // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

在这个例子中,Mapper一个String值的列作为要总结的列值。这个值用来作为从这个Mapper发送出的key,一个IntWritable表示计数器。

public static class MyMapper extends TableMapper<Text, IntWritable>  {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] ATTR1 = "attr1".getBytes();

  private final IntWritable ONE = new IntWritable(1);
  private Text text = new Text();

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    String val = new String(value.getValue(CF, ATTR1));
    text.set(val);     // we can only emit Writables...
    context.write(text, ONE);
  }
}

在Reducer中,"ones"被计数(有些像其他MR例子中做得一样),然后发送出一个Put

public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>  {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] COUNT = "count".getBytes();

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(CF, COUNT, Bytes.toBytes(i));

    context.write(null, put);
  }
}

53.5. HBase MapReduce 汇总到文件中的实例

这个和上面的例子非常相似,不同的是使用HBase作为MapReduce的数据源,而使用HDFS作为数据接收点。不同点在于作业的建立和Reducer中,Mapper相同。

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
job.setReducerClass(MyReducer.class);    // reducer class
job.setNumReduceTasks(1);    // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

如上所述,不用改变前一个Mapper就可以在这个例子中运行。而对于Reducer,它是一个"一般的"Reducer,而不是TableMapper的子类,也不用发送出Puts。

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    context.write(key, new IntWritable(i));
  }
}

53.6. HBase MapReduce不使用Reducer汇总到HBase

如果你使用HBase作为一个Reducer做总结的话,也可以不用自己实现Reducer。

为作业总结,需要先建立一个HBase目标表。Table的方法incrementColumnValue被用来原子地累加值。从性能角度来看,保存一个包含为每个Map-Task的value累加的Map,并在mapper的cleanup方法中对每个键进行更新,这可能是有意义的。然而,根据处理的行数和惟一的键,你的处理时间可能会有所不同。

最后,总结结果会被输出到HBase中。

53.7. HBase MapReduce 汇总到 RDBMS

有时将总结结果输出到RDBMS会更合适。在这些场景中,可以通过自定义Reducer将产生的总结结果直接输出到RDBMS中。

setup方法可以连接到一个RDBMS(连接信息可以作为context中的自定义参数传进来),cleanup方法可以关闭连接。

理解作业的Reducer数量会影响总结的实现是很重要的,你将不得不把对这个问题的理解设计到你的Reducer中。特别是作业要被设计成单个Reducer运行还是多个Reducer运行。

这一点没有孰对孰错,它取决于用例。要知道作业要执行更多的Reducer,更多的RDBMS连接就要同时建立,这将会扩大规模,但只会达到一个点。

public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {

  private Connection c = null;

  public void setup(Context context) {
    // create DB connection...
  }

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    // do summarization
    // in this example the keys are Text, but this is just an example
  }

  public void cleanup(Context context) {
    // close db connection
  }

}

最后,汇总的结果会被写入到RDBMS表中。

54. 在一个MapReduce作业中访问其他的HBase Tables

虽然这个框架允许一个表作为MapReduce作业的输入,但其他HBase表也可以作为参照表被访问,在一个MapReduce作业的Mapper的setup方法中创建一个Table实例。

public class MyMapper extends TableMapper<Text, LongWritable> {
  private Table myOtherTable;

  public void setup(Context context) {
    // In here create a Connection to the cluster and save it or use the Connection
    // from the existing table
    myOtherTable = connection.getTable("myOtherTable");
  }

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // process Result...
    // use 'myOtherTable' for lookups
  }

55. 推测执行

通常建议关掉针对HBase的MapReduce job的预测执行(speculative execution)功能。这个功能也可以用每个Job的配置来完成,也可以对整个集群配置完成。

特别对于长时间运行的job,使用预测执行将创建map-task的拷贝,并将你的数据写入到HBase两次。这可不是你所希望的。

参考spec.ex以获得更多信息。

56. 级联 Cascading

Cascading 对MapReduce操作来说,是一个备用API,它实际上是使用MapReduce,但是允许你以一种简单的方式编写MapReduce代码。

下面的例子展示了一个Cascading Flow将数据输出到一个HBase集群中。相同的hBaseTap API可以用在HBase数据源上。

// read data from the default filesystem
// emits two fields: "offset" and "line"
Tap source = new Hfs( new TextLine(), inputFileLhs );

// store data in an HBase cluster
// accepts fields "num", "lower", and "upper"
// will automatically scope incoming fields to their proper familyname, "left" or "right"
Fields keyFields = new Fields( "num" );
String[] familyNames = {"left", "right"};
Fields[] valueFields = new Fields[] {new Fields( "lower" ), new Fields( "upper" ) };
Tap hBaseTap = new HBaseTap( "multitable", new HBaseScheme( keyFields, familyNames, valueFields ), SinkMode.REPLACE );

// a simple pipe assembly to parse the input into fields
// a real app would likely chain multiple Pipes together for more complex processing
Pipe parsePipe = new Each( "insert", new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ), " " ) );

// "plan" a cluster executable Flow
// this connects the source Tap and hBaseTap (the sink Tap) to the parsePipe
Flow parseFlow = new FlowConnector( properties ).connect( source, hBaseTap, parsePipe );

// start the flow, and block until complete
parseFlow.complete();

// open an iterator on the HBase table we stuffed data into
TupleEntryIterator iterator = parseFlow.openSink();

while(iterator.hasNext())
  {
  // print out each tuple from HBase
  System.out.println( "iterator.next() = " + iterator.next() );
  }

iterator.close();