香帅 2020-04-15
一、IP去重示例
数据文件:
192.168.10.111
192.168.10.111
10.32.100.111
192.168.21.111
192.168.10.112
192.168.10.111
192.168.11.111
192.168.12.112
192.168.11.111
package com.blb.ip;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Map stage of the IP de-duplication job.
 *
 * <p>Each input line is emitted unchanged as the output key, paired with a
 * {@link NullWritable} value. No payload is needed: the key alone carries the
 * IP, and de-duplication relies on equal keys being grouped before the reduce
 * stage.
 */
public class IPMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Emits the input line as the key with an empty value.
     *
     * @param key     input key supplied by the input format (unused here)
     * @param value   one line of the input file; each line is expected to hold one IP
     * @param context sink for the (ip, null) output pair
     * @throws IOException          if the output pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Every line of the data file is a single IP address, so the whole
        // line is used as the key.
        context.write(value, NullWritable.get());
    }
}
package com.blb.ip;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reduce stage of the IP de-duplication job.
 *
 * <p>Writes each key it receives exactly once and ignores the values, so the
 * job output contains one line per distinct IP.
 */
public class IPReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * Writes the IP once, regardless of how many times it occurred in the input.
     *
     * @param key     the IP address
     * @param values  one {@link NullWritable} per occurrence of the IP (null, null, null...);
     *                carries no information and is not iterated
     * @param context sink for the (ip, null) output pair
     * @throws IOException          if the output pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
package com.blb.ip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Entry point that configures and submits the IP de-duplication job.
 *
 * <p>Usage: {@code IPDriver [inputPath [outputPath]]}. Arguments that are
 * omitted fall back to the tutorial's default HDFS locations.
 */
public class IPDriver {

    /** Input location used when no first argument is given. */
    private static final String DEFAULT_INPUT = "hdfs://192.168.10.131:9000/txt/ip.txt";

    /** Output location used when no second argument is given. */
    private static final String DEFAULT_OUTPUT = "hdfs://192.168.10.131:9000/result/ip";

    /**
     * Builds the job, waits for it to finish and exits with status 0 on
     * success or 1 on failure.
     *
     * @param args optional input path followed by optional output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if the wait for completion is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(IPDriver.class);
        job.setMapperClass(IPMapper.class);
        job.setReducerClass(IPReducer.class);

        // Mapper and reducer emit the same types, so only the final output
        // types are declared.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Propagate the job result so a failed job is visible to the caller
        // (shell scripts, schedulers) instead of always exiting with 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
二、MaxScore示例
数据文件:
张三 684
李四 312
王五 721
赵六 548
田七 470
王八 668
陈九 340
package com.blb.maxscore;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Map stage of the max-score job: parses {@code "<name> <score>"} lines and
 * emits (name, score) pairs.
 */
public class MaxScoreMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Parses one input line and emits the name as key and the score as value.
     *
     * <p>Blank lines are skipped. Name and score may be separated by any run
     * of whitespace.
     *
     * @param key     input key supplied by the input format (unused here)
     * @param value   one line of the form {@code "<name> <score>"}
     * @param context sink for the (name, score) output pair
     * @throws IOException           if the line has fewer than two fields or the
     *                               pair cannot be written
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if the score field is not a valid integer
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            // A blank line carries no record; skipping it avoids failing the task.
            return;
        }

        // Split on whitespace runs so tabs or repeated spaces between the
        // fields are tolerated.
        String[] fields = line.split("\\s+");
        if (fields.length < 2) {
            throw new IOException(
                    "Malformed score record (expected \"<name> <score>\"): " + line);
        }

        context.write(new Text(fields[0]),
                new IntWritable(Integer.parseInt(fields[1])));
    }
}
package com.blb.maxscore;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reduce stage of the max-score job: emits the highest score seen for each name.
 */
public class MaxScoreReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Finds the largest score among {@code values} and writes (name, max).
     *
     * @param key     the name
     * @param values  all scores emitted for this name
     * @param context sink for the (name, max score) output pair
     * @throws IOException          if the output pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // To save memory and avoid creating and destroying objects, MapReduce
        // reuses object addresses: during the iteration the value object is
        // created only once and refilled for every element.
        //
        // That makes "max = value" a trap. Walking through it:
        //   IntWritable max = new IntWritable(0);
        //   IntWritable value = new IntWritable();
        //   values: 684 312 721 548...
        //   value.set(684);
        //   value.get() > max.get() --- 684 > 0 --- true
        //   max = value;  both are references, so only the address is copied
        //                 and max and value now point to the same object
        //   value.set(312);  max now reads 312 as well
        //   value.get() > max.get() --- 312 > 312 --- false
        //   ...max and value keep sharing one address for the rest of the
        //   iteration, so max ends up as the LAST value, not the largest.
        //
        // Copying the primitive out of the writable avoids the aliasing.
        // Start from Integer.MIN_VALUE rather than 0 so that a key whose
        // scores are all negative still yields its true maximum.
        int max = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            if (value.get() > max) {
                max = value.get();
            }
        }
        context.write(key, new IntWritable(max));
    }
}
package com.blb.maxscore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Entry point that configures and submits the max-score job.
 *
 * <p>Usage: {@code MaxScoreDriver [inputPath [outputPath]]}. Arguments that
 * are omitted fall back to the tutorial's default HDFS locations.
 */
