hadoop 排序优化

打卤 2014-04-08

转:http://blog.csdn.net/wf1982/article/details/7369324

 

hive 全排序优化

全排序

Hive的排序关键字是SORT BY,它有意区别于传统数据库的ORDER BY也是为了强调两者的区别–SORT BY只能在单机范围内排序。考虑以下表定义:

CREATE TABLE if not exists t_order( id int, -- 订单编号 sale_id int, -- 销售ID customer_id int, --客户ID product _id int, -- 产品ID amount int -- 数量 ) PARTITIONED BY (ds STRING);

在表中查询所有销售记录,并按照销售ID和数量排序:

set mapred.reduce.tasks=2; Select sale_id, amount from t_order Sort by sale_id, amount;

这一查询可能得到非期望的排序。指定的2个reducer分发到的数据可能是(各自排序):

Reducer1:

Sale_id | amount 0 | 100 1 | 30 1 | 50 2 | 20

Reducer2:

Sale_id | amount 0 | 110 0 | 120 3 | 50 4 | 20

因为上述查询没有reduce key,hive会生成随机数作为reduce key。这样的话输入记录也随机地被分发到不同reducer机器上去了。为了保证reducer之间没有重复的sale_id记录,可以使用DISTRIBUTE BY关键字指定分发key为sale_id。改造后的HQL如下:

set mapred.reduce.tasks=2; Select sale_id, amount from t_order Distribute by sale_id Sort by sale_id, amount;

这样能够保证查询的销售记录集合中,销售ID对应的数量是正确排序的,但是销售ID不能正确排序,原因是hive使用hadoop默认的HashPartitioner分发数据。

这就涉及到一个全排序的问题。解决的办法无外乎两种:

1.) 不分发数据,使用单个reducer:

set mapred.reduce.tasks=1;

这一方法的缺陷在于reduce端成为了性能瓶颈,而且在数据量大的情况下一般都无法得到结果。但是实践中这仍然是最常用的方法,原因是通常排序的查询是为了得到排名靠前的若干结果,因此可以用limit子句大大减少数据量。使用limit n后,传输到reduce端(单机)的数据记录数就减少到n* (map个数)。

2.) 修改Partitioner,这种方法可以做到全排序。这里可以使用Hadoop自带的TotalOrderPartitioner(来自于Yahoo!的TeraSort项目),这是一个为了支持跨reducer分发有序数据开发的Partitioner,它需要一个SequenceFile格式的文件指定分发的数据区间。如果我们已经生成了这一文件(存储在/tmp/range_key_list,分成100个reducer),可以将上述查询改写为

set mapred.reduce.tasks=100; set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; set total.order.partitioner.path=/tmp/ range_key_list; Select sale_id, amount from t_order Cluster by sale_id Sort by amount;

有很多种方法生成这一区间文件(例如hadoop自带的o.a.h.mapreduce.lib.partition.InputSampler工具)。这里介绍用Hive生成的方法,例如有一个按id有序的t_sale表:

CREATE TABLE if not exists t_sale ( id int, name string, loc string );

则生成按sale_id分发的区间文件的方法是:

create external table range_keys(sale_id int) row format serde'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe' stored as inputformat'org.apache.hadoop.mapred.TextInputFormat' outputformat'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat' location '/tmp/range_key_list';  insert overwrite table range_keys select distinct sale_id from source t_sale sampletable(BUCKET 100 OUT OF 100 ON rand()) s sort by sale_id;

生成的文件(/tmp/range_key_list目录下)可以让TotalOrderPartitioner按sale_id有序地分发reduce处理的数据。区间文件需要考虑的主要问题是数据分发的均衡性,这有赖于对数据深入的理解。

测试案例:

数据 140g, 按照字段time 降序排列 选出最大的前50个。

使用 一般方法 select * from table order by time desc limit 50.  执行了1小时6分钟完全算出。

任务数1个  map数  1783 reduce 1

而 select * from (select * from table distribute by time sort by time desc limit 50 ) t order by time desc limit 50;

需要5分钟算出。结果一致。

任务数2个   分别是:

map  1783 reduce 245

map 245 reduce   1

相关推荐