Schreiben von mehr als 50 Millionen von Pyspark df bis PostgresSQL, bester effizienter Ansatz

16

Was wäre der effizienteste Weg, um Millionen von Datensätzen einzufügen, beispielsweise 50 Millionen von einem Spark-Datenrahmen in Postgres-Tabellen. Ich habe dies in der Vergangenheit von Spark bis MSSQL getan, indem ich die Option für Massenkopien und Stapelgrößen verwendet habe, die ebenfalls erfolgreich war.

Gibt es etwas Ähnliches, das für Postgres hier sein kann?

Hinzufügen des Codes, den ich versucht habe, und der Zeit, die zum Ausführen des Prozesses benötigt wurde:

def inserter():
    start = timer()
    sql_res.write.format("jdbc").option("numPartitions","5").option("batchsize","200000")\
    .option("url", "jdbc:postgresql://xyz.com:5435/abc_db") \
    .option("dbtable", "public.full_load").option("user", "root").option("password", "password").save()
    end = timer()
    print(timedelta(seconds=end-start))
inserter()

Also habe ich den obigen Ansatz für 10 Millionen Datensätze durchgeführt und 5 parallele Verbindungen wie in angegeben angegeben numPartitionsund auch eine Stapelgröße von 200 KB ausprobiert .

Die Gesamtzeit für den Vorgang betrug 0: 14: 05.760926 (vierzehn Minuten und fünf Sekunden).

Gibt es einen anderen effizienten Ansatz, der die Zeit verkürzen würde?

Was wäre die effiziente oder optimale Chargengröße, die ich verwenden kann? Wird das Erhöhen meiner Chargengröße die Arbeit schneller erledigen? Oder das Öffnen mehrerer Verbindungen, dh> 5, hilft mir, den Prozess zu beschleunigen?

Im Durchschnitt sind 14 Minuten für 10 Millionen Datensätze nicht schlecht , aber es gibt Leute, die dies zuvor getan hätten, um diese Frage zu beantworten.

Chetan_Vasudevan
quelle
1
Sie können die Daten zuerst in eine lokale CSV-Datei sichern und dann mit den eigenen Import-Tools von PostgreSQL importieren. Dies hängt davon ab, wo der Engpass liegt: Ist der Export aus Pyspark langsam oder der Import nach Postgres langsam oder etwas anderes? (Das heißt, 14 Minuten für 50 Millionen Zeilen scheinen mir nicht so schlecht zu sein - welche Indizes sind in der Tabelle definiert?).
Dai
Dai, ich habe eine df, die 52mil ist, und jetzt schreibe ich sie an Postgres. Es ist eine neue Tabelle, die ich mit dem obigen Code erstelle. Ich habe die Tabelle nicht in Postgres erstellt und dann dort geschrieben. Gibt es eine bessere Möglichkeit, wenn ich zuerst eine Tabelle erstellen und dort in Postgres indizieren und dann Daten von spark df senden kann?
Chetan_Vasudevan
2
(Es ist umgekehrt - Indizes verlangsamen Einfügevorgänge für Tabellen, beschleunigen aber ausgewählte Abfragen)
Dai
Dai, also erstelle ich die Tabelle in Postgres ohne Index und versuche dann, meine Leistung einzufügen und zu messen?
Chetan_Vasudevan
2
stackoverflow.com/questions/758945/… könnte hilfreich sein.
Alexey Romanov

Antworten:

4

Ich habe vor einiger Zeit tatsächlich die gleiche Arbeit gemacht, aber Apache Sqoop verwendet.

Ich würde sagen, dass wir zur Beantwortung dieser Fragen versuchen müssen, die Kommunikation zwischen Spark und PostgresSQL zu optimieren, insbesondere die Daten, die von Spark nach PostgreSql fließen.

Aber seien Sie vorsichtig, vergessen Sie nicht die Spark-Seite. Es ist nicht sinnvoll, mapPartitions auszuführen, wenn die Anzahl der Partitionen im Vergleich zur Anzahl der von PostgreSQL unterstützten maximalen Verbindungen zu hoch ist. Wenn Sie zu viele Partitionen haben und für jede eine Verbindung öffnen, tritt wahrscheinlich der folgende Fehler auf org.postgresql.util.PSQLException: FATAL: sorry, too many clients already.

Um den Einfügevorgang zu optimieren, würde ich mich dem Problem mit den folgenden Schritten nähern:

  • Denken Sie daran, dass die Anzahl der Partitionen wichtig ist. Überprüfen Sie die Anzahl der Partitionen und passen Sie sie dann an die Anzahl der gewünschten Parallelverbindungen an. Vielleicht möchten Sie pro Partition eine Verbindung haben, so würde ich überprüfen vorschlagen coalesce, wie erwähnt ist hier .
  • Überprüfen Sie die maximale Anzahl von Verbindungen, die Ihre postgreSQL-Instanz unterstützt, und Sie möchten die Anzahl erhöhen .
  • Zum Einfügen von Daten in PostgreSQL wird der Befehl COPY empfohlen . Hier finden Sie auch eine ausführlichere Antwort dazu, wie Sie das Einfügen von PostgreSQL beschleunigen können.

Schließlich gibt es keine Silberkugel, um diesen Job zu erledigen. Sie können alle oben genannten Tipps verwenden, dies hängt jedoch wirklich von Ihren Daten und Anwendungsfällen ab.

dbustosp
quelle
Dbustosp Ich werde auf jeden Fall die oben genannten Tipps ausprobieren, bis dahin haben Sie sicher eine positive Bewertung verdient.
Chetan_Vasudevan
@chetan_vasudevan Wenn Sie weitere Details zu den von Ihnen verwendeten Daten, zur Größe pro Datensatz usw. angeben. Wenn die Daten öffentlich sind, kann ich selbst etwas ausprobieren und die Zeiten vergleichen.
dbustosp
Dbustosp die Daten hat 80 Spalten und seine 55 Millionen Datensätze. Ich habe angefangen, an den Vorschlägen zu arbeiten, die Sie mir gegeben haben.
Chetan_Vasudevan
@Chetan_Vasudevan Die Gesamtgröße des Datensatzes? Wie ist das Format der Eingabedaten?
dbustosp
@Chetan_Vasudevan Gibt es ein Update?
dbustosp