Wie verwende ich die Multiprocessing-Warteschlange in Python?

89

Ich habe große Probleme damit, zu verstehen, wie die Multiprocessing-Warteschlange in Python funktioniert und wie sie implementiert wird. Nehmen wir an, ich habe zwei Python-Module, die auf Daten aus einer gemeinsam genutzten Datei zugreifen. Nennen wir diese beiden Module einen Writer und einen Reader. Mein Plan ist es, dass sowohl der Leser als auch der Schreiber Anforderungen in zwei separate Mehrfachverarbeitungswarteschlangen stellen und diese Anforderungen dann von einem dritten Prozess in einer Schleife platzieren und als solche ausführen.

Mein Hauptproblem ist, dass ich wirklich nicht weiß, wie multiprocessing.queue korrekt implementiert wird. Sie können das Objekt nicht für jeden Prozess instanziieren, da es sich um separate Warteschlangen handelt. Wie stellen Sie sicher, dass sich alle Prozesse auf eine gemeinsam genutzte Warteschlange beziehen (oder in diesem Fall Warteschlangen)

Jab
quelle
4
Übergeben Sie die Warteschlangen als Parameter an jede Prozessklasse, wenn Sie sie im übergeordneten Prozess instanziieren.
Joel Cornett

Antworten:

119

Mein Hauptproblem ist, dass ich wirklich nicht weiß, wie multiprocessing.queue korrekt implementiert wird. Sie können das Objekt nicht für jeden Prozess instanziieren, da es sich um separate Warteschlangen handelt. Wie stellen Sie sicher, dass sich alle Prozesse auf eine gemeinsam genutzte Warteschlange beziehen (oder in diesem Fall Warteschlangen)

Dies ist ein einfaches Beispiel für einen Leser und einen Schreiber, die sich eine einzelne Warteschlange teilen ... Der Schreiber sendet eine Reihe von Ganzzahlen an den Leser. Wenn dem Schreiber die Zahlen ausgehen, sendet er 'DONE', wodurch der Leser weiß, dass er aus der Leseschleife ausbrechen kann.

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))
Mike Pennington
quelle
11
Tolles Beispiel. Nur als zusätzliche Information, um die Verwirrung des OP zu beheben ... Dieses Beispiel zeigt, dass eine gemeinsam genutzte Warteschlange aus dem Master-Prozess stammen muss, der dann an alle seine Unterprozesse übergeben wird. Damit zwei völlig unabhängige Prozesse Daten gemeinsam nutzen können, müssen sie über ein zentrales oder zugehöriges Netzwerkgerät (z. B. Sockets) kommunizieren. Etwas muss die Informationen koordinieren.
JDI
5
schönes Beispiel .. Ich bin auch neu in diesem Thema .. Wenn ich mehrere Prozesse habe, die dieselbe Zielfunktion ausführen (mit unterschiedlichen Argumenten), ist eine Sperre erforderlich, um sicherzustellen, dass sie nicht zusammenstoßen, während die Daten in die Warteschlange gestellt werden ?
WYSIWYG
@bharat_iyengar Aus der Dokumentation des Multiprocessing-Moduls geht hervor, dass die Warteschlange mit einigen Sperren / Semaphoren implementiert wird. Wenn Sie also die Methoden get () und put (object) Queue verwenden, wird die Warteschlange blockiert, wenn ein anderer Prozess / Thread versucht, etwas in die Warteschlange zu stellen oder zu stellen. Sie müssen sich also nicht um das manuelle Sperren kümmern.
Almel
1
Explizite Stoppbedingungen sind besser als implizite Stoppbedingungen
Mike Pennington
2
Qsize kann auf Null gehen, wenn die Warteschlangenleser die Rate des Warteschlangenschreibers überschreiten
Mike Pennington
7

in " from queue import Queue" wird kein Modul aufgerufen queue, sondern multiprocessingsollte verwendet werden. Daher sollte es wie " from multiprocessing import Queue" aussehen

Jean
quelle
10
Während Jahre zu spät, ist die Verwendung multiprocessing.Queuekorrekt. Das Normal Queue.Queuewird für Python- Threads verwendet . Wenn Sie versuchen, die Queue.QueueMehrfachverarbeitung zu verwenden, werden in jedem untergeordneten Prozess Kopien des Warteschlangenobjekts erstellt und die untergeordneten Prozesse werden niemals aktualisiert. Funktioniert grundsätzlich Queue.Queuemit einem globalen gemeinsam genutzten Objekt und multiprocessing.Queuemit IPC. Siehe: stackoverflow.com/questions/925100/…
Michael Guffre
5

