Masseneinsatz mit SQLAlchemy ORM

130

Gibt es eine Möglichkeit, SQLAlchemy dazu zu bringen, eine Masseneinfügung durchzuführen, anstatt jedes einzelne Objekt einzufügen? dh

tun:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

eher, als:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

Ich habe gerade Code konvertiert, um SQLalchemie anstelle von Roh-SQL zu verwenden, und obwohl es jetzt viel schöner ist, damit zu arbeiten, scheint es jetzt langsamer zu sein (bis zu einem Faktor von 10), frage ich mich, ob dies der Grund ist.

Vielleicht könnte ich die Situation durch effizientere Sitzungen verbessern. Im Moment habe autoCommit=Falseund mache session.commit()ich ein, nachdem ich ein paar Sachen hinzugefügt habe. Obwohl dies dazu zu führen scheint, dass die Daten veraltet sind, wenn die Datenbank an einer anderen Stelle geändert wird, wie auch wenn ich eine neue Abfrage durchführe, erhalte ich immer noch alte Ergebnisse zurück?

Danke für Ihre Hilfe!

Nick Holden
quelle
1
Dies könnte helfen: stackoverflow.com/questions/270879/…
Sean Vieira
1
Nick, ich verstehe, das ist ein sehr alter Beitrag. Wäre es möglich, den Titel auf etwas Richtiges wie "Einfügen mehrerer Datensätze mit SQLAlchemy ORM" zu aktualisieren ? Einfügeanweisungen mit mehreren Datensätzen wie die von Ihnen angegebene unterscheiden sich erheblich von Massenladevorgängen auf Datenbankebene. Masseneinfügungen sind für das Hochladen von Daten über 1 KB vorgesehen, normalerweise aus großen Datenmengen und von Anwendungsmanagern, nicht von REST-Operationen oder Code auf Anwendungsebene. Verwenden wir unsere Nomenklatur richtig.
W4t3randWind
Für diejenigen, die auf diese Frage stoßen, während sie nach Informationen über Massenoperationen in sqlalchemy Core (nicht ORM) suchen , siehe meine Antwort auf eine andere Frage .
Nickolay

Antworten:

173

SQLAlchemy hat das in der Version eingeführt 1.0.0:

Massenoperationen - SQLAlchemy-Dokumente

Mit diesen Vorgängen können Sie jetzt Masseneinfügungen oder Aktualisierungen vornehmen!

Zum Beispiel können Sie Folgendes tun:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Hier wird ein Bulk-Einsatz gemacht.

Pierre
quelle
30
Sie benötigen auch s.commit (), um die Datensätze tatsächlich zu speichern (ich habe ein bisschen gebraucht, um dies herauszufinden).
horcle_buzz
3
Ich habe dies mit sqlachemy 1.0.11 versucht und es werden immer noch 3 Einfügeanweisungen erstellt. Aber es ist viel schneller als der normale Orm-Betrieb.
Zidarsk8
3
Obwohl dies für die OP-Frage nicht relevant ist, ist es erwähnenswert, dass dies bestimmte Merkmale des ORM verletzt. docs.sqlalchemy.org/en/rel_1_0/orm/…
dangel
@dangel ja danke für das posten. Obwohl der Titel von OP "Massenladen" betrifft, hat seine Frage zu Einfügeanweisungen für mehrere Datensätze nichts mit der Massenladefunktion von sqlalchemy zu tun.
W4t3randWind
Im Vergleich zum Einfügen derselben Daten aus CSV mit \copypsql (vom selben Client zum selben Server) sehe ich auf der Serverseite einen großen Leistungsunterschied, der zu etwa 10x mehr Einfügungen / s führt. Anscheinend ist das Massenladen mithilfe \copy(oder COPYauf dem Server) mithilfe eines Pakets bei der Kommunikation von Client zu Server viel besser als die Verwendung von SQL über SQLAlchemy. Weitere Informationen: Großer Masseneinsatz Performance - Unterschied PostgreSQL vs ... .
Gertvdijk
42

In den sqlalchemy-Dokumenten wird die Leistung verschiedener Techniken beschrieben, die für Masseneinfügungen verwendet werden können:

ORMs sind grundsätzlich nicht für Hochleistungs-Bulk-Inserts gedacht - dies ist der ganze Grund, warum SQLAlchemy den Core zusätzlich zum ORM als erstklassige Komponente anbietet.

Für den Anwendungsfall schneller Masseneinfügungen ist das SQL-Generierungs- und Ausführungssystem, auf dem der ORM aufbaut, Teil des Kerns. Wenn Sie dieses System direkt verwenden, können Sie ein INSERT erstellen, das mit der direkten Verwendung der Rohdatenbank-API konkurrenzfähig ist.

Alternativ bietet das SQLAlchemy ORM die Bulk Operations-Suite von Methoden an, die Hooks in Unterabschnitte des Arbeitsprozesses bereitstellen, um INSERT- und UPDATE-Konstrukte auf Kernebene mit einem geringen Grad an ORM-basierter Automatisierung auszugeben.

Das folgende Beispiel zeigt zeitbasierte Tests für verschiedene Methoden zum Einfügen von Zeilen, von den am meisten automatisierten bis zu den am wenigsten automatisierten. Mit cPython 2.7 wurden Laufzeiten beobachtet:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Skript:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
Grant Humphries
quelle
1
Danke dir. Wirklich hilfreich und gründlich.
Steve B.
Ich habe ein anderes Beispiel mit Bindparams gesehen. Die Syntax sieht prägnant aus, ist das gut?
Jay
35

Soweit ich weiß, gibt es keine Möglichkeit, das ORM dazu zu bringen, Masseneinsätze auszustellen. Ich glaube, der Grund dafür ist, dass SQLAlchemy die Identität jedes Objekts (dh neue Primärschlüssel) verfolgen muss, und Bulk-Einfügungen stören dies. Angenommen, Ihre fooTabelle enthält eine idSpalte und ist einer FooKlasse zugeordnet:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Da SQLAlchemy den Wert für x.idohne weitere Abfrage ermittelt hat, können wir daraus schließen, dass der Wert direkt aus der INSERTAnweisung stammt. Wenn Sie keinen späteren Zugriff auf die erstellten Objekte über dieselben Instanzen benötigen , können Sie die ORM-Ebene für Ihre Einfügung überspringen:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy kann diese neuen Zeilen nicht mit vorhandenen Objekten abgleichen, daher müssen Sie sie für nachfolgende Vorgänge erneut abfragen.

In Bezug auf veraltete Daten ist es hilfreich, sich daran zu erinnern, dass die Sitzung keine integrierte Methode enthält, um festzustellen, wann die Datenbank außerhalb der Sitzung geändert wird. Um über vorhandene Instanzen auf extern geänderte Daten zugreifen zu können, müssen die Instanzen als abgelaufen markiert werden . Dies geschieht standardmäßig am session.commit(), kann jedoch manuell durch Aufrufen von session.expire_all()oder erfolgen session.expire(instance). Ein Beispiel (SQL weggelassen):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit()läuft ab x, sodass die erste print-Anweisung implizit eine neue Transaktion öffnet und erneut abfragtx die Attribute erneut ab. Wenn Sie die erste Druckanweisung auskommentieren, werden Sie feststellen, dass die zweite jetzt den richtigen Wert aufnimmt, da die neue Abfrage erst nach der Aktualisierung ausgegeben wird.

Dies ist unter dem Gesichtspunkt der Transaktionsisolation sinnvoll - Sie sollten nur externe Änderungen zwischen Transaktionen vornehmen. Wenn dies zu Problemen führt, würde ich vorschlagen, die Transaktionsgrenzen Ihrer Anwendung zu klären oder zu überdenken, anstatt sofort danach zu greifen session.expire_all().

