【Interview】Hive原理及调优

发布时间 2023-06-08 22:57:47作者: FlowersandBoys

关于Hive的参数配置:

Hive的参数配置有 3 种配置方式:

方式1: 在hive的配置文件中直接进行修改. 方式2: 在开启Hive服务的时候, 设置参数 nohup hive --service hiveserver2 --hiveconf 参数名=参数值 & 方式3: 通过 set方式进行修改. set mapreduce.job.reduces = 3; -- 分桶查询的时候用的.

上述三种参数设置的优先级及

作用范围从高到低分别是:

  • 配置文件(所有客户端有效) > hiveconf, hive服务(本服务运行期间, 所有客户端有效) > set方式(本次会话有效)

优先级从高到低分别是:

  • set方式 > hive服务方式 > 配置文件

Hive调优方案介绍

hive调优四句话

  1. 更改硬件.

  2. 增大或者开启某些设置. (Fetch抓取, 开启负载均衡(解决group by数据倾斜), 动态分区数, join优化, 开启严格模式(禁用低效SQL)

  3. 减小或者关闭某些设置. (推测执行, 严格模式(动态分区的), MapReduce的任务数.

  4. 减少IO. (数据压缩, 存储格式.

数据压缩.

判断一种压缩协议好与坏的标准: 压缩后文件占比, 解压速度, 压缩速度. Hive的底层是MR程序, 所以Hive的压缩本质上还是MR的压缩, 它(MR程序)支持GZip, Bzip2, Lzo, Snappy(推荐使用: 因为更均衡)等协议. 数据压缩分为两项, 即: Map端压缩 和 Reduce端压缩. Map端压缩的目的: 降低Reduce端拉取的数据量, 减少传输, 提高效率. Reduce端压缩的目的: 降低写入磁盘的数据量, 减小结果文件的大小, 提高磁盘利用率.

存储方式.

Hive表的存储格式分为 行存储 和 列存储两种, 其中: 行存储主要有两种方式: TextFile(默认的), SequenceFile 列存储主要有两种方式: Orc(推荐, 更均衡), Parquet(Spark中用的较多)

列存储+snappy压缩协议建表SQL

create table log_orc_snappy (
  track_time string,
  url string,
  session_id string,
  referer string,
  ip string,
  end_user_id string,
  city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc                                 -- 存储方式: 列存储(orc格式进行存储)
tblproperties ('orc.compress'='snappy');      -- 表属性: snappy压缩协议.

 

面试题: 行存储和列存储的区别是什么?

行存储:

手绘:

intstringintintstringint
1 小明 28 2 小威 18

不同的数据类型存储的方式不一样,数据密集度比较低。

打个比方,一会存个方块一会儿存个圆,数据密度肯定不高,所以相同的一个文件用行存储方式,占用的空间比较大,所以磁盘利用率低。

优点: select * from ... 方式, 查询数据, 效率较高. 缺点:

  1. 数据密集度低, 磁盘利用率低.

  2. select 列1, 列2..方式, 效率低.

列存储:

intintstringstringintint
1 2 小明 小威 28 18

打个比方:列存储相当于给行李箱收拾得整整齐齐,数据密度高,同等文件所占用的磁盘空间比较少。磁盘利用率高。

优点:

  1. 数据密集度较高, 磁盘利用率高.

  2. select 列1, 列2 方式效率高.

    • select * 方式效率相对较低, 但是实际开发中, 这种写法几乎不用.

细节:

  1. 实际开发中, 如果 存储格式 和 压缩协议 你也不知道用谁了, 推荐使用: Orc + Snappy.

  2. 列存储的底层是用 二进制 的形式来存储数据的.

Fetch抓取

人话: Fetch抓取的意思是: HiveSQL 底层能不转MR, 就不转MR, 而是直接执行.

我们知道Hive的底层要转MR任务来执行, 而MR程序的计算速度是非常慢的, 如果执行的是简单的HiveSQL, 没必要转MR程序, 直接执行即可. 哪些SQL不会转MR程序

  1. select * 全表扫描.

  2. select 列1, 列2 字段扫描.

  3. limit 分页查询.

  4. 一些简单的SQL语句.

设置Fetch本地抓取:

set hive.fetch.task.conversion=值;
-- 其中值为more,minimal,none
-- more(默认值): 上述四项都不走MR.
-- minimal: 全表扫描, 列扫描, 分页查询不走MR.
-- none: 所有iveSQL都走MR.

本地模式

人话: 如果HiveSQL非要转MR程序, 能在本地执行, 就尽量不要交给Yarn来调度, 因为可能会涉及到跨域(跨机器)传输, 降低效率.

我所理解的跨域(跨机器)传输:HQL转MR程序要经过Yarn调度。而这就涉及到Yarn调度的MR程序(job)的流程了,

首先,客户端提交任务给ResourceManager,然后RM校验合法后会生成一个Appmaster进程来处理这个任务,Appmaster会和RM建立心跳机制,然后再会想RM申请资源,此时RM会在nodemanger上开辟container资源容器,然后Appmaster会将计算任务放到container中执行,当MapTask和ReduceTask执行完成之后,Appmaster会向RM报告,然后申请释放资源container,最后Appmaster会执行自毁。

如果要经Yarn调度的话,会和别的Nodemanager建立连接,还要与ResourceManager建立连接,就会耗费很多资源和时间,因此尽量不要交给Yarn来调度,能在本地执行就在本地执行

Join优化

我的理解是:有两个MapTask(分别有5w,50w条数据,男生分别有4w和65w条数据),有三个ReduceTask(分别处理男,女,未知性别的数据),如果让ReduceTask直接从MapTask中拉取,则会发生严重数据倾斜,这个时候需要在MapTask阶段做join,最终ReduceTask会拉取到4w条属于自己的数据。这就一方面提高了传输效率, 另一方面可以防止出现数据倾斜。

join优化有3种情况, 即:

小表join大表:

  1. 开启Map端join, 在Map端(内存中)对数据做合并, 降低Reduce端拉取的数据量, 一方面提高传输效率, 另一方面可以防止出现数据倾斜.

  2. 其实join优化你开与不开, 本质没有太大的区别, 因为Hive为了提高查询效率,已经设置了, 自动join优化.

大表join大表:

前提条件:两张表的字段有很多空值Null

-- 假设有A表(100W条数据, id列50W是空),   B表(60W条数据, id列40W空), 如果此时两张表根据 id列做关联查询, 会有大量的空值.
-- 原始写法:
select * from A inner join B on A.id = B.id;        -- 弊端是: id列有大量的空值, 无意义的操作较多.
  1. 空值过滤.

    select * from A inner join (select * from B where id is not null) B on A.id = B.id; -- 弊端: 会过滤掉大量的数据, 可能也会把有效的数据过滤掉

  2. 空值转换.

    2.1 不随机分布, 固定值.

    select * from A inner join (
      select 列1, 列2.., case when id is null then 10 end as id from B
      ) B on A.id = B.id;   -- 虽然能解决, 但是null值过多, 会导致10过多, 将来出现数据倾斜的概率较大.

    2.2 随机分布, 可变值.

     select * from A inner join (
      select 列1, 列2.., case when id is null then concat('小威', rand()) end as id from B
    ) B on A.id = B.id;

分桶表 join 分桶表:

前提: 1. 两张表都是分桶表. 2. 某表的分桶数量是另外表分桶数量的整数倍. 结论: 表join连接查询的时候, 可以用 分桶字段替代 关联字段, 即:

-- 原始SQL:
   select * from A join B on A.id = B.id;
-- 优化后SQL:
   select * from A join B on A.分桶字段 = B.分桶字段;

 

SQL常规优化

  1. 列裁剪. 能写 select 列1, 列2 就不要写 select *

  2. 分区裁剪. 查询时, 如果是分区表, 记得写 where 分组字段

  3. count(id). 如果数据量大, 容易出错, 可以改成 先分组, 后统计, 虽然会转2个MR, 执行速度慢了, 但是数据量大的情况下, 也可以成功执行.

    SELECT count(DISTINCT id) FROM bigtable; 结果:

    SELECT count(id)FROM (SELECT id FROM bigtable GROUP BY id) a;

  4. 避免笛卡尔积的情况.

  5. group by数据倾斜 描述: 分配给ReduceTask端的数据不均衡导致的问题. 例如: 1个ReduceTask处理100W条数据, 另1个ReduceTask处理100条数据. 解决方案: 手动开启负载均衡, 程序的底层会转两个MR程序来执行该任务, 第1个MR程序负责把倾斜的数据随机打散, 交给不同的ReduceTask来处理. 第1个MR程序的(Reduce端结果) 作为 第2个MR程序的Map段数据源, 然后由第2个MR的Reduce负责合并数据. 大白话: 手动开启负载均衡, 由第1个MR随机打散(倾斜的)数据, 由第2个MR来合并.

  6. 动态分区 手动调大动态分区数, 默认动态分区上限是1000, 如果HQL分区数量超过它, 会报错, 我们调大分区数即可. 小细节: 动态分区的时候, 可以关闭严格模式, 因为严格模式要求: 动态分区时至少有1个静态分区.

调整MR的任务数.

调整MapTask任务数: 1. 1个切片 = 1个MapTask任务 = 1个分好区, 排好序, 规好约的文件. 2. 减小切片大小(默认: 128MB) = 增多MapTask任务数, 增大切片大小 = 减少MapTask任务数. 调整ReduceTask任务数: 1. 1个分区 = 1个ReduceTask任务 = 1个结果文件 2. 手动设置分区数, 即可修改ReduceTask的任务数量.

并行执行.

默认情况下, HiveSQL只会执行1个阶段, 如果多阶段之间依赖度不高, 我们可以开启并行执行机制. 并行执行机制, 默认并行度是 8, 我们可以调大一些.

严格模式.

这个严格模式指的是 禁用低效的SQL, 即: 如果SQL比较低效, 压根儿不让你执行. 这个严格模式 和 动态分区的严格模式可以不是同1个. 问: 哪些SQL属于低效的SQL呢? 答:

1. select *.. 全表扫描
2. order by的时候没有加 limit
3. 笛卡尔积.

jvm重用.

Hive2.X已开启, 无需设置, Hive2.X以前Container容器用一次就释放了, 开启JVM重用, 可以重复利用这些Container资源容器.

explain执行计划

在HQL前加 explain执行计划, 查看SQL的执行分几个阶段, 阶段越少, 执行速度越快.

推测执行

实际开发, 禁用它, 类似于木桶效应. 假设HQL转了3个MapReduce任务, 其中前两个任务执行速度都较快, 但是第三个任务执行速度太慢了, 会拖慢整个MR程序的执行进度. 此时程序会开启1个新的任务, 负责和那个慢的任务做同样的事儿, 采用谁先执行完, 用谁的结果. 无意义, 因为已经很慢了, 开启新任务后会更慢.

案例1: 测试存储方式 和 压缩协议

create database day10;
use day10;
show tables;

select url, session_id from log_text ;

-- 思考, 观察下列存储的表, 直接在HDFS上查看源文件, 能看到文件内容吗?
-- 答案: 看不到, 因为列存储底层用的是 二进制方式存储的.

-- 1. 建表, 默认方式: TextFile(行存储), 不压缩.
create table log_text (
  track_time string,
  url string,
  session_id string,
  referer string,
  ip string,
  end_user_id string,
  city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
-- STORED AS TEXTFILE ;     -- 默认就是 TextFile(行存储)

-- 2. 上传数据, 然后查看源数据.
select * from log_text;     -- 文件大小: 18.13 MB, 数据条数: 10W条

-- 3. 建表, orc(列存储), 如果不指定压缩, orc默认用的是 zlib方式压缩.
drop table log_orc;
create table log_orc (
  track_time string,
  url string,
  session_id string,
  referer string,
  ip string,
  end_user_id string,
  city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc;      -- 列存储(orc格式进行存储)

create table log_orc_zlib (
  track_time string,
  url string,
  session_id string,
  referer string,
  ip string,
  end_user_id string,
  city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc                               -- 存储方式: 列存储(orc格式进行存储)
tblproperties ('orc.compress'='zlib');      -- 表属性: zlib压缩协议.

show create table log_orc;
-- 查看orc格式的表, 默认的压缩协议.
show create table log_orc;

-- 细节: 通过 insert + select的方式, 往orc格式的表中添加数据.
insert into log_orc select * from log_text;
insert into log_orc_zlib select * from log_text;

-- 4. 上传数据, 然后查看源数据.
select * from log_orc;     -- 文件大小: 2.78 MB, 数据条数: 10W条
explain select session_id, count(session_id) from log_orc_zlib group by session_id order by session_id;     -- 文件大小: 2.78 MB, 数据条数: 10W条

-- 5. 建表, orc(列存储) + snappy(数据压缩)
create table log_orc_snappy (
  track_time string,
  url string,
  session_id string,
  referer string,
  ip string,
  end_user_id string,
  city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc                               -- 存储方式: 列存储(orc格式进行存储)
tblproperties ('orc.compress'='snappy');      -- 表属性: snappy压缩协议.

-- 6. 上传数据, 然后查看源数据.
insert into log_orc_snappy select * from log_text;
select * from log_orc_snappy;     -- 文件大小: 3.75 MB, 数据条数: 10W条