In day-to-day work we often run into chained tasks like run_case_sql1 ➜ run_case_sql2 ➜ ... ➜ run_case_sql9, where every task does essentially the same thing: execute one block of SQL from a file. With only 9 tasks this is tolerable, but with 100 or more, writing each task by hand makes the code redundant and error-prone, and it is not at all Pythonic.
1. The SQL File
We have a case.sql file that holds the SQL we need to run. Its contents look like this (the blocks sql2 through sql8 are omitted):
```sql
--[sql1]
DROP TABLE IF EXISTS test.tb1;
CREATE TABLE IF NOT EXISTS test.tb1
STORED AS PARQUET AS
SELECT * FROM test.tb;
COMPUTE STATS test.tb1;
--[end]

--[sql9]
DROP TABLE IF EXISTS test.tb9;
CREATE TABLE IF NOT EXISTS test.tb9
STORED AS PARQUET AS
SELECT * FROM test.tb8;
COMPUTE STATS test.tb9;
--[end]
```
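The `--[sqlN]` ... `--[end]` markers suggest that the runner locates a named block in the file and executes its statements one by one. `ImpalaHiveSQLRunner` itself isn't shown in this post, so purely as an illustration (`extract_sql_block` is a hypothetical name, not the library's API), such a lookup could be sketched like this:

```python
import re

def extract_sql_block(sql_text, sql_name):
    """Return the statements between --[<sql_name>] and the following --[end]."""
    pattern = re.compile(r"--\[" + re.escape(sql_name) + r"\](.*?)--\[end\]", re.DOTALL)
    match = pattern.search(sql_text)
    if match is None:
        raise KeyError("no SQL block named %r in file" % sql_name)
    # Split the block into individual statements on ';', dropping empty pieces.
    return [stmt.strip() for stmt in match.group(1).split(";") if stmt.strip()]
```

Called as `extract_sql_block(open(sql_file).read(), "sql1")`, this would return the three statements of the first block above.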
2. The Reusable Function
Here are the reusable functions; each one runs a named SQL block from its corresponding file:
```python
import os

# airflow_get_date, today_key, sql_path, databases, and ImpalaHiveSQLRunner
# are project-specific helpers defined elsewhere.

def run_case_sql(i=0):
    """Run the block named sql<i> from case.sql."""
    batch_date = airflow_get_date(today_key, 0)
    context = {"BATCH_DATE": batch_date}
    sql_file = os.path.join(sql_path, "case.sql")
    runner = ImpalaHiveSQLRunner(db_name="impala", config=databases,
                                 filename=sql_file, context=context)
    runner.run_sql_block(sql_name="sql" + str(i))
    runner.close()


def run_phone_sql(i=0):
    """Run the block named sql<i> from phone.sql."""
    batch_date = airflow_get_date(today_key, 0)
    context = {"BATCH_DATE": batch_date}
    sql_file = os.path.join(sql_path, "phone.sql")
    runner = ImpalaHiveSQLRunner(db_name="impala", config=databases,
                                 filename=sql_file, context=context)
    runner.run_sql_block(sql_name="sql" + str(i))
    runner.close()
```
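In the same spirit of removing duplication, the two functions above differ only in the file name, so they could themselves be collapsed into one parameterized function. A minimal sketch, assuming the same project helpers as above (`run_file_sql` is a hypothetical name, not part of the original code):

```python
import functools
import os

def run_file_sql(basename, i=0):
    """Run the block named sql<i> from the given SQL file."""
    batch_date = airflow_get_date(today_key, 0)
    context = {"BATCH_DATE": batch_date}
    sql_file = os.path.join(sql_path, basename)
    runner = ImpalaHiveSQLRunner(db_name="impala", config=databases,
                                 filename=sql_file, context=context)
    runner.run_sql_block(sql_name="sql" + str(i))
    runner.close()

# Bind the file name once; each partial has the same call shape as the originals.
run_case_sql = functools.partial(run_file_sql, "case.sql")
run_phone_sql = functools.partial(run_file_sql, "phone.sql")
```

Because `functools.partial` objects are plain callables, they can be passed to `PythonOperator`'s `python_callable` exactly like the original functions.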
3. The DAG File
Next, we write the DAG file:
```python
from datetime import timedelta

# Airflow 2.x import paths; in 1.10.x, PythonOperator lives in
# airflow.operators.python_operator.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "yumingmin",
    "depends_on_past": False,
    "start_date": days_ago(1),
    "email": ["yu_mingm623@163.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 60,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    project_name,
    default_args=default_args,
    description=project_name,
    schedule_interval="30 8 * * *",
    catchup=False,  # catchup is a DAG-level argument, not an operator default
)

taskflows = []
for j in range(0, 9):
    taskflows.append(PythonOperator(
        task_id="run_case_sql" + str(j + 1),
        python_callable=run_case_sql,
        op_kwargs={"i": j + 1},
        dag=dag,
    ))
    if j != 0:
        taskflows[-2] >> taskflows[-1]

for j in range(0, 2):
    taskflows.append(PythonOperator(
        task_id="run_phone_sql" + str(j + 1),
        python_callable=run_phone_sql,
        op_kwargs={"i": j + 1},
        dag=dag,
    ))
    # No j != 0 guard here: run_phone_sql1 is chained after run_case_sql9.
    taskflows[-2] >> taskflows[-1]
```
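The two loops above repeat the same create-and-wire pattern, so that wiring can be factored out too. A sketch under the same assumptions (`make_chain` is a hypothetical helper name; `chain` is available at `airflow.models.baseoperator.chain` in Airflow 2.x, or `airflow.utils.helpers.chain` in 1.10.x):

```python
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator

def make_chain(prefix, callable_, count, dag):
    """Create <count> PythonOperators named prefix1..prefixN, wired in series."""
    tasks = [
        PythonOperator(
            task_id=prefix + str(i),
            python_callable=callable_,
            op_kwargs={"i": i},
            dag=dag,
        )
        for i in range(1, count + 1)
    ]
    chain(*tasks)  # tasks[0] >> tasks[1] >> ... >> tasks[-1]
    return tasks

case_tasks = make_chain("run_case_sql", run_case_sql, 9, dag)
phone_tasks = make_chain("run_phone_sql", run_phone_sql, 2, dag)
case_tasks[-1] >> phone_tasks[0]  # keep the serial hand-off from the loop version
```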
With this approach we've cut the amount of code dramatically. Perfect 🤟
