Wie UPSERT (MERGE, INSERT… ON DUPLICATE UPDATE) in PostgreSQL?

266

Eine sehr häufig gestellte Frage ist hier, wie ein Upsert durchgeführt wird. Dies wird von MySQL aufgerufen INSERT ... ON DUPLICATE UPDATEund der Standard unterstützt dies als Teil der MERGEOperation.

Wie machen Sie das, da PostgreSQL es nicht direkt unterstützt (vor S. 9.5)? Folgendes berücksichtigen:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

Nun stell dir vor , dass Sie wollen „Upsert“ die Tupel (2, 'Joe'), (3, 'Alan'), so dass die neuen Tabelleninhalt wäre:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

Darüber reden die Leute, wenn sie über eine upsert. Entscheidend ist, dass jeder Ansatz sicher ist, wenn mehrere Transaktionen an derselben Tabelle ausgeführt werden - entweder durch explizite Sperrung oder auf andere Weise gegen die daraus resultierenden Rennbedingungen.

Dieses Thema wird ausführlich unter Einfügen, über doppelte Aktualisierung in PostgreSQL? Aber es geht um Alternativen zur MySQL-Syntax, und im Laufe der Zeit sind einiges an nicht verwandten Details gewachsen. Ich arbeite an endgültigen Antworten.

Diese Techniken sind auch nützlich für "Einfügen, wenn nicht vorhanden, sonst nichts tun", dh "Einfügen ... bei Duplikatschlüssel ignorieren".

Craig Ringer
quelle
1
Mögliches Duplikat von Einfügen bei doppeltem Update in PostgreSQL?
Michael Hampton
8
@MichaelHampton Das Ziel hier war es, eine endgültige Version zu erstellen, die nicht durch mehrere veraltete Antworten verwechselt wird - und gesperrt ist, damit niemand etwas dagegen tun kann. Ich bin mit der Abstimmung nicht einverstanden.
Craig Ringer
Warum, dann würde dies bald veraltet sein - und gesperrt, so dass niemand etwas dagegen tun konnte.
Michael Hampton
2
@MichaelHampton Wenn Sie besorgt sind, könnten Sie vielleicht den Link markieren, mit dem Sie verknüpft sind, und darum bitten, dass er entsperrt wird, damit er bereinigt werden kann. Dann können wir dies zusammenführen. Ich habe es einfach satt, die einzig offensichtliche Nähe zu haben. as-dup for upsert ist so ein verwirrendes und falsches Durcheinander.
Craig Ringer
1
Das Q & A ist nicht gesperrt!
Michael Hampton

Antworten:

396

9.5 und neuer:

PostgreSQL 9.5 und neuere Unterstützung INSERT ... ON CONFLICT UPDATE(und ON CONFLICT DO NOTHING), dh Upsert.

Vergleich mitON DUPLICATE KEY UPDATE .

Schnelle Erklärung .

Siehe Nutzung des manuellen - speziell die conflict_action Klausel in dem Syntax - Diagramm und den erläuternden Text .

Im Gegensatz zu den unten aufgeführten Lösungen für 9.4 und älter funktioniert diese Funktion mit mehreren widersprüchlichen Zeilen und erfordert keine exklusive Sperre oder Wiederholungsschleife.

Das Commit, das das Feature hinzufügt, ist hier und die Diskussion über seine Entwicklung ist hier .


Wenn Sie mit 9.5 arbeiten und nicht abwärtskompatibel sein müssen, können Sie jetzt aufhören zu lesen .


9.4 und älter:

PostgreSQL verfügt über keine integrierte UPSERT(oder MERGE) integrierte Funktion, und es ist sehr schwierig, dies bei gleichzeitiger Verwendung effizient durchzuführen.

Dieser Artikel beschreibt das Problem ausführlich .

Im Allgemeinen müssen Sie zwischen zwei Optionen wählen:

  • Einzelne Einfüge- / Aktualisierungsvorgänge in einer Wiederholungsschleife; oder
  • Sperren der Tabelle und Zusammenführen von Stapeln

