ErixHao 2020-05-16
(1)利用TreeSet排序:该方式利用小顶堆和集合去重的原理,每过来一条数据,就跟堆顶(当前最小)的数据进行比较;如果比最小的大,则用新数据替换堆顶元素,否则直接跳过该数据,以此得到排序后的TopN结果。
import java.io.File; import java.io.IOException; import java.util.Comparator; import java.util.TreeSet; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.alibaba.fastjson.JSON; //求电影时长最高10部电影 public class TopN { public static class MapTask extends Mapper<LongWritable, Text, Text, MovieBean>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MovieBean>.Context context) throws IOException, InterruptedException { try { MovieBean movieBean = JSON.parseObject(value.toString(), MovieBean.class); String movie = movieBean.getMovie(); context.write(new Text(movie), movieBean); } catch (Exception e) { // TODO: handle exception } } } public static class ReduceTask extends Reducer<Text, MovieBean, MovieBean, NullWritable>{ @Override protected void reduce(Text movieId, Iterable<MovieBean> movieBeans, Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context) throws IOException, InterruptedException { TreeSet<MovieBean> tree = new TreeSet<>(new Comparator<MovieBean>() { @Override public int compare(MovieBean o1, MovieBean o2) { return o1.time-o2.time; } }); for (MovieBean movieBean : movieBeans) { MovieBean movieBean2 = new MovieBean(); movieBean2.set(movieBean); if (tree.size() <= 2) { tree.add(movieBean2); } else { MovieBean first = tree.first(); if(first.getRate() < movieBean2.getRate()) { //做替换 tree.remove(first); tree.add(movieBean2); } } } for (MovieBean movieBean : tree) { context.write(movieBean, NullWritable.get()); } } } public static void main(String[] args) throws Exception{ 
Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "topN2"); //设置map和reduce,以及提交的jar job.setMapperClass(MapTask.class); job.setReducerClass(ReduceTask.class); job.setJarByClass(TopN2.class); //设置输入输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MovieBean.class); job.setOutputKeyClass(MovieBean.class); job.setOutputValueClass(NullWritable.class); //输入和输出目录 FileInputFormat.addInputPath(job, new Path("E:/data/rating.json")); FileOutputFormat.setOutputPath(job, new Path("E:\\data\\out\\topN2")); //判断文件是否存在 File file = new File("E:\\data\\out\\topN2"); if(file.exists()){ FileUtils.deleteDirectory(file); } //提交任务 boolean completion = job.waitForCompletion(true); System.out.println(completion?"你很优秀!!!":"滚去调bug!!"); } }
通过实现将MapReduce计算结果保存到MySQL数据库的过程,掌握多种保存计算结果的方式,加深对MapReduce的理解;创建maven项目,项目名称为hdfs,这里不再说明。红色部分为增加内容: