“大数据”工作流程使用大熊猫。

时间:2021-08-15 23:48:32

I have tried to puzzle out an answer to this question for many months while learning pandas. I use SAS for my day-to-day work and it is great for it's out-of-core support. However, SAS is horrible as a piece of software for numerous other reasons.

在学习大熊猫的过程中,我已经尝试了好几个月来解答这个问题。我在日常工作中使用SAS,这对它的核心支持非常有用。然而,由于许多其他的原因,SAS非常可怕。

One day I hope to replace my use of SAS with python and pandas, but I currently lack an out-of-core workflow for large datasets. I'm not talking about "big data" that requires a distributed network, but rather files too large to fit in memory but small enough to fit on a hard-drive.

有一天我希望用python和熊猫代替我使用SAS,但是我目前缺少一个大型数据集的核心工作流。我不是在说需要一个分布式网络的“大数据”,而是那些文件太大而无法装入内存,但又足够小,可以安装在硬盘上。

My first thought is to use HDFStore to hold large datasets on disk and pull only the pieces I need into dataframes for analysis. Others have mentioned MongoDB as an easier to use alternative. My question is this:

我的第一个想法是使用HDFStore来保存磁盘上的大型数据集,并只将需要的片段放入dataframes进行分析。其他人则认为MongoDB更易于使用。我的问题是:

What are some best-practice workflows for accomplishing the following:

完成以下工作的最佳实践工作流程如下:

  1. Loading flat files into a permanent, on-disk database structure
  2. 将平面文件加载到一个永久的、磁盘上的数据库结构中。
  3. Querying that database to retrieve data to feed into a pandas data structure
  4. 查询该数据库以检索数据以输入大熊猫的数据结构。
  5. Updating the database after manipulating pieces in pandas
  6. 在处理大熊猫的片段后更新数据库。

Real-world examples would be much appreciated, especially from anyone who uses pandas on "large data".

现实世界的例子将会非常受欢迎,尤其是那些在“大数据”上使用大熊猫的人。

Edit -- an example of how I would like this to work:

编辑——我希望这样做的一个例子:

  1. Iteratively import a large flat-file and store it in a permanent, on-disk database structure. These files are typically too large to fit in memory.
  2. 迭代地导入一个大的平面文件并将其存储在一个永久的磁盘上的数据库结构中。这些文件通常太大,无法装入内存。
  3. In order to use Pandas, I would like to read subsets of this data (usually just a few columns at a time) that can fit in memory.
  4. 为了使用大熊猫,我想要读取这些数据的子集(通常只需要几列),这些数据可以放在内存中。
  5. I would create new columns by performing various operations on the selected columns.
  6. 我将通过在选定的列上执行各种操作来创建新的列。
  7. I would then have to append these new columns into the database structure.
  8. 然后,我必须将这些新的列追加到数据库结构中。

I am trying to find a best-practice way of performing these steps. Reading links about pandas and pytables it seems that appending a new column could be a problem.

我正在努力寻找一种最佳实践方法来执行这些步骤。阅读关于熊猫和pytables的链接,似乎添加了一个新的专栏可能是一个问题。

Edit -- Responding to Jeff's questions specifically:

编辑——对杰夫的问题进行具体的回答:

  1. I am building consumer credit risk models. The kinds of data include phone, SSN and address characteristics; property values; derogatory information like criminal records, bankruptcies, etc... The datasets I use every day have nearly 1,000 to 2,000 fields on average of mixed data types: continuous, nominal and ordinal variables of both numeric and character data. I rarely append rows, but I do perform many operations that create new columns.
  2. 我正在建立消费者信用风险模型。数据类型包括电话、SSN和地址特征;属性值;犯罪记录、破产等贬损信息。我每天使用的数据集平均有1000到2000个字段,平均数据类型为混合数据类型:数字和字符数据的连续、标称和序数变量。我很少添加行,但我确实执行许多创建新列的操作。
  3. Typical operations involve combining several columns using conditional logic into a new, compound column. For example, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'. The result of these operations is a new column for every record in my dataset.
  4. 典型的操作包括使用条件逻辑将多个列组合成一个新的复合列。例如,如果var1 > 2然后newvar = 'A' elif var2 = 4,那么newvar = 'B'。这些操作的结果是我的数据集中的每一个记录的新列。
  5. Finally, I would like to append these new columns into the on-disk data structure. I would repeat step 2, exploring the data with crosstabs and descriptive statistics trying to find interesting, intuitive relationships to model.
  6. 最后,我希望将这些新列添加到磁盘上的数据结构中。我将重复第2步,用交叉和描述性统计方法来研究数据,试图找到有趣的、直观的关系模型。
  7. A typical project file is usually about 1GB. Files are organized into such a manner where a row consists of a record of consumer data. Each row has the same number of columns for every record. This will always be the case.
  8. 典型的项目文件通常是1GB。文件被组织成这样一种方式,其中一行包含用户数据的记录。每一行都有相同数量的列,用于每个记录。情况永远是这样。
  9. It's pretty rare that I would subset by rows when creating a new column. However, it's pretty common for me to subset on rows when creating reports or generating descriptive statistics. For example, I might want to create a simple frequency for a specific line of business, say Retail credit cards. To do this, I would select only those records where the line of business = retail in addition to whichever columns I want to report on. When creating new columns, however, I would pull all rows of data and only the columns I need for the operations.
  10. 在创建新列时,我很少会按行进行子集。然而,在创建报表或生成描述性统计数据时,对我来说,在行的子集是很常见的。例如,我可能想为特定的业务线创建一个简单的频率,比如零售信用卡。为了做到这一点,我只选择那些在我想要报告的列之外的业务= retail的记录。然而,在创建新列时,我将会拉出所有的数据行,并且只需要执行操作所需的列。
  11. The modeling process requires that I analyze every column, look for interesting relationships with some outcome variable, and create new compound columns that describe those relationships. The columns that I explore are usually done in small sets. For example, I will focus on a set of say 20 columns just dealing with property values and observe how they relate to defaulting on a loan. Once those are explored and new columns are created, I then move on to another group of columns, say college education, and repeat the process. What I'm doing is creating candidate variables that explain the relationship between my data and some outcome. At the very end of this process, I apply some learning techniques that create an equation out of those compound columns.
  12. 建模过程要求我分析每一列,寻找与某个结果变量的有趣关系,并创建描述这些关系的新的复合列。我所探索的列通常是在小的集合中完成的。例如,我将重点关注一组20列,它们只处理属性值,并观察它们如何与贷款违约相关。一旦这些被探索和新的专栏被创建,我然后转向另一组的专栏,说大学教育,并重复这个过程。我所做的是创建候选变量来解释我的数据和一些结果之间的关系。在这个过程的最后,我应用了一些学习技巧,从这些复合列中创建了一个方程。

