Pertanyaan Bagaimana saya bisa mencegah kelanjutan sinkron pada Tugas?


Saya memiliki beberapa perpustakaan (socket networking) kode yang menyediakan Taskberbasis API untuk menunggu tanggapan atas permintaan, berdasarkan TaskCompletionSource<T>. Namun, ada gangguan dalam TPL karena tampaknya tidak mungkin untuk mencegah kelanjutan sinkron. Apa yang akan saya lakukan seperti yang bisa dilakukan adalah:

  • katakan a TaskCompletionSource<T> yang seharusnya tidak mengizinkan penelepon untuk melampirkan TaskContinuationOptions.ExecuteSynchronously, atau
  • tetapkan hasilnya (SetResult / TrySetResult) dengan cara yang menentukan itu TaskContinuationOptions.ExecuteSynchronously harus diabaikan, menggunakan kolam sebagai gantinya

Secara khusus, masalah yang saya miliki adalah bahwa data yang masuk sedang diproses oleh pembaca yang berdedikasi, dan jika pemanggil dapat dilampirkan TaskContinuationOptions.ExecuteSynchronously mereka dapat menghambat pembaca (yang mempengaruhi lebih dari sekedar mereka). Sebelumnya, saya telah mengatasi ini dengan beberapa peretasan yang mendeteksi apakah itu apa saja kelanjutan hadir, dan jika mereka mendorong penyelesaian ke ThreadPoolNamun, ini memiliki dampak yang signifikan jika penelepon telah jenuh antrian pekerjaan mereka, karena penyelesaian tidak akan diproses secara tepat waktu. Jika mereka menggunakannya Task.Wait() (atau serupa), pada dasarnya mereka akan memecah sendiri. Demikian juga, inilah mengapa pembaca menggunakan benang khusus daripada menggunakan pekerja.

Begitu; sebelum saya mencoba dan merengek tim TPL: apakah saya kehilangan pilihan?

Poin-poin penting:

  • Saya tidak ingin penelepon eksternal dapat membajak utas saya
  • Saya tidak bisa menggunakan ThreadPool sebagai implementasi, karena perlu bekerja ketika kolam jenuh

Contoh di bawah ini menghasilkan output (pemesanan dapat bervariasi berdasarkan waktu):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

Masalahnya adalah kenyataan bahwa seorang penelepon acak berhasil mendapatkan kelanjutan pada "Main thread". Dalam kode asli, ini akan mengganggu pembaca utama; hal buruk!

Kode:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}

76
2018-03-22 14:56


asal


Jawaban:


Baru di .NET 4.6:

.NET 4.6 berisi yang baru TaskCreationOptions: RunContinuationsAsynchronously.


Karena Anda bersedia menggunakan Refleksi untuk mengakses bidang pribadi ...

Anda dapat menandai Tugas TCS dengan TASK_STATE_THREAD_WAS_ABORTED bendera, yang akan menyebabkan semua kelanjutan tidak harus digarisbawahi.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Edit:

Alih-alih menggunakan pantulan Refleksi, saya sarankan Anda menggunakan ekspresi. Ini jauh lebih mudah dibaca dan memiliki keunggulan sebagai PCL-kompatibel:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Tanpa menggunakan Refleksi:

Jika ada yang tertarik, saya telah menemukan cara untuk melakukan ini tanpa Refleksi, tetapi itu sedikit "kotor" juga, dan tentu saja membawa penalti yang tidak dapat diabaikan:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}

46
2018-03-23 08:06



Saya tidak berpikir ada sesuatu di TPL yang akan menyediakan eksplisit Kontrol API selesai TaskCompletionSource.SetResult lanjutan. Saya memutuskan untuk mempertahankan jawaban awal untuk mengendalikan perilaku ini untuk async/await skenario.

Berikut ini adalah solusi lain yang memaksakan asynchronous atas ContinueWith, jika tcs.SetResult-kelanjutan yang dipicu terjadi pada utas yang sama SetResult dipanggil:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Diperbarui untuk menjawab komentar:

Saya tidak mengontrol pemanggil - saya tidak bisa menggunakannya secara spesifik   lanjutkan-dengan varian: jika saya bisa, masalahnya tidak akan ada di   tempat pertama

