如何有效地迭代Pandas数据帧的连续块

时间:2021-10-11 02:22:20

I have a large dataframe (several million rows).

我有一个大型数据帧(几百万行)。

I want to be able to do a groupby operation on it, but just grouping by arbitrary consecutive (preferably equal-sized) subsets of rows, rather than using any particular property of the individual rows to decide which group they go to.

我希望能够对它进行groupby操作,但只需按任意连续(最好是相等大小)的行子集进行分组,而不是使用各行的任何特定属性来决定它们去哪个组。

The use case: I want to apply a function to each row via a parallel map in IPython. It doesn't matter which rows go to which back-end engine, as the function calculates a result based on one row at a time. (Conceptually at least; in reality it's vectorized.)

用例:我想通过IPython中的并行映射将函数应用于每一行。哪个行转到哪个后端引擎并不重要,因为该函数一次基于一行计算结果。 (从概念上讲,至少;实际上它是矢量化的。)

I've come up with something like this:

我想出了这样的事情:

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)

# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]

# Process chunks in parallel
results = dview.map_sync(my_function, groups)

But this seems very long-winded, and doesn't guarantee equal sized chunks. Especially if the index is sparse or non-integer or whatever.

但这看起来很啰嗦,并不能保证大小相等。特别是如果索引是稀疏的或非整数的或其他什么。

Any suggestions for a better way?

有什么更好的方法吗?

Thanks!

4 个解决方案

#1


24  

In practice, you can't guarantee equal-sized chunks: the number of rows might be prime, after all, in which case your only chunking options would be chunks of size 1 or one big chunk. I tend to pass an array to groupby. Starting from:

在实践中,你不能保证大小相等的块:毕竟行数可能是素数,在这种情况下,你的唯一分块选项是大小为1或一大块的块。我倾向于将数组传递给groupby。从...开始:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
    0         1         2         3         4
0   0  0.746300  0.346277  0.220362  0.172680
0   1  0.657324  0.687169  0.384196  0.214118
0   2  0.016062  0.858784  0.236364  0.963389
[...]
0  13  0.510273  0.051608  0.230402  0.756921
0  14  0.950544  0.576539  0.642602  0.907850

[15 rows x 5 columns]

where I've deliberately made the index uninformative by setting it to 0, we simply decide on our size (here 10) and integer-divide an array by it:

我故意通过将索引设置为0来使索引无法提供信息,我们只需确定我们的大小(此处为10)并按整数除以数组:

>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
...     print(k,g)
...     
0    0         1         2         3         4
0  0  0.746300  0.346277  0.220362  0.172680
0  1  0.657324  0.687169  0.384196  0.214118
0  2  0.016062  0.858784  0.236364  0.963389
[...]
0  8  0.241049  0.246149  0.241935  0.563428
0  9  0.493819  0.918858  0.193236  0.266257

[10 rows x 5 columns]
1     0         1         2         3         4
0  10  0.037693  0.370789  0.369117  0.401041
0  11  0.721843  0.862295  0.671733  0.605006
[...]
0  14  0.950544  0.576539  0.642602  0.907850

[5 rows x 5 columns]

Methods based on slicing the DataFrame can fail when the index isn't compatible with that, although you can always use .iloc[a:b] to ignore the index values and access data by position.

虽然您始终可以使用.iloc [a:b]忽略索引值并按位置访问数据,但基于切片DataFrame的方法可能会在索引与此不兼容时失败。

#2


27  

I'm not sure if this is exactly what you want, but I found these grouper functions on another SO thread fairly useful for doing a multiprocessor pool.

我不确定这是否正是你想要的,但我在另一个SO线程上发现这些分组函数对于执行多处理器池非常有用。

Here's a short example from that thread, which might do something like what you want:

这是该线程的一个简短示例,它可能会执行您想要的操作:

import numpy as np
import pandas as pds

df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd'])

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in xrange(0, len(seq), size))

for i in chunker(df,5):
    print i

Which gives you something like this:

这给你这样的东西:

          a         b         c         d
0  0.860574  0.059326  0.339192  0.786399
1  0.029196  0.395613  0.524240  0.380265
2  0.235759  0.164282  0.350042  0.877004
3  0.545394  0.881960  0.994079  0.721279
4  0.584504  0.648308  0.655147  0.511390
          a         b         c         d
5  0.276160  0.982803  0.451825  0.845363
6  0.728453  0.246870  0.515770  0.343479
7  0.971947  0.278430  0.006910  0.888512
8  0.044888  0.875791  0.842361  0.890675
9  0.200563  0.246080  0.333202  0.574488
           a         b         c         d
10  0.971125  0.106790  0.274001  0.960579
11  0.722224  0.575325  0.465267  0.258976
12  0.574039  0.258625  0.469209  0.886768
13  0.915423  0.713076  0.073338  0.622967

I hope that helps.

我希望有所帮助。

EDIT

In this case, I used this function with pool of processors in (approximately) this manner:

在这种情况下,我以(近似)这种方式使用此函数和处理器池:

from multiprocessing import Pool

nprocs = 4

pool = Pool(nprocs)