It is rare that I would ever add rows to the dataset. I will nearly always be creating new columns (variables or features in statistics/machine learning parlance).

我很少会向数据集添加行。我将几乎总是创建新的列(统计/机器学习术语中的变量或特性)。

13 个解决方案

#1


428  

I routinely use tens of gigabytes of data in just this fashion e.g. I have tables on disk that I read via queries, create data and append back.

我经常用这种方式使用几十兆字节的数据,例如,我在磁盘上有表格,通过查询,创建数据和追加数据。

It's worth reading the docs and late in this thread for several suggestions for how to store your data.

对于如何存储数据的一些建议,在此线程中读取文档和延迟是值得的。

Details which will affect how you store your data, like:
Give as much detail as you can; and I can help you develop a structure.

细节将会影响你如何存储你的数据,比如:尽可能多地提供细节;我可以帮助你建立一个结构。

  1. Size of data, # of rows, columns, types of columns; are you appending rows, or just columns?
  2. 数据的大小、行号、列、列的类型;您是添加行还是列?
  3. What will typical operations look like. E.g. do a query on columns to select a bunch of rows and specific columns, then do an operation (in-memory), create new columns, save these.
    (Giving a toy example could enable us to offer more specific recommendations.)
  4. 典型的操作是什么样的。例如,对列进行查询,以选择一组行和特定列,然后执行操作(内存中),创建新的列,保存这些列。(举个玩具例子,我们可以提供更具体的建议。)
  5. After that processing, then what do you do? Is step 2 ad hoc, or repeatable?
  6. 在处理之后,你会怎么做?步骤2是临时的还是可重复的?
  7. Input flat files: how many, rough total size in Gb. How are these organized e.g. by records? Does each one contains different fields, or do they have some records per file with all of the fields in each file?
  8. 输入平面文件:Gb中有多少个粗略的总大小。这些组织是如何组织起来的?每个文件是否包含不同的字段,或者每个文件中的每个字段都有一些记录?
  9. Do you ever select subsets of rows (records) based on criteria (e.g. select the rows with field A > 5)? and then do something, or do you just select fields A, B, C with all of the records (and then do something)?
  10. 您是否曾经根据标准选择行(记录)的子集(例如,选择带有字段A > 5的行)?然后做点什么,或者你只选择A, B, C,所有的记录(然后做一些事情)?
  11. Do you 'work on' all of your columns (in groups), or are there a good proportion that you may only use for reports (e.g. you want to keep the data around, but don't need to pull in that column explicity until final results time)?
  12. 你是否对所有的列(分组)进行“工作”,或者是否有一个很好的比例,你可能只用于报告(例如,你想要保留数据,但不需要在最后的结果时间之前进行解释)?

Solution

Ensure you have pandas at least 0.10.1 installed.

确保您已经安装了至少0.10.1的熊猫。

Read iterating files chunk-by-chunk and multiple table queries.

读取迭代文件块和多个表查询。

Since pytables is optimized to operate on row-wise (which is what you query on), we will create a table for each group of fields. This way it's easy to select a small group of fields (which will work with a big table, but it's more efficient to do it this way... I think I may be able to fix this limitation in the future... this is more intuitive anyhow):
(The following is pseudocode.)

由于pytables被优化为以行为操作(这是您所查询的),所以我们将为每组字段创建一个表。通过这种方式,可以很容易地选择一小部分字段(它将使用一个大表,但是这样做更有效…)我想我可以在将来解决这个限制……这更直观一些)(以下是伪代码)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

Reading in the files and creating the storage (essentially doing what append_to_multiple does):

在文件中读取并创建存储(实际上是做append_to_multido):

for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

