Save the contents of a Spark DataFrame as a single CSV file

Date: 2021-05-24 16:52:31

Say I have a Spark DataFrame which I want to save as a CSV file. Since Spark 2.0.0, the DataFrameWriter class directly supports saving it as a CSV file.

The default behavior is to save the output in multiple part-*.csv files inside the path provided.

How would I save a DF with:

  1. Path mapping to the exact file name instead of a folder
  2. Header available in the first line
  3. Save as a single file instead of multiple files

One way to deal with this is to coalesce the DF and then save the file.

df.coalesce(1).write.option("header", "true").csv("sample_file.csv")

However, this has the disadvantage of collecting the data on the master machine, which therefore needs to have enough memory.

Is it possible to write a single CSV file without using coalesce? If not, is there a more efficient way than the above code?

8 solutions

#1 (score: 2)

Use: df.toPandas().to_csv("sample_file.csv", header=True)

See the documentation for details: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame.toPandas
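
Spelled out as a short sketch (the caveat being that toPandas() collects the whole DataFrame onto the driver, so this only works when the data fits in driver memory; index=False is an extra touch, not part of the original answer):

# Collect to a pandas DataFrame on the driver and write a single CSV file.
pdf = df.toPandas()
pdf.to_csv("sample_file.csv", header=True, index=False)  # index=False drops the pandas row index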

#2 (score: 1)

This solution is based on a shell script and is not parallelized, but is still very fast, especially on SSDs. It uses cat and output redirection on Unix systems. Suppose that the CSV directory containing the partitions is located at /my/csv/dir and that the output file is /my/csv/output.csv:

#!/bin/bash
echo "col1,col2,col3" > /my/csv/output.csv
for i in /my/csv/dir/*.csv ; do
    echo "Processing $i"
    cat "$i" >> /my/csv/output.csv
    rm "$i"
done
echo "Done"

It will remove each partition after appending it to the final CSV in order to free space.

"col1,col2,col3" is the CSV header (here we have three columns of name col1, col2 and col3). You must tell Spark to don't put the header in each partition (this is accomplished with .option("header", "false") because the Shell Script will do it.

“col1,col2,col3”是CSV标题(这里我们有三列名称col1,col2和col3)。你必须告诉Spark不要在每个分区中放置标题(这是通过.option(“header”,“false”)完成的,因为Shell脚本会这样做。
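
For completeness, a minimal PySpark write that produces the headerless partition files the script above merges might look like this (a sketch; the directory and column names simply mirror the example):

# Sketch: write headerless CSV partitions into the directory the shell script reads.
df.select("col1", "col2", "col3") \
  .write \
  .option("header", "false") \
  .csv("/my/csv/dir")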

#3 (score: 1)

For those still wanting to do this, here's how I got it done using Spark 2.1 in Scala, with some java.nio.file help.

Based on https://fullstackml.com/how-to-export-data-frame-from-apache-spark-3215274ee9d6

    val df: org.apache.spark.sql.DataFrame = ??? // data frame to write
    val file: java.nio.file.Path = ??? // target output file (e.g. 'out.csv')

    import java.nio.file.Files
    import scala.collection.JavaConversions._

    // write csv into temp directory which contains the additional spark output files
    // could use Files.createTempDirectory instead
    val tempDir = file.getParent.resolve(file.getFileName + "_tmp")
    df.coalesce(1)
        .write.format("com.databricks.spark.csv")
        .option("header", "true")
        .save(tempDir.toAbsolutePath.toString)

    // find the actual csv file
    val tmpCsvFile = Files.walk(tempDir, 1).iterator().toSeq.find { p => 
        val fname = p.getFileName.toString
        fname.startsWith("part-00000") && fname.endsWith(".csv") && Files.isRegularFile(p)
    }.get

    // move to desired final path
    Files.move(tmpCsvFile, file)

    // delete temp directory
    Files.walk(tempDir)
        .sorted(java.util.Comparator.reverseOrder())
        .iterator().toSeq
        .foreach(Files.delete(_))

#4 (score: 1)

This is how distributed computing works! Multiple files inside a directory are exactly how distributed computing works; this is not a problem at all, since all software can handle it.

Your question should be "how is it possible to download a CSV composed of multiple files?" -> there are already lots of solutions on SO.

Another approach could be to use Spark as a JDBC source (with the awesome Spark Thrift Server), write a SQL query, and transform the result to CSV.

In order to prevent OOM in the driver (since the driver will get ALL the data), use incremental collect (spark.sql.thriftServer.incrementalCollect=true); more info at http://www.russellspitzer.com/2017/05/19/Spark-Sql-Thriftserver/.
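
A rough sketch of that route, assuming the Spark Thrift Server has been started with the flag above and using the third-party PyHive package as the client (the host, port and table name are placeholders, not part of the original answer):

import csv

from pyhive import hive  # assumed client library for the HiveServer2/Thrift protocol

# Connect to a Spark Thrift Server launched with
# --conf spark.sql.thriftServer.incrementalCollect=true
conn = hive.connect(host="thrift-server-host", port=10000)
cursor = conn.cursor()
cursor.execute("SELECT * FROM my_table")

with open("output.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow([col[0] for col in cursor.description])  # header row
    while True:
        rows = cursor.fetchmany(1000)  # fetched in batches, not all at once
        if not rows:
            break
        writer.writerows(rows)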


A small recap of the Spark "data partition" concept:

INPUT (X PARTITIONs) -> COMPUTING (Y PARTITIONs) -> OUTPUT (Z PARTITIONs)

Between "stages", data can be transferred between partitions, this is the "shuffle". You want "Z" = 1, but with Y > 1, without shuffle? this is impossible.

在“阶段”之间,数据可以在分区之间传输,这就是“洗牌”。你想要“Z”= 1,但Y> 1,没有随机播放?这是不可能的。
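
In PySpark terms, the two usual ways of forcing Z = 1 differ only in where that shuffle happens (a small sketch; the output paths are illustrative):

# coalesce(1) avoids a full shuffle, but without a shuffle boundary the preceding
# computation can itself collapse onto a single task.
df.coalesce(1).write.option("header", "true").csv("/tmp/out_coalesce")

# repartition(1) inserts a shuffle: upstream stages keep their parallelism, and a
# single task then receives all the data and writes the one part file.
df.repartition(1).write.option("header", "true").csv("/tmp/out_repartition")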

#5 (score: 1)

Just solved this myself using pyspark with dbutils, to get the .csv and rename it to the wanted filename.

save_location = "s3a://landing-bucket-test/export/" + year
csv_location = save_location + "temp.folder"
file_location = save_location + "export.csv"

df.repartition(1).write.csv(path=csv_location, mode="append", header="true")

file = dbutils.fs.ls(csv_location)[-1].path
dbutils.fs.cp(file, file_location)
dbutils.fs.rm(csv_location, recurse=True)

This answer could be improved by not relying on [-1], but the .csv always seems to be last in the folder. It is a simple and fast solution if you only work with smaller files and can use repartition(1) or coalesce(1).
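
As a possible refinement (a sketch assuming, as above, that dbutils.fs.ls returns entries with a path attribute), the part file can be selected by its .csv suffix instead of by position:

# Pick the single part file by suffix rather than by position in the listing.
part_files = [f.path for f in dbutils.fs.ls(csv_location) if f.path.endswith(".csv")]
assert len(part_files) == 1  # repartition(1) should leave exactly one part file
dbutils.fs.cp(part_files[0], file_location)
dbutils.fs.rm(csv_location, recurse=True)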

#6 (score: 1)

The following Scala method works in local or client mode, and writes the df to a single CSV with the chosen name. It requires that the df fit into memory, otherwise collect() will blow up.

import java.io.{BufferedWriter, OutputStreamWriter}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

val SPARK_WRITE_LOCATION = some_directory        // staging directory on the cluster filesystem
val WRITE_DIRECTORY = some_local_directory       // final local destination directory
val sparkSession: SparkSession = SparkSession.builder().getOrCreate()

def saveResults(results: DataFrame, filename: String): Unit = {
    var fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)

    if (sparkSession.sparkContext.master.contains("local")) {
      fs = FileSystem.getLocal(new Configuration())
    }

    val tempWritePath = new Path(SPARK_WRITE_LOCATION)

    if (fs.exists(tempWritePath)) {
      val deleted = fs.delete(tempWritePath, true)
      assert(deleted)
    }

    if (results.count > 0) {
      val hadoopFilepath = new Path(SPARK_WRITE_LOCATION, filename)
      val writeStream = fs.create(hadoopFilepath, true)
      val bw = new BufferedWriter(new OutputStreamWriter(writeStream, "UTF-8"))

      // collect() pulls the whole DataFrame onto the driver, so it must fit in memory
      val rows: Array[Row] = results.collect()
      for (row <- rows) {
        val rowString = row.mkString(start = "", sep = ",", end = "\n")
        bw.write(rowString)
      }

      bw.close()
      writeStream.close()

      val resultsWritePath = new Path(WRITE_DIRECTORY, filename)

      if (fs.exists(resultsWritePath)) {
        fs.delete(resultsWritePath, true)
      }
      fs.copyToLocalFile(false, hadoopFilepath, resultsWritePath, true)
    } else {
      System.exit(-1)
    }
}

#7 (score: 0)

FileUtil.copyMerge() from the Hadoop API should solve your problem.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}

See Write single CSV file using spark-csv
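
For PySpark users, roughly the same call can be made through the JVM gateway (a sketch only: it relies on the non-public _jvm / _jsc handles and on copyMerge still existing in the Hadoop version in use, i.e. Hadoop 2.x; the paths are placeholders):

# Sketch: call Hadoop's FileUtil.copyMerge from PySpark via the py4j gateway.
hadoop_conf = spark._jsc.hadoopConfiguration()
jvm = spark._jvm
fs = jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)

src = jvm.org.apache.hadoop.fs.Path("/path/to/part-files-dir")
dst = jvm.org.apache.hadoop.fs.Path("/path/to/single.csv")

# deleteSource=True removes the part files after merging; the final None is the
# optional addString appended after each file.
jvm.org.apache.hadoop.fs.FileUtil.copyMerge(fs, src, fs, dst, True, hadoop_conf, None)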

#8 (score: 0)

df.coalesce(1).write.option("inferSchema", "true").csv("/newFolder", header=True, dateFormat="yyyy-MM-dd HH:mm:ss")
