I am looking for a standard or preferred way to do this: I have a number of text files, each in the following format:
file1:
word1 tag1 0.5
word1 tag2 0.4
word2 tag2 0.6
file2:
word1 tag3 0.7
word3 tag1 0.9
word2 tag2 0.3
As you can see, a word may have multiple tags; in this case I only need to retain the one with the highest score within each file, so that
word1 tag2 0.4
is removed.
Expected result:
word1 tag1 0.5
word1 tag3 0.7
word2 tag2 0.6
word3 tag1 0.9
word2 tag2 0.3   // keep this because it is from file2
I know I can read each file as an individual RDD and then sort and merge/join to produce my result, but is there a better way? E.g. feed all input files at once using
ctx.textFile(String.join(",", myfiles)); // myfiles = [file1, file2]
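For context, here is a rough Scala sketch of the per-file RDD approach I mean (assuming a SparkContext named sc, i.e. the same thing as ctx above, and the hypothetical paths file1/file2):

// Read each file as its own RDD, keep the highest-scoring tag per word, then union the results.
val myfiles = Seq("file1", "file2")

val perFileBest = myfiles.map { path =>
  sc.textFile(path)
    .map(_.split(" "))                                   // Array(word, tag, score)
    .map { case Array(word, tag, score) => (word, (tag, score.toDouble)) }
    .reduceByKey((a, b) => if (a._2 >= b._2) a else b)    // keep the max score per word within this file
    .map { case (word, (tag, score)) => s"$word $tag $score" }
}

perFileBest.reduce(_ union _).collect().foreach(println)

This works, but it builds one RDD per file by hand, which is what I would like to avoid.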
Thanks,
1 Answer
#1
You need to do two things: Add the filename to the data and find the relevant maximum afterwards.
import org.apache.spark.sql.functions.input_file_name

// Read all files at once; input_file_name() records which file each row came from.
val df = spark.read.options(Map("sep" -> " ")).csv("/file*").withColumn("filename", input_file_name())
This results in:
scala> df.show()
+-----+----+---+--------------------+
| _c0| _c1|_c2| filename|
+-----+----+---+--------------------+
|word1|tag1|0.5|hdfs://hmaster:90...|
|word1|tag2|0.4|hdfs://hmaster:90...|
|word2|tag2|0.6|hdfs://hmaster:90...|
|word1|tag3|0.7|hdfs://hmaster:90...|
|word3|tag1|0.9|hdfs://hmaster:90...|
|word2|tag2|0.3|hdfs://hmaster:90...|
+-----+----+---+--------------------+
Now we want to keep only the relevant rows. We can do this using argmax (see https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3170497669323442/2840265927289860/latest.html):
import org.apache.spark.sql.functions._

// For each (word, filename) pair, max over a (score, tag) struct picks the row with the
// highest _c2 and carries along the matching _c1; tmp.* unpacks it back into columns.
val targetDF = df.groupBy($"_c0", $"filename").agg(max(struct('_c2, '_c1)) as "tmp").select($"_c0", $"tmp.*")
Result:
scala> targetDF.show()
+-----+---+----+
| _c0|_c2| _c1|
+-----+---+----+
|word2|0.3|tag2|
|word1|0.7|tag3|
|word2|0.6|tag2|
|word1|0.5|tag1|
|word3|0.9|tag1|
+-----+---+----+
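One caveat: csv() without schema inference reads every column as a string, so max(struct('_c2, '_c1)) compares the scores lexicographically. That happens to give the right order for the sample data (all scores are of the form 0.x), but it can go wrong in general, e.g. "10.2" sorts before "9.5" as a string. A small variation of the same aggregation that casts the score to a double first (keeping the default column names) would be:

// Same argmax idea, but comparing scores numerically instead of as strings.
val targetDFNum = df
  .groupBy($"_c0", $"filename")
  .agg(max(struct($"_c2".cast("double").as("_c2"), $"_c1")) as "tmp")
  .select($"_c0", $"tmp.*")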
You should probably rename the columns, of course.
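A minimal way to do that, for example (the column names here are just a suggestion):

// Rename the positional CSV columns (_c0, _c2, _c1) to something meaningful.
val renamed = targetDF.toDF("word", "score", "tag")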