Harper 2012-09-24
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> { public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException { String line = value.toString();//没有配置RecordReader,所以默认采用line的实现,key就是行号,value就是行内容 if (line == null || line.equals("")) return; String[] words = line.split(","); if (words == null || words.length < 8) return; String appid = words[1]; String apiName = words[2]; LongWritable recbytes = new LongWritable(Long.parseLong(words[7])); Text record = new Text(); record.set(new StringBuffer("flow::").append(appid) .append("::").append(apiName).toString()); reporter.progress(); output.collect(record, recbytes);//输出流量的统计结果,通过flow::作为前缀来标示。 record.clear(); record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString()); output.collect(record, new LongWritable(1));//输出次数的统计结果,通过count::作为前缀来标示 } }
LogAnalysiser:: PartitionerClass
public static class PartitionerClass implements Partitioner<Text, LongWritable> { public int getPartition(Text key, LongWritable value, int numPartitions) { if (numPartitions >= 2)//Reduce 个数,判断流量还是次数的统计分配到不同的Reduce if (key.toString().startsWith("flow::")) return 0; else return 1; else return 0; } public void configure(JobConf job){} }
LogAnalysiser:: CombinerClass
LogAnalysiser:: ReduceClass
public static class ReduceClass extends MapReduceBase implements Reducer<Text, LongWritable,Text, LongWritable> { public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException { Text newkey = new Text(); newkey.set(key.toString().substring(key.toString().indexOf("::")+2)); LongWritable result = new LongWritable(); long tmp = 0; int counter = 0; while(values.hasNext())//累加同一个key的统计结果 { tmp = tmp + values.next().get(); counter = counter +1;//担心处理太久,JobTracker长时间没有收到报告会认为TaskTracker已经失效,因此定时报告一下 if (counter == 1000) { counter = 0; reporter.progress(); } } result.set(tmp); output.collect(newkey, result);//输出最后的汇总结果 } }
public static void main(String[] args) { try { run(args); } catch (Exception e) { e.printStackTrace(); } } public static void run(String[] args) throws Exception { if (args == null || args.length <2) { System.out.println("need inputpath and outputpath"); return; } String inputpath = args[0]; String outputpath = args[1]; String shortin = args[0]; String shortout = args[1]; if (shortin.indexOf(File.separator) >= 0) shortin = shortin.substring(shortin.lastIndexOf(File.separator)); if (shortout.indexOf(File.separator) >= 0) shortout = shortout.substring(shortout.lastIndexOf(File.separator)); SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd"); shortout = new StringBuffer(shortout).append("-") .append(formater.format(new Date())).toString(); if (!shortin.startsWith("/")) shortin = "/" + shortin; if (!shortout.startsWith("/")) shortout = "/" + shortout; shortin = "/user/root" + shortin; shortout = "/user/root" + shortout; File inputdir = new File(inputpath); File outputdir = new File(outputpath); if (!inputdir.exists() || !inputdir.isDirectory()) { System.out.println("inputpath not exist or isn't dir!"); return; } if (!outputdir.exists()) { new File(outputpath).mkdirs(); } JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//构建Config FileSystem fileSys = FileSystem.get(conf); fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//将本地文件系统的文件拷贝到HDFS中 conf.setJobName("analysisjob"); conf.setOutputKeyClass(Text.class);//输出的key类型,在OutputFormat会检查 conf.setOutputValueClass(LongWritable.class); //输出的value类型,在OutputFormat会检查 conf.setMapperClass(MapClass.class); conf.setCombinerClass(CombinerClass.class); conf.setReducerClass(ReduceClass.class); conf.setPartitionerClass(PartitionerClass.class); conf.set("mapred.reduce.tasks", "2");//强制需要有两个Reduce来分别处理流量和次数的统计 FileInputFormat.setInputPaths(conf, shortin);//hdfs中的输入路径 FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中输出路径 Date startTime = new Date(); System.out.println("Job 
started: " + startTime); JobClient.runJob(conf); Date end_time = new Date(); System.out.println("Job ended: " + end_time); System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); //删除输入和输出的临时文件 fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath)); fileSys.delete(new Path(shortin),true); fileSys.delete(new Path(shortout),true); }
以上代码完成了全部业务逻辑。此外还需要一个驱动类,把业务 Class 注册为一个可标识的命令,这样才能通过 hadoop jar 来执行。
/**
 * Driver that registers {@code LogAnalysiser} under the command name
 * "analysislog" and hands the command line to {@code ProgramDriver}.
 */
public class ExampleDriver {

    private static final String COMMAND_NAME = "analysislog";
    private static final String COMMAND_DESCRIPTION =
            "A map/reduce program that analysis log .";

    /**
     * @param argv program name followed by that program's own arguments
     */
    public static void main(String[] argv) {
        ProgramDriver programs = new ProgramDriver();
        try {
            programs.addClass(COMMAND_NAME, LogAnalysiser.class, COMMAND_DESCRIPTION);
            programs.driver(argv);
        } catch (Throwable failure) {
            // Any failure is only printed; it is not rethrown.
            failure.printStackTrace();
        }
    }
}
hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out
/home/wenchu/test-in 中存放需要分析的日志文件。执行命令后可以看到整个执行过程,包括 Map 和 Reduce 的进度。执行完毕后,在 /home/wenchu/test-out 下可以看到输出内容,其中有两个文件:part-00000(流量统计结果)和 part-00001(次数统计结果)。如果需要查看执行的具体情况,可以查看输出目录下的 _logs/history/xxxx_analysisjob,里面罗列了所有 Map、Reduce 的创建情况以及执行情况。在运行期也可以通过浏览器查看 Map、Reduce 的情况:http://MasterIP:50030/jobtracker.jsp
文件复制数为1,blocksize 5M
Slave数 | 处理记录数(万条) | 执行时间(秒) |
2 | 95 | 38 |
2 | 950 | 337 |
4 | 95 | 24 |
4 | 950 | 178 |
6 | 95 | 21 |
6 | 950 | 114 |
Blocksize 5M
Slave数 | 处理记录数(万条) | 执行时间(秒) |
2(文件复制数为1) | 950 | 337 |
2(文件复制数为3) | 950 | 339 |
6(文件复制数为1) | 950 | 114 |
6(文件复制数为3) | 950 | 117 |
Slave数 | 处理记录数(万条) | 执行时间(秒) |
6(blocksize 5M) | 95 | 21 |
6(blocksize 77M) | 95 | 26 |
4(blocksize 5M) | 950 | 178 |
4(blocksize 50M) | 950 | 54 |
6(blocksize 5M) | 950 | 114 |
6(blocksize 50M) | 950 | 44 |
6(blocksize 77M) | 950 | 74 |