Airflow教程——使用Airflow实现简单的工作流调度

Airflow是一个以编程方式编写,安排和监视工作流的平台。

使用Airflow将工作流编写任务的有向无环图(DAG)。Aiflow计划程序在遵循指定的依赖项同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度变得轻而易举。丰富的用户界面使查看生产中正在运行的管道,监视进度以及需要时对问题进行故障排除变得容易。

一、编写Dag任务脚本

1. 启动阿里云服务器集群,并启动hadoop集群。

2. 配置集群节点间ssh免密登录。

[root@airflowairflow]# vim /etc/hosts
172.26.16.78airflow  airflow
172.26.16.41hadoop101 hadoop101
172.26.16.39hadoop102 hadoop102
172.26.16.40hadoop103 hadoop103
 
[root@airflow~]# ssh-keygen -t rsa
[root@airflow~]# ssh-copy-id hadoop101
[root@airflow~]# ssh-copy-id hadoop102
[root@airflow~]# ssh-copy-id hadoop103

3. 创建work-py目录用于存放python调度脚本,编写.py脚本

[root@airflow~]# mkdir -p /opt/module/work-py
[root@airflow~]# cd /opt/module/work-py/
[root@airflowwork-py]# vim test.py
 
#!/usr/bin/python
fromairflow import DAG
fromairflow.operators.bash_operator import BashOperator
fromdatetime import datetime, timedelta
 
default_args= {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['2473196869@qq.com'],
    'start_date':datetime(2020,12,15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag =DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))
 
t1 =BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwdMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)
 
t2 =BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwsMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)
 
t3 =BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.AdsMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)
 
t2.set_upstream(t1)
t3.set_upstream(t2)

脚本解读:

default_args 设置默认参数

depends_on_past 是否开启任务依赖

schedule_interval 调度频率

retries 重试次数

start_date 开始时间

BashOperator 具体执行任务,如果为true前置任务必须成功完成才会走下一个依赖任务,如果为false则忽略是否成功完成。

task_id 任务唯一标识(必填)

bash_command 具体任务执行命令

set_upstream 设置依赖 如上图所示ads任务依赖dws任务依赖dwd任务

注意:

必须导包

from airflow import DAG

from airflow.operators.bash_operator importBashOperator

4. 配置JDK

注意:ssh的目标机(hadoop002) /etc/bashrc里必须配置java环境变量,配置完毕后source。

(python3)[root@airflow work-py]# vim /etc/bashrc
(python3)[root@airflow work-py]# source /etc/bashrc
Airflow教程——使用Airflow实现简单的工作流调度

 

5. 查看Airflow配置文件,获取dag文件存放目录

(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg
Airflow教程——使用Airflow实现简单的工作流调度

 

6. 按照配置文件中配置的文件路径,创建dag文件存放目录,将.py脚本放入此目录。

(python3)[root@airflow work-py]# mkdir ~/airflow/dags
(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/

7. 等待一段时间,刷新任务列表,可以看到列表中,已经出现test任务。

(python3)[root@airflow work-py]# airflow list_dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial

8. 刷新Airflow的web页面,已经出现test任务。

Airflow教程——使用Airflow实现简单的工作流调度

 

9. 点击运行test任务。

Airflow教程——使用Airflow实现简单的工作流调度

 

10. 点击成功任务,查看日志。

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

 

11. 查看dag图,甘特图。

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

12. 查看脚本代码。

Airflow教程——使用Airflow实现简单的工作流调度

 

Dag任务操作

1. 删除dag任务。

Airflow教程——使用Airflow实现简单的工作流调度

2. 通过执行以下命令,可以重新添加dag任务。

(python3)[root@airflow work-py]# airflow list_tasks  test --tree
The'list_tasks' command is deprecated and removed in Airflow 2.0, please use'tasks list' instead
[2020-12-1511:17:08,981] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-1511:17:08,982] {dagbag.py:417} INFO - Filling up the DagBag from/root/airflow/dags
<Task(BashOperator):dwd>
    <Task(BashOperator): dws>
        <Task(BashOperator): ads>

3. 查看当前所有dag任务,可以看到test任务被重新添加了回来。

(python3)[root@airflow work-py]#
(python3)[root@airflow work-py]# airflow list_dags
The 'list_dags'command is deprecated and removed in Airflow 2.0, please use 'dags list', or'dags report' instead
[2020-12-1511:33:57,106] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-1511:33:57,106] {dagbag.py:417} INFO - Filling up the DagBag from/root/airflow/dags
 
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial

4. 重新添加的dag任务。

Airflow教程——使用Airflow实现简单的工作流调度

3配置邮件服务器

1. 首先确保所有邮箱已经开启SMTP服务。

Airflow教程——使用Airflow实现简单的工作流调度

 

2. 修改airflow配置文件,如下:

(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg
smtp_host= smtp.qq.com
smtp_starttls= True
smtp_ssl= False
smtp_user= 2473196869@qq.com
#smtp_user =
smtp_password= wjmfbxkfvypdebeg
#smtp_password =
smtp_port= 587
smtp_mail_from= 2473196869@qq.com

3. 重启Airflow。

(python3)[root@airflow airflow]# ps -ef|egrep 'scheduler|airflow-webserver'|grep -vgrep|awk '{print $2}'|xargs kill -15
(python3)[root@airflow airflow]# ps -ef |grep airflow
root       745    1  0 09:50 ?        00:00:00 /sbin/dhclient -1 -q -lf/var/lib/dhclient/dhclient--eth0.lease -pf /var/run/dhclient-eth0.pid -Hairflow eth0
root      7875 1851  0 12:51 pts/1    00:00:00 grep --color=auto airflow
(python3)[root@airflow airflow]# kill -15 745
 
 
(python3)[root@airflow airflow]# airflow webserver -p 8080 -D
(python3)[root@airflow airflow]# airflow scheduler -D

4. 重新编辑test.py脚本文件,并且替换。

[root@airflow~]# cd /opt/module/work-py/
[root@airflowwork-py]# vim test.py
 
#!/usr/bin/python
fromairflow import DAG
fromairflow.operators.bash_operator import BashOperator
fromairflow.operators.email_operator import EmailOperator
fromdatetime import datetime, timedelta
 
default_args= {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['2473196869@qq.com'],
    'start_date':datetime(2020,12,15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag =DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))
 
t1 =BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwdMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)
 
t2 =BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwsMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)
 
t3 =BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103"spark-submit  --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.AdsMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
dag=dag)
 
email=EmailOperator(
   task_id="email",
   to="2473196869@qq.com",
    subject="test-subject",
    html_content="<h1>test-content</h1>",
    cc="chaosong@qq.com",
   dag=dag)
 
t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)
 
(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/

5. 查看页面是否生效。

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

 

6. 运行测试,查看运行情况和邮件。

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度

 

Airflow教程——使用Airflow实现简单的工作流调度
想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。