Einzelne Zeilenwiederholungsschleife

Die Verwendung einzelner Zeilenumbrüche in einer Wiederholungsschleife ist die sinnvolle Option, wenn viele Verbindungen gleichzeitig versuchen sollen, Einfügungen durchzuführen.

Die PostgreSQL-Dokumentation enthält eine nützliche Prozedur, mit der Sie dies in einer Schleife innerhalb der Datenbank tun können . Im Gegensatz zu den meisten naiven Lösungen schützt es vor verlorenen Updates und fügt Rennen ein. Es funktioniert nur im READ COMMITTEDModus und ist nur dann sicher, wenn es das einzige ist, was Sie in der Transaktion tun. Die Funktion funktioniert nicht richtig, wenn Trigger oder sekundäre eindeutige Schlüssel eindeutige Verstöße verursachen.

Diese Strategie ist sehr ineffizient. Wann immer es praktisch ist, sollten Sie die Arbeit in die Warteschlange stellen und stattdessen einen Bulk-Upsert durchführen, wie unten beschrieben.

Viele Lösungsversuche für dieses Problem berücksichtigen keine Rollbacks, sodass sie zu unvollständigen Aktualisierungen führen. Zwei Transaktionen laufen miteinander; einer von ihnen erfolgreich INSERTs; Der andere erhält einen doppelten Schlüsselfehler und führt UPDATEstattdessen einen aus. Die UPDATEBlöcke, die darauf warten, dass INSERTsie zurückgesetzt oder festgeschrieben werden. Wenn es zurückgesetzt wird, UPDATEstimmt die erneute Überprüfung der Bedingung mit null Zeilen überein. Obwohl UPDATEdie Festschreibungen nicht den erwarteten Upsert ausgeführt haben. Sie müssen die Anzahl der Ergebniszeilen überprüfen und gegebenenfalls erneut versuchen.

Einige Lösungsversuche berücksichtigen auch keine SELECT-Rennen. Wenn Sie das Offensichtliche und Einfache versuchen:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

Wenn dann zwei gleichzeitig ausgeführt werden, gibt es mehrere Fehlermodi. Eines ist das bereits besprochene Problem bei einer erneuten Überprüfung des Updates. Eine andere ist, wo beide UPDATEgleichzeitig, null Zeilen übereinstimmen und fortfahren. Dann machen beide den EXISTSTest, der vor dem INSERT. Beide bekommen null Zeilen, also machen beide das INSERT. Einer schlägt mit einem doppelten Schlüsselfehler fehl.

Aus diesem Grund benötigen Sie eine Wiederholungsschleife. Sie könnten denken, dass Sie mit cleverem SQL doppelte Schlüsselfehler oder verlorene Updates verhindern können, aber Sie können nicht. Sie müssen die Zeilenanzahl überprüfen oder doppelte Schlüsselfehler behandeln (abhängig vom gewählten Ansatz) und es erneut versuchen.

Bitte rollen Sie keine eigene Lösung dafür. Wie bei der Nachrichtenwarteschlange ist es wahrscheinlich falsch.

Bulk Upsert mit Schloss

Manchmal möchten Sie einen Bulk-Upsert durchführen, bei dem Sie einen neuen Datensatz haben, den Sie in einen älteren vorhandenen Datensatz zusammenführen möchten. Dies ist weitaus effizienter als einzelne Zeilenumbrüche und sollte nach Möglichkeit bevorzugt werden.

In diesem Fall gehen Sie normalerweise wie folgt vor:

  • CREATEein TEMPORARYTisch

  • COPY oder fügen Sie die neuen Daten in großen Mengen in die temporäre Tabelle ein

  • LOCKdie Zieltabelle IN EXCLUSIVE MODE. Dies ermöglicht anderen Transaktionen, SELECTnimmt jedoch keine Änderungen an der Tabelle vor.

  • Führen Sie einen UPDATE ... FROMder vorhandenen Datensätze mit den Werten in der temporären Tabelle aus.

  • Führen Sie eine INSERTder Zeilen aus, die noch nicht in der Zieltabelle vorhanden sind.

  • COMMIT, das Schloss loslassen.

