巴山独钓 2014-09-20
To give everyone a more intuitive picture of how the program executes, I have written a working implementation today for reference.
Programming environment:
Input data:
Matrix A path: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA/matrixa
Matrix A contents:
3 4 6
4 0 8
Matrix B path: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB/matrixb
Matrix B contents:
2 3
3 0
4 1
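For reference, the product C = A × B (a 2×3 matrix times a 3×2 matrix, giving a 2×2 result) works out by hand to:
C[0][0] = 3*2 + 4*3 + 6*4 = 42
C[0][1] = 3*3 + 4*0 + 6*1 = 15
C[1][0] = 4*2 + 0*3 + 8*4 = 40
C[1][1] = 4*3 + 0*0 + 8*1 = 20
So the expected C matrix contents are:
42 15
40 20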
Implementation code:
There are three classes in total:
You can merge them into a single class if that suits your style.
MMDriver.java
package dataguru.matrixmultiply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MMDriver {

    public static void main(String[] args) throws Exception {

        // set configuration
        Configuration conf = new Configuration();

        // create job (Job.getInstance replaces the deprecated Job constructor)
        Job job = Job.getInstance(conf, "MatrixMultiply");
        job.setJarByClass(dataguru.matrixmultiply.MMDriver.class);

        // specify Mapper & Reducer
        job.setMapperClass(dataguru.matrixmultiply.MMMapper.class);
        job.setReducerClass(dataguru.matrixmultiply.MMReducer.class);

        // specify output types of mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // specify input and output DIRECTORIES
        Path inPathA = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA");
        Path inPathB = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB");
        Path outPath = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC");
        FileInputFormat.addInputPath(job, inPathA);
        FileInputFormat.addInputPath(job, inPathB);
        FileOutputFormat.setOutputPath(job, outPath);

        // delete the output directory if it already exists so the job can be rerun;
        // note: FileSystem instances are cached, so do not close the handle here,
        // because the job still needs it at submission time
        try {
            FileSystem hdfs = outPath.getFileSystem(conf);
            if (hdfs.exists(outPath)) {
                hdfs.delete(outPath, true);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
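With the default single reducer and the default TextOutputFormat (key and value separated by a tab), a successful run should leave one part file under matrixC matching the hand-computed product above:
0,0	42
0,1	15
1,0	40
1,1	20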
MMMapper.java
package dataguru.matrixmultiply;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class MMMapper extends Mapper<Object, Text, Text, Text> {

    private String tag;          // which matrix the current split belongs to
    private int crow = 2;        // number of rows of matrix A (hard-coded for the sample data)
    private int ccol = 2;        // number of columns of matrix B (hard-coded for the sample data)
    private static int arow = 0; // current row of A
    private static int brow = 0; // current row of B

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // use the parent directory name of the input file (matrixA or matrixB) as the tag
        FileSplit fs = (FileSplit) context.getInputSplit();
        tag = fs.getPath().getParent().getName();
    }

    /**
     * The input consists of two matrix files; each map() call handles one row.
     * The static row counters assume each matrix file is read in order by a
     * single mapper instance, which holds for small single-split inputs.
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer str = new StringTokenizer(value.toString());
        if ("matrixA".equals(tag)) {
            // left matrix: a[arow][col] contributes to every cell in row arow of C
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken();
                for (int i = 0; i < ccol; i++) {
                    Text outkey = new Text(arow + "," + i);
                    Text outvalue = new Text("a," + col + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue); // debug output
                }
                col++;
            }
            arow++;
        } else if ("matrixB".equals(tag)) {
            // right matrix: b[brow][col] contributes to every cell in column col of C
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken();
                for (int i = 0; i < crow; i++) {
                    Text outkey = new Text(i + "," + col);
                    Text outvalue = new Text("b," + brow + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue); // debug output
                }
                col++;
            }
            brow++;
        }
    }
}
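To see the replication scheme at a glance, here is a standalone sketch (plain Java, no Hadoop required; the class name EmissionTrace is made up for illustration) that prints the same key/value pairs the mapper emits for the sample matrices:

public class EmissionTrace {
    public static void main(String[] args) {
        int[][] a = {{3, 4, 6}, {4, 0, 8}};   // matrix A from the input above
        int[][] b = {{2, 3}, {3, 0}, {4, 1}}; // matrix B from the input above
        int crow = a.length;    // rows of A = rows of C
        int ccol = b[0].length; // columns of B = columns of C

        // every element a[i][k] is needed by all ccol cells in row i of C,
        // so it goes to keys (i,0) .. (i,ccol-1), tagged with its column k
        for (int i = 0; i < a.length; i++)
            for (int k = 0; k < a[i].length; k++)
                for (int j = 0; j < ccol; j++)
                    System.out.println(i + "," + j + " | a," + k + "," + a[i][k]);

        // every element b[k][j] is needed by all crow cells in column j of C,
        // so it goes to keys (0,j) .. (crow-1,j), tagged with its row k
        for (int k = 0; k < b.length; k++)
            for (int j = 0; j < b[k].length; j++)
                for (int i = 0; i < crow; i++)
                    System.out.println(i + "," + j + " | b," + k + "," + b[k][j]);
    }
}

Each output cell key "i,j" thus receives row i of A (tagged a,k,value) and column j of B (tagged b,k,value), which is exactly what the reducer needs for one dot product.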
MMReducer.java
package dataguru.matrixmultiply;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MMReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // collect the row of A and the column of B for this output cell,
        // both keyed by the shared index k; value examples: "a,0,4" or "b,0,2"
        Map<String, String> matrixa = new HashMap<String, String>();
        Map<String, String> matrixb = new HashMap<String, String>();
        for (Text val : values) {
            StringTokenizer str = new StringTokenizer(val.toString(), ",");
            String sourceMatrix = str.nextToken();
            if ("a".equals(sourceMatrix)) {
                matrixa.put(str.nextToken(), str.nextToken()); // (k, a[row][k])
            }
            if ("b".equals(sourceMatrix)) {
                matrixb.put(str.nextToken(), str.nextToken()); // (k, b[k][col])
            }
        }

        // dot product over the shared index k
        int result = 0;
        Iterator<String> iter = matrixa.keySet().iterator();
        while (iter.hasNext()) {
            String mapkey = iter.next();
            result += Integer.parseInt(matrixa.get(mapkey))
                    * Integer.parseInt(matrixb.get(mapkey));
        }
        context.write(key, new Text(String.valueOf(result)));
    }
}
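As a quick sanity check of that dot-product logic, the following standalone snippet (the class name ReduceCheck is made up; no Hadoop required) pushes the six values that arrive for key "0,0" through the same parsing and accumulation steps:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

public class ReduceCheck {
    public static void main(String[] args) {
        // grouped values that reach the reducer for key "0,0":
        // row 0 of A and column 0 of B, in the mapper's "tag,k,value" format
        List<String> values = new ArrayList<String>();
        values.add("a,0,3"); values.add("a,1,4"); values.add("a,2,6");
        values.add("b,0,2"); values.add("b,1,3"); values.add("b,2,4");

        Map<String, String> matrixa = new HashMap<String, String>();
        Map<String, String> matrixb = new HashMap<String, String>();
        for (String val : values) {
            StringTokenizer str = new StringTokenizer(val, ",");
            String sourceMatrix = str.nextToken();
            if ("a".equals(sourceMatrix)) matrixa.put(str.nextToken(), str.nextToken());
            if ("b".equals(sourceMatrix)) matrixb.put(str.nextToken(), str.nextToken());
        }

        int result = 0;
        for (String k : matrixa.keySet())
            result += Integer.parseInt(matrixa.get(k)) * Integer.parseInt(matrixb.get(k));

        System.out.println("0,0\t" + result); // prints 0,0 then 42
    }
}

It prints 42 for cell (0,0), agreeing with C[0][0] from the hand calculation above.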
Next step: by implementing the process of saving MapReduce results to a MySQL database, you can pick up several techniques for persisting computation results and deepen your understanding of MapReduce. Create a Maven project named hdfs for this; the project setup is not covered again here.
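As a rough sketch of that MySQL step (this is not the author's actual code): Hadoop ships DBOutputFormat in org.apache.hadoop.mapreduce.lib.db, which turns each reduce output key into an INSERT statement. The sketch below assumes a MySQL table matrix_c(row_idx, col_idx, val); the class name MatrixCell and all table, column, and connection details are made-up placeholders.

MatrixCell.java (hypothetical)
package dataguru.matrixmultiply;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

// one cell of the result matrix; DBOutputFormat binds it to the generated INSERT
public class MatrixCell implements Writable, DBWritable {
    private int row, col, val;

    public MatrixCell() {}

    public MatrixCell(int row, int col, int val) {
        this.row = row; this.col = col; this.val = val;
    }

    // Writable: required by the framework for serialization
    public void write(DataOutput out) throws IOException {
        out.writeInt(row); out.writeInt(col); out.writeInt(val);
    }

    public void readFields(DataInput in) throws IOException {
        row = in.readInt(); col = in.readInt(); val = in.readInt();
    }

    // DBWritable: binds the fields to
    // "INSERT INTO matrix_c (row_idx, col_idx, val) VALUES (?, ?, ?)"
    public void write(PreparedStatement st) throws SQLException {
        st.setInt(1, row); st.setInt(2, col); st.setInt(3, val);
    }

    public void readFields(ResultSet rs) throws SQLException {
        row = rs.getInt(1); col = rs.getInt(2); val = rs.getInt(3);
    }
}

In the driver, FileOutputFormat would be swapped for DBOutputFormat along these lines (connection details are placeholders):

DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost:3306/hadoop", "user", "password");
job.setOutputFormatClass(DBOutputFormat.class);
DBOutputFormat.setOutput(job, "matrix_c", "row_idx", "col_idx", "val");

and the reducer would then emit (MatrixCell, NullWritable) instead of (Text, Text).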