Verwenden Sie das numpy-Array im gemeinsam genutzten Speicher für die Mehrfachverarbeitung

110

Ich möchte ein Numpy-Array im gemeinsam genutzten Speicher zur Verwendung mit dem Multiprocessing-Modul verwenden. Die Schwierigkeit besteht darin, es wie ein Numpy-Array zu verwenden und nicht nur als ctypes-Array.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

Dies erzeugt eine Ausgabe wie:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

Auf das Array kann auf ctypes Weise zugegriffen werden, z arr[i]. B. macht es Sinn. Es ist jedoch kein Numpy-Array, und ich kann keine Operationen wie -1*arroder ausführen arr.sum(). Ich nehme an, eine Lösung wäre, das ctypes-Array in ein numpy-Array umzuwandeln. Ich glaube jedoch nicht (dass ich nicht in der Lage bin, diese Arbeit zu machen), dass sie nicht mehr geteilt wird.

Es scheint eine Standardlösung für ein allgemeines Problem zu geben.

Ian Langmore
quelle
1
Es ist nicht dasselbe wie dieses? stackoverflow.com/questions/5033799/…
Pygabriel
1
Es ist nicht ganz die gleiche Frage. Die verknüpfte Frage fragt subprocesseher nach als multiprocessing.
Andrew

Antworten:

81

Hinzufügen zu den Antworten von @ unutbu (nicht mehr verfügbar) und @Henry Gomersall. Sie können shared_arr.get_lock()den Zugriff bei Bedarf synchronisieren:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Beispiel

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

Wenn Sie keinen synchronisierten Zugriff benötigen oder eigene Sperren erstellen, ist dies nicht erforderlich mp.Array(). Sie könnten mp.sharedctypes.RawArrayin diesem Fall verwenden.

jfs
quelle
2
Schöne Antwort! Wenn ich mehr als ein gemeinsam genutztes Array haben möchte, das separat gesperrt werden kann, aber die Anzahl der Arrays zur Laufzeit bestimmt ist, ist dies eine einfache Erweiterung dessen, was Sie hier getan haben?
Andrew
3
@ Andrew: Gemeinsame Arrays sollten erstellt werden, bevor untergeordnete Prozesse erzeugt werden.
JFS
Guter Punkt zur Reihenfolge der Operationen. Das hatte ich jedoch im Sinn: Erstellen Sie eine benutzerdefinierte Anzahl von gemeinsam genutzten Arrays und erzeugen Sie dann einige untergeordnete Prozesse. Ist das einfach?
Andrew
1
@Chicony: Sie können die Größe des Arrays nicht ändern. Stellen Sie sich einen gemeinsam genutzten Speicherblock vor, der zugewiesen werden musste, bevor untergeordnete Prozesse gestartet werden. Sie müssen nicht den gesamten Speicher verwenden, z. B. könnten Sie countan übergeben numpy.frombuffer(). Sie können versuchen, dies auf einer niedrigeren Ebene zu tun, indem Sie ein RawArray-Analogon (oder möglicherweise nach einer vorhandenen Bibliothek suchen) verwenden mmapoder etwas posix_ipcdirektes implementieren, um eine Größenänderung zu implementieren (möglicherweise das Kopieren während der Größenänderung). Oder wenn es Ihre Aufgabe erlaubt: Kopieren Sie Daten in Teilen (wenn Sie nicht alle auf einmal benötigen). "So ändern Sie die Größe eines gemeinsam genutzten Speichers" ist eine gute separate Frage.
JFS
1
@umopapisdn: Definiert Pool()die Anzahl der Prozesse (die Anzahl der verfügbaren CPU-Kerne wird standardmäßig verwendet). Mist die Häufigkeit, mit der die f()Funktion aufgerufen wird.
JFS
21

Dem ArrayObjekt ist eine get_obj()Methode zugeordnet, die das Array ctypes zurückgibt, das eine Pufferschnittstelle darstellt. Ich denke, das Folgende sollte funktionieren ...

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

Beim Ausführen wird das erste Element von ajetzt 10.0 gedruckt , wobei aund angezeigt werdenb sind nur zwei Ansichten in den gleichen Speicher.

Um noch es um sicherzustellen , dass Multi - Prozessor sicher, ich glaube , Sie werden die benutzen müssen acquireund releaseMethoden , die auf dem existieren ArrayObjekt, aund seine in Schloss gebaut , um sicherzustellen , es ist alles sicher zugegriffen wird (obwohl ich kein Experte bin auf der Multiprozessormodul).

Henry Gomersall
quelle
Ohne Synchronisation funktioniert es nicht, wie @unutbu in seiner (jetzt gelöschten) Antwort gezeigt hat.
JFS
1
Wenn Sie nur auf die Nachbearbeitung des Arrays zugreifen möchten, kann dies vermutlich sauber durchgeführt werden, ohne sich um Parallelitätsprobleme und Sperren sorgen zu müssen.
Henry Gomersall
in diesem Fall brauchen Sie nicht mp.Array.
JFS
1
Der Verarbeitungscode erfordert möglicherweise gesperrte Arrays, die Interpretation der Daten nach der Verarbeitung ist jedoch möglicherweise nicht unbedingt erforderlich. Ich denke, das kommt vom Verständnis, was genau das Problem ist. Es ist klar, dass der gleichzeitige Zugriff auf gemeinsam genutzte Daten einen gewissen Schutz erfordert, was meiner Meinung nach offensichtlich wäre!
Henry Gomersall
16

