使用PySpark Dataframe平均超过2000个值

时间:2022-04-18 23:11:53

I have a PySpark dataframe with about a billion rows. I want to average over every 2000 values, like average of rows with indeces 0-1999, average of rows with indeces 2000-3999, and so on. How do I do this? Alternatively, I could also average 10 values for every 2000, like average of rows with indeces 0-9, average of rows with indeces 2000-2009, and so on. The purpose of this is to downsample the data. I don't currently have an index row, so if I need this, how do I do it?

我有一个大约有十亿行的PySpark数据帧。我希望平均每2000个值,例如具有0-1999的行的平均值,具有indeces 2000-3999的行的平均值,等等。我该怎么做呢?或者,我也可以为每2000个平均10个值,例如具有0到9的行的平均值,具有indeces 2000-2009的行的平均值,等等。这样做的目的是对数据进行下采样。我目前没有索引行,所以如果我需要这个,我该怎么做?

2 个解决方案

#1


0  

You can use monotonically_increasing_id() to generate row ids, divide it and use a ceiling function to generate an id over whatever interval you want. Then use window function to partition over that id and generate the average. For example, something like following should work, assuming your dataframe is data and you want to average over the column value.

您可以使用monotonically_increasing_id()生成行ID,将其划分并使用ceiling函数在您想要的任何时间间隔内生成id。然后使用窗口函数对该id进行分区并生成平均值。例如,假设您的数据框是数据并且您希望对列值进行平均,那么类似下面的内容应该可以正常工作。

import org.apache.spark.sql.expressions.Window
val partitionWindow = Window.partitionBy($"rowId")
data.withColumn("rowId", floor(monotonically_increasing_id()/2000.0)).withColumn("avg", avg(data("value")) over(partitionWindow)).show()

Hope that helps.

希望有所帮助。

#2


0  

Here is a way to do it by determining the row number for each value.

这是通过确定每个值的行号来实现的。

  1. Create a unique, increasing id column using pyspark.sql.functions.monotonically_increasing_id().

    使用pyspark.sql.functions.monotonically_increasing_id()创建一个唯一的,增加的id列。

  2. Create a pyspark.sql.Window() that does an orderBy() on the id column.

    创建一个pyspark.sql.Window(),它在id列上执行orderBy()。

  3. Use pyspark.sql.functions.row_number() over the window to get the row number for each value.

    在窗口上使用pyspark.sql.functions.row_number()来获取每个值的行号。

  4. Divide the row_number - 1 (because it starts at 1) by the number of groups and take the floor to get the group number.

    将row_number - 1(因为它从1开始)除以组的数量,并发言以获取组号。

  5. groupBy() the group number and compute the average.

    groupBy()组号并计算平均值。


Here is an example:

这是一个例子:

Create Example Data

创建示例数据

For this example, I will create a dataframe of 5 consecutive values starting at each multiple of 10 from 10 to 40 (inclusive). The group size in this example will be 5- we will want the average of 5 consecutive values.

对于这个例子,我将创建一个5个连续值的数据帧,从10到40(包括10)的每个倍数开始。此示例中的组大小将为5-我们将需要5个连续值的平均值。

data = map(
    lambda y: (y, ),
    reduce(
        list.__add__,
        [range(x, x+5) for x in range(10, 50, 10)]
    )
)
df = sqlCtx.createDataFrame(data, ["col1"])
df.show()
#+----+
#|col1|
#+----+
#|  10|
#|  11|
#|  12|
#|  13|
#|  14|
#|  20|
#|  21|
#|  22|
#|  23|
#|  24|
#|  30|
#|  31|
#|  32|
#|  33|
#|  34|
#|  40|
#|  41|
#|  42|
#|  43|
#|  44|
#+----+

Add ID Columns

添加ID列

I'm showing this step to demonstrate the fact that monotonically_increasing_id() is not guaranteed to be sequential.

我正在展示这一步,以证明monotonically_increasing_id()不能保证是顺序的。

import pyspark.sql.functions as f
df = df.withColumn('id', f.monotonically_increasing_id())
df.show()
#+----+----------+
#|col1|        id|
#+----+----------+
#|  10|         0|
#|  11|         1|
#|  12|         2|
#|  13|         3|
#|  14|         4|
#|  20|         5|
#|  21|         6|
#|  22|         7|
#|  23|         8|
#|  24|         9|
#|  30|8589934592|
#|  31|8589934593|
#|  32|8589934594|
#|  33|8589934595|
#|  34|8589934596|
#|  40|8589934597|
#|  41|8589934598|
#|  42|8589934599|
#|  43|8589934600|
#|  44|8589934601|
#+----+----------+

Compute the group number

计算组号

from pyspark.sql import Window
group_size = 5
w = Window.orderBy('id')
df = df.withColumn('group', f.floor((f.row_number().over(w) - 1) / group_size))\
    .select('col1', 'group')
df.show()
#+----+-----+
#|col1|group|
#+----+-----+
#|  10|    0|
#|  11|    0|
#|  12|    0|
#|  13|    0|
#|  14|    0|
#|  20|    1|
#|  21|    1|
#|  22|    1|
#|  23|    1|
#|  24|    1|
#|  30|    2|
#|  31|    2|
#|  32|    2|
#|  33|    2|
#|  34|    2|
#|  40|    3|
#|  41|    3|
#|  42|    3|
#|  43|    3|
#|  44|    3|
#+----+-----+

