大数据之Hive(三)

分区表

概念和常用操作

将一个大表的数据按照业务需要分散存储到多个目录,每个目录称为该表的一个分区。一般来说是按照日期来作为分区的标准。在查询时可以通过where子句来选择查询所需要的分区,这样查询效率会提高很多。

①创建分区表

hive (default)> 
create table dept_partition
(
    deptno int,    --部门编号
    dname  string, --部门名称
    loc    string  --部门位置
)
    partitioned by (day string, hour string)
    row format delimited fields terminated by 't';

查询分区表数据时,可以将分区字段看作表的伪列,可像使用其他字段一样使用分区字段。

操作命令 作用
desc 表名 查看表的信息,分辨是否为分区表
show partition 表名 查看所有分区信息
alter 表名 add partition(dt=‘’) 添加分区,多个分区不用添加分隔符
alter 表名 drop partition(),partiton2 删除分区, 多个分区逗号分隔
msck repair table 表名 add/drop/ sync partitions 没有使用hive load命令上传文件时,用来修复分区,默认是add

二级分区表

如果一天内的数据量也很大,可以再次将数据按照小时进行分区。适合数据量特别大的时候使用

动态分区表

动态分区是指向分区表insert数据时,被写往的分区不由用户指定,而是由每行数据的最后一个字段的值来动态的决定。使用动态分区,可只用一个insert语句将数据写入多个分区。

  1. 开启动态分区功能set hive.exec.dynamic.partition=true;
  2. 设置为动态分区非严格模式set hive.exec.dynamic.partition.mode=nonstrict
  3. 需要先存在一张大表已经存储好了,然后转换为动态分区表。
  4. 按照已经存储的表的最后一列作为分区列
insert into table dept_partition_dynamic 
partition(loc)  -- 动态分区就是指这个值没有写死
select 
    deptno, 
    dname, 
    loc 
from dept;

分桶表

分区提供一个隔离数据和优化查询的便利方式。底层是将数据放到不同目录,但是并非所有数据都可形成合理的分区。分桶是指将同一个文件的数据按照分桶数再划分为更细粒度的不同文件。数据内容是按照对应字段的哈希值对桶数取模来分配的。只在特定情况下效率会更高。

分区和分桶结合使用

create table stu_buck_sort_part(
	id int,
	name string
)
partitioned by (day string)  -- 分区
clustered by (id) sorted by (id)
into 4 buckets  -- 分桶
row format delimited fields terminated by 't';

分区和分桶的区别:

  1. 分区是分的是目录,分桶分的是文件
  2. 分区的字段不能是表中字段,分桶的字段必须是表中的字段

自定义函数

用户自定义函数分类

(1)UDF:一进一出
(2)UDAF:多进一出
(3)UDTF:一进多出

自定义步骤

  1. 模仿length函数
  2. 导入jar包
  3. 编写MyUDF类,继承GenericUDF类,重写方法
  4. initialize(检查器数组),返回值为检查器。检查器类内部封装了所有可以处理的类对象。初始化用来:
    • 检查参数个数,不正确时抛UDFArgumentLengthException()
    • 检查参数类型, 不正确时抛UDFArgumentTypeException()
    • 约定函数的返回值类型, 可以选择java的序列化对象或者hadoop的writable对象。使用工厂类(帮你把各种类的单例已经new好了)来获取返回对应的对象。
  5. evaluate(函数值对象 o) ,返回值是Object
    • 如果为null,返回0或-1
    • 不为null, 返回 o.toString().length();
  6. 使用Maven打包,在target中复制到hadoop中,建议放到data目录下, 复制路径pwd。
  7. 在hadoop中使用add jar 路径
  8. 进入jdbc中创建永久函数create function my_len as "方法的全类名";如果想创建临时方法,在function前面加上temporary。临时函数可以跨库使用,永久函数需要加上前缀库名后才能跨库使用。
  9. 由于add jar本身也是临时生效的,需要将jar包上传到HDFS中才能真正变成永久函数。然后在创建函数时添加using "HDFS路径"

Hadoop压缩

存储时选择压缩比的最好的bzip2,计算时选择速度快点压缩算法,目前天选加唯一的就是snappy。

  1. 打开参数, 这两个参数默认都为false
    Hadoop: mapreduce.map.output.compress=true
    Hive:hive.exec.compress.output=true
  2. 设置压缩方式
  3. 使用hadoop103:8088中的yarn来查看压缩算法是否被使用。
  4. 实际使用过程中并不能提升程序的运行效率,只是减少了IO,但需要额外的配置,只有在特殊场景才会配置。

