Airflow Tutorial: Simple Workflow Scheduling with Airflow
Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows.
With Airflow, a workflow is authored as a directed acyclic graph (DAG) of tasks. The Airflow scheduler executes the tasks on a pool of workers while respecting the dependencies you specify. Rich command-line utilities make it easy to perform complex operations on DAGs, and the web UI makes it straightforward to see which pipelines are running in production, monitor their progress, and troubleshoot problems when they occur.
I. Writing the DAG Task Script
1. Start the Alibaba Cloud server cluster and bring up the Hadoop cluster.
2. Configure passwordless SSH login between the cluster nodes.
[root@airflow airflow]# vim /etc/hosts
172.26.16.78 airflow airflow
172.26.16.41 hadoop101 hadoop101
172.26.16.39 hadoop102 hadoop102
172.26.16.40 hadoop103 hadoop103
[root@airflow ~]# ssh-keygen -t rsa
[root@airflow ~]# ssh-copy-id hadoop101
[root@airflow ~]# ssh-copy-id hadoop102
[root@airflow ~]# ssh-copy-id hadoop103
3. Create a work-py directory to hold the Python scheduling scripts, then write the .py script.
[root@airflow ~]# mkdir -p /opt/module/work-py
[root@airflow ~]# cd /opt/module/work-py/
[root@airflow work-py]# vim test.py

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['2473196869@qq.com'],
    'start_date': datetime(2020, 12, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwdMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2 = BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.AdsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)
Script walkthrough:
default_args: the default arguments applied to every task in the DAG
depends_on_past: whether a task run depends on the previous scheduled run; when True, a task instance only runs if the same task succeeded in the previous DAG run, and when False the previous result is ignored
schedule_interval: how often the DAG is scheduled
retries: number of retries
start_date: the date the schedule starts from
BashOperator: the operator that actually executes a task
task_id: unique identifier for the task (required)
bash_command: the shell command the task executes
set_upstream: declares a dependency; in the script above, the ads task depends on dws, which in turn depends on dwd (an equivalent spelling with the >> operator is sketched after the import note below)
Note:
The following imports are required:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
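As a side note, the same dependency chain can also be written with Airflow's bit-shift operators. The snippet below is a minimal sketch reusing the t1/t2/t3 tasks defined in test.py above; it is not part of the original script:

# Equivalent ways to declare the dwd -> dws -> ads dependency chain.
t2.set_upstream(t1)     # original style: dws runs after dwd
t3.set_upstream(t2)     # ads runs after dws

t1 >> t2 >> t3          # shorthand: the identical chain using >>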
4. Configure the JDK
Note: on the SSH target machine (hadoop103 here), the Java environment variables must be configured in /etc/bashrc, and the file must be sourced afterwards. The ssh command issued by the BashOperator runs in a non-interactive shell, which does not read the login profile files, so variables placed only there may not be picked up.
(python3)[root@airflow work-py]# vim /etc/bashrc
(python3)[root@airflow work-py]# source /etc/bashrc
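As an optional sanity check (not part of the original steps), you can confirm from the Airflow host that a non-interactive SSH session, which is effectively what the BashOperator opens, can see the remote Java installation. A minimal sketch, assuming Python 3.7+:

# Verify that `ssh hadoop103 java -version` works from the Airflow host,
# i.e. that the remote /etc/bashrc exports JAVA_HOME for non-interactive shells.
import subprocess

result = subprocess.run(['ssh', 'hadoop103', 'java -version'],
                        capture_output=True, text=True)
print(result.returncode)   # 0 means java was found on hadoop103
print(result.stderr)       # java prints its version banner to stderr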
5. Check the Airflow configuration file to find the directory where DAG files are stored (the dags_folder setting).
(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg
6. Create the DAG directory at the path configured in airflow.cfg and copy the .py script into it.
(python3)[root@airflow work-py]# mkdir ~/airflow/dags
(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/
7. Wait a little while, then refresh the DAG list; the test DAG now appears in it.
(python3)[root@airflow work-py]# airflow list_dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial
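If the test DAG does not show up, a quick way to check that the file parses cleanly is to load it through Airflow's DagBag. A minimal sketch, where the folder path is the dags_folder from airflow.cfg above:

# Parse everything under the dags folder and report import errors.
from airflow.models import DagBag

bag = DagBag(dag_folder='/root/airflow/dags')
print(bag.import_errors)      # {} means every file parsed without errors
print('test' in bag.dags)     # True once test.py has been picked up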
8. Refresh the Airflow web UI; the test DAG now appears there as well.

9. Click to run the test DAG.

10. Click a successful task to view its log.




11. View the DAG graph and the Gantt chart.


12. View the script code.

II. DAG Task Operations
1. Delete the DAG.

2. The DAG can be re-added by running the following command.
(python3)[root@airflow work-py]# airflow list_tasks test --tree
The 'list_tasks' command is deprecated and removed in Airflow 2.0, please use 'tasks list' instead
[2020-12-15 11:17:08,981] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-15 11:17:08,982] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags
<Task(BashOperator): dwd>
<Task(BashOperator): dws>
<Task(BashOperator): ads>
3. List all DAGs again; the test DAG has been added back.
(python3)[root@airflow work-py]# airflow list_dags
The 'list_dags' command is deprecated and removed in Airflow 2.0, please use 'dags list', or 'dags report' instead
[2020-12-15 11:33:57,106] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-15 11:33:57,106] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial
4. The re-added DAG.

III. Configuring the Mail Server
1. First, make sure SMTP is enabled for every mailbox involved.

2. Edit the Airflow configuration file as follows (for QQ Mail, smtp_password is the SMTP authorization code, not the account password):
(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg

smtp_host = smtp.qq.com
smtp_starttls = True
smtp_ssl = False
smtp_user = 2473196869@qq.com
#smtp_user =
smtp_password = wjmfbxkfvypdebeg
#smtp_password =
smtp_port = 587
smtp_mail_from = 2473196869@qq.com
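Optionally (not part of the original steps), the credentials can be sanity-checked outside Airflow with Python's standard smtplib. A minimal sketch mirroring the airflow.cfg values above, with the authorization code left as a placeholder:

# Quick SMTP login test against smtp.qq.com using STARTTLS on port 587.
import smtplib

with smtplib.SMTP('smtp.qq.com', 587) as server:
    server.starttls()                                  # matches smtp_starttls = True
    server.login('2473196869@qq.com', '<authorization-code>')
    print('SMTP login OK')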
3. Restart Airflow.
(python3)[root@airflow airflow]# ps -ef | egrep 'scheduler|airflow-webserver' | grep -v grep | awk '{print $2}' | xargs kill -15
(python3)[root@airflow airflow]# ps -ef | grep airflow
root       745     1  0 09:50 ?        00:00:00 /sbin/dhclient -1 -q -lf /var/lib/dhclient/dhclient--eth0.lease -pf /var/run/dhclient-eth0.pid -H airflow eth0
root      7875  1851  0 12:51 pts/1    00:00:00 grep --color=auto airflow
(python3)[root@airflow airflow]# airflow webserver -p 8080 -D
(python3)[root@airflow airflow]# airflow scheduler -D
4. Edit the test.py script again and replace the copy in the dags folder.
[root@airflow ~]# cd /opt/module/work-py/
[root@airflow work-py]# vim test.py

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['2473196869@qq.com'],
    'start_date': datetime(2020, 12, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwdMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2 = BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.AdsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

email = EmailOperator(
    task_id="email",
    to="2473196869@qq.com",
    subject="test-subject",
    html_content="<h1>test-content</h1>",
    cc="chaosong@qq.com",
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)

(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/
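As an optional shortcut (not in the original steps), Airflow's own email helper can be called directly to confirm the smtp_* settings are picked up, without waiting for a full DAG run. A minimal sketch:

# Send a test mail through the SMTP settings configured in airflow.cfg.
from airflow.utils.email import send_email

send_email(to='2473196869@qq.com',
           subject='airflow smtp test',
           html_content='<h1>smtp settings look good</h1>')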
5. Check the web UI to confirm the change has taken effect.


6. Run a test, then check the run status and the email.


