Airflow教程——使用Airflow实现简单的工作流调度

发布时间:2021年09月27日作者:atguigu浏览次数:2,961

Airflow是一个以编程方式编写,安排和监视工作流的平台。

使用Airflow将工作流编写任务的有向无环图(DAG)。Aiflow计划程序在遵循指定的依赖项同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度变得轻而易举。丰富的用户界面使查看生产中正在运行的管道,监视进度以及需要时对问题进行故障排除变得容易。

一、编写Dag任务脚本

1. 启动阿里云服务器集群,并启动hadoop集群。

2. 配置集群节点间ssh免密登录。

[root@airflowairflow]# vim /etc/hosts
172.26.16.78airflow  airflow
172.26.16.41hadoop101 hadoop101
172.26.16.39hadoop102 hadoop102
172.26.16.40hadoop103 hadoop103
 
[root@airflow~]# ssh-keygen -t rsa
[root@airflow~]# ssh-copy-id hadoop101
[root@airflow~]# ssh-copy-id hadoop102
[root@airflow~]# ssh-copy-id hadoop103

3. 创建work-py目录用于存放python调度脚本,编写.py脚本

[root@airflow~]# mkdir -p /opt/module/work-py
[root@airflow~]# cd /opt/module/work-py/
[root@airflowwork-py]# vim test.py
 
#!/usr/bin/python
fromairflow import DAG
fromairflow.operators.bash_operator import BashOperator
fromdatetime import datetime, timedelta
 
default_args= {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['2473196869@qq.com'],
    'start_date':datetime(2020,12,15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag =DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))
 
t1 =BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwdMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)
 
t2 =BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwsMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)
 
t3 =BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.AdsMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)
 
t2.set_upstream(t1)
t3.set_upstream(t2)

脚本解读:

default_args 设置默认参数

depends_on_past 是否开启任务依赖

schedule_interval 调度频率

retries 重试次数

start_date 开始时间

BashOperator 具体执行任务,如果为true前置任务必须成功完成才会走下一个依赖任务,如果为false则忽略是否成功完成。

task_id 任务唯一标识(必填)

bash_command 具体任务执行命令

set_upstream 设置依赖 如上图所示ads任务依赖dws任务依赖dwd任务

注意:

必须导包

from airflow import DAG

from airflow.operators.bash_operator importBashOperator

4. 配置JDK

注意:ssh的目标机(hadoop002) /etc/bashrc里必须配置java环境变量,配置完毕后source。

(python3)[root@airflow work-py]# vim /etc/bashrc
(python3)[root@airflow work-py]# source /etc/bashrc
Airflow教程——使用Airflow实现简单的工作流调度

 

5. 查看Airflow配置文件,获取dag文件存放目录

(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg
Airflow教程——使用Airflow实现简单的工作流调度

 

6. 按照配置文件中配置的文件路径,创建dag文件存放目录,将.py脚本放入此目录。

(python3)[root@airflow work-py]# mkdir ~/airflow/dags
(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/

7. 等待一段时间,刷新任务列表,可以看到列表中,已经出现test任务。

(python3)[root@airflow work-py]# airflow list_dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial

8. 刷新Airflow的web页面,已经出现test任务。

Airflow教程——使用Airflow实现简单的工作流调度

 

9. 点击运行test任务。

Airflow教程——使用Airflow实现简单的工作流调度

 

10. 点击成功任务,查看日志。

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

 

11. 查看dag图,甘特图。

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

12. 查看脚本代码。

Airflow教程——使用Airflow实现简单的工作流调度

 

Dag任务操作

1. 删除dag任务。

Airflow教程——使用Airflow实现简单的工作流调度

2. 通过执行以下命令,可以重新添加dag任务。

(python3)[root@airflow work-py]# airflow list_tasks  test --tree
The'list_tasks' command is deprecated and removed in Airflow 2.0, please use'tasks list' instead
[2020-12-1511:17:08,981] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-1511:17:08,982] {dagbag.py:417} INFO - Filling up the DagBag from/root/airflow/dags
<Task(BashOperator):dwd>
    <Task(BashOperator): dws>
        <Task(BashOperator): ads>

3. 查看当前所有dag任务,可以看到test任务被重新添加了回来。

(python3)[root@airflow work-py]#
(python3)[root@airflow work-py]# airflow list_dags
The 'list_dags'command is deprecated and removed in Airflow 2.0, please use 'dags list', or'dags report' instead
[2020-12-1511:33:57,106] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-1511:33:57,106] {dagbag.py:417} INFO - Filling up the DagBag from/root/airflow/dags
 
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial

4. 重新添加的dag任务。

Airflow教程——使用Airflow实现简单的工作流调度

3配置邮件服务器

1. 首先确保所有邮箱已经开启SMTP服务。

Airflow教程——使用Airflow实现简单的工作流调度

 

2. 修改airflow配置文件,如下:

(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg
smtp_host= smtp.qq.com
smtp_starttls= True
smtp_ssl= False
smtp_user= 2473196869@qq.com
#smtp_user =
smtp_password= wjmfbxkfvypdebeg
#smtp_password =
smtp_port= 587
smtp_mail_from= 2473196869@qq.com

3. 重启Airflow。

(python3)[root@airflow airflow]# ps -ef|egrep 'scheduler|airflow-webserver'|grep -vgrep|awk '{print $2}'|xargs kill -15
(python3)[root@airflow airflow]# ps -ef |grep airflow
root       745    1  0 09:50 ?        00:00:00 /sbin/dhclient -1 -q -lf/var/lib/dhclient/dhclient--eth0.lease -pf /var/run/dhclient-eth0.pid -Hairflow eth0
root      7875 1851  0 12:51 pts/1    00:00:00 grep --color=auto airflow
(python3)[root@airflow airflow]# kill -15 745
 
 
(python3)[root@airflow airflow]# airflow webserver -p 8080 -D
(python3)[root@airflow airflow]# airflow scheduler -D

4. 重新编辑test.py脚本文件,并且替换。

[root@airflow~]# cd /opt/module/work-py/
[root@airflowwork-py]# vim test.py
 
#!/usr/bin/python
fromairflow import DAG
fromairflow.operators.bash_operator import BashOperator
fromairflow.operators.email_operator import EmailOperator
fromdatetime import datetime, timedelta
 
default_args= {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['2473196869@qq.com'],
    'start_date':datetime(2020,12,15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag =DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))
 
t1 =BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwdMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)
 
t2 =BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwsMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)
 
t3 =BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.AdsMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
dag=dag)
 
email=EmailOperator(
   task_id="email",
   to="2473196869@qq.com",
    subject="test-subject",
    html_content="<h1>test-content</h1>",
    cc="chaosong@qq.com",
   dag=dag)
 
t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)
 
(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/

5. 查看页面是否生效。

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

 

6. 运行测试,查看运行情况和邮件。

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度
想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。

上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训 UI/UE设计培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)