I am setting up a Java pipeline in Dataflow to read a .csv file and to create a bunch of BigTable rows based on the content of the file. I see in the BigTable documentation the note that connecting to BigTable is an 'expensive' operation and that it's a good idea to do it only once and to share the connection among the functions that need it.
However, if I declare the Connection object as a public static variable in the main class and first connect to BigTable in the main function, I get a NullPointerException when I subsequently try to reference the connection from the processElement() function of my DoFn subclasses as part of my Dataflow pipeline.
Conversely, if I declare the Connection as a static variable in the actual DoFn class, the operation works successfully.
What is the best-practice or optimal way to do this?
I'm concerned that if I implement the second option at scale, I will be wasting a lot of time and resources. If I keep the variable static in the DoFn class, is that enough to ensure that the APIs don't try to re-establish the connection every time?
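In other words, is a guarded lazy initialization along these lines sufficient? (This is just a sketch of what I mean; the class name and the project/instance/table IDs are placeholders.)
class LazyConnFn extends DoFn<String, String> {
    // One Connection per worker JVM, created lazily on first use.
    private static Connection conn;

    // Synchronized so parallel bundles on the same worker don't race.
    private static synchronized Connection getConn() {
        if (conn == null) {
            conn = BigtableConfiguration.connect("projectID", "instanceID");
        }
        return conn;
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        Table tbl = getConn().getTable(TableName.valueOf("TableName"));
        // ... build and apply Puts with tbl ...
    }
}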
I realize there is a special BigTable I/O call to sync Dataflow pipeline objects with BigTable, but I think I need to write one on my own to build some special logic into the DoFn processElement() function...
This is what the "working" code looks like:
class DigitizeBT extends DoFn<String, String> {
    private static Connection m_locConn;

    @Override
    public void processElement(ProcessContext c) {
        try {
            // Connects on every element -- this is the expensive part.
            m_locConn = BigtableConfiguration.connect("projectID", "instanceID");
            Table tbl = m_locConn.getTable(TableName.valueOf("TableName"));

            String rowKey = c.element(); // assumed: row key taken from the input element
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(
                Bytes.toBytes("CF1"),
                Bytes.toBytes("SomeName"),
                Bytes.toBytes("SomeValue"));
            tbl.put(put);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
This is what the updated code looks like, FYI:
public void SmallKVJob() {
    CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
        .withProjectId(DEF.ID_PROJ)
        .withInstanceId(DEF.ID_INST)
        .withTableId(DEF.ID_TBL_UNITS)
        .build();

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setProject(DEF.ID_PROJ);
    options.setStagingLocation(DEF.ID_STG_LOC);
    // options.setNumWorkers(3);
    // options.setMaxNumWorkers(5);
    // options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setRunner(DirectPipelineRunner.class);

    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.Read.from(DEF.ID_BAL))   // read the .csv file
        .apply(ParDo.of(new DoFn1()))        // parse "name,count" into KV
        .apply(ParDo.of(new DoFn2()))        // fan out one element per unit
        .apply(ParDo.of(new DoFn3(config))); // write each element to BigTable

    m_log.info("starting to run the job");
    p.run();
    m_log.info("finished running the job");
}
class DoFn1 extends DoFn<String, KV<String, Integer>> {
    @Override
    public void processElement(ProcessContext c) {
        // "Mary,3000" -> KV("Mary", 3000)
        String[] parts = c.element().split(",");
        c.output(KV.of(parts[0], Integer.valueOf(parts[1])));
    }
}
class DoFn2 extends DoFn<KV<String, Integer>, KV<String, Integer>> {
    @Override
    public void processElement(ProcessContext c) {
        // Fan out: KV("Mary", 3000) -> 3000 copies of KV("Mary", 1)
        String name = c.element().getKey();
        int max = c.element().getValue();
        for (int i = 0; i < max; i++) {
            c.output(KV.of(name, 1));
        }
    }
}
class DoFn3 extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, String> {
    public DoFn3(CloudBigtableConfiguration config) {
        super(config);
    }

    @Override
    public void processElement(ProcessContext c) {
        try {
            Integer max = c.element().getValue();
            for (int i = 0; i < max; i++) {
                String owner = c.element().getKey();
                String rnd = UUID.randomUUID().toString();
                Put p = new Put(Bytes.toBytes(owner + "*" + rnd));
                p.addColumn(Bytes.toBytes(DEF.ID_CF1), Bytes.toBytes("Owner"), Bytes.toBytes(owner));
                // getConnection() is inherited from AbstractCloudBigtableTableDoFn,
                // so no connection is created per element.
                getConnection().getTable(TableName.valueOf(DEF.ID_TBL_UNITS)).put(p);
                c.output("Success");
            }
        } catch (IOException e) {
            c.output(e.toString());
            e.printStackTrace();
        }
    }
}
The input .csv file looks something like this:
Mary,3000
John,5000
Peter,2000
So, for each row in the .csv file, I have to put x rows into BigTable, where x is the value in the second cell of that row...
1 Answer
We built AbstractCloudBigtableTableDoFn for this purpose. Extend that class instead of DoFn, and call getConnection() instead of creating a Connection yourself.
10,000 small rows should take a second or two of actual work.
EDIT: As per the comments, BufferedMutator should be used instead of Table.put() for best throughput.
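For illustration, here is a rough sketch of how that could look inside an AbstractCloudBigtableTableDoFn subclass, assuming the Dataflow 1.x bundle lifecycle (startBundle/finishBundle); the class name is hypothetical and the DEF.* constants are the ones from the question:
class BufferedWriteFn extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, String> {
    private transient BufferedMutator mutator;

    public BufferedWriteFn(CloudBigtableConfiguration config) {
        super(config);
    }

    @Override
    public void startBundle(Context c) throws IOException {
        // One BufferedMutator per bundle; it batches Puts into bulk RPCs
        // instead of sending one RPC per Table.put().
        mutator = getConnection().getBufferedMutator(TableName.valueOf(DEF.ID_TBL_UNITS));
    }

    @Override
    public void processElement(ProcessContext c) throws IOException {
        String owner = c.element().getKey();
        Put p = new Put(Bytes.toBytes(owner + "*" + UUID.randomUUID()));
        p.addColumn(Bytes.toBytes(DEF.ID_CF1), Bytes.toBytes("Owner"), Bytes.toBytes(owner));
        mutator.mutate(p); // buffered, not sent immediately
        c.output("Success");
    }

    @Override
    public void finishBundle(Context c) throws IOException {
        mutator.close(); // flushes any remaining buffered mutations
    }
}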