Multiprocessing: Wie teile ich ein Diktat zwischen mehreren Prozessen?

112

Ein Programm, das mehrere Prozesse erstellt, die in einer Join-fähigen Warteschlange arbeiten Q, und möglicherweise ein globales Wörterbuch manipuliert D, um Ergebnisse zu speichern. (So ​​kann jeder untergeordnete Prozess Dsein Ergebnis speichern und auch sehen, welche Ergebnisse die anderen untergeordneten Prozesse erzielen.)

Wenn ich das Wörterbuch D in einem untergeordneten Prozess drucke, werden die Änderungen angezeigt, die daran vorgenommen wurden (dh auf D). Aber nachdem der Hauptprozess Q beigetreten ist, ist es ein leeres Diktat, wenn ich D drucke!

Ich verstehe, dass es sich um ein Synchronisations- / Sperrproblem handelt. Kann mir jemand sagen, was hier passiert und wie ich den Zugriff auf D synchronisieren kann?

dop
quelle
1
Dies funktioniert zumindest unter Python 3.7.2 mit osx 10.14.4 nicht wie erwartet. Dict ist nicht synchronisiert und sein Inhalt wird von anderen Prozessen neu geschrieben. <Code> multiprocessing.Manager (). List () </ code> funktioniert jedoch wie erwartet.
Andrew Druchenko

Antworten:

161

Eine allgemeine Antwort beinhaltet die Verwendung eines ManagerObjekts. Angepasst aus den Dokumenten:

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print d

Ausgabe:

$ python mul.py 
{1: '111', '2': 6}
senderle
quelle
4
Danke senderle. In der Tat löst D = Multiprocessing.Manager (). Dict () mein Problem. Ich habe D = dict () verwendet.
Dop
3
@ LorenzoBelli, wenn Sie fragen, ob der Zugriff auf den Manager synchronisiert ist, glaube ich, dass die Antwort ja ist. multiprocessing.Manager()gibt eine Instanz von zurückSyncManager , deren Name genau das andeutet!
senderle
@senderle Ich möchte den zufälligen Status eines übergeordneten Prozesses mit einem untergeordneten Prozess teilen. Ich habe versucht, Manageraber immer noch kein Glück. Könnten Sie bitte meine Frage hier ansehen und sehen, ob Sie eine Lösung anbieten können? Ich kann immer noch unterschiedliche Zufallszahlen erhalten, wenn ich np.random.seed(None)jedes Mal eine Zufallszahl generiere, aber dies erlaubt mir nicht, den Zufallsstatus des übergeordneten Prozesses zu verwenden, was nicht das ist, was ich will. Jede Hilfe wird sehr geschätzt.
Amir
1
@RadioControlled schreibt gerne ein Update, aber kurz gesagt, obwohl ich nicht glaube, dass Sie dies direkt erreichen können, können Sie einfach ein neues verwaltetes Diktat mit denselben Schlüsseln und Werten erstellen und dieses anstelle des Originals verwenden. Ist das für Ihren Fall ausreichend?
Absender
1
@senderle, das habe ich letztendlich gemacht. Die Antwort wäre also, dass Sie genau das tun müssten.
Radio Controlled
25

Multiprocessing ist nicht wie Threading. Jeder untergeordnete Prozess erhält eine Kopie des Speichers des Hauptprozesses. Im Allgemeinen wird der Status über Kommunikation (Pipes / Sockets), Signale oder gemeinsam genutzten Speicher geteilt.

Multiprocessing stellt einige Abstraktionen für Ihren Anwendungsfall zur Verfügung - gemeinsam genutzter Status, der mithilfe von Proxys oder gemeinsam genutztem Speicher als lokal behandelt wird: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

Relevante Abschnitte:

Jeremy Brown
quelle
1
Vielen Dank. Sie haben mich zu der Lösung geführt: multiprocessing.Manager (). Dict ().
Dop
Kann jemand näher erläutern, was die Aussage "Jeder untergeordnete Prozess erhält eine Kopie des Speichers des Hauptprozesses" bedeutet.
Itsme2003
@ Itsme2003 Standardmäßig hat ein erzeugter Prozess keinen Zugriff auf den Speicher des übergeordneten Prozesses (dies ist einer der Hauptunterschiede zu Threads). Wenn ein Prozess ein Objekt des übergeordneten Prozesses benötigt, muss er eine Kopie davon erstellen (anstatt einen Verweis auf das tatsächliche Objekt zu erhalten). In der obigen Antwort wird erläutert, wie Objekte zwischen Prozessen ausgetauscht werden.
Niklas Mertsch
Weil dies oft falsch ist: Solange Sie das Objekt zumindest im üblichen Linux-Setup nicht ändern, wird das Objekt tatsächlich nur einmal im Speicher gespeichert. Es wird kopiert, sobald es geändert wird. Dies kann sehr wichtig sein, wenn Sie Speicherplatz sparen und das Objekt nicht ändern müssen.
Radio Controlled
16

Ich möchte meine eigene Arbeit teilen, die schneller als das Diktat des Managers ist und einfacher und stabiler als die Pyshmht-Bibliothek, die viel Speicher benötigt und unter Mac OS nicht funktioniert. Obwohl mein Diktat nur für einfache Saiten funktioniert und derzeit unveränderlich ist. Ich verwende eine lineare Abtastimplementierung und speichere Schlüssel- und Wertepaare in einem separaten Speicherblock nach der Tabelle.

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable


class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')


def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))


def uin(a, k):
    return to_str(k) in a


def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s


def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))


def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))


def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))


if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

Auf meinem Laptop sind die Leistungsergebnisse:

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

einfaches Anwendungsbeispiel:

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')
Alyaxey
quelle
13
Github? Dokumentation? Wie können wir dieses Tool verwenden?
Pavlos Panteliadis
10

Zusätzlich zu @ senderle's hier fragen sich einige möglicherweise auch, wie die Funktionalität von verwendet werden soll multiprocessing.Pool.

Das Schöne ist, dass .Pool()die managerInstanz eine Methode enthält, die alle bekannten APIs der obersten Ebene nachahmt multiprocessing.

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = "Hi, I was written by process %d" % pid

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))

Ausgabe:

$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

Dies ist ein etwas anderes Beispiel, bei dem jeder Prozess nur seine Prozess-ID im globalen DictProxyObjekt protokolliert d.

Brad Solomon
quelle
3

Vielleicht können Sie pyshmht ausprobieren und die speicherbasierte Hash-Tabellenerweiterung für Python gemeinsam nutzen.

Beachten

  1. Es ist nicht vollständig getestet, nur als Referenz.

  2. Derzeit fehlen Lock / Sem-Mechanismen für die Mehrfachverarbeitung.

felix021
quelle