Dhaffey
quelle
Vielen Dank für Ihre Antwort, ich werde es versuchen. WRT das auslaufende Problem, was ich sah, war nicht ganz das gleiche. Ich verwende eine Scoped-Sitzung in Turbogears. Durchführen einer getSession (). Query (Foo) .filter .... all () hat je nach Anforderung unterschiedliche Dinge zurückgegeben und auch die aktualisierten Datensätze, die sich in der Datenbank befanden, nicht zurückgegeben, bis ich sie neu gestartet habe. Ich habe dieses Problem behoben, indem ich ein Autocommit = True ausgeführt und etwas hinzugefügt habe, das .remove () d der Sitzung nach Abschluss der Anforderung hinzugefügt hat (ich nehme an, Sie sollen das trotzdem tun).
Nick Holden
Ich denke, es wurden je nach Anforderung unterschiedliche Dinge zurückgegeben, da es eine Sitzung mit Gültigkeitsbereich pro Thread im Pool gab und sich die Sitzungen in unterschiedlichen Zuständen befanden. Es schien ein bisschen seltsam, dass sa nach einer neuen Anfrage keine neuen Daten erhalten würde. Ich gehe davon aus, dass ich falsch verstehe, was Autocommit = False tut
Nick Holden
Mit autocommit=False, glaube ich Sie anrufen sollte session.commit()auf Anfrage Abschluss (I mit Turbogears nicht vertraut bin, das so ignorieren , wenn das für Sie auf Framework - Ebene behandelt wird ). Abgesehen davon, dass Ihre Änderungen in die Datenbank gelangt sind, läuft alles in der Sitzung ab. Die nächste Transaktion würde erst bei der nächsten Verwendung dieser Sitzung beginnen, sodass zukünftige Anforderungen im selben Thread keine veralteten Daten sehen.
Dhaffey
10
Alternativer Stil:session.execute(Foo.__table__.insert(), values)
Joril
6
Beachten Sie, dass neuere Versionen von sqlalchemy über Funktionen zum Einfügen von Massengütern verfügen: docs.sqlalchemy.org/en/latest/orm/…
Wayne Werner
18

Ich mache es normalerweise mit add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
reubano
quelle
2
Bist du sicher, dass das funktioniert? Es ist nicht nur das Äquivalent, sie .addeinzeln zur Sitzung zu bringen?
Alec
Das wäre angesichts des Methodennamens kontraintuitiv, die Dokumente gehen nicht ins Detail: Add the given collection of instances to this Session.Haben Sie Grund zu der Annahme, dass keine Masseneinfügung erfolgt?
Reubano
3
Ich glaube nicht , es zu eingängig ist - es ist in der Tat fügen Sie all die Dinge , die Sie danach zu fragen. Nichts über das Hinzufügen aller Dinge zur Sitzung scheint zu implizieren, welche zugrunde liegenden SQL-Anweisungen ausgegeben werden. Betrachtet man die Quelle: github.com/zzzeek/sqlalchemy/blob/… scheint es tatsächlich nur für .addjeden Artikel einzeln zu sein.
Alec
Es funktioniert gut im Vergleich zu bulk_save_objects()a flush(), wir können die ID des Objekts abrufen, können es aber bulk_save_objects()nicht (Ereignis mit flush()aufgerufen).
Coanor
14

SQLAlchemy wurde ab Version 0.8 direkt unterstützt

Richtet sich nach den docs , connection.execute(table.insert().values(data))sollte es tun. (Beachten Sie, dass dies nicht dasselbe ist, connection.execute(table.insert(), data)was zu vielen einzelnen Zeileneinfügungen über einen Aufruf von führt. executemany) Bei einer anderen als einer lokalen Verbindung kann der Leistungsunterschied enorm sein.

user3805082
quelle
10

