hive 中的MR hive on mr

admin2024-05-30  16

  1. Fetch抓取
    hive中的某些查询不必使用MR,例如select * from,在这种情况下,hive可以简单的读取表的存储目录下的文件,然后输出查询结果到控制台。
    hive.fetch.task.conversion设置成mre,如下查询方式都不会执行MR程序
    hive (default)> set hive.fetch.task.conversion=more;
    hive (default)> select * from emp;
    hive (default)> select ename from emp;
    hive (default)> select ename from emp limit 3;
  2. 本地模式
    当hive的输入数据量非常小时,可以通过本地模式在单台机器上处理,执行时间明显被缩短。
    set hive.exec.mode.local.auto=true; //开启本地 mr
    set hive.exec.mode.local.auto.inputbytes.max=50000000;//设置 local mr 的最大输入数据量,当输入数据量小于这个值时采用 local mr 的方式,默认为 134217728,即 128M
    set hive.exec.mode.local.auto.input.files.max=10;//设置 local mr 的最大输入文件个数,当输入文件个数小于这个值时采用 local mr 的方式,默认为 4。
  3. 表的优化
    3.1 小表、大表join
    将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的机率;再进一步,可以使用group让小的维度表先进内存,在map端完成reduce。
    新版的hive已经对小表join大表进行了优化,左右已经没有明显区别。
    3.2 大表 join 大表
    1)空key过滤
    有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reduce上,从而导致内存不够,此时可以在SQL与剧中进行过滤。例如对应字段为空
    (1)测试不过滤空 id
    insert overwrite table jointable select n.* from nullidtable n left join ori o on n.id = o.id;
    Time taken: 42.038 seconds
    (2)测试过滤空id
    insert overwrite table jointable select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id =o.id;
    Time taken: 31.725 seconds
    2)空key转换
    有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join结果中,此时我们可以表a中key为空的字段赋一个随机值,使得数据随机均匀分布到不同的reduce上,例如:
    设置5个reduce个数 set mapreduce.job.reduces = 5;
    不随机分布空key值
    insert overwrite table jointable select n.* from nullidtable n left join ori b on n.id = b.id; 有可能出现数据倾斜,某些reduce的资源小号远大于其他的reducer
    随机分布空key值
    insert overwrite table jointable select n.* from nullidtable n full join ori o on case when n.id is null then concat('hive', rand()) else n.id end = o.id; 3.3 MapJoin
    如果不指定MapJoin或者不符合MapJoin的条件,那么hive解析器就会将join操作转换成common join,即在reduce阶段完成join,容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reduce处理
    1)开启MapJoin参数设置
    (1)设置自动选择 Mapjoinset hive.auto.convert.join = true; 默认为 true(2)大表小表的阀值设置(默认 25M 一下认为是小表):set hive.mapjoin.smalltable.filesize=25000000; 3.4 Group By
    默认情况下,map阶段同一key数据分发到一个reduce,当一个key数据过大时就产生过数据倾斜。并不是所有聚合操作都需要在reduce端完成,很多聚合操作都可以现在map端进行部分聚合,最后在reduce端得出最终结果。
    开启map端聚合参数设置
(1)是否在 Map 端进行聚合,默认为 True
hive.map.aggr = true
(2)在 Map 端进行聚合操作的条目数目
hive.groupby.mapaggr.checkinterval = 100000
(3)有数据倾斜的时候进行负载均衡(默认是 false)
hive.groupby.skewindata = true

当选项设定为true时,生成的查询计划会有两个MR Job,第一个job中,map的输出结果会随机分不到reduce中,每个reduce做部分聚合操作,并输出结果,这样处理的结果是相同的,group by key有可能被分到不同的reduce中,从而达到负载均衡的目的;第二个job再根据预处理的数据结果按照groupbykey分布到reduce中,这个过程可以保证相同的group by key 被分布到同一个reduce中,最后完成最终的聚合操作。
3.4 Count(Distinct)去重统计
当数据量大时,由于count distinct操作需要用一个reduce task来完成,这个reduce需要处理的数据量太大,就会导致整个job很难完成,一般count distinct使用先group by key再count的方式替换
实例操作