Now you have all of the tables in the file (actually you could store them in separate files if you wish, you would prob have to add the filename to the group_map, but probably this isn't necessary).

现在您已经拥有了文件中的所有表(实际上您可以将它们存储在单独的文件中,如果您愿意的话,您需要将文件名添加到group_map中,但是可能这不是必需的)。

This is how you get columns and create new ones:

这就是如何获取列并创建新列的方法:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

When you are ready for post_processing:

当您准备好进行post_processing:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

About data_columns, you don't actually need to define ANY data_columns; they allow you to sub-select rows based on the column. E.g. something like:

关于data_columns,实际上不需要定义任何data_columns;它们允许您根据列选择行。例如像:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

They may be most interesting to you in the final report generation stage (essentially a data column is segregated from other columns, which might impact efficiency somewhat if you define a lot).

在最后的报告生成阶段,它们可能对您来说是最有趣的(本质上,数据列是与其他列隔离的,如果您定义了很多,可能会影响效率)。

You also might want to:

你也可能想:

  • create a function which takes a list of fields, looks up the groups in the groups_map, then selects these and concatenates the results so you get the resulting frame (this is essentially what select_as_multiple does). This way the structure would be pretty transparent to you.
  • 创建一个包含字段列表的函数,在groups_map中查找组,然后选择这些并将结果连接起来,这样就得到了结果框架(这实质上就是select_as_multido)。这种结构对你来说是相当透明的。
  • indexes on certain data columns (makes row-subsetting much faster).
  • 某些数据列上的索引(使行子设置更快)。
  • enable compression.
  • 启用压缩。

Let me know when you have questions!

有问题就告诉我!

#2


90  

I think the answers above are missing a simple approach that I've found very useful.

我认为上面的答案缺少一种我觉得非常有用的简单方法。

When I have a file that is too large to load in memory, I break up the file into multiple smaller files (either by row or cols)

当我有一个文件太大而无法载入内存时,我将文件分解成多个更小的文件(通过row或cols)

Example: In case of 30 days worth of trading data of ~30GB size, I break it into a file per day of ~1GB size. I subsequently process each file separately and aggregate results at the end

例如:如果30天的交易数据为~30GB大小,我每天将其分解成一个文件大小为~1GB的文件。我随后分别处理每个文件,最后汇总结果。

One of the biggest advantages is that it allows parallel processing of the files (either multiple threads or processes)

最大的优点之一是它允许并行处理文件(多个线程或进程)

The other advantage is that file manipulation (like adding/removing dates in the example) can be accomplished by regular shell commands, which is not be possible in more advanced/complicated file formats

另一个优点是,文件操作(例如在示例中添加/删除日期)可以通过常规shell命令来完成,这在更高级/复杂的文件格式中是不可能的。

This approach doesn't cover all scenarios, but is very useful in a lot of them

这种方法不包括所有场景,但在很多情况下都非常有用。

#3


51  

If your datasets are between 1 and 20GB, you should get a workstation with 48GB of RAM. Then Pandas can hold the entire dataset in RAM. I know its not the answer you're looking for here, but doing scientific computing on a notebook with 4GB of RAM isn't reasonable.

如果您的数据集在1到20GB之间,那么您应该得到一个具有48GB RAM的工作站。然后,熊猫可以在RAM中保存整个数据集。我知道这不是你想要的答案,但是在一个4GB内存的笔记本上进行科学计算是不合理的。

#4


47  

I know this is an old thread but I think the Blaze library is worth checking out. It's built for these types of situations.

我知道这是一个古老的线索,但我认为火焰图书馆值得一看。它是为这些类型的情况而建立的。

From the docs:

从文档:

Blaze extends the usability of NumPy and Pandas to distributed and out-of-core computing. Blaze provides an interface similar to that of the NumPy ND-Array or Pandas DataFrame but maps these familiar interfaces onto a variety of other computational engines like Postgres or Spark.

火焰扩展了NumPy和熊猫的可用性,用于分布式和非核心计算。火焰提供了一个类似于NumPy ND-Array或熊猫DataFrame的接口,但是将这些熟悉的接口映射到各种其他的计算引擎,比如Postgres或Spark。

Edit: By the way, it's supported by ContinuumIO and Travis Oliphant, author of NumPy.

编辑:顺便说一下,它是由ContinuumIO和Travis Oliphant支持的,NumPy的作者。

#5


43  

This is the case for pymongo. I have also prototyped using sql server, sqlite, HDF, ORM (SQLAlchemy) in python. First and foremost pymongo is a document based DB, so each person would be a document (dict of attributes). Many people form a collection and you can have many collections (people, stock market, income).

这就是pymongo的情况。我还在python中使用了sql server、sqlite、HDF、ORM (SQLAlchemy)原型。首先,pymongo是一个基于文档的数据库,因此每个人都是一个文档(属性的命令)。许多人组成一个集合,你可以有许多收藏(人,股票市场,收入)。

pd.dateframe -> pymongo Note: I use the chunksize in read_csv to keep it to 5 to 10k records(pymongo drops the socket if larger)

pd。dateframe -> pymongo注意:我在read_csv中使用chunksize将其保存到5到10k记录(如果较大的话,pymongo会将套接字删除)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

querying: gt = greater than...

查询:gt =大于…

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find() returns an iterator so I commonly use ichunked to chop into smaller iterators.

find()返回一个迭代器,因此我通常使用ichunked来将它切成更小的迭代器。

How about a join since I normally get 10 data sources to paste together:

加入一个join,因为我通常会得到10个数据源粘贴在一起:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

then (in my case sometimes I have to agg on aJoinDF first before its "mergeable".)

然后(在我的例子中,有时我不得不在它的“mergeable”之前加入aJoinDF。)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

And you can then write the new info to your main collection via the update method below. (logical collection vs physical datasources).

然后,您可以通过下面的更新方法将新信息写入您的主要集合。(逻辑收集vs物理数据源)。

collection.update({primarykey:foo},{key:change})

On smaller lookups, just denormalize. For example, you have code in the document and you just add the field code text and do a dict lookup as you create documents.

在较小的查找中,只是将其反规范化。例如,在文档中有代码,只需添加字段代码文本,并在创建文档时执行命令查找。

Now you have a nice dataset based around a person, you can unleash your logic on each case and make more attributes. Finally you can read into pandas your 3 to memory max key indicators and do pivots/agg/data exploration. This works for me for 3 million records with numbers/big text/categories/codes/floats/...

现在您有了一个基于person的良好数据集,您可以在每个案例中释放您的逻辑,并生成更多的属性。最后,你可以将你的3读到记忆体的关键指标,并做数据透视/agg/数据的探索。这对我来说适用于300万条数字/大文本/分类/代码/浮动/…

You can also use the two methods built into MongoDB (MapReduce and aggregate framework). See here for more info about the aggregate framework, as it seems to be easier than MapReduce and looks handy for quick aggregate work. Notice I didn't need to define my fields or relations, and I can add items to a document. At the current state of the rapidly changing numpy, pandas, python toolset, MongoDB helps me just get to work :)

您还可以使用构建到MongoDB中的两个方法(MapReduce和聚合框架)。请参阅这里了解关于聚合框架的更多信息,因为它看起来比MapReduce更容易,而且看起来方便快速聚合。注意,我不需要定义我的字段或关系,我可以向文档添加项。在快速变化的numpy、熊猫、python工具集的当前状态下,MongoDB帮助我开始工作:)

