Welche Faktoren bestimmen ein optimales chunksize
Argument für Methoden wie multiprocessing.Pool.map()
? Die .map()
Methode scheint eine beliebige Heuristik für ihre Standard-Blockgröße zu verwenden (siehe unten). Was motiviert diese Wahl und gibt es einen nachdenklicheren Ansatz, der auf einer bestimmten Situation / Einrichtung basiert?
Beispiel - sagen Sie, dass ich bin:
- Vorbei an einem
iterable
zu.map()
das hat ~ 15 Millionen Elemente; - Arbeiten an einer Maschine mit 24 Kernen und mit dem Standard -
processes = os.cpu_count()
inmultiprocessing.Pool()
.
Mein naives Denken ist es, jedem von 24 Arbeitern einen gleich großen Teil zu geben, dh 15_000_000 / 24
625.000. Große Brocken sollten den Umsatz / Overhead reduzieren und gleichzeitig alle Mitarbeiter voll ausnutzen. Es scheint jedoch, dass hier einige potenzielle Nachteile fehlen, wenn jedem Arbeiter große Mengen gegeben werden. Ist das ein unvollständiges Bild und was fehlt mir?
Ein Teil meiner Frage ergibt sich aus der Standardlogik für if chunksize=None
: both .map()
und .starmap()
call .map_async()
, die folgendermaßen aussieht:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
# ... (materialize `iterable` to list if it's an iterator)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4) # ????
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
Was ist die Logik dahinter divmod(len(iterable), len(self._pool) * 4)
? Dies bedeutet, dass die Blockgröße näher liegt 15_000_000 / (24 * 4) == 156_250
. Was ist die Absicht beim Multiplizieren len(self._pool)
mit 4?
Dies macht die resultierende Blockgröße um den Faktor 4 kleiner als meine "naive Logik" von oben, die darin besteht, nur die Länge des Iterierbaren durch die Anzahl der Arbeiter in zu teilen pool._pool
.
Schließlich gibt es noch diesen Ausschnitt aus den Python-Dokumenten .imap()
, der meine Neugier weiter steigert:
Das
chunksize
Argument ist das gleiche wie das von dermap()
Methode verwendete. Bei sehr langen Iterables mit einem großen Wert fürchunksize
kann der Auftrag viel schneller abgeschlossen werden als mit dem Standardwert 1.
Verwandte Antwort, die hilfreich, aber etwas zu hoch ist: Python-Multiprocessing: Warum sind große Chunksize langsamer? .
4
Ist willkürlich und die gesamte Berechnung der Chunksize ist eine Heuristik. Der relevante Faktor ist, wie stark Ihre tatsächliche Verarbeitungszeit variieren kann. Ein bisschen mehr dazu hier, bis ich Zeit für eine Antwort habe, wenn sie dann noch gebraucht wird.Antworten:
Kurze Antwort
Der Chunksize-Algorithmus von Pool ist eine Heuristik. Es bietet eine einfache Lösung für alle denkbaren Problemszenarien, die Sie in die Methoden von Pool einbauen möchten. Infolgedessen kann es nicht für ein bestimmtes Szenario optimiert werden.
Der Algorithmus unterteilt das Iterable willkürlich in ungefähr viermal mehr Blöcke als der naive Ansatz. Mehr Chunks bedeuten mehr Overhead, aber mehr Planungsflexibilität. Wie diese Antwort zeigen wird, führt dies im Durchschnitt zu einer höheren Auslastung der Mitarbeiter, jedoch ohne die Garantie einer kürzeren Gesamtberechnungszeit für jeden Fall.
"Das ist schön zu wissen", könnte man meinen, "aber wie hilft mir das Wissen bei meinen konkreten Multiprozessor-Problemen?" Nun, das tut es nicht. Die ehrlichere kurze Antwort lautet: "Es gibt keine kurze Antwort", "Multiprocessing ist komplex" und "es kommt darauf an". Ein beobachtetes Symptom kann auch für ähnliche Szenarien unterschiedliche Wurzeln haben.
Diese Antwort versucht, Ihnen grundlegende Konzepte zu liefern, die Ihnen helfen, ein klareres Bild der Planungs-Blackbox von Pool zu erhalten. Es wird auch versucht, Ihnen einige grundlegende Tools zur Verfügung zu stellen, mit denen Sie potenzielle Klippen erkennen und vermeiden können, sofern sie mit Chunksize zusammenhängen.
Inhaltsverzeichnis
Teil I.
Quantifizierung der Algorithmuseffizienz
6.1 Modelle
6.2 Paralleler Zeitplan
6.3 Effizienz
6.3.1 Absolute Verteilungseffizienz (ADE)
6.3.2 Relative Verteilungseffizienz (RDE)
Teil II
Es ist notwendig, zuerst einige wichtige Begriffe zu klären.
1. Definitionen
Chunk
Ein Block hier ist eine Freigabe des
iterable
in einem Aufruf der Poolmethode angegebenen Arguments. Wie die Blockgröße berechnet wird und welche Auswirkungen dies haben kann, ist das Thema dieser Antwort.Aufgabe
Die physische Darstellung einer Aufgabe in einem Arbeitsprozess in Bezug auf Daten ist in der folgenden Abbildung dargestellt.
Die Abbildung zeigt einen beispielhaften Aufruf von
pool.map()
, der entlang einer Codezeile angezeigt wird und aus dermultiprocessing.pool.worker
Funktion stammt, in der eine aus der gelesene Aufgabeinqueue
entpackt wird.worker
ist die zugrunde liegende HauptfunktionMainThread
eines Pool-Worker-Prozesses. Dasfunc
in der Pool-Methode angegebene -argument stimmt nur mit derfunc
-variablen innerhalb derworker
-Funktion für Einzelaufrufmethoden wieapply_async
und fürimap
mit übereinchunksize=1
. Für den Rest der Pool-Methoden mit einemchunksize
Parameter ist die Verarbeitungsfunktionfunc
eine Mapper-Funktion (mapstar
oderstarmapstar
). Diese Funktion ordnet den benutzerdefiniertenfunc
Parameter jedem Element des übertragenen Blocks der Iterable zu (-> "Map-Tasks"). Die dafür benötigte Zeit definiert eine Aufgabeauch als Arbeitseinheit .Taskel
Während die Verwendung des Wortes "Aufgabe" für die gesamte Verarbeitung eines Blocks mit dem darin enthaltenen Code übereinstimmt
multiprocessing.pool
, gibt es keinen Hinweis darauf, wie ein einzelner Aufruf des benutzerdefinierten Blocksfunc
mit einem Element des Blocks als Argument (e) sein sollte bezogen auf. Um Verwirrung durch Namenskonflikte zu vermeiden (denken Sie an denmaxtasksperchild
Parameter für die Pool-__init__
Methode), bezieht sich diese Antwort auf die einzelnen Arbeitseinheiten innerhalb einer Aufgabe als Taskel .Parallelisierungs-Overhead (PO)
PO besteht aus Python-internem Overhead und Overhead für die Interprozesskommunikation (IPC). Der Aufwand pro Aufgabe in Python wird mit dem Code geliefert, der zum Packen und Entpacken der Aufgaben und ihrer Ergebnisse erforderlich ist. IPC-Overhead beinhaltet die notwendige Synchronisation von Threads und das Kopieren von Daten zwischen verschiedenen Adressräumen (zwei Kopierschritte erforderlich: Eltern -> Warteschlange -> Kind). Die Höhe des IPC-Overheads hängt von der Betriebssystem-, Hardware- und Datengröße ab, was Verallgemeinerungen über die Auswirkungen schwierig macht.
2. Parallelisierungsziele
Bei der Verwendung von Multiprocessing besteht unser übergeordnetes Ziel (offensichtlich) darin, die Gesamtverarbeitungszeit für alle Aufgaben zu minimieren. Um dieses Gesamtziel zu erreichen, muss unser technisches Ziel darin bestehen, die Nutzung der Hardwareressourcen zu optimieren .
Einige wichtige Unterziele zur Erreichung des technischen Ziels sind:
Zunächst müssen die Aufgaben rechenintensiv genug sein, um die Bestellung zurückzugewinnen, die wir für die Parallelisierung bezahlen müssen. Die Relevanz von PO nimmt mit zunehmender absoluter Rechenzeit pro Taskel ab. Oder anders ausgedrückt: Je größer die absolute Rechenzeit pro Taskel für Ihr Problem ist, desto weniger relevant ist die Notwendigkeit, die Bestellung zu reduzieren. Wenn Ihre Berechnung Stunden pro Taskel dauert, ist der IPC-Overhead im Vergleich vernachlässigbar. Das Hauptanliegen hierbei ist es, zu verhindern, dass Worker-Prozesse im Leerlauf ausgeführt werden, nachdem alle Aufgaben verteilt wurden. Wenn alle Kerne geladen bleiben, parallelisieren wir so viel wie möglich.
3. Parallelisierungsszenarien
Der Hauptfaktor ist, wie viel Rechenzeit zwischen unseren einzelnen Aufgaben variieren kann. Um es zu benennen, wird die Wahl für eine optimale Blockgröße durch den Variationskoeffizienten ( CV ) für die Berechnungszeiten pro Taskel bestimmt.
Die zwei Extremszenarien auf einer Skala, die sich aus dem Ausmaß dieser Variation ergeben, sind:
Zur besseren Einprägsamkeit beziehe ich mich auf folgende Szenarien:
Dichtes Szenario
In einem dichten Szenario wäre es wünschenswert, alle Taskels gleichzeitig zu verteilen, um den erforderlichen IPC- und Kontextwechsel auf ein Minimum zu beschränken. Dies bedeutet, dass wir nur so viele Chunks erstellen möchten, wie es viele Worker-Prozesse gibt. Wie bereits oben erwähnt, steigt das Gewicht von PO mit kürzeren Rechenzeiten pro Taskel.
Für einen maximalen Durchsatz möchten wir auch, dass alle Worker-Prozesse beschäftigt sind, bis alle Aufgaben verarbeitet sind (keine inaktiven Worker). Für dieses Ziel sollten die verteilten Blöcke gleich groß oder nahe sein.
Breites Szenario
Das beste Beispiel für ein breites Szenario wäre ein Optimierungsproblem, bei dem die Ergebnisse entweder schnell konvergieren oder die Berechnung Stunden, wenn nicht Tage dauern kann. Normalerweise ist es nicht vorhersehbar, welche Mischung aus "leichten Taskels" und "schweren Taskels" eine Task in einem solchen Fall enthält. Daher ist es nicht ratsam, zu viele Taskels gleichzeitig in einem Task-Batch zu verteilen. Wenn weniger Aufgaben gleichzeitig als möglich verteilt werden, erhöht sich die Planungsflexibilität. Dies ist hier erforderlich, um unser Unterziel einer hohen Auslastung aller Kerne zu erreichen.
Wenn
Pool
Methoden standardmäßig vollständig für das dichte Szenario optimiert würden, würden sie zunehmend suboptimale Zeitabläufe für jedes Problem erstellen, das sich näher am weiten Szenario befindet.4. Risiken von Chunksize> 1
Betrachten Sie dieses vereinfachte Pseudocode-Beispiel eines Wide Scenario -iterable, das wir an eine Pool-Methode übergeben möchten:
good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]
Anstelle der tatsächlichen Werte geben wir vor, die erforderliche Rechenzeit in Sekunden zu sehen, der Einfachheit halber nur 1 Minute oder 1 Tag. Wir gehen davon aus, dass der Pool vier Worker-Prozesse (auf vier Kernen) hat und auf eingestellt
chunksize
ist2
. Da die Bestellung eingehalten wird, werden folgende Stücke an die Arbeiter gesendet:[(60, 60), (86400, 60), (86400, 60), (60, 84600)]
Da wir genug Arbeiter haben und die Rechenzeit hoch genug ist, können wir sagen, dass jeder Arbeitsprozess überhaupt einen Teil bekommt, an dem er arbeiten kann. (Dies muss nicht der Fall sein, um Aufgaben schnell zu erledigen). Weiter können wir sagen, dass die gesamte Verarbeitung ungefähr 86400 + 60 Sekunden dauern wird, da dies die höchste Gesamtberechnungszeit für einen Block in diesem künstlichen Szenario ist und wir Blöcke nur einmal verteilen.
Betrachten Sie nun diese Iterable, bei der nur ein Element seine Position im Vergleich zur vorherigen Iterable ändert:
bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]
... und die entsprechenden Brocken:
[(60, 60), (86400, 86400), (60, 60), (60, 84600)]
Nur Pech mit der Sortierung unserer iterable fast verdoppelt (86400 + 86400) unsere Gesamtverarbeitungszeit! Der Arbeiter, der den bösartigen (86400, 86400) -Stück erhält, blockiert, dass der zweite schwere Taskel in seiner Aufgabe an einen der untätigen Arbeiter verteilt wird, die bereits mit ihren (60, 60) -Blöcken fertig sind. Wir würden offensichtlich kein so unangenehmes Ergebnis riskieren, wenn wir uns setzen
chunksize=1
.Dies ist das Risiko größerer Brocken. Mit höheren Chunksize tauschen wir Planungsflexibilität gegen weniger Overhead und in Fällen wie oben ist das ein schlechtes Geschäft.
Wie wir in Kapitel 6 sehen werden. Quantifizierung der Algorithmuseffizienz , größere Blockgrößen können auch zu suboptimalen Ergebnissen für dichte Szenarien führen .
5. Pools Chunksize-Algorithmus
Unten finden Sie eine leicht modifizierte Version des Algorithmus im Quellcode. Wie Sie sehen können, habe ich den unteren Teil abgeschnitten und ihn in eine Funktion zur
chunksize
externen Berechnung des Arguments eingewickelt . Ich habe auch durch4
einenfactor
Parameter ersetzt und dielen()
Anrufe ausgelagert .# mp_utils.py def calc_chunksize(n_workers, len_iterable, factor=4): """Calculate chunksize argument for Pool-methods. Resembles source-code within `multiprocessing.pool.Pool._map_async`. """ chunksize, extra = divmod(len_iterable, n_workers * factor) if extra: chunksize += 1 return chunksize
Um sicherzustellen, dass wir alle auf derselben Seite sind, gehen Sie wie
divmod
folgt vor:divmod(x, y)
ist eine eingebaute Funktion, die zurückgibt(x//y, x%y)
.x // y
ist die Bodenteilung, von der der abgerundete Quotient zurückgegeben wirdx / y
, währendx % y
die Modulo-Operation den Rest von zurückgibtx / y
. Also zBdivmod(10, 3)
kehrt zurück(3, 1)
.Wenn Sie sich nun ansehen
chunksize, extra = divmod(len_iterable, n_workers * 4)
, werden Sie feststellen, dassn_workers
hier der Divisory
inx / y
und die Multiplikation mit4
, ohne weitere Anpassung durchif extra: chunksize +=1
später, zu einer anfänglichen Blockgröße führt, die mindestens viermal kleiner (fürlen_iterable >= n_workers * 4
) ist als sonst.4
Berücksichtigen Sie diese Funktion, um den Effekt der Multiplikation mit dem Ergebnis der Zwischenblockgröße anzuzeigen:def compare_chunksizes(len_iterable, n_workers=4): """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize for Pool's complete algorithm. Return chunksizes and the real factors by which naive chunksizes are bigger. """ cs_naive = len_iterable // n_workers or 1 # naive approach cs_pool1 = len_iterable // (n_workers * 4) or 1 # incomplete pool algo. cs_pool2 = calc_chunksize(n_workers, len_iterable) real_factor_pool1 = cs_naive / cs_pool1 real_factor_pool2 = cs_naive / cs_pool2 return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2
Die obige Funktion berechnet die naive Chunksize (
cs_naive
) und die Chunksize im ersten Schritt des Chunksize-Algorithmus (cs_pool1
) von Pool sowie die Chunksize für den vollständigen Pool-Algorithmus (cs_pool2
). Weiter berechnet es die realen Faktorenrf_pool1 = cs_naive / cs_pool1
undrf_pool2 = cs_naive / cs_pool2
, die uns sagen, wie oft die naiv berechneten Blockgrößen größer sind als die internen Versionen von Pool.Unten sehen Sie zwei Figuren, die mit der Ausgabe dieser Funktion erstellt wurden. Die linke Abbildung zeigt nur die Blockgrößen für
n_workers=4
bis zu einer iterierbaren Länge von500
. Die rechte Abbildung zeigt die Werte fürrf_pool1
. Für iterierbare Längen16
wird der reale Faktor>=4
(fürlen_iterable >= n_workers * 4
) und sein Maximalwert gilt7
für iterierbare Längen28-31
. Dies ist eine massive Abweichung vom ursprünglichen Faktor, zu dem4
der Algorithmus für längere Iterabilitäten konvergiert. "Länger" ist hier relativ und hängt von der Anzahl der angegebenen Arbeitnehmer ab.Denken Sie daran , chunksize
cs_pool1
noch das fehltextra
-Einstellung mit dem Rest vondivmod
in enthaltenencs_pool2
aus dem gesamten Algorithmus.Der Algorithmus fährt fort mit:
if extra: chunksize += 1
Jetzt in Fällen gab es ist ein Rest (ein
extra
von der divmod-Operation), die chunksize um 1 zu erhöhen offensichtlich nicht für jede Aufgabe erarbeiten. Wenn es so wäre, gäbe es zunächst keinen Rest.Wie Sie in den Abbildungen unten sehen können, die „ Extra-Behandlung “ hat den Effekt, dass der reale Faktor für
rf_pool2
jetzt in Richtung konvergiert4
von unten4
und die Abweichung ist etwas glatter. Standardabweichung fürn_workers=4
undlen_iterable=500
fällt von0.5233
fürrf_pool1
nach0.4115
fürrf_pool2
.Eine Erhöhung
chunksize
um 1 hat schließlich zur Folge, dass die zuletzt übertragene Aufgabe nur eine Größe von hatlen_iterable % chunksize or chunksize
.Der interessantere und wie wir später sehen werden, konsequentere Effekt der Extrabehandlung kann jedoch für die Anzahl der erzeugten Chunks beobachtet werden (
n_chunks
). Für ausreichend lange Iterablesn_pool2
stabilisiert der abgeschlossene Chunksize-Algorithmus von Pool ( in der folgenden Abbildung) die Anzahl der Chunks bein_chunks == n_workers * 4
. Im Gegensatz dazu wechselt der naive Algorithmus (nach einem anfänglichen Rülpsen) ständig zwischenn_chunks == n_workers
undn_chunks == n_workers + 1
mit zunehmender Länge des iterierbaren Algorithmus .Unten finden Sie zwei erweiterte Info-Funktionen für Pools und den naiven Chunksize-Algorithmus. Die Ausgabe dieser Funktionen wird im nächsten Kapitel benötigt.
# mp_utils.py from collections import namedtuple Chunkinfo = namedtuple( 'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks', 'chunksize', 'last_chunk'] ) def calc_chunksize_info(n_workers, len_iterable, factor=4): """Calculate chunksize numbers.""" chunksize, extra = divmod(len_iterable, n_workers * factor) if extra: chunksize += 1 # `+ (len_iterable % chunksize > 0)` exploits that `True == 1` n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0) # exploit `0 == False` last_chunk = len_iterable % chunksize or chunksize return Chunkinfo( n_workers, len_iterable, n_chunks, chunksize, last_chunk )
Lassen Sie sich nicht von dem wahrscheinlich unerwarteten Aussehen verwirren
calc_naive_chunksize_info
. Dasextra
fromdivmod
wird nicht zur Berechnung der Blockgröße verwendet.def calc_naive_chunksize_info(n_workers, len_iterable): """Calculate naive chunksize numbers.""" chunksize, extra = divmod(len_iterable, n_workers) if chunksize == 0: chunksize = 1 n_chunks = extra last_chunk = chunksize else: n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0) last_chunk = len_iterable % chunksize or chunksize return Chunkinfo( n_workers, len_iterable, n_chunks, chunksize, last_chunk )
6. Quantifizierung der Algorithmuseffizienz
Nachdem wir gesehen haben, wie die Ausgabe des
Pool
Chunksize-Algorithmus anders aussieht als die Ausgabe des naiven Algorithmus ...Wie im vorherigen Kapitel gezeigt, unterteilt der Chunksize-Algorithmus von Pool für längere Iterables (eine größere Anzahl von Taskels) das Iterable ungefähr in viermal mehr Chunks als die naive Methode. Kleinere Chunks bedeuten mehr Aufgaben und mehr Aufgaben bedeuten mehr Parallelization Overhead (PO). Diese Kosten müssen gegen den Vorteil einer erhöhten Planungsflexibilität abgewogen werden ( siehe "Risiken von Chunksize> 1" ).
Aus ziemlich offensichtlichen Gründen kann der grundlegende Chunksize-Algorithmus von Pool die Planungsflexibilität für uns nicht gegen PO abwägen . IPC-Overhead ist abhängig von der Betriebssystem-, Hardware- und Datengröße. Der Algorithmus kann weder wissen, auf welcher Hardware wir unseren Code ausführen, noch hat er eine Ahnung, wie lange es dauern wird, bis ein Taskel fertig ist. Es ist eine Heuristik, die grundlegende Funktionen für alle möglichen Szenarien bietet . Dies bedeutet, dass es nicht für ein bestimmtes Szenario optimiert werden kann. Wie bereits erwähnt, ist PO auch mit zunehmenden Rechenzeiten pro Taskel (negative Korrelation) zunehmend weniger ein Problem.
Wenn Sie sich an die Parallelisierungsziele aus Kapitel 2 erinnern , war ein Punkt:
Die bereits erwähnte etwas , Pool des chunksize-Algorithmus kann versuchen zu verbessern , ist die Minimierung Arbeiter-Prozesse im Leerlauf bzw. die Auslastung des CPU-Kerns .
Eine sich wiederholende Frage zu SO bezüglich
multiprocessing.Pool
wird von Personen gestellt, die sich über nicht verwendete Kerne / inaktive Arbeitsprozesse in Situationen wundern, in denen Sie erwarten würden, dass alle Arbeitsprozesse beschäftigt sind. Dies kann viele Gründe haben, aber Leerlauf-Worker-Prozesse gegen Ende einer Berechnung sind eine Beobachtung, die wir häufig machen können, selbst bei dichten Szenarien (gleiche Berechnungszeiten pro Taskel) in Fällen, in denen die Anzahl der Worker kein Teiler der Anzahl ist von Brocken (n_chunks % n_workers > 0
).Die Frage ist jetzt:
6.1 Modelle
Um hier tiefere Einsichten zu gewinnen, benötigen wir eine Form der Abstraktion paralleler Berechnungen, die die überkomplexe Realität bis zu einem überschaubaren Grad an Komplexität vereinfacht und gleichzeitig die Bedeutung innerhalb definierter Grenzen bewahrt. Eine solche Abstraktion wird als Modell bezeichnet . Eine Implementierung eines solchen " Parallelisierungsmodells" (PM) erzeugt Worker-Mapping-Metadaten (Zeitstempel) wie echte Berechnungen, wenn die Daten gesammelt würden. Die modellgenerierten Metadaten ermöglichen die Vorhersage von Metriken paralleler Berechnungen unter bestimmten Einschränkungen.
Eines von zwei Untermodellen innerhalb des hier definierten PM ist das Verteilungsmodell (DM) . Der DM erklärt, wie atomare Arbeitseinheiten (Taskels) über parallele Worker und Zeit verteilt sind , wenn keine anderen Faktoren als der jeweilige Chunksize-Algorithmus, die Anzahl der Worker, die Eingabe-Iterierbarkeit (Anzahl der Taskels) und deren Berechnungsdauer berücksichtigt werden . Gemeint ist jede Form von Overhead ist nicht enthalten.
Um eine vollständige PM zu erhalten , wird die DM um ein Overhead-Modell (OM) erweitert , das verschiedene Formen des Parallelisierungs-Overheads (PO) darstellt . Ein solches Modell muss für jeden Knoten einzeln kalibriert werden (Hardware-, Betriebssystemabhängigkeiten). Wie viele Arten von Overhead in einem OM dargestellt werden, bleibt offen, sodass mehrere OMs mit unterschiedlichem Komplexitätsgrad existieren können. Welche Genauigkeit der implementierte OM benötigt, wird durch das Gesamtgewicht der PO für die spezifische Berechnung bestimmt. Kürzere Aufgaben führen zu einem höheren PO- Gewicht , was wiederum ein genaueres OM erfordertwenn wir versuchen würden , Parallelisierungseffizienzen (PE) vorherzusagen .
6.2 Paralleler Zeitplan (PS)
Der parallele Zeitplan ist eine zweidimensionale Darstellung der parallelen Berechnung, wobei die x-Achse die Zeit und die y-Achse einen Pool paralleler Arbeiter darstellt. Die Anzahl der Arbeiter und die Gesamtberechnungszeit markieren die Ausdehnung eines Rechtecks, in das kleinere Rechtecke eingezeichnet sind. Diese kleineren Rechtecke repräsentieren atomare Arbeitseinheiten (Taskels).
Unten finden Sie die Visualisierung einer PS, die mit Daten aus dem Chunksize-Algorithmus von DM of Pool für das Dense-Szenario gezeichnet wurde .
Die Namen der zusammengesetzten Teile sind im Bild unten zu sehen.
In einer vollständigen PM mit einem OM ist die Leerlauffreigabe nicht auf das Ende beschränkt, sondern umfasst auch den Abstand zwischen Aufgaben und sogar zwischen Aufgaben.
6.3 Effizienz
Die oben eingeführten Modelle ermöglichen die Quantifizierung der Auslastungsrate der Arbeitnehmer. Wir können unterscheiden:
Es ist wichtig zu beachten, dass berechnete Wirkungsgrade nicht automatisch mit einer schnelleren Gesamtberechnung für ein bestimmtes Parallelisierungsproblem korrelieren . Die Arbeiternutzung unterscheidet in diesem Zusammenhang nur zwischen einem Arbeiter, der eine gestartete, aber noch nicht abgeschlossene Aufgabe hat, und einem Arbeiter, der keine solche "offene" Aufgabe hat. Das heißt, ein möglicher Leerlauf während der Zeitspanne eines Taskels wird nicht registriert.
Alle oben genannten Wirkungsgrade werden im Wesentlichen durch Berechnung des Quotienten der Division Busy Share / Parallel Schedule erhalten . Der Unterschied zwischen DE und PE der Busy Share einen kleineren Teil des gesamten parallelen Zeitplans für das Overhead-erweiterte PM belegt .
In dieser Antwort wird nur eine einfache Methode zur Berechnung von DE erörtert für das dichte Szenario erläutert. Dies ist ausreichend, um verschiedene Chunksize-Algorithmen zu vergleichen, da ...
6.3.1 Absolute Verteilungseffizienz (ADE)
Diese grundlegende Effizienz kann im Allgemeinen berechnet werden, indem der Busy Share durch das gesamte Potenzial des Parallel Schedule geteilt wird :
Für das Dense-Szenario sieht der vereinfachte Berechnungscode folgendermaßen aus:
# mp_utils.py def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk): """Calculate Absolute Distribution Efficiency (ADE). `len_iterable` is not used, but contained to keep a consistent signature with `calc_rde`. """ if n_workers == 1: return 1 potential = ( ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize) + (n_chunks % n_workers == 1) * last_chunk ) * n_workers n_full_chunks = n_chunks - (chunksize > last_chunk) taskels_in_regular_chunks = n_full_chunks * chunksize real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk ade = real / potential return ade
Wenn es kein Leer Anteil , Busy Anteil wird gleich zu Schedule Parallel , daher erhalten wir eine ADE von 100%. In unserem vereinfachten Modell ist dies ein Szenario, in dem alle verfügbaren Prozesse während der gesamten Zeit, die für die Verarbeitung aller Aufgaben benötigt wird, ausgelastet sind. Mit anderen Worten, der gesamte Job wird effektiv zu 100 Prozent parallelisiert.
Aber warum muss ich halten mit Bezug auf PE als absolute PE hier?
Um dies zu verstehen, müssen wir einen möglichen Fall für die Chunksize (cs) in Betracht ziehen, der maximale Planungsflexibilität gewährleistet (auch die Anzahl der Highlander, die es geben kann. Zufall?):
Wenn wir zum Beispiel vier Arbeitsprozesse und 37 Aufgaben haben, wird es sogar mit untätigen Arbeitern geben
chunksize=1
, nur weiln_workers=4
es kein Teiler von 37 ist. Der Rest der Division von 37/4 ist 1. Diese einzige verbleibende Taskel muss sein von einem einzigen Arbeiter verarbeitet, während die restlichen drei im Leerlauf sind.Ebenso wird es immer noch einen Leerlaufarbeiter mit 39 Aufgaben geben, wie Sie unten sehen können.
Wenn Sie den oberen parallelen Zeitplan für
chunksize=1
mit der folgenden Version für vergleichenchunksize=3
, werden Sie feststellen, dass der obere parallele Zeitplan kleiner und die Zeitachse auf der x-Achse kürzer ist. Es sollte jetzt klar werden, wie unerwartet größere Blockgrößen auch zu längeren Gesamtberechnungszeiten führen können , selbst für dichte Szenarien .Weil der Overhead in diesem Modell nicht enthalten ist. Es wird für beide Blockgrößen unterschiedlich sein, daher ist die x-Achse nicht wirklich direkt vergleichbar. Der Overhead kann immer noch zu einer längeren Gesamtberechnungszeit führen, wie in Fall 2 aus der folgenden Abbildung gezeigt.
6.3.2 Relative Verteilungseffizienz (RDE)
Der ADE- Wert enthält keine Informationen, wenn eine bessere Verteilung der Taskels mit der Blockgröße 1 möglich ist. Besser bedeutet hier immer noch eine kleinere Leerlauffreigabe .
Um einen DE- Wert für das maximal mögliche DE einzustellen , müssen wir die betrachtete ADE durch die ADE teilen, für die wir erhalten
chunksize=1
.So sieht das im Code aus:
# mp_utils.py def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk): """Calculate Relative Distribution Efficiency (RDE).""" ade_cs1 = calc_ade( n_workers, len_iterable, n_chunks=len_iterable, chunksize=1, last_chunk=1 ) ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk) rde = ade / ade_cs1 return rde
RDE , wie hier definiert, ist im Wesentlichen eine Geschichte über das Ende eines parallelen Zeitplans . Die RDE wird durch die maximal effektive Blockgröße im Schwanz beeinflusst. (Dieser Schwanz kann eine Länge von x-Achsen haben
chunksize
oderlast_chunk
.) Dies hat zur Folge, dass RDE für alle Arten von "Schwanz-Looks", wie in der folgenden Abbildung gezeigt , natürlich gegen 100% (gerade) konvergiert.Eine niedrige RDE ...
Teil II dieser Antwort finden Sie hier .
quelle
7. Naiver vs. Pools Chunksize-Algorithmus
Bevor Sie auf Details eingehen, betrachten Sie die beiden folgenden Gifs. Für eine Reihe unterschiedlicher
iterable
Längen zeigen sie, wie die beiden verglichenen Algorithmen die übergebeneniterable
Aufgaben aufteilen (es wird bis dahin eine Sequenz sein) und wie die resultierenden Aufgaben verteilt werden können. Die Reihenfolge der Mitarbeiter ist zufällig und die Anzahl der verteilten Aufgaben pro Mitarbeiter kann in der Realität von diesen Bildern für leichte Aufgaben und / oder Aufgaben in einem breiten Szenario abweichen. Wie bereits erwähnt, ist der Overhead hier ebenfalls nicht enthalten. Für ausreichend schwere Aufgaben in einem dichten Szenario mit vernachlässigbaren übertragenen Datengrößen zeichnen reale Berechnungen jedoch ein sehr ähnliches Bild.Wie in Kapitel " 5. Pools Chunksize-Algorithmus " gezeigt, stabilisiert sich mit Pools Chunksize-Algorithmus die Anzahl der Chunks
n_chunks == n_workers * 4
für ausreichend große Iterables, während ständig zwischenn_chunks == n_workers
undn_chunks == n_workers + 1
mit dem naiven Ansatz gewechselt wird . Für den naiven Algorithmus gilt: Weiln_chunks % n_workers == 1
istTrue
fürn_chunks == n_workers + 1
, wird ein neuer Abschnitt erstellt werden , in dem nur ein einzigen Arbeiter beschäftigt werden.Unten sehen Sie eine Abbildung ähnlich der in Kapitel 5 gezeigten, die jedoch die Anzahl der Abschnitte anstelle der Anzahl der Blöcke anzeigt. Für Pools vollständigen Chunksize-Algorithmus (
n_pool2
)n_sections
wird sich der berüchtigte, hartcodierte Faktor stabilisieren4
.n_sections
Wechselt für den naiven Algorithmus zwischen eins und zwei.Für Pool des chunksize-Algorithmus, die eine Stabilisierung auf
n_chunks = n_workers * 4
durch die vor genannten extra Behandlung , verhindert die Schaffung eines neuen Abschnitt hier und hält die Farbverreibung Anteil lange genug Iterables auf einen Arbeiter beschränkt. Darüber hinaus verringert der Algorithmus die relative Größe der Leerlauffreigabe weiter , was dazu führt, dass ein RDE-Wert gegen 100% konvergiert."Lang genug"
n_workers=4
istlen_iterable=210
zum Beispiel. Für Iterables, die gleich oder größer sind, ist der Leerlaufanteil auf einen Worker beschränkt, ein Merkmal, das ursprünglich aufgrund der4
Multiplikation innerhalb des Chunksize-Algorithmus verloren gegangen ist .Der naive Chunksize-Algorithmus konvergiert ebenfalls gegen 100%, dies geschieht jedoch langsamer. Der Konvergenzeffekt hängt ausschließlich von der Tatsache ab, dass der relative Teil des Schwanzes in Fällen, in denen es zwei Abschnitte geben wird, schrumpft. Dieser Schwanz mit nur einem beschäftigten Arbeiter ist auf die Länge
n_workers - 1
der x-Achse begrenzt , der mögliche maximale Rest fürlen_iterable / n_workers
.Unten finden Sie zwei Heatmaps, die die RDE- Werte für alle iterierbaren Längen bis zu 5000, für alle Anzahlen von Arbeitern von 2 bis 100 zeigen. Die Farbskala reicht von 0,5 bis 1 (50% -100%). Sie werden in der linken Heatmap viel mehr dunkle Bereiche (niedrigere RDE-Werte) für den naiven Algorithmus bemerken. Im Gegensatz dazu zeichnet der Chunksize-Algorithmus von Pool auf der rechten Seite ein viel sonnigeres Bild.
Der diagonale Gradient der dunklen Ecken unten links gegenüber den hellen Ecken oben rechts zeigt erneut die Abhängigkeit von der Anzahl der Arbeiter für das, was als "lange Iteration" bezeichnet wird.
Mit dem Chunksize-Algorithmus von Pool ist ein RDE- Wert von 81,25% der niedrigste Wert für den oben angegebenen Bereich von Arbeitern und iterierbaren Längen:
Mit dem naiven Chunksize-Algorithmus können die Dinge viel schlimmer werden. Die niedrigste berechnete RDE beträgt hier 50,72%. In diesem Fall läuft fast die Hälfte der Rechenzeit nur ein einziger Mitarbeiter! Also aufgepasst, stolze Besitzer von Knights Landing . ;)
8. Reality Check
In den vorangegangenen Kapiteln haben wir ein vereinfachtes Modell für das rein mathematische Verteilungsproblem betrachtet, das von den Details befreit ist, die die Mehrfachverarbeitung überhaupt zu einem heiklen Thema machen. Um besser zu verstehen, inwieweit das Verteilungsmodell (DM) allein dazu beitragen kann, die beobachtete Auslastung der Mitarbeiter in der Realität zu erklären, werden wir uns nun einige parallele Zeitpläne ansehen, die durch reale Berechnungen erstellt wurden.
Installieren
Die folgenden Diagramme befassen sich alle mit parallelen Ausführungen einer einfachen, CPU-gebundenen Dummy-Funktion, die mit verschiedenen Argumenten aufgerufen wird, damit wir beobachten können, wie sich der gezeichnete parallele Zeitplan in Abhängigkeit von den Eingabewerten ändert. Die "Arbeit" innerhalb dieser Funktion besteht nur aus der Iteration über ein Bereichsobjekt. Dies ist bereits genug, um einen Kern zu beschäftigen, da wir große Zahlen übergeben. Optional benötigt die Funktion einige aufgabenspezifische Besonderheiten
data
das nur unverändert zurückgegeben wird. Da jede Aufgabe genau den gleichen Arbeitsaufwand umfasst, handelt es sich hier immer noch um ein dichtes Szenario.Die Funktion ist mit einem Wrapper dekoriert, der Zeitstempel mit ns-Auflösung (Python 3.7+) verwendet. Die Zeitstempel werden verwendet, um die Zeitspanne eines Taskels zu berechnen und somit das Zeichnen eines empirischen parallelen Zeitplans zu ermöglichen.
@stamp_taskel def busy_foo(i, it, data=None): """Dummy function for CPU-bound work.""" for _ in range(int(it)): pass return i, data def stamp_taskel(func): """Decorator for taking timestamps on start and end of decorated function execution. """ @wraps(func) def wrapper(*args, **kwargs): start_time = time_ns() result = func(*args, **kwargs) end_time = time_ns() return (current_process().name, (start_time, end_time)), result return wrapper
Die Starmap-Methode von Pool ist auch so dekoriert, dass nur der Starmap-Aufruf selbst zeitlich festgelegt ist. "Start" und "Ende" dieses Aufrufs bestimmen das Minimum und Maximum auf der x-Achse des erzeugten parallelen Zeitplans.
Wir werden die Berechnung von 40 Taskels auf vier Worker-Prozessen auf einem Computer mit diesen Spezifikationen beobachten: Python 3.7.1, Ubuntu 18.04.2, Intel® Core ™ i7-2600K-CPU bei 3,40 GHz × 8
Die Eingabewerte, die variiert werden, sind die Anzahl der Iterationen in der for-Schleife (30k, 30M, 600M) und die zusätzliche Sendedatengröße (pro Task, Numpy-ndarray: 0 MiB, 50 MiB).
... N_WORKERS = 4 LEN_ITERABLE = 40 ITERATIONS = 30e3 # 30e6, 600e6 DATA_MiB = 0 # 50 iterable = [ # extra created data per taskel (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8))) # taskel args for i in range(LEN_ITERABLE) ] with Pool(N_WORKERS) as pool: results = pool.starmap(busy_foo, iterable)
Die unten gezeigten Läufe wurden handverlesen, um die gleiche Reihenfolge der Blöcke zu erhalten, damit Sie die Unterschiede im Vergleich zum parallelen Zeitplan aus dem Verteilungsmodell besser erkennen können. Vergessen Sie jedoch nicht, dass die Reihenfolge, in der die Mitarbeiter ihre Aufgabe erhalten, nicht deterministisch ist.
DM-Vorhersage
Um es noch einmal zu wiederholen: Das Verteilungsmodell "sagt" einen parallelen Zeitplan voraus, wie wir ihn bereits in Kapitel 6.2 gesehen haben:
1. RUN: 30.000 Iterationen und 0 MiB-Daten pro Taskel
Unser erster Lauf hier ist sehr kurz, die Aufgaben sind sehr "leicht". Der gesamte
pool.starmap()
Anruf dauerte insgesamt nur 14,5 ms. Sie werden feststellen, dass der Leerlauf im Gegensatz zum DM nicht auf den Heckbereich beschränkt ist, sondern auch zwischen Aufgaben und sogar zwischen Aufgaben stattfindet. Das liegt daran, dass unser wirklicher Zeitplan hier natürlich alle Arten von Overhead beinhaltet. Leerlauf bedeutet hier einfach alles außerhalb eines Taskels. Möglicher echter Leerlauf während eines Taskels wird nicht wie bereits erwähnt erfasst.Außerdem können Sie sehen, dass nicht alle Mitarbeiter gleichzeitig ihre Aufgaben erhalten. Dies liegt an der Tatsache, dass alle Mitarbeiter über einen gemeinsam genutzten Mitarbeiter gefüttert werden
inqueue
und jeweils nur ein Mitarbeiter daraus lesen kann. Gleiches gilt für dieoutqueue
. Dies kann zu größeren Störungen führen, sobald Sie nicht marginale Datenmengen übertragen, wie wir später sehen werden.Darüber hinaus können Sie feststellen, dass trotz der Tatsache, dass jedes Taskel den gleichen Arbeitsaufwand umfasst, die tatsächlich gemessene Zeitspanne für ein Taskel stark variiert. Die an Arbeiter 3 und Arbeiter 4 verteilten Aufgaben benötigen mehr Zeit als die von den ersten beiden Arbeitern verarbeiteten. Für diesen Lauf vermute ich, dass es daran liegt Turbo-Boost zurückzuführen ist für Worker-3/4 zu diesem Zeitpunkt nicht mehr auf den Kernen verfügbar ist, sodass sie ihre Aufgaben mit einer niedrigeren Taktrate abwickelten.
Die gesamte Berechnung ist so einfach, dass durch Hardware oder Betriebssystem eingeführte Chaosfaktoren die PS verzerren können drastisch verzerren können. Die Berechnung ist ein "Blatt im Wind" und die DM Vorhersage hat selbst für ein theoretisch passendes Szenario wenig Bedeutung.
2. RUN: 30 Millionen Iterationen und 0 MiB Daten pro Taskel
Wenn Sie die Anzahl der Iterationen in der for-Schleife von 30.000 auf 30 Millionen erhöhen, erhalten Sie einen echten parallelen Zeitplan, der nahezu perfekt mit dem übereinstimmt, der durch die vom DM bereitgestellten Daten vorhergesagt wird. Hurra! Die Berechnung pro Taskel ist jetzt schwer genug, um die Leerlaufteile zu Beginn und dazwischen zu marginalisieren und nur den großen Leerlaufanteil sichtbar zu machen, den der DM vorhergesagt hat.
3. RUN: 30 Millionen Iterationen und 50 MiB Daten pro Taskel
Wenn Sie die 30 Millionen Iterationen beibehalten, aber zusätzlich 50 MiB pro Taskel hin und her senden, wird das Bild erneut verzerrt. Hier ist der Warteschlangeneffekt gut sichtbar. Worker-4 muss länger auf seine zweite Aufgabe warten als Worker-1. Stellen Sie sich jetzt diesen Zeitplan mit 70 Arbeitern vor!
Wenn die Taskels rechenintensiv sind, aber eine bemerkenswerte Datenmenge als Nutzlast liefern, kann der Engpass einer einzelnen gemeinsam genutzten Warteschlange jeden zusätzlichen Vorteil verhindern, wenn mehr Mitarbeiter zum Pool hinzugefügt werden, selbst wenn sie durch physische Kerne unterstützt werden. In einem solchen Fall könnte Worker-1 mit seiner ersten Aufgabe fertig sein und auf eine neue warten, noch bevor Worker-40 seine erste Aufgabe erhalten hat.
Es sollte jetzt klar werden, warum die Rechenzeiten in a
Pool
nicht immer linear mit der Anzahl der Arbeiter abnehmen. Das Senden relativ großer Datenmengen kann zu Szenarien führen, in denen die meiste Zeit darauf gewartet wird, dass die Daten in den Adressraum eines Arbeitnehmers kopiert werden und nur ein Arbeitnehmer gleichzeitig eingespeist werden kann.4. RUN: 600 Millionen Iterationen und 50 MiB Daten pro Taskel
Hier senden wir erneut 50 MiB, erhöhen jedoch die Anzahl der Iterationen von 30M auf 600M, wodurch sich die Gesamtberechnungszeit von 10 s auf 152 s erhöht. Der gezeichnete parallele Zeitplan ist wieder nahezu perfekt mit dem vorhergesagten, der Overhead durch das Kopieren der Daten ist marginalisiert.
9. Fazit
Die diskutierte Multiplikation durch
4
erhöht die Planungsflexibilität, nutzt aber auch die Ungleichmäßigkeit bei der Aufgabenverteilung. Ohne diese Multiplikation wäre der Leerlaufanteil selbst für kurze Iterables (für DM) auf einen einzelnen Mitarbeiter beschränkt mit dichtem Szenario) . Der Chunksize-Algorithmus von Pool benötigt Eingabe-Iterables von einer bestimmten Größe, um dieses Merkmal wiederzugewinnen.Wie diese Antwort hoffentlich gezeigt hat, führt der Chunksize-Algorithmus von Pool im Durchschnitt zu einer besseren Kernauslastung im Vergleich zum naiven Ansatz, zumindest für den Durchschnittsfall und solange der Overhead nicht berücksichtigt wird. Der naive Algorithmus kann hier eine Verteilungseffizienz (DE) von nur ~ 51% haben, während der Chunksize-Algorithmus von Pool einen niedrigen Wert von ~ 81% hat. DE umfasst jedoch keinen Parallelization Overhead (PO) wie IPC. Kapitel 8 hat gezeigt, dass DE für das dichte Szenario mit marginalisiertem Overhead immer noch eine große Vorhersagekraft haben kann.
Trotz der Tatsache, dass der Chunksize-Algorithmus von Pool im Vergleich zum naiven Ansatz eine höhere DE erzielt , bietet er nicht für jede Eingabekonstellation optimale Taskel-Verteilungen. Während ein einfacher statischer Chunking-Algorithmus die Parallelisierungseffizienz (PE) nicht optimieren kann (einschließlich Overhead), gibt es keinen inhärenten Grund, warum er nicht immer eine relative Verteilungseffizienz (RDE) von 100% liefern könnte, dh dieselbe DE wie mit
chunksize=1
. Ein einfacher Chunksize-Algorithmus besteht nur aus grundlegender Mathematik und kann den Kuchen in irgendeiner Weise "in Scheiben schneiden".Im Gegensatz zu Pools Implementierung eines "Chunking-Algorithmus gleicher Größe" würde ein Chunking-Algorithmus gleicher Größe eine RDE von 100% für jede
len_iterable
/n_workers
Kombination liefern . Ein Chunking-Algorithmus mit gerader Größe wäre in der Quelle von Pool etwas komplizierter zu implementieren, kann jedoch zusätzlich zum vorhandenen Algorithmus moduliert werden, indem die Aufgaben extern gepackt werden (ich werde von hier aus einen Link erstellen, falls ich eine Frage / Antwort ablege) wie geht das).quelle
Ich denke, dass ein Teil dessen, was Sie vermissen, darin besteht, dass Ihre naive Schätzung davon ausgeht, dass jede Arbeitseinheit dieselbe Zeit benötigt. In diesem Fall wäre Ihre Strategie die beste. Wenn jedoch einige Jobs früher als andere beendet werden, werden einige Kerne möglicherweise inaktiv und warten darauf, dass die langsamen Jobs beendet werden.
Wenn Sie also die Chunks in viermal mehr Teile zerlegen, kann dieser Kern, wenn ein Chunk früh fertig ist, den nächsten Chunk starten (während die anderen Kerne weiter an ihrem langsameren Chunk arbeiten).
Ich weiß nicht, warum sie den Faktor 4 genau ausgewählt haben, aber es wäre ein Kompromiss zwischen der Minimierung des Overheads des Kartencodes (der die größtmöglichen Blöcke benötigt) und dem Ausgleichen von Blöcken, die unterschiedlich oft dauern (was den kleinstmöglichen Block wünscht) ).
quelle