Beispiel: Für das in der Frage angegebene Beispiel wird INSERTdie temporäre Tabelle mit mehreren Werten gefüllt:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

Verwandte Lektüre

Was ist mit MERGE?

SQL-Standard hat MERGEtatsächlich eine schlecht definierte Parallelitätssemantik und ist nicht zum Upserting geeignet, ohne vorher eine Tabelle zu sperren.

Es ist eine wirklich nützliche OLAP-Anweisung für das Zusammenführen von Daten, aber keine nützliche Lösung für das gleichzeitige Upsert. Es gibt viele Ratschläge für Leute, die andere DBMS MERGEfür Upserts verwenden, aber es ist tatsächlich falsch.

Andere DBs:

Craig Ringer
quelle
Gibt es im Bulk-Upsert einen möglichen Wert beim Löschen aus neuen Werten, anstatt das INSERT zu filtern? ZB WITH upd AS (UPDATE ... RETURNING newvals.id) DELETE FROM newvals USING upd WHERE newvals.id = upd.id, gefolgt von einem bloßen INSERT INTO testtable SELECT * FROM newvals? Meine Idee dazu: Anstatt zweimal in INSERT zu filtern (für JOIN / WHERE und für die eindeutige Einschränkung), verwenden Sie die Ergebnisse der Existenzprüfung aus dem UPDATE wieder, die sich bereits im RAM befinden und möglicherweise viel kleiner sind. Dies kann ein Gewinn sein, wenn nur wenige Zeilen übereinstimmen und / oder neue Werte viel kleiner als die Testtabelle sind.
Gunnlaugur Briem
1
Es gibt immer noch ungelöste Probleme und für die anderen Anbieter ist nicht klar, was funktioniert und was nicht. 1. Die angegebene Postgres-Schleifenlösung funktioniert bei mehreren eindeutigen Schlüsseln nicht. 2. Der On-Duplicate-Schlüssel für MySQL funktioniert auch nicht für mehrere eindeutige Schlüssel. 3. Funktionieren die anderen oben genannten Lösungen für MySQL, SQL Server und Oracle? Sind in diesen Fällen Ausnahmen möglich und müssen wir eine Schleife durchführen?
Dan B
@danb Hier geht es wirklich nur um PostgreSQL. Es gibt keine herstellerübergreifende Lösung. Die Lösung für PostgreSQL funktioniert nicht für mehrere Zeilen. Sie müssen leider eine Transaktion pro Zeile ausführen. Die "Lösungen", die MERGEfür SQL Server und Oracle verwendet werden, sind falsch und anfällig für Rennbedingungen, wie oben erwähnt. Sie müssen sich jedes DBMS genauer ansehen, um herauszufinden, wie Sie damit umgehen sollen. Ich kann wirklich nur Ratschläge zu PostgreSQL geben. Die einzige Möglichkeit, ein sicheres mehrzeiliges Upsert unter PostgreSQL durchzuführen, besteht darin, dem Kernserver Unterstützung für natives Upsert hinzuzufügen.
Craig Ringer
Selbst für PostGresQL funktioniert die Lösung nicht, wenn eine Tabelle mehrere eindeutige Schlüssel enthält (Aktualisierung nur einer Zeile). In diesem Fall müssen Sie angeben, welcher Schlüssel aktualisiert wird. Möglicherweise gibt es eine herstellerübergreifende Lösung, die beispielsweise jdbc verwendet.
Dan B
2
Postgres unterstützt jetzt UPSERT - git.postgresql.org/gitweb/…
Chris
32

