How to write CSV in Spark

Time: 2022-11-11 20:20:54

I'm trying to find an effective way of saving the result of my Spark Job as a csv file. I'm using Spark with Hadoop and so far all my files are saved as part-00000.

Any ideas how to make Spark save to a file with a specified file name?

6 solutions

#1


52  

Since Spark uses the Hadoop FileSystem API to write data to files, this is sort of inevitable. If you do

rdd.saveAsTextFile("foo")

It will be saved as "foo/part-XXXXX", with one part-* file for every partition of the RDD you are trying to save. The reason each partition of the RDD is written to a separate file is fault tolerance. If the task writing the 3rd partition (i.e. part-00002) fails, Spark simply re-runs the task and overwrites the partially written/corrupted part-00002, with no effect on the other parts. If they all wrote to the same file, it would be much harder to recover from a single task failure.

The part-XXXXX files are usually not a problem if you are going to consume the output again in Spark / Hadoop-based frameworks, because they all use the HDFS API: if you ask them to read "foo", they will read all the part-XXXXX files inside foo as well.

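This also means a quick sanity check is simply to point Spark back at the directory. A minimal sketch, assuming a SparkContext named sc and the "foo" directory from above:

// Reading the directory picks up foo/part-00000, foo/part-00001, ... in one go.
val linesBack = sc.textFile("foo")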

#2


10  

I would suggest doing it this way (Java example):

// Collapse to a single partition, then write; this still produces textFileName/part-00000.
theRddToPrint.coalesce(1, true).saveAsTextFile(textFileName);

// Merge the part file(s) into one file named textFileNameDestiny and delete the source dir (deleteSource = true).
// Note: FileUtil.copyMerge exists in Hadoop 2.x but was removed in Hadoop 3.
FileSystem fs = anyUtilClass.getHadoopFileSystem(rootFolder);
FileUtil.copyMerge(
    fs, new Path(textFileName),
    fs, new Path(textFileNameDestiny),
    true, fs.getConf(), null);

#3


3  

There is another approach based on Hadoop FileSystem ops.

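A hedged sketch of what a FileSystem-based approach could look like (the paths and the SparkContext name sc are illustrative, not from the original answer): save into a single partition, then rename the part file to whatever name you need.

import org.apache.hadoop.fs.{FileSystem, Path}

rdd.coalesce(1).saveAsTextFile("/tmp/out")        // writes /tmp/out/part-00000
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.rename(new Path("/tmp/out/part-00000"), new Path("/tmp/result.csv"))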

#4


1  

Extending Tathagata Das's answer to Spark 2.x and Scala 2.11

Using Spark SQL we can do this in a one-liner:

// implicits for magic functions like .toDF
import spark.implicits._

val df = Seq(
  ("first", 2.0),
  ("choose", 7.0),
  ("test", 1.5)
).toDF("name", "vals")

// write DataFrame/Dataset to external storage
df.write
  .format("csv")
  .save("csv/file/location")

Then you can go ahead and proceed with adoalonso's answer.
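
If the goal is one CSV file with a header line, a hedged variation of the same write (header is a standard DataFrameWriter CSV option; the output is still a directory containing a single part-*.csv file, which you can then rename or merge as in the other answers):

df.coalesce(1)                   // everything into one partition, hence one part file
  .write
  .option("header", "true")      // write the column names as the first line
  .format("csv")
  .save("csv/file/location")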

#5


1  

I have an idea but no ready code snippet. Internally (as the name suggests) Spark uses the Hadoop OutputFormat (as well as InputFormat when reading from HDFS).

In Hadoop's FileOutputFormat (mapreduce API) there is a protected member, setOutputName, which you can call from an inheriting class to set a different base name.

#6


0  

It's not really a clean solution, but inside a foreachRDD() you can basically do whatever you like, including creating a new file.

In my solution this is what I do: I save the output on HDFS (for fault tolerance reasons), and inside a foreachRDD I also create a TSV file with statistics in a local folder.

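A rough sketch of that pattern, assuming Spark Streaming with a DStream named stream; the paths and the statistics line are made up for illustration:

import java.io.{FileWriter, PrintWriter}

stream.foreachRDD { rdd =>
  // fault-tolerant copy on HDFS, one directory per batch
  rdd.saveAsTextFile(s"hdfs:///data/output-${System.currentTimeMillis()}")
  // plus a local file whose name you fully control
  val out = new PrintWriter(new FileWriter("/tmp/stats.tsv", true))    // append mode
  out.println(s"${java.time.Instant.now()}\t${rdd.count()}")
  out.close()
}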

I think you could probably do the same if that's what you need.

http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-operations
