How do I skip the header of a CSV file in Spark?

Time: 2022-12-20 23:12:22

Suppose I give three file paths to a Spark context to read, and each file has a schema in its first row. How can we skip the schema lines from the headers?

val rdd = sc.textFile("file1,file2,file3")

Now, how can we skip header lines from this rdd?

13 Answers

#1


51  

If there were just one header line in the first record, then the most efficient way to filter it out would be:

rdd.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter 
}

Of course, this doesn't help if there are many files, each with its own header line. You can, however, build an RDD this way for each of the three files and union them.
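
A minimal sketch of that, assuming a helper named dropFirstLine and the literal paths from the question:

// drop the first line of a single file's RDD (the header sits in partition 0)
def dropFirstLine(path: String) =
  sc.textFile(path).mapPartitionsWithIndex {
    (idx, iter) => if (idx == 0) iter.drop(1) else iter
  }

// combine the per-file RDDs after each file's header has been dropped
val combined = dropFirstLine("file1") union dropFirstLine("file2") union dropFirstLine("file3")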

You could also just write a filter that matches only a line that could be a header. This is quite simple, but less efficient.
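
For instance, assuming the header line starts with a known first column name such as "id" (adjust to your actual header):

// assumes the header's first column is literally "id"; every line gets scanned, hence less efficient
rdd.filter(line => !line.startsWith("id"))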

Python equivalent of the mapPartitionsWithIndex snippet above:

from itertools import islice

rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
)

#2


81  

data = sc.textFile('path_to_data')
header = data.first()  # extract header
data = data.filter(lambda row: row != header)  # filter out header

#3


44  

In Spark 2.0, a CSV reader is built into Spark, so you can easily load a CSV file as follows:

spark.read.option("header","true").csv("filePath")

#4


10  

From Spark 2.0 onwards, you can use SparkSession to get this done as a one-liner:

val spark = SparkSession.builder.config(conf).getOrCreate()

and then as @SandeepPurohit said:

val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)

I hope it solved your question!

P.S.: SparkSession is the new entry point introduced in Spark 2.0 and can be found in the spark_sql package.
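
For reference, the corresponding import (SparkSession lives in the org.apache.spark.sql package):

import org.apache.spark.sql.SparkSession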

#5


5  

You could load each file separately, filter them with file.zipWithIndex().filter(_._2 > 0), and then union all the file RDDs.
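
A sketch of that approach, assuming the three file paths from the question:

// drop the first (index 0) line of each file, then union everything
val files = Seq("file1", "file2", "file3")
val withoutHeaders = files
  .map(path => sc.textFile(path).zipWithIndex().filter(_._2 > 0).map(_._1))
  .reduce(_ union _)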

If the number of files is too large, the union could throw a StackOverflowError.

#6


5  

In PySpark, you can use a DataFrame and set header to True:

df = spark.read.csv(dataPath, header=True)

#7


3  

Use the filter() method in PySpark, filtering out the first column name, to remove the header:

# Read the file (change the format for other file formats)
contentRDD = sc.textFile(<filepath>)

# Filter out the header by matching the first column name
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>))

# Check your result
for i in filterDD.take(5): print(i)

#8


0  

Alternatively, you can use the spark-csv package (in Spark 2.0 this is more or less available natively as CSV). Note that this expects the header in each file (as you desire):

from pyspark.sql.types import StructType, StructField, DoubleType

schema = StructType([
    StructField('lat', DoubleType(), True),
    StructField('lng', DoubleType(), True)])

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true',
             delimiter="\t",
             treatEmptyValuesAsNulls=True,
             mode="DROPMALFORMED") \
    .load(input_file, schema=schema)

#9


0  

It's an option that you pass to the read() command:

val context = new org.apache.spark.sql.SQLContext(sc)

var data = context.read.option("header", "true").csv("<path>")

#10


0  

Working in 2018 (Spark 2.3)

Python

df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")

Scala

val myDf = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")

PD1: myManualSchema is a predefined schema that I wrote; you can skip that part of the code.

#11


-1  

This should work fine

def dropHeader(data: RDD[String]): RDD[String] = {
  val header = data.first()      // grab the header once on the driver
  data.filter(r => r != header)  // an RDD cannot be referenced inside its own transformation
}

#12


-1  

import java.io.{BufferedReader, InputStreamReader}

// Find the header of each file lying in the directory
val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt", 1).map {
    case (fileName, stream) =>
        val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
        (fileName, header)
}.collect().toMap

val fileNameHeaderBr = sc.broadcast(fileNameHeader)

// Now let's skip the headers. mapPartitions ensures a header
// can only be the first line of a partition.
sc.textFile("E:\\sss\\*.txt", 1).mapPartitions(iter =>
    if (iter.hasNext) {
        val firstLine = iter.next()
        println(s"Comparing with firstLine $firstLine")
        // compare the first line against every known header, not just one of them
        if (fileNameHeaderBr.value.values.exists(_ == firstLine))
            new WrappedIterator(null, iter)
        else
            new WrappedIterator(firstLine, iter)
    }
    else {
        iter
    }
).collect().foreach(println)

// Iterator that re-emits a saved first line (when non-null) before delegating to the underlying iterator
class WrappedIterator(firstLine: String, iter: Iterator[String]) extends Iterator[String] {
    var isFirstIteration = true
    override def hasNext: Boolean = {
        if (isFirstIteration && firstLine != null){
            true
        }
        else{
            iter.hasNext
        }
    }

    override def next(): String = {
        if (isFirstIteration){
            println(s"For the first time $firstLine")
            isFirstIteration = false
            if (firstLine != null){
                firstLine
            }
            else{
                println(s"Every time $firstLine")
                iter.next()
            }
        }
        else {
          iter.next()
        }
    }
}

#13


-1  

For Python developers: I have tested this with Spark 2.0. Let's say you want to remove the first 14 rows.

sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
# keep rows whose index is >= 14, i.e. drop the first 14 rows
parts = parts.zipWithIndex().filter(lambda tup: tup[1] >= 14).map(lambda x: x[0])

withColumn is a DataFrame function, so the following will not work in the RDD style used above:

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
