Spark psv file to DataFrame conversion error

Time: 2021-12-14 16:55:34

The Spark version I am using is 2.0+. All I am trying to do is read a pipe (|) separated values file into a DataFrame and then run SQL-like queries on it. I have tried a comma-delimited file too. I am interacting with Spark through spark-shell. I downloaded the spark-csv jar and started spark-shell with the --packages option to import it into my session, and it was imported successfully.


import spark.implicits._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
val session = SparkSession.builder().appName("test").master("local").getOrCreate()
val df = session.read.format("com.databricks.spark.csv").option("header", "true").option("mode", "DROPMALFORMED").load("testdata.txt")

WARN Hive: Failed to access metastore. This class should not accessed in runtime.
apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hi
 at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1236)
 at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
 at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)
 at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
 at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:171)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
 at java.lang.reflect.Constructor.newInstance(Unknown Source)
 at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:258)
 at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:359)
 at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:263)
 at org.apache.spark.sql.hive.HiveSharedState.metadataHive$lzycompute(HiveSharedState.scala:39)
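
Side note (not part of the original question): on Spark 2.0+ the CSV data source is built in, so the external spark-csv package should not be needed, and a pipe-delimited file can be read by setting the delimiter explicitly. A minimal spark-shell sketch, assuming the same testdata.txt and the session created above:

val df = session.read
  .format("csv")                     // built-in CSV source in Spark 2.0+
  .option("header", "true")
  .option("delimiter", "|")          // pipe-separated values
  .option("mode", "DROPMALFORMED")
  .load("testdata.txt")

df.createOrReplaceTempView("testdata")
session.sql("SELECT * FROM testdata LIMIT 5").show()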

1 Answer

#1



You can load the psv file directly into an RDD, split each line as per your requirement, and then apply a schema to it. Here is a Java example.


import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class RDDtoDF_Update {
    public static void main(final String[] args) throws Exception {

        SparkSession spark = SparkSession
                .builder()
                .appName("RDDtoDF_Updated")
                .master("local[2]")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        StructType schema = DataTypes
                .createStructType(new StructField[] {
                        DataTypes.createStructField("eid", DataTypes.IntegerType, false),
                        DataTypes.createStructField("eName", DataTypes.StringType, false),
                        DataTypes.createStructField("eAge", DataTypes.IntegerType, true),
                        DataTypes.createStructField("eDept", DataTypes.IntegerType, true),
                        DataTypes.createStructField("eSal", DataTypes.IntegerType, true),
                        DataTypes.createStructField("eGen", DataTypes.StringType,true)});


        String filepath = "F:/Hadoop/Data/EMPData.txt";
        // Read the file as plain text, split each line on the pipe delimiter
        // (escaped, since split() takes a regex), and build Rows matching the schema above.
        JavaRDD<Row> empRDD = spark.read()
                .textFile(filepath)
                .javaRDD()
                .map(line -> line.split("\\|"))
                .map(r -> RowFactory.create(Integer.parseInt(r[0]), r[1].trim(), Integer.parseInt(r[2]),
                        Integer.parseInt(r[3]), Integer.parseInt(r[4]), r[5].trim()));


        // Create a DataFrame from the RDD of Rows plus the schema, then run an aggregation.
        Dataset<Row> empDF = spark.createDataFrame(empRDD, schema);
        empDF.groupBy("eDept").max("eSal").show();
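
        // Added sketch (not part of the original answer): register the DataFrame as a
        // temporary view so SQL-like queries can be run on it, as asked in the question.
        empDF.createOrReplaceTempView("emp");
        spark.sql("SELECT eDept, MAX(eSal) AS maxSal FROM emp GROUP BY eDept").show();

        spark.stop();
    }
}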

Thanks.
