I have a large dataframe (several million rows).
I want to be able to do a groupby operation on it, but just grouping by arbitrary consecutive (preferably equal-sized) subsets of rows, rather than using any particular property of the individual rows to decide which group they go to.
The use case: I want to apply a function to each row via a parallel map in IPython. It doesn't matter which rows go to which back-end engine, as the function calculates a result based on one row at a time. (Conceptually at least; in reality it's vectorized.)
I've come up with something like this:
import numpy as np

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)
# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]
# Process chunks in parallel
results = dview.map_sync(my_function, groups)
But this seems very long-winded, and it doesn't guarantee equal-sized chunks, especially if the index is sparse, non-integer, or otherwise irregular.
Any suggestions for a better way?
Thanks!
4 Answers
#1
24
In practice, you can't guarantee equal-sized chunks: the number of rows might be prime, after all, in which case your only chunking options would be chunks of size 1 or one big chunk. I tend to pass an array to groupby. Starting from:
>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
    0         1         2         3         4
0   0  0.746300  0.346277  0.220362  0.172680
0   1  0.657324  0.687169  0.384196  0.214118
0   2  0.016062  0.858784  0.236364  0.963389
[...]
0  13  0.510273  0.051608  0.230402  0.756921
0  14  0.950544  0.576539  0.642602  0.907850

[15 rows x 5 columns]
where I've deliberately made the index uninformative by setting it to 0, we simply decide on our size (here 10) and integer-divide an array by it:
>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
... print(k,g)
...
0    0         1         2         3         4
0    0  0.746300  0.346277  0.220362  0.172680
0    1  0.657324  0.687169  0.384196  0.214118
0    2  0.016062  0.858784  0.236364  0.963389
[...]
0    8  0.241049  0.246149  0.241935  0.563428
0    9  0.493819  0.918858  0.193236  0.266257

[10 rows x 5 columns]
1     0         1         2         3         4
0    10  0.037693  0.370789  0.369117  0.401041
0    11  0.721843  0.862295  0.671733  0.605006
[...]
0    14  0.950544  0.576539  0.642602  0.907850

[5 rows x 5 columns]
Methods based on slicing the DataFrame can fail when the index isn't compatible with that, although you can always use .iloc[a:b] to ignore the index values and access data by position.
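For example, a purely positional split could look like this (just a sketch, reusing the dataframe, dview and my_function names from the question; n_chunks is a placeholder for however many pieces you want):

# Positional chunking with .iloc: the index is ignored entirely
n_chunks = 10
size = (len(dataframe) + n_chunks - 1) // n_chunks  # ceiling division: rows per chunk
chunks = [dataframe.iloc[pos:pos + size] for pos in range(0, len(dataframe), size)]
results = dview.map_sync(my_function, chunks)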
#2
27
I'm not sure if this is exactly what you want, but I found these grouper functions on another SO thread fairly useful for doing a multiprocessor pool.
Here's a short example from that thread, which might do something like what you want:
import numpy as np
import pandas as pds

df = pds.DataFrame(np.random.rand(14, 4), columns=['a', 'b', 'c', 'd'])

def chunker(seq, size):
    # Yield consecutive slices of `seq`, each `size` rows long (the last may be shorter)
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

for i in chunker(df, 5):
    print(i)
Which gives you something like this:
          a         b         c         d
0  0.860574  0.059326  0.339192  0.786399
1  0.029196  0.395613  0.524240  0.380265
2  0.235759  0.164282  0.350042  0.877004
3  0.545394  0.881960  0.994079  0.721279
4  0.584504  0.648308  0.655147  0.511390
          a         b         c         d
5  0.276160  0.982803  0.451825  0.845363
6  0.728453  0.246870  0.515770  0.343479
7  0.971947  0.278430  0.006910  0.888512
8  0.044888  0.875791  0.842361  0.890675
9  0.200563  0.246080  0.333202  0.574488
           a         b         c         d
10  0.971125  0.106790  0.274001  0.960579
11  0.722224  0.575325  0.465267  0.258976
12  0.574039  0.258625  0.469209  0.886768
13  0.915423  0.713076  0.073338  0.622967
I hope that helps.
EDIT
In this case, I used this function with a pool of processors in (approximately) this manner:
from multiprocessing import Pool

nprocs = 4
pool = Pool(nprocs)
for chunk in chunker(df, nprocs):
    data = pool.map(myfunction, chunk)
    data.domorestuff()
I assume this should be very similar to using the IPython distributed machinery, but I haven't tried it.
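For the IPython case in the question, a rough translation might be the following (a sketch only, assuming the dview and my_function from the question and an arbitrary chunk size of 1000 rows):

chunks = list(chunker(df, 1000))               # consecutive 1000-row pieces
results = dview.map_sync(my_function, chunks)  # one chunk per task, order preserved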
#3
14
numpy has this built in: np.array_split()
import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
    assert len(chunk) == len(data) / 5
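In the question's setting this slots straight into the parallel map (a sketch; dataframe, dview and my_function are the names from the question, and n_chunks is an arbitrary choice):

# Split into n_chunks roughly equal, consecutive pieces and map over them in parallel
n_chunks = 10
results = dview.map_sync(my_function, np.array_split(dataframe, n_chunks))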
#4
11
A sign of a good ecosystem is having many choices, so I'll add this option from Anaconda's Blaze, which really uses Odo:
import blaze as bz
import pandas as pd

df = pd.DataFrame({'col1': [1, 2, 3, 4, 5], 'col2': [2, 4, 6, 8, 10]})
for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
    # Do stuff with each chunked DataFrame (print is just a placeholder)
    print(chunk)