(1)创建一张大表
hive (default)> create table bigtable(id bigint, time bigint, uid string, keyword string,url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
(2)加载数据
load data local inpath '/root/bigtable' into table bigtable;
(3)设置 5 个 reduce 个数
set mapreduce.job.reduces = 5;
(4)执行去重id查询
select count(distinct id) from bigtable;
(5)采用 group by 去重 id
select count(id) from (select id from bigtable group by id) a;

3.5 笛卡儿积
尽量避免使用笛卡儿积,join的时候不加on条件,或者无效on条件,hive只能使用一个reduce来完成笛卡儿积。
3.6 行列过滤
列处理:在select中,只拿需要的列,如果有,尽量使用分区过滤。
行处理:当使用外关连时,如果将副表的过滤条件卸载where后面,那么就会先全表关联,之后在过滤,比如:

(1)测试先关联两张表,再用 where 条件过滤
hive (default)> select o.id from bigtable b
join ori o on o.id = b.id
where o.id <= 10;
Time taken: 34.406 seconds, Fetched: 100 row(s)
Time taken: 26.043 seconds, Fetched: 100 row(s)
(2)通过子查询后,再关联表
hive (default)> select b.id from bigtable b
join (select id from ori where id <= 10 ) o on b.id = o.id;
Time taken: 30.058 seconds, Fetched: 100 row(s)
Time taken: 29.106 seconds, Fetched: 100 row(s)

3.7动态分区调整

//开启动态分区
set hive.exec.dynamic.partition = true;
//默认是strict表示必须指定至少一个分区为静态分区,nonstrict表示允许所有的分区字段都可以使用动态分区
set hive.exec.dynamic.partition.mode = nonstrict;
//在所有的MR的节点上,最大一共可以创建多少个动态分区
set hive.exec.max.dynamic.partitions = 1000;
//每个MR的节点上,最大可以创建多少个动态分区,该参数需要根据实际的数据来设定,如day字段有365个值,那么该参数需要设置成大于365,否则报错
set hive.exec.max.dynamic.partitions.pernode = 100;
//整个MR Job中,最大可以创建多少个HDFS文件
set hive.exec.max.created.files = 100000;
//当有空分区生成时,是否抛出异常,一般不需要配置
set hive.error.on.empty.partition = false;

3.8 分区/分桶,见上一篇

  1. 数据倾斜
    4.1 合理设置map数
    1)通常情况下,job会通过input的目录产生一个或者多个map任务
    主要决定因素有:input总个数,input文件大小,集群设置的文件块大小
    2)是不是map数越多越好?
    否,如果一个任务有很多小文件,远远小于块大小128M,则每个小文件也会被当作一个块,用一个map来完成,而一个map任务启动和初始化的时间远远大于逻辑处理时间,就会造成很大的资源浪费,而且,可以同时执行的map数是受限制的。
    3)是不是保证每个map处理接近128m的文件快,就高枕无忧了?
    不一定,比如一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却又几千万的记录,如果map处理的逻辑比较复杂,用一个任务去做,肯定也比较耗时。
    4.2 小文件合并
    在map执行前合并小文件,减少map数:CombineHiveInputFormat具有对小文件进行合并的功能,set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    4.3 复杂文件增加map数
    当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加map数,使得每个map处理的数据量减少,从而提高任务的执行效率。
    增加map的方法为:set mapreduce.input.fileinputformat.split.maxsize=100; 调正maxSize最大值,让maxSize最大值低于blocksize就可以增加map的个数。
    4.4 合理设置reduce数
    1)调整reduce个数方法 set mapreduce.job.reduces = 15;
    2)reduce个数是不是越多越好?
    否,过多的启动和初始化reduce也会消耗时间和资源,另外有多少个reduce就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则会出现小文件过多的问题。
    4.5 并行执行
    hive将一个查询转化成一个或者多个阶段,这样的阶段可以是MR阶段,抽样阶段,合并阶段,limit阶段,或者hive执行过程中需要的其他阶段。默认情况下hive一次指挥执行一个阶段,而这些阶段并非完全互相依赖的,也就是有些阶段可以并行执行的,这样可以使得整个job的执行时间缩短。
    开启并行执行的配置
    set hive.exec.parallel=true; //打开任务并行执行 set hive.exec.parallel.thread.number=16; //同一个 sql 允许最大并行度,默认为 8。 4.6 严格模式
    hive提供了一个严格模式,可以防止用户执行那些可能意想不到的不好的影响的查询,通过设置属性 hive.mapred.mode 值为strict,默认是非严格模式 nonstrict。开启严格模式可以禁止3中类型查询
    1)对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行,也就是不允许扫描所有分区
    2)对于使用order by语句的查询,要求必须使用limit语句,因为order by为了执行排序过程会将所有的结果数据分发到一个reducer中处理,强制要求用户增加limit可以防止reducer额外执行很长时间。
    3)限制笛卡儿积查询
    4.7 JVM重用
    这个是hadoop调优参数的内容,对hive的性能影响非常大,特别是对很难避免小文件的场景或者task特别多的场景,这类场景大多数执行时间都很短。hadoop的默认配置通常是使用派生JVM来执行map和reduce任务的,这是JVM的启动过程会造成相当大的开销,尤其是执行的job包含大量task任务的情况。JVM重用可以使得JVM实例在同一个job中重复使用n次,n的值可以在hadoop中配置,通常在10-20之间,具体多少,需要根据具体业务场景测试得出。
    这个功能的缺点是,开启JVM重用将会一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个不平衡job中有某几个reduce task执行的时间要比其他reduce小号的时间多很多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。
mapred-site.xml
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>10</value>
</property>


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明原文出处。如若内容造成侵权/违法违规/事实不符,请联系SD编程学习网:675289112@qq.com进行投诉反馈,一经查实,立即删除!