csdnhadoop 2020-06-21
问题:按照条件将结果输出到不同文件中
1.自定义继承Partitioner类,重写getPartition()方法
2.在job驱动Driver中设置自定义的Partitioner
3.在Driver中根据分区数设置reducetask数
将统计结果按照手机归属地不同省份输出到不同文件中(分区),手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中
MyPartitioner.class
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class MyPartioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text text, FlowBean flowBean, int numPartitions) { String phone = text.toString(); if (phone.startsWith("136")) { return 0; } else if (phone.startsWith("137")) { return 1; } else if (phone.startsWith("138")) { return 2; }else if (phone.startsWith("139")){ return 3; }else { return 4; } } }
//设置自定义partitioner job.setPartitionerClass(MyPartioner.class); //设置reducetask数量 job.setNumReduceTasks(5);
当自定义的对象作为key,按照指定条件进行排序
实现WritableComparable接口,重写compareTo方法,就可以实现排序(二次排序)
public class OrderBean implements WritableComparable<OrderBean> { //自定义排序,先按pid升序,再按pname降序 @Override public int compareTo(OrderBean o) { int compare = this.pid.compareTo(o.pid); if (compare == 0) { return -this.pname.compareTo(o.pname); } return compare; } }
自定义比较器继承WritableComparator类,父类构造方法增加需要比较的Bean对象,
//继承WritableComparator类 public class MyGroupCompartor extends WritableComparator { public MyGroupCompartor(){ //增加Bean对象 super(OrderBean.class,true); } // 对Bean的排序方法 @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean oa = (OrderBean) a; OrderBean ob = (OrderBean) b; return oa.getPid().compareTo(ob.getPid()); } }
不分区,只有一个reducetask,针对Key进行排序
针对key全排序,然后针对key进行分区
分析:已经对key进行排序,比如key对象为OrderBean的排序是id,pname的二次排序
,在进入reduce()的分组希望是id相同的进入一组,那么就需要自定义分组针对id进行分组
OrderBean id pname amount 1 小米 1 2400 1 1500 2 华为 2 2400 2 3400
MyGroupCompartor.class
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class MyGroupCompartor extends WritableComparator { public MyGroupCompartor(){ super(OrderBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean oa = (OrderBean) a; OrderBean ob = (OrderBean) b; return oa.getPid().compareTo(ob.getPid()); } }
job.setGroupingComparatorClass(MyGroupCompartor.class);