I need to modify the data to give input to CEP system, my current data looks like below
我需要修改数据给CEP系统输入,我现在的数据如下
val rdd = {"var":"system-ready","value":0.0,"objectID":"2018","partnumber":2,"t":"2017-08-25 11:27:39.000"}
I need output like
我需要输出像
t = "2017-08-25 11:27:39.000
Check = { var = "system-ready",value = 0.0, objectID = "2018", partnumber = 2 }
I have to write RDD map operations to achieve this if anybody suggests better option welcome. colcount is the number of columns.
我必须编写RDD地图操作来实现这一点,如果有人建议更好的选项欢迎。colcount是列数。
rdd.map(x => x.split("\":").mkString("\" ="))
.map((f => (f.dropRight(1).split(",").last.toString, f.drop(1).split(",").toSeq.take(colCount-1).toString)))
.map(f => (f._1, f._2.replace("WrappedArray(", "Check = {")))
.map(f => (f._1.drop(0).replace("\"t\"", "t"), f._2.dropRight(1).replace("(", "{"))) /
.map(f => f.toString().split(",C").mkString("\nC").replace(")", "}").drop(0).replace("(", "")) // replacing , with \n, droping (
.map(f => f.replace("\" =\"", "=\"").replace("\", \"", "\",").replace("\" =", "=").replace(", \"", ",").replace("{\"", "{"))
1 个解决方案
#1
2
Scala's JSON parser seems to be a good choice for this problem:
对于这个问题,Scala的JSON解析器似乎是一个不错的选择:
import scala.util.parsing.json
rdd.map( x => {
JSON.parseFull(x).get.asInstanceOf[Map[String,String]]
})
This will result in an RDD[Map[String, String]]
. You can then access the t
field from the JSON, for example, using:
这将导致RDD[Map[String, String]]。然后您可以从JSON中访问t字段,例如,使用:
.map(dict => "t = "+dict("t"))
#1
2
Scala's JSON parser seems to be a good choice for this problem:
对于这个问题,Scala的JSON解析器似乎是一个不错的选择:
import scala.util.parsing.json
rdd.map( x => {
JSON.parseFull(x).get.asInstanceOf[Map[String,String]]
})
This will result in an RDD[Map[String, String]]
. You can then access the t
field from the JSON, for example, using:
这将导致RDD[Map[String, String]]。然后您可以从JSON中访问t字段,例如,使用:
.map(dict => "t = "+dict("t"))