R jsonlite: filter records before loading

Date: 2022-04-27 20:23:13

I have many large JSON files (3 GB each) that I want to load efficiently onto a powerful RServer machine. Loading every record from every file would be redundant and exhausting (50M records × 40 files), so I thought of using the jsonlite package because I heard it's efficient. The thing is, I don't need all the records, only the subset where an embedded element ("_source") has a field named "duration". This is currently my code:


library(jsonlite)
library(curl)
url <- "https://s3-eu-west-1.amazonaws.com/es-export-data/logstash-2016.02.15.json"
test <- stream_in(url(url))

It's only one extract of many. Now, the jsonlite package has a 'flatten' function that flattens embedded elements into one wide data frame, which I could then filter. However, that seems inefficient: pre-filtering the data as it is loaded should be much more efficient. Here is a dput of one record:


> dput(test_data)
"{\"_index\":\"logstash-2016.02.15\",\"_type\":\"productLogs\",\"_id\":\"AVLitaOtp4oNFTVKv9tZ\",\"_score\":0,\"_source\":{\"EntryType\":\"Event\",\"queryType\":\"clientQuery\",\"status\":\"success\",\"cubeName\":\"Hourly Targets Operations by Model\",\"cubeID\":\"aHourlyIAAaTargetsIAAaOperationsIAAabyIAAaModel\",\"startQueryTimeStamp\":\"2016-02-15T02:14:23+00:00\",\"endQueryTimeStamp\":\"2016-02-15T02:14:23+00:00\",\"queryResponeLengthBytes\":0,\"duration\":0,\"concurrentQuery\":14,\"action\":\"finishQueryJaql\",\"@timestamp\":\"2016-02-15T02:14:23.253Z\",\"appTypeName\":\"dataserver\",\"@version\":\"1\",\"host\":\"VDED12270\",\"type\":\"productLogs\",\"tags\":[],\"send_type\":\"PullGen1\",\"sisenseuid\":\"janos.kopecek@regenersis.com\",\"sisenseOwnerid\":\"janos.kopecek@regenersis.com\",\"sisenseVersion\":\" 5.8.1.29\",\"sisenseMonitoringVersion\":\"3.0.0.6\",\"inputType\":\"sqs\",\"token\":\"fTdyoSwaFZTalBlnFIlTsqvvzfKZVGle\",\"logstash_host\":\"vpc_cluster_1\"}}"
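For reference, the flatten-then-filter route described above can be sketched as follows. Everything is loaded first and only filtered afterwards; the two sample records below are hypothetical stand-ins for the real data:

```r
library(jsonlite)

# Sample NDJSON records standing in for one page of the real file (hypothetical values);
# only the first record carries a "duration" field inside "_source".
records <- stream_in(textConnection(c(
  '{"_id":"a1","_source":{"status":"success","duration":0}}',
  '{"_id":"a2","_source":{"status":"success"}}'
)), verbose = FALSE)

flat <- flatten(records)                          # "_source" -> "_source.status", "_source.duration"
kept <- flat[!is.na(flat[["_source.duration"]]), ]
nrow(kept)  # 1: only the record with a duration remains
```

The drawback is exactly what the question points out: every record is materialised in memory before the filter runs.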

Any help appreciated.


1 Answer

#1


You have to add a handler function and specify which elements you need:


# note: x$"_source$duration" (one string) would look up a non-existent column
stream_in(url(url), handler = function(x) x$"_source"$duration)
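As a fuller sketch: `stream_in()` discards the handler's return value, so filtered pages have to be accumulated from inside the handler and combined at the end with `rbind_pages()`. A minimal, self-contained example, assuming NDJSON input shaped like the record in the question (the sample values below are hypothetical):

```r
library(jsonlite)

# Sample NDJSON standing in for one of the real files (hypothetical values);
# only the first record carries "_source.duration".
con <- textConnection(c(
  '{"_id":"a1","_source":{"status":"success","duration":0}}',
  '{"_id":"a2","_source":{"status":"success"}}'
))

pages <- list()
stream_in(con,
  handler = function(df) {
    df <- flatten(df)                     # "_source" -> "_source.duration", ...
    keep <- !is.na(df[["_source.duration"]])
    pages[[length(pages) + 1]] <<- df[keep, , drop = FALSE]
  },
  pagesize = 10000, verbose = FALSE
)
result <- rbind_pages(pages)
nrow(result)  # 1: only the record with a duration survives
```

Because each page is filtered as soon as it is parsed, memory usage is bounded by `pagesize` plus the kept rows, rather than by the full 3 GB file.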