Obwohl die bereits gegebenen Antworten gut sind, gibt es eine viel einfachere Lösung für dieses Problem, vorausgesetzt, zwei Bedingungen sind erfüllt:

  1. Sie sind POSIX-konform Betriebssystem (z. B. Linux, Mac OSX). und
  2. Ihre untergeordneten Prozesse benötigen schreibgeschützten Zugriff auf das freigegebene Array.

In diesem Fall müssen Sie nicht damit herumspielen, Variablen explizit gemeinsam zu nutzen, da die untergeordneten Prozesse mit einem Fork erstellt werden. Ein gegabeltes Kind teilt automatisch den Speicherplatz des Elternteils. Im Kontext der Python-Mehrfachverarbeitung bedeutet dies, dass alle Variablen auf Modulebene gemeinsam genutzt werden. Beachten Sie, dass dies nicht für Argumente gilt, die Sie explizit an Ihre untergeordneten Prozesse oder an die Funktionen übergeben, die Sie für a aufrufenmultiprocessing.Pool oder so .

Ein einfaches Beispiel:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))
EelkeSpaak
quelle
3
+1 Wirklich wertvolle Infos. Können Sie erklären, warum nur Variablen auf Modulebene gemeinsam genutzt werden? Warum sind lokale Variablen nicht Teil des Speicherplatzes der Eltern? Warum kann das nicht funktionieren, wenn ich eine Funktion F mit der lokalen Variable V und eine Funktion G innerhalb von F habe, die auf V verweist?
Coffee_Table
5
Warnung: Diese Antwort täuscht ein wenig. Der untergeordnete Prozess erhält zum Zeitpunkt des Verzweigens eine Kopie des Status des übergeordneten Prozesses, einschließlich globaler Variablen. Die Zustände sind in keiner Weise synchronisiert und werden von diesem Moment abweichen. Diese Technik kann in einigen Szenarien nützlich sein (z. B. das Ausschalten von untergeordneten Ad-hoc-Prozessen, die jeweils einen Snapshot des übergeordneten Prozesses verarbeiten und dann beenden), ist jedoch in anderen Szenarien nutzlos (z. B. lang laufende untergeordnete Prozesse, die gemeinsam genutzt werden müssen und Daten mit dem übergeordneten Prozess synchronisieren).
David Stein
4
@EelkeSpaak: Ihre Aussage - "Ein gegabeltes Kind teilt automatisch den Speicherplatz des Elternteils" - ist falsch. Wenn ich einen untergeordneten Prozess habe, der den Status des übergeordneten Prozesses streng schreibgeschützt überwachen möchte, bringt mich das Gabeln nicht dorthin: Das Kind sieht nur eine Momentaufnahme des übergeordneten Status zum Zeitpunkt des Gabelns. Genau das habe ich versucht (nach Ihrer Antwort), als ich diese Einschränkung entdeckte. Daher das Postskriptum zu Ihrer Antwort. Kurz gesagt: Der übergeordnete Status wird nicht "geteilt", sondern lediglich auf das Kind kopiert. Das ist nicht "Teilen" im üblichen Sinne.
David Stein
2
Bin ich falsch zu glauben, dass dies eine Copy-on-Write-Situation ist, zumindest auf Posix-Systemen? Das heißt, nach dem Fork wird der Speicher gemeinsam genutzt, bis neue Daten geschrieben werden. Zu diesem Zeitpunkt wird eine Kopie erstellt. Ja, es stimmt, dass die Daten nicht genau "geteilt" werden, aber sie können einen potenziell enormen Leistungsschub bewirken. Wenn Ihr Prozess schreibgeschützt ist, entsteht kein Kopieraufwand! Habe ich den Punkt richtig verstanden?
senderle
2
@senderle Ja, genau das habe ich gemeint! Daher mein Punkt (2) in der Antwort zum schreibgeschützten Zugriff.
EelkeSpaak
11

Ich habe ein kleines Python-Modul geschrieben, das POSIX Shared Memory verwendet, um Numpy-Arrays zwischen Python-Interpreten zu teilen. Vielleicht finden Sie es praktisch.

https://pypi.python.org/pypi/SharedArray

So funktioniert das:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
Matte
quelle
8

Sie können das sharedmemModul verwenden: https://bitbucket.org/cleemesser/numpy-sharedmem

Hier ist Ihr ursprünglicher Code, diesmal unter Verwendung eines gemeinsam genutzten Speichers, der sich wie ein NumPy-Array verhält (beachten Sie die zusätzliche letzte Anweisung, die eine NumPy- sum()Funktion aufruft ):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()
Velimir Mlaker
quelle
1
Hinweis: Dies wird nicht mehr entwickelt und scheint unter Linux nicht zu funktionieren. Github.com/sturlamolden/sharedmem-numpy/issues/4
AD
numpy-sharedmem befindet sich möglicherweise nicht in der Entwicklung, funktioniert jedoch unter Linux. Weitere Informationen finden Sie unter github.com/vmlaker/benchmark-sharedmem .
Velimir Mlaker