I'm looking for a way to scan a huge Google Bigtable table with a filter that is composed dynamically based on events, and then make bulk updates/deletes on a huge number of rows.
At the moment, I'm trying to combine Bigtable with Java-based Dataflow (for intensive serverless compute power). I've reached the point where I can compose a "Scan" object with a dynamic filter based on the events, but I still can't find a way to stream the results from CloudBigtableIO.read() to the subsequent Dataflow pipeline.
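For illustration, composing a Scan from event fields could look like the minimal sketch below. It uses only the standard HBase client API (Scan, FilterList, PrefixFilter, ColumnPrefixFilter, KeyOnlyFilter), which the Bigtable HBase client implements. The class name DynamicScanBuilder and the parameters rowKeyPrefix, columnPrefix and keysOnly are hypothetical stand-ins for whatever your events actually carry.

```java
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class DynamicScanBuilder {

  /**
   * Builds a Scan whose filters depend on (hypothetical) event fields.
   * A null argument means "this criterion is not set by the event".
   */
  public static Scan buildScan(String rowKeyPrefix, String columnPrefix, boolean keysOnly) {
    // MUST_PASS_ALL = logical AND of every filter added below
    FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);

    if (rowKeyPrefix != null) {
      // Keep only rows whose key starts with the prefix
      filters.addFilter(new PrefixFilter(Bytes.toBytes(rowKeyPrefix)));
    }
    if (columnPrefix != null) {
      // Keep only cells whose column qualifier starts with the prefix
      filters.addFilter(new ColumnPrefixFilter(Bytes.toBytes(columnPrefix)));
    }
    if (keysOnly) {
      // For deletes only the row key matters, so strip the cell values
      filters.addFilter(new KeyOnlyFilter());
    }

    Scan scan = new Scan();
    if (!filters.getFilters().isEmpty()) {
      scan.setFilter(filters);
    }
    return scan;
  }
}
```

A filter only decides which rows come back; if the prefix is known up front, you may also want to bound the scan's start/stop rows so fewer rows outside the prefix have to be examined.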
I'd appreciate any advice.
1 solution
Extend your DoFn from AbstractCloudBigtableTableDoFn. That will give you access to a getConnection() method. You'll do something like this:
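Connection c = getConnection(); // shared connection, assumed to be managed by the base DoFn, so it is not closed here
try (Table t = c.getTable(TableName.valueOf(YOUR_TABLE_NAME));
     ResultScanner resultScanner = t.getScanner(YOUR_SCAN)) {
  for (Result r : resultScanner) {
    // construct a Put (update) or a Delete; this example deletes each matching row
    Mutation m = new Delete(r.getRow());
    context.output(m);
  }
}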
I'm assuming that your pipeline starts with CloudBigtableIO.read(), has the AbstractCloudBigtableTableDoFn next, and then has a CloudBigtableIO.write().
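Wrapped in a complete DoFn class, the approach above might look like the following sketch. It assumes the Apache Beam flavour of the connector (package com.google.cloud.bigtable.beam; the older Dataflow SDK 1.x connector used a different package), and that getConnection() returns a connection the base class owns. The class name PrefixDeleteFn, its tableId parameter, and the choice of a String row-key prefix as the input element are hypothetical; building the Scan inside the DoFn sidesteps having to pass HBase Scan objects through the pipeline.

```java
import java.io.IOException;

import com.google.cloud.bigtable.beam.AbstractCloudBigtableTableDoFn;
import com.google.cloud.bigtable.beam.CloudBigtableConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Hypothetical DoFn: each input element is a row-key prefix taken from an event;
 * every row matching that prefix is emitted as a Delete.
 */
public class PrefixDeleteFn extends AbstractCloudBigtableTableDoFn<String, Mutation> {

  private final String tableId;

  public PrefixDeleteFn(CloudBigtableConfiguration config, String tableId) {
    super(config);
    this.tableId = tableId;
  }

  @ProcessElement
  public void processElement(ProcessContext context) throws IOException {
    // HBase Scan objects are not Serializable, so the Scan is built here from a plain String
    Scan scan = new Scan();
    scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
        new PrefixFilter(Bytes.toBytes(context.element())),
        new KeyOnlyFilter())); // only row keys are needed to build Deletes

    // Assumed to be cached and closed by the base class, so it is not closed here
    Connection connection = getConnection();
    try (Table table = connection.getTable(TableName.valueOf(tableId));
         ResultScanner scanner = table.getScanner(scan)) {
      for (Result row : scanner) {
        // For a bulk update, emit a Put built from `row` instead
        context.output(new Delete(row.getRow()));
      }
    }
  }
}
```

The DoFn's output is a PCollection<Mutation>, which is the element type the connector's Bigtable write transform consumes.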