What is the most efficient way to filter a DataFrame?

Date: 2021-04-08 11:01:12

... by checking whether a column's value is in a seq.
Perhaps I'm not explaining it very well; I basically want this (to express it in regular SQL): DF_Column IN seq?

First I did it using a broadcast var (where I placed the seq), a UDF (that did the checking) and registerTempTable.
The problem is that I didn't get to test it, since I ran into a known bug that apparently only appears when using registerTempTable with ScalaIDE.

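For context, a minimal sketch of what that first approach looks like (the names seqBc, inSeq, logins and the DataFrame df are illustrative, not the original code):

val seqBc = sc.broadcast(Set("login2", "login3", "login4"))
// Register the membership test as a SQL UDF, then query the temp table
sqlContext.udf.register("inSeq", (s: String) => seqBc.value.contains(s))
df.registerTempTable("logins")
val filtered = sqlContext.sql("SELECT * FROM logins WHERE inSeq(login)")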

I ended up creating a new DataFrame out of the seq and doing an inner join with it (an intersection), but I doubt that's the most performant way of accomplishing the task.

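That join-based workaround looks roughly like this (a sketch; assumes the main DataFrame is called ordered and has the login and count columns shown below):

val seqDF = sqlContext.createDataFrame(
  Seq("login2", "login3", "login4").map(Tuple1.apply)).toDF("username")
// The inner join keeps only rows whose login appears in seqDF
val result = ordered.join(seqDF, ordered("login") === seqDF("username"))
  .select("login", "count")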

Thanks

EDIT: (in response to @YijieShen):
How do I filter based on whether the elements of one DataFrame's column are in another DF's column (like SQL: select * from A where login in (select username from B))?

E.g.: First DF:

login      count
login1     192  
login2     146  
login3     72   

Second DF:

username
login2
login3
login4

The result:

login      count
login2     146  
login3     72   

Attempts:
EDIT-2: I think, now that the bug is fixed, these should work. END EDIT-2

ordered.select("login").filter($"login".contains(empLogins("username")))

and

ordered.select("login").filter($"login" in empLogins("username"))

both of which throw Exception in thread "main" org.apache.spark.sql.AnalysisException, with the respective messages:

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter Contains(login#8, username#10);

and

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter login#8 IN (username#10);
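
Both attempts fail because a filter expression can only reference columns of the DataFrame being filtered; username#10 belongs to the other DataFrame, hence the "missing" attribute. One alternative, sketched here under the assumption that the second DF is small enough to collect to the driver, is to pull its values into a local collection and filter with isin (available since Spark 1.5; in earlier versions the method is called in and takes Columns):

import sqlContext.implicits._  // for the $"..." column syntax

// Assumes empLogins is small enough to collect to the driver
val names = empLogins.select("username").collect().map(_.getString(0))
val result = ordered.filter($"login".isin(names: _*))

For larger DFs, a join (as in the workaround above) expresses the same semi-join without a collect.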

2 Answers

#1


  1. You should broadcast a Set instead of an Array: Set membership checks are effectively constant-time, much faster than an Array's linear scan (see the sketch at the end of this answer).

  2. You can make Eclipse run your Spark application. Here's how:

As pointed out on the mailing list, spark-sql assumes its classes are loaded by the primordial classloader. That's not the case in Eclipse, where the Java and Scala libraries are loaded as part of the boot classpath, while the user code and its dependencies are in another one. You can easily fix that in the launch configuration dialog:

  • remove Scala Library and Scala Compiler from the "Bootstrap" entries
  • add (as external jars) scala-reflect, scala-library and scala-compiler to the user entry.

The dialog should look like this:

(screenshot of the Eclipse launch configuration dialog)

Edit: The Spark bug has been fixed, and this workaround is no longer necessary (as of v1.4.0).

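A minimal sketch of point 1 above (the names allowed, isAllowed and the DataFrame ordered are assumptions, not from the original answer):

import org.apache.spark.sql.functions.{col, udf}

// A broadcast Set gives effectively O(1) membership tests,
// versus a linear scan over a broadcast Array
val allowed = sc.broadcast(Set("login2", "login3", "login4"))
val isAllowed = udf((login: String) => allowed.value.contains(login))
val result = ordered.filter(isAllowed(col("login")))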

#2


My code (following the description of your first method) runs normally in Spark 1.4.0-SNAPSHOT on these two configurations:

  • IntelliJ IDEA's test runner
  • Spark Standalone cluster with 8 nodes (1 master, 7 workers)

Please check if any differences exist.

import org.apache.spark.sql.functions.{col, udf}

// Broadcast the lookup values to every executor
val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt")

// UDF wrapping a membership test against the broadcast array
val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)
val filtered = xdf.filter(sqlfunc(col("name")))

xdf.show()
filtered.show()

Output

name cnt
login1 192
login2 146
login3 72

name cnt
login3 72