Saya tidak sadar Anda tidak mengontrol pemanggil. Namun demikian, jika Anda tidak mengendalikannya, Anda mungkin tidak lulus TaskCompletionSource obyek langsung kepada si penelepon, juga. Secara logis, Anda akan melewati token bagian dari itu, yaitu tcs.Task. Dalam hal ini, solusinya mungkin lebih mudah, dengan menambahkan metode ekstensi lain ke atas:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Menggunakan:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

Ini sebenarnya bekerja untuk keduanya await dan ContinueWith (biola) dan bebas dari pantangan refleksi.


10
2018-03-23 04:16



Bagaimana bukannya melakukan

var task = source.Task;

Anda melakukan ini sebagai gantinya

var task = source.Task.ContinueWith<Int32>( x => x.Result );

Jadi Anda selalu menambahkan satu kelanjutan yang akan dijalankan secara asynchronous dan kemudian tidak masalah jika pelanggan menginginkan kelanjutan dalam konteks yang sama. Ini semacam menjilat tugas, bukan?


3
2018-03-22 22:25



jika Anda bisa dan siap untuk menggunakan refleksi, ini harus dilakukan;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}

3
2018-03-23 02:22



Diperbarui, Saya memposting a jawaban terpisah untuk menangani ContinueWith sebagai lawan await (karena ContinueWith tidak peduli dengan konteks sinkronisasi saat ini).

Anda bisa menggunakan konteks sinkronisasi bodoh untuk memaksakan asynchrony pada kelanjutan yang dipicu oleh panggilan SetResult/SetCancelled/SetException di TaskCompletionSource. Saya percaya konteks sinkronisasi saat ini (pada titik await tcs.Task) adalah kriteria yang digunakan TPL untuk memutuskan apakah akan membuat kelanjutan seperti sinkron atau asinkron.

Berikut ini berfungsi untuk saya:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync diimplementasikan seperti ini:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext  sangat murah dalam hal overhead yang ditambahkannya. Bahkan, pendekatan yang sangat mirip diambil oleh implementasi WPF Dispatcher.BeginInvoke.

TPL membandingkan konteks sinkronisasi target pada titik await untuk itu dari titik tcs.SetResult. Jika konteks sinkronisasi sama (atau tidak ada konteks sinkronisasi di kedua tempat), kelanjutannya disebut secara langsung, serentak. Kalau tidak, itu antri menggunakan SynchronizationContext.Post pada konteks sinkronisasi target, yaitu, normal await tingkah laku. Apa pendekatan ini selalu memaksakan SynchronizationContext.Post perilaku (atau kelanjutan thread pool jika tidak ada konteks sinkronisasi target).

Diperbarui, ini tidak akan berhasil task.ContinueWith, karena ContinueWith tidak peduli dengan konteks sinkronisasi saat ini. Namun itu berhasil await task (biola). Ini juga bekerja untuk await task.ConfigureAwait(false).

OTOH, pendekatan ini bekerja untuk ContinueWith.


3
2018-03-23 01:42



Itu simulasikan batalkan pendekatan tampak sangat bagus, tetapi mengarah ke utas pembajakan TPL dalam beberapa skenario.

Saya kemudian memiliki implementasi yang mirip dengan memeriksa objek lanjutan, tapi hanya memeriksa apa saja kelanjutan karena sebenarnya ada terlalu banyak skenario untuk kode yang diberikan untuk bekerja dengan baik, tetapi itu berarti bahwa bahkan hal-hal seperti Task.Wait menghasilkan pencarian thread-pool.

Akhirnya, setelah memeriksa banyak sekali IL, satu-satunya aman dan berguna skenario adalah SetOnInvokeMres skenario (kelanjutan manual-reset-event-slim). Ada banyak skenario lain:

  • beberapa tidak aman, dan menyebabkan pembajakan benang
  • sisanya tidak berguna, karena pada akhirnya mengarah ke thread-pool

Jadi pada akhirnya, saya memilih untuk memeriksa objek kelanjutan non-null; jika tidak, baik (tidak ada kelanjutan); jika itu tidak null, pemeriksaan khusus untuk SetOnInvokeMres - jika itu: baik (aman untuk memohon); jika tidak, biarkan ulir-ulir melakukan TrySetComplete, tanpa memberitahu tugas untuk melakukan sesuatu yang istimewa seperti spoofing abort. Task.Wait menggunakan SetOnInvokeMres pendekatan, yang merupakan skenario spesifik yang ingin kami coba sangat sulit untuk tidak deadlock.

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));

3
2017-09-01 12:30