如何向Spark DataFrame添加新列(使用PySpark)?

时间:2022-05-28 14:59:17

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.

我有一个Spark DataFrame(使用PySpark 1.5.1)并想添加一个新列。

I've tried the following without any success:

我试过以下但没有成功:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

Also got an error using this:

使用这个也有错误:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

So how do I add a new column (based on Python vector) to an existing DataFrame with PySpark?

那么如何使用PySpark将新列(基于Python向量)添加到现有的DataFrame中?

6 个解决方案

#1


119  

You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?)

您无法在Spark中向DataFrame添加任意列。只能使用文字创建新列(其他文字类型在如何在Spark DataFrame中添加常量列中进行了描述?)

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

transforming an existing column:

转换现有列:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

included using join:

包含使用连接:

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

or generated with function / udf:

或使用function / udf生成:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expression, are usually preferred over Python user defined functions.

映射到Catalyst表达式的性能方面的内置函数(pyspark.sql.functions)通常比Python用户定义的函数更受欢迎。

If you want to add content of an arbitrary RDD as a column you can

如果要将任意RDD的内容添加为列,则可以

  • add row numbers to existing data frame
  • 将行号添加到现有数据框
  • call zipWithIndex on RDD and convert it to data frame
  • 在RDD上调用zipWithIndex并将其转换为数据框
  • join both using index as a join key
  • 使用index作为连接键加入

#2


40  

To add a column using a UDF:

要使用UDF添加列:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   ...
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+

#3


18  

For Spark 2.0

对于Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))

#4


0  

I would like to offer a generalized example for a very similar use case:

我想为一个非常相似的用例提供一个通用的例子:

Use Case: I have a csv consisting of:

使用案例:我有一个csv包括:

First|Third|Fifth
data|data|data
data|data|data
...billion more lines

I need to perform some transformations and the final csv needs to look like

我需要执行一些转换,最终的csv需要看起来像

First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines

I need to do this because this is the schema defined by some model and I need for my final data to be interoperable with SQL Bulk Inserts and such things.

我需要这样做,因为这是由某些模型定义的模式,我需要我的最终数据可以与SQL批量插入和其他东西互操作。

so:

所以:

1) I read the original csv using spark.read and call it "df".

1)我使用spark.read读取原始csv并将其命名为“df”。

2) I do something to the data.

2)我对数据做了些什么。

3) I add the null columns using this script:

3)我使用这个脚本添加空列:

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        outcols.append(column)
    else:
        outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(outcols)

In this way, you can structure your schema after loading a csv (would also work for reordering columns if you have to do this for many tables).

通过这种方式,您可以在加载csv后构建模式(如果必须对许多表执行此操作,也可以用于重新排序列)。

#5


-1  

You can define a new udf when adding a column_name:

添加column_name时可以定义新的udf:

u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')

#6


-1  

from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: val, # do sth to val
    StringType()
)
df.withColumn('new_col', func_name(df.old_col))

#1


119  

You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?)

您无法在Spark中向DataFrame添加任意列。只能使用文字创建新列(其他文字类型在如何在Spark DataFrame中添加常量列中进行了描述?)

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

transforming an existing column:

转换现有列:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

included using join:

包含使用连接:

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

or generated with function / udf:

或使用function / udf生成:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expression, are usually preferred over Python user defined functions.

映射到Catalyst表达式的性能方面的内置函数(pyspark.sql.functions)通常比Python用户定义的函数更受欢迎。

If you want to add content of an arbitrary RDD as a column you can

如果要将任意RDD的内容添加为列,则可以

  • add row numbers to existing data frame
  • 将行号添加到现有数据框
  • call zipWithIndex on RDD and convert it to data frame
  • 在RDD上调用zipWithIndex并将其转换为数据框
  • join both using index as a join key
  • 使用index作为连接键加入

#2


40  

To add a column using a UDF:

要使用UDF添加列:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   ...
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+

#3


18  

For Spark 2.0

对于Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))

#4


0  

I would like to offer a generalized example for a very similar use case:

我想为一个非常相似的用例提供一个通用的例子:

Use Case: I have a csv consisting of:

使用案例:我有一个csv包括:

First|Third|Fifth
data|data|data
data|data|data
...billion more lines

I need to perform some transformations and the final csv needs to look like

我需要执行一些转换,最终的csv需要看起来像

First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines

I need to do this because this is the schema defined by some model and I need for my final data to be interoperable with SQL Bulk Inserts and such things.

我需要这样做,因为这是由某些模型定义的模式,我需要我的最终数据可以与SQL批量插入和其他东西互操作。

so:

所以:

1) I read the original csv using spark.read and call it "df".

1)我使用spark.read读取原始csv并将其命名为“df”。

2) I do something to the data.

2)我对数据做了些什么。

3) I add the null columns using this script:

3)我使用这个脚本添加空列:

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        outcols.append(column)
    else:
        outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(outcols)

In this way, you can structure your schema after loading a csv (would also work for reordering columns if you have to do this for many tables).

通过这种方式,您可以在加载csv后构建模式(如果必须对许多表执行此操作,也可以用于重新排序列)。

#5


-1  

You can define a new udf when adding a column_name:

添加column_name时可以定义新的udf:

u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')

#6


-1  

from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: val, # do sth to val
    StringType()
)
df.withColumn('new_col', func_name(df.old_col))