Die ganze Idee dahinter Parallel.ForEach()
ist, dass Sie eine Reihe von Threads haben und jeder Thread einen Teil der Sammlung verarbeitet. Wie Sie bemerkt haben, funktioniert dies nicht mit async
- await
, wo Sie den Thread für die Dauer des asynchronen Aufrufs freigeben möchten.
Sie könnten das "beheben", indem Sie die ForEach()
Threads blockieren , aber das macht den ganzen Punkt von async
- zunichte await
.
Sie können stattdessen TPL Dataflow verwenden Parallel.ForEach()
, das asynchrone Task
Daten gut unterstützt.
Insbesondere könnte Ihr Code mit einem geschrieben werden TransformBlock
, der jede ID Customer
mit dem async
Lambda in einen umwandelt . Dieser Block kann so konfiguriert werden, dass er parallel ausgeführt wird. Sie würden diesen Block mit einem verknüpfen, ActionBlock
der jeden Customer
in die Konsole schreibt . Nachdem Sie das Blocknetzwerk eingerichtet haben, können Sie Post()
jede ID dem TransformBlock
.
In Code:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Obwohl Sie wahrscheinlich die Parallelität der TransformBlock
auf eine kleine Konstante beschränken möchten . Sie können auch die Kapazität von einschränken TransformBlock
und die Elemente asynchron hinzufügen SendAsync()
, z. B. wenn die Sammlung zu groß ist.
Ein zusätzlicher Vorteil im Vergleich zu Ihrem Code (falls er funktioniert hat) ist, dass das Schreiben beginnt, sobald ein einzelnes Element fertig ist, und nicht wartet, bis die gesamte Verarbeitung abgeschlossen ist.
Parallel.ForEach()
vonPost()
Elementen sollte keine wirklichen Auswirkungen haben.Die Antwort von svick ist (wie immer) ausgezeichnet.
Ich finde Dataflow jedoch nützlicher, wenn Sie tatsächlich große Datenmengen übertragen müssen. Oder wenn Sie eine
async
kompatible Warteschlange benötigen .In Ihrem Fall besteht eine einfachere Lösung darin, nur die
async
Parallelität im Stil zu verwenden:quelle
Parallel.ForEach()
). Aber ich denke, es ist derzeit die beste Option, um fast jedeasync
Arbeit mit Sammlungen zu erledigen .ParallelOptions
es helfen? Es gilt nur fürParallel.For/ForEach/Invoke
, die, wie das OP festgelegt hat, hier keinen Nutzen haben.GetCustomer
Methode a zurückgibtTask<T>
, sollte man verwendenSelect(async i => { await repo.GetCustomer(i);});
?Parallel.ForEach
unterstützt nichtasync
.Die Verwendung von DataFlow, wie von svick vorgeschlagen, kann übertrieben sein, und Stephens Antwort bietet nicht die Möglichkeit, die Parallelität des Vorgangs zu steuern. Dies kann jedoch ziemlich einfach erreicht werden:
Die
ToArray()
Aufrufe können optimiert werden, indem ein Array anstelle einer Liste verwendet und abgeschlossene Aufgaben ersetzt werden. Ich bezweifle jedoch, dass dies in den meisten Szenarien einen großen Unterschied bewirken würde. Beispielnutzung gemäß der Frage des OP:EDIT Fellow SO-Benutzer und TPL-Assistent Eli Arbel hat mich auf einen verwandten Artikel von Stephen Toub hingewiesen . Wie immer ist seine Implementierung sowohl elegant als auch effizient:
quelle
Partitioner.Create
verwendet tatsächlich die Chunk-Partitionierung, die Elemente dynamisch für die verschiedenen Aufgaben bereitstellt, sodass das von Ihnen beschriebene Szenario nicht stattfindet. Beachten Sie auch, dass die statische (vorgegebene) Partitionierung in einigen Fällen aufgrund des geringeren Overheads (insbesondere der Synchronisierung) schneller sein kann. Weitere Informationen finden Sie unter: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .Task.WhenAll
) die Ausnahme (innerhalb einesAggregateException
) enthält. Wenn der Anrufer verwendet wirdawait
, wird folglich eine Ausnahme in der Anrufsite ausgelöst. WartetTask.WhenAll
jedoch weiterhin auf den Abschluss aller Aufgaben und weistGetPartitions
Elemente beimpartition.MoveNext
Aufruf dynamisch zu, bis keine weiteren Elemente mehr verarbeitet werden müssen. Dies bedeutet, dass dies nicht von alleineCancellationToken
geschieht , wenn Sie keinen eigenen Mechanismus zum Stoppen der Verarbeitung hinzufügen (z. B. ).var current = partition.Current
vorher machenawait body
und danncurrent
in der Fortsetzung (ContinueWith(t => { ... }
) verwenden.Mit dem neuen AsyncEnumerator NuGet-Paket , das vor 4 Jahren nicht existierte, als die Frage ursprünglich gestellt wurde, können Sie Aufwand sparen . Hiermit können Sie den Grad der Parallelität steuern:
Haftungsausschluss: Ich bin der Autor der AsyncEnumerator-Bibliothek, die Open Source ist und unter MIT lizenziert ist, und ich poste diese Nachricht nur, um der Community zu helfen.
quelle
AsyncStreams
und ich muss sagen, dass sie ausgezeichnet ist. Kann diese Bibliothek nicht genug empfehlen.Wickeln Sie das
Parallel.Foreach
in einTask.Run()
und anstelle desawait
Schlüsselworts verwenden[yourasyncmethod].Result
(Sie müssen die Task.Run-Aktion ausführen, um den UI-Thread nicht zu blockieren.)
Etwas wie das:
quelle
Parallel.ForEach
Sie die parallele Arbeit, die blockiert, bis alle erledigt sind, und verschieben Sie das Ganze dann in einen Hintergrund-Thread, um eine reaktionsfähige Benutzeroberfläche zu erhalten. Irgendwelche Probleme damit? Vielleicht ist das ein schlafender Thread zu viel, aber es ist kurzer, lesbarer Code.Task.Run
wannTaskCompletionSource
es vorzuziehen ist.TaskCompletionSource
vorzuziehen?await
kann nach vorne verschoben werden, um den zusätzlichen Variablennamen zu speichern.Dies sollte ziemlich effizient und einfacher sein, als den gesamten TPL-Datenfluss zum Laufen zu bringen:
quelle
await
wie folgt verwendet werden :var customers = await ids.SelectAsync(async i => { ... });
?Ich bin etwas spät zum Feiern, aber vielleicht möchten Sie GetAwaiter.GetResult () verwenden, um Ihren asynchronen Code im Synchronisationskontext auszuführen, aber so parallel wie unten;
quelle
Nachdem Sie eine Reihe von Hilfsmethoden eingeführt haben, können Sie parallele Abfragen mit dieser einfachen Syntax ausführen:
Was hier passiert, ist: Wir teilen die Quellensammlung in 10 Chunks (
.Split(DegreeOfParallelism)
) auf, führen dann 10 Tasks aus, die jeweils ihre Elemente.SelectManyAsync(...)
einzeln verarbeiten ( ), und führen diese wieder in einer einzigen Liste zusammen.Erwähnenswert ist, dass es einen einfacheren Ansatz gibt:
Aber es bedarf einer Vorsichtsmaßnahme : Wenn Sie über eine zu große Quellensammlung verfügen, wird
Task
für jeden Artikel sofort ein Zeitplan festgelegt , was zu erheblichen Leistungseinbußen führen kann.Die in den obigen Beispielen verwendeten Erweiterungsmethoden sehen wie folgt aus:
quelle
Eine Erweiterungsmethode hierfür, die SemaphoreSlim verwendet und es auch ermöglicht, einen maximalen Grad an Parallelität festzulegen
Beispielnutzung:
quelle