... by checking whether a column's value is in a seq. Perhaps I'm not explaining it very well; I basically want this (to express it using regular SQL): DF_Column IN seq?
First I did it using a broadcast var (where I placed the seq), a UDF (that did the checking) and registerTempTable. The problem is that I didn't get to test it, since I ran into a known bug that apparently only appears when using registerTempTable with ScalaIDE.
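That first attempt looked roughly like this (a minimal sketch, assuming a SQLContext and a DataFrame df with a login column; the names df, logins and inSeq are illustrative, not the original code):

// Broadcast the lookup values once, register a UDF that checks membership,
// and call it from SQL via a temp table (the approach described above).
val bc = sc.broadcast(Seq("login2", "login3", "login4").toSet)
sqlContext.udf.register("inSeq", (s: String) => bc.value.contains(s))
df.registerTempTable("logins")
val result = sqlContext.sql("SELECT * FROM logins WHERE inSeq(login)")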
I ended up creating a new DataFrame out of seq and doing an inner join with it (intersection), but I doubt that's the most performant way of accomplishing the task.
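Roughly, that join workaround had this shape (again a sketch with illustrative names, not the original code):

// Build a one-column DataFrame from the seq and inner-join it with the original
// DataFrame; only rows whose login appears in the seq survive the join.
import sqlContext.implicits._
val seqDF = Seq("login2", "login3", "login4").map(Tuple1.apply).toDF("name")
val result = df.join(seqDF, df("login") === seqDF("name"))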
Thanks
EDIT (in response to @YijieShen): How do I filter based on whether the elements of one DataFrame's column are in another DF's column (like SQL select * from A where login in (select username from B))?
E.g.: First DF:
login count
login1 192
login2 146
login3 72
Second DF:
username
login2
login3
login4
The result:
login count
login2 146
login3 72
Attempts:
EDIT-2: I think, now that the bug is fixed, these should work. END EDIT-2
ordered.select("login").filter($"login".contains(empLogins("username")))
and
ordered.select("login").filter($"login" in empLogins("username"))
which both throw Exception in thread "main" org.apache.spark.sql.AnalysisException, respectively:
resolved attribute(s) username#10 missing from login#8 in operator
!Filter Contains(login#8, username#10);
and
resolved attribute(s) username#10 missing from login#8 in operator
!Filter login#8 IN (username#10);
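Presumably both fail because empLogins("username") is a column of a different DataFrame and cannot be resolved inside a filter on ordered, so the membership test would have to be expressed as a join instead. A sketch using the names from the example above (this formulation is an assumption, not code from the question):

// Emulates "select * from A where login in (select username from B)":
// a leftsemi join keeps only the rows of ordered whose login appears in empLogins.
val result = ordered.join(empLogins, ordered("login") === empLogins("username"), "leftsemi")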
2 Answers
#1
12
- You should broadcast a Set, instead of an Array; membership checks will be much faster than a linear search (a sketch appears at the end of this answer).
- You can make Eclipse run your Spark application. Here's how:
As pointed out on the mailing list, spark-sql assumes its classes are loaded by the primordial classloader. That's not the case in Eclipse, where the Java and Scala libraries are loaded as part of the boot classpath, while the user code and its dependencies are in another one. You can easily fix that in the launch configuration dialog:
- remove Scala Library and Scala Compiler from the "Bootstrap" entries
- add (as external jars) scala-reflect, scala-library and scala-compiler to the user entry.
The dialog should look like this:
Edit: The Spark bug was fixed and this workaround is no longer necessary (since v. 1.4.0)
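A minimal sketch of the Set-based broadcast (assuming a SparkContext sc and a DataFrame xdf with a name column, as in the other answer; the inSet name is illustrative):

import org.apache.spark.sql.functions.{col, udf}
// Broadcast a Set so each membership check is a hash lookup rather than an Array scan.
val bcSet = sc.broadcast(Set("login3", "login4"))
val inSet = udf((name: String) => bcSet.value.contains(name))
val filtered = xdf.filter(inSet(col("name")))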
#2
15
My code (following the description of your first method) runs normally in Spark 1.4.0-SNAPSHOT on these two configurations:
- Intellij IDEA's test
- Spark Standalone cluster with 8 nodes (1 master, 7 workers)
Please check if any differences exist:
import org.apache.spark.sql.functions.{col, udf}

// Broadcast the lookup values to every executor
val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt")
// UDF that checks whether a value is in the broadcast array
val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)
// Keep only the rows whose name appears in the broadcast array
val filtered = xdf.filter(sqlfunc(col("name")))
xdf.show()
filtered.show()
Output
name cnt
login1 192
login2 146
login3 72
name cnt
login3 72