Reading Notes 3: Hadoop in Action - (1) Building a MapReduce Template

changjiang 2013-01-10

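import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceDemo extends Configured implements Tool {

    /** Mapper: swaps key and value so records are grouped by the original value. */
    public static class MapClass extends MapReduceBase
            implements Mapper<Text, Text, Text, Text> {

        public void map(Text key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
            output.collect(value, key);
        }
    }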

    /** Reducer: joins all values for a key into one comma-separated string. */
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            StringBuilder csv = new StringBuilder();
            while (values.hasNext()) {
                if (csv.length() > 0) {
                    csv.append(",");
                }
                csv.append(values.next().toString());
            }
            output.collect(key, new Text(csv.toString()));
        }
    }

    /**
     * The core of the template, known as the driver: it configures and submits the job.
     *
     * @param args args[0] is the input path, args[1] is the output path
     * @return 0 on success
     * @throws Exception if job setup or execution fails
     */
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, MapReduceDemo.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MapReduceDemo");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // Each input line is split into key and value at the first separator
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ",");

        // Submits the job and blocks until it finishes
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner handles generic options such as -D before calling run()
        int res = ToolRunner.run(new Configuration(), new MapReduceDemo(), args);
        System.exit(res);
    }
}
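
To make the data flow of the template easier to follow, here is a rough plain-Java sketch that mimics it without any Hadoop classes. It assumes each input line has the form `key,value`. The class name `InvertAndJoinSketch`, its methods, and the sample records are made up for illustration, and a `TreeMap` only stands in for the sort and grouping that Hadoop performs between the map and reduce phases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the job's data flow; no Hadoop classes are used.
class InvertAndJoinSketch {

    // Splits a line at the first separator: the text before it is the key,
    // the rest is the value.
    static String[] splitLine(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos < 0) {
            return new String[] {line, ""}; // no separator: whole line is the key
        }
        return new String[] {line.substring(0, pos), line.substring(pos + 1)};
    }

    // Map step (swap key and value), then grouping, then the reduce step
    // (join each group's values with commas).
    static Map<String, String> run(List<String> lines) {
        // TreeMap is a stand-in for the sorted grouping between map and reduce.
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] kv = splitLine(line, ',');
            // Mapper: emit (value, key)
            grouped.computeIfAbsent(kv[1], k -> new ArrayList<>()).add(kv[0]);
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            // Reducer: comma-join all values collected for this key
            result.put(e.getKey(), String.join(",", e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical input records in "key,value" form
        List<String> lines = List.of("A,X", "B,X", "C,Y");
        run(lines).forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```

For the three sample records, this sketch groups `A` and `B` under `X` and `C` under `Y`. In a real job the order of values inside one group is not guaranteed, so the joined string may come out in a different order.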

Run the program:

bin/hadoop jar playground/MapReduceDemo.jar MapReduceDemo input/cite75_99.txt output

To see only the mapper output, set the number of reducers to 0 with the following command:

bin/hadoop jar playground/MapReduceDemo.jar MapReduceDemo -D mapred.reduce.tasks=0 input/cite75_99.txt output
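
The `-D` option works without any change to `run()` because `ToolRunner` strips generic options from the command line, stores them in the `Configuration`, and passes only the remaining arguments on. The following is a much simplified plain-Java sketch of that idea, not Hadoop's actual parser: the class `GenericOptionsSketch` and its fields are hypothetical, it handles only the space-separated `-D name=value` form, and the input path is assumed to be written as a single argument.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the generic-option handling done before run() is called.
// Only the space-separated "-D name=value" form is handled here.
class GenericOptionsSketch {
    final Map<String, String> properties = new LinkedHashMap<>();
    final List<String> remainingArgs = new ArrayList<>();

    GenericOptionsSketch(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if (args[i].equals("-D") && i + 1 < args.length) {
                // Split "name=value" at the first '=' and store it as a property
                String[] kv = args[++i].split("=", 2);
                properties.put(kv[0], kv.length > 1 ? kv[1] : "");
            } else {
                // Everything else is left for the application's run() method
                remainingArgs.add(args[i]);
            }
        }
    }

    public static void main(String[] args) {
        String[] cmd = {"-D", "mapred.reduce.tasks=0", "input/cite75_99.txt", "output"};
        GenericOptionsSketch parsed = new GenericOptionsSketch(cmd);
        System.out.println(parsed.properties);    // settings that would go into the configuration
        System.out.println(parsed.remainingArgs); // what run() would see as args
    }
}
```

Under these assumptions, `args[0]` and `args[1]` in `run()` are still the input and output paths, while `mapred.reduce.tasks` ends up in the job configuration.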
