将 DAG 中的任务分组

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页介绍如何使用以下设计模式对 Airflow 流水线中的任务进行分组:

  • 将 DAG 图中的任务分组。
  • 从父 DAG 触发子 DAG。
  • 使用 TaskGroup operator 将任务分组。

将 DAG 图中的任务分组

要将流水线的特定阶段中的任务分组,您可以使用 DAG 文件中任务之间的关系。

请参考以下示例:

显示分支任务的 Airflow 任务图
图 1. Airflow DAG 中的任务可以分组到一起(点击可放大)

在此工作流中,任务 op-1op-2 在初始任务 start 后一起运行。您可以使用语句 start >> [task_1, task_2] 来将任务分组,以实现此目的。

以下示例提供了此 DAG 的完整实现:

from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator from airflow.utils.dates import days_ago  DAG_NAME = "all_tasks_in_one_dag"  args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}  with DAG(dag_id=DAG_NAME, default_args=args) as dag:     start = DummyOperator(task_id="start")      task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)      task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)      some_other_task = DummyOperator(task_id="some-other-task")      task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)      task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)      end = DummyOperator(task_id="end")      start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

从父 DAG 触发子 DAG

您可以使用 TriggerDagRunOperator 运算符从一个 DAG 触发另一个 DAG。

请参考以下示例:

Airflow 任务图,显示了作为 DAG 图一部分触发的子 DAG
图 2. 可以使用 TriggerDagRunOperator 从 DAG 中触发 DAG(点击可放大)

在此工作流中,dag_1dag_2 块表示一系列任务,这些任务分组到 Cloud Composer 环境中的不同 DAG 中。

此工作流的实现需要两个独立的 DAG 文件。控制 DAG 文件如下所示:

from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.utils.dates import days_ago   with DAG(     dag_id="controller_dag_to_trigger_other_dags",     default_args={"owner": "airflow"},     start_date=days_ago(1),     schedule_interval="@once", ) as dag:     start = DummyOperator(task_id="start")      trigger_1 = TriggerDagRunOperator(         task_id="dag_1",         trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger         conf={"message": "Hello World"},     )     trigger_2 = TriggerDagRunOperator(         task_id="dag_2",         trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger         conf={"message": "Hello World"},     )      some_other_task = DummyOperator(task_id="some-other-task")      end = DummyOperator(task_id="end")      start >> trigger_1 >> some_other_task >> trigger_2 >> end

由控制 DAG 触发的子 DAG 的实现如下所示:

from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.utils.dates import days_ago  DAG_NAME = "dag-to-trigger"  args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}  with DAG(dag_id=DAG_NAME, default_args=args) as dag:     dag_task = DummyOperator(task_id="dag-task")

您必须在 Cloud Composer 环境中上传两个 DAG 文件,此 DAG 才能正常运行。

使用 TaskGroup operator 对任务分组

您可以使用 TaskGroup operator 将 DAG 中的任务分组在一起。在 TaskGroup 块中定义的任务仍然是主 DAG 的一部分。

请参考以下示例:

显示两个任务组的 Airflow 任务图
图 3. 可以使用 TaskGroup operator 在界面中直观地将任务分组到一起(点击可放大)

任务 op-1op-2 分组到一个 ID 为 taskgroup_1 的块中。此工作流的实现类似于以下代码:

from airflow.models.dag import DAG from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator from airflow.utils.dates import days_ago from airflow.utils.task_group import TaskGroup  with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:     start = DummyOperator(task_id="start")      with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:         task_1 = BashOperator(task_id="op-1", bash_command=":")         task_2 = BashOperator(task_id="op-2", bash_command=":")      with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:         task_3 = BashOperator(task_id="op-3", bash_command=":")         task_4 = BashOperator(task_id="op-4", bash_command=":")      some_other_task = DummyOperator(task_id="some-other-task")      end = DummyOperator(task_id="end")      start >> section_1 >> some_other_task >> section_2 >> end

后续步骤