Wie kann die Anzahl der gleichzeitigen asynchronen E / A-Vorgänge begrenzt werden?

114
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

Hier ist das Problem, es startet 1000+ gleichzeitige Webanfragen. Gibt es eine einfache Möglichkeit, die gleichzeitige Anzahl dieser asynchronen http-Anforderungen zu begrenzen? Damit werden nicht mehr als 20 Webseiten gleichzeitig heruntergeladen. Wie geht das am effizientesten?

Trauercodierer
quelle
2
Wie unterscheidet sich das von Ihrer vorherigen Frage ?
Svick
1
stackoverflow.com/questions/9290498/… Mit einem ParallelOptions-Parameter.
Chris Disley
4
@ChrisDisley, dies wird nur den Start der Anforderungen parallelisieren.
Spender
@svick ist richtig, wie ist es anders? Übrigens, ich liebe die Antwort dort stackoverflow.com/a/10802883/66372
eglasius
3
Außerdem HttpClientist IDisposable, und Sie sollten es entsorgen, besonders wenn Sie 1000+ von ihnen verwenden werden. HttpClientkann als Singleton für mehrere Anforderungen verwendet werden.
Shimmy Weitzhandler

Antworten:

160

Sie können dies definitiv in den neuesten Versionen von Async für .NET mit .NET 4.5 Beta tun. Der vorherige Beitrag von 'usr' verweist auf einen guten Artikel von Stephen Toub, aber die weniger angekündigten Neuigkeiten sind, dass das asynchrone Semaphor es tatsächlich in die Beta-Version von .NET 4.5 geschafft hat

Wenn Sie sich unsere geliebte SemaphoreSlimKlasse ansehen (die Sie verwenden sollten, da sie leistungsfähiger als das Original ist Semaphore), bietet sie jetzt eine WaitAsync(...)Reihe von Überladungen mit allen erwarteten Argumenten - Zeitüberschreitungsintervalle, Stornierungs-Token, all Ihre üblichen Planungsfreunde: )

Stephen ist auch eine neuere Blog - Post über die neuen .NET 4.5 Goodies geschrieben , die mit Beta herauskam siehe Was ist neu für Parallelismus in .NET 4.5 Beta .

Zuletzt finden Sie hier einen Beispielcode zur Verwendung von SemaphoreSlim für die Drosselung asynchroner Methoden:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Zuletzt, aber wahrscheinlich eine Erwähnung wert, ist eine Lösung, die TPL-basierte Planung verwendet. Sie können delegatengebundene Aufgaben in der TPL erstellen, die noch nicht gestartet wurden, und einen benutzerdefinierten Aufgabenplaner zulassen, um die Parallelität zu begrenzen. Tatsächlich gibt es hier ein MSDN-Beispiel dafür:

Siehe auch TaskScheduler .

Theo Yaung
quelle
3
Ist eine Parallele mit einem begrenzten Grad an Parallelität kein besserer Ansatz? msdn.microsoft.com/en-us/library/…
GreyCloud
2
Warum entsorgen Sie sich nichtHttpClient
Shimmy Weitzhandler
4
@GreyCloud: Funktioniert Parallel.ForEachmit synchronem Code. Auf diese Weise können Sie asynchronen Code aufrufen.
Josh Noe
2
@ TheMonarch du liegst falsch . Außerdem ist es immer eine gute Angewohnheit, alle IDisposables usingoder try-finallyAussagen einzuwickeln und ihre Entsorgung sicherzustellen.
Shimmy Weitzhandler
29
Angesichts der Beliebtheit dieser Antwort ist darauf hinzuweisen, dass HttpClient eine einzelne gemeinsame Instanz sein kann und sollte und nicht eine Instanz pro Anforderung.
Rupert Rawnsley
15

Wenn Sie über eine IEnumerable (dh URL-Zeichenfolgen) verfügen und mit jeder dieser Operationen gleichzeitig eine E / A-gebundene Operation ausführen möchten (dh eine asynchrone http-Anforderung erstellen) UND optional möchten Sie auch die maximale Anzahl gleichzeitiger Operationen festlegen E / A-Anforderungen in Echtzeit. So können Sie das tun. Auf diese Weise verwenden Sie Thread Thread et al. Nicht. Die Methode verwendet Semaphoreslim, um die maximale Anzahl gleichzeitiger E / A-Anforderungen zu steuern, ähnlich einem Schiebefenstermuster, das eine Anforderung abschließt, das Semaphor verlässt und die nächste eingeht.

