I have a Google BigQuery table and I want to stream the entire table into a Pub/Sub topic.
What would be the easiest/fastest way to do it?
Thank you in advance,
2 Solutions
#1
That really depends on the size of the table.
If it's a small table (a few thousand records, a couple dozen columns) then you could set up a process to query the entire table, convert the response into a JSON array, and push it to Pub/Sub.
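For that small-table case, a minimal sketch using the google-cloud-bigquery and google-cloud-pubsub client libraries might look like this (the project, table and topic names are placeholders):

import json

from google.cloud import bigquery
from google.cloud import pubsub_v1

# Placeholder names - replace with your own project, table and topic.
PROJECT = "my-project"
TABLE = "my-project.my_dataset.my_table"
TOPIC = "my-topic"

bq = bigquery.Client(project=PROJECT)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT, TOPIC)

# Query the whole table; fine for a few thousand rows, not for billions.
rows = bq.query(f"SELECT * FROM `{TABLE}`").result()

futures = []
for row in rows:
    # Each row becomes one JSON message on the topic.
    message = json.dumps(dict(row.items()), default=str).encode("utf-8")
    futures.append(publisher.publish(topic_path, message))

# Block until every publish has completed.
for f in futures:
    f.result()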
If it's a big table (millions/billions of records, hundreds of columns) you'd have to export it to a file, and then prepare/ship it to Pub/Sub.
It also depends on your partitioning policy - if your tables are set up to partition by date you might be able to, again, query instead of export.
Last but not least, it also depends on the frequency - is this a one time deal (then export) or a continuous process (then use table decorators to query only the latest data)?
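(For example, in legacy SQL a relative range decorator such as [myproject:mydataset.mytable@-3600000-] returns only the rows added in the last hour - the project, dataset and table names here are placeholders.)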
Need some more information if you want a truly helpful answer.
Edit
Based on your comments about the size of the table, I think the best way would be to have a script that would:
- Export the table to GCS as newline-delimited JSON
- Process the file (read line by line) and send to Pub/Sub
There are client libraries for most programming languages. I've done similar things with Python, and it's fairly straightforward.
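A rough Python sketch of those two steps, using the google-cloud-bigquery, google-cloud-storage and google-cloud-pubsub libraries (the project, dataset, bucket and topic names are placeholders):

from google.cloud import bigquery, pubsub_v1, storage

# Placeholder names - replace with your own.
PROJECT = "my-project"
TABLE = "my_dataset.my_table"
BUCKET = "my-export-bucket"
TOPIC = "my-topic"

# Step 1: export the table to GCS as newline-delimited JSON.
bq = bigquery.Client(project=PROJECT)
job_config = bigquery.job.ExtractJobConfig()
job_config.destination_format = bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON
bq.extract_table(
    TABLE,
    f"gs://{BUCKET}/export/rows-*.json",  # wildcard lets BigQuery shard a large export
    job_config=job_config,
).result()

# Step 2: read the exported files line by line and publish each line to Pub/Sub.
gcs = storage.Client(project=PROJECT)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT, TOPIC)

for blob in gcs.list_blobs(BUCKET, prefix="export/"):
    for line in blob.download_as_bytes().splitlines():
        if line:  # skip any empty lines
            publisher.publish(topic_path, line)

For a really big table you'd want to stream each file instead of loading it fully into memory, and batch or await the publish futures, but the overall shape stays the same.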
#2
The easiest way I know of is going through Google Cloud Dataflow, which natively knows how to access BigQuery and Pub/Sub.
In theory it should be as easy as the following Python lines:
p = beam.Pipeline(options=pipeline_options)
tablerows = p | 'read' >> beam.io.Read(
    beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))
tablerows | 'write' >> beam.io.Write(
    beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))
This combination of Python/Dataflow/BigQuery/PubSub doesn't work today (Python Dataflow is in beta, but keep an eye on the changelog).
We can do the same with Java, and it works well - I just tested it. It runs both locally and in the hosted Dataflow runner:
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

// Read the whole table from BigQuery
PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));

// Convert each TableRow to a string and publish it to the Pub/Sub topic
weatherData.apply(ParDo.named("tableRow2string").of(new DoFn<TableRow, String>() {
    @Override
    public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
        c.output(c.element().toString());
    }
})).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));

p.run();
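To run it, pass the usual Dataflow options on the command line: locally it runs with the DirectPipelineRunner, and on the service with something like --runner=BlockingDataflowPipelineRunner --project=myproject --stagingLocation=gs://<bucket>/staging (these flag names assume the Dataflow 1.x Java SDK that the snippet above uses).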
Test if the messages are there with:
gcloud --project myproject beta pubsub subscriptions pull --auto-ack sub1
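(This assumes a subscription named sub1 is already attached to the topic; if it isn't, it can be created first with gcloud pubsub subscriptions create sub1 --topic bq2pubsub-topic.)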
Hosted Dataflow screenshot: