AirFlow 日常任务的跑批,常常伴随着大量日志文件的产生,尤其是在任务失败尝试重启的时候,日志文件会变得异常地大。这些文件如果不能及时清理,很可能导致服务器磁盘拉爆,下面我们编写一个每天自动清理日志的任务,只保留近一个月的日志。
同时,MySQL 执行日志也应该定期进行清理,不然 MySQL 中 Airflow 库占用空间会越来越大,放任不管的话,迟早有一天 MySQL 库会崩掉。
1. 日志占用磁盘大小
先来看一下运行日志会占用我们磁盘多大的空间,可以看到轻轻松松就会占用 140G 数据占用了,这还只是一个月的数据,所以每天清理日志数据是非常有必要的。
$ du -sh ~/bigdata/airflow/logs140G logs/
2. 清理本地调度文件
AirFlow 任务运行的日志文件分为两部分,一部分为本地日志文件,它保存在 $AIRFLOW_HOME/logs/scheduler 文件夹下,通常你查看运行日志时的数字,浏览器就会下载一份文件(这份文件便是存储在 logs/scheduler 文件夹下的)。
实现的代码也很简单,就是使用 BashOperator 执行一段删除代码即可,这里设置为只保存近 30 天的日志文件,其余的都删掉。
from airflow.operators.bash_operator import BashOperatorairflow_home = os.environ.get("AIRFLOW_HOME")logging_dir = os.path.join(airflow_home, "logs", "scheduler")clean_templated = """rm -rf %s/{{ macros.ds_add(ds, -30) }}""" % logging_dircleanup_op = BashOperator(task_id="cleanup_logging",bash_command=clean_templated,dag=dag)
3. 清理MySQL库中的执行日志
借助于 MySqlOperator 来清理 MySQL 中 Airflow 执行日志,前提是 Airflow 的数据库使用的是 MySQL。
⭐ 值得注意的是,如果有任务的执行周期是 1 个月或者 2 个月执行一次,需要修改
ds_add(ds, -7)
from airflow.providers.mysql.operators.mysql import MySqlOperatortk = {"xcom":"execution_date","task_instance":"execution_date","task_fail":"execution_date","sla_miss":"execution_date","log":"execution_date","job":"start_date","dag_run":"execution_date"}for tb in tk:del_op = MySqlOperator(task_id=f"del_{tb}",sql="DELETE FROM %s WHERE %s < '{{ ds_add(ds, -7) }}'" % (tb, tk[tb]),dag=dag,)
3. 完整脚本
以下为完整的清理脚本
import osfrom airflow import DAGfrom datetime import timedeltafrom airflow.utils.dates import days_agofrom airflow.operators.bash_operator import BashOperatorfrom airflow.providers.mysql.operators.mysql import MySqlOperatorairflow_home = os.environ.get("AIRFLOW_HOME")logging_dir = os.path.join(airflow_home, "logs", "scheduler")clean_templated = "rm -rf %s/{{ macros.ds_add(ds, -30) }}" % logging_dir__doc__ = "自动清理DAG日志"default_args = {'owner': 'yumingmin','depends_on_past': False,'email': ['yu_mingm623@163.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=30),'catchup': False}dag = DAG('cleanup_logging',default_args=default_args,description='Cleanup logging automatically',schedule_interval=timedelta(days=1),start_date=days_ago(1),tags=['prod'],)cleanup_op = BashOperator(task_id="cleanup_logging",bash_command=clean_templated,dag=dag)tk = {"xcom":"execution_date","task_instance":"execution_date","task_fail":"execution_date","sla_miss":"execution_date","log":"execution_date","job":"start_date","dag_run":"execution_date"}for tb in tk:del_op = MySqlOperator(task_id=f"del_{tb}",sql="DELETE FROM %s WHERE %s < '{{ ds_add(ds, -30) }}'" % (tb, tk[tb]),dag=dag,)cleanup_op >> del_op# Documentation for Dag and Taskdag.doc_md = dedent("""# 自动清理DAG日志- 仅仅保留<font color='red'>30</font>天内的运行日志,其余都会被删除- 脚本使用 Bash 作为媒介来清除 logs 下 schedule 相关日期文件夹""")
,开启该任务就会每天清理前 31 天
