Wie importiere ich Daten von Mongodb in Pandas?

96

Ich habe eine große Datenmenge in einer Sammlung in Mongodb, die ich analysieren muss. Wie importiere ich diese Daten in Pandas?

Ich bin neu in Pandas und Numpy.

BEARBEITEN: Die Mongodb-Sammlung enthält Sensorwerte, die mit Datum und Uhrzeit gekennzeichnet sind. Die Sensorwerte sind vom Datentyp float.

Beispieldaten:

{
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
    "SensorReport"
],
"Readings" : [
    {
        "a" : 0.958069536790466,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
        "b" : 6.296118156595,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95574014778624,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
        "b" : 6.29651468650064,
        "_cls" : "Reading"
    },
    {
        "a" : 0.953648289182713,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
        "b" : 7.29679823731148,
        "_cls" : "Reading"
    },
    {
        "a" : 0.955931884300997,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
        "b" : 6.29642922525632,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95821381,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
        "b" : 7.28956613,
        "_cls" : "Reading"
    },
    {
        "a" : 4.95821335,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
        "b" : 6.28956574,
        "_cls" : "Reading"
    },
    {
        "a" : 9.95821341,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
        "b" : 0.28956488,
        "_cls" : "Reading"
    },
    {
        "a" : 1.95667927,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
        "b" : 0.29115237,
        "_cls" : "Reading"
    }
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
}
Nithin
quelle
Die Verwendung eines benutzerdefinierten Feldtyps mit MongoEngine kann das Speichern und Abrufen von Pandas DataFrames so einfach machen wiemongo_doc.data_frame = my_pandas_df
Jthorpe

Antworten:

130

pymongo könnte Ihnen helfen, folgendes sind einige Codes, die ich benutze:

import pandas as pd
from pymongo import MongoClient


def _connect_mongo(host, port, username, password, db):
    """ A util for making a connection to mongo """

    if username and password:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
        conn = MongoClient(mongo_uri)
    else:
        conn = MongoClient(host, port)


    return conn[db]


def read_mongo(db, collection, query={}, host='localhost', port=27017, username=None, password=None, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df
Wartenkuo
quelle
Danke, das ist die Methode, die ich letztendlich verwendet habe. Ich hatte auch eine Reihe von eingebetteten Dokumenten in jeder Zeile. Also musste ich das auch in jeder Zeile wiederholen. Gibt es einen besseren Weg, dies zu tun?
Nithin
Ist es möglich, einige Beispiele der Struktur Ihres Mongodb bereitzustellen?
Warten am
3
Beachten Sie, dass das list()Innere df = pd.DataFrame(list(cursor))als Liste oder Generator ausgewertet wird, um die CPU kühl zu halten. Wenn Sie einunddreißig Datenelemente haben und die nächsten paar Zeilen einigermaßen aufgeteilt, detailliert und abgeschnitten wären, ist das ganze Shmegegge immer noch sicher. Schön.
Phlip
2
Es ist sehr langsam @ df = pd.DataFrame(list(cursor)). Reine Datenbankabfrage ist viel schneller. Könnten wir das listCasting in etwas anderes ändern ?
Peter.k
1
@ Peter diese Linie fiel mir auch auf. Das Umwandeln eines Datenbankcursors, der iterierbar ist und möglicherweise große Datenmengen umschließt, in eine speicherinterne Liste erscheint mir nicht klug.
Rafa
38

Mit diesem Code können Sie Ihre Mongodb-Daten in pandas DataFrame laden. Für mich geht das. Hoffentlich auch für dich.

import pymongo
import pandas as pd
from pymongo import MongoClient
client = MongoClient()
db = client.database_name
collection = db.collection_name
data = pd.DataFrame(list(collection.find()))
saimadhu.polamuri
quelle
23

Monarymacht genau das und es ist super schnell . (ein weiterer Link )

Sehen Sie sich diesen coolen Beitrag an, der ein kurzes Tutorial und einige Timings enthält.

shx2
quelle
Unterstützt Monary den String-Datentyp?
Snehal Parmar
Ich habe Monary ausprobiert, aber es braucht viel Zeit. Fehlt mir eine Optimierung? Versucht client = Monary(host, 27017, database="db_tmp") columns = ["col1", "col2"] data_type = ["int64", "int64"] arrays = client.query("db_tmp", "coll", {}, columns, data_type)für 50000Aufzeichnungen dauert herum 200s.
Nishant
Das klingt extrem langsam ... Ehrlich gesagt, ich weiß nicht, wie der Status dieses Projekts ist, jetzt, 4 Jahre später ...
shx2
15

Nach PEP ist einfach besser als kompliziert:

import pandas as pd
df = pd.DataFrame.from_records(db.<database_name>.<collection_name>.find())

Sie können Bedingungen einschließen, wie Sie es mit einer normalen MongoDB-Datenbank tun würden, oder sogar find_one () verwenden, um nur ein Element aus der Datenbank usw. abzurufen.

und voila!

Cy Bu
quelle
pd.DataFrame.from_records scheint so langsam zu sein wie DataFrame (list ()), aber die Ergebnisse sind sehr inkonsistent. %% Zeit zeigte alles von 800 ms bis 1,9 s
AFD
1
Dies ist nicht gut für große Datensätze, da dies keinen Speicherfehler anzeigt. Instread hängt das System für zu große Datenmengen. während pd.DataFrame (Liste (Cursor)) Speicherfehler anzeigt.
Amulya Acharya
12
import pandas as pd
from odo import odo

data = odo('mongodb://localhost/db::collection', pd.DataFrame)
fengwt
quelle
9

Sie können das Python Blaze-Ökosystem Blaze / Dask / Odo ausprobieren, um Daten außerhalb des Kerns (die nicht in den RAM passen) effizient zu verarbeiten (dh bei paralleler Ausführung) .

Blaze (und Odo ) verfügen über sofort einsatzbereite Funktionen für MongoDB.

Ein paar nützliche Artikel zum Starten:

Und ein Artikel, der zeigt, welche erstaunlichen Dinge mit Blaze Stack möglich sind: Analysieren von 1,7 Milliarden Reddit-Kommentaren mit Blaze und Impala (im Wesentlichen Abfragen von 975 GB Reddit-Kommentaren in Sekunden).

PS Ich bin mit keiner dieser Technologien verbunden.

Dennis Golomazov
quelle
1
Ich habe auch einen Beitrag mit Jupyter Notebook geschrieben, in dem ein Beispiel aufgeführt ist, wie Dask die Ausführung beschleunigt, selbst wenn Daten in den Speicher passen, indem mehrere Kerne auf einem einzelnen Computer verwendet werden.
Dennis Golomazov
8

Eine andere Option, die ich sehr nützlich fand, ist:

from pandas.io.json import json_normalize

cursor = my_collection.find()
df = json_normalize(cursor)

Auf diese Weise können Sie verschachtelte Mongodb-Dokumente kostenlos entfalten.

Ikar Pohorský
quelle
2
Ich habe einen Fehler mit dieser MethodeTypeError: data argument can't be an iterator
Gabriel Fair
2
Seltsamerweise funktioniert das auf meiner Python 3.6.7mit Pandas 0.24.2. Vielleicht kannst du es df = json_normalize(list(cursor))stattdessen versuchen ?
Ikar Pohorský
Für +1. docs, max_level-Argument definiert die maximale Ebene der Diktat-Tiefe. Ich habe gerade einen Test gemacht und es ist nicht wahr, daher müssten einige Spalten mit .str-Accesrors geteilt werden. Trotzdem sehr schöne Funktion für die Arbeit mit Mongodb.
Mauricio Maroto
5

Verwenden von

pandas.DataFrame(list(...))

verbraucht viel Speicher, wenn das Ergebnis des Iterators / Generators groß ist

Besser kleine Brocken und Concat am Ende zu generieren

def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)
Deo Leung
quelle
1