for chunk in chunker(df, nprocs):
    data = pool.map(myfunction, chunk)
    data.domorestuff()

I assume this should be very similar to using the IPython distributed machinery, but I haven't tried it.

我认为这应该与使用IPython分布式机器非常相似,但我还没有尝试过。

#3


14  

Use numpy has this built in: np.array_split()

使用numpy内置了这个:np.array_split()

import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
    assert len(chunk) == len(data) / 5

#4


11  

A sign of a good environment is many choices, so I'll add this from Anaconda Blaze, really using Odo

良好环境的标志是很多选择,所以我将从Anaconda Blaze添加这个,真的使用Odo

import blaze as bz
import pandas as pd

df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]})

for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
    # Do stuff with chunked dataframe

#1


24  

In practice, you can't guarantee equal-sized chunks: the number of rows might be prime, after all, in which case your only chunking options would be chunks of size 1 or one big chunk. I tend to pass an array to groupby. Starting from:

在实践中,你不能保证大小相等的块:毕竟行数可能是素数,在这种情况下,你的唯一分块选项是大小为1或一大块的块。我倾向于将数组传递给groupby。从...开始:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
    0         1         2         3         4
0   0  0.746300  0.346277  0.220362  0.172680
0   1  0.657324  0.687169  0.384196  0.214118
0   2  0.016062  0.858784  0.236364  0.963389
[...]
0  13  0.510273  0.051608  0.230402  0.756921
0  14  0.950544  0.576539  0.642602  0.907850

[15 rows x 5 columns]

where I've deliberately made the index uninformative by setting it to 0, we simply decide on our size (here 10) and integer-divide an array by it:

我故意通过将索引设置为0来使索引无法提供信息,我们只需确定我们的大小(此处为10)并按整数除以数组:

>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
...     print(k,g)
...     
0    0         1         2         3         4
0  0  0.746300  0.346277  0.220362  0.172680
0  1  0.657324  0.687169  0.384196  0.214118
0  2  0.016062  0.858784  0.236364  0.963389
[...]
0  8  0.241049  0.246149  0.241935  0.563428
0  9  0.493819  0.918858  0.193236  0.266257

[10 rows x 5 columns]
1     0         1         2         3         4
0  10  0.037693  0.370789  0.369117  0.401041
0  11  0.721843  0.862295  0.671733  0.605006
[...]
0  14  0.950544  0.576539  0.642602  0.907850

[5 rows x 5 columns]

Methods based on slicing the DataFrame can fail when the index isn't compatible with that, although you can always use .iloc[a:b] to ignore the index values and access data by position.

虽然您始终可以使用.iloc [a:b]忽略索引值并按位置访问数据,但基于切片DataFrame的方法可能会在索引与此不兼容时失败。

#2


27  

I'm not sure if this is exactly what you want, but I found these grouper functions on another SO thread fairly useful for doing a multiprocessor pool.

我不确定这是否正是你想要的,但我在另一个SO线程上发现这些分组函数对于执行多处理器池非常有用。

Here's a short example from that thread, which might do something like what you want:

这是该线程的一个简短示例,它可能会执行您想要的操作:

import numpy as np
import pandas as pds

df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd'])

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in xrange(0, len(seq), size))

for i in chunker(df,5):
    print i

Which gives you something like this:

这给你这样的东西:

          a         b         c         d
0  0.860574  0.059326  0.339192  0.786399
1  0.029196  0.395613  0.524240  0.380265
2  0.235759  0.164282  0.350042  0.877004
3  0.545394  0.881960  0.994079  0.721279
4  0.584504  0.648308  0.655147  0.511390
          a         b         c         d
5  0.276160  0.982803  0.451825  0.845363
6  0.728453  0.246870  0.515770  0.343479
7  0.971947  0.278430  0.006910  0.888512
8  0.044888  0.875791  0.842361  0.890675
9  0.200563  0.246080  0.333202  0.574488
           a         b         c         d
10  0.971125  0.106790  0.274001  0.960579
11  0.722224  0.575325  0.465267  0.258976
12  0.574039  0.258625  0.469209  0.886768
13  0.915423  0.713076  0.073338  0.622967

I hope that helps.

我希望有所帮助。

EDIT

In this case, I used this function with pool of processors in (approximately) this manner:

在这种情况下,我以(近似)这种方式使用此函数和处理器池:

from multiprocessing import Pool

nprocs = 4

pool = Pool(nprocs)

for chunk in chunker(df, nprocs):
    data = pool.map(myfunction, chunk)
    data.domorestuff()

I assume this should be very similar to using the IPython distributed machinery, but I haven't tried it.

我认为这应该与使用IPython分布式机器非常相似,但我还没有尝试过。

#3


14  

Use numpy has this built in: np.array_split()

使用numpy内置了这个:np.array_split()

import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
    assert len(chunk) == len(data) / 5

#4


11  

A sign of a good environment is many choices, so I'll add this from Anaconda Blaze, really using Odo

良好环境的标志是很多选择,所以我将从Anaconda Blaze添加这个,真的使用Odo

import blaze as bz
import pandas as pd

df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]})

for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
    # Do stuff with chunked dataframe