(二)项目涉及技术

1.6 Hive总结

1.6.1 Hive的架构

1.6.2 Hive和数据库比较

Hive 和数据库除了拥有类似的查询语言,再无类似之处。

1)数据存储位置

Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。

2)数据更新

Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,

3)执行延迟

Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。

4)数据规模

Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。

1.6.3 内部表和外部表

元数据、原始数据

1)删除数据时:

内部表:元数据、原始数据,全删除

外部表:元数据 只删除

2)在公司生产环境下,什么时候创建内部表,什么时候创建外部表?

在公司中绝大多数场景都是外部表。

自己使用的临时表,才会创建内部表;

1.6.4 4个By区别

1)Order By:全局排序,只有一个Reducer;

2)Sort By:分区内有序;

3)Distrbute By:类似MR中Partition,进行分区,结合sort by使用。

4) Cluster By:当Distribute by和Sorts by字段相同时,可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外还兼具Sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。

在生产环境中Order By用的比较少,容易导致OOM。

在生产环境中Sort By+ Distrbute By用的多。

1.6.5 系统函数

1)date_add、date_sub函数(加减日期)

2)next_day函数(周指标相关)

3)date_format函数(根据格式整理日期)

4)last_day函数(求当月最后一天日期)

5)collect_set函数

6)get_json_object解析json函数

7)NVL(表达式1,表达式2)

如果表达式1为空值,NVL返回值为表达式2的值,否则返回表达式1的值。

1.6.6 自定义UDF、UDTF函数

1)在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?

(1)用UDF函数解析公共字段;用UDTF函数解析事件字段。

(2)自定义UDF:继承UDF,重写evaluate方法

(3)自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close

2)为什么要自定义UDF/UDTF?

因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试。

1.6.7 窗口函数

1)Rank

(1)RANK() 排序相同时会重复,总数不会变

(2)DENSE_RANK() 排序相同时会重复,总数会减少

(3)ROW_NUMBER() 会根据顺序计算

2) OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化

(1)CURRENT ROW:当前行

(2)n PRECEDING:往前n行数据

(3) n FOLLOWING:往后n行数据

(4)UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点

(5) LAG(col,n):往前第n行数据

(6)LEAD(col,n):往后第n行数据

(7) NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。

3)手写TopN

1.6.8 Hive优化

1)MapJoin

如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

2)行列过滤

列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。

行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。

3)列式存储

4)采用分区技术

5)合理设置Map

mapred.min.split.size: 指的是数据的最小分割单元大小;min的默认值是1B

mapred.max.split.size: 指的是数据的最大分割单元大小;max的默认值是256MB

通过调整max可以起到调整map数的作用,减小max可以增加map数,增大max可以减少map数。

需要提醒的是,直接调整mapred.map.tasks这个参数是没有效果的。

6)合理设置Reduce

Reduce个数并不是越多越好

(1)过多的启动和初始化Reduce也会消耗时间和资源;

(2)另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

在设置Reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的Reduce数;使单个Reduce任务处理数据量大小要合适;

7)小文件如何产生的?

(1)动态分区插入数据,产生大量的小文件,从而导致map数量剧增;

(2)reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的);

(3)数据源本身就包含大量的小文件。

8)小文件解决方案

(1)在Map执行前合并小文件,减少Map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。

(2)merge

// 输出合并小文件

SET hive.merge.mapfiles = true; -- 默认true,在map-only任务结束时合并小文件

SET hive.merge.mapredfiles = true; -- 默认false,在map-reduce任务结束时合并小文件

SET hive.merge.size.per.task = 268435456; -- 默认256M

SET hive.merge.smallfiles.avgsize = 16777216; -- 当输出文件的平均大小小于16m该值时,启动一个独立的map-reduce任务进行文件merge

(3)开启JVM重用

set mapreduce.job.jvm.numtasks=10

9)开启map端combiner(不影响最终业务逻辑)

set hive.map.aggr=true;

10)压缩(选择快的)

设置map端输出、中间结果压缩。(不完全是解决数据倾斜的问题,但是减少了IO读写和网络传输,能提高很多效率)

set hive.exec.compress.intermediate=true --启用中间数据压缩

set mapreduce.map.output.compress=true --启用最终数据压缩

set mapreduce.map.outout.compress.codec=…; --设置压缩方式

11)采用tez引擎或者spark引擎

1.6.9 Hive解决数据倾斜方法

1)数据倾斜长啥样?

2)怎么产生的数据倾斜?

1)不同数据类型关联产生数据倾斜

情形:比如用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时。

后果:处理此特殊值的reduce耗时;只有一个reduce任务
默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。

解决方式:把数字类型转换成字符串类型

select * from users a

left outer join logs b

on a.usr_id = cast(b.user_id as string)

2)控制空值分布

在生产环境经常会用大量空值数据进入到一个reduce中去,导致数据倾斜。

解决办法:

自定义分区,将为空的key转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分不到多个Reducer。

注意:对于异常值如果不需要的话,最好是提前在where条件里过滤掉,这样可以使计算量大大减少

3)解决数据倾斜的方法?

1group by

注:group by 优于distinct group

解决方式:采用sum() group by的方式来替换count(distinct)完成计算。

2mapjoin

3)开启数据倾斜时负载均衡

set hive.groupby.skewindata=true;

思想:就是先随机分发并处理,再按照key group by来分发处理。

操作:当选项设定为true,生成的查询计划会有两个MRJob。

第一个MRJob中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;

第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的原始GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。

点评:它使计算变成了两个mapreduce,先在第一个中在shuffle过程partition时随机给 key打标记,使每个key随机均匀分布到各个reduce上计算,但是这样只能完成部分计算,因为相同key没有分配到相同reduce上。

所以需要第二次的mapreduce,这次就回归正常shuffle,但是数据分布不均匀的问题在第一次mapreduce已经有了很大的改善,因此基本解决数据倾斜。因为大量计算已经在第一次mr中随机分布到各个节点完成。

1.6.10 Hive里边字段的分隔符用的什么?为什么用\t?有遇到过字段里边有\t的情况吗,怎么处理的?

hive 默认的字段分隔符为ascii码的控制符\001(^A),建表的时候用fields terminated by '\001'。注意:如果采用\t或者\001等为分隔符,需要要求前端埋点和javaEE后台传递过来的数据必须不能出现该分隔符,通过代码规范约束。一旦传输过来的数据含有分隔符,需要在前一级数据中转义或者替换(ETL)。

1.6.11 Tez引擎优点?

Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能。

Mr/tez/spark区别:

Mr引擎:多job串联,基于磁盘,落盘的地方比较多。虽然慢,但一定能跑出结果。一般处理,周、月、年指标。

Spark引擎:虽然在Shuffle过程中也落盘,但是并不是所有算子都需要Shuffle,尤其是多算子过程,中间过程不落盘  DAG有向无环图。 兼顾了可靠性和效率。一般处理天指标。

Tez引擎:完全基于内存。  注意:如果数据量特别大,慎重使用。容易OOM。一般用于快速出结果,数据量比较小的场景。

1.6.12 MySQL

1)MySQL之元数据备份(项目中遇到的问题)

元数据备份(重点,如数据损坏,可能整个集群无法运行,至少要保证每日零点之后备份到其它服务器两个复本)

Keepalived或者用mycat

2)MySQL utf8超过字节数问题

MySQL的utf8编码最多存储3个字节,当数据中存在表情号、特色符号时会占用超过3个字节数的字节,那么会出现错误 Incorrect string value: '\xF0\x9F\x91\x91\xE5\xB0...'

解决办法:将utf8修改为utf8mb4

首先修改库的基字符集和数据库排序规则

再使用 SHOW VARIABLES LIKE '%char%'; 命令查看参数

确保这几个参数的value值为utf8mb4 如果不是使用set命令修改

如:set character_set_server = utf8mb4;

1.6.13 Union与Union all区别

1)union会将联合的结果集去重,效率较union all差

2)union all不会对结果集去重,所以效率高

1.7 Sqoop

1.7.1 Sqoop参数

