Ich möchte await
auf das Ergebnis von BlockingCollection<T>.Take()
asynchron, damit ich den Thread nicht blockiere. Auf der Suche nach so etwas:
var item = await blockingCollection.TakeAsync();
Ich weiß, dass ich das tun kann:
var item = await Task.Run(() => blockingCollection.Take());
aber das tötet irgendwie die ganze Idee, weil ThreadPool
stattdessen ein anderer Thread (von ) blockiert wird.
Gibt es eine Alternative?
await Task.Run(() => blockingCollection.Take())
die Aufgabe verwenden, wird sie in einem anderen Thread ausgeführt und Ihr UI-Thread wird nicht blockiert. Ist das nicht der Punkt?Task
API. Es kann beispielsweise von ASP.NET aus verwendet werden. Der fragliche Code würde dort nicht gut skalieren.ConfigureAwait
es nach dem verwendet würdeRun()
? [ed.Antworten:
Ich kenne vier Alternativen.
Der erste ist Channels , der eine threadsichere Warteschlange bereitstellt, die Asynchronität
Read
undWrite
Operationen unterstützt. Kanäle sind stark optimiert und unterstützen optional das Löschen einiger Elemente, wenn ein Schwellenwert erreicht wird.Der nächste stammt
BufferBlock<T>
aus dem TPL-Datenfluss . Wenn Sie nur einen einzigen Verbraucher haben, können SieOutputAvailableAsync
oder verwendenReceiveAsync
oder einfach mit einem verknüpfenActionBlock<T>
. Weitere Informationen finden Sie in meinem Blog .Die letzten beiden sind von mir erstellte Typen, die in meiner AsyncEx-Bibliothek verfügbar sind .
AsyncCollection<T>
istasync
nahezu gleichwertig mitBlockingCollection<T>
, in der Lage, eine gleichzeitige Produzenten- / Verbrauchersammlung wieConcurrentQueue<T>
oder zu verpackenConcurrentBag<T>
. Sie könnenTakeAsync
damit Elemente aus der Sammlung asynchron konsumieren. Weitere Informationen finden Sie in meinem Blog .AsyncProducerConsumerQueue<T>
ist eine tragbarereasync
kompatible Produzenten- / Konsumentenwarteschlange. Sie könnenDequeueAsync
damit Elemente aus der Warteschlange asynchron konsumieren. Weitere Informationen finden Sie in meinem Blog .Die letzten drei dieser Alternativen ermöglichen synchrone und asynchrone Puts und Takes.
quelle
AsyncCollection.TryTakeAsync
, kann sie jedoch in der heruntergeladenenNito.AsyncEx.Coordination.dll 5.0.0.0
(neuesten Version) nicht finden . Die referenzierte Nito.AsyncEx.Concurrent.dll ist im Paket nicht vorhanden . Was vermisse ich?while ((result = await collection.TryTakeAsync()).Success) { }
. Warum wurde es entfernt?... oder Sie können dies tun:
using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class AsyncQueue<T> { private readonly SemaphoreSlim _sem; private readonly ConcurrentQueue<T> _que; public AsyncQueue() { _sem = new SemaphoreSlim(0); _que = new ConcurrentQueue<T>(); } public void Enqueue(T item) { _que.Enqueue(item); _sem.Release(); } public void EnqueueRange(IEnumerable<T> source) { var n = 0; foreach (var item in source) { _que.Enqueue(item); n++; } _sem.Release(n); } public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken)) { for (; ; ) { await _sem.WaitAsync(cancellationToken); T item; if (_que.TryDequeue(out item)) { return item; } } } }
Einfache, voll funktionsfähige asynchrone FIFO-Warteschlange.
quelle
for
? Wenn das Semaphor freigegeben wird, muss in der Warteschlange mindestens ein Element aus der Warteschlange entfernt werden.TryDequeue
are, return mit einem Wert oder gar nicht zurückgibt. Wenn Sie mehr als einen Leser haben, kann derselbe Leser zwei (oder mehr) Elemente verbrauchen, bevor ein anderer Leser vollständig wach ist. Ein ErfolgWaitAsync
ist nur ein Signal dafür, dass sich möglicherweise Elemente in der Warteschlange befinden, die verbraucht werden müssen. Dies ist keine Garantie.If the value of the CurrentCount property is zero before this method is called, the method also allows releaseCount threads or tasks blocked by a call to the Wait or WaitAsync method to enter the semaphore.
von docs.microsoft.com/en-us/dotnet/api/… Wie ist es erfolgreichWaitAsync
, wenn keine Elemente in der Warteschlange stehen? Wenn N Release mehr als N Verbraucher weckt, alssemaphore
gebrochen ist. Ist es nicht?Hier ist eine sehr grundlegende Implementierung von a
BlockingCollection
, die das Warten unterstützt, mit vielen fehlenden Funktionen. Es verwendet dieAsyncEnumerable
Bibliothek, die eine asynchrone Aufzählung für C # -Versionen ermöglicht, die älter als 8.0 sind.public class AsyncBlockingCollection<T> { // Missing features: cancellation, boundedCapacity, TakeAsync private Queue<T> _queue = new Queue<T>(); private SemaphoreSlim _semaphore = new SemaphoreSlim(0); private int _consumersCount = 0; private bool _isAddingCompleted; public void Add(T item) { lock (_queue) { if (_isAddingCompleted) throw new InvalidOperationException(); _queue.Enqueue(item); } _semaphore.Release(); } public void CompleteAdding() { lock (_queue) { if (_isAddingCompleted) return; _isAddingCompleted = true; if (_consumersCount > 0) _semaphore.Release(_consumersCount); } } public IAsyncEnumerable<T> GetConsumingEnumerable() { lock (_queue) _consumersCount++; return new AsyncEnumerable<T>(async yield => { while (true) { lock (_queue) { if (_queue.Count == 0 && _isAddingCompleted) break; } await _semaphore.WaitAsync(); bool hasItem; T item = default; lock (_queue) { hasItem = _queue.Count > 0; if (hasItem) item = _queue.Dequeue(); } if (hasItem) await yield.ReturnAsync(item); } }); } }
Anwendungsbeispiel:
var abc = new AsyncBlockingCollection<int>(); var producer = Task.Run(async () => { for (int i = 1; i <= 10; i++) { await Task.Delay(100); abc.Add(i); } abc.CompleteAdding(); }); var consumer = Task.Run(async () => { await abc.GetConsumingEnumerable().ForEachAsync(async item => { await Task.Delay(200); await Console.Out.WriteAsync(item + " "); }); }); await Task.WhenAll(producer, consumer);
Ausgabe:
Update: Mit der Veröffentlichung von C # 8 ist die asynchrone Aufzählung zu einer integrierten Sprachfunktion geworden. Die erforderlichen Klassen (
IAsyncEnumerable
,IAsyncEnumerator
) sind in .NET Core 3.0 eingebettet und werden als Paket für .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces ) angeboten.Hier ist eine alternative
GetConsumingEnumerable
Implementierung mit der neuen C # 8-Syntax:public async IAsyncEnumerable<T> GetConsumingEnumerable() { lock (_queue) _consumersCount++; while (true) { lock (_queue) { if (_queue.Count == 0 && _isAddingCompleted) break; } await _semaphore.WaitAsync(); bool hasItem; T item = default; lock (_queue) { hasItem = _queue.Count > 0; if (hasItem) item = _queue.Dequeue(); } if (hasItem) yield return item; } }
Beachten Sie die Koexistenz von
await
undyield
in derselben Methode.Anwendungsbeispiel (C # 8):
var consumer = Task.Run(async () => { await foreach (var item in abc.GetConsumingEnumerable()) { await Task.Delay(200); await Console.Out.WriteAsync(item + " "); } });
Beachten Sie die
await
vor demforeach
.quelle
AsyncBlockingCollection
unsinnig ist. Etwas kann nicht gleichzeitig asynchron und blockierend sein, da diese beiden Konzepte die genauen Gegensätze sind!Wenn Ihnen ein kleiner Hack nichts ausmacht, können Sie diese Erweiterungen ausprobieren.
public static async Task AddAsync<TEntity>( this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt) { while (true) { try { if (Bc.TryAdd(item, 0, abortCt)) return; else await Task.Delay(100, abortCt); } catch (Exception) { throw; } } } public static async Task<TEntity> TakeAsync<TEntity>( this BlockingCollection<TEntity> Bc, CancellationToken abortCt) { while (true) { try { TEntity item; if (Bc.TryTake(out item, 0, abortCt)) return item; else await Task.Delay(100, abortCt); } catch (Exception) { throw; } } }
quelle