I have a Dataflow pipeline running locally on my machine, writing to BigQuery. BigQuery in this batch job requires a temporary location, which I have provided in my Cloud Storage bucket. The relevant parts are:
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BigQueryOptions.class)
.setTempLocation("gs://folder/temp");
Pipeline p = Pipeline.create(options);
....
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("uuid").setType("STRING"));
fields.add(new TableFieldSchema().setName("start_time").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("end_time").setType("TIMESTAMP"));
TableSchema schema = new TableSchema().setFields(fields);
session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
    .apply(BigQueryIO.Write
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .to("myproject:db.table"));
Where for FormatAsTableRowFn I have:
static class FormatAsTableRowFn extends DoFn<KV<String, String>, TableRow>
    implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    TableRow row = new TableRow()
        .set("uuid", c.element().getKey())
        // include a field for the window timestamp
        .set("start_time", ((IntervalWindow) c.window()).start().toInstant())  // NOTE: I tried both with and without
        .set("end_time", ((IntervalWindow) c.window()).end().toInstant());     // .toInstant, receiving the same error
    c.output(row);
  }
}
If I print out row.toString(), I get legitimate timestamps:
{uuid=00:00:00:00:00:00, start_time=2016-09-22T07:34:38.000Z, end_time=2016-09-22T07:39:38.000Z}
When I run this code, Java says: Failed to create the load job beam_job_XXX
Manually inspecting the temp folder in GCS, the objects look like:
{"mac":"00:00:00:00:00:00","start_time":{"millis":1474529678000,"chronology":{"zone":{"fixed":true,"id":"UTC"}},"zone":{"fixed":true,"id":"UTC"},"afterNow":false,"beforeNow":true,"equalNow":false},"end_time":{"millis":1474529978000,"chronology":{"zone":{"fixed":true,"id":"UTC"}},"zone":{"fixed":true,"id":"UTC"},"afterNow":false,"beforeNow":true,"equalNow":false}}
Looking at the failed job report in BigQuery, the error says:
JSON object specified for non-record field: start_time (error code: invalid)
This is very strange, because I am pretty sure I declared this field a TIMESTAMP, and I am 100% sure my schema in BigQuery conforms with the TableSchema in the SDK. (NOTE: setting withCreateDisposition...CREATE_IF_NEEDED yields the same result.)
Could someone please tell me how I need to remedy this to get the data inside BigQuery?
1 Answer
#1
Don't use Instant objects. Try using milliseconds/seconds.
https://cloud.google.com/bigquery/data-types
A positive number specifies the number of seconds since the epoch
So, something like this should work:
.getMillis() / 1000
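To illustrate the conversion, here is a minimal, self-contained sketch (the class and helper names are hypothetical, not part of the SDK): a Joda-Time millisecond timestamp divided by 1000 gives the epoch-seconds number that BigQuery accepts for a TIMESTAMP column. The commented-out lines show how this would look inside the question's DoFn.

```java
public class TimestampFix {
  // Hypothetical helper: convert a millisecond timestamp (what Joda-Time's
  // getMillis() returns) to epoch seconds, the numeric form BigQuery
  // accepts for TIMESTAMP fields.
  static long toEpochSeconds(long millis) {
    return millis / 1000;
  }

  public static void main(String[] args) {
    // Window boundaries from the question's GCS output, in milliseconds.
    long startMillis = 1474529678000L; // 2016-09-22T07:34:38.000Z
    long endMillis   = 1474529978000L; // 2016-09-22T07:39:38.000Z
    System.out.println(toEpochSeconds(startMillis)); // 1474529678
    System.out.println(toEpochSeconds(endMillis));   // 1474529978
    // Inside the DoFn, the fix would look like:
    //   .set("start_time", ((IntervalWindow) c.window()).start().getMillis() / 1000)
    //   .set("end_time",   ((IntervalWindow) c.window()).end().getMillis() / 1000)
  }
}
```

Because a plain long is written instead of a serialized Instant object, the rows load as numbers rather than the nested JSON objects seen in the temp folder.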