Verwendung: Warten auf ForEachAsync (urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }
Dogu Arslan
quelle
Nein, Sie sollten SemaphoreSlim in dieser Implementierung und Verwendung nicht explizit entsorgen müssen, da es intern innerhalb der Methode verwendet wird und die Methode nicht auf ihre AvailableWaitHandle-Eigenschaft zugreift. In diesem Fall hätten wir es entweder entsorgen oder in einen using-Block einschließen müssen.
Dogu Arslan
1
Ich denke nur an die besten Praktiken und Lektionen, die wir anderen Menschen beibringen. A usingwäre schön.
AgentFire
Nun, diesem Beispiel kann ich folgen, aber wenn ich versuche herauszufinden, was der beste Weg ist, dies zu tun, habe ich im Grunde einen Throttler, aber mein Func würde eine Liste zurückgeben, die ich letztendlich in einer endgültigen Liste aller abgeschlossenen, wenn fertig ... möchte benötigen auf Liste gesperrt, haben Sie Vorschläge.
Seabizkit
Sie können die Methode leicht aktualisieren, sodass die Liste der tatsächlichen Aufgaben zurückgegeben wird und Sie auf Task.WhenAll in Ihrem aufrufenden Code warten. Sobald Task.WhenAll abgeschlossen ist, können Sie jede Aufgabe in der Liste auflisten und ihre Liste zur endgültigen Liste hinzufügen. Ändern Sie die Methodensignatur in 'public static IEnumerable <Task <TOut>> ForEachAsync <TIn, TOut> (IEnumerable <TIn> inputEnumerable, Func <TIn, Task <TOut>> asyncProcessor, int? MaxDegreeOfParallelism = null)'
Dogu Arslan
7

Leider fehlen in .NET Framework die wichtigsten Kombinatoren für die Orchestrierung paralleler asynchroner Aufgaben. Es ist so etwas nicht eingebaut.

Schauen Sie sich die AsyncSemaphore- Klasse an, die von Stephen Toub erstellt wurde. Was Sie wollen, heißt Semaphor, und Sie benötigen eine asynchrone Version davon.

usr
quelle
12
Beachten Sie: "Leider fehlen in .NET Framework die wichtigsten Kombinatoren für die Orchestrierung paralleler asynchroner Aufgaben. Es ist keine solche Funktion integriert." ist ab .NET 4.5 Beta nicht mehr korrekt. SemaphoreSlim bietet jetzt WaitAsync (...) Funktionalität :)
Theo Yaung
Sollte SemaphoreSlim (mit seinen neuen asynchronen Methoden) AsyncSemphore vorgezogen werden, oder hat die Implementierung von Toub noch einen Vorteil?
Todd Menier
Meiner Meinung nach sollte der eingebaute Typ bevorzugt werden, da er wahrscheinlich gut getestet und gut gestaltet ist.
usr
4
Stephen fügte einen Kommentar als Antwort auf eine Frage in seinem Blog-Beitrag hinzu, in der bestätigt wurde, dass die Verwendung von SemaphoreSlim für .NET 4.5 im Allgemeinen der richtige Weg ist.
jdasilva
7

Es gibt viele Fallstricke und die direkte Verwendung eines Semaphors kann in Fehlerfällen schwierig sein. Daher würde ich empfehlen, das AsyncEnumerator NuGet-Paket zu verwenden, anstatt das Rad neu zu erfinden:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);
Serge Semenov
quelle
4

