Once our tasks are deployed on Airflow, we often need to backfill data over some historical window (say, the last 6 months). Our old way of doing this was to edit the task-date variable under Admin ➜ Variable and then re-trigger the task by hand. That is tolerable for backfilling 10 days of data, but for half a year it is genuinely exhausting: after every run you have to wait for it to finish and then edit the variable again before the next manual run.
1. Writing a backfill script by hand
As noted above, a deployed DAG cannot simply be replayed over history. One rather clumsy workaround is to pull the functions out of the production code and wrap them in a for loop that replays the data day by day (and since the DAG almost certainly contains multiple dependent tasks, the loop cannot use multiprocessing either). This does get the data backfilled, but having to write a one-off script every time is a hassle.
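A minimal sketch of such a one-off script, assuming hypothetical extract and load functions lifted out of the DAG code (the names and date range are illustrative, not from any real pipeline):

from datetime import date, timedelta

def extract(ds: str) -> list:
    """Hypothetical stand-in for the DAG's extract task."""
    print(f"extracting data for {ds}")
    return []

def load(ds: str, rows: list) -> None:
    """Hypothetical stand-in for the DAG's load task."""
    print(f"loading {len(rows)} rows for {ds}")

# Replay each day in order; the tasks depend on one another,
# so they must run serially rather than via multiprocessing.
day, end = date(2021, 1, 1), date(2021, 6, 30)
while day <= end:
    ds = day.isoformat()
    load(ds, extract(ds))
    day += timedelta(days=1)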
2. Backfilling data with backfill
If you have read the official Airflow documentation, the Quick Start already mentions backfill, with an example that backfills the example_bash_operator DAG for 2015-01-01 through 2015-01-02:
$ airflow dags backfill example_bash_operator -s 2015-01-01 -e 2015-01-02
2.1 Backfilling the tutorial DAG
Next, look at the official tutorial example:
from datetime import timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )
    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    """
    )
    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """This is a documentation placed anywhere"""  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )
    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )
    t1 >> [t2, t3]
Let's use backfill to replay two days of this DAG and see what happens. Note that the command takes the dag_id, not the file name:

$ airflow dags backfill tutorial -s 2021-06-01 -e 2021-06-02

After the run finishes, check the log of the 2021-06-01 run:
# print_date
[2021-08-05 14:01:46,636] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'date']
[2021-08-05 14:01:46,642] {subprocess.py:74} INFO - Output:
[2021-08-05 14:01:46,643] {subprocess.py:78} INFO - Thu Aug 5 14:01:46 CST 2021 ⭐

# templated
[2021-08-05 14:02:09,081] {subprocess.py:63} INFO - Running command: ['bash', '-c', '\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n']
[2021-08-05 14:02:09,087] {subprocess.py:74} INFO - Output:
[2021-08-05 14:02:09,088] {subprocess.py:78} INFO - 2021-06-01 ⭐
[2021-08-05 14:02:09,088] {subprocess.py:78} INFO - 2021-06-08
Look closely at the two ⭐ lines. For the print_date task, the log still prints 2021-08-05, the wall-clock date when the command actually ran; for the templated task, the log prints 2021-06-01, exactly as we hoped. So the key to backfilling history correctly is how the {{ ds }} template variable gets obtained.

2.2 Writing a test script

The vast majority of our tasks use PythonOperator, while the example above is a BashOperator, so next we write a test script to see how to obtain this variable correctly in Python code.
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_date():
    print("today is: {{ ds }}")
    print("today is: {{ macros.ds_add(ds, 7) }}")


default_args = {
    'owner': 'yumingmin',
    'depends_on_past': False,
    'email': ['yu_mingm623@163.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'catchup': False
}

dag = DAG(
    'backfill_dags',
    default_args=default_args,
    description='run backfill dags correctly',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['demo'],
)

print_date_op = PythonOperator(
    task_id="print_date",
    python_callable=print_date,
    dag=dag
)
Backfill this DAG for 2021-06-01 through 2021-06-02 (again passing the dag_id, backfill_dags):

$ airflow dags backfill backfill_dags -s 2021-06-01 -e 2021-06-02

📋 Output log:
[2021-08-05 15:27:53,181] {logging_mixin.py:104} INFO - today is: {{ ds }}
[2021-08-05 15:27:53,181] {logging_mixin.py:104} INFO - today is: {{ macros.ds_add(ds, 7) }}
[2021-08-05 15:27:53,181] {python.py:151} INFO - Done. Returned value was: None
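Why did the braces survive verbatim? Jinja templates are only rendered for attributes that an operator lists in its template_fields; a plain string built inside the python_callable never passes through Jinja. A quick way to check, sketched against the Airflow 2.x operator classes:

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Only the attributes named here are run through Jinja before execution.
print(BashOperator.template_fields)    # includes 'bash_command'
print(PythonOperator.template_fields)  # includes 'op_args' and 'op_kwargs'

The fact that PythonOperator templates op_args and op_kwargs already hints at the second approach we will use below.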
2.3 The example_python_operator example
The bundled examples also include an example_python_operator DAG; let's see what it has to offer.
from pprint import pprint


def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
)
The print_context function receives a ds parameter. It looks like we have found the trick.

2.4 Second version of the test script
def print_date(ds, **kwargs):
    print("today is: ", datetime.now().strftime("%Y-%m-%d"))
    print("today is: ", kwargs["run_date"])
    print("today is: ", ds)
    print("today is: {{ ds }}")
    print("today is: {{ macros.ds_add(ds, 7) }}")


print_date_op = PythonOperator(
    task_id="print_date",
    python_callable=print_date,
    op_kwargs={"run_date": "{{ ds }}"},
    dag=dag
)
Backfill the same range again; --reset-dagruns tells Airflow to clear and rerun the DAG runs that have already completed:

$ airflow dags backfill backfill_dags -s 2021-06-01 -e 2021-06-02 --reset-dagruns

📋 Check the output log again:
[2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: 2021-08-05
[2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: 2021-06-01
[2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: 2021-06-01
[2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: {{ ds }}
[2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: {{ macros.ds_add(ds, 7) }}
Wow, it works: the variable is received correctly. Perfect 🤟.
As the logs show, there are two ways to receive the backfill date: declare ds as a parameter of the callable, or pass it in explicitly with op_kwargs={"run_date": "{{ ds }}"}.
Note that in Airflow 1.x the callable cannot receive the ds argument directly like this; you have to set provide_context=True on the operator.
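For reference, a sketch of the 1.x-style equivalent, assuming the 1.10 PythonOperator API and reusing print_date and dag from the script above:

# Airflow 1.x: context variables such as ds are injected into the callable
# only when provide_context=True is set on the operator.
print_date_op = PythonOperator(
    task_id="print_date",
    python_callable=print_date,
    provide_context=True,  # required in 1.x; no longer needed in 2.x
    op_kwargs={"run_date": "{{ ds }}"},
    dag=dag,
)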
2.5 Backfilling sequentially, in date order
Used as above, backfill triggers too many runs at once and they all execute in parallel. We can set max_active_runs=1 on the DAG so that only one run executes at a time; depends_on_past must also be set to True.
default_args = {
    'owner': 'yumingmin',
    'depends_on_past': True,
    'email': ['yumingmin@airflow.com'],
    'start_date': days_ago(1),
    'retries': 60,
    'retry_delay': timedelta(minutes=5),
    'catchup': False,
}

dag = DAG(
    dag_id=config["project"]["task_name"],
    default_args=default_args,
    description=config["project"]["task_name_zh"],
    schedule_interval='@daily',
    max_active_runs=1,
    tags=['model'],
)
Once this is in place, the given date range is backfilled sequentially, one run after another.
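For example, to replay the first half of the year one day at a time (my_model_dag stands in for the config-driven dag_id above):

$ airflow dags backfill my_model_dag -s 2021-01-01 -e 2021-06-30 --reset-dagruns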
3. Extras
3.1 The backfill command
❯ airflow dags backfill -h
usage: airflow dags backfill [-h] [-c CONF] [--delay-on-limit DELAY_ON_LIMIT]
                             [-x] [-n] [-e END_DATE] [-i] [-I] [-l] [-m]
                             [--pool POOL] [--rerun-failed-tasks]
                             [--reset-dagruns] [-B] [-s START_DATE]
                             [-S SUBDIR] [-t TASK_REGEX] [-v] [-y]
                             dag_id

Run subsections of a DAG for a specified date range. If reset_dag_run option is used, backfill will first prompt users whether airflow should clear all the previous dag_run and task_instances within the backfill date range. If rerun_failed_tasks is used, backfill will auto re-run the previous failed task instances within the backfill date range

positional arguments:
  dag_id                The id of the dag

optional arguments:
  -h, --help            show this help message and exit
  -c CONF, --conf CONF  JSON string that gets pickled into the DagRun's conf attribute
  --delay-on-limit DELAY_ON_LIMIT
                        Amount of time in seconds to wait when the limit on maximum active dag runs (max_active_runs) has been reached before trying to execute a dag run again
  -x, --donot-pickle    Do not attempt to pickle the DAG object to send over to the workers, just tell the workers to run their version of the code
  -n, --dry-run         Perform a dry run for each task. Only renders Template Fields for each task, nothing else
  -e END_DATE, --end-date END_DATE
                        Override end_date YYYY-MM-DD
  -i, --ignore-dependencies
                        Skip upstream tasks, run only the tasks matching the regexp. Only works in conjunction with task_regex
  -I, --ignore-first-depends-on-past
                        Ignores depends_on_past dependencies for the first set of tasks only (subsequent executions in the backfill DO respect depends_on_past)
  -l, --local           Run the task using the LocalExecutor
  -m, --mark-success    Mark jobs as succeeded without running them
  --pool POOL           Resource pool to use
  --rerun-failed-tasks  if set, the backfill will auto-rerun all the failed tasks for the backfill date range instead of throwing exceptions ⭐
  --reset-dagruns       if set, the backfill will delete existing backfill-related DAG runs and start anew with fresh, running DAG runs ⭐
  -B, --run-backwards   if set, the backfill will run tasks from the most recent day first. if there are tasks that depend_on_past this option will throw an exception
  -s START_DATE, --start-date START_DATE
                        Override start_date YYYY-MM-DD
  -S SUBDIR, --subdir SUBDIR
                        File location or directory from which to look for the dag. Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg'
  -t TASK_REGEX, --task-regex TASK_REGEX
                        The regex to filter specific task_ids to backfill (optional)
  -v, --verbose         Make logging output more verbose
  -y, --yes             Do not prompt to confirm reset. Use with care!
The --run-backwards flag executes the date range in reverse, starting from the most recent day; it is useful when a task has special requirements about date order. As the help text notes, it throws an exception if any task uses depends_on_past.
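For example, to backfill June for our test DAG newest-day-first:

$ airflow dags backfill backfill_dags -s 2021-06-01 -e 2021-06-30 --run-backwards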
3.2 Macros
Reference: https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
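To close, a few entries from that reference that come up constantly when backfilling, sketched in a templated BashOperator (the t4 task is illustrative only, reusing the dag object from earlier):

# Commonly used template variables and macros (see the reference above):
#   {{ ds }}                     execution date as YYYY-MM-DD
#   {{ ds_nodash }}              execution date as YYYYMMDD
#   {{ macros.ds_add(ds, -1) }}  execution date minus one day
t4 = BashOperator(
    task_id='macros_demo',
    bash_command='echo "{{ ds_nodash }} / {{ macros.ds_add(ds, -1) }}"',
    dag=dag,
)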
