I have a Dataflow job that writes to BigQuery. It works well for a non-nested schema, but fails for a nested schema.
Here is my Dataflow pipeline:
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)

schema = 'url: STRING,' \
         'ua: STRING,' \
         'method: STRING,' \
         'man: RECORD,' \
         'man.ip: RECORD,' \
         'man.ip.cc: STRING,' \
         'man.ip.city: STRING,' \
         'man.ip.as: INTEGER,' \
         'man.ip.country: STRING,' \
         'man.res: RECORD,' \
         'man.res.ip_dom: STRING'

first = p | 'read' >> ReadFromText(wordcount_options.input)
second = (first
          | 'process' >> (beam.ParDo(processFunction()))
          | 'write' >> beam.io.WriteToBigQuery(
              'myBucket:tableFolder.test_table',
              schema=schema)
          )
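For reference, processFunction is not shown here; given the newline-delimited JSON data file under "Additional info" below, a minimal sketch of such a DoFn (a hypothetical stand-in, not the actual implementation) could be:

import json

import apache_beam as beam

class processFunction(beam.DoFn):
    # Hypothetical sketch: parse each input line of newline-delimited JSON
    # into a dict, the row format beam.io.WriteToBigQuery expects.
    def process(self, element):
        yield json.loads(element)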
I created the BigQuery table using the following schema:
[
  {
    "mode": "NULLABLE",
    "name": "url",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "ua",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "method",
    "type": "STRING"
  },
  {
    "mode": "REPEATED",
    "name": "man",
    "type": "RECORD",
    "fields": [
      {
        "mode": "REPEATED",
        "name": "ip",
        "type": "RECORD",
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "cc",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "city",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "as",
            "type": "INTEGER"
          },
          {
            "mode": "NULLABLE",
            "name": "country",
            "type": "STRING"
          }
        ]
      },
      {
        "mode": "REPEATED",
        "name": "res",
        "type": "RECORD",
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "ip_dom",
            "type": "STRING"
          }
        ]
      }
    ]
  }
]
I am getting the following error:
BigQuery creation of import job for table "test_table" in dataset "tableFolder" in project "myBucket" failed., BigQuery execution failed., HTTP transport error:
Message: Invalid value for: url is not a valid value
HTTP Code: 400
Question: Can someone please guide me? What am I doing wrong? Also, if there is a better way to iterate through a nested schema and write to BigQuery, please suggest it.
Additional info: My data file:
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"DK","city":"munlan","as":4865,"country":"United"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"GET","man":{"ip":{"cc":"BS","city":"sind","as":7655,"country":"India"},"res":{"ip_dom":"v1"}}}
1 Answer
#1
The problem with your code is that you try to use nested fields while specifying the BigQuery table schema as a string, which is not supported. In order to push nested records into BigQuery from Apache Beam, you need to create a TableSchema object, e.g. using the built-in parser:
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
table_schema = parse_table_schema_from_json(your_bigquery_json_schema)
You need to pass the schema as a JSON string there. You can obtain it with the following command in your terminal (assuming you have the gcloud tools installed):
bq --project=your-gcp-project-name --format=json show your.table.name > schema.json
and in Python use it as follows:
并在Python中使用它如下:
import json

table_schema = parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))["schema"]))
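Alternatively, you can skip the bq export and write the JSON inline; parse_table_schema_from_json expects a JSON string with a top-level "fields" key. A sketch using the field names, types, and modes from your table:

import json

from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

# Inline version of the table schema; "fields" is the required top-level key.
inline_schema = {
    "fields": [
        {"name": "url", "type": "STRING", "mode": "NULLABLE"},
        {"name": "ua", "type": "STRING", "mode": "NULLABLE"},
        {"name": "method", "type": "STRING", "mode": "NULLABLE"},
        {"name": "man", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "ip", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "cc", "type": "STRING", "mode": "NULLABLE"},
                {"name": "city", "type": "STRING", "mode": "NULLABLE"},
                {"name": "as", "type": "INTEGER", "mode": "NULLABLE"},
                {"name": "country", "type": "STRING", "mode": "NULLABLE"}
            ]},
            {"name": "res", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "ip_dom", "type": "STRING", "mode": "NULLABLE"}
            ]}
        ]}
    ]
}
table_schema = parse_table_schema_from_json(json.dumps(inline_schema))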
Then in your pipeline:
beam.io.WriteToBigQuery(
    'myBucket:tableFolder.test_table',
    schema=table_schema)
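Putting it together, a minimal end-to-end sketch (assuming schema.json was exported with the bq command above, and that processFunction and WordcountTemplatedOptions are as in your question):

import json

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
from apache_beam.options.pipeline_options import PipelineOptions

# Parse the exported table schema into a TableSchema object.
with open("schema.json") as f:
    table_schema = parse_table_schema_from_json(json.dumps(json.load(f)["schema"]))

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)

(p
 | 'read' >> ReadFromText(wordcount_options.input)
 | 'process' >> beam.ParDo(processFunction())
 | 'write' >> beam.io.WriteToBigQuery(
     'myBucket:tableFolder.test_table',
     schema=table_schema))

p.run().wait_until_finish()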
You can also take a look at the example showing manual creation of a TableSchema object: https://github.com/apache/beam/blob/474345f5987e47a22d063c7bfcb3638c85a57e64/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py

From the linked example:
from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()

# A simple top-level field.
full_name_schema = bigquery.TableFieldSchema()
full_name_schema.name = 'fullName'
full_name_schema.type = 'string'
full_name_schema.mode = 'required'
table_schema.fields.append(full_name_schema)

# A nested field: a record with two child fields.
phone_number_schema = bigquery.TableFieldSchema()
phone_number_schema.name = 'phoneNumber'
phone_number_schema.type = 'record'
phone_number_schema.mode = 'nullable'

number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
phone_number_schema.fields.append(number)

area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
phone_number_schema.fields.append(area_code)

# Append the record once, after all of its children are attached.
table_schema.fields.append(phone_number_schema)
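Applied to your table, the same manual approach would look roughly like this (a sketch; types and modes copied from your JSON schema above):

from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()

# Top-level scalar fields: url, ua, method are all nullable strings.
for name in ('url', 'ua', 'method'):
    field = bigquery.TableFieldSchema()
    field.name = name
    field.type = 'string'
    field.mode = 'nullable'
    table_schema.fields.append(field)

# The repeated "man" record.
man = bigquery.TableFieldSchema()
man.name = 'man'
man.type = 'record'
man.mode = 'repeated'

# Its nested "ip" record.
ip = bigquery.TableFieldSchema()
ip.name = 'ip'
ip.type = 'record'
ip.mode = 'repeated'
for name, field_type in (('cc', 'string'), ('city', 'string'),
                         ('as', 'integer'), ('country', 'string')):
    child = bigquery.TableFieldSchema()
    child.name = name
    child.type = field_type
    child.mode = 'nullable'
    ip.fields.append(child)
man.fields.append(ip)

# Its nested "res" record.
res = bigquery.TableFieldSchema()
res.name = 'res'
res.type = 'record'
res.mode = 'repeated'
ip_dom = bigquery.TableFieldSchema()
ip_dom.name = 'ip_dom'
ip_dom.type = 'string'
ip_dom.mode = 'nullable'
res.fields.append(ip_dom)
man.fields.append(res)

table_schema.fields.append(man)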
Then just use the table_schema variable in beam.io.WriteToBigQuery.