briupacmer 2019-11-19
The 5V characteristics of big data:
GFS --> HDFS
MapReduce --> MapReduce
BigTable --> HBase
Heartbeats are sent every 3 seconds.
The heartbeat response carries commands from the NameNode to that DataNode,
such as deleting blocks or replicating blocks.
If no heartbeat is received from a DataNode for more than 10 minutes, the
node is considered unavailable.
After starting, a DataNode registers with the NameNode;
once registration succeeds, it periodically (every hour) reports all of its block information to the NameNode.
When a DataNode reads a block, it recomputes the checksum and compares it
with the one computed when the block was created.
A DataNode also verifies the checksums of its files three weeks after they are created.
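These intervals are all configurable. A minimal Java sketch, assuming the standard HDFS keys dfs.heartbeat.interval, dfs.namenode.heartbeat.recheck-interval, dfs.blockreport.intervalMsec and dfs.datanode.scan.period.hours (default values differ between Hadoop versions), that reads them and derives the dead-node timeout:

import org.apache.hadoop.conf.Configuration;

public class HdfsIntervals {
    public static void main(String[] args) {
        // Loads any *-site.xml found on the classpath; the fallback values below are common Hadoop 2.x defaults.
        Configuration conf = new Configuration();
        long heartbeatSec    = conf.getLong("dfs.heartbeat.interval", 3);                       // heartbeat every 3 s
        long recheckMs       = conf.getLong("dfs.namenode.heartbeat.recheck-interval", 300000); // NameNode recheck, 5 min
        long blockReportMs   = conf.getLong("dfs.blockreport.intervalMsec", 21600000L);         // block report interval
        long scanPeriodHours = conf.getLong("dfs.datanode.scan.period.hours", 504);             // 504 h = 3 weeks

        // The NameNode marks a DataNode dead after 2 * recheck-interval + 10 * heartbeat-interval (~10.5 min by default).
        long deadTimeoutMs = 2 * recheckMs + 10 * heartbeatSec * 1000;
        System.out.println("DataNode marked dead after ~" + deadTimeoutMs / 60000.0 + " minutes");
        System.out.println("Block report every " + blockReportMs / 3600000.0 + " hours");
        System.out.println("Block scanner (checksum verification) period: " + scanPeriodHours + " hours");
    }
}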
NameNode
DataNode
NodeManager
ResourceManager
Create the fsimage file and store the fsimage data
Create the edits file
Load the fsimage and edits files
Generate new fsimage and edits files
Wait for DataNodes to register and send Block Reports
Register with the NameNode and send a Block Report
When the NameNode starts it enters safe mode; at this point the file system is read-only.
NameNode startup process
In safe mode, the file system cannot be modified.
The purpose is to check the validity of each DataNode's data while the system starts up.
Three ways to enter or leave safe mode:
$ bin/hdfs dfsadmin -safemode enter
$ bin/hdfs dfsadmin -safemode leave
<property>
    <name>dfs.namenode.safemode.threshold-pct</name>
    <value>0.999f</value>
</property>
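The same check can also be done from Java. A minimal sketch, assuming the Hadoop 2.x client API (DistributedFileSystem.setSafeMode) and reusing the hdfs://centos01:8020 address and user "chen" that appear elsewhere in these notes:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;

public class SafeModeCheck {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://centos01:8020"), new Configuration(), "chen");
        if (fs instanceof DistributedFileSystem) {
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            // SAFEMODE_GET only queries the state; SAFEMODE_ENTER / SAFEMODE_LEAVE need admin rights,
            // just like the dfsadmin commands above.
            boolean inSafeMode = dfs.setSafeMode(SafeModeAction.SAFEMODE_GET);
            System.out.println("NameNode in safe mode: " + inSafeMode);
        }
        fs.close();
    }
}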
Advantages:
Disadvantages:
package com.ct.test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Before;
import org.junit.Test;

public class TestDemo {

    FileSystem fs = null;

    // Get the FileSystem object before each test
    @Before
    public void setUp() {
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "7");
        try {
            fs = FileSystem.get(new URI("hdfs://centos01:8020"), conf, "chen");
        } catch (IOException | InterruptedException | URISyntaxException e) {
            e.printStackTrace();
        }
    }

    // Create a directory
    @Test
    public void testMkdir() throws IllegalArgumentException, IOException {
        boolean success = fs.mkdirs(new Path("/result"));
        System.out.println(success);
    }

    // Delete a directory
    public void testDelete() throws IllegalArgumentException, IOException {
        fs.delete(new Path("/result"), true);
    }

    // Upload a file
    @Test
    public void testUpload() throws IllegalArgumentException, IOException {
        FSDataOutputStream out = fs.create(new Path("/input/testUpload.log"));
        FileInputStream input = new FileInputStream("F:/test.txt");
        IOUtils.copy(input, out, 1024);
        input.close();
        out.close();
    }

    // Download a file
    @Test
    public void testDownload() throws IllegalArgumentException, IOException {
        FSDataInputStream input = fs.open(new Path("/input/testUpload.log"));
        FileOutputStream out = new FileOutputStream("F:/test-copy.txt");
        IOUtils.copy(input, out, 1024);
        input.close();
        out.close();
    }

    // List files recursively, including block locations
    @Test
    public void testList() throws FileNotFoundException, IllegalArgumentException, IOException {
        RemoteIterator<LocatedFileStatus> ri = fs.listFiles(new Path("/input"), true);
        while (ri.hasNext()) {
            LocatedFileStatus next = ri.next();
            String group = next.getGroup();
            long len = next.getLen();
            String owner = next.getOwner();
            FsPermission permission = next.getPermission();
            long blockSize = next.getBlockSize();
            short rep = next.getReplication();
            System.out.println(permission + "\t" + owner + "\t" + group);
            System.out.println(len + "\t" + blockSize + "\t" + rep);
            BlockLocation[] blockLocations = next.getBlockLocations();
            for (BlockLocation blktn : blockLocations) {
                System.out.println("length:" + blktn.getLength());
                System.out.println("offset:" + blktn.getOffset());
                System.out.println(Arrays.toString(blktn.getHosts()));
            }
        }
    }
}
ResourceManager
NodeManager
ApplicationMaster
Container
Map and Reduce: a computing framework and programming model based on the "divide and conquer" idea; distributed, parallel computation.
Map: applies a specified operation to every element of a list of independent elements; highly parallelizable.
// step 1: Map Class
/**
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
// TODO update the type parameters for your own job
public static class ModuleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // TODO business logic for one input record
    }
}
Reduce: merges the elements of a list.
// step 2: Reduce Class
/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
// TODO update the type parameters for your own job
public static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // TODO business logic for one key and its values
    }
}
// step 3: Driver - assemble the job (the enclosing class implements Tool)
public int run(String[] args) throws Exception {
    // 1: get configuration
    Configuration configuration = getConf();

    // 2: create Job
    Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
    // run jar
    job.setJarByClass(this.getClass());

    // 3: set job
    // input -> map -> reduce -> output
    // 3.1: input
    Path inPath = new Path(args[0]);
    FileInputFormat.addInputPath(job, inPath);

    // 3.2: map
    job.setMapperClass(ModuleMapper.class);
    // TODO update the output types for your own job
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // 3.3: reduce
    job.setReducerClass(ModuleReducer.class);
    // TODO
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // 3.4: output
    Path outPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outPath);

    // 4: submit job
    boolean isSuccess = job.waitForCompletion(true);
    return isSuccess ? 0 : 1;
}
package com.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDemo extends Configured implements Tool {

    /**
     * Map task definition
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *   KEYIN    byte offset of the line   LongWritable
     *   VALUEIN  one line of text          Text
     *   KEYOUT   word                      Text
     *   VALUEOUT 1                         IntWritable
     *
     * The map task splits a line of text into words.
     */
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            System.out.println("keyIn:" + key + "\t\t" + "valueIn:" + value);
            // 1. split the line into words
            String[] vals = value.toString().split(" ");
            // 2. emit each word with a count of 1
            for (String val : vals) {
                keyOut.set(val);
                valueOut.set(1);
                context.write(keyOut, valueOut);
                System.out.println("keyOut:" + keyOut + "\t\t" + "valueOut:" + valueOut);
            }
        }
    }

    /**
     * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *   KEYIN    word                          Text
     *   VALUEIN  element of the count list     IntWritable
     *   KEYOUT   word                          Text
     *   VALUEOUT total count                   IntWritable
     */
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        IntWritable valueOut = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            System.out.print("keyIn:" + key + "\t\t[");
            // 1. sum up the counts
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
                System.out.print(value + ",\t");
            }
            System.out.println("]");
            // 2. emit the total
            valueOut.set(sum);
            context.write(key, valueOut);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. set up the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. set the map and reduce classes
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3. set the input and output paths (delete the output path if it already exists)
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        boolean success = job.waitForCompletion(true);
        return success ? 1 : 0;
    }

    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new WordCountDemo(), args);
            System.out.println(run == 1 ? "success" : "failure");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Map join: suitable for joining a large table with a small table; the small table is cached in memory and the join happens on the map side.
The small table is cached only once: override the setup() method in the Mapper subclass and load the small-table file into memory there.
The map() method of the Mapper subclass reads the large table.
package com.join;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapJoin extends Configured implements Tool {

    public static class MJMapper extends Mapper<LongWritable, Text, Text, Text> {

        // deptno -> dname
        HashMap<String, String> cacheMap = new HashMap<String, String>();

        // First read the small table into memory.
        // This method runs only once, when the task starts.
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String path = "F:\\input\\join\\dept.log";
            FileReader fr = new FileReader(path);
            BufferedReader br = new BufferedReader(fr);
            String line = null;
            while ((line = br.readLine()) != null) {
                String[] vals = line.split("\t");
                cacheMap.put(vals[0], vals[1]);
            }
            br.close();
            fr.close();
        }

        // The map side joins the two tables on their key.
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Each line of the big (employee) table: empno  ename  deptno
            String[] vals = value.toString().split("\t");
            String deptno = vals[2];
            String dname = cacheMap.get(deptno);
            context.write(new Text(deptno), new Text(dname + "\t" + vals[0] + "\t" + vals[1]));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. set up the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. set the map class (no reduce is needed for a map-side join)
        job.setMapperClass(MJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 3. set the input and output paths
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        // 4. submit
        boolean success = job.waitForCompletion(true);
        return success ? 1 : 0;
    }

    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new MapJoin(), args);
            System.out.println(run == 1 ? "success" : "failure");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
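The setup() above reads the small table from a hard-coded local path (F:\...), which only works when running locally. On a cluster the small table is usually shipped with the job through the distributed cache; a sketch of the change, assuming the HDFS path /input/join/dept.log (additional imports: java.net.URI, org.apache.hadoop.fs.Path):

// In run(), before submitting the job:
job.addCacheFile(new URI("/input/join/dept.log"));

// In MJMapper.setup(), instead of the hard-coded F:\ path:
URI[] cacheFiles = context.getCacheFiles();
// The cached file is localized into the task's working directory under its own file name.
String localName = new Path(cacheFiles[0].getPath()).getName();
BufferedReader br = new BufferedReader(new FileReader(localName));
String line;
while ((line = br.readLine()) != null) {
    String[] vals = line.split("\t");
    cacheMap.put(vals[0], vals[1]);   // deptno -> dname
}
br.close();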
Reduce join: suitable for joining two large tables.
package com.join;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoin extends Configured implements Tool {

    /*
     * Input lines, e.g.:
     *   dept table:     1      技术部
     *   employee table: 1002   rose   1
     */
    public static class RJMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text keyOut = new Text();
        Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            if (vals.length == 2) {
                // dept table line: deptno, dname
                keyOut.set(vals[0]);
                valueOut.set(vals[1]);
            } else {
                // employee table line: empno, ename, deptno
                keyOut.set(vals[2]);
                valueOut.set(vals[0] + "\t" + vals[1]);
            }
            context.write(keyOut, valueOut);
        }
    }

    /*
     * keyIn:   1
     * valueIn: {[1007 lily], [1002 rose], [1001 jack], [技术部]}
     *
     * The reduce-side join relies on the shuffle sending all rows
     * with the same key to the same reduce task.
     */
    public static class RJReducer extends Reducer<Text, Text, Text, Text> {

        ArrayList<String> employees = new ArrayList<String>();

        @Override
        protected void reduce(Text keyIn, Iterable<Text> valueIn,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String department = null;
            employees.clear(); // note: the list must be cleared for each key
            for (Text tmp : valueIn) {
                String[] vals = tmp.toString().split("\t");
                // the number of fields tells us which table the row came from
                if (vals.length == 1) {
                    department = vals[0];
                } else if (vals.length == 2) {
                    employees.add(tmp.toString());
                }
            }
            for (String employee : employees) {
                context.write(keyIn, new Text(employee + "\t" + department));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. set up the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. set the map and reduce classes
        job.setMapperClass(RJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(RJReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 3. set the input and output paths
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        // 4. submit
        boolean success = job.waitForCompletion(true);
        return success ? 1 : 0;
    }

    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new ReduceJoin(), args);
            System.out.println(run == 1 ? "success" : "failure");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Install Maven
Change the repository specified in the settings.xml configuration file (edit apache-maven-3.0.5/conf/settings.xml)
<localRepository>D:/repository</localRepository>
Configure the Maven environment in Eclipse
Window -> Preferences -> Maven ->
-> Installations -> Add -> select the Maven you installed; -> User Settings -> choose <Maven home>/conf/settings.xml
Install Hadoop on Windows
Configure the Hadoop environment variables
Add the environment variable HADOOP_HOME=<Hadoop extraction directory> and append %HADOOP_HOME%/bin; to the PATH environment variable
Test
hadoop -h
Install the Eclipse plugin
Configure the plugin parameters in Eclipse to connect to HDFS
<!-- disable HDFS file permission checking -->
<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>
Eclipse -> Window -> Show View -> Other -> type "MapReduce" -> click Map/Reduce Locations
Right-click -> New Hadoop Location
Map/Reduce Master
MapReduce(V2) host:[hostname] port:8032   // 8032 is the ResourceManager's default port
DFS Master
DFS Master host:[hostname] port:8020
Copy the log4j.properties file to src/main/resources
yarn jar pv.jar /input/2015082818 /output
yarn jar pv.jar <fully-qualified class name> /input/2015082818 /output
Different packages may contain classes with the same name, so the fully-qualified class name must be specified.
The core part of the MapReduce framework (the essence of its design): shuffle.
Shuffle is the phase from the start of map()'s output to the start of reduce()'s input.
input -> map -> shuffle -> reduce -> output
map shuffle phase
reduce shuffle phase
Main shuffle operations
partitioner - map
sorter - map & reduce
combiner: local aggregation in the map phase; not every MapReduce program can use local aggregation
compress: compression of the map-phase output; can be enabled for any MapReduce program (see the sketch after this list)
group - reduce
All of these operations work on the <key, value> pairs output by map().
When the circular buffer reaches 80% of its capacity (the default threshold), the buffered data is spilled to local disk (the local disk of the NodeManager machine on which the MapTask is running).
Spill
The data is not spilled from the buffer to local disk immediately; a few operations are applied first.
Partition: the number of Reduce Tasks in the MapReduce job determines the number of partitions, which decides which reduce task processes each piece of map output. By default, partitioning is done on the key using HashPartitioner.
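A driver excerpt showing how map-output compression could be turned on (a sketch; the property names are the Hadoop 2.x mapreduce.* keys, and Snappy needs the native libraries installed — DefaultCodec or GzipCodec work without them):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

// ... inside the driver, before Job.getInstance(conf):
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.output.compress", true);          // compress spilled / shuffled map output
conf.setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);              // codec used for the map output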
// HashPartitioner assigns a record to a reduce task by taking the key's hash modulo the number of reduce tasks
int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
Sort: the data within each partition is sorted, by default by key.
The partitioned, sorted data is written to a file on local disk.
These steps repeat, producing multiple small files.
After spilling ends
Merge: the data of the individual partitions is merged together. (When a MapTask finishes processing its data it notifies the AppMaster, which notifies all ReduceTasks; each ReduceTask then pulls the data belonging to it (its partition) from the local disks of the completed MapTasks.)
In the end each partition forms one file (the map output ends up in a single file), partitioned, with the data in each partition already sorted.
Group
Values with the same key are collected into a list, forming new (key, list(value)) pairs, which are passed to the reduce() function for processing.
Finally, (key, list(value)) is passed to reduce().
FileInputFormat.setMaxInputSplitSize(job, size);   // set the maximum split size
FileInputFormat.setMinInputSplitSize(job, size);   // set the minimum split size
// FileInputFormat
public List<InputSplit> getSplits(JobContext job) { ... }

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
// minSize <= maxSize < blockSize  ->  smaller splits, higher parallelism
// minSize > blockSize             ->  larger splits, lower parallelism
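A worked example of computeSplitSize with assumed values (128 MB block size), showing how the two settings above change the number of map tasks:

// computeSplitSize(blockSize, minSize, maxSize) = max(minSize, min(maxSize, blockSize))
// block size 128 MB (assumed):
//   defaults: minSize = 1, maxSize = Long.MAX_VALUE  -> split = 128 MB (one split per block)
//   maxSize  = 64 MB  (maxSize < blockSize)          -> split =  64 MB (two splits per block, more map tasks)
//   minSize  = 256 MB (minSize > blockSize)          -> split = 256 MB (fewer, larger splits, fewer map tasks)
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);   // e.g. raise parallelism to two map tasks per block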
job.setNumReduceTasks(2);   // HashPartitioner decides which reduce task processes each key output by map
package com.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Use Hadoop's Writable for serialization instead of java.io.Serializable.
 */
public class Flow implements Writable {

    private long up;
    private long down;
    private long sum;

    public long getUp() { return up; }
    public void setUp(long up) { this.up = up; }
    public long getDown() { return down; }
    public void setDown(long down) { this.down = down; }
    public long getSum() { return sum; }
    public void setSum(long sum) { this.sum = sum; }

    @Override
    public String toString() {
        return up + "\t" + down + "\t" + sum;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // read the fields in the same order they were written
        up = in.readLong();
        down = in.readLong();
        sum = in.readLong();
    }
}
public static class MyPartitioner extends Partitioner<Text, Flow> {
    @Override
    public int getPartition(Text key, Flow value, int numPartitions) {
        // route records to one of three partitions by total traffic
        if (value.getSum() < 1024) {
            return 0;
        } else if (value.getSum() < 10 * 1024) {
            return 1;
        }
        return 2;
    }
}
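A driver excerpt (a sketch, assuming the flow job keys records by Text as in the partitioner above) showing how the custom value type and partitioner are wired up; the reduce-task count has to match the number of partitions MyPartitioner returns:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(3);   // three partitions -> three reduce tasks -> three output files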
Sorting is only done on the key; if multi-field sorting is needed, a custom key must be defined.
Sorting happens automatically during shuffle; there is no method to call manually.
// The class to be sorted must implement the WritableComparable interface
public class MyKey implements WritableComparable<MyKey> {

    // descending by sum, then descending by phone
    @Override
    public int compareTo(MyKey o) {
        long result = o.getSum() - this.getSum();
        if (result > 0) {
            return 1;
        } else if (result < 0) {
            return -1;
        }
        return o.getPhone().compareTo(this.getPhone());
    }
    // ...
}
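WritableComparable also includes the Writable half, so MyKey still needs write()/readFields(); a minimal sketch, with the field names (phone, sum) assumed from the compareTo() above:

private String phone;
private long sum;

@Override
public void write(DataOutput out) throws IOException {
    out.writeUTF(phone);
    out.writeLong(sum);
}

@Override
public void readFields(DataInput in) throws IOException {
    // read the fields in the same order they were written
    phone = in.readUTF();
    sum = in.readLong();
}

public String getPhone() { return phone; }
public long getSum() { return sum; }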
Combiner: a small reduce on the map side; it reduces the values produced by each map task, cutting down the amount of data transferred.
The combiner is set via job.setCombinerClass(WCReducer.class);
Effect before and after adding the combiner
Original data (two input splits):
    hello world hello hadoop
    hello world hello java
Reduce input without the combiner:
    keyIn:hadoop  [1, ]
    keyIn:hello   [1, 1, 1, 1, ]
    keyIn:java    [1, ]
    keyIn:world   [1, 1, ]
Reduce input with the combiner:
    keyIn:hadoop  [1, ]
    keyIn:hello   [2, 2, ]
    keyIn:java    [1, ]
    keyIn:world   [1, 1, ]
Group comparator: as an optimization, keys that share the same field(s) are treated as the same key, reducing the number of key/value pairs.
Override the RawComparator methods to merge keys that share the same fields.
It is enabled via job.setGroupingComparatorClass(Mygroup.class);
public static class Mygroup implements RawComparator<Person> {

    @Override
    public int compare(Person o1, Person o2) {
        return 0;
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // compare the serialized keys while ignoring the last 4 bytes,
        // so keys that differ only in that trailing field fall into the same group
        return WritableComparator.compareBytes(b1, 0, l1 - 4, b2, 0, l2 - 4);
    }
}
org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable
The input key of the map() method is the byte offset of the line within the file, so it must be declared as LongWritable (not IntWritable).
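A minimal illustration of the fix: the error above usually means the Mapper's first type parameter (or the map() signature) was declared as IntWritable; with TextInputFormat it must be LongWritable:

// Wrong: the input key delivered by TextInputFormat is a LongWritable (the line's byte offset), not an IntWritable.
// public static class WCMapper extends Mapper<IntWritable, Text, Text, IntWritable> { ... }

// Right:
public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key = byte offset of the current line within the input split
    }
}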