Spark: remove duplicates within each text file

Time: 2021-12-02 23:12:06

I am looking for a standard or preferred way to do this: I have a number of text files, each in the following format:

file1:

word1 tag1 0.5
word1 tag2 0.4
word2 tag2 0.6

file2:

word1 tag3 0.7
word3 tag1 0.9
word2 tag2 0.3

As you can see, a word may have multiple tags; in this case I only need to retain the one with the highest score within each file, so that

word1 tag2 0.4

is removed.

expected result:

word1 tag1 0.5
word1 tag3 0.7
word2 tag2 0.6
word3 tag1 0.9
word2 tag2 0.3 //keep this because it is from file2

I know I can read each file as an individual RDD and then sort and merge/join to produce my result, but is there a better way? E.g., feed all input files at once using

ctx.textFile(String.join(",", myfiles)); // myfiles = [file1, file2]
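
For reference, this is roughly the per-file RDD route I have in mind (just a sketch, assuming a spark-shell sc and the two sample files above; the file names are placeholders):

// For each file: key by word, keep the (tag, score) pair with the highest score,
// then union the per-file results.
val perFile = Seq("file1", "file2").map { path =>
  sc.textFile(path)
    .map(_.split(" "))
    .map { case Array(word, tag, score) => (word, (tag, score.toDouble)) }
    .reduceByKey((a, b) => if (a._2 >= b._2) a else b) // max score per word within this file
    .map { case (word, (tag, score)) => s"$word $tag $score" }
}
val result = perFile.reduce(_ union _)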

Thanks,

1 solution

#1


You need to do two things: Add the filename to the data and find the relevant maximum afterwards.

import org.apache.spark.sql.functions.input_file_name

val df = spark.read.options(Map("sep" -> " ")).csv("/file*").withColumn("filename", input_file_name())

This results in:

scala> df.show()
+-----+----+---+--------------------+
|  _c0| _c1|_c2|            filename|
+-----+----+---+--------------------+
|word1|tag1|0.5|hdfs://hmaster:90...|
|word1|tag2|0.4|hdfs://hmaster:90...|
|word2|tag2|0.6|hdfs://hmaster:90...|
|word1|tag3|0.7|hdfs://hmaster:90...|
|word3|tag1|0.9|hdfs://hmaster:90...|
|word2|tag2|0.3|hdfs://hmaster:90...|
+-----+----+---+--------------------+
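
If you only need the bare file name rather than the full HDFS path shown above, you could trim it first; a minimal optional tweak (dfNamed is just an illustrative name):

import org.apache.spark.sql.functions.substring_index
// keep only the last path segment, e.g. "file1" instead of "hdfs://hmaster:90.../file1"
val dfNamed = df.withColumn("filename", substring_index($"filename", "/", -1))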

Now we want to keep only the relevant rows. We can do this with an argmax-style max-of-struct aggregation (see https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3170497669323442/2840265927289860/latest.html):

import org.apache.spark.sql.functions._

// Note: _c2 was read as a string, so max() compares it lexicographically;
// cast it to double first if string ordering may not match numeric ordering for your scores.
val targetDF = df.groupBy($"_c0", $"filename").agg(max(struct('_c2, '_c1)) as "tmp").select($"_c0", $"tmp.*")

Result:

scala> targetDF.show()
+-----+---+----+
|  _c0|_c2| _c1|
+-----+---+----+
|word2|0.3|tag2|
|word1|0.7|tag3|
|word2|0.6|tag2|
|word1|0.5|tag1|
|word3|0.9|tag1|
+-----+---+----+

You should probably rename the columns, of course.
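
A minimal way to do that (the new names below are just a suggestion):

// rename positionally: _c0 -> word, _c2 -> score, _c1 -> tag
val renamed = targetDF.toDF("word", "score", "tag")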