Hier ist eine kinderleichte Verwendung von multiprocessing.Queueund multiprocessing.Processermöglicht es Anrufern, ein "Ereignis" plus Argumente an einen separaten Prozess zu senden, der das Ereignis an eine "do_" -Methode des Prozesses sendet. (Python 3.4+)

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])

class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
       msg = Msg(event, args)
       self.queue.put(msg)

    def dispatch(self, msg):
        event, args = msg

        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)

        handler(*args)

    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)

Verwendung:

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')

Das sendpassiert im übergeordneten Prozess, das do_*passiert im untergeordneten Prozess.

Ich habe jede Ausnahmebehandlung ausgelassen, die offensichtlich die Ausführungsschleife unterbrechen und den untergeordneten Prozess beenden würde. Sie können es auch anpassen, indem Sie es überschreiben run, um das Blockieren oder was auch immer zu steuern.

Dies ist wirklich nur in Situationen nützlich, in denen Sie einen einzelnen Arbeitsprozess haben, aber ich denke, es ist eine relevante Antwort auf diese Frage, um ein allgemeines Szenario mit etwas mehr Objektorientierung zu demonstrieren.

Joe Holloway
quelle
1
Hervorragende Antwort! Danke dir. +50 :)
kmiklas
2

Wir haben zwei Versionen davon implementiert, eine einfache Multi- Thread- Pool, die viele Arten von Callables ausführen kann, was unser Leben erheblich erleichtert, und die zweite Version, die Prozesse verwendet , die in Bezug auf Callables weniger flexibel sind und einen zusätzlichen Aufruf zum Dill erfordern.

Wenn Sie gefroren_pool auf true setzen, wird die Ausführung eingefroren, bis finish_pool_queue in einer der Klassen aufgerufen wird.

Thread-Version:

'''
Created on Nov 4, 2019

@author: Kevin
'''
from threading import Lock, Thread
from Queue import Queue
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os

class ThreadPool(object):
    def __init__(self, queue_threads, *args, **kwargs):
        self.frozen_pool = kwargs.get('frozen_pool', False)
        self.print_queue = kwargs.get('print_queue', True)
        self.pool_results = []
        self.lock = Lock()
        self.queue_threads = queue_threads
        self.queue = Queue()
        self.threads = []

        for i in range(self.queue_threads):
            t = Thread(target=self.make_pool_call)
            t.daemon = True
            t.start()
            self.threads.append(t)

    def make_pool_call(self):
        while True:
            if self.frozen_pool:
                #print '--> Queue is frozen'
                sleep(1)
                continue

            item = self.queue.get()
            if item is None:
                break

            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)

            try:
                result = call(*args, **kwargs)

                if keep_results:
                    self.lock.acquire()
                    self.pool_results.append((item, result))
                    self.lock.release()

            except Exception as e:
                self.lock.acquire()
                print e
                traceback.print_exc()
                self.lock.release()
                os.kill(os.getpid(), signal.SIGUSR1)

            self.queue.task_done()

    def finish_pool_queue(self):
        self.frozen_pool = False

        while self.queue.unfinished_tasks > 0:
            if self.print_queue:
                print_info('--> Thread pool... %s' % self.queue.unfinished_tasks)
            sleep(5)

        self.queue.join()

        for i in range(self.queue_threads):
            self.queue.put(None)

        for t in self.threads:
            t.join()

        del self.threads[:]

    def get_pool_results(self):
        return self.pool_results

    def clear_pool_results(self):
        del self.pool_results[:]

Prozessversion:

  '''
Created on Nov 4, 2019

@author: Kevin
'''
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\
    RawArray, Manager
from dill import dill
import ctypes
from helium.misc.utils import ignore_exception
from mem_top import mem_top
import gc

class ProcessPool(object):
    def __init__(self, queue_processes, *args, **kwargs):
        self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False))
        self.print_queue = kwargs.get('print_queue', True)
        self.manager = Manager()
        self.pool_results = self.manager.list()
        self.queue_processes = queue_processes
        self.queue = JoinableQueue()
        self.processes = []

        for i in range(self.queue_processes):
            p = Process(target=self.make_pool_call)
            p.start()
            self.processes.append(p)

        print 'Processes', self.queue_processes

    def make_pool_call(self):
        while True:
            if self.frozen_pool.value:
                sleep(1)
                continue

            item_pickled = self.queue.get()

            if item_pickled is None:
                #print '--> Ending'
                self.queue.task_done()
                break

            item = dill.loads(item_pickled)

            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)

            try:
                result = call(*args, **kwargs)

                if keep_results:
                    self.pool_results.append(dill.dumps((item, result)))
                else:
                    del call, args, kwargs, keep_results, item, result

            except Exception as e:
                print e
                traceback.print_exc()
                os.kill(os.getpid(), signal.SIGUSR1)

            self.queue.task_done()

    def finish_pool_queue(self, callable=None):
        self.frozen_pool.value = False

        while self.queue._unfinished_tasks.get_value() > 0:
            if self.print_queue:
                print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value()))

            if callable:
                callable()

            sleep(5)

        for i in range(self.queue_processes):
            self.queue.put(None)

        self.queue.join()
        self.queue.close()

        for p in self.processes:
            with ignore_exception: p.join(10)
            with ignore_exception: p.terminate()

        with ignore_exception: del self.processes[:]

    def get_pool_results(self):
        return self.pool_results

    def clear_pool_results(self):
        del self.pool_results[:]
def test(eg):
        print 'EG', eg

Rufen Sie an mit:

tp = ThreadPool(queue_threads=2)
tp.queue.put({'call': test, 'args': [random.randint(0, 100)]})
tp.finish_pool_queue()

oder

pp = ProcessPool(queue_processes=2)
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.finish_pool_queue()
Kevin Parker
quelle
1

Ich habe mir mehrere Antworten über den Stapelüberlauf und das Web hinweg angesehen, während ich versucht habe, eine Methode für die Mehrfachverarbeitung mithilfe von Warteschlangen für die Weitergabe großer Pandas-Datenrahmen einzurichten. Es schien mir, dass jede Antwort die gleiche Art von Lösungen wiederholte, ohne die Vielzahl von Randfällen zu berücksichtigen, auf die man bei der Erstellung solcher Berechnungen definitiv stoßen wird. Das Problem ist, dass viele Dinge gleichzeitig im Spiel sind. Die Anzahl der Aufgaben, die Anzahl der Mitarbeiter, die Dauer jeder Aufgabe und mögliche Ausnahmen während der Ausführung der Aufgabe. All dies macht die Synchronisation schwierig und die meisten Antworten beziehen sich nicht darauf, wie Sie vorgehen können. Das ist also meine Einstellung, nachdem ich ein paar Stunden herumgespielt habe. Hoffentlich ist dies allgemein genug, damit die meisten Leute es nützlich finden.

Einige Gedanken vor Codierungsbeispielen. Da queue.Emptyoder queue.qsize()oder ein anderes ähnliches Verfahren für die Flusskontrolle unzuverlässig ist, kann jeder Code dergleichen verwendet werden

while True:
    try:
        task = pending_queue.get_nowait()
    except queue.Empty:
        break

ist falsch. Dies wird den Arbeiter töten, selbst wenn Millisekunden später eine andere Aufgabe in der Warteschlange auftaucht. Der Arbeiter wird sich nicht erholen und nach einer Weile verschwinden ALLE Arbeiter, da sie die Warteschlange zufällig für einen Moment leer finden. Das Endergebnis ist, dass die Haupt-Multiprozessor-Funktion (die mit dem Join () für die Prozesse) zurückgegeben wird, ohne dass alle Aufgaben abgeschlossen sind. Nett. Viel Glück beim Debuggen, wenn Sie Tausende von Aufgaben haben und einige fehlen.

Das andere Problem ist die Verwendung von Sentinel-Werten. Viele Leute haben vorgeschlagen, einen Sentinel-Wert in die Warteschlange aufzunehmen, um das Ende der Warteschlange zu kennzeichnen. Aber um es genau wem zu kennzeichnen? Wenn es N Worker gibt, unter der Annahme, dass N die Anzahl der verfügbaren Kerne ist, die geben oder nehmen, markiert ein einzelner Sentinel-Wert nur das Ende der Warteschlange für einen Worker. Alle anderen Arbeiter werden sitzen und auf weitere Arbeit warten, wenn keine mehr übrig ist. Typische Beispiele, die ich gesehen habe, sind

while True:
    task = pending_queue.get()
    if task == SOME_SENTINEL_VALUE:
        break

Ein Mitarbeiter erhält den Sentinel-Wert, während der Rest auf unbestimmte Zeit wartet. In keinem Beitrag, auf den ich gestoßen bin, wurde erwähnt, dass Sie den Sentinel-Wert mindestens so oft an die Warteschlange senden müssen, wie Sie Mitarbeiter haben, damit ALLE ihn erhalten.

Das andere Problem ist die Behandlung von Ausnahmen während der Taskausführung. Auch diese sollten gefangen und verwaltet werden. Wenn Sie eine completed_tasksWarteschlange haben, sollten Sie außerdem unabhängig und deterministisch zählen, wie viele Elemente sich in der Warteschlange befinden, bevor Sie entscheiden, dass die Aufgabe erledigt ist. Das Verlassen auf Warteschlangengrößen schlägt erneut fehl und gibt unerwartete Ergebnisse zurück.

