Summarization groups similar data together and then performs follow-up analysis on each group, such as statistical calculations, index generation, or simple counting.
(1) Numerical summarization (2) Inverted index summarization (3) Counting with counters, and so on.
Numerical summaries include the maximum, minimum, average, variance/standard deviation, median, and so on.
Note: when a MapReduce job writes its output, it fails with a "file already exists" error if the output directory already exists. Deleting the directory by hand before every rerun is tedious, so you can either write your own utility class to delete it, or use Hadoop's FileUtil.fullyDelete() method. The hand-written directory-deletion utility used below is:
import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 23:31
 * @Description Utility for deleting a local file or directory before a job runs
 */
public class FileUtil {

    /**
     * Delete a file, or a directory together with its contents.
     * @param fileName path of the file or directory to delete
     */
    public static void deleteFile(String fileName) {
        File file = new File(fileName);
        if (!file.exists()) {
            return;
        }
        if (file.isFile()) {
            file.delete();
        } else if (file.isDirectory()) {
            File[] fileList = file.listFiles();
            for (int i = 0; i < fileList.length; i++) {
                // Recurse so that nested directories are removed as well
                deleteFile(fileList[i].getPath());
            }
            file.delete();
        }
    }
}
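As an alternative to the hand-written class above, the FileUtil.fullyDelete() method mentioned in the note lives in org.apache.hadoop.fs.FileUtil and deletes a local directory recursively, while FileSystem.delete() handles a path on whatever file system the configuration points to (for example HDFS). A minimal sketch, where the class name and the "output" path are just placeholders:

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class DeleteOutputSketch {
    public static void main(String[] args) throws Exception {
        String output = "output";               // placeholder output path

        // Local file system: recursively delete a directory
        FileUtil.fullyDelete(new File(output));

        // HDFS (or whatever FileSystem the configuration resolves to)
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(output), true);      // true = recursive
    }
}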
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 9:57
 * @Description Input record for the min/max/count job: one user action and its timestamp
 */
public class MinMaxCountData implements Writable {

    // creation date of the record
    private Date createDate;

    // user identifier
    private String userId;

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public MinMaxCountData() {
    }

    public MinMaxCountData(Date createDate, String userId) {
        this.createDate = createDate;
        this.userId = userId;
    }

    public Date getCreateDate() {
        return createDate;
    }

    public void setCreateDate(Date createDate) {
        this.createDate = createDate;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(createDate.getTime());
        // writeUTF/readUTF keep the field boundary, unlike writeBytes/readLine
        dataOutput.writeUTF(userId);
    }

    public void readFields(DataInput dataInput) throws IOException {
        createDate = new Date(dataInput.readLong());
        userId = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return "MinMaxCountData{" +
                "createDate=" + createDate +
                ", userId='" + userId + '\'' +
                '}';
    }
}
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 9:36
 * @Description Output value of the min/max/count job
 */
public class MinMaxCountTuple implements Writable {

    // earliest date seen
    private Date min = null;

    // latest date seen
    private Date max = null;

    // number of records
    private long count = 0;

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public Date getMin() {
        return min;
    }

    public void setMin(Date min) {
        this.min = min;
    }

    public Date getMax() {
        return max;
    }

    public void setMax(Date max) {
        this.max = max;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(min.getTime());
        dataOutput.writeLong(max.getTime());
        dataOutput.writeLong(count);
    }

    public void readFields(DataInput dataInput) throws IOException {
        min = new Date(dataInput.readLong());
        max = new Date(dataInput.readLong());
        count = dataInput.readLong();
    }

    public String toString() {
        return frmt.format(min) + "\t" + frmt.format(max) + "\t" + count;
    }
}
import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.text.SimpleDateFormat;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 10:02
 * @Description Numerical summarization: minimum date, maximum date and count per user
 */
public class MinMaxCountMain {

    public static class MinMaxCountMapper extends Mapper<Object, Text, Text, MinMaxCountTuple> {

        private Text userId = new Text();
        private MinMaxCountTuple minMaxCountTuple = new MinMaxCountTuple();
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key, Text value, Context context) {
            try {
                ObjectMapper objectMapper = new ObjectMapper();
                objectMapper.setDateFormat(frmt);
                MinMaxCountData minMaxCountData = objectMapper.readValue(value.toString(), MinMaxCountData.class);
                // each record counts once, and its date is both the local min and the local max
                minMaxCountTuple.setCount(1);
                minMaxCountTuple.setMin(minMaxCountData.getCreateDate());
                minMaxCountTuple.setMax(minMaxCountData.getCreateDate());
                userId.set(minMaxCountData.getUserId());
                context.write(userId, minMaxCountTuple);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class MinMaxCountReducer extends Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {

        private MinMaxCountTuple minMaxCountTuple = new MinMaxCountTuple();

        public void reduce(Text key, Iterable<MinMaxCountTuple> values, Context context) {
            try {
                // the output object is reused across keys, so reset it for every key
                minMaxCountTuple.setMin(null);
                minMaxCountTuple.setMax(null);
                long sum = 0;
                for (MinMaxCountTuple value : values) {
                    if (minMaxCountTuple.getMin() == null || value.getMin().compareTo(minMaxCountTuple.getMin()) < 0) {
                        minMaxCountTuple.setMin(value.getMin());
                    }
                    if (minMaxCountTuple.getMax() == null || value.getMax().compareTo(minMaxCountTuple.getMax()) > 0) {
                        minMaxCountTuple.setMax(value.getMax());
                    }
                    sum += value.getCount();
                }
                minMaxCountTuple.setCount(sum);
                context.write(key, minMaxCountTuple);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf, "NumericalSummarization:MinMaxCount");
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(MinMaxCountTuple.class);
            job.setJarByClass(MinMaxCountMain.class);
            job.setMapperClass(MinMaxCountMapper.class);
            // min, max and count are all associative, so the reducer can double as the combiner
            job.setCombinerClass(MinMaxCountReducer.class);
            job.setReducerClass(MinMaxCountReducer.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // delete the output directory if it already exists (see the FileUtil class above)
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
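For reference, a hypothetical input line for this job (one JSON object per line, field names matching MinMaxCountData) and a possible way to launch it could look like the following; the jar name, paths and values are made up for illustration:

    {"createDate":"2019-07-14T09:30:00.000","userId":"u001"}

    hadoop jar summarization.jar MinMaxCountMain input/minmaxcount output/minmaxcount

For that single record the reducer would emit "u001", followed by the same date as both min and max and a count of 1, tab-separated as produced by MinMaxCountTuple.toString().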
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 21:51
 * @Description Input record for the average/median/standard deviation jobs: a comment and its creation time
 */
public class CountAverageData implements Writable {

    // creation date of the comment
    private Date creationDate;

    // comment text
    private String text;

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public CountAverageData() {
    }

    public CountAverageData(Date creationDate, String text) {
        this.creationDate = creationDate;
        this.text = text;
    }

    public Date getCreationDate() {
        return creationDate;
    }

    public void setCreationDate(Date creationDate) {
        this.creationDate = creationDate;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public void write(DataOutput dataOutput) throws IOException {
        // writeUTF/readUTF keep the field boundaries, unlike writeBytes/readLine
        dataOutput.writeUTF(frmt.format(creationDate));
        dataOutput.writeUTF(text);
    }

    public void readFields(DataInput dataInput) throws IOException {
        try {
            creationDate = frmt.parse(dataInput.readUTF());
            text = dataInput.readUTF();
        } catch (ParseException e) {
            throw new IOException(e);
        }
    }

    @Override
    public String toString() {
        return "{" +
                "creationDate=" + creationDate +
                ", text='" + text + '\'' +
                '}';
    }
}
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 21:36
 * @Description Output value of the average job: count and average comment length
 */
public class CountAverageTuple implements Writable {

    // number of comments
    private long count;

    // average comment length
    private float average;

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public float getAverage() {
        return average;
    }

    public void setAverage(float average) {
        this.average = average;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(count);
        dataOutput.writeFloat(average);
    }

    public void readFields(DataInput dataInput) throws IOException {
        count = dataInput.readLong();
        average = dataInput.readFloat();
    }

    @Override
    public String toString() {
        return "{" +
                "count=" + count +
                ", average=" + average +
                '}';
    }
}
import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 21:40
 * @Description
 */
public class CountAverageMain {

    public static class CountAverageMapper extends Mapper<Object, Text, IntWritable, CountAverageTuple> {

        private IntWritable outHour = new IntWritable();
        private CountAverageTuple countAverageTuple = new CountAverageTuple();
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key, Text value, Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setDateFormat(frmt);
            try {
                CountAverageData countAverageData = objectMapper.readValue(value.toString(), CountAverageData.class);
                Calendar calendar = Calendar.getInstance();
                Date creationDate = countAverageData.getCreationDate();
                calendar.setTime(creationDate);
                int hour = calendar.get(Calendar.HOUR_OF_DAY);
                outHour.set(hour);
                countAverageTuple.setAverage(countAverageData.getText().length());
                countAverageTuple.setCount(1);
                context.write(outHour, countAverageTuple);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class CountAverageReducer extends Reducer<IntWritable, CountAverageTuple, IntWritable, CountAverageTuple> {

        private CountAverageTuple result = new CountAverageTuple();

        public void reduce(IntWritable key, Iterable<CountAverageTuple> values, Context context) {
            float sum = 0;
            long count = 0;
            for (CountAverageTuple countAverageTuple : values) {
                count += countAverageTuple.getCount();
                sum += countAverageTuple.getCount() * countAverageTuple.getAverage();
            }
            result.setAverage(sum / count);
            result.setCount(count);
            try {
                context.write(key, result);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration, "CountAverage");
            job.setJarByClass(CountAverageMain.class);
            job.setMapperClass(CountAverageMapper.class);
            job.setCombinerClass(CountAverageReducer.class);
            job.setReducerClass(CountAverageReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(CountAverageTuple.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
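Because each CountAverageTuple carries both a count and an average, the reducer can be reused as the combiner: partial averages are recombined as a weighted sum, not as an average of averages. A small worked example of the reducer's arithmetic (the numbers are made up for illustration):

    partial result A: count = 2, average = 10  ->  contributes 2 * 10 = 20 to sum
    partial result B: count = 1, average = 4   ->  contributes 1 * 4  =  4 to sum
    combined:         count = 3, average = (20 + 4) / 3 = 8

A plain average of the two averages, (10 + 4) / 2 = 7, would be wrong because it ignores the counts.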
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/16 6:33
 * @Description
 */
public class MedianStdDevTuple implements Writable {

    private float median;
    private float stdDev;

    public float getMedian() {
        return median;
    }

    public void setMedian(float median) {
        this.median = median;
    }

    public float getStdDev() {
        return stdDev;
    }

    public void setStdDev(float stdDev) {
        this.stdDev = stdDev;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeFloat(median);
        dataOutput.writeFloat(stdDev);
    }

    public void readFields(DataInput dataInput) throws IOException {
        median = dataInput.readFloat();
        stdDev = dataInput.readFloat();
    }

    @Override
    public String toString() {
        return "{" +
                "median=" + median +
                ", stdDev=" + stdDev +
                '}';
    }
}
import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/16 6:18
 * @Description Median and standard deviation of comment length per hour (no combiner)
 */
public class MedianStdDevMain {

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public static class MedianStdDevMapper extends Mapper<Object, Text, IntWritable, IntWritable> {

        private IntWritable outhour = new IntWritable();
        private IntWritable outlength = new IntWritable();

        public void map(Object key, Text value, Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setDateFormat(frmt);
            try {
                CountAverageData countAverageData = objectMapper.readValue(value.toString(), CountAverageData.class);
                Date creationDate = countAverageData.getCreationDate();
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(creationDate);
                int hour = calendar.get(Calendar.HOUR_OF_DAY);
                int length = countAverageData.getText().length();
                outhour.set(hour);
                outlength.set(length);
                context.write(outhour, outlength);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class MedianStdDevReducer extends Reducer<IntWritable, IntWritable, IntWritable, MedianStdDevTuple> {

        private ArrayList<Float> lengths = new ArrayList<Float>();
        private MedianStdDevTuple medianStdDevTuple = new MedianStdDevTuple();

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) {
            int sum = 0;
            int count = 0;
            try {
                // the list is reused across keys, so clear it first
                lengths.clear();
                for (IntWritable value : values) {
                    sum += value.get();
                    count++;
                    lengths.add((float) value.get());
                }
                // sort so the middle element(s) can be picked out
                Collections.sort(lengths);
                // median: the middle element for an odd count, the mean of the two middle elements for an even count
                if (count % 2 == 0) {
                    medianStdDevTuple.setMedian((lengths.get(count / 2 - 1) + lengths.get(count / 2)) / 2.0f);
                } else {
                    medianStdDevTuple.setMedian(lengths.get(count / 2));
                }
                // mean (cast to float to avoid integer division)
                float mean = (float) sum / count;
                float sumOfSquare = 0.0f;
                // sample standard deviation
                for (Float value : lengths) {
                    sumOfSquare += (value - mean) * (value - mean);
                }
                if (count == 1) {
                    medianStdDevTuple.setStdDev(0);
                } else {
                    medianStdDevTuple.setStdDev((float) Math.sqrt(sumOfSquare / (count - 1)));
                }
                context.write(key, medianStdDevTuple);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration, "MedianStdDev");
            job.setJarByClass(MedianStdDevMain.class);
            job.setMapperClass(MedianStdDevMapper.class);
            job.setReducerClass(MedianStdDevReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @Author bluesnail95
 * @Date 2019/7/16 21:28
 * @Description
 */
public class MedianStdDevUpgradeMain {

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public static class MedianStdDevUpgradeMapper extends Mapper<Object, Text, IntWritable, SortedMapWritable> {

        private IntWritable outHour = new IntWritable();
        private LongWritable one = new LongWritable(1);
        private IntWritable lengths = new IntWritable();

        public void map(Object key, Text value, Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setDateFormat(frmt);
            try {
                CountAverageData countAverageData = objectMapper.readValue(value.toString(), CountAverageData.class);
                Date creationDate = countAverageData.getCreationDate();
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(creationDate);
                outHour.set(calendar.get(Calendar.HOUR_OF_DAY));
                lengths.set(countAverageData.getText().length());
                SortedMapWritable sortedMapWritable = new SortedMapWritable();
                sortedMapWritable.put(lengths, one);
                context.write(outHour, sortedMapWritable);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class MedianStdDevUpgradeCombiner extends Reducer<IntWritable, SortedMapWritable, IntWritable, SortedMapWritable> {

        protected void reduce(IntWritable key, Iterable<SortedMapWritable> values, Context context) {
            SortedMapWritable outValue = new SortedMapWritable();
            try {
                for (SortedMapWritable sortedMapWritable : values) {
                    Set<Map.Entry<WritableComparable, Writable>> set = sortedMapWritable.entrySet();
                    Iterator<Map.Entry<WritableComparable, Writable>> iterator = set.iterator();
                    while (iterator.hasNext()) {
                        Map.Entry<WritableComparable, Writable> entry = iterator.next();
                        LongWritable count = (LongWritable) outValue.get(entry.getKey());
                        if (count != null) {
                            count.set(count.get() + ((LongWritable) entry.getValue()).get());
                            outValue.put(entry.getKey(), count);
                        } else {
                            outValue.put(entry.getKey(), new LongWritable(((LongWritable) entry.getValue()).get()));
                        }
                    }
                }
                context.write(key, outValue);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class MedianStdDevUpgradeReducer extends Reducer<IntWritable, SortedMapWritable, IntWritable, MedianStdDevTuple> {

        private MedianStdDevTuple medianStdDevTuple = new MedianStdDevTuple();
        private TreeMap<Integer, Long> lengthCounts = new TreeMap<Integer, Long>();

        public void reduce(IntWritable key, Iterable<SortedMapWritable> values, Context context) {
            float sum = 0;
            long total = 0;
            lengthCounts.clear();
            medianStdDevTuple.setStdDev(0);
            medianStdDevTuple.setMedian(0);
            for (SortedMapWritable sortedMapWritable : values) {
                Set<Map.Entry<WritableComparable, Writable>> set = sortedMapWritable.entrySet();
                Iterator<Map.Entry<WritableComparable, Writable>> iterator = set.iterator();
                while (iterator.hasNext()) {
                    Map.Entry<WritableComparable, Writable> writableEntry = iterator.next();
                    int length = ((IntWritable) writableEntry.getKey()).get();
                    long count = ((LongWritable) writableEntry.getValue()).get();
                    total += count;
                    sum += count * length;
                    Long sortedCount = lengthCounts.get(length);
                    if (sortedCount == null) {
                        lengthCounts.put(length, count);
                    } else {
                        lengthCounts.put(length, count + sortedCount);
                    }
                }
            }
            long medianIndex = total / 2;
            long previousCount = 0;
            long count = 0;
            long prevKey = 0;
            for (Map.Entry<Integer, Long> entry : lengthCounts.entrySet()) {
                count = previousCount + entry.getValue();
                if (previousCount <= medianIndex && medianIndex < count) {
                    if (total % 2 == 0 && previousCount == medianIndex) {
                        medianStdDevTuple.setMedian((entry.getKey() + prevKey) / 2.0f);
                    } else {
                        medianStdDevTuple.setMedian(entry.getKey());
                    }
                    break;
                }
                previousCount = count;
                prevKey = entry.getKey();
            }
            float mean = sum / total;
            float sumOfSquares = 0.0f;
            for (Map.Entry<Integer, Long> entry : lengthCounts.entrySet()) {
                sumOfSquares += (entry.getKey() - mean) * (entry.getKey() - mean) * entry.getValue();
            }
            if (total == 1) {
                medianStdDevTuple.setStdDev(0);
            } else {
                medianStdDevTuple.setStdDev((float) Math.sqrt(sumOfSquares / (total - 1)));
            }
            try {
                context.write(key, medianStdDevTuple);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration, "MedianStdDevUpgrade");
            job.setJarByClass(MedianStdDevUpgradeMain.class);
            job.setMapperClass(MedianStdDevUpgradeMapper.class);
            job.setCombinerClass(MedianStdDevUpgradeCombiner.class);
            job.setReducerClass(MedianStdDevUpgradeReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(SortedMapWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
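Compared with the previous version, the mapper here emits a map of {comment length -> count} instead of one value per comment, so a combiner can pre-aggregate and the reducer only keeps one entry per distinct length rather than every individual value. A made-up illustration of what the combiner does for one hour key:

    map output 1: {10 -> 1}
    map output 2: {10 -> 1}
    map output 3: {25 -> 1}
    combined:     {10 -> 2, 25 -> 1}

The reducer then rebuilds the sorted length -> count table (the TreeMap) from these partial maps and walks it to locate the median.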
import java.io.Serializable;

/**
 * @Author bluesnail95
 * @Date 2019/7/18 23:22
 * @Description
 */
public class ExtractorData implements Serializable {

    private String link;
    private String id;

    public String getLink() {
        return link;
    }

    public void setLink(String link) {
        this.link = link;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}
import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

/**
 * @Author bluesnail95
 * @Date 2019/7/18 23:18
 * @Description
 */
public class ExtractorMain {

    public static class ExtractorMapper extends Mapper<Object, Text, Text, Text> {

        private Text link = new Text();
        private Text id = new Text();

        public void map(Object key, Text value, Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                ExtractorData extractorData = objectMapper.readValue(value.toString(), ExtractorData.class);
                link.set(extractorData.getLink());
                id.set(extractorData.getId());
                context.write(link, id);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class ExtractorReducer extends Reducer<Text, Text, Text, Text> {

        private Text link = new Text();
        private Text ids = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context) {
            StringBuilder buffer = new StringBuilder("");
            for (Text value : values) {
                buffer.append(value.toString());
                buffer.append(",");
            }
            ids.set(buffer.toString().substring(0, buffer.length() - 1));
            link.set(key.toString());
            try {
                context.write(link, ids);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration, "ExtractorMain");
            job.setJarByClass(ExtractorMain.class);
            job.setMapperClass(ExtractorMapper.class);
            job.setCombinerClass(ExtractorReducer.class);
            job.setReducerClass(ExtractorReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
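For reference, with hypothetical input records like the following (one JSON object per line, fields matching ExtractorData), the job inverts the relationship and groups ids by link:

    {"link":"http://example.com/a","id":"1"}
    {"link":"http://example.com/a","id":"2"}
    {"link":"http://example.com/b","id":"3"}

which would produce output along the lines of:

    http://example.com/a    1,2
    http://example.com/b    3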
import file.FileUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.util.Arrays;
import java.util.HashSet;

/**
 * @Author bluesnail95
 * @Date 2019/7/19 6:31
 * @Description
 */
public class CountNumUsersByStateMain {

    public static class CountNumUsersByStateMapper extends Mapper<Object, Text, NullWritable, NullWritable> {

        public static final String STATE_COUNTER_GROUP = "State";
        public static final String UNKNOWN_COUNTER = "Unknown";
        public static final String NULL_OR_EMPTY_COUNTER = "Null or Empty";

        private String stateArray[] = {"BeiJing", "ShangHai", "ShenZhen", "GuangZhou"};

        private HashSet<String> stateSet = new HashSet<String>(Arrays.asList(stateArray));

        public void map(Object key, Text value, Context context) {
            String state = value.toString();
            if (state != null && StringUtils.isNoneBlank(state)) {
                if (stateSet.contains(state)) {
                    context.getCounter(STATE_COUNTER_GROUP, state).increment(1);
                } else {
                    context.getCounter(STATE_COUNTER_GROUP, UNKNOWN_COUNTER).increment(1);
                }
            } else {
                context.getCounter(STATE_COUNTER_GROUP, NULL_OR_EMPTY_COUNTER).increment(1);
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration, "CountNumUsersByState");
            job.setJarByClass(CountNumUsersByStateMain.class);
            job.setMapperClass(CountNumUsersByStateMapper.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            int code = job.waitForCompletion(true) ? 0 : 1;
            if (code == 0) {
                for (Counter counter : job.getCounters().getGroup(CountNumUsersByStateMapper.STATE_COUNTER_GROUP)) {
                    System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
                }
            }
            FileSystem.get(configuration).delete(new Path(args[1]), true);
            System.exit(code);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
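This job does all of its counting with counters: the mapper emits nothing, the driver prints the "State" counter group after the job finishes, and then deletes the empty output directory. With a hypothetical input of one state name per line, the printed summary might look like the following; the values are made up and depend entirely on the input:

    BeiJing	2
    GuangZhou	1
    Unknown	1
    Null or Empty	1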
Reference: 《MapReduce设计模式》 (MapReduce Design Patterns)