Pertanyaan Cara yang tepat untuk membuat alur kerja dinamis di Aliran Udara


Masalah

Apakah ada cara di Aliran Udara untuk membuat alur kerja sehingga jumlah tugas B. * tidak diketahui hingga penyelesaian Tugas A? Saya telah melihat subdags tetapi tampaknya itu hanya dapat bekerja dengan satu set tugas statis yang harus ditentukan pada penciptaan Dag.

Akankah pemicu dag bekerja? Dan jika demikian bisa Anda berikan contoh.

Saya memiliki masalah di mana tidak mungkin untuk mengetahui jumlah tugas B yang akan diperlukan untuk menghitung Tugas C sampai Tugas A telah selesai. Setiap Tugas B. * akan memakan waktu beberapa jam untuk dihitung dan tidak dapat digabungkan.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Ide # 1

Saya tidak suka solusi ini karena saya harus membuat pemblokiran ExternalTaskSensor dan semua Tugas B. * akan memakan waktu antara 2-24 jam untuk menyelesaikannya. Jadi saya tidak menganggap ini sebagai solusi yang layak. Tentunya ada cara yang lebih mudah? Atau apakah Airflow tidak dirancang untuk ini?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edit 1:

Sampai sekarang pertanyaan ini masih belum memiliki jawaban yang bagus. Saya telah dihubungi oleh beberapa orang yang mencari solusi.


32
2018-01-07 04:32


asal


Jawaban:


Beginilah cara saya melakukannya dengan permintaan yang serupa tanpa subdags apa pun:

Pertama buat metode yang mengembalikan nilai apa pun yang Anda inginkan

def values_function():
     return values

Berikutnya buat metode yang akan menghasilkan pekerjaan secara dinamis:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

Dan kemudian gabungkan mereka:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete

9
2018-01-13 02:37



OA: "Apakah ada cara di Aliran Udara untuk membuat alur kerja sehingga jumlah tugas B. * tidak diketahui sampai selesainya Tugas A?"

Jawaban singkatnya adalah tidak. Aliran udara akan membangun aliran DAG sebelum mulai menjalankannya.

Yang mengatakan kami sampai pada kesimpulan sederhana, bahwa kami tidak memiliki kebutuhan seperti itu. Ketika Anda ingin memparalelkan beberapa pekerjaan, Anda harus mengevaluasi sumber daya yang Anda miliki dan bukan jumlah barang yang akan diproses.

Kami melakukannya seperti ini: kami secara dinamis menghasilkan sejumlah tugas tetap, katakanlah 10, yang akan membagi pekerjaan. Sebagai contoh jika kita perlu memproses 100 file setiap tugas akan memproses 10 dari mereka. Saya akan posting kode hari ini.


1
2018-06-14 13:44