Hive文件格式

文件名 特点
textfile 行式存储
orc 列式存储, 比较适合列式的查询,符合公司业务需求
Parquet 列式存储

ORC文件结构

  • Stripe0:大小等于物理块,128M
    • Index索引
    • column a
    • column b
    • column c
    • Footer编码信息
  • Stripe1:和上面一样
  • File Footer:
    • stripe的起始位置,索引的长度,数据的长度,Stripe footer的长度

使用orc列式存储时可以将原文件大小缩小到原先的40%,parquet大概是原先的70%。在数据量较大时,orc和parquet进行按列查询时查询速度会比textfile速度更快。

企业优化

计算资源配置

  1. 调整yarn内存和容器内存
  2. 调整map和reduce的内存和CPU核心数

Explain查看执行计划

语法:explain query-SQL

分组聚合优化

map-side聚合

将聚合操作从reduce阶段提前到map阶段。
set hive.map.aggr = true. 开启预聚合combiner
可以将该参数关闭,比较两次查询过程的执行时间。该优化对于有数据倾斜的数据有很好的优化效果。

join优化

  1. common join
    • 没有开启自动转map join
  2. map join
    • 文件大小小于25M时被称为小表
    • 配置参数开启hive.auto.convert.join
    • 配置参数开启无条件转map join,不考虑数据是否是小表,出错时直接OOM内存溢出。
  3. bucket map join
    • 将大表进行分桶,分桶是根据字段来分的,分桶时必须按照连接键来分。
    • 左右两边分桶的个数必须是相等或倍数关系。
  4. sort merge bucket join
    • 在分桶的基础上,将桶内数据进行排序后再进行Join操作,将全量IO转换为部分IO。
    • 设置参数为true:
      • sortedmerge
      • sortmerge.join

数据倾斜

reducer倾斜

  1. map-side聚合:默认是开启的
  2. Skew-GroupBy优化:将数据打散,不按照原先的逻辑进行分组,随机平均分散到不同的reducer中。适合倾斜量级很大时,否则优化效果不是很明显。

join数据倾斜

  1. 桶表join
  2. map join

任务的并行度

  1. 设置输入类为CombineInputFormat, 而不是hadoop默认的TextInputFormat。切片默认大小为256,而不是原先的128M.
    set hive.input.format = hive.ql.io.CombineInputFormat
  2. reduce数量默认跟map数量一致,设定reduce个数的上限为1009个。reducer处理字节的上限为256M。
  3. 开启合并map reduce任务输出的小文件hive.merge.mapredfiles, 小文件平均大小小于平均值16M时触发合并。这个配置默认是关闭,如果出现输出时小文件很多时,建议修改配置一下。

其他优化

CBO优化

最优成本优化,hive.cbo.enable = true 默认是开启的。多表连接时,会自动将最小的两个表进行连接,这样处理过程中的IO和写入磁盘负担更小,整体速度更快。

谓词下推(过滤提前)

在既有join操作和where操作时,先进行连接操作后再进行过滤计算量会远远大于先进行过滤后再进行连接操作。并且这个顺序变化后的结果是一样的。
hive.optimize.ppd = false ,该参数默认是开启的,设置时需要将cbo同时修改。

Fetch抓取

  1. select * 语句
  2. where语句
  3. limit语句
    上述三个语句不需要进行MR计算操作,hive.fetch.task.conversion = more; 有三个等级,分别是, minimal, more
级别 作用
none 不转换成IO、计数器、过滤操作
minimal select * 转换,其他不转换
more 尽量转换成不走MR的操作

本地化模式

  1. 本地模式数据不能超过128M,超过时会自动转换为128M
  2. 本地输入的数据总量不能超过4个

并行执行

hive.exec.parallel, 默认是关闭的,有利于提高集群的稳定性,如果需要提高效率,可以开启该参数
exec.parallel.thread.number=8, 设置最大并行度,如果集群比较小,反而会降低效率。

严格模式

  1. hive.strict.checks.no.partition.filter=false; 默认是关闭的,查询数据时必须给分区参数,否则报错。日常是开启的,年终是关闭的。年终时需要进行文件的合并操作。

  2. order by 后面必须添加limit n关键字,提高map阶段的shuffle速度,只需要排序前面n条数据。

  3. 笛卡尔乘积默认会开启,连接时必须加连接条件。