Das Beispiel von Theo Yaung ist nett, aber es gibt eine Variante ohne Liste wartender Aufgaben.

 class SomeChecker
 {
    private const int ThreadCount=20;
    private CountdownEvent _countdownEvent;
    private SemaphoreSlim _throttler;

    public Task Check(IList<string> urls)
    {
        _countdownEvent = new CountdownEvent(urls.Count);
        _throttler = new SemaphoreSlim(ThreadCount); 

        return Task.Run( // prevent UI thread lock
            async  () =>{
                foreach (var url in urls)
                {
                    // do an async wait until we can schedule again
                    await _throttler.WaitAsync();
                    ProccessUrl(url); // NOT await
                }
                //instead of await Task.WhenAll(allTasks);
                _countdownEvent.Wait();
            });
    }

    private async Task ProccessUrl(string url)
    {
        try
        {
            var page = await new WebClient()
                       .DownloadStringTaskAsync(new Uri(url)); 
            ProccessResult(page);
        }
        finally
        {
            _throttler.Release();
            _countdownEvent.Signal();
        }
    }

    private void ProccessResult(string page){/*....*/}
}
vitidev
quelle
4
Beachten Sie, dass bei der Verwendung dieses Ansatzes eine Gefahr besteht: Ausnahmen, die in ProccessUrloder deren Unterfunktionen auftreten, werden tatsächlich ignoriert. Sie werden in Aufgaben erfasst, aber nicht an den ursprünglichen Anrufer von weitergeleitet Check(...). Persönlich verwende ich deshalb immer noch Aufgaben und deren Kombinatorfunktionen wie WhenAllund WhenAny-, um eine bessere Fehlerausbreitung zu erzielen. :)
Theo Yaung
3

SemaphoreSlim kann hier sehr hilfreich sein. Hier ist die Erweiterungsmethode, die ich erstellt habe.

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxActionsToRunInParallel">Optional, max numbers of the actions to run in parallel,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all of the provided tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Beispielnutzung:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Jay Shah
quelle
0

Alte Frage, neue Antwort. @vitidev hatte einen Codeblock, der in einem von mir überprüften Projekt fast intakt wiederverwendet wurde. Nach einer Diskussion mit einigen Kollegen fragte man: "Warum verwenden Sie nicht einfach die integrierten TPL-Methoden?" ActionBlock sieht dort wie der Gewinner aus. https://msdn.microsoft.com/en-us/library/hh194773(v=vs.110).aspx . Wahrscheinlich wird sich kein vorhandener Code ändern, aber auf jeden Fall wird versucht, dieses Nuget zu übernehmen und Mr. Softys Best Practice für gedrosselte Parallelität wiederzuverwenden.

Keine Rückerstattung Keine Rückgabe
quelle
0

Hier ist eine Lösung, die die Faulheit von LINQ ausnutzt. Es entspricht funktional der akzeptierten Antwort , verwendet jedoch Worker-Tasks anstelle von a SemaphoreSlim, wodurch der Speicherbedarf des gesamten Vorgangs verringert wird. Lassen Sie es zunächst ohne Drosselung funktionieren. Der erste Schritt besteht darin, unsere URLs in eine Vielzahl von Aufgaben umzuwandeln.

string[] urls =
{
    "https://stackoverflow.com",
    "https://superuser.com",
    "https://serverfault.com",
    "https://meta.stackexchange.com",
    // ...
};
var httpClient = new HttpClient();
var tasks = urls.Select(async (url) =>
{
    return (Url: url, Html: await httpClient.GetStringAsync(url));
});

Der zweite Schritt besteht darin, awaitalle Aufgaben gleichzeitig mit der folgenden Task.WhenAllMethode auszuführen :

var results = await Task.WhenAll(tasks);
foreach (var result in results)
{
    Console.WriteLine($"Url: {result.Url}, {result.Html.Length:#,0} chars");
}

Ausgabe:

URL: https://stackoverflow.com , 105,574 Zeichen
Url: https://superuser.com , 126,953 Zeichen
Url: https://serverfault.com , 125,963 Zeichen
Url: https://meta.stackexchange.com , 185,276 Zeichen
...

Durch die Implementierung von Microsoft wirdTask.WhenAll die bereitgestellte Aufzählung für ein Array sofort materialisiert, sodass alle Aufgaben gleichzeitig gestartet werden. Das wollen wir nicht, weil wir die Anzahl der gleichzeitigen asynchronen Operationen begrenzen wollen. Wir müssen also eine Alternative implementieren WhenAll, die unsere Aufzählung sanft und langsam auflistet. Wir werden dies tun, indem wir eine Anzahl von Worker-Tasks erstellen (die dem gewünschten Grad an Parallelität entsprechen), und jede Worker-Task zählt unsere aufzählbaren Aufgaben einzeln auf, wobei eine Sperre verwendet wird, um sicherzustellen, dass jede URL-Task verarbeitet wird von nur einer Arbeiteraufgabe. Dann müssen wir awaitalle Arbeiteraufgaben erledigen und schließlich die Ergebnisse zurückgeben. Hier ist die Implementierung:

public static async Task<T[]> WhenAll<T>(IEnumerable<Task<T>> tasks,
    int concurrencyLevel)
{
    if (tasks is ICollection<Task<T>>) throw new ArgumentException(
        "The enumerable should not be materialized.", nameof(tasks));
    var locker = new object();
    var results = new List<T>();
    var failed = false;
    using (var enumerator = tasks.GetEnumerator())
    {
        var workerTasks = Enumerable.Range(0, concurrencyLevel)
        .Select(async _ =>
        {
            try
            {
                while (true)
                {
                    Task<T> task;
                    int index;
                    lock (locker)
                    {
                        if (failed) break;
                        if (!enumerator.MoveNext()) break;
                        task = enumerator.Current;
                        index = results.Count;
                        results.Add(default); // Reserve space in the list
                    }
                    var result = await task.ConfigureAwait(false);
                    lock (locker) results[index] = result;
                }
            }
            catch (Exception)
            {
                lock (locker) failed = true;
                throw;
            }
        }).ToArray();
        await Task.WhenAll(workerTasks).ConfigureAwait(false);
    }
    lock (locker) return results.ToArray();
}

... und hier ist, was wir in unserem ursprünglichen Code ändern müssen, um die gewünschte Drosselung zu erreichen:

var results = await WhenAll(tasks, concurrencyLevel: 2);

Es gibt einen Unterschied in der Behandlung der Ausnahmen. Der native Task.WhenAllwartet darauf, dass alle Aufgaben abgeschlossen sind, und aggregiert alle Ausnahmen. Die obige Implementierung wird sofort nach Abschluss der ersten fehlerhaften Aufgabe beendet.

Theodor Zoulias
quelle
Eine AC # 8-Implementierung, die eine zurückgibt IAsyncEnumerable<T>, finden Sie hier .
Theodor Zoulias
-1

Obwohl 1000 Aufgaben möglicherweise sehr schnell in die Warteschlange gestellt werden, kann die Bibliothek für parallele Aufgaben nur gleichzeitige Aufgaben verarbeiten, die der Anzahl der CPU-Kerne auf dem Computer entsprechen. Das bedeutet, dass bei einem Vier-Kern-Computer zu einem bestimmten Zeitpunkt nur vier Aufgaben ausgeführt werden (es sei denn, Sie verringern den MaxDegreeOfParallelism).

Scottm
quelle
8
Ja, aber das bezieht sich nicht auf asynchrone E / A-Operationen. Der obige Code startet mehr als 1000 gleichzeitige Downloads, selbst wenn er auf einem einzelnen Thread ausgeführt wird.
Trauer-Codierer
Ich habe das awaitSchlüsselwort dort nicht gesehen. Das Entfernen sollte das Problem lösen, richtig?
Scottm
2
Die Bibliothek kann sicherlich mehr Aufgaben (mit dem RunningStatus) gleichzeitig ausführen als die Anzahl der Kerne. Dies ist insbesondere bei E / A-gebundenen Aufgaben der Fall.
Svick
@svick: yep. Wissen Sie, wie Sie die maximal gleichzeitigen TPL-Aufgaben (keine Threads) effizient steuern können?
Trauer-Codierer
-1

Parallele Berechnungen sollten verwendet werden, um CPU-gebundene Operationen zu beschleunigen. Hier geht es um E / A-gebundene Operationen. Ihre Implementierung sollte rein asynchron sein , es sei denn, Sie überwältigen den ausgelasteten Single Core auf Ihrer Multi-Core-CPU.

BEARBEITEN Ich mag den Vorschlag von usr, hier ein "asynchrones Semaphor" zu verwenden.

