Azkaban概述
1)一个完整的数据分析系统通常都是由大量任务单元组成:
Shell脚本程序,Java程序,MapReduce程序、Hive脚本等
2)各任务单元之间存在时间先后及前后依赖关系
3)为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行;
常见工作流调度系统
1)简单的任务调度:直接使用Linux的Crontab来定义;
2)复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如Azkaban、Ooize、Airflow、DolphinScheduler等。
Azkaban与Oozie对比
对市面上最流行的两种调度器,给出以下详细对比,以供技术选型参考。总体来说,Ooize相比Azkaban是一个重量级的任务调度系统,功能全面,但配置使用也更复杂。如果可以不在意某些功能的缺失,轻量级调度器Azkaban是很不错的候选对象。
Azkaban安装
准备三台测试机hadoop102,hadoop103,hadoop104
1)将azkaban-db-3.84.4.tar.gz,
azkaban-exec-server-3.84.4.tar.gz,azkaban-web-server-3.84.4.tar.gz上传到hadoop102的/opt/software路径
[atguigu@hadoop102 software]$ ll
总用量 35572
-rw-r--r--. 1 atguigu atguigu 6433 4月 18 17:24 azkaban-db-3.84.4.tar.gz
-rw-r--r--. 1 atguigu atguigu 16175002 4月 18 17:26 azkaban-exec-server-3.84.4.tar.gz
-rw-r--r--. 1 atguigu atguigu 20239974 4月 18 17:26 azkaban-web-server-3.84.4.tar.gz
2)新建/opt/module/azkaban目录,并将所有tar包解压到这个目录下
[atguigu@hadoop102 software]$ mkdir /opt/module/azkaban
3)解压azkaban-db-3.84.4.tar.gz、 azkaban-exec-server-3.84.4.tar.gz和azkaban-web-server-3.84.4.tar.gz到/opt/module/azkaban目录下
[atguigu@hadoop102 software]$ tar -zxvf azkaban-db-3.84.4.tar.gz -C /opt/module/azkaban/
[atguigu@hadoop102 software]$ tar -zxvf azkaban-exec-server-3.84.4.tar.gz -C /opt/module/azkaban/
[atguigu@hadoop102 software]$ tar -zxvf azkaban-web-server-3.84.4.tar.gz -C /opt/module/azkaban/
4)进入到/opt/module/azkaban目录,依次修改名称
[atguigu@hadoop102 azkaban]$ mv azkaban-exec-server-3.84.4/ azkaban-exec
[atguigu@hadoop102 azkaban]$ mv azkaban-web-server-3.84.4/ azkaban-web
配置MySQL
1)正常安装MySQL
详见《尚硅谷大数据技术之Hive》
2)启动MySQL
[atguigu@hadoop102 azkaban]$ mysql -uroot -p000000
3)登陆MySQL,创建Azkaban数据库
mysql> create database azkaban
4)创建azkaban用户并赋予权限
设置密码有效长度4位及以上
mysql> set global validate_password_length=4;
设置密码策略最低级别
mysql> set global validate_password_policy=0;
创建Azkaban用户,任何主机都可以访问Azkaban,密码是000000
mysql> CREATE USER 'azkaban'@'%' IDENTIFIED BY '000000';
赋予Azkaban用户增删改查权限
mysql> GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
5)创建Azkaban表,完成后退出MySQL
mysql> use azkaban;
mysql> source /opt/module/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql
mysql> quit;
6)更改MySQL包大小;防止Azkaban连接MySQL阻塞
[atguigu@hadoop102 software]$ sudo vim /etc/my.cnf
在[mysqld]下面加一行max_allowed_packet=1024M,修改以字节发送给服务器的最大数据包大小:
[mysqld]
max_allowed_packet=1024M
8)重启MySQL
[atguigu@hadoop102 software]$ sudo systemctl restart mysqld
配置Executor Server
Azkaban Executor Server处理工作流和作业的实际执行。
1)编辑azkaban.properties
[atguigu@hadoop102 azkaban]$ vim /opt/module/azkaban/azkaban-exec/conf/azkaban.properties
修改如下标红的属性
#...
default.timezone.id=Asia/Shanghai
#...
azkaban.webserver.url=http://hadoop102:8081
executor.port=12321
#...
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=azkaban
mysql.password=000000
mysql.numconnections=100
在最后添加
executor.metric.reports=true
executor.metric.milisecinterval.default=60000
2)编辑commonprivate.properties
[atguigu@hadoop102 jobtypes]$ vim /opt/module/azkaban/azkaban-exec/plugins/jobtypes/commonprivate.properties
添加
azkaban.native.lib=false
若不添加这个参数,在执行Job时可能会报如下错:
azkaban.utils.UndefinedPropertyException: Missing required property 'azkaban.native.lib'
at azkaban.utils.Props.getString(Props.java:450)
at azkaban.jobExecutor.ProcessJob.run(ProcessJob.java:242)
at azkaban.execapp.JobRunner.runJob(JobRunner.java:823)
at azkaban.execapp.JobRunner.doRun(JobRunner.java:602)
at azkaban.execapp.JobRunner.run(JobRunner.java:563)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2)同步azkaban-exec到所有节点
[atguigu@hadoop102 azkaban]$ xsync /opt/module/azkaban/azkaban-exec
3)必须进入到
/opt/module/azkaban/azkaban-exec路径,分别在三台机器上,启动executor server
[atguigu@hadoop102 azkaban-exec]$ bin/start-exec.sh
[atguigu@hadoop103 azkaban-exec]$ bin/start-exec.sh
[atguigu@hadoop104 azkaban-exec]$ bin/start-exec.sh
注意:如果在
/opt/module/azkaban/azkaban-exec目录下出现executor.port文件,说明启动成功
4)下面激活executor,需要
[atguigu@hadoop102 azkaban-exec]$ curl -G "hadoop102:$(<./executor.port)/executor?action=activate" && echo
[atguigu@hadoop103 azkaban-exec]$ curl -G "hadoop103:$(<./executor.port)/executor?action=activate" && echo
[atguigu@hadoop104 azkaban-exec]$ curl -G "hadoop104:$(<./executor.port)/executor?action=activate" && echo
如果三台机器都出现如下提示,则表示激活成功
{"status":"success"}
配置Web Server
Azkaban Web Server处理项目管理,身份验证,计划和执行触发。
1)编辑azkaban.properties
[atguigu@hadoop102 azkaban]$ vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
修改如下属性
...
default.timezone.id=Asia/Shanghai
...
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=azkaban
mysql.password=000000
mysql.numconnections=100
...
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
说明:
#StaticRemainingFlowSize:正在排队的任务数;
#CpuStatus:CPU占用情况
#MinimumFreeMemory:内存占用情况。测试环境,必须将MinimumFreeMemory删除掉,否则它会认为集群资源不够,不执行。
2)修改azkaban-users.xml文件,添加atguigu用户
[atguigu@hadoop102 azkaban-web]$ vim /opt/module/azkaban/azkaban-web/conf/azkaban-users.xml
<azkaban-users>
<user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
<user password="metrics" roles="metrics" username="metrics"/>
<user password="atguigu" roles="metrics,admin" username="atguigu"/>
<role name="admin" permissions="ADMIN"/>
<role name="metrics" permissions="METRICS"/>
</azkaban-users>
3)必须进入到hadoop102的
/opt/module/azkaban/azkaban-web路径,启动web server
[atguigu@hadoop102 azkaban-web]$ bin/start-web.sh
4)访问http://hadoop102:8081,并用atguigu用户登陆
Work Flow案例
1)在windows环境,新建azkaban.project文件,编辑内容如下
azkaban-flow-version: 2.0
注意:该文件作用,是采用新的Flow-API方式解析flow文件。
2)新建basic.flow文件,内容如下
nodes:
- name: jobA
type: command
config:
command: echo "Hello World"
(1)Name:job名称
(2)Type:job类型。command表示你要执行作业的方式为命令
(3)Config:job配置
3)将azkaban.project、basic.flow文件压缩到一个zip文件,文件名称必须是英文。
4)在WebServer新建项目:hadoop102:8081/index
5)给项目名称命名和添加项目描述
6)first.zip文件上传
7)选择上传的文件
8)执行任务流
9)在日志中,查看运行结果
作业依赖案例
需求:JobA和JobB执行完了,才能执行JobC
具体步骤:
1)修改basic.flow为如下内容
nodes:
- name: jobC
type: command
# jobC 依赖 JobA和JobB
dependsOn:
- jobA
- jobB
config:
command: echo "I’m JobC"
- name: jobA
type: command
config:
command: echo "I’m JobA"
- name: jobB
type: command
config:
command: echo "I’m JobB"
1)dependsOn:作业依赖,后面案例中演示
2)将修改后的basic.flow和azkaban.project压缩成second.zip文件
3)重复上述HelloWorld后续步骤。
内嵌工作流案例
需求:JobA执行完后执行JobB,JobA和JobB形成一个工作流embedded_flow;JobC依赖于embedded_flow该工作流。
1)工作流定义文件中可以添加子工作流,例如:
nodes:
- name: jobC
type: command
# jobC 依赖embedded_flow
dependsOn:
- embedded_flow
config:
command: echo "I’m JobC"
- name: embedded_flow
type: flow
nodes:
- name: jobB
type: noop
dependsOn:
- jobA
- name: jobA
type: command
config:
command: pwd
参数说明:
type: 作业类型。flow表示,定义为工作流类型
type: noop 什么也不处理
2)将修改后的basic.flow和azkaban.project压缩成three.zip文件
3)重复上述HelloWorld后续步骤。
自动失败重试案例
需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms
具体步骤:
1)编译配置流
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
retries: 3
retry.backoff: 1000000
参数说明:
retries:重试次数
retry.backoff:重试的时间间隔
2)将修改后的basic.flow和azkaban.project压缩成four.zip文件
3)重复上述HelloWorld后续步骤。
4)执行并观察到一次失败+三次重试
5)也可以点击上图中的Log,在任务日志中看到,总共执行了4次。
6)也可以在Flow全局配置中添加任务失败重试配置,此时重试配置会应用到所有Job。
案例如下:
config:
retries: 3
retry.backoff: 10000
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
手动失败重试案例
需求:JobA=》JobB(依赖于A)=》JobC=》JobD=》JobE=》JobF。生产环境,任何Job都有可能挂掉,可以根据需求执行想要执行的Job。
具体步骤:
1)编译配置流
nodes:
- name: JobA
type: command
config:
command: echo "This is JobA."
- name: JobB
type: command
dependsOn:
- JobA
config:
command: echo "This is JobB."
- name: JobC
type: command
dependsOn:
- JobB
config:
command: echo "This is JobC."
- name: JobD
type: command
dependsOn:
- JobC
config:
command: echo "This is JobD."
- name: JobE
type: command
dependsOn:
- JobD
config:
command: echo "This is JobE."
- name: JobF
type: command
dependsOn:
- JobE
config:
command: echo "This is JobF."
2)将修改后的basic.flow和azkaban.project压缩成five.zip文件
3)重复上述HelloWorld后续步骤。
Enable和Disable下面都分别有如下参数:
Parents:该作业的上一个任务
Ancestors:该作业前的所有任务
Children:该作业后的一个任务
Descendents:该作业后的所有任务
Enable All:所有的任务
4)可以根据需求选择性执行对应的任务。
上一篇: DataFrame转换为RDD_大数据培训
下一篇: 大数据培训技术DataSet