Ich versuche, mit einer anderen Lösung für das Problem der einzelnen Einfügung mit den Versionen vor 9.5 von PostgreSQL beizutragen. Die Idee ist einfach, zuerst zu versuchen, das Einfügen durchzuführen, und falls der Datensatz bereits vorhanden ist, ihn zu aktualisieren:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

Beachten Sie, dass diese Lösung nur angewendet werden kann, wenn keine Zeilen in der Tabelle gelöscht werden .

Ich weiß nichts über die Effizienz dieser Lösung, aber es scheint mir vernünftig genug.

Renzo
quelle
3
Danke, genau das habe ich gesucht. Ich kann nicht verstehen, warum es so schwer zu finden war.
Isapir
3
Ja. Diese Vereinfachung funktioniert genau dann, wenn keine Löschvorgänge vorhanden sind.
Craig Ringer
@CraigRinger Kannst du erklären, was genau passiert, wenn es Löschungen gibt?
Turbanoff
@turbanoff Das Einfügen kann fehlschlagen, da der Datensatz bereits vorhanden ist. Dann wird er gleichzeitig gelöscht, und die Aktualisierung wirkt sich dann auf null Zeilen aus, da die Zeile gelöscht wurde.
Craig Ringer
@CraigRinger So. Das Löschen erfolgt gleichzeitig . Was sind mögliche outways wenn dies ist funktioniert gut? Wenn das Löschen gleichzeitig funktioniert, kann es direkt nach unserem Block ausgeführt werden. Was ich versuche zu sagen - wenn wir gleichzeitig löschen - dann insert on update
woks
28