GregC
quelle
Guter Punkt! Obwohl jede Aufgabe hier Async- und Sync-Code enthält (Seite asynchron heruntergeladen und dann synchron verarbeitet). Ich versuche, den Synchronisierungsteil des Codes auf CPUs zu verteilen und gleichzeitig die Anzahl der gleichzeitigen asynchronen E / A-Operationen zu begrenzen.
Trauer-Codierer
Warum? Da das gleichzeitige Starten von mehr als 1000 http-Anforderungen möglicherweise nicht für die Netzwerkkapazität des Benutzers geeignet ist.
Spender
Parallele Erweiterungen können auch zum Multiplexen von E / A-Vorgängen verwendet werden, ohne dass eine reine asynchrone Lösung manuell implementiert werden muss. Ich bin damit einverstanden, dass dies als schlampig angesehen werden kann, aber solange Sie die Anzahl der gleichzeitigen Vorgänge eng begrenzen, wird der Threadpool wahrscheinlich nicht zu stark belastet.
Sean U
3
Ich glaube nicht, dass diese Antwort eine Antwort liefert. Rein asynchron zu sein ist hier nicht genug: Wir wollen die physischen E / A wirklich auf nicht blockierende Weise drosseln.
usr
1
Hmm ... ich bin mir nicht sicher, ob ich damit einverstanden bin ... Wenn bei der Arbeit an einem großen Projekt zu viele Entwickler diese Ansicht vertreten, werden Sie verhungern, obwohl der Beitrag jedes Entwicklers für sich allein nicht ausreicht, um die Dinge über den Rand zu bringen. Angesichts der Tatsache, dass es nur einen ThreadPool gibt, auch wenn Sie ihn halb respektvoll behandeln ... Wenn alle anderen das Gleiche tun, können Probleme auftreten. Als solches rate ich immer davon ab , lange Sachen im ThreadPool laufen zu lassen.
Spender
-1

Verwenden MaxDegreeOfParallelismSie diese Option, die Sie angeben können in Parallel.ForEach():

var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };

Parallel.ForEach(urls, options,
    url =>
        {
            var client = new HttpClient();
            var html = client.GetStringAsync(url);
            // do stuff with html
        });
Sean U.
quelle
4
Ich denke nicht, dass das funktioniert. GetStringAsync(url)soll mit gerufen werden await. Wenn Sie den Typ von überprüfen var html, ist dies ein Task<string>, nicht das Ergebnis string.
Neal Ehardt
2
@NealEhardt ist richtig. Parallel.ForEach(...)ist zum parallelen Ausführen von synchronen Codeblöcken vorgesehen (z. B. auf verschiedenen Threads).
Theo Yaung
-1

Im Wesentlichen möchten Sie für jede URL, die Sie treffen möchten, eine Aktion oder Aufgabe erstellen, diese in eine Liste einfügen und diese Liste dann verarbeiten, um die Anzahl zu begrenzen, die parallel verarbeitet werden kann.

Mein Blog-Beitrag zeigt, wie dies sowohl mit Aufgaben als auch mit Aktionen gemacht wird, und enthält ein Beispielprojekt, das Sie herunterladen und ausführen können, um beide in Aktion zu sehen.

Mit Aktionen

Wenn Sie Aktionen verwenden, können Sie die integrierte .Net Parallel.Invoke-Funktion verwenden. Hier beschränken wir uns darauf, höchstens 20 Threads parallel auszuführen.

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => CallUrl(localUrl)));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

Mit Aufgaben

Bei Aufgaben ist keine Funktion integriert. Sie können jedoch die verwenden, die ich in meinem Blog zur Verfügung stelle.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

Wenn Sie dann Ihre Aufgabenliste erstellen und die Funktion aufrufen, um sie ausführen zu lassen, beispielsweise maximal 20 gleichzeitig, können Sie Folgendes tun:

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);
tödlicher Hund
quelle
Ich denke, Sie geben nur initialCount für SemaphoreSlim an und müssen den 2. Parameter, dh maxCount, im Konstruktor von SemaphoreSlim angeben.
Jay Shah
Ich möchte, dass jede Antwort von jeder Aufgabe in einer Liste verarbeitet wird. Wie kann ich eine Rückgabe erhalten
?
-1

Dies ist keine gute Vorgehensweise, da eine globale Variable geändert wird. Es ist auch keine allgemeine Lösung für Async. Aber es ist einfach für alle Instanzen von HttpClient, wenn das alles ist, wonach Sie suchen. Sie können einfach versuchen:

System.Net.ServicePointManager.DefaultConnectionLimit = 20;
Symbiont
quelle