Here is what my input file looks like:
{"Id": 1, "Address": {"Street":"MG Road","City":"Pune"}}
{"Id": 2, "Address": {"City":"Mumbai"}}
{"Id": 3, "Address": {"Street":"XYZ Road"}}
{"Id": 4}
{"Id": 5, "PhoneNumber": 12345678, "Address": {"Street":"ABCD Road", "City":"Bangalore"}}
In my Dataflow pipeline, how can I dynamically determine which fields are present in each row so that the output conforms to the BigQuery table schema? For example, in row #2, Street is missing. I want the entry for column Address.Street in BigQuery to be "N/A" or null, and I don't want the pipeline to fail because of a schema change or missing data.
How can I handle this logic in Python in my Dataflow job before writing to BigQuery?
1 Answer
#1
0
I recommend writing your data into a temp table with just one field, line, of type STRING.
Once your data is in the BigQuery temp table, you can apply the schema logic and query the data out of the temp table into your final table.
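For the first step, here is a minimal Python sketch of how each raw input line could be wrapped into a one-column row before it is written to the temp table. The function name to_line_row is hypothetical, and the column name line simply follows the recommendation above. Wiring this into the actual Dataflow job and the BigQuery write is not shown.

```python
def to_line_row(raw_line):
    """Wrap one raw JSON line into a row for a single STRING column named 'line'."""
    # Only strip the trailing newline; the JSON is deliberately not parsed here,
    # because the schema logic is applied later in SQL.
    return {"line": raw_line.rstrip("\n")}


# Two sample lines taken from the input file in the question.
rows = [
    to_line_row('{"Id": 4}\n'),
    to_line_row('{"Id": 2, "Address": {"City":"Mumbai"}}\n'),
]
```

Because nothing is parsed at this stage, a missing or unexpected field cannot make the write fail on a schema mismatch.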
The example below, in BigQuery Standard SQL, shows how to apply schema logic against a table that holds the whole row in one field:
#standardSQL
WITH t AS (
  SELECT '{"Id": 1, "Address": {"Street":"MG Road","City":"Pune"}}' line UNION ALL
  SELECT '{"Id": 2, "Address": {"City":"Mumbai"}}' UNION ALL
  SELECT '{"Id": 3, "Address": {"Street":"XYZ Road"}}' UNION ALL
  SELECT '{"Id": 4} ' UNION ALL
  SELECT '{"Id": 5, "PhoneNumber": 12345678, "Address": {"Street":"ABCD Road", "City":"Bangalore"}}'
)
SELECT
  JSON_EXTRACT_SCALAR(line, '$.Id') id,
  JSON_EXTRACT_SCALAR(line, '$.PhoneNumber') PhoneNumber,
  JSON_EXTRACT_SCALAR(line, '$[Address].Street') Street,
  JSON_EXTRACT_SCALAR(line, '$[Address].City') City
FROM t
with the result below:
Row  id  PhoneNumber  Street     City
1    1   null         MG Road    Pune
2    2   null         null       Mumbai
3    3   null         XYZ Road   null
4    4   null         null       null
5    5   12345678     ABCD Road  Bangalore
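Since the question asks about doing this in Python, the same fill-missing-fields logic can also be sketched in plain Python as an alternative to the SQL step. This is only an illustration under stated assumptions: FIELD_PATHS, extract and normalize are hypothetical names, and the flat output columns mirror the query above rather than a nested Address record. Unlike JSON_EXTRACT_SCALAR, which returns strings, this keeps the JSON types (Id stays an integer).

```python
import json

# Hypothetical mapping: output column -> path of keys in the input JSON.
FIELD_PATHS = {
    "id": ("Id",),
    "PhoneNumber": ("PhoneNumber",),
    "Street": ("Address", "Street"),
    "City": ("Address", "City"),
}


def extract(record, path, default=None):
    """Follow `path` through nested dicts; return `default` if any key is missing."""
    current = record
    for key in path:
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


def normalize(line, default=None):
    """Parse one JSON line and return a flat row with every expected column present."""
    record = json.loads(line)
    return {column: extract(record, path, default) for column, path in FIELD_PATHS.items()}


# Row 2 from the question has no Street; row 4 has no Address at all.
row2 = normalize('{"Id": 2, "Address": {"City":"Mumbai"}}')
row4 = normalize('{"Id": 4}', default="N/A")
```

Passing default="N/A" gives the placeholder the question mentions, while the default of None would be written as null. Input fields that are not listed in FIELD_PATHS are ignored, so an extra field in the source does not break the row.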