changjiang 2013-01-10
public class MapReduceDemo extends Configured implements Tool {
public static class MapClass extends MapReduceBase implements Mapper<Text,Text,Text,Text>
{
public void map(Text key,Text value,OutputCollector<Text,Text> output,Reporter reporter)throws IOException
{
output.collect(value,key);
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text,Text,Text,Text>
{
public void reduce(Text key,Iterator<Text> values,OutputCollector<Text,Text> output,,Reporter reporter)throws IOException
{
String csv = "";
while(values.hasNext())
{
if(csv.length()>0)
csv+=",";
csv+=values.next().toString();
}
output.collect(key,new Text(csv));
}
}
/**
* 结构的核心 被称为driver
* @param args
* @return
* @throws Exception
*/
public int run(String[] args)throws Exception
{
Configuration conf = getConf();
JobConf job = new JobConf(conf,MapReduceDemo.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job,in);
FileOutputFormat.setOutputPath(job,out);
job.setJobName("MapReduceDemo");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.set("key.value.separator.in.input.line",",");
JobClient.runJob(job);
return 0;
}
public static void main(String[]args)throws Exception
{
int res = ToolRunner.run(new Configuration(),new MapReduceDemo(),args)
System.exit(res);
}
}
执行程序:
bin/hadoop jar playground/MapReduceDemo.jar MapReduceDemo input/cite75_99.txt output
如果只想看mapper的输出,可以将reducer的数目设置成0,命令如下:
bin/hadoop jar playground/MapReduceDemo.jar MapReduceDemo -D mapred.reduce.tasks=0 input/cite75_99.txt output