dvccxhk 2014-11-12
1、前言
HDF文件是遥感应用中一种常见的数据格式,由于其高度结构化的特点,笔者曾被如何使用Hadoop处理HDF文件这个问题困扰过相当长的一段时间。于是Google各种解决方案,但都没有找到一种理想的处理办法。也曾参考过HDFGroup官方发的一篇帖子(网址在这里),里面提供了使用Hadoop针对大、中、小HDF文件的处理思路。虽然根据他提供的解决办法,按图索骥,肯定能解决如何使用Hadoop处理HDF文件这个问题,但个人感觉方法偏复杂且需要对HDF的数据格式有较深的理解,实现起来不太容易。于是乎,笔者又继续寻找解决方案,终于发现了一种办法,下面将对该方法进行具体说明。
2、MapReduce主程序
这里主要使用到了netcdf的库进行hdf数据流的反序列化工作(netcdf库的下载地址)。与HDF官方提供的Java库不同,netcdf仅利用Java进行HDF文件的读写操作,且这个库支持多种科学数据,包括HDF4、HDF5等多种格式。而HDF的官方Java库中,底层实际仍是用C进行HDF文件的操作。
下面贴出MapReduce的Mapper函数代码:
package example;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import ucar.ma2.ArrayShort;
import ucar.nc2.Dimension;
import ucar.nc2.Group;
import ucar.nc2.NetcdfFile;
import ucar.nc2.Variable;
public class ReadMapper extends
Mapper<Text, BytesWritable, Text, BytesWritable> {
public void map(Text key, BytesWritable value, Context context)
throws IOException, InterruptedException {
String fileName = key.toString();
NetcdfFile file = NetcdfFile.openInMemory("hdf4", value.get());
Group dataGroup = (file.findGroup("MOD_Grid_monthly_1km_VI")).findGroup("Data_Fields");
//读取到1_km_monthly_red_reflectance的变量
Variable redVar = dataGroup.findVariable("1_km_monthly_red_reflectance");
short[][] data = new short[1200][1200];
if(dataGroup != null){
ArrayShort.D2 dataArray;
//读取redVar中的影像数据
dataArray = (ArrayShort.D2) redVar.read();
List<Dimension> dimList = file.getDimensions();
//获取影像的y方向像元个数
Dimension ydim = dimList.get(0);
//获取影像的x方向像元个数
Dimension xdim = dimList.get(1);
//遍历整个影像,读取出像元的值
for(int i=0;i<xdim.getLength();i++){
for(int j=0;j<ydim.getLength();j++){
data[i][j] = dataArray.get(i, j);
}
}
}
System.out.print(file.getDetailInfo());
}
}
注意程序中的NetcdfFile.openInMemory方法,该静态方法支持从byte[]中构造HDF文件,从而实现了HDF文件的反序列化操作。下面贴出主程序的示例代码:
package example;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import example.WholeFileInputFormat;
public class ReadMain {
public boolean runJob(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// conf.set("mapred.job.tracker", Utils.JOBTRACKER);
String rootPath= "/opt/hadoop-2.3.0/etc/hadoop";
//String rootPath="/opt/hadoop-2.3.0/etc/hadoop/";
conf.addResource(new Path(rootPath+"yarn-site.xml"));
conf.addResource(new Path(rootPath+"core-site.xml"));
conf.addResource(new Path(rootPath+"hdfs-site.xml"));
conf.addResource(new Path(rootPath+"mapred-site.xml"));
Job job = new Job(conf);
job.setJobName("Job name:" + args[0]);
job.setJarByClass(ReadMain.class);
job.setMapperClass(ReadMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
boolean flag = job.waitForCompletion(true);
return flag;
}
public static void main(String[] args) throws ClassNotFoundException,
IOException, InterruptedException {
String[] inputPaths = new String[] { "normalizeJob",
"hdfs://192.168.168.101:9000/user/hduser/hdf/MOD13A3.A2005274.h00v10.005.2008079143041.hdf",
"hdfs://192.168.168.101:9000/user/hduser/test/" };
ReadMain test = new ReadMain();
test.runJob(inputPaths);
}
}
关于MapReduce主程序有几点值得说明一下:
2、本人用的是Yarn2.3.0来执行计算任务,如果用老版本的hadoop,如1.2.0,则把以上主程序中的conf.addResource部分的代码删掉即可。
3、以上MapReduce程序中,只用到了Map函数,未设置Reduce函数。
4、以上程序用到的为HDF4格式的数据,按理说,HDF5格式的数据应该也是支持的。