Bulk updates to BigTable based on a dynamically composed filter

Time: 2021-07-19 15:35:22

I'm looking for a way to scan a huge Google BigTable with a filter that is dynamically composed based on events, and to perform bulk updates/deletes on a very large number of rows.

At the moment, I'm trying to combine BigTable with Java-based Dataflow (for intensive serverless compute power). I have reached the point where I can compose a "Scan" object with a dynamic filter based on the events, but I still can't find a way to stream the results from CloudBigtableIO.read() into the subsequent Dataflow pipeline.

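(A rough illustration of what such a dynamically composed "Scan" can look like with the HBase 1.x filter API that the Bigtable HBase client exposes; the column family, qualifier, and event-derived parameters below are placeholders, not the actual schema.)

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class DynamicScanBuilder {
  // Builds a Scan whose filters depend on the incoming event.
  // "cf", "status" and both parameters are illustrative placeholders.
  public static Scan buildScan(String statusFromEvent, String rowPrefixFromEvent) {
    FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    if (statusFromEvent != null) {
      filters.addFilter(new SingleColumnValueFilter(
          Bytes.toBytes("cf"), Bytes.toBytes("status"),
          CompareOp.EQUAL, Bytes.toBytes(statusFromEvent)));
    }
    Scan scan = new Scan();
    if (rowPrefixFromEvent != null) {
      scan.setRowPrefixFilter(Bytes.toBytes(rowPrefixFromEvent)); // narrow the key range first
    }
    scan.setFilter(filters);
    return scan;
  }
}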

Appreciate any advice.

1 solution

#1

Extend your DoFn from AbstractCloudBigtableTableDoFn. That will give you access to a getConnection() method. You'll do something like this:

try (Connection c = getConnection();
    Table t = c.getTable(YOUR_TABLE_NAME);
    ResultScanner resultScanner = t.getScanner(YOUR_SCAN)) {
  for (Result r : resultScanner) {
    Mutation m = ... // construct a Put or Delete for the matching row
    context.output(m);
  }
}
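
For reference, a fuller sketch of how such a DoFn could be declared, assuming the bigtable-hbase-beam client; the class name, table name, element types, the cf:status column, and the choice to emit a Delete for each matching row are illustrative assumptions, not part of the answer:

import com.google.cloud.bigtable.beam.AbstractCloudBigtableTableDoFn;
import com.google.cloud.bigtable.beam.CloudBigtableConfiguration;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

// Each input Result is treated as a triggering "event" row; the DoFn composes
// a Scan from it, scans the target table, and emits a Delete per matching row.
public class ScanAndMutateFn extends AbstractCloudBigtableTableDoFn<Result, Mutation> {

  private static final TableName TARGET_TABLE = TableName.valueOf("my-table"); // placeholder

  public ScanAndMutateFn(CloudBigtableConfiguration config) {
    super(config);
  }

  @ProcessElement
  public void processElement(ProcessContext context) throws Exception {
    // Assumption: the value driving the filter lives in cf:status of the event row.
    byte[] wanted = context.element().getValue(Bytes.toBytes("cf"), Bytes.toBytes("status"));
    if (wanted == null) {
      return; // nothing to do for this event
    }
    Scan scan = new Scan();
    scan.setFilter(new SingleColumnValueFilter(
        Bytes.toBytes("cf"), Bytes.toBytes("status"), CompareOp.EQUAL, wanted));

    try (Connection c = getConnection();
        Table t = c.getTable(TARGET_TABLE);
        ResultScanner resultScanner = t.getScanner(scan)) {
      for (Result r : resultScanner) {
        Mutation m = new Delete(r.getRow()); // or a Put, depending on the event
        context.output(m);
      }
    }
  }
}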

I'm assuming that your pipeline starts with CloudBigtableIO.read(), has the AbstractCloudBigtableTableDoFn next, and then has a CloudBigtableIO.write().
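
Wired together, that layout might look roughly like the sketch below, reusing the ScanAndMutateFn sketched above; the project/instance/table IDs are placeholders, and wrapping CloudBigtableIO.read() in Read.from() follows the bigtable-hbase-beam client's documented usage rather than anything stated in the answer:

import com.google.cloud.bigtable.beam.CloudBigtableConfiguration;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

public class BulkUpdatePipeline {
  public static void main(String[] args) {
    // Placeholder IDs; the scan configuration could also carry the dynamically composed Scan.
    CloudBigtableScanConfiguration readConfig =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId("my-project")
            .withInstanceId("my-instance")
            .withTableId("events-table")
            .build();

    CloudBigtableTableConfiguration writeConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId("my-project")
            .withInstanceId("my-instance")
            .withTableId("my-table")
            .build();

    CloudBigtableConfiguration connConfig =
        new CloudBigtableConfiguration.Builder()
            .withProjectId("my-project")
            .withInstanceId("my-instance")
            .build();

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("ReadTriggerRows", Read.from(CloudBigtableIO.read(readConfig)))  // PCollection<Result>
     .apply("ScanAndMutate", ParDo.of(new ScanAndMutateFn(connConfig)))      // PCollection<Mutation>
     .apply("WriteMutations", CloudBigtableIO.writeToTable(writeConfig));

    p.run().waitUntilFinish();
  }
}

If the events actually arrive from outside Bigtable (for example Pub/Sub), the initial Read.from(...) step would be replaced by that source and the DoFn's input type adjusted accordingly.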
