Teilen Sie ein großes, schreibgeschütztes Numpy-Array zwischen Mehrfachverarbeitungsprozessen

87

Ich habe ein 60 GB SciPy Array (Matrix), das ich zwischen mehr als 5 multiprocessing ProcessObjekten teilen muss . Ich habe numpy-sharedmem gesehen und diese Diskussion auf der SciPy-Liste gelesen . Es scheint zwei Ansätze zu geben - numpy-sharedmemund die Verwendung von a multiprocessing.RawArray()und die Zuordnung von NumPy dtypes zu ctypes. Nun numpy-sharedmemscheint der richtige Weg zu sein, aber ich habe noch kein gutes Referenzbeispiel gesehen. Ich brauche keine Sperren, da das Array (eigentlich eine Matrix) schreibgeschützt ist. Aufgrund seiner Größe möchte ich jetzt eine Kopie vermeiden. Es klingt so, als ob die richtige Methode darin besteht, die einzige Kopie des Arrays als sharedmemArray zu erstellen und sie dann an die ProcessObjekte zu übergeben. Einige spezifische Fragen:

  1. Was ist der beste Weg, um die Sharedmem-Handles tatsächlich an Subes zu übergeben Process()? Benötige ich eine Warteschlange, um nur ein Array weiterzugeben? Wäre eine Pfeife besser? Kann ich es einfach als Argument an den Process()Init der Unterklasse übergeben (wo ich davon ausgehe, dass es eingelegt ist)?

  2. In der Diskussion, die ich oben verlinkt habe, wird erwähnt, dass es numpy-sharedmemnicht 64-Bit-sicher ist. Ich verwende definitiv einige Strukturen, die nicht mit 32-Bit adressierbar sind.

  3. Gibt es Kompromisse bei der RawArray()Herangehensweise? Langsamer, fehlerhafter?

  4. Benötige ich eine Zuordnung von Typ zu Typ für die Methode numpy-sharedmem?

  5. Hat jemand ein Beispiel für OpenSource-Code, der dies tut? Ich bin sehr praktisch und es ist schwierig, dies zum Laufen zu bringen, ohne ein gutes Beispiel zu haben.

Wenn ich zusätzliche Informationen zur Verfügung stellen kann, um dies für andere zu klären, kommentieren Sie diese bitte und ich werde sie hinzufügen. Vielen Dank!

Dies muss unter Ubuntu Linux und möglicherweise Mac OS ausgeführt werden, aber die Portabilität ist kein großes Problem.

Wille
quelle
1
Wenn die verschiedenen Prozesse in dieses Array schreiben, müssen Sie multiprocessingfür jeden Prozess eine Kopie des Ganzen erstellen.
Tiago
3
@tiago: "Ich brauche keine Sperren, da das Array (eigentlich eine Matrix) schreibgeschützt ist"
Dr. Jan-Philip Gehrcke
1
@tiago: Außerdem erstellt Multiprocessing keine Kopie, solange dies nicht explizit angegeben wird (über Argumente an target_function). Das Betriebssystem kopiert Teile des Speichers des Elternteils nur nach Änderung in den Speicher des Kindes.
Dr. Jan-Philip Gehrcke
Ich habe vorher ein paar Fragen dazu gestellt. Meine Lösung finden Sie hier: github.com/david-hoffman/peaks/blob/… (Entschuldigung, der Code ist eine Katastrophe).
David Hoffman

Antworten:

30

@ Velimir Mlaker gab eine tolle Antwort. Ich dachte, ich könnte ein paar Kommentare und ein kleines Beispiel hinzufügen.

(Ich konnte nicht viel Dokumentation zu sharedmem finden - dies sind die Ergebnisse meiner eigenen Experimente.)

  1. Müssen Sie die Handles übergeben, wenn der Unterprozess gestartet wird oder nachdem er gestartet wurde? Wenn es nur das erstere ist, können Sie einfach die Argumente targetund argsfür verwenden Process. Dies ist möglicherweise besser als die Verwendung einer globalen Variablen.
  2. Auf der von Ihnen verlinkten Diskussionsseite wird angezeigt, dass vor einiger Zeit die Unterstützung für 64-Bit-Linux zu sharedmem hinzugefügt wurde, sodass dies möglicherweise kein Problem darstellt.
  3. Ich weiß nichts darüber.
  4. Siehe Beispiel unten.

Beispiel

#!/usr/bin/env python
from multiprocessing import Process
import sharedmem
import numpy

def do_work(data, start):
    data[start] = 0;

