Cloudeep 2020-03-27
HDFS(Hadoop Distributed File System) 是一个 Apache Software Foundation 项目, 是 Apache Hadoop 项目的一个子项目. Hadoop 非常适于存储大型数据 (比如 TB 和 PB), 其就是使用 HDFS 作为存储系统. HDFS 使用多台计算机存储文件, 并且提供统一的访问接口, 像是访问一个普通文件系统一样使用分布式文件系统. HDFS 对数据文件的访问通过流的方式进行处理, 这意味着通过命令和 MapReduce 程序的方式可以直接使用 HDFS. HDFS 是容错的, 且提供对大数据集的高吞吐量访问.
HDFS 的一个非常重要的特点就是一次写入、多次读取, 该模型降低了对并发控制的要求, 简化了数据聚合性, 支持高吞吐量访问. 而吞吐量是大数据系统的一个非常重要的指标, 吞吐量高意味着能处理的数据量就大.
NameNode | DataNode |
---|---|
存储元数据 | 存储文件内容 |
元数据保存在内存中 | 文件内容保存在磁盘 |
保存文件, block, DataNode 之间的关系 | 维护了 block id 到 DataNode 文件之间的关系 |
所有的文件都是以 block 块的方式存放在 HDFS 文件系统当中, 在 Hadoop1 当中, 文件的 block 块默认大小是 64M, hadoop2 当中, 文件的 block 块大小默认是 128M, block 块的大小可以通过 hdfs-site.xml 当中的配置文件进行指定
<property> <name>dfs.block.size</name> <value>块大小 以字节为单位</value> </property>
通常 DataNode 从磁盘中读取块, 但对于访问频繁的文件, 其对应的块可能被显式的缓存在 DataNode 的内存中, 以堆外块缓存的形式存在. 默认情况下,一个块仅缓存在一个 DataNode 的内存中,当然可以针对每个文件配置 DataNode 的数量. 作业调度器通过在缓存块的 DataNode 上运行任务, 可以利用块缓存的优势提高读操作的性能.
例如:
连接(join) 操作中使用的一个小的查询表就是块缓存的一个很好的候选
用户或应用通过在缓存池中增加一个 Cache Directive 来告诉 NameNode 需要缓存哪些文件及存多久. 缓存池(Cache Pool) 是一个拥有管理缓存权限和资源使用的管理性分组.
例如一个文件 130M, 会被切分成 2 个 block 块, 保存在两个 block 块里面, 实际占用磁盘 130M 空间, 而不是占用256M的磁盘空间
HDFS 的文件权限机制与 Linux 系统的文件权限机制类似
r:read w:write x:execute
权限 x
对于文件表示忽略, 对于文件夹表示是否有权限访问其内容 如果 Linux 系统用户 zhangsan 使用 Hadoop 命令创建一个文件, 那么这个文件在 HDFS 当中的 Owner 就是 zhangsan HDFS 文件权限的目的, 防止好人做错事, 而不是阻止坏人做坏事. HDFS相信你告诉我你是谁, 你就是谁
当 Hadoop 的集群当中, 只有一个 NameNode 的时候, 所有的元数据信息都保存在了 FsImage 与 Eidts 文件当中, 这两个文件就记录了所有的数据的元数据信息, 元数据信息的保存目录配置在了 hdfs-site.xml
当中
<property> <name>dfs.namenode.name.dir</name> <value>file:///export/servers/hadoop-3.1.1/datas/namenode/namenodedatas</value> </property> <property> <name>dfs.namenode.edits.dir</name> <value>file:///export/servers/hadoop-3.1.1/datas/dfs/nn/edits</value> </property>
edits
edits
存放了客户端最近一段时间的操作日志edits
文件中edits
修改时元数据也会更新edits
先更新后客户端才会看到最新信息fsimage
fsimage
存放了一份比较完整的元数据信息fsimage
是 NameNode 的完整的镜像, 如果每次都加载到内存生成树状拓扑结构,这是非常耗内存和CPU, 所以一般开始时对 NameNode 的操作都放在 edits 中fsimage
内容包含了 NameNode 管理下的所有 DataNode 文件及文件 block 及 block 所在的 DataNode 的元数据信息.edits
内容增大, 就需要在一定时间点和 fsimage
合并使用命令 hdfs oiv
cd /export/servers/hadoop-3.1.1/datas/namenode/namenodedatas hdfs oiv -i fsimage_0000000000000000864 -p XML -o hello.xml
使用命令 hdfs oev
cd /export/servers/hadoop-3.1.1/datas/dfs/nn/edits hdfs oev -i edits_0000000000000000865-0000000000000000866 -o myedit.xml -p XML
SecondaryNameNode 定期合并 fsimage 和 edits, 把 edits 控制在一个范围内
配置 SecondaryNameNode
SecondaryNameNode 在 conf/masters
中指定
在 masters 指定的机器上, 修改 hdfs-site.xml
<property> <name>dfs.http.address</name> <value>host:50070</value> </property>
修改 core-site.xml
, 这一步不做配置保持默认也可以
<!-- 多久记录一次 HDFS 镜像, 默认 1小时 --> <property> <name>fs.checkpoint.period</name> <value>3600</value> </property> <!-- 一次记录多大, 默认 64M --> <property> <name>fs.checkpoint.size</name> <value>67108864</value> </property>
edits.new
Client 发起文件上传请求, 通过 RPC 与 NameNode 建立通讯, NameNode 检查目标文件是否已存在, 父目录是否存在, 返回是否可以上传
Client 请求第一个 block 该传输到哪些 DataNode 服务器上
NameNode 根据配置文件中指定的备份数量及机架感知原理进行文件分配, 返回可用的 DataNode 的地址如: A, B, C
Client 请求 3 台 DataNode 中的一台 A 上传数据(本质上是一个 RPC 调用,建立 pipeline ), A 收到请求会继续调用 B, 然后 B 调用 C, 将整个 pipeline 建立完成, 后逐级返回 client
Client 开始往 A 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存), 以 packet 为单位(默认64K), A 收到一个 packet 就会传给 B, B 传给 C. A 每传一个 packet 会放入一个应答队列等待应答
数据被分割成一个个 packet 数据包在 pipeline 上依次传输, 在 pipeline 反方向上, 逐个发送 ack(命令正确应答), 最终由 pipeline 中第一个 DataNode 节点 A 将 pipelineack 发送给 Client
当一个 block 传输完成之后, Client 再次请求 NameNode 上传第二个 block 到服务 1
<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs-client</artifactId> <version>3.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
在 Java 中操作 HDFS, 主要涉及以下 Class:
Configuration
FileSystem
该类的对象是一个文件系统对象, 可以用该对象的一些方法来对文件进行操作, 通过 FileSystem 的静态方法 get 获得该对象
FileSystem fs = FileSystem.get(conf)
get
方法从 conf
中的一个参数 fs.defaultFS
的配置值判断具体是什么类型的文件系统fs.defaultFS
, 并且工程 ClassPath 下也没有给定相应的配置, conf
中的默认值就来自于 Hadoop 的 Jar 包中的 core-default.xml
file:///
, 则获取的不是一个 DistributedFileSystem 的实例, 而是一个本地文件系统的客户端对象@Test public void getFileSystem() throws URISyntaxException, IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), configuration); System.out.println(fileSystem.toString()); }
@Test public void getFileSystem2() throws URISyntaxException, IOException { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020"); FileSystem fileSystem = FileSystem.get(new URI("/"), configuration); System.out.println(fileSystem.toString()); }
@Test public void getFileSystem3() throws URISyntaxException, IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://192.168.52.250:8020"), configuration); System.out.println(fileSystem.toString()); }
@Test public void getFileSystem4() throws Exception{ Configuration configuration = new Configuration(); configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020"); FileSystem fileSystem = FileSystem.newInstance(configuration); System.out.println(fileSystem.toString()); }
@Test public void listFile() throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.100:8020"), new Configuration()); FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/")); for (FileStatus fileStatus : fileStatuses) { if(fileStatus.isDirectory()){ Path path = fileStatus.getPath(); listAllFiles(fileSystem,path); }else{ System.out.println("文件路径为"+fileStatus.getPath().toString()); ? } } } public void listAllFiles(FileSystem fileSystem,Path path) throws Exception{ FileStatus[] fileStatuses = fileSystem.listStatus(path); for (FileStatus fileStatus : fileStatuses) { if(fileStatus.isDirectory()){ listAllFiles(fileSystem,fileStatus.getPath()); }else{ Path path1 = fileStatus.getPath(); System.out.println("文件路径为"+path1); } } }
@Test public void listMyFiles()throws Exception{ //获取fileSystem类 FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration()); //获取RemoteIterator 得到所有的文件或者文件夹,第一个参数指定遍历的路径,第二个参数表示是否要递归遍历 RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true); while (locatedFileStatusRemoteIterator.hasNext()){ LocatedFileStatus next = locatedFileStatusRemoteIterator.next(); System.out.println(next.getPath().toString()); } fileSystem.close(); }
@Test public void getFileToLocal()throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration()); FSDataInputStream open = fileSystem.open(new Path("/test/input/install.log")); FileOutputStream fileOutputStream = new FileOutputStream(new File("c:\\install.log")); IOUtils.copy(open,fileOutputStream ); IOUtils.closeQuietly(open); IOUtils.closeQuietly(fileOutputStream); fileSystem.close(); }
@Test public void mkdirs() throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration()); boolean mkdirs = fileSystem.mkdirs(new Path("/hello/mydir/test")); fileSystem.close(); }
@Test public void putData() throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration()); fileSystem.copyFromLocalFile(new Path("file:///c:\\install.log"),new Path("/hello/mydir/test")); fileSystem.close(); }
cd /export/servers/hadoop-3.1.1 sbin/stop-dfs.sh
cd /export/servers/hadoop-3.1.1/etc/hadoop vim hdfs-site.xml
<property> <name>dfs.permissions.enabled</name> <value>true</value> </property>
scp hdfs-site.xml node02:$PWD scp hdfs-site.xml node03:$PWD
cd /export/servers/hadoop-3.1.1 sbin/start-dfs.sh
cd /export/servers/hadoop-3.1.1/etc/hadoop hdfs dfs -mkdir /config hdfs dfs -put *.xml /config hdfs dfs -chmod 600 /config/core-site.xml
@Test public void getConfig()throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop"); fileSystem.copyToLocalFile(new Path("/config/core-site.xml"),new Path("file:///c:/core-site.xml")); fileSystem.close(); }
由于 Hadoop 擅长存储大文件,因为大文件的元数据信息比较少,如果 Hadoop 集群当中有大量的小文件,那么每个小文件都需要维护一份元数据信息,会大大的增加集群管理元数据的内存压力,所以在实际工作当中,如果有必要一定要将小文件合并成大文件进行一起处理
在我们的 HDFS 的 Shell 命令模式下,可以通过命令行将很多的 hdfs 文件合并成一个大文件下载到本地
cd /export/servers hdfs dfs -getmerge /config/*.xml ./hello.xml
既然可以在下载的时候将这些小文件合并成一个大文件一起下载,那么肯定就可以在上传的时候将小文件合并到一个大文件里面去
@Test public void mergeFile() throws Exception{ //获取分布式文件系统 FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop"); FSDataOutputStream outputStream = fileSystem.create(new Path("/bigfile.xml")); //获取本地文件系统 LocalFileSystem local = FileSystem.getLocal(new Configuration()); //通过本地文件系统获取文件列表,为一个集合 FileStatus[] fileStatuses = local.listStatus(new Path("file:///F:\\传智播客大数据离线阶段课程资料\\3、大数据离线第三天\\上传小文件合并")); for (FileStatus fileStatus : fileStatuses) { FSDataInputStream inputStream = local.open(fileStatus.getPath()); IOUtils.copy(inputStream,outputStream); IOUtils.closeQuietly(inputStream); } IOUtils.closeQuietly(outputStream); local.close(); fileSystem.close(); }