Pertanyaan Gandakan pengecualian dengan BroadcastBlock di TPL Dataflow


Saya mencoba menggunakan TPL Dataflow untuk membuat jalur pipa. Sejauh ini semua berfungsi dengan baik, dengan pipeline saya didefinisikan sebagai berikut (meskipun masalah saya hanya dengan penyiar, penyerahan Berhasil, penyerahan gagal):

// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));

// Link em up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion = true });
postSubmission.LinkTo(broadcaster, new DataflowLinkOptions() { PropagateCompletion = true });
broadcaster.LinkTo(submissionSucceeded, new DataflowLinkOptions() { PropagateCompletion = true }, state => state.PostSucceeded);
broadcaster.LinkTo(submissionFailed, new DataflowLinkOptions() { PropagateCompletion = true }, state => !state.PostSucceeded);

Masalah yang saya miliki adalah dengan penyebaran Pengecualian. Karena BroadcastBlock saya menyebarkan penyelesaiannya (dan karena itu setiap Kesalahan) menjadi dua blok, jika pengecualian terjadi, itu akan disebarkan ke kedua blok. Jadi ketika saya melakukannya

Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);

Saya berakhir dengan pengecualian agregat yang mengandung dua pengecualian. Saat ini yang terbaik yang dapat saya lakukan adalah memfilter ini, yaitu:

try
{
    Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
}
catch (AggregateException ex)
{
    var uniqueExceptions = new AggregateException(ex.Flatten().InnerExceptions.Distinct());
    Console.WriteLine("An exception was thrown.\n{0}", uniqueExceptions.Flatten());
}

tapi saya ingin tahu apakah ada cara yang lebih baik untuk melakukan ini. yaitu jika hanya satu pengecualian yang terjadi, saya hanya ingin satu pengecualian ditingkatkan. Saya baru menggunakan Dataflow, jadi temukan saja semua konvensi.


32
2018-02-03 17:47


asal


Jawaban:


Saya telah menulis contoh TPL DataFlow (https://github.com/squideyes/PodFetch) yang membutuhkan pendekatan yang sedikit berbeda untuk penyelesaian dan penanganan kesalahan. Berikut kode yang relevan dari Baris 171-201 dari Program.cs:

    scraper.LinkTo(fetcher, link => link != null);
    scraper.LinkTo(DataflowBlock.NullTarget<Link>());

    scraper.HandleCompletion(fetcher);

    Status.Info.Log("Fetching APOD's archive list");

    links.ForEach(link => scraper.Post(link));

    scraper.Complete();

    try
    {
        await fetcher.Completion;

        Status.Finished.Log("Fetched: {0:N0}, Skipped: {1:N0}, Errors: {2:N0}, Seconds: {3:N2}",
            fetched, skipped, errored, (DateTime.UtcNow - startedOn).TotalMilliseconds / 1000.0);
    }
    catch (AggregateException errors)
    {
        foreach (var error in errors.InnerExceptions)
            Status.Failure.Log(error.Message);
    }
    catch (TaskCanceledException)
    {
        Status.Cancelled.Log("The process was manually cancelled!");
    }
    catch (Exception error)
    {
        Status.Failure.Log(error.Message);
    }

Seperti yang Anda lihat, saya menghubungkan beberapa blok TPL bersama-sama kemudian mendapatkan prima untuk menangani penyelesaian menggunakan metode ekstensi HandleCompletion:

    public static void HandleCompletion(
        this IDataflowBlock source, params IDataflowBlock[] targets)
    {
        source.Completion.ContinueWith(
            task =>
            {
                foreach (var target in targets)
                {
                    if (task.IsFaulted)
                        target.Fault(task.Exception);
                    else
                        target.Complete();
                }
            });
    }

Sangat penting, saya menyebut scraper.Complete () ketika saya selesai melewati objek ke blok pertama dalam rantai. Dengan itu, metode ekstensi HandleCompletion kemudian berhubungan dengan kelanjutan. Dan, karena saya menunggu untuk mengambil (blok terakhir dalam rantai untuk menyelesaikan), mudah untuk menangkap kesalahan yang dihasilkan dalam try / catch.


1
2017-10-09 19:27