Ich habe einen Bibliothekscode (Socket Networking), der eine Task
API auf der Basis von ausstehenden Antworten auf Anforderungen basierend auf bereitstellt TaskCompletionSource<T>
. Es ist jedoch ärgerlich in der TPL, dass es unmöglich zu sein scheint, synchrone Fortsetzungen zu verhindern. Was würde ich gerne der Lage sein zu tun , entweder:
- Sagen Sie einem,
TaskCompletionSource<T>
dass es Anrufern nicht gestattet sein sollte, sich mitTaskContinuationOptions.ExecuteSynchronously
oder zu verbinden - Stellen Sie das Ergebnis (
SetResult
/TrySetResult
) so ein, dass angegeben wird, dassTaskContinuationOptions.ExecuteSynchronously
es ignoriert werden soll, und verwenden Sie stattdessen den Pool
Insbesondere habe ich das Problem, dass die eingehenden Daten von einem dedizierten Lesegerät verarbeitet werden und wenn ein Anrufer eine Verbindung herstellen kann TaskContinuationOptions.ExecuteSynchronously
, kann er das Lesegerät blockieren (was mehr als nur sie betrifft). Früher habe ich um diese von einigen Hacks gearbeitet, der erkennt , ob irgendwelche Fortsetzungen vorhanden sind, und wenn sie drückt er den Abschluss auf die ThreadPool
jedoch hat dies erhebliche Auswirkungen , wenn der Anrufer seine Arbeit Warteschlange gesättigt ist, da der Abschluss nicht bearbeitet erhalten rechtzeitig. Wenn sie Task.Wait()
(oder ähnliches) verwenden, blockieren sie sich im Wesentlichen selbst. Aus diesem Grund befindet sich der Leser in einem dedizierten Thread, anstatt Worker zu verwenden.
Damit; Bevor ich versuche, das TPL-Team zu nerven: Fehlt mir eine Option?
Wichtige Punkte:
- Ich möchte nicht, dass externe Anrufer meinen Thread entführen können
- Ich kann das nicht
ThreadPool
als Implementierung verwenden, da es funktionieren muss, wenn der Pool gesättigt ist
Das folgende Beispiel erzeugt eine Ausgabe (die Reihenfolge kann je nach Timing variieren):
Continuation on: Main thread
Press [return]
Continuation on: Thread pool
Das Problem ist die Tatsache, dass es einem zufälligen Anrufer gelungen ist, eine Fortsetzung des "Hauptthreads" zu erhalten. Im realen Code würde dies den primären Leser unterbrechen; schlechte Dinge!
Code:
using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
static void Identify()
{
var thread = Thread.CurrentThread;
string name = thread.IsThreadPoolThread
? "Thread pool" : thread.Name;
if (string.IsNullOrEmpty(name))
name = "#" + thread.ManagedThreadId;
Console.WriteLine("Continuation on: " + name);
}
static void Main()
{
Thread.CurrentThread.Name = "Main thread";
var source = new TaskCompletionSource<int>();
var task = source.Task;
task.ContinueWith(delegate {
Identify();
});
task.ContinueWith(delegate {
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine("Press [return]");
Console.ReadLine();
}
}
quelle
TaskCompletionSource
mit meiner eigenen API zu verpacken , um einen direkten Aufruf von zu verhindernContinueWith
, da wederTaskCompletionSource
nochTask
gut für die Vererbung von ihnen geeignet ist.Task
, was ausgesetzt ist, nicht dasTaskCompletionSource
. Das (eine andere APIThreadPool
für diese (die ich bereits erwähnt - es Probleme verursacht), oder Sie haben ein dedizierter Thread „Fortsetzungen pending“, und sie dann (continations mitExecuteSynchronously
angegeben) kann kapern , dass eine stattdessen - was genau das gleiche Problem verursacht, weil es bedeutet, dass Fortsetzungen für andere Nachrichten blockiert werden können, was wiederum mehrere AnruferAntworten:
Neu in .NET 4.6:
.NET 4.6 enthält eine neue
TaskCreationOptions
:RunContinuationsAsynchronously
.Da Sie bereit sind, Reflection zu verwenden, um auf private Felder zuzugreifen ...
Sie können die Aufgabe des TCS mit dem
TASK_STATE_THREAD_WAS_ABORTED
Flag markieren , wodurch alle Fortsetzungen nicht inline gesetzt werden.const int TASK_STATE_THREAD_WAS_ABORTED = 134217728; var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance); stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
Bearbeiten:
Anstatt Reflection emit zu verwenden, empfehle ich Ihnen, Ausdrücke zu verwenden. Dies ist viel besser lesbar und hat den Vorteil, dass es PCL-kompatibel ist:
var taskParameter = Expression.Parameter(typeof (Task)); const string stateFlagsFieldName = "m_stateFlags"; var setter = Expression.Lambda<Action<Task>>( Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();
Ohne Reflection:
Wenn jemand interessiert ist, habe ich einen Weg gefunden, dies ohne Reflexion zu tun, aber es ist auch ein bisschen "schmutzig" und bringt natürlich eine nicht zu vernachlässigende Perf-Strafe mit sich:
try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException) { source.TrySetResult(123); Thread.ResetAbort(); }
quelle
TaskCreationOptions.DoNotInline
- und würde nicht einmal eine Änderung derTaskCompletionSource
ILGenerator
etc: github.com/StackExchange/StackExchange.Redis/blob/master/…Ich glaube nicht, dass TPL irgendetwas enthält, das eine explizite API-Kontrolle über
TaskCompletionSource.SetResult
Fortsetzungen bietet . Ich beschloss, meine erste Antwort für die Steuerung dieses Verhaltens fürasync/await
Szenarien beizubehalten.Hier ist eine andere Lösung, die asynchron auferlegt
ContinueWith
, wenn dietcs.SetResult
ausgelöste Fortsetzung auf demselben Thread stattfindet, auf dem sieSetResult
aufgerufen wurde:public static class TaskExt { static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks = new ConcurrentDictionary<Task, Thread>(); // SetResultAsync static public void SetResultAsync<TResult>( this TaskCompletionSource<TResult> @this, TResult result) { s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread); try { @this.SetResult(result); } finally { Thread thread; s_tcsTasks.TryRemove(@this.Task, out thread); } } // ContinueWithAsync, TODO: more overrides static public Task ContinueWithAsync<TResult>( this Task<TResult> @this, Action<Task<TResult>> action, TaskContinuationOptions continuationOptions = TaskContinuationOptions.None) { return @this.ContinueWith((Func<Task<TResult>, Task>)(t => { Thread thread = null; s_tcsTasks.TryGetValue(t, out thread); if (Thread.CurrentThread == thread) { // same thread which called SetResultAsync, avoid potential deadlocks // using thread pool return Task.Run(() => action(t)); // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread) // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning); } else { // continue on the same thread var task = new Task(() => action(t)); task.RunSynchronously(); return Task.FromResult(task); } }), continuationOptions).Unwrap(); } }
Aktualisiert, um den Kommentar zu adressieren:
Mir war nicht bewusst, dass Sie den Anrufer nicht kontrollieren. Wenn Sie es jedoch nicht steuern, übergeben Sie das
TaskCompletionSource
Objekt wahrscheinlich auch nicht direkt an den Aufrufer. Logischerweise würden Sie den Token- Teil davon übergeben, dhtcs.Task
. In diesem Fall ist die Lösung möglicherweise noch einfacher, indem Sie eine weitere Erweiterungsmethode hinzufügen:// ImposeAsync, TODO: more overrides static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this) { return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent => { Thread thread = null; s_tcsTasks.TryGetValue(antecedent, out thread); if (Thread.CurrentThread == thread) { // continue on a pool thread return antecedent.ContinueWith(t => t, TaskContinuationOptions.None).Unwrap(); } else { return antecedent; } }), TaskContinuationOptions.ExecuteSynchronously).Unwrap(); }
Verwenden:
// library code var source = new TaskCompletionSource<int>(); var task = source.Task.ImposeAsync(); // ... // client code task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); // ... // library code source.SetResultAsync(123);
Dies funktioniert
await
ContinueWith
tatsächlich für beide und ( Geige ) und ist frei von Reflexionshacks.quelle
Was ist mit anstatt zu tun
var task = source.Task;
Sie tun dies stattdessen
var task = source.Task.ContinueWith<Int32>( x => x.Result );
Sie fügen also immer eine Fortsetzung hinzu, die asynchron ausgeführt wird, und dann spielt es keine Rolle, ob die Abonnenten eine Fortsetzung im selben Kontext wünschen. Es ist eine Art Curry für die Aufgabe, nicht wahr?
quelle
ContinueWith
undawait
normalerweise zu vermeiden hart versuchen (von für die bereits vollständig Überprüfung usw.) - und da dies zwingen würde alles auf die Arbeiter, es würde tatsächlich die Situation verschärfen. Es ist eine positive Idee, und ich danke Ihnen dafür: aber in diesem Szenario hilft es nicht.Wenn Sie Reflexion verwenden können und wollen, sollte dies der Fall sein.
public static class MakeItAsync { static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result) { var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var continuations = (List<object>)continuation.GetValue(source.Task); foreach (object c in continuations) { var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var options = (TaskContinuationOptions)option.GetValue(c); options &= ~TaskContinuationOptions.ExecuteSynchronously; option.SetValue(c, options); } source.TrySetResult(result); } }
quelle
Task.Run(() => tcs.SetResult(result))
?Aktualisiert , gab ich eine gesonderte Antwort zu behandeln ,
ContinueWith
im Gegensatz zuawait
(weilContinueWith
nicht über den aktuellen Synchronisationskontext schert).Sie könnten einen stummen Synchronisationskontext verwenden Asynchronität zu verhängen bei Fortsetzung durch den Aufruf ausgelöst
SetResult/SetCancelled/SetException
aufTaskCompletionSource
. Ich glaube, der aktuelle Synchronisationskontext (zum Zeitpunkt vonawait tcs.Task
) ist das Kriterium, anhand dessen TPL entscheidet, ob eine solche Fortsetzung synchron oder asynchron gemacht werden soll.Folgendes funktioniert für mich:
if (notifyAsync) { tcs.SetResultAsync(null); } else { tcs.SetResult(null); }
SetResultAsync
wird folgendermaßen implementiert:public static class TaskExt { static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result) { FakeSynchronizationContext.Execute(() => tcs.SetResult(result)); } // FakeSynchronizationContext class FakeSynchronizationContext : SynchronizationContext { private static readonly ThreadLocal<FakeSynchronizationContext> s_context = new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext()); private FakeSynchronizationContext() { } public static FakeSynchronizationContext Instance { get { return s_context.Value; } } public static void Execute(Action action) { var savedContext = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance); try { action(); } finally { SynchronizationContext.SetSynchronizationContext(savedContext); } } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { throw new NotImplementedException("OperationStarted"); } public override void OperationCompleted() { throw new NotImplementedException("OperationCompleted"); } public override void Post(SendOrPostCallback d, object state) { throw new NotImplementedException("Post"); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } } }
SynchronizationContext.SetSynchronizationContext
ist sehr billig in Bezug auf den Overhead, den es hinzufügt. Ein sehr ähnlicher Ansatz wird bei der Implementierung von WPF verfolgtDispatcher.BeginInvoke
.TPL vergleicht den Zielsynchronisationskontext am Punkt von
await
mit dem des Punkts vontcs.SetResult
. Wenn der Synchronisationskontext derselbe ist (oder an beiden Stellen kein Synchronisationskontext vorhanden ist), wird die Fortsetzung direkt und synchron aufgerufen. Andernfalls wird es unter Verwendung des ZielsynchronisationskontextsSynchronizationContext.Post
, dh des normalenawait
Verhaltens , in die Warteschlange gestellt . Dieser Ansatz erzwingt immer dasSynchronizationContext.Post
Verhalten (oder eine Fortsetzung des Pool-Threads, wenn kein Zielsynchronisationskontext vorhanden ist).Aktualisiert funktioniert dies nicht
task.ContinueWith
, daContinueWith
der aktuelle Synchronisationskontext nicht berücksichtigt wird. Es funktioniert jedoch fürawait task
( Geige ). Es funktioniert auch fürawait task.ConfigureAwait(false)
.OTOH, dieser Ansatz funktioniert für
ContinueWith
.quelle
tcs.SetResult
Aufrufs. Auf diese Weise wird es atomar und threadsicher, da die Fortsetzung selbst entweder in einem anderen Pool-Thread oder in der ursprünglichen Synchronisierung erfolgt. Kontext erfasst beiawait tcs.Task
. UndSynchronizationContext.SetSynchronizationContext
selbst ist sehr billig, viel billiger als ein Thread-Schalter selbst.ThreadPool
. Mit dieser Lösung wird die TPL tatsächlich verwendetThreadPool
, wenn keine Synchronisierung vorhanden war. Kontext (oder es war die grundlegende Standardeinstellung) beiawait tcs.Task
. Dies ist jedoch das Standard-TPL-Verhalten.Der simulierte Abbruchansatz sah wirklich gut aus, führte jedoch in einigen Szenarien zu TPL-Hijacking-Threads .
Ich hatte dann eine Implementierung, die der Überprüfung des Fortsetzungsobjekts ähnelte , aber nur nach einer Fortsetzung suchte, da es tatsächlich zu viele Szenarien gibt, als dass der angegebene Code gut funktionieren könnte, aber das bedeutete, dass selbst solche Dinge
Task.Wait
zu einer Thread-Pool-Suche führten.Letztendlich ist nach der Inspektion vieler, vieler IL das einzig sichere und nützliche Szenario das
SetOnInvokeMres
Szenario (manuelles Zurücksetzen, Ereignis, schlanke Fortsetzung). Es gibt viele andere Szenarien:Am Ende entschied ich mich, nach einem Nicht-Null-Fortsetzungsobjekt zu suchen. wenn es null ist, gut (keine Fortsetzung); Wenn es nicht null ist, prüfen Sie im Sonderfall auf
SetOnInvokeMres
- wenn dies der Fall ist: fine (sicher aufzurufen); Andernfalls lassen Sie den Thread-Pool das ausführenTrySetComplete
, ohne die Aufgabe anzuweisen, etwas Besonderes wie Spoofing-Abbruch auszuführen .Task.Wait
verwendet denSetOnInvokeMres
Ansatz, der das spezifische Szenario ist, das wir wirklich versuchen wollen, nicht zu blockieren.Type taskType = typeof(Task); FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic); Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic); if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null) { var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true); var il = method.GetILGenerator(); var hasContinuation = il.DefineLabel(); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel(); // check if null il.Emit(OpCodes.Brtrue_S, nonNull); il.MarkLabel(goodReturn); il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Ret); // check if is a SetOnInvokeMres - if so, we're OK il.MarkLabel(nonNull); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); il.Emit(OpCodes.Isinst, safeScenario); il.Emit(OpCodes.Brtrue_S, goodReturn); il.Emit(OpCodes.Ldc_I4_0); il.Emit(OpCodes.Ret); IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
quelle