Hadoop的I/O

shenhongdb 2012-04-06

1. 数据完整性:任何语言对IO的操作都要保持其数据的完整性。Hadoop当然希望数据在存储和处理中不会丢失或损坏。检查数据完整性的常用方法是校验和。

  • HDFS的数据完整性:客户端在写或者读取HDFS的文件时,都会对其进行校验和验证,当然我们可以通过在Open()方法读取之前,将false传给FileSystem中的setVerifyCheckSum()来禁用校验和。
  • 本地文件系统,hadoop的本地文件系统执行客户端校验,这意味着,在写一个filename文件时,文件系统的客户端以透明方式创建了一个隐藏的文件.filename.crc,块的大小做为元数据存于此,所以读取文件时会进行校验和验证。
  • ChecksumFileSystem:可以通过它对其数据验证。

2. 压缩:压缩后能够节省空间和减少网络中的传输。所以在hadoop中压缩是非常重要的。hadoop的压缩格式

压缩格式算法文件扩展名多文件可分割性
DEFLATEaDEFLATE.deflatenono
gzip(zip)DEFLATE.gz(.zip)no(yes)no(yes)
bzip2bzip2.bz2noyes
LZOLZO.lzonono
  • 编码/解码
Compression format          Hadoop CompressionCodec
DEFLATE                            org.apache.hadoop.io.compress.DefaultCodec
gzip                                   org.apache.hadoop.io.compress.GzipCodec
bzip2                                 org.apache.hadoop.io.compress.BZip2Codec
LZO                                   com.hadoop.compression.lzo.LzopCodec
可以用ComressionCodec轻松的压缩和解压缩。我们可以用CompressionOutput创建一个CompressionOutputStream未压缩的数据写到此)。相反,可以用compressionInputStream进行解压缩。
  1. /** 
  2.      * @param args 
  3.      */  
  4.     public static void main(String[] args) throws Exception  
  5.     {  
  6.         // TODO Auto-generated method stub   
  7.         String codecClassname = args[0];  
  8.         Class<?> codecClass = Class.forName(codecClassname);  
  9.         Configuration configuration = new Configuration();  
  10.         CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);  
  11.         CompressionOutputStream  outputStream = codec.createOutputStream(System.out);  
  12.         IOUtils.copyBytes(System.in, outputStream, 4096,false);  
  13.         outputStream.finish();  
  14.     }  
  • 压缩和分割:因为HDFS默认是以块的来存储数据的,所以在压缩时考虑是否支持分割时非常重要的。
  • 在MapReduce使用压缩:例如要压缩MapReduce作业的输出,需要将配置文件中mapred.output.compress的属性设置为true
  1. public static void main(String[] args) throws IOException {  
  2.     if (args.length != 2) {  
  3.       System.err.println("Usage: MaxTemperatureWithCompression <input path> " +  
  4.             "<output path>");  
  5.       System.exit(-1);  
  6.     }  
  7.       
  8.     JobConf conf = new JobConf(MaxTemperatureWithCompression.class);  
  9.     conf.setJobName("Max temperature with output compression");  
  10.   
  11.     FileInputFormat.addInputPath(conf, new Path(args[0]));  
  12.     FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
  13.       
  14.     conf.setOutputKeyClass(Text.class);  
  15.     conf.setOutputValueClass(IntWritable.class);  
  16.       
  17.     /*[*/conf.setBoolean("mapred.output.compress"true);  
  18.     conf.setClass("mapred.output.compression.codec", GzipCodec.class,  
  19.         CompressionCodec.class);/*]*/  
  20.   
  21.     conf.setMapperClass(MaxTemperatureMapper.class);  
  22.     conf.setCombinerClass(MaxTemperatureReducer.class);  
  23.     conf.setReducerClass(MaxTemperatureReducer.class);  
  24.   
  25.     JobClient.runJob(conf);  
  26.   }  

3.序列化:将字节流和机构化对象的转化。hadoop是进程间通信(RPC调用),PRC序列号结构特点:紧凑,快速,可扩展,互操作,hadoop使用自己的序列化格式Writerable,

  • Writerable接口: 
  1. package org.apache.hadoop.io;  
  2. import java.io.DataOutput;  
  3. import java.io.DataInput;  
  4. import java.io.IOException;  
  5. public interface Writable {  
  6. void write(DataOutput out) throws IOException;// 将序列化流写入DataOutput   
  7. void readFields(DataInput in) throws IOException; //从DataInput流读取二进制   
  8. }  

 

  1. package WritablePackage;  
  2.   
  3. import java.io.ByteArrayInputStream;  
  4. import java.io.ByteArrayOutputStream;  
  5. import java.io.DataInputStream;  
  6. import java.io.DataOutputStream;  
  7. import java.io.IOException;  
  8.   
  9.   
  10. import org.apache.hadoop.io.Writable;  
  11. import org.apache.hadoop.util.StringUtils;  
  12. import org.hsqldb.lib.StringUtil;  
  13.   
  14. public class WritableTestBase  
  15. {  
  16.     public static byte[] serialize(Writable writable) throws IOException  
  17.     {  
  18.         ByteArrayOutputStream outputStream  = new ByteArrayOutputStream();  
  19.         DataOutputStream dataOutputStream = new DataOutputStream(outputStream);  
  20.         writable.write(dataOutputStream);  
  21.         dataOutputStream.close();  
  22.         return outputStream.toByteArray();  
  23.     }  
  24.       
  25.     public static byte[] deserialize(Writable writable,byte[] bytes) throws IOException  
  26.     {  
  27.         ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);  
  28.         DataInputStream dataInputStream = new DataInputStream(inputStream);  
  29.         writable.readFields(dataInputStream);  
  30.         dataInputStream.close();  
  31.         return bytes;  
  32.     }  
  33.       
  34.     public static String serializeToString(Writable src) throws IOException  
  35.     {  
  36.         return StringUtils.byteToHexString(serialize(src));  
  37.     }  
  38.       
  39.     public static String writeTo(Writable src, Writable des) throws IOException  
  40.     {  
  41.         byte[] data = deserialize(des, serialize(src));  
  42.         return StringUtils.byteToHexString(data);  
  43.     }  
  44. }  
更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

相关推荐