#6


41  

There is now, two years after the question, an 'out-of-core' pandas equivalent: dask. It is excellent! Though it does not support all of pandas functionality, you can get really far with it.

现在,在这个问题出现两年后,一个“外核”的熊猫相当于:dask。这是优秀的!虽然它不支持所有的熊猫功能,但你可以做得很好。

#7


35  

I spotted this a little late, but I work with a similar problem (mortgage prepayment models). My solution has been to skip the pandas HDFStore layer and use straight pytables. I save each column as an individual HDF5 array in my final file.

我发现这有点晚了,但我也有类似的问题(抵押预付款模型)。我的解决方案是跳过熊猫的HDFStore层,使用直的pytable。我将每一列作为一个单独的HDF5数组保存在我的最终文件中。

My basic workflow is to first get a CSV file from the database. I gzip it, so it's not as huge. Then I convert that to a row-oriented HDF5 file, by iterating over it in python, converting each row to a real data type, and writing it to a HDF5 file. That takes some tens of minutes, but it doesn't use any memory, since it's only operating row-by-row. Then I "transpose" the row-oriented HDF5 file into a column-oriented HDF5 file.

我的基本工作流程是首先从数据库获取一个CSV文件。我把它压缩了,所以没有那么大。然后,我将其转换为面向行的HDF5文件,通过在python中迭代它,将每行转换为实际数据类型,并将其写入到HDF5文件中。这需要几十分钟,但它不使用任何内存,因为它只是逐行操作。然后我将面向行的HDF5文件“转置”到一个以列为导向的HDF5文件中。

The table transpose looks like:

表的转置是这样的:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

Reading it back in then looks like:

读回来,看起来是这样的:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

Now, I generally run this on a machine with a ton of memory, so I may not be careful enough with my memory usage. For example, by default the load operation reads the whole data set.

现在,我通常在一台有大量内存的机器上运行它,所以我可能对内存使用不够仔细。例如,默认情况下,load操作读取整个数据集。

This generally works for me, but it's a bit clunky, and I can't use the fancy pytables magic.

这通常适用于我,但它有点笨拙,我不能使用神奇的pytables魔术。

Edit: The real advantage of this approach, over the array-of-records pytables default, is that I can then load the data into R using h5r, which can't handle tables. Or, at least, I've been unable to get it to load heterogeneous tables.

编辑:这种方法的真正优点是,在记录pytables的默认情况下,我可以使用h5r将数据加载到R中,而hh5r不能处理表。或者,至少,我无法让它装载异构表。

#8


11  

One more variation

一个变化

Many of the operations done in pandas can also be done as a db query (sql, mongo)

在熊猫的许多操作也可以作为一个db查询(sql, mongo)完成。

Using a RDBMS or mongodb allows you to perform some of the aggregations in the DB Query (which is optimized for large data, and uses cache and indexes efficiently)

使用RDBMS或mongodb允许您在DB查询中执行一些聚合(对大数据进行优化,并有效地使用缓存和索引)

Later, you can perform post processing using pandas.

稍后,您可以使用熊猫进行后期处理。

The advantage of this method is that you gain the DB optimizations for working with large data, while still defining the logic in a high level declarative syntax - and not having to deal with the details of deciding what to do in memory and what to do out of core.

这种方法的优点是,你获得的数据库优化处理大型数据时,同时仍然在高水平声明性语法定义的逻辑,没有处理的细节决定做什么在内存和核心。

And although the query language and pandas are different, it's usually not complicated to translate part of the logic from one to another.

虽然查询语言和熊猫是不同的,但是将部分逻辑从一个转换到另一个并不复杂。

#9


9  

One trick I found helpful for "large data" use cases is to reduce the volume of the data by reducing float precision to 32-bit. It's not applicable in all cases, but in many applications 64-bit precision is overkill and the 2x memory savings are worth it. To make an obvious point even more obvious:

我发现对“大数据”用例有用的一个技巧是通过将浮点精度降低到32位来减少数据量。它在所有情况下都不适用,但是在许多应用程序中,64位精度是多余的,而2x内存的节省是值得的。更明显的一点是:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB

#10


7  

Consider Ruffus if you go the simple path of creating a data pipeline which is broken down into multiple smaller files.

考虑Ruffus,如果您选择创建一个数据管道的简单路径,它被分解成多个较小的文件。

#11


6  

As noted by others, after some years an 'out-of-core' pandas equivalent has emerged: dask. Though dask is not a drop-in replacement of pandas and all of its functionality it stands out for several reasons:

正如其他人所指出的,在几年之后,一种“外核”的大熊猫已经出现了:dask。尽管dask并不是熊猫的替代品,它的所有功能都很突出,原因如下:

Dask is a flexible parallel computing library for analytic computing that is optimized for dynamic task scheduling for interactive computational workloads of “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments and scales from laptops to clusters.

Dask分析计算是一个灵活的并行计算库动态任务调度优化的交互式计算工作负载的“大数据”像并行数组集合,dataframes,延长NumPy等常用接口列表,熊猫,Python迭代器larger-than-memory或者分布式环境和尺度从笔记本电脑到集群。

Dask emphasizes the following virtues:

Dask强调以下优点:

  • Familiar: Provides parallelized NumPy array and Pandas DataFrame objects
  • 熟悉:提供并行的NumPy阵列和熊猫DataFrame对象。
  • Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.
  • 灵活:为更多的自定义工作负载和与其他项目的集成提供任务调度接口。
  • Native: Enables distributed computing in Pure Python with access to the PyData stack.
  • 本机:支持在纯Python中使用PyData栈进行分布式计算。
  • Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
  • 快速:运行低开销,低延迟,和最小的序列化,对于快速的数字算法。
  • Scales up: Runs resiliently on clusters with 1000s of cores Scales down: Trivial to set up and run on a laptop in a single process
  • 扩展:在具有1000s核心级别的集群上运行有弹性:在单个进程中设置和运行在笔记本电脑上是很容易的。
  • Responsive: Designed with interactive computing in mind it provides rapid feedback and diagnostics to aid humans
  • 响应性:设计的交互式计算,它提供快速反馈和诊断帮助人类。

and to add a simple code sample:

并添加一个简单的代码示例:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

replaces some pandas code like this:

替换一些熊猫代码如下:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

and, especially noteworthy, provides through the concurrent.futures interface a general for the submission of custom tasks:

而且,特别值得注意的是,通过并发提供。期货界面一般用于提交自定义任务:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

#12


3  

I recently came across a similar issue. I found simply reading the data in chunks and appending it as I write it in chunks to the same csv works well. My problem was adding a date column based on information in another table, using the value of certain columns as follows. This may help those confused by dask and hdf5 but more familiar with pandas like myself.

我最近遇到了一个类似的问题。我发现简单地用块读取数据,然后将其添加到相同的csv文件中。我的问题是在另一个表中添加基于信息的日期列,使用某些列的值如下所示。这可能会帮助那些被dask和hdf5迷惑的人,但更熟悉像我这样的熊猫。

def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
   rows at a time and outputs them, appending as needed, to a single csv. 
   Uses the column of the raster names to get the date.