public class MaxScoreDriver {

    /** Input location used when no first argument is given. */
    private static final String DEFAULT_INPUT = "hdfs://192.168.10.131:9000/txt/score2.txt";

    /** Output location used when no second argument is given. */
    private static final String DEFAULT_OUTPUT = "hdfs://192.168.10.131:9000/result/maxscore2";

    /**
     * Builds the job, waits for it to finish and exits with status 0 on
     * success or 1 on failure.
     *
     * @param args optional input path followed by optional output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if the wait for completion is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MaxScoreDriver.class);
        job.setMapperClass(MaxScoreMapper.class);
        job.setReducerClass(MaxScoreReducer.class);

        // Mapper and reducer emit the same types, so only the final output
        // types are declared.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Propagate the job result so a failed job is visible to the caller
        // instead of always exiting with 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
三、TotalScore示例
数据文件:
张三 78
李四 66
王五 73
张三 88
田七 75
张三 65
陈九 90
李四 67
王五 78
package com.blb.maxscore;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper that parses {@code "<name> <score>"} lines and emits (name, score)
 * pairs.
 */
public class MaxScoreMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Parses one input line and emits the name as key and the score as value.
     *
     * <p>Blank lines are skipped. Name and score may be separated by any run
     * of whitespace.
     *
     * @param key     input key supplied by the input format (unused here)
     * @param value   one line of the form {@code "<name> <score>"}
     * @param context sink for the (name, score) output pair
     * @throws IOException           if the line has fewer than two fields or the
     *                               pair cannot be written
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if the score field is not a valid integer
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String record = value.toString().trim();
        if (record.isEmpty()) {
            // Nothing to parse on a blank line.
            return;
        }

        // Whitespace-run split tolerates tabs and repeated spaces.
        String[] parts = record.split("\\s+");
        if (parts.length < 2) {
            throw new IOException(
                    "Malformed score record (expected \"<name> <score>\"): " + record);
        }

        context.write(new Text(parts[0]),
                new IntWritable(Integer.parseInt(parts[1])));
    }
}
package com.blb.totalscore;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Map stage of the total-score job: parses {@code "<name> <score>"} lines and
 * emits (name, score) pairs.
 */
public class TotalScoreMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Parses one input line and emits the name as key and the score as value.
     *
     * <p>Blank lines are skipped. Name and score may be separated by any run
     * of whitespace.
     *
     * @param key     input key supplied by the input format (unused here)
     * @param value   one line of the form {@code "<name> <score>"}
     * @param context sink for the (name, score) output pair
     * @throws IOException           if the line has fewer than two fields or the
     *                               pair cannot be written
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if the score field is not a valid integer
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            // A blank line carries no record; skipping it avoids failing the task.
            return;
        }

        // Split the name from the score; any run of whitespace is accepted
        // as the separator.
        String[] fields = line.split("\\s+");
        if (fields.length < 2) {
            throw new IOException(
                    "Malformed score record (expected \"<name> <score>\"): " + line);
        }

        context.write(new Text(fields[0]),
                new IntWritable(Integer.parseInt(fields[1])));
    }
}
package com.blb.totalscore;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reduce stage of the total-score job: emits the sum of all scores for each name.
 */
public class TotalScoreReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Adds up every score in {@code values} and writes (name, total).
     *
     * @param key     the name
     * @param values  all scores emitted for this name
     * @param context sink for the (name, total score) output pair
     * @throws IOException          if the output pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     * @throws ArithmeticException  if the total does not fit in an {@code int}
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate in a long: an int accumulator would wrap around silently
        // and produce a wrong (possibly negative) total.
        long total = 0L;
        for (IntWritable value : values) {
            total += value.get();
        }

        // The output type is IntWritable, so fail loudly rather than truncate
        // when the total is out of range.
        if (total > Integer.MAX_VALUE || total < Integer.MIN_VALUE) {
            throw new ArithmeticException(
                    "Total score for key '" + key + "' overflows int: " + total);
        }
        context.write(key, new IntWritable((int) total));
    }
}
package com.blb.totalscore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Entry point that configures and submits the total-score job.
 *
 * <p>Usage: {@code TotalScoreDriver [inputPath [outputPath]]}. Arguments that
 * are omitted fall back to the tutorial's default HDFS locations.
 */
public class TotalScoreDriver {

    /** Input location used when no first argument is given. */
    private static final String DEFAULT_INPUT = "hdfs://192.168.10.131:9000/txt/score2/";

    /** Output location used when no second argument is given. */
    private static final String DEFAULT_OUTPUT = "hdfs://192.168.10.131:9000/result/totalscore";

    /**
     * Builds the job, waits for it to finish and exits with status 0 on
     * success or 1 on failure.
     *
     * @param args optional input path followed by optional output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if the wait for completion is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TotalScoreDriver.class);
        job.setMapperClass(TotalScoreMapper.class);
        job.setReducerClass(TotalScoreReducer.class);

        // Mapper and reducer emit the same types, so only the final output
        // types are declared.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Propagate the job result so a failed job is visible to the caller
        // instead of always exiting with 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
通过实现将MapReduce计算结果保存到MySQL数据库的过程,掌握多种保存计算结果的方式,加深对MapReduce的理解。创建Maven项目(项目名称为hdfs)的步骤这里不再说明。红色部分为新增内容: