Reusing a JSON Schema in a Spark DataFrame with Scala

Date: 2022-07-20 23:11:44

I have some JSON data like this:

{"gid":"111","createHour":"2014-10-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:40:37.0"},{"revId":"4","modDate":"2014-11-20 01:40:40.0"}],"comments":[],"replies":[]}
{"gid":"222","createHour":"2014-12-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:39:31.0"},{"revId":"4","modDate":"2014-11-20 01:39:34.0"}],"comments":[],"replies":[]}
{"gid":"333","createHour":"2015-01-21 00:00:00.0","revisions":[{"revId":"25","modDate":"2014-11-21 00:34:53.0"},{"revId":"110","modDate":"2014-11-21 00:47:10.0"}],"comments":[{"comId":"4432","content":"How are you?"}],"replies":[{"repId":"4441","content":"I am good."}]}
{"gid":"444","createHour":"2015-09-20 23:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 23:23:47.0"}],"comments":[],"replies":[]}
{"gid":"555","createHour":"2016-01-21 01:00:00.0","revisions":[{"revId":"135","modDate":"2014-11-21 01:01:58.0"}],"comments":[],"replies":[]}
{"gid":"666","createHour":"2016-04-23 19:00:00.0","revisions":[{"revId":"136","modDate":"2014-11-23 19:50:51.0"}],"comments":[],"replies":[]}

I can read it in:

val df = sqlContext.read.json("./data/full.json")

I can print the schema with df.printSchema:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

I can show the data with df.show(10, false):

+---------------------+---------------------+---+-------------------+---------------------------------------------------------+
|comments             |createHour           |gid|replies            |revisions                                                |
+---------------------+---------------------+---+-------------------+---------------------------------------------------------+
|[]                   |2014-10-20 01:00:00.0|111|[]                 |[[2014-11-20 01:40:37.0,2], [2014-11-20 01:40:40.0,4]]   |
|[]                   |2014-12-20 01:00:00.0|222|[]                 |[[2014-11-20 01:39:31.0,2], [2014-11-20 01:39:34.0,4]]   |
|[[4432,How are you?]]|2015-01-21 00:00:00.0|333|[[I am good.,4441]]|[[2014-11-21 00:34:53.0,25], [2014-11-21 00:47:10.0,110]]|
|[]                   |2015-09-20 23:00:00.0|444|[]                 |[[2014-11-20 23:23:47.0,2]]                              |
|[]                   |2016-01-21 01:00:00.0|555|[]                 |[[2014-11-21 01:01:58.0,135]]                            |
|[]                   |2016-04-23 19:00:00.0|666|[]                 |[[2014-11-23 19:50:51.0,136]]                            |
+---------------------+---------------------+---+-------------------+---------------------------------------------------------+

I can capture the schema with val dfSc = df.schema and print it as:

StructType(StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true), StructField(createHour,StringType,true), StructField(gid,StringType,true), StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true), StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true))

I can print this out more nicely:

println(df.schema.fields.mkString(",\n"))
StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true),
StructField(createHour,StringType,true),
StructField(gid,StringType,true),
StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true),
StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true)

Now, if I read in the same data with the comments and replies removed (simply deleting those entries from the file), using val df2 = sqlContext.read.json("./data/partialRevOnly.json"), printSchema gives me something like this:

root
 |-- comments: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

I don't like that, so I use:

val df3 = sqlContext.read.
  schema(dfSc).
  json("./data/partialRevOnly.json")

where dfSc is the original schema. So now I get exactly the schema I had before, even with the data removed:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)
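
I can also sanity-check the round trip, since StructType compares structurally:

// should hold: the imposed schema equals the captured dfSc
assert(df3.schema == dfSc)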

This is perfect ... well, almost. I would like to assign this schema to a variable, something like this:

val textSc =  StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true),
    StructField(createHour,StringType,true),
    StructField(gid,StringType,true),
    StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true),
    StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true)

OK - this won't compile because of the missing double quotes and some other structural problems, so I try this (which gives an error):

import org.apache.spark.sql.types._

val textSc = StructType(Array(
    StructField("comments",ArrayType(StructType(StructField("comId",StringType,true), StructField("content",StringType,true)),true),true),
    StructField("createHour",StringType,true),
    StructField("gid",StringType,true),
    StructField("replies",ArrayType(StructType(StructField("content",StringType,true), StructField("repId",StringType,true)),true),true),
    StructField("revisions",ArrayType(StructType(StructField("modDate",StringType,true), StructField("revId",StringType,true)),true),true)
))

Name: Compile Error
Message: <console>:78: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
           StructField("comments",ArrayType(StructType(StructField("comId",StringType,true), StructField("content",StringType,true)),true),true),

... Without this error (which I cannot figure a quick way around), I would like to then use textSc in place of dfSc to read in the JSON data with an imposed schema.

I cannot find a '1-to-1 match' way of getting (via println or similar) the schema printed in compilable syntax like the above. I suppose some pattern matching could be coded to restore the double quotes, but I'm still unclear what rules are required to get the exact schema out of the test fixture so that I can simply reuse it in my recurring production (versus test fixture) code. Is there a way to get this schema to print exactly as I would code it?

Note: this includes double quotes and all the proper StructField/StructType calls and so forth, so that it is a code-compatible drop-in.

As a sidebar, I thought about saving a fully-formed golden JSON file to use at the start of the Spark job, but I would like to eventually use date fields and other more concise types instead of strings at the applicable structural locations.

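To sketch that idea (hedged: the timestampFormat JSON option is a Spark 2.x feature, and the format pattern below is my guess for these values, so both need verifying), I could impose a partial schema with typed columns; fields omitted from an imposed schema are simply not loaded:

import org.apache.spark.sql.types._

// Sketch only: read just gid and a typed createHour from the full file
val typedSc = StructType(Seq(
  StructField("gid", StringType, true),
  StructField("createHour", TimestampType, true)))

val dfTyped = sqlContext.read
  .schema(typedSc)
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.S")  // assumed pattern
  .json("./data/full.json")

dfTyped.printSchema()  // createHour: timestamp (nullable = true)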

How can I get the DataFrame schema information coming out of my test harness (which uses fully-formed JSON input rows with comments and replies) to a point where I can drop the schema as source code into a production Scala Spark job?

Note: the best answer is some coding means, but an explanation so I can trudge, plod, toil, wade, plow, and slog through the coding is helpful too. :)

2 Answers

#1 (score: 1)

Well, the error message should tell you everything you have to know here - StructType expects a sequence of fields as an argument, not individual StructField arguments. So in your case the schema should look like this:

StructType(Seq(
  StructField("comments", ArrayType(StructType(Seq(       // <- Seq[StructField]
    StructField("comId", StringType, true),
    StructField("content", StringType, true))), true), true), 
  StructField("createHour", StringType, true),
  StructField("gid", StringType, true),
  StructField("replies", ArrayType(StructType(Seq(        // <- Seq[StructField]
    StructField("content", StringType, true),
    StructField("repId", StringType, true))), true), true),
  StructField("revisions", ArrayType(StructType(Seq(      // <- Seq[StructField]
    StructField("modDate", StringType, true),
    StructField("revId", StringType, true))),true), true)))

#2 (score: 5)

I recently ran into this. I'm using Spark 2.0.2, so I don't know if this solution works with earlier versions.

import scala.util.Try
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.types.{DataType, StructType}

/** Produce a Schema string from a Dataset */
def serializeSchema(ds: Dataset[_]): String = ds.schema.json

/** Produce a StructType schema object from a JSON string */
def deserializeSchema(json: String): StructType = {
    Try(DataType.fromJson(json)).getOrElse(LegacyTypeStringParser.parse(json)) match {
        case t: StructType => t
        case _ => throw new RuntimeException(s"Failed parsing StructType: $json")
    }
}

Note that I copied the "deserialize" function from a private function in Spark's StructType object, so I don't know how well it will be supported across versions.
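
As a usage sketch (the schema file path is illustrative, and spark is a Spark 2.x SparkSession; the question's sqlContext works the same way), serialize the schema once from the fully-formed test data, persist it, and rebuild it in the production job:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Test harness side: capture the schema from the golden JSON
val goldenDf = spark.read.json("./data/full.json")
Files.write(Paths.get("./data/schema.json"),
  serializeSchema(goldenDf).getBytes(StandardCharsets.UTF_8))

// Production side: rebuild the StructType and impose it on the partial data
val schemaJson = new String(
  Files.readAllBytes(Paths.get("./data/schema.json")), StandardCharsets.UTF_8)
val df3 = spark.read
  .schema(deserializeSchema(schemaJson))
  .json("./data/partialRevOnly.json")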