"""
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                     chunksize=100000) #read csv file as 100k chunks

    '''Do some stuff'''

    count = 1 #for indexing item in time list 
    for chunk in df: #for each 100k rows
        newtime = [] #empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
        while count <= toiterate.max():
            for i in toiterate: 
                if i ==count:
                    newtime.append(newyears[count])
            count+=1
        print "Finished", str(chunknum), "chunks"
        chunk["time"] = newtime #create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        #append each output to same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)

#13


1  

It is worth mentioning here Ray as well,
it's a distributed computation framework, that has it's own implementation for pandas in a distributed way.

这里值得一提的是,Ray也是一个分布式计算框架,它以分布式的方式实现了对熊猫的实现。

Just replace the pandas import, and the code should work as is:

只需要更换熊猫进口,代码应该是:

# import pandas as pd
import ray.dataframe as pd

#use pd as usual

can read more details here:

可以在这里阅读更多细节:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/

https://rise.cs.berkeley.edu/blog/pandas-on-ray/

#1


428  

I routinely use tens of gigabytes of data in just this fashion e.g. I have tables on disk that I read via queries, create data and append back.

我经常用这种方式使用几十兆字节的数据,例如,我在磁盘上有表格,通过查询,创建数据和追加数据。

It's worth reading the docs and late in this thread for several suggestions for how to store your data.

对于如何存储数据的一些建议,在此线程中读取文档和延迟是值得的。

Details which will affect how you store your data, like:
Give as much detail as you can; and I can help you develop a structure.

细节将会影响你如何存储你的数据,比如:尽可能多地提供细节;我可以帮助你建立一个结构。

  1. Size of data, # of rows, columns, types of columns; are you appending rows, or just columns?
  2. 数据的大小、行号、列、列的类型;您是添加行还是列?
  3. What will typical operations look like. E.g. do a query on columns to select a bunch of rows and specific columns, then do an operation (in-memory), create new columns, save these.
    (Giving a toy example could enable us to offer more specific recommendations.)
  4. 典型的操作是什么样的。例如,对列进行查询,以选择一组行和特定列,然后执行操作(内存中),创建新的列,保存这些列。(举个玩具例子,我们可以提供更具体的建议。)
  5. After that processing, then what do you do? Is step 2 ad hoc, or repeatable?
  6. 在处理之后,你会怎么做?步骤2是临时的还是可重复的?
  7. Input flat files: how many, rough total size in Gb. How are these organized e.g. by records? Does each one contains different fields, or do they have some records per file with all of the fields in each file?
  8. 输入平面文件:Gb中有多少个粗略的总大小。这些组织是如何组织起来的?每个文件是否包含不同的字段,或者每个文件中的每个字段都有一些记录?
  9. Do you ever select subsets of rows (records) based on criteria (e.g. select the rows with field A > 5)? and then do something, or do you just select fields A, B, C with all of the records (and then do something)?
  10. 您是否曾经根据标准选择行(记录)的子集(例如,选择带有字段A > 5的行)?然后做点什么,或者你只选择A, B, C,所有的记录(然后做一些事情)?
  11. Do you 'work on' all of your columns (in groups), or are there a good proportion that you may only use for reports (e.g. you want to keep the data around, but don't need to pull in that column explicity until final results time)?
  12. 你是否对所有的列(分组)进行“工作”,或者是否有一个很好的比例,你可能只用于报告(例如,你想要保留数据,但不需要在最后的结果时间之前进行解释)?

Solution

Ensure you have pandas at least 0.10.1 installed.

确保您已经安装了至少0.10.1的熊猫。

Read iterating files chunk-by-chunk and multiple table queries.

读取迭代文件块和多个表查询。

Since pytables is optimized to operate on row-wise (which is what you query on), we will create a table for each group of fields. This way it's easy to select a small group of fields (which will work with a big table, but it's more efficient to do it this way... I think I may be able to fix this limitation in the future... this is more intuitive anyhow):
(The following is pseudocode.)

由于pytables被优化为以行为操作(这是您所查询的),所以我们将为每组字段创建一个表。通过这种方式,可以很容易地选择一小部分字段(它将使用一个大表,但是这样做更有效…)我想我可以在将来解决这个限制……这更直观一些)(以下是伪代码)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

Reading in the files and creating the storage (essentially doing what append_to_multiple does):

在文件中读取并创建存储(实际上是做append_to_multido):

for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

Now you have all of the tables in the file (actually you could store them in separate files if you wish, you would prob have to add the filename to the group_map, but probably this isn't necessary).

现在您已经拥有了文件中的所有表(实际上您可以将它们存储在单独的文件中,如果您愿意的话,您需要将文件名添加到group_map中,但是可能这不是必需的)。

This is how you get columns and create new ones:

这就是如何获取列并创建新列的方法:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

When you are ready for post_processing:

当您准备好进行post_processing:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

About data_columns, you don't actually need to define ANY data_columns; they allow you to sub-select rows based on the column. E.g. something like:

关于data_columns,实际上不需要定义任何data_columns;它们允许您根据列选择行。例如像:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

They may be most interesting to you in the final report generation stage (essentially a data column is segregated from other columns, which might impact efficiency somewhat if you define a lot).

在最后的报告生成阶段,它们可能对您来说是最有趣的(本质上,数据列是与其他列隔离的,如果您定义了很多,可能会影响效率)。

You also might want to:

你也可能想:

  • create a function which takes a list of fields, looks up the groups in the groups_map, then selects these and concatenates the results so you get the resulting frame (this is essentially what select_as_multiple does). This way the structure would be pretty transparent to you.
  • 创建一个包含字段列表的函数,在groups_map中查找组,然后选择这些并将结果连接起来,这样就得到了结果框架(这实质上就是select_as_multido)。这种结构对你来说是相当透明的。
  • indexes on certain data columns (makes row-subsetting much faster).
  • 某些数据列上的索引(使行子设置更快)。
  • enable compression.
  • 启用压缩。

Let me know when you have questions!

有问题就告诉我!

#2


90  

I think the answers above are missing a simple approach that I've found very useful.

我认为上面的答案缺少一种我觉得非常有用的简单方法。

When I have a file that is too large to load in memory, I break up the file into multiple smaller files (either by row or cols)

当我有一个文件太大而无法载入内存时,我将文件分解成多个更小的文件(通过row或cols)

Example: In case of 30 days worth of trading data of ~30GB size, I break it into a file per day of ~1GB size. I subsequently process each file separately and aggregate results at the end

例如:如果30天的交易数据为~30GB大小,我每天将其分解成一个文件大小为~1GB的文件。我随后分别处理每个文件,最后汇总结果。

One of the biggest advantages is that it allows parallel processing of the files (either multiple threads or processes)

最大的优点之一是它允许并行处理文件(多个线程或进程)

The other advantage is that file manipulation (like adding/removing dates in the example) can be accomplished by regular shell commands, which is not be possible in more advanced/complicated file formats

另一个优点是,文件操作(例如在示例中添加/删除日期)可以通过常规shell命令来完成,这在更高级/复杂的文件格式中是不可能的。

This approach doesn't cover all scenarios, but is very useful in a lot of them

这种方法不包括所有场景,但在很多情况下都非常有用。

#3


51  

If your datasets are between 1 and 20GB, you should get a workstation with 48GB of RAM. Then Pandas can hold the entire dataset in RAM. I know its not the answer you're looking for here, but doing scientific computing on a notebook with 4GB of RAM isn't reasonable.

如果您的数据集在1到20GB之间,那么您应该得到一个具有48GB RAM的工作站。然后,熊猫可以在RAM中保存整个数据集。我知道这不是你想要的答案,但是在一个4GB内存的笔记本上进行科学计算是不合理的。

#4


47  

I know this is an old thread but I think the Blaze library is worth checking out. It's built for these types of situations.

我知道这是一个古老的线索,但我认为火焰图书馆值得一看。它是为这些类型的情况而建立的。

From the docs:

从文档:

Blaze extends the usability of NumPy and Pandas to distributed and out-of-core computing. Blaze provides an interface similar to that of the NumPy ND-Array or Pandas DataFrame but maps these familiar interfaces onto a variety of other computational engines like Postgres or Spark.

火焰扩展了NumPy和熊猫的可用性,用于分布式和非核心计算。火焰提供了一个类似于NumPy ND-Array或熊猫DataFrame的接口,但是将这些熟悉的接口映射到各种其他的计算引擎,比如Postgres或Spark。

Edit: By the way, it's supported by ContinuumIO and Travis Oliphant, author of NumPy.

编辑:顺便说一下,它是由ContinuumIO和Travis Oliphant支持的,NumPy的作者。

#5


43  

This is the case for pymongo. I have also prototyped using sql server, sqlite, HDF, ORM (SQLAlchemy) in python. First and foremost pymongo is a document based DB, so each person would be a document (dict of attributes). Many people form a collection and you can have many collections (people, stock market, income).

这就是pymongo的情况。我还在python中使用了sql server、sqlite、HDF、ORM (SQLAlchemy)原型。首先,pymongo是一个基于文档的数据库,因此每个人都是一个文档(属性的命令)。许多人组成一个集合,你可以有许多收藏(人,股票市场,收入)。

pd.dateframe -> pymongo Note: I use the chunksize in read_csv to keep it to 5 to 10k records(pymongo drops the socket if larger)

pd。dateframe -> pymongo注意:我在read_csv中使用chunksize将其保存到5到10k记录(如果较大的话,pymongo会将套接字删除)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

querying: gt = greater than...

查询:gt =大于…

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find() returns an iterator so I commonly use ichunked to chop into smaller iterators.

find()返回一个迭代器,因此我通常使用ichunked来将它切成更小的迭代器。

How about a join since I normally get 10 data sources to paste together:

加入一个join,因为我通常会得到10个数据源粘贴在一起:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

then (in my case sometimes I have to agg on aJoinDF first before its "mergeable".)

然后(在我的例子中,有时我不得不在它的“mergeable”之前加入aJoinDF。)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

And you can then write the new info to your main collection via the update method below. (logical collection vs physical datasources).

然后,您可以通过下面的更新方法将新信息写入您的主要集合。(逻辑收集vs物理数据源)。

collection.update({primarykey:foo},{key:change})

On smaller lookups, just denormalize. For example, you have code in the document and you just add the field code text and do a dict lookup as you create documents.

在较小的查找中,只是将其反规范化。例如,在文档中有代码,只需添加字段代码文本,并在创建文档时执行命令查找。

Now you have a nice dataset based around a person, you can unleash your logic on each case and make more attributes. Finally you can read into pandas your 3 to memory max key indicators and do pivots/agg/data exploration. This works for me for 3 million records with numbers/big text/categories/codes/floats/...

现在您有了一个基于person的良好数据集,您可以在每个案例中释放您的逻辑,并生成更多的属性。最后,你可以将你的3读到记忆体的关键指标,并做数据透视/agg/数据的探索。这对我来说适用于300万条数字/大文本/分类/代码/浮动/…

You can also use the two methods built into MongoDB (MapReduce and aggregate framework). See here for more info about the aggregate framework, as it seems to be easier than MapReduce and looks handy for quick aggregate work. Notice I didn't need to define my fields or relations, and I can add items to a document. At the current state of the rapidly changing numpy, pandas, python toolset, MongoDB helps me just get to work :)

您还可以使用构建到MongoDB中的两个方法(MapReduce和聚合框架)。请参阅这里了解关于聚合框架的更多信息,因为它看起来比MapReduce更容易,而且看起来方便快速聚合。注意,我不需要定义我的字段或关系,我可以向文档添加项。在快速变化的numpy、熊猫、python工具集的当前状态下,MongoDB帮助我开始工作:)

#6


41  

There is now, two years after the question, an 'out-of-core' pandas equivalent: dask. It is excellent! Though it does not support all of pandas functionality, you can get really far with it.

现在,在这个问题出现两年后,一个“外核”的熊猫相当于:dask。这是优秀的!虽然它不支持所有的熊猫功能,但你可以做得很好。

#7


35  

I spotted this a little late, but I work with a similar problem (mortgage prepayment models). My solution has been to skip the pandas HDFStore layer and use straight pytables. I save each column as an individual HDF5 array in my final file.

我发现这有点晚了,但我也有类似的问题(抵押预付款模型)。我的解决方案是跳过熊猫的HDFStore层,使用直的pytable。我将每一列作为一个单独的HDF5数组保存在我的最终文件中。

My basic workflow is to first get a CSV file from the database. I gzip it, so it's not as huge. Then I convert that to a row-oriented HDF5 file, by iterating over it in python, converting each row to a real data type, and writing it to a HDF5 file. That takes some tens of minutes, but it doesn't use any memory, since it's only operating row-by-row. Then I "transpose" the row-oriented HDF5 file into a column-oriented HDF5 file.

我的基本工作流程是首先从数据库获取一个CSV文件。我把它压缩了,所以没有那么大。然后,我将其转换为面向行的HDF5文件,通过在python中迭代它,将每行转换为实际数据类型,并将其写入到HDF5文件中。这需要几十分钟,但它不使用任何内存,因为它只是逐行操作。然后我将面向行的HDF5文件“转置”到一个以列为导向的HDF5文件中。

The table transpose looks like:

表的转置是这样的:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

Reading it back in then looks like:

读回来,看起来是这样的:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

Now, I generally run this on a machine with a ton of memory, so I may not be careful enough with my memory usage. For example, by default the load operation reads the whole data set.

现在,我通常在一台有大量内存的机器上运行它,所以我可能对内存使用不够仔细。例如,默认情况下,load操作读取整个数据集。

This generally works for me, but it's a bit clunky, and I can't use the fancy pytables magic.

这通常适用于我,但它有点笨拙,我不能使用神奇的pytables魔术。

Edit: The real advantage of this approach, over the array-of-records pytables default, is that I can then load the data into R using h5r, which can't handle tables. Or, at least, I've been unable to get it to load heterogeneous tables.

编辑:这种方法的真正优点是,在记录pytables的默认情况下,我可以使用h5r将数据加载到R中,而hh5r不能处理表。或者,至少,我无法让它装载异构表。

#8


11  

One more variation

一个变化

Many of the operations done in pandas can also be done as a db query (sql, mongo)

在熊猫的许多操作也可以作为一个db查询(sql, mongo)完成。

Using a RDBMS or mongodb allows you to perform some of the aggregations in the DB Query (which is optimized for large data, and uses cache and indexes efficiently)

使用RDBMS或mongodb允许您在DB查询中执行一些聚合(对大数据进行优化,并有效地使用缓存和索引)

Later, you can perform post processing using pandas.

稍后,您可以使用熊猫进行后期处理。

The advantage of this method is that you gain the DB optimizations for working with large data, while still defining the logic in a high level declarative syntax - and not having to deal with the details of deciding what to do in memory and what to do out of core.

这种方法的优点是,你获得的数据库优化处理大型数据时,同时仍然在高水平声明性语法定义的逻辑,没有处理的细节决定做什么在内存和核心。

And although the query language and pandas are different, it's usually not complicated to translate part of the logic from one to another.

虽然查询语言和熊猫是不同的,但是将部分逻辑从一个转换到另一个并不复杂。

#9


9  

One trick I found helpful for "large data" use cases is to reduce the volume of the data by reducing float precision to 32-bit. It's not applicable in all cases, but in many applications 64-bit precision is overkill and the 2x memory savings are worth it. To make an obvious point even more obvious:

我发现对“大数据”用例有用的一个技巧是通过将浮点精度降低到32位来减少数据量。它在所有情况下都不适用,但是在许多应用程序中,64位精度是多余的,而2x内存的节省是值得的。更明显的一点是:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB

#10


7  

Consider Ruffus if you go the simple path of creating a data pipeline which is broken down into multiple smaller files.

考虑Ruffus,如果您选择创建一个数据管道的简单路径,它被分解成多个较小的文件。

#11


6  

As noted by others, after some years an 'out-of-core' pandas equivalent has emerged: dask. Though dask is not a drop-in replacement of pandas and all of its functionality it stands out for several reasons:

正如其他人所指出的,在几年之后,一种“外核”的大熊猫已经出现了:dask。尽管dask并不是熊猫的替代品,它的所有功能都很突出,原因如下:

Dask is a flexible parallel computing library for analytic computing that is optimized for dynamic task scheduling for interactive computational workloads of “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments and scales from laptops to clusters.

Dask分析计算是一个灵活的并行计算库动态任务调度优化的交互式计算工作负载的“大数据”像并行数组集合,dataframes,延长NumPy等常用接口列表,熊猫,Python迭代器larger-than-memory或者分布式环境和尺度从笔记本电脑到集群。

Dask emphasizes the following virtues:

Dask强调以下优点:

  • Familiar: Provides parallelized NumPy array and Pandas DataFrame objects
  • 熟悉:提供并行的NumPy阵列和熊猫DataFrame对象。
  • Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.
  • 灵活:为更多的自定义工作负载和与其他项目的集成提供任务调度接口。
  • Native: Enables distributed computing in Pure Python with access to the PyData stack.
  • 本机:支持在纯Python中使用PyData栈进行分布式计算。
  • Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
  • 快速:运行低开销,低延迟,和最小的序列化,对于快速的数字算法。
  • Scales up: Runs resiliently on clusters with 1000s of cores Scales down: Trivial to set up and run on a laptop in a single process
  • 扩展:在具有1000s核心级别的集群上运行有弹性:在单个进程中设置和运行在笔记本电脑上是很容易的。
  • Responsive: Designed with interactive computing in mind it provides rapid feedback and diagnostics to aid humans
  • 响应性:设计的交互式计算,它提供快速反馈和诊断帮助人类。

and to add a simple code sample:

并添加一个简单的代码示例:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

replaces some pandas code like this:

替换一些熊猫代码如下:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

and, especially noteworthy, provides through the concurrent.futures interface a general for the submission of custom tasks:

而且,特别值得注意的是,通过并发提供。期货界面一般用于提交自定义任务:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

#12


3  

I recently came across a similar issue. I found simply reading the data in chunks and appending it as I write it in chunks to the same csv works well. My problem was adding a date column based on information in another table, using the value of certain columns as follows. This may help those confused by dask and hdf5 but more familiar with pandas like myself.

我最近遇到了一个类似的问题。我发现简单地用块读取数据,然后将其添加到相同的csv文件中。我的问题是在另一个表中添加基于信息的日期列,使用某些列的值如下所示。这可能会帮助那些被dask和hdf5迷惑的人,但更熟悉像我这样的熊猫。

def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
   rows at a time and outputs them, appending as needed, to a single csv. 
   Uses the column of the raster names to get the date.
"""
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                     chunksize=100000) #read csv file as 100k chunks

    '''Do some stuff'''

    count = 1 #for indexing item in time list 
    for chunk in df: #for each 100k rows
        newtime = [] #empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
        while count <= toiterate.max():
            for i in toiterate: 
                if i ==count:
                    newtime.append(newyears[count])
            count+=1
        print "Finished", str(chunknum), "chunks"
        chunk["time"] = newtime #create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        #append each output to same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)

#13


1  

It is worth mentioning here Ray as well,
it's a distributed computation framework, that has it's own implementation for pandas in a distributed way.

这里值得一提的是,Ray也是一个分布式计算框架,它以分布式的方式实现了对熊猫的实现。

Just replace the pandas import, and the code should work as is:

只需要更换熊猫进口,代码应该是:

# import pandas as pd
import ray.dataframe as pd

#use pd as usual

can read more details here:

可以在这里阅读更多细节:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/

https://rise.cs.berkeley.edu/blog/pandas-on-ray/