xclxcl 2014-12-10
0 目的:
对文件中的每一行按三列排序:先按第一列升序;第一列相同时,按第二列升序;第二列相同时,按第三列升序。示例输入:
3,3,3
3,2,4
3,2,0
2,2,1
2,1,4
1,1,0
mapreduce中:
1 代码: 核心是自定义 Hadoop map 输出的 key 类型,并在其 compareTo 方法中实现三列的比较逻辑
package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Driver for a MapReduce job that sorts comma-separated three-column records.
 *
 * <p>Each input line {@code a,b,c} is turned into a {@link NewKey3} map output key. The
 * shuffle phase orders the keys with {@link NewKey3#compareTo(NewKey3)}, i.e. ascending by the
 * first column, then the second, then the third. A single reducer writes the keys back out in
 * that order.
 */
public class MyThreeSortApp {

    // HDFS locations used by the job.
    static final String FILE_ROOT = "hdfs://master:9000/";
    static final String INPUT_PATH = "hdfs://master:9000/hello";
    static final String OUT_PATH = "hdfs://master:9000/out";

    /**
     * Configures and runs the sort job, deleting any previous output directory first.
     *
     * <p>Returns normally when the job succeeds; terminates the JVM with exit status 1 when the
     * job reports failure.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the file system cannot be reached or the job cannot be submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // The output directory must not exist when the job starts, so remove a stale one.
        FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT), conf);
        Path outpath = new Path(OUT_PATH);
        if (fileSystem.exists(outpath)) {
            fileSystem.delete(outpath, true);
        }

        // 0 Create the job.
        Job job = new Job(conf);
        // Tell Hadoop which jar holds the mapper/reducer/key classes so that the task nodes
        // can load them when the job is submitted to a cluster.
        job.setJarByClass(MyThreeSortApp.class);

        // 1.1 Input location and format: every line of the input becomes one <k,v> pair and
        //     the map function is invoked once per pair.
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 Custom mapper and its output types.
        job.setMapperClass(MyMapper3.class);
        job.setMapOutputKeyClass(NewKey3.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 1.3 Partitioning: a single reducer gives one globally sorted output file.
        job.setNumReduceTasks(1);

        // 1.4 TODO grouping: currently the default behaviour is used.
        // 1.5 TODO combiner.

        // 2.2 Custom reducer and the final output types.
        job.setReducerClass(MyReducer3.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 2.3 Output location and format.
        FileOutputFormat.setOutputPath(job, outpath);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Run the job and surface a failure through the process exit status.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}

/**
 * Parses each comma-separated input line into a {@link NewKey3} and emits it as the map output
 * key with a {@link NullWritable} value.
 */
class MyMapper3 extends Mapper<LongWritable, Text, NewKey3, NullWritable> {

    /**
     * Converts one input line into a sort key.
     *
     * <p>Blank lines are skipped. Whitespace around each column is ignored. Columns after the
     * third are ignored.
     *
     * @param k1 key supplied by the input format (unused)
     * @param v1 the text of the line
     * @param context task context used to emit the key
     * @throws IOException if the line has fewer than three columns or the write fails
     * @throws NumberFormatException if one of the first three columns is not a valid long
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String lineStr = v1.toString();
        System.out.println("map the line: " + lineStr);

        // A blank line carries no record; skipping it avoids failing the whole job.
        if (lineStr.trim().isEmpty()) {
            return;
        }

        String[] split = lineStr.split(",");
        if (split.length < 3) {
            throw new IOException("Expected at least 3 comma-separated columns but found "
                    + split.length + " in line: " + lineStr);
        }

        NewKey3 newKey3 = new NewKey3(
                Long.parseLong(split[0].trim()),
                Long.parseLong(split[1].trim()),
                Long.parseLong(split[2].trim()));
        context.write(newKey3, NullWritable.get());
    }
}

/**
 * Writes the sorted keys as text lines.
 */
class MyReducer3 extends Reducer<NewKey3, NullWritable, Text, NullWritable> {

    /**
     * Emits the textual form of the key once for every value grouped under it, so that
     * duplicate input rows are kept in the sorted output instead of being collapsed.
     *
     * @param k2 the sort key
     * @param v2s one placeholder value per input row that produced this key
     * @param context task context used to emit the output line
     * @throws IOException if the write fails
     */
    @Override
    protected void reduce(NewKey3 k2, Iterable<NullWritable> v2s, Context context)
            throws IOException, InterruptedException {
        System.out.println("reduce the key is: " + k2.toString());
        Text line = new Text(k2.toString());
        for (NullWritable ignored : v2s) {
            context.write(line, NullWritable.get());
        }
    }
}

/**
 * Custom map output key holding the three columns of a record.
 *
 * <p>The core of the job: the ordering defined by {@link #compareTo(NewKey3)} is the order in
 * which the records reach the reducer.
 */
class NewKey3 implements WritableComparable<NewKey3> {
    private long first;
    private long second;
    private long third;

    /** No-argument constructor; the fields are filled in later by {@link #readFields}. */
    public NewKey3() {
    }

    /**
     * Creates a key from the three column values.
     *
     * @param first value of the first column
     * @param second value of the second column
     * @param third value of the third column
     */
    public NewKey3(long first, long second, long third) {
        this.first = first;
        this.second = second;
        this.third = third;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + (int) (first ^ (first >>> 32));
        result = prime * result + (int) (second ^ (second >>> 32));
        result = prime * result + (int) (third ^ (third >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        NewKey3 other = (NewKey3) obj;
        return first == other.first && second == other.second && third == other.third;
    }

    /** Returns the three columns separated by single spaces. */
    @Override
    public String toString() {
        return first + " " + second + " " + third;
    }

    /** Serializes the three columns as consecutive longs. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.first);
        out.writeLong(this.second);
        out.writeLong(this.third);
    }

    /** Restores the three columns in the order used by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
        this.third = in.readLong();
    }

    /**
     * Orders keys ascending by first column, then second, then third.
     *
     * <p>Each column is compared with {@link Long#compare(long, long)} rather than by
     * subtracting and narrowing to {@code int}: the subtraction can overflow and the narrowing
     * cast can turn a non-zero difference into zero or flip its sign, which would mis-sort
     * keys or make distinct keys compare as equal.
     *
     * @param other the key to compare with
     * @return a negative value, zero, or a positive value as this key is less than, equal to,
     *         or greater than {@code other}
     */
    @Override
    public int compareTo(NewKey3 other) {
        int result = Long.compare(this.first, other.first);
        if (result != 0) {
            return result;
        }
        result = Long.compare(this.second, other.second);
        if (result != 0) {
            return result;
        }
        return Long.compare(this.third, other.third);
    }
}
2 运行结果:
[root@master local]# hadoop fs -text /out/part-r-00000 Warning: $HADOOP_HOME is deprecated. 1 1 0 2 1 4 2 2 1 3 2 0 3 2 4 3 3 3