def split_work(num):
    n = 20
    width  = n/num
    shared = sharedmem.empty(n)
    shared[:] = numpy.random.rand(1, n)[0]
    print "values are %s" % shared

    processes = [Process(target=do_work, args=(shared, i*width)) for i in xrange(num)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print "values are %s" % shared
    print "type is %s" % type(shared[0])

if __name__ == '__main__':
    split_work(4)

Ausgabe

values are [ 0.81397784  0.59667692  0.10761908  0.6736734   0.46349645  0.98340718
  0.44056863  0.10701816  0.67167752  0.29158274  0.22242552  0.14273156
  0.34912309  0.43812636  0.58484507  0.81697513  0.57758441  0.4284959
  0.7292129   0.06063283]
values are [ 0.          0.59667692  0.10761908  0.6736734   0.46349645  0.
  0.44056863  0.10701816  0.67167752  0.29158274  0.          0.14273156
  0.34912309  0.43812636  0.58484507  0.          0.57758441  0.4284959
  0.7292129   0.06063283]
type is <type 'numpy.float64'>

Diese verwandte Frage könnte nützlich sein.

James Lim
quelle
37

Wenn Sie unter Linux (oder einem POSIX-kompatiblen System) arbeiten, können Sie dieses Array als globale Variable definieren. multiprocessingwird unter fork()Linux verwendet, wenn ein neuer untergeordneter Prozess gestartet wird. Ein neu erzeugter untergeordneter Prozess teilt den Speicher automatisch mit seinem übergeordneten Prozess, solange er ihn nicht ändert ( Copy-on-Write- Mechanismus).

Da Sie sagen "Ich brauche keine Sperren, da das Array (eigentlich eine Matrix) schreibgeschützt ist", wäre es ein sehr einfacher und dennoch äußerst effizienter Ansatz, dieses Verhalten auszunutzen: Alle untergeordneten Prozesse greifen darauf zu die gleichen Daten im physischen Speicher beim Lesen dieses großen numpy-Arrays.

Sie übergeben Sie Ihre Array nicht an den Process()Konstruktor, wird dieses anweisen , multiprocessingauf pickledie Daten an das Kind, das in Ihrem Fall extrem ineffizient oder unmöglich wäre. Unter Linux ist direkt nach fork()dem Kind eine exakte Kopie des Elternteils, die denselben physischen Speicher verwendet. Sie müssen also nur sicherstellen, dass die Python-Variable, die die Matrix enthält, über die targetFunktion, an die Sie übergeben, zugänglich ist Process(). Dies können Sie normalerweise mit einer 'globalen' Variablen erreichen.

Beispielcode:

from multiprocessing import Process
from numpy import random


global_array = random.random(10**4)


def child():
    print sum(global_array)


def main():
    processes = [Process(target=child) for _ in xrange(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()


if __name__ == "__main__":
    main()

Unter Windows - das nicht unterstützt fork()- multiprocessingwird der Win32-API-Aufruf verwendet CreateProcess. Es erstellt einen völlig neuen Prozess aus einer bestimmten ausführbaren Datei. Deshalb unter Windows eine ist erforderlich , um Beize Daten an das Kind , wenn ein Bedarf Daten , die während der Laufzeit des Mutter erstellt wurde.

Dr. Jan-Philip Gehrcke
quelle
3
Beim Kopieren beim Schreiben wird die Seite kopiert, die den Referenzzähler enthält (sodass jeder gegabelte Python seinen eigenen Referenzzähler hat), es wird jedoch nicht das gesamte Datenarray kopiert.
Robince
1
Ich möchte hinzufügen , dass ich mehr Erfolg mit Modul - Ebene Variablen hatten als mit globalen Variablen ... dh die Variable auf ein Modul in globalen Bereich vor dem Gabelende
robince
4
Ein Wort der Vorsicht für Personen, die über diese Frage / Antwort stolpern: Wenn Sie OpenBLAS-verknüpftes Numpy für den Multithreadead-Vorgang verwenden, müssen Sie das Multithreading deaktivieren (OPENBLAS_NUM_THREADS = 1 exportieren), wenn untergeordnete multiprocessingProzesse verwendet werden oder hängen bleiben ( In der Regel wird 1 / n eines Prozessors anstelle von n Prozessoren verwendet, wenn lineare Algebraoperationen an einem gemeinsam genutzten globalen Array / einer gemeinsam genutzten globalen Matrix ausgeführt werden. Der bekannte Multithread-Konflikt mit OpenBLAS scheint sich auf Pythonmultiprocessing
Dologan
1
Kann jemand erklären, warum Python nicht einfach das Betriebssystem verwendet fork, um die angegebenen Parameter zu übergeben Process, anstatt sie zu serialisieren? Das heißt, konnte das nicht forkunmittelbar vor dem child Aufruf auf den übergeordneten Prozess angewendet werden , sodass der Parameterwert weiterhin vom Betriebssystem verfügbar ist? Scheint effizienter zu sein als es zu serialisieren?
Max
2
Wir sind uns alle bewusst, dass fork()dies unter Windows nicht verfügbar ist. Dies wurde in meiner Antwort und mehrfach in den Kommentaren angegeben. Ich weiß , dass dies Ihre erste Frage ist, und ich antwortete es vier Kommentare über diese : „Der Kompromiss ist auf beiden Plattformen standardmäßig die gleiche Methode der Parameterübergabe zu verwenden , für eine bessere Wartbarkeit und für gleiches Verhalten zu gewährleisten.“. Beide Möglichkeiten haben ihre Vor- und Nachteile, weshalb der Benutzer in Python 3 flexibler die Methode auswählen kann. Diese Diskussion ist ohne produktive Details nicht produktiv, was wir hier nicht tun sollten.
Dr. Jan-Philip Gehrcke
24

Vielleicht interessiert Sie ein winziger Code, den ich geschrieben habe: github.com/vmlaker/benchmark-sharedmem

Die einzige Datei von Interesse ist main.py. Es ist ein Benchmark von numpy-sharedmem - der Code übergibt Arrays einfach (entweder numpyoder sharedmem) über Pipe an erzeugte Prozesse. Die Arbeiter rufen Sie einfach sum()auf die Daten. Ich war nur daran interessiert, die Datenkommunikationszeiten zwischen den beiden Implementierungen zu vergleichen.

Ich habe auch einen anderen, komplexeren Code geschrieben: github.com/vmlaker/sherlock .

Hier verwende ich das numpy-sharedmem- Modul für die Echtzeit-Bildverarbeitung mit OpenCV - die Bilder sind NumPy-Arrays gemäß der neueren OpenCV- cv2API. Die Bilder, eigentlich Referenzen davon, über das Dictionary - Objekt zwischen Prozessen gemeinsam erstellt von multiprocessing.Manager(im Gegensatz zu verwenden Queue oder Rohr gegenüber .) Ich bin immer große Leistungsverbesserungen im Vergleich mit der Verwendung von Klar NumPy Arrays.

Pipe vs. Queue :

Nach meiner Erfahrung ist IPC mit Pipe schneller als Queue. Und das ist sinnvoll, da Queue Sperren hinzufügt, um die Sicherheit für mehrere Hersteller / Verbraucher zu gewährleisten. Pipe nicht. Wenn Sie jedoch nur zwei Prozesse haben, die hin und her sprechen, ist es sicher, Pipe zu verwenden, oder, wie in den Dokumenten gelesen:

... besteht keine Gefahr der Beschädigung durch Prozesse, die gleichzeitig unterschiedliche Rohrenden verwenden.

sharedmemSicherheit :

Das Hauptproblem mit dem sharedmemModul ist die Möglichkeit eines Speicherverlusts beim nicht ordnungsgemäßen Beenden des Programms. Dies wird in einer längeren Diskussion beschrieben hier . Obwohl Sturla am 10. April 2011 eine Korrektur für Speicherverluste erwähnt, habe ich seitdem immer noch Lecks festgestellt, bei denen beide Repos verwendet wurden, Sturla Moldens eigene auf GitHub ( github.com/sturlamolden/sharedmem-numpy ) und Chris Lee-Messers auf Bitbucket ( bitbucket.org/cleemesser/numpy-sharedmem ).

Velimir Mlaker
quelle
Danke, sehr sehr informativ. Der Speicherverlust sharedmemklingt jedoch nach einer großen Sache. Irgendwelche Hinweise zur Lösung des Problems?
Will
1
Abgesehen davon, dass ich nur die Lecks bemerkt habe, habe ich im Code nicht danach gesucht. Ich habe meiner Antwort unter "Sharedmem-Sicherheit" oben als sharedmemReferenz die Bewahrer der beiden Open-Source-Repos des Moduls hinzugefügt .
Velimir Mlaker
13

Wenn Ihr Array so groß ist, können Sie verwenden numpy.memmap. Wenn Sie beispielsweise ein Array auf der Festplatte gespeichert haben 'test.array', können Sie auch im Schreibmodus simultane Prozesse verwenden, um auf die darin enthaltenen Daten zuzugreifen. Ihr Fall ist jedoch einfacher, da Sie nur den Lesemodus benötigen.

Erstellen des Arrays:

a = np.memmap('test.array', dtype='float32', mode='w+', shape=(100000,1000))

Sie können dieses Array dann genauso füllen wie mit einem normalen Array. Beispielsweise:

a[:10,:100]=1.
a[10:,100:]=2.

Die Daten werden beim Löschen der Variablen auf der Festplatte gespeichert a.

Später können Sie mehrere Prozesse verwenden, die auf die Daten zugreifen test.array:

# read-only mode
b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000,1000))

# read and writing mode
c = np.memmap('test.array', dtype='float32', mode='r+', shape=(100000,1000))

Verwandte Antworten:

Saullo GP Castro
quelle
3

Es kann auch nützlich sein, die Dokumentation für pyro zu lesen, um Ihre Aufgabe entsprechend zu partitionieren und verschiedene Abschnitte auf verschiedenen Computern sowie auf verschiedenen Kernen auf demselben Computer auszuführen.

Steve Barnes
quelle
0

Warum nicht Multithreading verwenden? Ressourcen des Hauptprozesses können von seinen Threads nativ gemeinsam genutzt werden. Daher ist Multithreading offensichtlich eine bessere Möglichkeit, Objekte des Hauptprozesses gemeinsam zu nutzen.

Wenn Sie sich über den GIL-Mechanismus von Python Sorgen machen, können Sie vielleicht auf den nogilvon zurückgreifen numba.

Nico
quelle