wap网站建设设计,农业网站建设费用预算,洛阳专业网站设计开发制作建站公司,dede网站地图 调用文章一、前言
1.1 概念
Hive 依赖于 HDFS 存储数据#xff0c;Hive 将 HQL 转换成 MapReduce 执行#xff0c;所以说 Hive 是基于Hadoop 的一个数据仓库工具#xff0c;实质就是一款基于 HDFS 的 MapReduce 计算框架#xff0c;对存储在HDFS 中的数据进行分析和管理。
1.2 架…一、前言
1.1 概念
Hive 依赖于 HDFS 存储数据Hive 将 HQL 转换成 MapReduce 执行所以说 Hive 是基于Hadoop 的一个数据仓库工具实质就是一款基于 HDFS 的 MapReduce 计算框架对存储在HDFS 中的数据进行分析和管理。
1.2 架构 1用户接口Client CLIhive shell、JDBC/ODBC(java访问hive)、WEBUI浏览器访问hive 2元数据Metastore 元数据包括表名、表所属的数据库默认是default、表的拥有者、列/分区字段、表的类型是否是外部表、表的数据所在目录等。 3Hadoop 使用 HDFS 进行存储使用 MapReduce 进行计算。 4驱动器Driver 1解析器SQL Parser将SQL字符串转换成抽象语法树AST这一步一般都用第三方工具库完成比如antlr对AST进行语法分析比如表是否存在、字段是否存在、SQL语义是否有误。 2编译器Physical Plan将AST编译生成逻辑执行计划。 3优化器Query Optimizer对逻辑执行计划进行优化。 4执行器Execution把逻辑执行计划转换成可以运行的物理计划。对于Hive来说就是MR/Spark。
1.3 调优概述
Hive 作为大数据领域常用的数据仓库组件在平时设计和查询时要特别注意效率。影响 Hive 效率的几乎从不是数据量过大而是 数据倾斜、数据冗余、Job或I/O过多、MapReduce 分配不合理 等等。 对 Hive 的调优既包含 Hive 的建表设计方面对 HiveHQL 语句本身的优化也包含 Hive 配置参数 和 底层引擎 MapReduce 方面的调整。 所以此次调优主要分为以下四个方面展开 1、Hive 的建表设计层面 2、HQL 语法和运行参数层面 3、Hive 架构层面 4、调优案例
1.4 调优须知
1.对于大数据计算引擎来说数据量大不是问题数据倾斜是个问题。 2.Hive 的复杂 HQL 底层会转换成多个 MapReduce Job 并行或者串行执行Job 数比较多的作业运行效率相对比较低比如即使只有几百行数据的表如果多次关联多次汇总产生十几个Job耗时很长。原因是 MapReduce 作业初始化的时间是比较长的。 3.在进行 Hive 大数据分析时常见的聚合操作比如 sumcountmaxminUDAF等 不怕数据倾斜问题MapReduce 在 Mappe 阶段的预聚合操作使数据倾斜不成问题。 4.好的建表设计模型设计事半功倍。 5.设置合理的 MapReduce 的 Task 并行度能有效提升性能。(比如10w数据量级别的计算用 100 个 ReduceTask那是相当的浪费1个足够但是如果是亿级别的数据量那么1个Task又显得捉襟见肘) 6.了解数据分布自己动手解决数据倾斜问题是个不错的选择。这是通用的算法优化但算法优化有时不能适应特定业务背景开发人员了解业务了解数据可以通过业务逻辑精确有效的解决数据倾斜问题。 7.数据量较大的情况下慎用 count(distinct)group by 容易产生倾斜问题。 8.对小文件进行合并是行之有效的提高调度效率的方法假如所有的作业设置合理的文件数对任务的整体调度效率也会产生积极的正向影响。 9.优化时把握整体单个作业最优不如整体最优。
二、Hive 建表设计层面优化
2.1 利用分区表优化
关于hive的表的类型有哪些 1、分区表 2、分桶表
分区表是在某一个或者几个维度上对数据进行分类存储一个分区对应一个目录。如果筛选条件里有分区字段那么 Hive 只需要遍历对应分区目录下的文件即可不需要遍历全局数据使得处理的数据量大大减少从而提高查询效率。 也就是说当一个 Hive 表的查询大多数情况下会根据某一个字段进行筛选时那么非常适合创建为分区表该字段即为分区字段。
select1: select .... where country china
select2: select .... where country china
select3: select .... where country china
select4: select .... where country china分门别类这个 city 字段的每个值就单独形成为一个分区。其实每个分区就对应带 HDFS 的一个目录 在创建表时通过启用 partitioned by 实现用来 partition 的维度并不是实际数据的某一列具体分区的标志是由插入内容时给定的。当要查询某一分区的内容时可以采用 where 语句形似 where tablename.partition_column a 来实现。
1.创建含分区的表
CREATE TABLE page_view(viewTime INT, userid BIGINT,page_url STRING, referrer_url STRING,ip STRING )
PARTITIONED BY(date STRING,country STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY 1
STORED AS TEXTFILE;2.载入内容并指定分区标志
load data local inpath /home/bigdata/pv_2018-07-08_us.txt into table page_view
partition(date2018-07-08, countryUS);3.查询指定标志的分区内容
SELECT page_views.* FROM page_viewsWHERE page_views.date 2008-03-01AND page_views.date 2008-03-31AND page_views.referrer_url like %xyz.com;简单总结 1、当你意识到一个字段经常用来做where建分区表使用这个字段当做分区字段 2、在查询的时候使用分区字段来过滤就可以避免全表扫描。只需要扫描这张表的一个分区的数据即可
2.2 利用分桶表优化
跟分区的概念很相似都是把数据分成多个不同的类别区别就是规则不一样 1、分区 按照字段值来进行一个分区就只是包含这个这一个值的所有记录不是当前分区的数据一定不在当前分区当前分区也只会包含当前这个分区值的数据 2、分桶默认规则Hash散列一个分桶中会有多个不同的值如果一个分桶中包含了某个值这个值的所有记录必然都在这个分桶
Hive Bucket分桶是指将数据以指定列的值为 key 进行 hashhash 到指定数目的桶中这样做的目的和分区表类似使得筛选时不用全局遍历所有的数据只需要遍历所在桶就可以了。这样也可以支持高效采样。
1、采样 2、join
如下例就是以 userid 这一列为 bucket 的依据共设置 32 个 buckets
CREATE TABLE page_view(viewTime INT,userid BIGINT,page_url STRING, referrer_url STRING,ip STRING )
COMMENT This is the page view table
PARTITIONED BY(dt STRING, country STRING)
CLUSTERED BY(userid)
SORTED BY(viewTime)
INTO 32 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY 1
COLLECTION ITEMS TERMINATED BY 2
MAP KEYS TERMINATED BY 3
STORED AS SEQUENCEFILE;分桶的语法
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETSCLUSTERED BY(userid) 表示按照 userid 来分桶 SORTED BY(viewTime) 按照 viewtime 来进行桶内排序 INTO 32 BUCKETS 分成多少个桶
两个表以相同方式相同字段划分桶两个表的桶个数是倍数关系
create table order(cid int,price float) clustered by(cid) into 32 buckets;
create table customer(id int,first string) clustered by(id) into 32 buckets;
select price from order t join customer s on t.cid s.id通常情况下Sampling 在全体数据上进行采样这样效率自然就低它要去访问所有数据。而如果一个表已经对某一列制作了 bucket就可以采样所有桶中指定序号的某个桶这就减少了访问量。 如下例所示就是采样了 page_view 中 32 个桶中的第三个桶的全部数据
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);如下例所示就是采样了 page_view 中 32 个桶中的第三个桶的一半数据
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);总结三种采样方式
# 分桶抽样
select * from student tablesample(bucket 3 out of 32);
# 随机采样rand() 函数
select * from student order by rand() limit 100; // 效率低
select * from student distribute by rand() sort by rand() limit 100; // 推荐使用这种
# 数据块抽样tablesample()函数
select * from student tablesample(10 percent); # 百分比
select * from student tablesample(5 rows); # 行数
select * from student tablesample(5 M); # 大小2.3 选择合适的文件存储格式
在 HiveSQL 的 create table 语句中可以使用 stored as … 指定表的存储格式。Apache Hive 支持 Apache Hadoop 中使用的几种熟悉的文件格式比如 TextFile、SequenceFile、RCFile、Avro、ORC、ParquetFile 等。 存储格式一般需要根据业务进行选择在我们的实操中绝大多数表都采用 TextFile 与Parquet 两种存储格式之一。 TextFile 是最简单的存储格式它是纯文本记录也是 Hive 的默认格式。虽然它的磁盘开销比较大查询效率也低但它更多地是作为跳板来使用。 RCFile、ORC、Parquet 等格式的表都不能由文件直接导入数据必须由 TextFile 来做中转。 Parquet 和 ORC 都是 Apache 旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量 OLAP 查询并且也支持更好的压缩和编码。 创建表时特别是宽表尽量使用 ORC、ParquetFile 这些列式存储格式因为列式存储的表每一列的数据在物理上是存储在一起的Hive 查询时会只遍历需要列数据大大减少处理的数据量。
第一种 [*] TextFile
1、存储方式行存储。默认格式如果建表时不指定默认为此格式。 2、每一行都是一条记录每行都以换行符\n结尾。数据不做压缩时磁盘会开销比较大数据解析开销也比较大。 3、可结合Gzip、Bzip2等压缩方式一起使用系统会自动检查查询时会自动解压,推荐选用可切分的压缩算法(bzip2)。
第二种Sequence File
1、一种Hadoop API提供的二进制文件使用方便、可分割、个压缩的特点。 2、支持三种压缩选择NONE、RECORD、BLOCK。RECORD压缩率低一般建议使用BLOCK压缩。
第三种RC File 1、存储方式数据按行分块每块按照列存储 。 A、首先将数据按行分块保证同一个record在一个块上避免读一个记录需要读取多个block。 B、其次块数据列式存储有利于数据压缩和快速的列存取。 2、相对来说RCFile对于提升任务执行性能提升不大但是能节省一些存储空间。可以使用升级版的ORC格式。
第四种ORC File 1、存储方式数据按行分块每块按照列存储 2、Hive提供的新格式属于RCFile的升级版性能有大幅度提升而且数据可以压缩存储压缩快快速 列存取。 3、ORC File会基于列创建索引当查询的时候会很快
第五种Parquet File 1、存储方式列式存储。 2、Parquet对于大型查询的类型是高效的。对于扫描特定表格中的特定列查询Parquet特别有用。 Parquet一般使用Snappy、Gzip压缩。默认Snappy。 3、Parquet支持Impala 查询引擎。 4、表的文件存储格式尽量采用Parquet或ORC不仅降低存储量还优化了查询压缩表关联等性能。
2.4 选择合适的压缩格式
Hive 语句最终是转化为 MapReduce 程序来执行的而 MapReduce 的性能瓶颈在与 网络 IO 和 磁盘 IO要解决性能瓶颈最主要的是 减少数据量对数据进行压缩是个好方式。压缩虽然是减少了数据量但是压缩过程要消耗 CPU但是在 Hadoop 中往往性能瓶颈不在于 CPUCPU 压力并不大所以压缩充分利用了比较空闲的 CPU。
常用压缩方法对比
如何选择压缩方式 1、压缩比率 2、压缩解压速度 3、是否支持split
支持分割的文件可以并行的有多个 mapper 程序处理大数据文件大多数文件不支持可分割是因为这些文件只能从头开始读。
是否压缩 1、计算密集型不压缩否则进一步增加了CPU的负担 2、网络密集型推荐压缩减小网络数据传输
各个压缩方式对应的class类
压缩使用
## 默认值是false
set mapreduce.output.fileoutputformat.compresstrue;
## 默认值是Record
set mapreduce.output.fileoutputformat.compress.typeBLOCK
## 默认值是org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.output.fileoutputformat.compress.codecorg.apache.hadoop.io.compress.GzipCodecMap 输出结果也以 Gzip 进行压缩
## 启用map端输出压缩
set mapred.map.output.compresstrue
## 默认值是org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.map.output.compress.codecorg.apache.hadoop.io.compress.GzipCodec对 Hive 输出结果和中间都进行压缩
## 默认值是false不压缩
set hive.exec.compress.outputtrue
## 默认值是false为true时MR设置的压缩才启用
set hive.exec.compress.intermediatetrue三、HQL语法和运行参数层面优化
3.1 查看Hive执行计划
Hive 的 SQL 语句在执行之前需要将 SQL 语句转换成 MapReduce 任务因此需要了解具体的转换过程可以在 SQL 语句中输入如下命令查看具体的执行计划。 查看执行计划添加extended关键字可以查看更加详细的执行计划 explain [extended] query
3.2 列裁剪
列裁剪就是在查询时只读取需要的列分区裁剪就是只读取需要的分区。当列很多或者数据量很大时如果 select * 或者不指定分区全列扫描和全表扫描效率都很低。Hive 在读数据的时候可以只读取查询中所需要用到的列而忽略其他的列。这样做可以节省读取开销中间表存储开销和数据整合开销。
set hive.optimize.cp true; ## 列裁剪取数只取查询中需要用到的列默认是true3.3 谓词下推
将 SQL 语句中的 where 谓词逻辑都尽可能提前执行减少下游处理的数据量。对应逻辑优化器是PredicatePushDown
set hive.optimize.ppdtrue; ## 默认是true示例程序
select a.*, b.* from a join b on a.id b.id where b.age 20;
select a.*, c.* from a join (select * from b where age 20) c on a.id c.id;3.4 分区裁剪
列裁剪就是在查询时只读取需要的列分区裁剪就是只读取需要的分区。当列很多或者数据量很大时 如果 select * 或者不指定分区全列扫描和全表扫描效率都很低。 在查询的过程中只选择需要的分区可以减少读入的分区数目减少读入的数据量 Hive 中与分区裁剪优化相关的则是
set hive.optimize.prunertrue; ## 默认是true在 HiveQL 解析阶段对应的则是 ColumnPruner 逻辑优化器。
select * from student where department AAAA;3.5 合并小文件
如果一个mapreduce job碰到一对小文件作为输入一个小文件启动一个Task
Map 输入合并
在执行 MapReduce 程序的时候一般情况是一个文件的一个数据分块需要一个 mapTask 来处理。但是如果数据源是大量的小文件这样就会启动大量的 mapTask 任务这样会浪费大量资源。可以将输入的小文件进行合并从而减少 mapTask 任务数量。
## Map端输入、合并文件之后按照block的大小分割默认
set hive.input.formatorg.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
## Map端输入不合并
set hive.input.formatorg.apache.hadoop.hive.ql.io.HiveInputFormat;Map/Reduce输出合并
大量的小文件会给 HDFS 带来压力影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来消除影响。
## 是否合并Map输出文件, 默认值为true
set hive.merge.mapfilestrue;
## 是否合并Reduce端输出文件,默认值为false
set hive.merge.mapredfilestrue;
## 合并文件的大小,默认值为256000000
set hive.merge.size.per.task256000000;
## 每个Map 最大分割大小
set mapred.max.split.size256000000;
## 一个节点上split的最少值
set mapred.min.split.size.per.node1; // 服务器节点
## 一个机架上split的最少值
set mapred.min.split.size.per.rack1; // 服务器机架hive.merge.size.per.task 和 mapred.min.split.size.per.node 联合起来 1、默认情况先把这个节点上的所有数据进行合并如果合并的那个文件的大小超过了256M就开启另外一个文件继续合并 2、如果当前这个节点上的数据不足256M那么就都合并成一个逻辑切片。 现在有 100 个Task总共有 10000M 的数据 平均一下每个 Task 执行 100M 的数据的计算。假设只启动 10 个Task每个Task就要执行1000M 的数据。 如果只有2个Task5000M。
3.6 合理设置 Map Task 并行度
• 第一MapReduce 中的 MapTask 的并行度机制
Map 数过大当输入文件特别大MapTask 特别多每个计算节点分配执行的 MapTask 都很多这时候可以考虑减少 MapTask 的数量。增大每个 MapTask 处理的数据量。而且 MapTask 过多最终生成的结果文件数也太多。 1、Map阶段输出文件太小产生大量小文件 2、初始化和创建Map的开销很大 Map 数太小当输入文件都很大任务逻辑复杂MapTask 执行非常慢的时候可以考虑增加 MapTask 数来使得每个 MapTask 处理的数据量减少从而提高任务的执行效率。 1、文件处理或查询并发度小Job执行时间过长 2、大量作业时容易堵塞集群 一个 MapReduce Job 的 MapTask 数量是由输入分片 InputSplit 决定的。 而输入分片是由 FileInputFormat.getSplit() 决定的。一个输入分片对应一个 MapTask而输入分片是由三个参数决定。 输入分片大小的计算是这么计算出来的 long splitSize Math.max(minSize, Math.min(maxSize, blockSize))
默认情况下输入分片大小和 HDFS 集群默认数据块大小一致也就是默认一个数据块启用一个 MapTask 进行处理这样做的好处是避免了服务器节点之间的数据传输提高 job 处理效率。
两种经典的控制 MapTask 的个数方案减少 MapTask 数或者增加 MapTask 数。 1、减少 MapTask 数是通过合并小文件来实现这一点主要是针对数据源 2、增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数 重点注意不推荐把这个值进行随意设置
推荐的方式使用默认的切块大小即可如果非要调整最好是切块的 N 倍数。
NodeManager节点个数N 》 Task ( N * 0.95) * M
第二合理控制 MapTask 数量 1、减少 MapTask 数可以通过合并小文件来实现 2、增加 MapTask 数可以通过控制上一个 ReduceTask 默认的 MapTask 个数
计算方式 输入文件总大小total_size HDFS 设置的数据块大小dfs_block_size default_mapper_num total_size / dfs_block_size MapReduce 中提供了如下参数来控制 map 任务个数从字面上看貌似是可以直接设置 MapTask 个数的样子但是很遗憾不行这个参数设置只有在大于 default_mapper_num 的时候才会生效。 set mapred.map.tasks10; ## 默认值是 2 那如果我们需要减少 MapTask 数量但是文件大小是固定的那该怎么办呢? 可以通过 mapred.min.split.size 设置每个任务处理的文件的大小这个大小只有在大于dfs_block_size 的时候才会生效。
split_size max(mapred.min.split.size, dfs_block_size)
split_num total_size / split_size
compute_map_num Math.min(split_num, Math.max(default_mapper_num,mapred.map.tasks))这样就可以减少 MapTask 数量了。 总结一下控制 mapper 个数的方法。 1、如果想增加 MapTask 个数可以设置 mapred.map.tasks 为一个较大的值 2、如果想减少 MapTask 个数可以设置 maperd.min.split.size 为一个较大的值 3、如果输入是大量小文件想减少 mapper 个数可以通过设置 hive.input.format 合并小文件 如果想要调整 mapper 个数在调整之前需要确定处理的文件大概大小以及文件的存在形式是大量小文件还是单个大文件然后再设置合适的参数。 不能盲目进行暴力设置不然适得其反。MapTask 数量与输入文件的 split 数息息相关在 Hadoop 源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中可以看到 split 划分的具体逻辑。 可以直接通过参数 mapred.map.tasks 默认值2来设定 MapTask 数的期望值但它不一定会生效。
3.7 合理设置 Reduce Task 并行度
如果 ReduceTask 数量过多一个 ReduceTask 会产生一个结果文件这样就会生成很多小文件那么如果这些结果文件会作为下一个 Job 的输入则会出现小文件需要进行合并的问题而且启动和初始化ReduceTask 需要耗费资源。 如果 ReduceTask 数量过少这样一个 ReduceTask 就需要处理大量的数据并且还有可能会出现数据倾斜的问题使得整个查询耗时长。 默认情况下Hive 分配的 reducer 个数由下列参数决定 Hadoop MapReduce 程序中ReducerTask 个数的设定极大影响执行效率ReducerTask 数量与输出文件的数量相关。 如果 ReducerTask 数太多会产生大量小文件对HDFS造成压力。 如果 ReducerTask 数太少每个 ReducerTask 要处理很多数据容易拖慢运行时间或者造成 OOM。 这使得Hive 怎样决定 ReducerTask 个数成为一个关键问题。 遗憾的是 Hive 的估计机制很弱不指定 ReducerTask 个数的情况下Hive 会猜测确定一个ReducerTask 个数
基于以下两个设定 参数1hive.exec.reducers.bytes.per.reducer (默认256M) 参数2hive.exec.reducers.max (默认为1009) 参数3mapreduce.job.reduces (默认值为-1表示没有设置那么就按照以上两个参数进行设置) ReduceTask 的计算公式为 N Math.min(参数2总输入数据大小 / 参数1) 可以通过改变上述两个参数的值来控制 ReduceTask 的数量也可以通过。 set mapred.map.tasks10; set mapreduce.job.reduces10; 通常情况下有必要手动指定 ReduceTask 个数。考虑到 Mapper 阶段的输出数据量通常会比输入有大幅减少因此即使不设定 ReduceTask 个数重设参数2 还是必要的。 依据经验可以将 参数2 设定为 M * (0.95 * N (N为集群中 NodeManager 个数)。一般来说NodeManage 和 DataNode 的个数是一样的。