Nehmen wir also an, Sie haben einen Python-Prozess, der Daten in Echtzeit mit ungefähr 500 Zeilen pro Sekunde (dies kann weiter parallelisiert werden, um auf ungefähr 50 ps zu reduzieren) aus einem Warteschlangensystem sammelt und an Folgendes anfügt DataFrame
:
rq = MyRedisQueue(..)
df = pd.DataFrame()
while 1:
recv = rq.get(block=True)
# some converting
df.append(recv, ignore_index = True)
Die Frage ist nun: Wie werden die CPUs basierend auf diesen Daten genutzt? So bin ich von den Einschränkungen des voll bewusst GIL , und sah in Manager - Multiprocessing - Namespace , auch hier , aber es sieht so gibt es einige Nachteile im Hinblick auf die Latenz auf dem centerally halten Datenrahmen . Bevor ich pool.map
mich damit beschäftigte, habe ich auch versucht, was ich dann erkannt habe , um es pickle
zwischen den Prozessen anzuwenden , was viel zu langsam ist und zu viel Overhead hat.
Nach all dem frage ich mich schließlich, wie (wenn) eine Einfügung von 500 Zeilen pro Sekunde (oder sogar 50 Zeilen pro Sekunde) auf verschiedene Prozesse übertragen werden kann, wobei noch etwas CPU-Zeit für die Anwendung von Statistiken und Heuristiken auf die Daten im Kind verbleibt Prozesse?
Vielleicht wäre es besser, einen benutzerdefinierten TCP-Socket oder ein Warteschlangensystem zwischen den beiden Prozessen zu implementieren? Oder gibt es einige Implementierungen in pandas
oder andere Bibliotheken, um wirklich einen schnellen Zugriff auf den einen großen Datenrahmen im übergeordneten Prozess zu ermöglichen ? Ich liebe Pandas!
Antworten:
Bevor wir beginnen, sollte ich sagen, dass Sie uns nicht viel über Ihren Code erzählt haben, aber diesen Punkt im Kopf haben, nur diese 50/500 neuen Zeilen pro Sekunde an den untergeordneten Prozess zu übertragen und zu versuchen, diesen großen untergeordneten Prozess zu erstellen
DataFrame
.Ich arbeite an einem Projekt genauso wie Sie. Python hat viele IPC-Implementierungen wie
Pipe
undQueue
wie Sie wissen.Shared Memory
Die Lösung kann in vielen Fällen problematisch sein. Die offizielle Dokumentation von AFAIK Python warnt vor der Verwendung gemeinsamer Speicher.Nach meiner Erfahrung besteht der beste Weg, Daten zwischen nur zwei Prozessen zu transformieren
Pipe
, darin, DataFrame auszuwählen und an den anderen Verbindungsendpunkt zu senden. Ich empfehle Ihnen dringend, in Ihrem FallTCP
Sockets (AF_INET
) zu vermeiden .Pandas
DataFrame
können nicht in einen anderen Prozess umgewandelt werden, ohne eingelegt und ungepflückt zu werden.dict
Daher empfehle ich Ihnen auch, Rohdaten als integrierte Typen wie anstelle von DataFrame zu übertragen. Dies kann das Beizen und Entnehmen beschleunigen und hat auch weniger Speicherbedarf.quelle
Shared Memory
Bereich, die hoffentlich viele Lesevorgänge aus den untergeordneten Prozessen verarbeiten kann, während die Hauptprozesse daran angehängt werden, könnte dies aus meiner Sicht tun, wenn ich die Schreibzugriffe auf den übergeordneten Prozess kaum einschränke.shared memory
in irgendeiner Art ist,block state
während ich darauf schreibe? Das würde bedeuten, dass untergeordnete Prozesse den DataFrame nicht lesen dürfen, während der übergeordnete Prozess daran angehängt wird (was fast immer der Fall sein wird).Shared Memory
Lösung verwenden möchten, sollten Sie die untergeordneten Prozesse mit dem Lieferantenprozess synchronisieren. Dies könnte geschehen durchmultiprocessing.Lock
: docs.python.org/3/library/…Die Parallelisierung in
pandas
wird wahrscheinlich von einem anderen Motor insgesamt besser gehandhabt.Schauen Sie sich das Koalas-Projekt von Databricks oder Dasks DataFrame an .
quelle
df.map_partitions
) und danngroupby
Index (wichtig für die Leistung in Dask), speichern als CSV.Eine einfache Lösung wäre, den Prozess in zwei verschiedene Stufen zu unterteilen. Verwenden Sie Asyncio, um die Daten nicht blockierend zu empfangen und Ihre Transformationen innerhalb dieser durchzuführen. Die zweite Stufe würde eine Asyncio-Warteschlange zum Erstellen des DataFrame verwenden. Dies setzt voraus, dass Sie den DataFrame nicht für einen anderen Prozess benötigen, während Sie Daten aus der Redis-Warteschlange empfangen.
Hier ist ein Beispiel für die Erstellung eines Produzenten / Konsumenten-Modells mit Asyncio
quelle