版权声明:本文为博主原创文章,未经博主允许不得转载!!
欢迎访问:/qq_21439395/article/details/80710180
交流QQ: 824203453
欢迎关注B站,收看更多视频内容:/383891492
SparkSql 版本为 2.2.0
sparksql解析json格式的数据源
首先,获取操作sparkSql的SparkSession操作实例:
val session = ()
.master("local[*]")
.appName()
.getOrCreate()
// 导入隐式转换和functions
import ._
import ._
import ._
1.1. 根据json数据,创建Dataset
指定嵌套json格式的数据:
val opds = (
// 三引号中,编写json字符串
List("""{"name":"xx","address":{"city":"bj"}}""")
)
val otherPeople = (opds)
()
schema如下:
1.2. 读取普通json文件
json数据格式为:
val json1: DataFrame =("")
()
获取schema为:
1.3. 读取嵌套json文件
数据格式为:
val json: DataFrame = ("")
()
schema信息如下:
操作嵌套json的方式:
//DSL 语法的查询
("").show()
// 使用sql语法查询
("v_tmp")
("select from v_tmp").show()
1.4. 操作嵌套json数组-explode函数
数据格式为:
读取json数组的数据:
val json3 = ("")
()
()
schema信息为:
示例数据为:
这种结果的展示数据,查询非常不方便。
解决方案:
利用explode函数,把数组数据进行展开。
// 导入sparksql中的函数
import ._
// 利用explode函数 把json数组进行展开, 数组中的每一条数据,都是一条记录
val explodeDF = ($"name", explode($"myScore")).toDF("name", "score")
()
// 再次进行查询 类似于普通的数据库表 默认schema: score1, 可以通过as 指定schema名称
val json3Res: DataFrame = ($"name", $"score.score1",
$"score.score2" as "score222")
// 创建临时视图
("v_jsonArray")
// 写sql,分别求平均值
("select name,avg(score1),avg(score222) from v_jsonArray group by name")
.show()
explodeDF的schema信息为:
最终,查询结果为:
1.5. get_json_object() 方法
get_json_object() 方法 从一个json 字符串中根据指定的json路径抽取一个json 对象
根据指定数据,获取一个DataFrame
val json4 = Seq(
(0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cn": "United States"}"""))
.toDF("id", "json")
()
schema信息为:
使用get_json_object 从json字符串中提取列:
// 利用get_json_object 从 json字符串中,提取列
val jsDF = ($"id",
get_json_object($"json", "$.device_type").alias("device_type"),
get_json_object($"json", "$.ip").alias("ip"),
get_json_object($"json", "$.cn").alias("cn"))
()
schema信息为:
更多复杂操作:可参考:/developer/article/1032532
版权声明:本文为博主原创文章,未经博主允许不得转载!!
欢迎访问:/qq_21439395/article/details/80710180
交流QQ: 824203453
欢迎关注B站,收看更多视频内容:/383891492