How do I import data from MongoDB into pandas?

Time: 2021-09-09 06:53:50

I have a large amount of data in a collection in MongoDB which I need to analyze. How do I import that data into pandas?

I am new to pandas and numpy.

EDIT: The mongodb collection contains sensor values tagged with date and time. The sensor values are of float datatype.

Sample Data:

{
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
    "SensorReport"
],
"Readings" : [
    {
        "a" : 0.958069536790466,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
        "b" : 6.296118156595,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95574014778624,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
        "b" : 6.29651468650064,
        "_cls" : "Reading"
    },
    {
        "a" : 0.953648289182713,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
        "b" : 7.29679823731148,
        "_cls" : "Reading"
    },
    {
        "a" : 0.955931884300997,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
        "b" : 6.29642922525632,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95821381,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
        "b" : 7.28956613,
        "_cls" : "Reading"
    },
    {
        "a" : 4.95821335,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
        "b" : 6.28956574,
        "_cls" : "Reading"
    },
    {
        "a" : 9.95821341,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
        "b" : 0.28956488,
        "_cls" : "Reading"
    },
    {
        "a" : 1.95667927,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
        "b" : 0.29115237,
        "_cls" : "Reading"
    }
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
}

11 Solutions

#1


77  

pymongo might give you a hand; the following is some code I'm using:

import pandas as pd
from pymongo import MongoClient


def _connect_mongo(host, port, username, password, db):
    """ A util for making a connection to mongo """

    if username and password:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
        conn = MongoClient(mongo_uri)
    else:
        conn = MongoClient(host, port)


    return conn[db]


def read_mongo(db, collection, query={}, host='localhost', port=27017, username=None, password=None, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df
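
A quick usage sketch; the database and collection names ('my_db', 'SensorReports') are assumptions, while the field names come from the sample data above:

# Assumed database/collection names; the filter uses a field from the sample schema
df = read_mongo('my_db', 'SensorReports', query={'sensorName': '56847890-0'})
print(df[['sensorName', 'reportCount', 'latestReportTime']].head())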

#2


18  

Monary does exactly that, and it's super fast. (another link)

See this cool post which includes a quick tutorial and some timings.
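
For reference, a rough sketch of what a Monary query can look like, based on its documented query(db, collection, query, fields, types) API; the host, database/collection names, and field list below are assumptions:

import numpy as np
import pandas as pd
from monary import Monary

# Pull two numeric fields straight into numpy arrays (all names here are placeholders)
client = Monary('127.0.0.1')
columns = ['a', 'b']
arrays = client.query('my_db', 'my_collection', {}, columns, ['float64', 'float64'])
df = pd.DataFrame(np.column_stack(arrays), columns=columns)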

#3


17  

You can load your MongoDB data into a pandas DataFrame using this code. It works for me; hopefully it will for you too.

import pymongo
import pandas as pd
from pymongo import MongoClient
client = MongoClient()
db = client.database_name
collection = db.collection_name
data = pd.DataFrame(list(collection.find()))

#4


8  

import pandas as pd
from odo import odo

data = odo('mongodb://localhost/db::collection', pd.DataFrame)

#5


7  

As per PEP 20 (the Zen of Python), simple is better than complex:

import pandas as pd
df = pd.DataFrame.from_records(db.<database_name>.<collection_name>.find())

You can include query conditions as you would when working with a regular MongoDB database, or even use find_one() to get just one element from the database, etc., as in the sketch below.
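
A minimal sketch against the sample data above (the database and collection names are assumptions):

import pandas as pd
from pymongo import MongoClient

db = MongoClient()['my_db']                         # assumed database name
# Filter like a normal find(); drop the ObjectId via a projection
df = pd.DataFrame.from_records(
    db['SensorReports'].find({'reportCount': {'$gte': 8}}, {'_id': 0}))
one_doc = db['SensorReports'].find_one({'sensorName': '56847890-0'})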

and voila!

#6


4  

For dealing with out-of-core (not fitting into RAM) data efficiently (i.e. with parallel execution), you can try the Python Blaze ecosystem: Blaze / Dask / Odo.

Blaze (and Odo) has out-of-the-box functions to deal with MongoDB.

A few useful articles to start off:

And an article which shows what amazing things are possible with the Blaze stack: Analyzing 1.7 Billion Reddit Comments with Blaze and Impala (essentially, querying 975 GB of Reddit comments in seconds).

P.S. I'm not affiliated with any of these technologies.

#7


2  

http://docs.mongodb.org/manual/reference/mongoexport

Export to CSV and use read_csv, or export to JSON and use DataFrame.from_records.
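
A sketch of that workflow; the database, collection, field names and output path are assumptions:

# Shell step (run outside Python): export a few fields to CSV
#   mongoexport --db my_db --collection SensorReports \
#       --type=csv --fields sensorName,reportCount,latestReportTime --out reports.csv

import pandas as pd

df = pd.read_csv('reports.csv', parse_dates=['latestReportTime'])

# For a JSON export produced with --jsonArray:
# import json
# df = pd.DataFrame.from_records(json.load(open('reports.json')))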

#8


1  

Using

pandas.DataFrame(list(...))

will consume a lot of memory if the iterator/generator result is large

It is better to generate small chunks and concat them at the end:

import pandas as pd


def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)
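
For example, feeding a pymongo cursor through it in chunks (database/collection names assumed):

import pandas as pd
from pymongo import MongoClient

cursor = MongoClient()['my_db']['SensorReports'].find()  # assumed names
df = iterator2dataframes(cursor, chunk_size=1000)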

#9


1  

A similar approach to those of Rafael Valero, waitingkuo and Deu Leung, using pagination:

def read_mongo(db,
               collection, query=None,
               host='localhost', port=27017, username=None, password=None,
               chunksize=100, page_num=1, no_id=True):
    """ Read one page from Mongo and store it in a DataFrame """

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Calculate number of documents to skip
    skips = chunksize * (page_num - 1)

    # Use None as the default and build the dict here to avoid a mutable default
    # argument (a common Python pitfall; the article below, in Spanish, explains it):
    # https://www.toptal.com/python/c%C3%B3digo-buggy-python-los-10-errores-m%C3%A1s-comunes-que-cometen-los-desarrolladores-python/es
    if not query:
        query = {}

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query).skip(skips).limit(chunksize)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df
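
A usage sketch (database/collection names assumed) that stitches the pages back together; the page count is computed up front with count_documents so no empty page is requested:

import math
import pandas as pd
from pymongo import MongoClient

n_docs = MongoClient()['my_db']['SensorReports'].count_documents({})  # assumed names
pages = math.ceil(n_docs / 1000)
df = pd.concat(
    [read_mongo('my_db', 'SensorReports', chunksize=1000, page_num=p)
     for p in range(1, pages + 1)],
    ignore_index=True)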

#10


1  

Another option I found very useful is:

from pandas.io.json import json_normalize

cursor = my_collection.find()
df = json_normalize(cursor)

This way you get the flattening of nested MongoDB documents for free.
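
With the sample document above, record_path / meta can also unfold the nested Readings array into one row per reading (database/collection names assumed; in recent pandas versions the function lives at pd.json_normalize):

import pandas as pd
from pymongo import MongoClient

coll = MongoClient()['my_db']['SensorReports']            # assumed names
docs = list(coll.find({}, {'Readings': 1, 'sensorName': 1, '_id': 0}))
# One row per element of Readings, keeping the parent sensorName alongside it
df = pd.json_normalize(docs, record_path='Readings', meta='sensorName')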

#11


0  

Following this great answer by waitingkuo, I would like to add the possibility of doing that with a chunksize, in line with .read_sql() and .read_csv(). I extend the answer from Deu Leung by avoiding going one by one over each 'record' of the 'iterator' / 'cursor'. I will borrow the previous read_mongo function.

import pandas as pd
from pymongo import MongoClient


def read_mongo(db,
               collection, query={},
               host='localhost', port=27017,
               username=None, password=None,
               chunksize=100, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    # db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
    client = MongoClient(host=host, port=port)
    # Make a query to the specific DB and Collection
    db_aux = client[db]

    # Some variables to create the chunks
    # (note: cursor.count() was removed in PyMongo 4;
    #  db_aux[collection].count_documents(query) is the modern equivalent)
    skips_variable = range(0, db_aux[collection].find(query).count(), int(chunksize))
    if len(skips_variable) <= 1:
        skips_variable = [0, len(skips_variable)]

    # Iteration to create the dataframe in chunks.
    for i in range(1, len(skips_variable)):

        # Expand the cursor and construct the DataFrame
        # df_aux = pd.DataFrame(list(cursor_aux[skips_variable[i-1]:skips_variable[i]]))
        df_aux = pd.DataFrame(list(db_aux[collection].find(query)[skips_variable[i-1]:skips_variable[i]]))

        if no_id:
            del df_aux['_id']

        # Concatenate the chunks into a unique df
        if 'df' not in locals():
            df = df_aux
        else:
            df = pd.concat([df, df_aux], ignore_index=True)

    return df
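
For example (database/collection names assumed), reading a whole collection in chunks of 5000 documents:

df = read_mongo('my_db', 'SensorReports', chunksize=5000)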
