sc.textFile(path) allows to read an HDFS file but it does not accept parameters (like skip a number of rows, has_headers,...).
sc.textFile(path)允许读取HDFS文件,但它不接受参数(比如跳过多行,has_headers,......)。
in the "Learning Spark" O'Reilly e-book, it's suggested to use the following function to read a CSV (Example 5-12. Python load CSV example)
在“学习星火”O'Reilly电子书中,建议使用以下函数读取CSV(例5-12.Python加载CSV示例)
import csv
import StringIO
def loadRecord(line):
"""Parse a CSV line"""
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
My question is about how to be selective about the rows "taken":
我的问题是如何选择“采取”行:
- How to avoid loading the first row (the headers)
- 如何避免加载第一行(标题)
- How to remove a specific row (for instance, row 5)
- 如何删除特定行(例如,第5行)
I see some decent solutions here: select range of elements but I'd like to see if there is anything more simple.
我在这里看到一些不错的解决方案:选择元素范围,但我想看看是否有更简单的东西。
Thx!
谢谢!
1 个解决方案
#1
16
Don't worry about loading the rows/lines you don't need. When you do:
不要担心加载你不需要的行/行。当你这样做时:
input = sc.textFile(inputFile)
you are not loading the file. You are just getting an object that will allow you to operate on the file. So to be efficient, it is better to think in terms of getting only what you want. For example:
你没有加载文件。您只是获得了一个允许您对文件进行操作的对象。因此,为了提高效率,最好只考虑获得您想要的东西。例如:
header = input.take(1)[0]
rows = input.filter(lambda line: line != header)
Note that here I am not using an index to refer to the line I want to drop but rather its value. This has the side effect that other lines with this value will also be ignored but is more in the spirit of Spark as Spark will distribute your text file in different parts across the nodes and the concept of line numbers gets lost in each partition. This is also the reason why this is not easy to do in Spark(Hadoop) as each partition should be considered independent and a global line number would break this assumption.
请注意,这里我没有使用索引来引用我想要删除的行而是它的值。这有副作用,其他具有此值的行也将被忽略,但更符合Spark的精神,因为Spark会将您的文本文件分布在节点的不同部分,并且行号的概念在每个分区中都会丢失。这也是为什么在Spark(Hadoop)中不容易做到的原因,因为每个分区都应该被认为是独立的,并且全局行号会打破这个假设。
If you really need to work with line numbers I recommend that you add them to the file outside of Spark(see here) and then just filter by this column inside of Spark.
如果你真的需要使用行号,我建议你将它们添加到Spark之外的文件中(参见这里),然后只需在Spark中按此列过滤。
Edit: Added zipWithIndex
solution as suggested by @Daniel Darabos.
编辑:添加了@Daniel Darabos建议的zipWithIndex解决方案。
sc.textFile('test.txt')\
.zipWithIndex()\ # [(u'First', 0), (u'Second', 1), ...
.filter(lambda x: x[1]!=5)\ # select columns
.map(lambda x: x[0])\ # [u'First', u'Second'
.collect()
#1
16
Don't worry about loading the rows/lines you don't need. When you do:
不要担心加载你不需要的行/行。当你这样做时:
input = sc.textFile(inputFile)
you are not loading the file. You are just getting an object that will allow you to operate on the file. So to be efficient, it is better to think in terms of getting only what you want. For example:
你没有加载文件。您只是获得了一个允许您对文件进行操作的对象。因此,为了提高效率,最好只考虑获得您想要的东西。例如:
header = input.take(1)[0]
rows = input.filter(lambda line: line != header)
Note that here I am not using an index to refer to the line I want to drop but rather its value. This has the side effect that other lines with this value will also be ignored but is more in the spirit of Spark as Spark will distribute your text file in different parts across the nodes and the concept of line numbers gets lost in each partition. This is also the reason why this is not easy to do in Spark(Hadoop) as each partition should be considered independent and a global line number would break this assumption.
请注意,这里我没有使用索引来引用我想要删除的行而是它的值。这有副作用,其他具有此值的行也将被忽略,但更符合Spark的精神,因为Spark会将您的文本文件分布在节点的不同部分,并且行号的概念在每个分区中都会丢失。这也是为什么在Spark(Hadoop)中不容易做到的原因,因为每个分区都应该被认为是独立的,并且全局行号会打破这个假设。
If you really need to work with line numbers I recommend that you add them to the file outside of Spark(see here) and then just filter by this column inside of Spark.
如果你真的需要使用行号,我建议你将它们添加到Spark之外的文件中(参见这里),然后只需在Spark中按此列过滤。
Edit: Added zipWithIndex
solution as suggested by @Daniel Darabos.
编辑:添加了@Daniel Darabos建议的zipWithIndex解决方案。
sc.textFile('test.txt')\
.zipWithIndex()\ # [(u'First', 0), (u'Second', 1), ...
.filter(lambda x: x[1]!=5)\ # select columns
.map(lambda x: x[0])\ # [u'First', u'Second'
.collect()