Im folgenden Beispiel par_proc()erhält die Funktion eine Liste von Aufgaben, einschließlich der Funktionen, mit denen diese Aufgaben ausgeführt werden sollen, sowie benannte Argumente und Werte.

import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil

SENTINEL = None


def do_work(tasks_pending, tasks_completed):
    # Get the current worker's name
    worker_name = mp.current_process().name

    while True:
        try:
            task = tasks_pending.get_nowait()
        except queue.Empty:
            print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
            time.sleep(0.01)
        else:
            try:
                if task == SENTINEL:
                    print(worker_name + ' no more work left to be done. Exiting...')
                    break

                print(worker_name + ' received some work... ')
                time_start = time.perf_counter()
                work_func = pickle.loads(task['func'])
                result = work_func(**task['task'])
                tasks_completed.put({work_func.__name__: result})
                time_end = time.perf_counter() - time_start
                print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
            except Exception as e:
                print(worker_name + ' task failed. ' + str(e))
                tasks_completed.put({work_func.__name__: None})


def par_proc(job_list, num_cpus=None):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set-up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Use as many workers as there are cores (usually chokes the system so better use less)
    num_workers = num_cpus

    # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
    # work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set-up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1

    for p in processes:
        p.join()

    return results

Und hier ist ein Test, gegen den der obige Code ausgeführt werden kann

def test_parallel_processing():
    def heavy_duty1(arg1, arg2, arg3):
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert job1 == 15
    assert job2 == 21

plus eine andere mit einigen Ausnahmen

def test_parallel_processing_exceptions():
    def heavy_duty1_raises(arg1, arg2, arg3):
        raise ValueError('Exception raised')
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert not job1
    assert job2 == 21

Hoffe das ist hilfreich.

Nick B.
quelle
0

Ich habe gerade ein einfaches und allgemeines Beispiel für die Demonstration der Weitergabe einer Nachricht über eine Warteschlange zwischen zwei eigenständigen Programmen erstellt. Es beantwortet die Frage des OP nicht direkt, sollte aber klar genug sein, um das Konzept anzugeben.

Server:

multiprocessing-queue-manager-server.py

import asyncio
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import queue
import sys
import threading
from typing import Any, AnyStr, Dict, Union


class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass


def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
    global q

    if not ident in q:
        q[ident] = multiprocessing.Queue()

    return q[ident]


q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict()
delattr(QueueManager, 'get_queue')


def init_queue_manager_server():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue', get_queue)


def serve(no: int, term_ev: threading.Event):
    manager: QueueManager
    with QueueManager(authkey=QueueManager.__name__.encode()) as manager:
        print(f"Server address {no}: {manager.address}")

        while not term_ev.is_set():
            try:
                item: Any = manager.get_queue().get(timeout=0.1)
                print(f"Client {no}: {item} from {manager.address}")
            except queue.Empty:
                continue


async def main(n: int):
    init_queue_manager_server()
    term_ev: threading.Event = threading.Event()
    executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor()

    i: int
    for i in range(n):
        asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev))

    # Gracefully shut down
    try:
        await asyncio.get_running_loop().create_future()
    except asyncio.CancelledError:
        term_ev.set()
        executor.shutdown()
        raise


if __name__ == '__main__':
    asyncio.run(main(int(sys.argv[1])))

Klient:

multiprocessing-queue-manager-client.py

import multiprocessing
import multiprocessing.managers
import os
import sys
from typing import AnyStr, Union


class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass


delattr(QueueManager, 'get_queue')


def init_queue_manager_client():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue')


def main():
    init_queue_manager_client()

    manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode())
    manager.connect()

    message = f"A message from {os.getpid()}"
    print(f"Message to send: {message}")
    manager.get_queue().put(message)


if __name__ == '__main__':
    main()

Verwendung

Server:

$ python3 multiprocessing-queue-manager-server.py N

Nist eine Ganzzahl, die angibt, wie viele Server erstellt werden sollen. Kopieren Sie eine der <server-address-N>Ausgaben des Servers und machen Sie sie zum ersten Argument multiprocessing-queue-manager-client.py.

Klient:

python3 multiprocessing-queue-manager-client.py <server-address-1>

Ergebnis

Server:

Client 1: <item> from <server-address-1>

Inhalt: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5


UPD : ein Paket Erstellt hier .

Server:

import ipcq


with ipcq.QueueManagerServer(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) as server:
    server.get_queue().get()

Klient:

import ipcq


client = ipcq.QueueManagerClient(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT)
client.get_queue().put('a message')

Geben Sie hier die Bildbeschreibung ein

changyuheng
quelle