Nach dieser großartigen Antwort von waitkuo möchte ich die Möglichkeit hinzufügen, dies mit Chunksize in Übereinstimmung mit .read_sql () und .read_csv () zu tun . Ich vergrößere die Antwort von Deu Leung, indem ich vermeide, nacheinander jeden 'Datensatz' des 'Iterators' / 'Cursors' zu gehen. Ich werde die vorherige Funktion read_mongo ausleihen .

def read_mongo(db, 
           collection, query={}, 
           host='localhost', port=27017, 
           username=None, password=None,
           chunksize = 100, no_id=True):
""" Read from Mongo and Store into DataFrame """


# Connect to MongoDB
#db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
client = MongoClient(host=host, port=port)
# Make a query to the specific DB and Collection
db_aux = client[db]


# Some variables to create the chunks
skips_variable = range(0, db_aux[collection].find(query).count(), int(chunksize))
if len(skips_variable)<=1:
    skips_variable = [0,len(skips_variable)]

# Iteration to create the dataframe in chunks.
for i in range(1,len(skips_variable)):

    # Expand the cursor and construct the DataFrame
    #df_aux =pd.DataFrame(list(cursor_aux[skips_variable[i-1]:skips_variable[i]]))
    df_aux =pd.DataFrame(list(db_aux[collection].find(query)[skips_variable[i-1]:skips_variable[i]]))

    if no_id:
        del df_aux['_id']

    # Concatenate the chunks into a unique df
    if 'df' not in locals():
        df =  df_aux
    else:
        df = pd.concat([df, df_aux], ignore_index=True)

return df
Rafael Valero
quelle
1

Ein ähnlicher Ansatz wie Rafael Valero, Waitkuo und Deu Leung mit Paginierung :

def read_mongo(
       # db, 
       collection, query=None, 
       # host='localhost', port=27017, username=None, password=None,
       chunksize = 100, page_num=1, no_id=True):

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Calculate number of documents to skip
    skips = chunksize * (page_num - 1)

    # Sorry, this is in spanish
    # https://www.toptal.com/python/c%C3%B3digo-buggy-python-los-10-errores-m%C3%A1s-comunes-que-cometen-los-desarrolladores-python/es
    if not query:
        query = {}

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query).skip(skips).limit(chunksize)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df
Jordy Cuan
quelle
0

Mit pdmongo können Sie in drei Zeilen erreichen, was Sie wollen :

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [], "mongodb://localhost:27017/mydb")

Wenn Ihre Daten sehr groß sind, können Sie zuerst eine aggregierte Abfrage durchführen, indem Sie nicht gewünschte Daten filtern und sie dann Ihren gewünschten Spalten zuordnen.

Hier ist ein Beispiel für die Zuordnung Readings.azu einer Spalte aund die Filterung nach reportCountSpalte:

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [{'$match': {'reportCount': {'$gt': 6}}}, {'$unwind': '$Readings'}, {'$project': {'a': '$Readings.a'}}], "mongodb://localhost:27017/mydb")

read_mongoakzeptiert die gleichen Argumente wie Pymongo-Aggregat

pakallis
quelle