ErixHao 2020-05-16
(1) Sorting with a TreeSet. This approach uses a TreeSet as a bounded min-heap, together with the set's duplicate-elimination behavior: as each record arrives, compare it with the smallest element (the head of the set); if it is larger than that minimum, replace the head with the new record, otherwise skip it. Once all records have been processed, the set holds the N largest records in sorted order. A small standalone sketch of the idiom is shown below, before the full MapReduce job.
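Here is a minimal sketch of this bounded-TreeSet idiom on plain integers (the class name, capacity, and sample values are illustrative, not part of the original job):

import java.util.TreeSet;

public class TreeSetTopNDemo {
    public static void main(String[] args) {
        int n = 3; // keep the 3 largest values
        // natural ascending order, so first() is always the current minimum
        TreeSet<Integer> heap = new TreeSet<>();
        for (int value : new int[]{5, 1, 9, 7, 3, 8, 2}) {
            if (heap.size() < n) {
                heap.add(value); // fill up to capacity first
            } else if (heap.first() < value) {
                heap.pollFirst(); // evict the current minimum
                heap.add(value);  // replace it with the larger value
            }
        }
        System.out.println(heap); // prints [7, 8, 9]
    }
}

One caveat of this idiom: a TreeSet silently drops elements that compare as equal, so with real records the comparator needs a tie-breaker (added in the reducer below).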
import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.alibaba.fastjson.JSON;
// For each movie, keep the 10 highest-rated records (reduce input is grouped by movie id)
public class TopN {

    public static class MapTask extends Mapper<LongWritable, Text, Text, MovieBean> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MovieBean>.Context context)
                throws IOException, InterruptedException {
            try {
                // parse one JSON line into a MovieBean and emit it keyed by movie id
                MovieBean movieBean = JSON.parseObject(value.toString(), MovieBean.class);
                String movie = movieBean.getMovie();
                context.write(new Text(movie), movieBean);
            } catch (Exception e) {
                // skip malformed JSON lines
            }
        }
    }
    public static class ReduceTask extends Reducer<Text, MovieBean, MovieBean, NullWritable> {
        @Override
        protected void reduce(Text movieId, Iterable<MovieBean> movieBeans,
                Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // ascending by rating, so first() is the smallest record kept so far;
            // ties are broken on the timestamp, otherwise the TreeSet would drop equal-rated records
            TreeSet<MovieBean> tree = new TreeSet<>(new Comparator<MovieBean>() {
                @Override
                public int compare(MovieBean o1, MovieBean o2) {
                    int byRate = o1.getRate() - o2.getRate();
                    return byRate != 0 ? byRate : o1.time - o2.time;
                }
            });
            for (MovieBean movieBean : movieBeans) {
                // Hadoop reuses the value object across iterations, so copy it before storing
                MovieBean movieBean2 = new MovieBean();
                movieBean2.set(movieBean);
                if (tree.size() < 10) {
                    tree.add(movieBean2);
                } else {
                    MovieBean first = tree.first();
                    if (first.getRate() < movieBean2.getRate()) {
                        // replace the current minimum with the higher-rated record
                        tree.remove(first);
                        tree.add(movieBean2);
                    }
                }
            }
            for (MovieBean movieBean : tree) {
                context.write(movieBean, NullWritable.get());
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "topN2");
        // set the mapper, the reducer, and the jar to submit
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(TopN.class);
        // declare the map-side and final output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MovieBean.class);
        job.setOutputKeyClass(MovieBean.class);
        job.setOutputValueClass(NullWritable.class);
        // input file and output directory
        FileInputFormat.addInputPath(job, new Path("E:/data/rating.json"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\data\\out\\topN2"));
        // delete the output directory if it already exists, otherwise the job fails
        File file = new File("E:\\data\\out\\topN2");
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        // submit the job and wait for it to finish
        boolean completion = job.waitForCompletion(true);
        System.out.println(completion ? "You are awesome!!!" : "Go fix your bugs!!");
    }
}

(2) Saving MapReduce results to MySQL. By implementing the process of writing MapReduce computation results into a MySQL database, you can learn several ways of persisting results and deepen your understanding of MapReduce. Create a Maven project named hdfs (the steps are not repeated here); the parts to add are described below.
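The original listing for this step is not reproduced above, so here is a rough sketch of the technique under stated assumptions: Hadoop's org.apache.hadoop.mapreduce.lib.db package ships DBOutputFormat and DBConfiguration for writing reduce output straight into a JDBC table. The table name movie_topn, its two columns, the connection settings, and the MovieTopNBean class are illustrative placeholders, not the original post's code. A bean written through DBOutputFormat must implement both Writable (for the shuffle) and DBWritable (for JDBC):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class MovieTopNBean implements Writable, DBWritable {
    private String movie;
    private int rate;

    // Writable: used when the bean travels through the shuffle
    public void write(DataOutput out) throws IOException {
        out.writeUTF(movie);
        out.writeInt(rate);
    }
    public void readFields(DataInput in) throws IOException {
        movie = in.readUTF();
        rate = in.readInt();
    }

    // DBWritable: maps the bean onto the INSERT statement generated by DBOutputFormat
    public void write(PreparedStatement ps) throws SQLException {
        ps.setString(1, movie);
        ps.setInt(2, rate);
    }
    public void readFields(ResultSet rs) throws SQLException {
        movie = rs.getString(1);
        rate = rs.getInt(2);
    }
}

In the driver, the JDBC connection is registered on the Configuration before the Job is created, and DBOutputFormat.setOutput replaces the FileOutputFormat call (DBConfiguration and DBOutputFormat come from org.apache.hadoop.mapreduce.lib.db; the driver class, URL, user, and password below are placeholders, and mysql-connector-java must be on the classpath):

Configuration conf = new Configuration();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost:3306/test", "root", "root");
Job job = Job.getInstance(conf, "topNToMysql");
// write into table movie_topn(movie, rate); this call also sets DBOutputFormat as the output format
DBOutputFormat.setOutput(job, "movie_topn", "movie", "rate");
// the reduce output key must be the DBWritable bean; the value is ignored by DBOutputFormat
job.setOutputKeyClass(MovieTopNBean.class);
job.setOutputValueClass(NullWritable.class);

With this wiring, each bean the reducer emits becomes one INSERT INTO movie_topn (movie, rate) VALUES (?, ?) executed against the configured database.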