MapReduce设计模式之概要设计模式

gougouzhang 2019-11-05

什么是概要设计模式

概要分析是将相似的数据分组到一起,然后执行统计计算、索引生成或简单计数等后续分析操作。

概要设计模式有哪些分类?

(1)数值概要 (2)倒排索引概要 (3)计数器计数等等。

数值概要

包括最大值、最小值、平均数、方差和中位数等等。

注意:MapReduce作业输出结果时,如果输出目录已经存在,会报"文件已存在"的错误。每次重新运行作业都手动删除输出目录会比较麻烦,可以自己写一个删除文件的工具类,也可以使用hadoop中的FileUtil.fullyDelete()方法来删除。以下是自己写的删除文件夹的代码:

import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 23:31
 * @Description Local-filesystem helper used to clear a job's output path before the job runs.
 */
public class FileUtil {

    /**
     * Deletes the given file, or the given directory together with everything below it.
     * A {@code null} or non-existent path is silently ignored. Deletion is best effort:
     * entries that cannot be removed are skipped without raising an error.
     *
     * @param fileName path of the file or directory to delete
     */
    public static void deleteFile(String fileName) {
        if (fileName == null) {
            return;
        }
        deleteRecursively(new File(fileName));
    }

    /** Removes {@code file}, descending into real directories first so that they are empty when deleted. */
    private static void deleteRecursively(File file) {
        // A symbolic link is removed as a link; its target's contents are never touched.
        boolean symbolicLink = java.nio.file.Files.isSymbolicLink(file.toPath());
        if (!symbolicLink && file.isDirectory()) {
            // listFiles() returns null on an I/O error or when the directory is unreadable.
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    deleteRecursively(child);
                }
            }
        }
        file.delete();
    }
}

最大值/最小值/计数

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 9:57
 * @Description
 */
public class MinMaxCountData implements Writable {

    //日期
    private Date createDate;
    //用户标识
    private String userId;

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public MinMaxCountData() {
    }

    public MinMaxCountData(Date createDate, String userId) {
        this.createDate = createDate;
        this.userId = userId;
    }

    public Date getCreateDate() {
        return createDate;
    }

    public void setCreateDate(Date createDate) {
        this.createDate = createDate;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(createDate.getTime());
        dataOutput.writeBytes(userId);
    }

    public void readFields(DataInput dataInput) throws IOException {
        createDate = new Date(dataInput.readLong());
        userId = dataInput.readLine();
    }

    @Override
    public String toString() {
        return "MinMaxCountData{" +
                "createDate=" + createDate +
                ", userId='" + userId + '\'' +
                '}';
    }
}
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 9:36
 * @Description Value type carrying the earliest date, the latest date and a record count.
 */
public class MinMaxCountTuple implements Writable {

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    // Earliest date seen so far; null until set
    private Date min = null;
    // Latest date seen so far; null until set
    private Date max = null;
    // Number of records summarized
    private long count = 0;

    public Date getMin() {
        return this.min;
    }

    public Date getMax() {
        return this.max;
    }

    public long getCount() {
        return this.count;
    }

    public void setMin(Date min) {
        this.min = min;
    }

    public void setMax(Date max) {
        this.max = max;
    }

    public void setCount(long count) {
        this.count = count;
    }

    /**
     * Writes min and max as epoch milliseconds, then the count.
     */
    public void write(DataOutput dataOutput) throws IOException {
        long minMillis = min.getTime();
        dataOutput.writeLong(minMillis);
        long maxMillis = max.getTime();
        dataOutput.writeLong(maxMillis);
        dataOutput.writeLong(count);
    }

    /**
     * Reads the three longs back in the order {@link #write(DataOutput)} emitted them.
     */
    public void readFields(DataInput dataInput) throws IOException {
        long minMillis = dataInput.readLong();
        min = new Date(minMillis);
        long maxMillis = dataInput.readLong();
        max = new Date(maxMillis);
        count = dataInput.readLong();
    }

    /**
     * Renders the tuple as tab-separated "min max count", with both dates formatted by {@code frmt}.
     */
    public String toString() {
        StringBuilder line = new StringBuilder();
        line.append(frmt.format(min));
        line.append("\t");
        line.append(frmt.format(max));
        line.append("\t");
        line.append(count);
        return line.toString();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.File;
import java.text.SimpleDateFormat;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 10:02
 * @Description
 */
public class MinMaxCountMain {

    public static class MinMaxCountMapper extends Mapper<Object,Text,Text,MinMaxCountTuple> {

        private Text userId = new Text();
        private MinMaxCountTuple minMaxCountTuple = new MinMaxCountTuple();
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key,Text value,Context context){
            try {
                ObjectMapper objectMapper = new ObjectMapper();
                objectMapper.setDateFormat(frmt);
                MinMaxCountData minMaxCountData = objectMapper.readValue(value.toString(), MinMaxCountData.class);
                minMaxCountTuple.setCount(1);
                minMaxCountTuple.setMin(minMaxCountData.getCreateDate());
                minMaxCountTuple.setMax(minMaxCountData.getCreateDate());
                userId.set(minMaxCountData.getUserId());
                context.write(userId, minMaxCountTuple);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class MinMaxCountReducer extends Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {
        private MinMaxCountTuple minMaxCountTuple = new MinMaxCountTuple();

        public void reduce(Text key,Iterable<MinMaxCountTuple> values,Context context) {
            try {
                long sum = 0;
                for (MinMaxCountTuple value : values) {
                    if(minMaxCountTuple.getMin() == null || value.getMin().compareTo(minMaxCountTuple.getMin()) < 0 ) {
                        minMaxCountTuple.setMin(value.getMin());
                    }
                    if(minMaxCountTuple.getMax() == null || value.getMax().compareTo(minMaxCountTuple.getMax()) > 0 ) {
                        minMaxCountTuple.setMax(value.getMax());
                    }
                    sum += value.getCount();
                }
                minMaxCountTuple.setCount(sum);
                context.write(key, minMaxCountTuple);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf, "NumericalSummarization:MinMaxCount");
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(MinMaxCountTuple.class);
            job.setJarByClass(MinMaxCountMain.class);
            job.setMapperClass(MinMaxCountMapper.class);
            job.setCombinerClass(MinMaxCountReducer.class);
            job.setReducerClass(MinMaxCountReducer.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            File outputFile = new File(args[1]);
            if(outputFile.exists()){
                outputFile.delete();
            }
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MapReduce设计模式之概要设计模式

MapReduce设计模式之概要设计模式

MapReduce设计模式之概要设计模式

平均值

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 21:51
 * @Description
 */
public class CountAverageData implements Writable {

    //日期
    private Date creationDate;

    //文本
    private String text;

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public CountAverageData() {
    }

    public CountAverageData(Date creationDate, String text) {
        this.creationDate = creationDate;
        this.text = text;
    }

    public Date getCreationDate() {
        return creationDate;
    }

    public void setCreationDate(Date creationDate) {
        this.creationDate = creationDate;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeBytes(frmt.format(creationDate));
        dataOutput.writeBytes(text);
    }

    public void readFields(DataInput dataInput) throws IOException {
        try {
            System.out.println(dataInput);
            creationDate = frmt.parse(dataInput.toString());
            text = dataInput.readLine();
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "{" +
                "creationDate=" + creationDate +
                ", text='" + text + '\'' +
                '}';
    }
}
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 21:36
 * @Description Value type pairing a record count with the average computed over those records.
 */
public class CountAverageTuple implements Writable {

    // Number of records the average is based on
    private long count;

    // Average value over those records
    private float average;

    public long getCount() {
        return this.count;
    }

    public float getAverage() {
        return this.average;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public void setAverage(float average) {
        this.average = average;
    }

    /**
     * Writes the count as a long followed by the average as a float.
     */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(this.count);
        dataOutput.writeFloat(this.average);
    }

    /**
     * Reads the count and the average back in the order they were written.
     */
    public void readFields(DataInput dataInput) throws IOException {
        this.count = dataInput.readLong();
        this.average = dataInput.readFloat();
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("{");
        text.append("count=").append(count);
        text.append(", average=").append(average);
        text.append('}');
        return text.toString();
    }
}
import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 21:40
 * @Description
 */
public class CountAverageMain {

    public static class CountAverageMapper extends Mapper<Object, Text, IntWritable, CountAverageTuple> {

        private IntWritable outHour = new IntWritable();
        private CountAverageTuple countAverageTuple = new CountAverageTuple();
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key,Text value,Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setDateFormat(frmt);
            try {
                CountAverageData countAverageData = objectMapper.readValue(value.toString(), CountAverageData.class);
                Calendar calendar = Calendar.getInstance();
                Date creationDate = countAverageData.getCreationDate();
                calendar.setTime(creationDate);
                int hour = calendar.get(Calendar.HOUR_OF_DAY);
                outHour.set(hour);
                countAverageTuple.setAverage(countAverageData.getText().length());
                countAverageTuple.setCount(1);
                context.write(outHour, countAverageTuple);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class CountAverageReducer extends Reducer<IntWritable, CountAverageTuple,IntWritable, CountAverageTuple> {

        private CountAverageTuple result = new CountAverageTuple();

        public void reduce(IntWritable key, Iterable<CountAverageTuple> values,Context context) {
            float sum = 0;
            long count = 0;
            for(CountAverageTuple countAverageTuple : values) {
                count += countAverageTuple.getCount();
                sum += countAverageTuple.getCount() * countAverageTuple.getAverage();
            }
            result.setAverage(sum / count);
            result.setCount(count);
            try {
                context.write(key, result);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) {

        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration, "CountAverage");
            job.setJarByClass(CountAverageMain.class);
            job.setMapperClass(CountAverageMapper.class);
            job.setCombinerClass(CountAverageReducer.class);
            job.setReducerClass(CountAverageReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(CountAverageTuple.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true )? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MapReduce设计模式之概要设计模式

MapReduce设计模式之概要设计模式

MapReduce设计模式之概要设计模式

中位数和标准差

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/16 6:33
 * @Description Value type holding a median and a standard deviation.
 */
public class MedianStdDevTuple implements Writable {

    // Median of the summarized values
    private float median;

    // Standard deviation of the summarized values
    private float stdDev;

    public float getMedian() {
        return this.median;
    }

    public float getStdDev() {
        return this.stdDev;
    }

    public void setMedian(float median) {
        this.median = median;
    }

    public void setStdDev(float stdDev) {
        this.stdDev = stdDev;
    }

    /**
     * Writes the median followed by the standard deviation, both as floats.
     */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeFloat(this.median);
        dataOutput.writeFloat(this.stdDev);
    }

    /**
     * Reads the two floats back in the order they were written.
     */
    public void readFields(DataInput dataInput) throws IOException {
        this.median = dataInput.readFloat();
        this.stdDev = dataInput.readFloat();
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("{");
        text.append("median=").append(median);
        text.append(", stdDev=").append(stdDev);
        text.append('}');
        return text.toString();
    }
}
import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/16 6:18
 * @Description
 */
public class MedianStdDevMain {

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public static class MedianStdDevMapper extends Mapper<Object, Text,IntWritable, IntWritable> {

        private IntWritable outhour = new IntWritable();
        private IntWritable outlength = new IntWritable();

        public void map(Object key,Text value,Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setDateFormat(frmt);
            try {
                CountAverageData countAverageData = objectMapper.readValue(value.toString(), CountAverageData.class);
                Date creationDate = countAverageData.getCreationDate();
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(creationDate);
                int hour = calendar.get(Calendar.HOUR_OF_DAY);
                int length = countAverageData.getText().length();
                outhour.set(hour);
                outlength.set(length);
                context.write(outhour, outlength);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class MadianStdDevReducer extends Reducer<IntWritable, IntWritable, IntWritable, MedianStdDevTuple> {

        private ArrayList<Float> lengths = new ArrayList<Float>();
        private MedianStdDevTuple medianStdDevTuple = new MedianStdDevTuple();

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) {
            int sum = 0;
            int count = 0;
            try {
                for (IntWritable value : values) {
                    sum += value.get();
                    count++;
                    lengths.add((float) value.get());
                }

                //进行排序
                Collections.sort(lengths);
                //求中位数
                if(count == 1 || count % 2 == 0) {
                    medianStdDevTuple.setMedian(lengths.get(count/2));
                }else {
                    medianStdDevTuple.setMedian((lengths.get(count / 2 - 1) + lengths.get(count / 2)) / 2.0f);
                }
                //求平均值
                float mean = sum / count;
                float sumOfSquare = 0.0f;
                //求标准差
                for(Float value: lengths) {
                    sumOfSquare += (value - mean) * (value - mean);
                }
                if(count == 1) {
                    medianStdDevTuple.setStdDev(0);
                }else{
                    medianStdDevTuple.setStdDev((float)Math.sqrt(sumOfSquare / (count - 1)));
                }

                context.write(key, medianStdDevTuple);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration, "CountAverage");
            job.setJarByClass(MedianStdDevMain.class);
            job.setMapperClass(MedianStdDevMapper.class);
            job.setReducerClass(MadianStdDevReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true )? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

中位数和标准差升级版

import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;


import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @Author bluesnail95
 * @Date 2019/7/16 21:28
 * @Description Memory-conscious variant of the median / standard deviation job. Instead of
 * shipping every single length, mappers and combiners exchange (length -> occurrences)
 * maps, and the reducer derives the statistics from the merged histogram.
 */
public class MedianStdDevUpgradeMain {

    private static final String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS";

    /**
     * Parses each input line as a JSON {@link CountAverageData} record and emits
     * (hour of day, {text length -> 1}).
     */
    public static class MedianStdDevUpgradeMapper extends Mapper<Object, Text, IntWritable, SortedMapWritable> {
        private final IntWritable outHour = new IntWritable();
        private final LongWritable one = new LongWritable(1);
        private final IntWritable lengths = new IntWritable();
        // Built once per mapper instead of once per record; each mapper owns its own date format.
        private final ObjectMapper objectMapper = new ObjectMapper();

        {
            objectMapper.setDateFormat(new SimpleDateFormat(DATE_PATTERN));
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            CountAverageData countAverageData;
            try {
                countAverageData = objectMapper.readValue(value.toString(), CountAverageData.class);
            } catch (IOException e) {
                // Best effort: a malformed line is reported and skipped, it does not fail the task.
                e.printStackTrace();
                return;
            }
            if (countAverageData == null
                    || countAverageData.getCreationDate() == null
                    || countAverageData.getText() == null) {
                return;
            }
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(countAverageData.getCreationDate());
            outHour.set(calendar.get(Calendar.HOUR_OF_DAY));
            lengths.set(countAverageData.getText().length());
            SortedMapWritable sortedMapWritable = new SortedMapWritable();
            sortedMapWritable.put(lengths, one);
            context.write(outHour, sortedMapWritable);
        }
    }

    /**
     * Merges the per-record maps of one hour into a single (length -> occurrences) map.
     */
    public static class MedianStdDevUpgradeCombiner extends Reducer<IntWritable,SortedMapWritable,IntWritable,SortedMapWritable> {

        @Override
        protected void reduce(IntWritable key, Iterable<SortedMapWritable> values, Context context)
                throws IOException, InterruptedException {
            SortedMapWritable outValue = new SortedMapWritable();

            for (SortedMapWritable sortedMapWritable : values) {
                Set<Map.Entry<WritableComparable,Writable>> entries = sortedMapWritable.entrySet();
                for (Map.Entry<WritableComparable,Writable> entry : entries) {
                    long increment = ((LongWritable) entry.getValue()).get();
                    LongWritable count = (LongWritable) outValue.get(entry.getKey());
                    if (count != null) {
                        // The stored LongWritable is updated in place; no re-insert is needed.
                        count.set(count.get() + increment);
                    } else {
                        outValue.put(entry.getKey(), new LongWritable(increment));
                    }
                }
            }
            context.write(key, outValue);
        }
    }

    /**
     * Merges all partial histograms of one hour and emits the median and sample standard
     * deviation computed from the merged histogram.
     */
    public static class MedianStdDevUpgradeReducer extends Reducer<IntWritable,SortedMapWritable,IntWritable,MedianStdDevTuple> {
        private final MedianStdDevTuple medianStdDevTuple = new MedianStdDevTuple();
        private final TreeMap<Integer, Long> lengthCounts = new TreeMap<Integer, Long>();

        @Override
        public void reduce(IntWritable key, Iterable<SortedMapWritable> values, Context context)
                throws IOException, InterruptedException {
            // Accumulate in double: a float running total loses integer precision beyond 2^24.
            double sum = 0;
            long total = 0;
            // Reused across keys, so all per-key state is reset first.
            lengthCounts.clear();
            medianStdDevTuple.setStdDev(0);
            medianStdDevTuple.setMedian(0);

            for (SortedMapWritable sortedMapWritable : values) {
                Set<Map.Entry<WritableComparable,Writable>> entries = sortedMapWritable.entrySet();
                for (Map.Entry<WritableComparable,Writable> writableEntry : entries) {
                    int length = ((IntWritable) writableEntry.getKey()).get();
                    long count = ((LongWritable) writableEntry.getValue()).get();

                    total += count;
                    sum += (double) count * length;
                    Long sortedCount = lengthCounts.get(length);
                    if (sortedCount == null) {
                        lengthCounts.put(length, count);
                    } else {
                        lengthCounts.put(length, count + sortedCount);
                    }
                }
            }
            if (total == 0) {
                // No observations for this key; avoids emitting NaN.
                return;
            }

            // Walk the histogram in ascending length order until the bucket containing
            // the element at index total / 2 is reached.
            long medianIndex = total / 2;
            long previousCount = 0;
            long prevKey = 0;
            for (Map.Entry<Integer, Long> entry : lengthCounts.entrySet()) {
                long count = previousCount + entry.getValue();
                if (previousCount <= medianIndex && medianIndex < count) {
                    if (total % 2 == 0 && previousCount == medianIndex) {
                        // Even total with the two middle elements in adjacent buckets: average both.
                        medianStdDevTuple.setMedian((entry.getKey() + prevKey) / 2.0f);
                    } else {
                        medianStdDevTuple.setMedian(entry.getKey());
                    }
                    break;
                }
                previousCount = count;
                prevKey = entry.getKey();
            }

            double mean = sum / total;
            double sumOfSquares = 0.0;
            for (Map.Entry<Integer, Long> entry : lengthCounts.entrySet()) {
                double deviation = entry.getKey() - mean;
                sumOfSquares += deviation * deviation * entry.getValue();
            }
            if (total == 1) {
                medianStdDevTuple.setStdDev(0);
            } else {
                // Sample standard deviation (n - 1 in the denominator).
                medianStdDevTuple.setStdDev((float) Math.sqrt(sumOfSquares / (total - 1)));
            }
            context.write(key, medianStdDevTuple);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path,
     *             which is cleared via {@code FileUtil.deleteFile} before the job starts
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: MedianStdDevUpgradeMain <input path> <output path>");
            System.exit(2);
        }
        int exitCode = 1;
        try {
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration, "MedianStdDevUpgrade");
            job.setJarByClass(MedianStdDevUpgradeMain.class);
            job.setMapperClass(MedianStdDevUpgradeMapper.class);
            job.setCombinerClass(MedianStdDevUpgradeCombiner.class);
            job.setReducerClass(MedianStdDevUpgradeReducer.class);
            // The map/combine stage and the reducer emit different value types, so both are declared.
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(SortedMapWritable.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(MedianStdDevTuple.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            exitCode = job.waitForCompletion(true) ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Reached on failure as well, so a crashed driver never reports success.
        System.exit(exitCode);
    }
}

MapReduce设计模式之概要设计模式

MapReduce设计模式之概要设计模式

MapReduce设计模式之概要设计模式

倒排索引概要

import java.io.Serializable;

/**
 * @Author bluesnail95
 * @Date 2019/7/18 23:22
 * @Description Plain bean for one inverted-index input record: a link and the id of the
 * record that contains it.
 */
public class ExtractorData implements Serializable {

    // Link found in the record
    private String link;

    // Identifier of the record the link was found in
    private String id;

    /** @return the link, or null if none has been set */
    public String getLink() {
        return this.link;
    }

    /** @return the record id, or null if none has been set */
    public String getId() {
        return this.id;
    }

    /** @param link the link to store */
    public void setLink(String link) {
        this.link = link;
    }

    /** @param id the record id to store */
    public void setId(String id) {
        this.id = id;
    }
}
import file.FileUtil;
import numericalSummarization.MedianStdDevMain;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.Iterator;

/**
 * @Author bluesnail95
 * @Date 2019/7/18 23:18
 * @Description
 */
public class ExtractorMain  {

    public static class ExtractorMapper extends Mapper<Object, Text,Text,Text> {

        private Text link = new Text();
        private Text id = new Text();

        public void map(Object key,Text value,Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                ExtractorData extractorData = objectMapper.readValue(value.toString(), ExtractorData.class);
                link.set(extractorData.getLink());
                id.set(extractorData.getId());
                context.write(link,id);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

    public static class ExtractorReducer extends Reducer<Text,Text,Text,Text> {
        private Text link = new Text();
        private Text ids = new Text();

        public void reduce(Text key, Iterable<Text> values,Context context) {
            StringBuilder buffer = new StringBuilder("");
            for(Text value:values) {
                buffer.append(value.toString());
                buffer.append(",");
            }
            ids.set(buffer.toString().substring(0, buffer.length() - 1));
            link.set(key.toString());
            try {
                context.write(link,ids);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration, "ExtractorMain");
            job.setJarByClass(ExtractorMain.class);
            job.setMapperClass(ExtractorMapper.class);
            job.setCombinerClass(ExtractorReducer.class);
            job.setReducerClass(ExtractorReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true )? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MapReduce设计模式之概要设计模式

MapReduce设计模式之概要设计模式

计数器计数

import file.FileUtil;
import invertedIndex.ExtractorMain;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.util.Arrays;
import java.util.HashSet;

/**
 * @Author bluesnail95
 * @Date 2019/7/19 6:31
 * @Description
 */
public class CountNumUsersByStateMain {

    public static class CountNumUsersByStateMapper extends Mapper<Object, Text, NullWritable,NullWritable> {
        public static final String STATE_COUNTER_GROUP = "State";
        public static final String UNKNOWN_COUNTER = "Unknown";
        public static final String NULL_OR_EMPTY_COUNTER = "Null or Empty";

        private String stateArray[] = {"BeiJing","ShangHai","ShenZhen","GuangZhou"};

        private HashSet<String> stateSet = new HashSet<String>(Arrays.asList(stateArray));

        public void map(Object key,Text value,Context context) {
            String state = value.toString();
            if(state != null && StringUtils.isNoneBlank(state)) {
                if(stateSet.contains(state)) {
                    context.getCounter(STATE_COUNTER_GROUP,state).increment(1);
                }else{
                    context.getCounter(STATE_COUNTER_GROUP,UNKNOWN_COUNTER).increment(1);
                }
            }else {
                context.getCounter(STATE_COUNTER_GROUP,NULL_OR_EMPTY_COUNTER).increment(1);
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration, "CountNumUsersByState");
            job.setJarByClass(CountNumUsersByStateMain.class);
            job.setMapperClass(CountNumUsersByStateMapper.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            int code = job.waitForCompletion(true )? 0 : 1;
            if(code == 0) {
                for(Counter counter:job.getCounters().getGroup(CountNumUsersByStateMapper.STATE_COUNTER_GROUP)) {
                    System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
                }
            }
            FileSystem.get(configuration).delete(new Path(args[1]),true);
            System.exit(code);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MapReduce设计模式之概要设计模式

参考资料

《MapReduce设计模式》

相关推荐