SQLAlchemy hat das in der Version eingeführt 1.0.0:

Massenoperationen - SQLAlchemy-Dokumente

Mit diesen Vorgängen können Sie jetzt Masseneinfügungen oder Aktualisierungen vornehmen!

Zum Beispiel (wenn Sie den geringsten Overhead für einfache Tabellen-INSERTs wünschen) können Sie Folgendes verwenden Session.bulk_insert_mappings():

loadme = [(1, 'a'),
          (2, 'b'),
          (3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

Oder, wenn Sie möchten, überspringen Sie die loadmeTupel und schreiben Sie die Wörterbücher direkt in dicts(aber ich finde es einfacher, die gesamte Worthaftigkeit aus den Daten herauszulassen und eine Liste der Wörterbücher in einer Schleife zu laden).

Juanitogan
quelle
7

Die Antwort von Piere ist richtig, aber ein Problem ist, dass bulk_save_objectsstandardmäßig nicht die Primärschlüssel der Objekte zurückgegeben werden, wenn dies für Sie von Belang ist. Set return_defaultszuTrue um dieses Verhalten zu erhalten.

Die Dokumentation finden Sie hier .

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()
Matthew Moisen
quelle
2
Bei der Flagge ist Vorsicht geboten. Es wird nacheinander jeweils ein Objekt eingefügt, und der signifikante Leistungsgewinn ist möglicherweise nicht vorhanden [1]. In meinem Fall verschlechterte sich die Leistung, die ich aufgrund des Overheads vermutete. [1]: docs.sqlalchemy.org/en/13/orm/…
dhfromkorea
6

Alle Wege führen nach Rom , aber einige von ihnen überqueren Berge, erfordern Fähren, aber wenn Sie schnell dorthin wollen, nehmen Sie einfach die Autobahn.


In diesem Fall muss die Autobahn die Funktion execute_batch () von psycopg2 verwenden . Die Dokumentation sagt es am besten:

Die derzeitige Implementierung von executemany()ist (unter Verwendung einer äußerst gemeinnützigen Untertreibung) nicht besonders leistungsfähig. Diese Funktionen können verwendet werden, um die wiederholte Ausführung einer Anweisung anhand einer Reihe von Parametern zu beschleunigen. Durch die Reduzierung der Anzahl der Server-Roundtrips kann die Leistung um Größenordnungen besser sein als bei Verwendung executemany().

In meinem eigenen Test execute_batch()ist ungefähr doppelt so schnell wie executemany()und bietet die Möglichkeit, die page_size für weitere Optimierungen zu konfigurieren (wenn Sie die letzten 2-3% der Leistung aus dem Treiber herausholen möchten).

Dieselbe Funktion kann problemlos aktiviert werden, wenn Sie SQLAlchemy verwenden, indem use_batch_mode=TrueSie beim Instanziieren der Engine als Parameter festlegencreate_engine()

chjortlund
quelle
Hinweis: Psycopg2 execute_valuesist schneller als Psycopg2, execute_batchwenn Bulk-Inserts ausgeführt werden!
Fierr
5

Dies ist ein Weg:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

Dies wird wie folgt eingefügt:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Referenz: Die SQLAlchemy FAQ enthält Benchmarks für verschiedene Methoden begehen.

Eefret
quelle
3

Die beste Antwort, die ich bisher gefunden habe, war in der sqlalchemy-Dokumentation:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#im-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

Es gibt ein vollständiges Beispiel für einen Benchmark möglicher Lösungen.

Wie in der Dokumentation gezeigt:

mass_save_objects ist nicht die beste Lösung, aber die Leistung ist korrekt.

Die zweitbeste Implementierung in Bezug auf die Lesbarkeit war meiner Meinung nach mit dem SQLAlchemy Core:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

Der Kontext dieser Funktion ist im Dokumentationsartikel angegeben.

lelabo_m
quelle