Hier sind einige Beispiele für insert ... on conflict ...( S. 9.5+ ):

  • Einfügen, bei Konflikten - nichts tun .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict do nothing;`  
  • Einfügen, bei Konflikt - aktualisieren , Konfliktziel über Spalte angeben .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict(id)
    do update set name = 'new_name', size = 3;  
  • Bei Konflikt einfügen - Aktualisieren , Konfliktziel über den Einschränkungsnamen angeben .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict on constraint dummy_pkey
    do update set name = 'new_name', size = 4;
Eric Wang
quelle
gute Antwort - Frage: Warum oder in welcher Situation sollte man die Zielspezifikation über den Spalten- oder Einschränkungsnamen verwenden? Gibt es einen Vor- / Nachteil für verschiedene Anwendungsfälle?
Nathan Benton
1
@ NathanBenton Ich denke, es gibt mindestens zwei Unterschiede: (1) Der Spaltenname wird vom Programmierer angegeben, während der Einschränkungsname entweder vom Programmierer angegeben oder von der Datenbank gemäß den Tabellen- / Spaltennamen generiert wird. (2) Jede Spalte kann mehrere Einschränkungen haben. Das heißt, es hängt von Ihrem Fall ab, welche Sie verwenden möchten.
Eric Wang
8

SQLAlchemy Upsert für Postgres> = 9,5

Da der große Beitrag oben viele verschiedene SQL-Ansätze für Postgres-Versionen behandelt (nicht nur Nicht-9.5 wie in der Frage), möchte ich hinzufügen, wie dies in SQLAlchemy gemacht wird, wenn Sie Postgres 9.5 verwenden. Anstatt Ihren eigenen Upsert zu implementieren, können Sie auch die Funktionen von SQLAlchemy verwenden (die in SQLAlchemy 1.1 hinzugefügt wurden). Persönlich würde ich empfehlen, diese zu verwenden, wenn möglich. Nicht nur aus praktischen Gründen, sondern auch, weil PostgreSQL damit alle möglicherweise auftretenden Rennbedingungen bewältigen kann.

Cross-Posting von einer anderen Antwort, die ich gestern gegeben habe ( https://stackoverflow.com/a/44395983/2156909 )

SQLAlchemy unterstützt ON CONFLICTjetzt mit zwei Methoden on_conflict_do_update()und on_conflict_do_nothing():

Kopieren aus der Dokumentation:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='[email protected]', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

PR
quelle
4
Python und SQLAlchemy werden in der Frage nicht erwähnt.
Alexander Emelianov
Ich benutze oft Python in den Lösungen, die ich schreibe. Aber ich habe mich nicht mit SQLAlchemy befasst (oder war mir dessen bewusst). Dies scheint eine elegante Option zu sein. Danke dir. Wenn es auscheckt, werde ich dies meiner Organisation präsentieren.
Robert
3
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

Getestet auf Postgresql 9.3

Aristar
quelle
@CraigRinger: Könnten Sie das näher erläutern? ist das cte nicht atomar?
Parisni
2
@parisni Nein. Jeder CTE-Begriff erhält einen eigenen Snapshot, wenn Schreibvorgänge ausgeführt werden. Es gibt auch keine Art von Prädikatsperre für Zeilen, die nicht gefunden wurden, sodass sie gleichzeitig von einer anderen Sitzung erstellt werden können. Wenn Sie die SERIALIZABLEIsolation verwenden, erhalten Sie einen Abbruch mit einem Serialisierungsfehler, andernfalls erhalten Sie wahrscheinlich eine eindeutige Verletzung. Upsert nicht neu erfinden, die Neuerfindung wird falsch sein. Verwenden Sie INSERT ... ON CONFLICT .... Wenn Ihr PostgreSQL zu alt ist, aktualisieren Sie es.
Craig Ringer
@CraigRinger INSERT ... ON CLONFLICT ...ist nicht zum Massenladen vorgesehen. Von Ihrem Beitrag aus ist das LOCK TABLE testtable IN EXCLUSIVE MODE;innerhalb eines CTE eine Problemumgehung, um atomare Dinge zu erhalten. Nein ?
Paris
@parisni Es ist nicht zum Massenladen gedacht? Sagt wer? postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT . Sicher, es ist viel langsamer als Massenladen ohne Upsert-ähnliches Verhalten, aber das ist offensichtlich und wird der Fall sein, egal was Sie tun. Es ist viel schneller als die Verwendung von Subtransaktionen, das ist sicher. Der schnellste Ansatz besteht darin , die Zieltabelle zu sperren und dann insert ... where not exists ...natürlich eine oder ähnliche Aktionen auszuführen.
Craig Ringer
1

Da diese Frage geschlossen wurde, poste ich hier, wie Sie es mit SQLAlchemy machen. Durch Rekursion wird ein Masseneinsatz oder eine Aktualisierung wiederholt, um die Rennbedingungen zu bekämpfen und Validierungsfehler .

Zuerst die Importe

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

Jetzt funktioniert ein paar Helfer

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

Und schließlich die Upsert-Funktion

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

So verwenden Sie es

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

Dies hat den Vorteil, bulk_save_objectsdass Beziehungen (Fehlerprüfung usw.) beim Einfügen verarbeitet werden können (im Gegensatz zu Massenoperationen ).

reubano
quelle
Es sieht für mich auch falsch aus. Was passiert, wenn eine gleichzeitige Sitzung eine Zeile einfügt, nachdem Sie Ihre ID-Liste erfasst haben? Oder löscht man?
Craig Ringer
guter Punkt @CraigRinger Ich mache etwas Ähnliches, habe aber nur 1 Sitzung, in der der Job ausgeführt wird. Was ist dann der beste Weg, um mehrere Sitzungen abzuwickeln? Eine Transaktion vielleicht?
Reubano
Transaktionen sind nicht die magische Lösung für alle Parallelitätsprobleme. Sie könnten SERIALIZABLE Transaktionen verwenden und Serialisierungsfehler behandeln, aber es ist langsam. Sie benötigen eine Fehlerbehandlung und eine Wiederholungsschleife. Siehe meine Antwort und den Abschnitt "Verwandte Lektüre" darin.
Craig Ringer
@CraigRinger gotcha. Ich habe in meinem Fall aufgrund anderer Validierungsfehler tatsächlich eine Wiederholungsschleife implementiert. Ich werde diese Antwort entsprechend aktualisieren.
Reubano