Reading data written to S3 by an Amazon Kinesis Firehose stream

Date: 2022-06-27 23:09:28

I am writing records to a Kinesis Firehose stream that are eventually written to an S3 file by Amazon Kinesis Firehose.

My record object looks like:

class ItemPurchase {
    String personId;
    String itemId;
}

The data written to S3 looks like:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

NO COMMA SEPARATION.

NO STARTING BRACKET as in a JSON array

[

NO ENDING BRACKET as in a JSON array

]

I want to read this data and get a list of ItemPurchase objects.

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent));

What is the correct way to read this data?

5 Answers

#1

I've had the same issue.

It would have been better if AWS allowed us to set a delimiter, but we can do it on our own.

In my use case, I've been listening to a stream of tweets, and as soon as a new tweet arrives I immediately put it into Firehose.

This, of course, resulted in a 1-line file which could not be parsed.

So, to solve this, I concatenated each tweet's JSON with a \n. This, in turn, let me use packages that can output lines when reading stream contents, and parse the file easily.
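For example, here is a minimal producer-side sketch using boto3 (the delivery stream name "tweets-stream" and the put_tweet helper are illustrative assumptions, not part of the original answer):

import json
import boto3

firehose = boto3.client("firehose")

def put_tweet(tweet):
    # Append a newline so each record lands on its own line in the
    # delivered S3 object, making the file parseable line by line.
    firehose.put_record(
        DeliveryStreamName="tweets-stream",
        Record={"Data": (json.dumps(tweet) + "\n").encode("utf-8")},
    )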

Hope this helps you.

#2

I also had the same problem; here is how I solved it.

  1. Replace "}{" with "}\n{".
  2. Split the result into lines by "\n".

    import re

    # input_json_rdd is assumed to be an RDD of raw S3 object contents.
    input_json_rdd.map(lambda x: re.sub(r"\}\{", "}\n{", x, flags=re.UNICODE)) \
                  .flatMap(lambda line: line.split("\n"))

A nested JSON object has several "}"s, so splitting lines by "}" doesn't solve the problem.
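Putting it together as a self-contained sketch (the S3 path is hypothetical, and an existing SparkContext named sc is assumed):

import re
import json

# Read the raw Firehose output from S3.
raw = sc.textFile("s3://my-bucket/firehose-output/")

# Re-delimit back-to-back JSON objects, split into lines, drop empties,
# and parse each line into a dict.
purchases = (raw
    .map(lambda x: re.sub(r"\}\{", "}\n{", x, flags=re.UNICODE))
    .flatMap(lambda line: line.split("\n"))
    .filter(lambda s: s.strip())
    .map(json.loads))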

#3

It boggles my mind that Amazon Firehose dumps JSON messages to S3 in this manner, and doesn't allow you to set a delimiter or anything.

Ultimately, the trick I found to deal with the problem was to process the text file using the JSON raw_decode method.

This will allow you to read a bunch of concatenated JSON records without any delimiters between them.

Python code:

import json

decoder = json.JSONDecoder()

with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:
    content = content_file.read()

content_length = len(content)
decode_index = 0

while decode_index < content_length:
    try:
        # raw_decode returns the parsed object and the index where it
        # stopped, so back-to-back JSON blobs can be consumed one at a time.
        obj, decode_index = decoder.raw_decode(content, decode_index)
        print("File index:", decode_index)
        print(obj)
    except json.JSONDecodeError as e:
        print("JSONDecodeError:", e)
        # Scan forward and keep trying to decode
        decode_index += 1

#4

If there's a way to change how the data is written, separate the records with a newline; that way you can read the data simply, line by line. If not, build a scanner object that takes "}" as a delimiter and use the scanner to read. That would do the job.
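A minimal sketch of that delimiter idea in Python (a hedged translation of the answer, not the poster's code; as answer #2 points out, splitting on "}" only works when no "}" appears inside string values and the objects are not nested):

import json

def read_flat_records(text):
    # Split on the closing brace and re-append it to each chunk.
    # WARNING: assumes flat JSON objects with no "}" inside string
    # values; nested objects would be split incorrectly.
    records = []
    for chunk in text.split("}"):
        chunk = chunk.strip()
        if chunk:
            records.append(json.loads(chunk + "}"))
    return records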

#5

I think the best way to tackle this is to first create a properly formatted JSON file containing well-separated JSON objects. In my case I appended ',' to each event pushed into the Firehose, so after a file is saved in S3, every file contains JSON objects separated by a delimiter (a comma, in our case). '[' and ']' must also be added at the beginning and end of the file. Then you have a proper JSON file containing multiple JSON objects, and parsing them becomes possible.
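On the consumer side, a hedged sketch of the final step (it assumes every record, including the last, ends with a comma, so the trailing comma is stripped before wrapping):

import json

def parse_comma_separated(content):
    # content looks like: {...},{...},{...},
    # Strip the trailing comma, wrap in brackets, and parse as a JSON array.
    return json.loads("[" + content.rstrip().rstrip(",") + "]")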
