Wie kann man parallel und verzögert so implementieren, dass die parallelisierte for-Schleife stoppt, wenn die Ausgabe einen Schwellenwert unterschreitet?

8

Angenommen, ich habe den folgenden Code:

from scipy import *
import multiprocessing as mp
num_cores = mp.cpu_count()
from joblib import Parallel, delayed
import matplotlib.pyplot as plt

def func(x,y):
    return y/x
def main(y, xmin,xmax, dx):
    x = arange(xmin,xmax,dx)
    output = Parallel(n_jobs=num_cores)(delayed(func)(i, y) for i in x)
    return x, asarray(output)
def demo():
    x,z = main(2.,1.,30.,.1)
    plt.plot(x,z, label='All values')
    plt.plot(x[z>.1],z[z>.1], label='desired range') ## This is better to do in main()
    plt.show()

demo()

Ich möchte die Ausgabe nur bis zur Ausgabe> einer bestimmten Zahl berechnen (es kann angenommen werden, dass die Ausgabeelemente mit zunehmender x monoton abnehmen) und dann anhalten (NICHT für alle Werte von x berechnen und dann sortieren, das ist für meinen Zweck ineffizient). Gibt es eine Möglichkeit, dies mit paralleler, verzögerter oder anderer Mehrfachverarbeitung zu tun?

user247534
quelle
Sie können auch numpy verwenden. Ich habe einige Zahlen hinzugefügt. Die Auswahl [z> .1] in der Demofunktion sollte in der Hauptfunktion erfolgen, um den Code effizienter zu gestalten.
user247534
Ich weiß, dass es chaotisch wäre, aber ich würde eine Liste erstellen, sie an die Funktion übergeben und die Funktion würde das Ergebnis an diese Liste anhängen. Dann würde ich draußen prüfen, ob die Liste eine höhere Zahl enthält, und dann die Threads irgendwie beenden. Jetzt, wo ich darüber nachdenke, gibt es wahrscheinlich intelligentere Methoden, um dies zu tun, wie Queues
Maxxik CZ

Antworten:

1

Es gab keine output > a given numberAngabe, also habe ich mir nur eine ausgedacht. Nach dem Testen musste ich den Zustand für einen ordnungsgemäßen Betrieb umkehren output < a given number.

Ich würde einen Pool verwenden, die Prozesse mit einer Rückruffunktion starten, um die Stoppbedingung zu überprüfen, und dann den Pool beenden, wenn er bereit ist. Dies würde jedoch zu einer Racebedingung führen, die es ermöglichen würde, Ergebnisse aus laufenden Prozessen wegzulassen, die nicht beendet werden durften. Ich denke, diese Methode hat nur minimale Änderungen an Ihrem Code und ist sehr einfach zu lesen. Die Reihenfolge der Liste ist NICHT garantiert.

Vorteile: sehr wenig Overhead
Nachteile: könnten fehlende Ergebnisse haben.

Methode 1)

from scipy import *
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
        output.append(ret)
        if ret < stop_condition:
            worker_pool.terminate()


def func(x, y, ):
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


output = []
stop_condition = 0.1

worker_pool = multiprocessing.Pool()
demo()

Diese Methode hat mehr Overhead, ermöglicht jedoch Prozesse, deren Abschluss begonnen hat. Methode 2)

from scipy import *
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
    if ret is not None:
        if ret < stop_condition:
            worker_stop.value = 1
        else:
            output.append(ret)


def func(x, y, ):
    if worker_stop.value != 0:
        return None
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


output = []
worker_stop = multiprocessing.Value('i', 0)
stop_condition = 0.1

worker_pool = multiprocessing.Pool()
demo()

Methode 3) Vorteile: Es werden keine Ergebnisse ausgelassen.
Nachteile: Dieser Schritt geht weit über das hinaus, was Sie normalerweise tun würden.

nimm Methode 1 und füge hinzu

def stopPoolButLetRunningTaskFinish(pool):
    # Pool() shutdown new task from being started, by emptying the query all worker processes draw from
    while pool._task_handler.is_alive() and pool._inqueue._reader.poll():
        pool._inqueue._reader.recv()
    # Send sentinels to all worker processes
    for a in range(len(pool._pool)):
            pool._inqueue.put(None)

Dann ändern stop_condition_callback

def stop_condition_callback(ret):
    if ret[1] < stop_condition:
        #worker_pool.terminate()
        stopPoolButLetRunningTaskFinish(worker_pool)
    else:
        output.append(ret)
Ron
quelle
0

Ich würde Dask verwenden, um parallel auszuführen, und speziell die Futures- Schnittstelle für Echtzeit-Feedback der Ergebnisse, sobald sie abgeschlossen sind. Wenn Sie fertig sind, können Sie entweder die verbleibenden Futures im Flug stornieren, die nicht benötigten Futures leasen, um asynchron zu beenden, oder den Cluster schließen.

from dask.distributed import Client, as_completed
client = Client()  # defaults to ncores workers, one thread each
y, xmin, xmax, dx = 2.,1.,30.,.1

def func(x, y):
    return x, y/x
x = arange(xmin,xmax,dx)
outx = []
output = []
futs = [client.submit(func, val, y) for val in x]
for future in as_completed(futs):
    outs = future.result()
    outx.append(outs[0])
    output.append(outs[1])
    if outs[1] < 0.1:
        break

Anmerkungen: - Ich gehe davon aus, dass Sie "kleiner als" gemeint haben, da ansonsten der erste Wert bereits übergeben wird ( y / xmin > 0.1). - Es wird nicht garantiert, dass die Ausgaben in der Reihenfolge vorliegen, in der Sie sie eingegeben haben, wenn Sie Ergebnisse abrufen möchten, sobald sie fertig sind, aber mit einer solchen schnelle Berechnung, vielleicht sind sie es immer (aus diesem Grund ließ ich die Funktion auch den Eingabewert zurückgeben) - wenn Sie aufhören zu rechnen, ist die Ausgabe kürzer als der gesamte Satz von Eingaben, daher bin ich mir nicht ganz sicher, was Sie wollen drucken.

mdurant
quelle