I am trying to create a stream in Apache/KAFKA KSQL The topic contains (somewhat complex JSON)
我试图在Apache / KAFKA KSQL中创建一个流主题包含(有点复杂的JSON)
{
"agreement_id": "dd8afdbe-59cf-4272-b640-b14a24d8234c",
"created_at": "2018-02-17 16:00:00.000Z",
"id": "6db276a8-2efe-4495-9908-4d3fc4cc16fa",
"event_type": "data",
"total_charged_amount": {
"tax_free_amount": null,
"tax_amounts": [],
"tax_included_amount": {
"amount": 0.0241,
"currency": "EUR"
}
}
"used_service_units": [
{
"amount": 2412739,
"currency": null,
"unit_of_measure": "bytes"
}
]
}
Now creating a stream is easy for just simple stuff like event_type and created_at. That would be like this
现在创建一个流很容易就像event_type和created_at这样简单的东西。那会是这样的
CREATE STREAM tstream (event_type varchar, created_at varchar) WITH (kafka_topic='usage_events', value_format='json');
CREATE STREAM tstream(event_type varchar,created_at varchar)WITH(kafka_topic ='usage_events',value_format ='json');
But now I need to access the used_service_units.... and I would like to extract the "amount" in the JSON above
但现在我需要访问used_service_units ....我想在上面的JSON中提取“金额”
How would I do this ?
我该怎么办?
CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units[0].amount int) WITH (kafka_topic='usage_events', value_format='json');
Results in
line 1:78: mismatched input '[' expecting {'ADD', 'APPROXIMATE', ...
And if I instead create a stream like so
如果我改为创建一个像这样的流
CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units varchar) WITH (kafka_topic='usage_events', value_format='json');
And then does a SQL SELECT on the stream like this
然后像这样在流上执行SQL SELECT
SELECT EXTRACTJSONFIELD(used_service_units,'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units,'$[0].amount') FROM usage;
Neither of these alternatives work...
这些替代品都不起作用......
This one gave me
这个给了我
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;'
Code generation failed for SelectValueMapper
1 个解决方案
#1
4
It seems that ONE solution to this problem is to make the column datatype an array i.e.
似乎这个问题的一个解决方案是使列数据类型成为一个数组,即
CREATE STREAM usage (event_type varchar,created_at varchar, total_charged_amount varchar, used_service_units array<varchar> ) WITH (kafka_topic='usage_events', value_format='json');
Now I am able to do the following:
现在我能够做到以下几点:
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage
#1
4
It seems that ONE solution to this problem is to make the column datatype an array i.e.
似乎这个问题的一个解决方案是使列数据类型成为一个数组,即
CREATE STREAM usage (event_type varchar,created_at varchar, total_charged_amount varchar, used_service_units array<varchar> ) WITH (kafka_topic='usage_events', value_format='json');
Now I am able to do the following:
现在我能够做到以下几点:
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage