Creating a new column in a Spark DataFrame using a function

Date: 2022-09-27 22:55:32

I'm trying to figure out the new DataFrame API in Spark. It seems like a good step forward, but I'm having trouble doing something that should be pretty simple. I have a DataFrame with two columns, "ID" and "Amt". As a generic example, say I want to return a new column called "Code" that holds a code based on the value of "Amt". I can write a function something like this:

def coder(myAmt: Integer): String = {
  if (myAmt > 100) "Little"
  else "Big"
}

When I try to use it like this:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", coder(myDF("Amt")))

I get a type mismatch error:

found   : org.apache.spark.sql.Column
required: Integer

I've tried changing the input type on my function to org.apache.spark.sql.Column, but then the function fails to compile, because the if statement wants a Boolean.
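
As far as I can tell, the root of both errors is the same: operations on a Column build an expression rather than evaluate a value, so there is never a plain Boolean to branch on:

// myDF("Amt") > 100 builds a Column expression, not a Scala Boolean
val cond: org.apache.spark.sql.Column = myDF("Amt") > 100
// if (cond) "Little" else "Big"   // won't compile: Column is not Boolean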

Am I doing this wrong? Is there a better or different way to do this than using withColumn?

Thanks for your help.

3 Answers

#1


Let's say you have an "Amt" column in your schema:

import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
// a plain Scala function, wrapped into a Spark UDF so it can be applied to a Column
val coder: (Int => String) = (arg: Int) => { if (arg < 100) "little" else "big" }
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))
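
For illustration, here is a minimal end-to-end run with made-up sample data standing in for the parquet file (this assumes the same sqlContext and the sqlfunc defined above):

import sqlContext.implicits._  // brings toDF into scope (Spark 1.3+)

// hypothetical sample data in place of the parquet file
val demoDF = Seq(("a", 50), ("b", 150)).toDF("ID", "Amt")

demoDF.withColumn("Code", sqlfunc(col("Amt"))).show()
// Amt = 50  -> Code = "little"
// Amt = 150 -> Code = "big"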

I think withColumn is the right way to add a column.

#2


We should avoid defining UDFs as much as possible, because of the overhead of serializing and deserializing the columns.

You can achieve the same thing with Spark's built-in when function, as below:

import org.apache.spark.sql.functions.when

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
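
If you ever need more than two buckets, when calls chain; a quick sketch (the 1000 cutoff and the "Medium" label are invented for illustration):

// rows that match no condition (including a null Amt) fall through to otherwise()
myDF.withColumn("Code",
  when(myDF("Amt") < 100, "Little")
    .when(myDF("Amt") < 1000, "Medium")  // hypothetical middle bucket
    .otherwise("Big"))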

#3


Another way of doing this: you can create any function but, per the error above, you should define it as a function value and wrap it with udf.

Example:

// wrapping the anonymous function with udf lets it accept a Column
val coder = udf((myAmt: Integer) => {
  if (myAmt > 100) "Little"
  else "Big"
})

Now this statement works perfectly:

myDF.withColumn("Code", coder(myDF("Amt")))
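
The same function can also be registered by name and used from SQL; a sketch against the Spark 1.x sqlContext used in the question (the table name myTable is made up):

// register the UDF under a name callable from SQL strings
sqlContext.udf.register("coder",
  (myAmt: Integer) => if (myAmt > 100) "Little" else "Big")

myDF.registerTempTable("myTable")  // Spark 1.x; createOrReplaceTempView in 2.x+
val coded = sqlContext.sql("SELECT ID, Amt, coder(Amt) AS Code FROM myTable")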
