将Json的数据集列解析为Dataset

时间:2021-01-24 23:12:25

Having Dataset<Row> of single column of json strings:

拥有单列json字符串的数据集 :

+--------------------+
|               value|
+--------------------+
|{"Context":"00AA0...|
+--------------------+

Json sample:

{"Context":"00AA00AA","MessageType":"1010","Module":"1200"}

How can I most efficiently get Dataset<Row> that looks like this:

我怎样才能最有效地获得如下所示的数据集 :

+--------+-----------+------+
| Context|MessageType|Module|
+--------+-----------+------+
|00AA00AA|       1010|  1200|
+--------+-----------+------+

I'm processing those data in stream, i know that spark can do this by him self when i'm reading it from a file:

我正在流处理这些数据,我知道当我从文件中读取它时,spark可以自己做这件事:

spark
.readStream()
.schema(MyPojo.getSchema())
.json("src/myinput")

but now i'm reading data from kafka and it gives me data in another form. I know that i can use some parsers like Gson, but i would like to let spark to do it for me.

但现在我正在读取kafka的数据,它以另一种形式提供数据。我知道我可以使用像Gson这样的解析器,但我想让火花为我做。

1 个解决方案

#1


1  

Try this sample.

试试这个样本。

public class SparkJSONValueDataset {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkJSONValueDataset")
                .config("spark.sql.warehouse.dir", "/file:C:/temp")
                .master("local")
                .getOrCreate();

        //Prepare data Dataset<Row>
        List<String> data = Arrays.asList("{\"Context\":\"00AA00AA\",\"MessageType\":\"1010\",\"Module\":\"1200\"}");
        Dataset<Row> df = spark.createDataset(data, Encoders.STRING()).toDF().withColumnRenamed("_1", "value");
        df.show();

        //convert to Dataset<String> and Read
        Dataset<String> df1 = df.as(Encoders.STRING());
        Dataset<Row> df2 = spark.read().json(df1.javaRDD());
        df2.show();
        spark.stop();
    }
 }

#1


1  

Try this sample.

试试这个样本。

public class SparkJSONValueDataset {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkJSONValueDataset")
                .config("spark.sql.warehouse.dir", "/file:C:/temp")
                .master("local")
                .getOrCreate();

        //Prepare data Dataset<Row>
        List<String> data = Arrays.asList("{\"Context\":\"00AA00AA\",\"MessageType\":\"1010\",\"Module\":\"1200\"}");
        Dataset<Row> df = spark.createDataset(data, Encoders.STRING()).toDF().withColumnRenamed("_1", "value");
        df.show();

        //convert to Dataset<String> and Read
        Dataset<String> df1 = df.as(Encoders.STRING());
        Dataset<Row> df2 = spark.read().json(df1.javaRDD());
        df2.show();
        spark.stop();
    }
 }