/opt/module/sqoop/bin/sqoop import \

--connect \

--username \

--password \

--target-dir \

--delete-target-dir \

--num-mappers \

--fields-terminated-by   \

--query   "$2" ' and $CONDITIONS;'

1.7.2 Sqoop导入导出Null存储一致性问题

Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用--input-null-string和--input-null-non-string两个参数。导入数据时采用--null-string和--null-non-string。

1.7.3 Sqoop数据导出一致性问题

场景1:如Sqoop在导出到Mysql时,使用4个Map任务,过程中有2个任务失败,那此时MySQL中存储了另外两个Map任务导入的数据,此时老板正好看到了这个报表数据。而开发工程师发现任务失败后,会调试问题并最终将全部数据正确的导入MySQL,那后面老板再次看报表数据,发现本次看到的数据与之前的不一致,这在生产环境是不允许的。

官网:http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html

Since Sqoop breaks down export process into multiple transactions, it is possible that a failed export job may result in partial data being committed to the database. This can further lead to subsequent jobs failing due to insert collisions in some cases, or lead to duplicated data in others. You can overcome this problem by specifying a staging table via the --staging-table option which acts as an auxiliary table that is used to stage exported data. The staged data is finally moved to the destination table in a single transaction.

–staging-table方式

sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root --password 123456 --table app_cource_study_report --columns watch_video_cnt,complete_video_cnt,dt --fields-terminated-by "\t" --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" --staging-table app_cource_study_report_tmp --clear-staging-table --input-null-string '\N'

1.7.4 Sqoop底层运行的任务是什么

只有Map阶段,没有Reduce阶段的任务。默认是4个MapTask。

1.7.5 Sqoop一天导入多少数据

100万日活=》10万订单,1人10条,每天1g左右业务数据

Sqoop每天将1G的数据量导入到数仓。

1.7.6 Sqoop数据导出的时候一次执行多长时间

每天晚上00:30开始执行,Sqoop任务一般情况40 -50分钟的都有。取决于数据量(11:11,6:18等活动在1个小时左右)。

1.7.7 Sqoop在导入数据的时候数据倾斜

Sqoop 抽数的并行化主要涉及到两个参数:num-mappers:启动N个map来并行导入数据,默认4个;split-by:按照某一列来切分表的工作单元。

通过ROWNUM() 生成一个严格均匀分布的字段,然后指定为分割字段

1.7.8 Sqoop数据导出Parquet(项目中遇到的问题)

Ads层数据用Sqoop往MySql中导入数据的时候,如果用了orc(Parquet)不能导入,需转化成text格式

(1)创建临时表,把Parquet中表数据导入到临时表,把临时表导出到目标表用于可视化

(2)Sqoop里面有参数,可以直接把Parquet转换为text

(3)ads层建表的时候就不要建Parquet表

1.8 Azkaban

1.8.1 每天集群运行多少指标?

每天跑100多个指标,有活动时跑200个左右。

1.8.2 任务挂了怎么办?

运行成功或者失败都会发邮件、发钉钉、集成自动打电话(项目中遇到的问题)

最主要的解决方案就是重新跑。

1.9 Scala

1.9.1 开发环境

要求掌握必要的Scala开发环境搭建技能。

1.9.2 变量和数据类型

掌握var和val的区别

掌握数值类型(Byte、Short、Int、Long、Float、Double、Char)之间的转换关系

1.9.3 流程控制

掌握if-else、for、while等必要的流程控制结构,掌握如何实现break、continue的功能。

1.9.4 函数式编程

掌握高阶函数、匿名函数、函数柯里化、函数参数以及函数至简原则。

1.9.5 面向对象

掌握Scala与Java继承方面的区别、单例对象(伴生对象)、特质的用法及功能。

1.9.6 集合

掌握常用集合的使用、集合常用的计算函数。

1.9.7 模式匹配

掌握模式匹配的用法

1.9.8 异常

掌握异常常用操作即可

1.9.9 隐式转换

掌握隐式方法、隐式参数、隐式类,以及隐式解析机制

1.9.10 泛型

掌握泛型语法