Get the average per group

获得每组的平均值

df.groupBy('group').agg(f.avg('col1').alias('avg')).show()
#+-----+----+
#|group| avg|
#+-----+----+
#|    0|12.0|
#|    1|22.0|
#|    2|32.0|
#|    3|42.0|
#+-----+----+

#1


0  

You can use monotonically_increasing_id() to generate row ids, divide it and use a ceiling function to generate an id over whatever interval you want. Then use window function to partition over that id and generate the average. For example, something like following should work, assuming your dataframe is data and you want to average over the column value.

您可以使用monotonically_increasing_id()生成行ID,将其划分并使用ceiling函数在您想要的任何时间间隔内生成id。然后使用窗口函数对该id进行分区并生成平均值。例如,假设您的数据框是数据并且您希望对列值进行平均,那么类似下面的内容应该可以正常工作。

import org.apache.spark.sql.expressions.Window
val partitionWindow = Window.partitionBy($"rowId")
data.withColumn("rowId", floor(monotonically_increasing_id()/2000.0)).withColumn("avg", avg(data("value")) over(partitionWindow)).show()

Hope that helps.

希望有所帮助。

#2


0  

Here is a way to do it by determining the row number for each value.

这是通过确定每个值的行号来实现的。

  1. Create a unique, increasing id column using pyspark.sql.functions.monotonically_increasing_id().

    使用pyspark.sql.functions.monotonically_increasing_id()创建一个唯一的,增加的id列。

  2. Create a pyspark.sql.Window() that does an orderBy() on the id column.

    创建一个pyspark.sql.Window(),它在id列上执行orderBy()。

  3. Use pyspark.sql.functions.row_number() over the window to get the row number for each value.

    在窗口上使用pyspark.sql.functions.row_number()来获取每个值的行号。

  4. Divide the row_number - 1 (because it starts at 1) by the number of groups and take the floor to get the group number.

    将row_number - 1(因为它从1开始)除以组的数量,并发言以获取组号。

  5. groupBy() the group number and compute the average.

    groupBy()组号并计算平均值。


Here is an example:

这是一个例子:

Create Example Data

创建示例数据

For this example, I will create a dataframe of 5 consecutive values starting at each multiple of 10 from 10 to 40 (inclusive). The group size in this example will be 5- we will want the average of 5 consecutive values.

对于这个例子,我将创建一个5个连续值的数据帧,从10到40(包括10)的每个倍数开始。此示例中的组大小将为5-我们将需要5个连续值的平均值。

data = map(
    lambda y: (y, ),
    reduce(
        list.__add__,
        [range(x, x+5) for x in range(10, 50, 10)]
    )
)
df = sqlCtx.createDataFrame(data, ["col1"])
df.show()
#+----+
#|col1|
#+----+
#|  10|
#|  11|
#|  12|
#|  13|
#|  14|
#|  20|
#|  21|
#|  22|
#|  23|
#|  24|
#|  30|
#|  31|
#|  32|
#|  33|
#|  34|
#|  40|
#|  41|
#|  42|
#|  43|
#|  44|
#+----+

Add ID Columns

添加ID列

I'm showing this step to demonstrate the fact that monotonically_increasing_id() is not guaranteed to be sequential.

我正在展示这一步,以证明monotonically_increasing_id()不能保证是顺序的。

import pyspark.sql.functions as f
df = df.withColumn('id', f.monotonically_increasing_id())
df.show()
#+----+----------+
#|col1|        id|
#+----+----------+
#|  10|         0|
#|  11|         1|
#|  12|         2|
#|  13|         3|
#|  14|         4|
#|  20|         5|
#|  21|         6|
#|  22|         7|
#|  23|         8|
#|  24|         9|
#|  30|8589934592|
#|  31|8589934593|
#|  32|8589934594|
#|  33|8589934595|
#|  34|8589934596|
#|  40|8589934597|
#|  41|8589934598|
#|  42|8589934599|
#|  43|8589934600|
#|  44|8589934601|
#+----+----------+

Compute the group number

计算组号

from pyspark.sql import Window
group_size = 5
w = Window.orderBy('id')
df = df.withColumn('group', f.floor((f.row_number().over(w) - 1) / group_size))\
    .select('col1', 'group')
df.show()
#+----+-----+
#|col1|group|
#+----+-----+
#|  10|    0|
#|  11|    0|
#|  12|    0|
#|  13|    0|
#|  14|    0|
#|  20|    1|
#|  21|    1|
#|  22|    1|
#|  23|    1|
#|  24|    1|
#|  30|    2|
#|  31|    2|
#|  32|    2|
#|  33|    2|
#|  34|    2|
#|  40|    3|
#|  41|    3|
#|  42|    3|
#|  43|    3|
#|  44|    3|
#+----+-----+

Get the average per group

获得每组的平均值

df.groupBy('group').agg(f.avg('col1').alias('avg')).show()
#+-----+----+
#|group| avg|
#+-----+----+
#|    0|12.0|
#|    1|22.0|
#|    2|32.0|
#|    3|42.0|
#+-----+----+