Week 25 of 2018 - Hadoop MR Programs - Weather Data Analysis

大木行走 2019-06-27

Weather Data

The weather data lives in the /pub/data/noaa/ directory on ftp.ncdc.noaa.gov and is fetched over FTP; I wrote an FTP fetcher to download it. An excerpt of the 1950 data looks like this:

0171999999939931950010100005+36000-094167SAO  +038399999V02015859001550042749N008000599+01174+01065102474ADDGF108995999999999999999999MA1999999097934MW1455EQDN01 07000JPWTH 1QNNG11 1 00014K11 1 00051L11 1 00500M11 1 28920N11 1 07000Q11 1 10247S11 1 00053V11 1 01099W11 1 54003
0165999999939931950010101005+36000-094167SAO  +038399999V02015859002650036649N011200599+01174+01005102474ADDGF108995999999999999999999MA1999999097934EQDN01 00000JPWTH 1QNNG11 1 00012K11 1 00050L11 1 00700M11 1 28920N11 1 00000Q11 1 10247S11 1 00053V11 1 01099W11 1 54005
0165999999939931950010102005+36000-094167SAO  +038399999V02022559002150036649N011200599+01224+01005102474ADDGF108995999999999999999999MA1999999097934EQDN01 00000JPWTH 1QNNG11 1 00012K11 1 00050L11 1 00700M11 1 28920N11 1 00000Q11 1 10247S11 1 00054V11 1 01099W11 1 66004
0171999999939931950010103005+36000-094167SAO  +038399999V02022559002150024449N008000599+01224+01005102474ADDGF108995999999999999999999MA1999999097934MW1455EQDN01 07000JPWTH 1QNNG11 1 00008K11 1 00050L11 1 00500M11 1 28920N11 1 07000Q11 1 10247S11 1 00054V11 1 01099W11 1 66004
0171999999939931950010104005+36000-094167SAO  +038399999V02022559002650021349N008000599+01174+01005102474ADDGF108995999999999999999999MA1999999097934MW1455EQDN01 07000JPWTH 1QNNG11 1 00007K11 1 00050L11 1 00500M11 1 28920N11 1 07000Q11 1 10247S11 1 00053V11 1 01099W11 1 66005
0171999999939931950010105005+36000-094167SAO  +038399999V02020359003150018349N008000599+01174+01065102444ADDGF108995999999999999999999MA1999999097904MW1455EQDN01 07000JPWTH 1QNNG11 1 00006K11 1 00051L11 1 00500M11 1 28910N11 1 07000Q11 1 10244S11 1 00053V11 1 01099W11 1 56006
0171999999939931950010106005+36000-094167SAO  +038399999V02022559001550018349N008000599+01174+01065102444ADDGF108995999999999999999999MA1999999097904MW1455EQDN01 07000JPWTH 1QNNG11 1 00006K11 1 00051L11 1 00500M11 1 28910N11 1 07000Q11 1 10244S11 1 00053V11 1 01099W11 1 66003
0171999999939931950010107005+36000-094167SAO  +038399999V02020359002650012249N004800599+01174+01065102404ADDGF108995999999999999999999MA1999999097874MW1455EQDN01 07000JPWTH 1QNNG11 1 00004K11 1 00051L11 1 00300M11 1 28900N11 1 07000Q11 1 10240S11 1 00053V11 1 01099W11 1 56005
0177999999939931950010108005+36000-094167SAO  +038399999V02027059001050003049N001600599+01174+01065102474ADDGF108995999999999999999999MA1999999097934MW1455MW2515EQDN01 03370JPWTH 1QNNG11 1 00001K11 1 00051L11 1 00100M11 1 28920N11 1 03370Q11 1 10247S11 1 00053V11 1 01099W11 1 77002
0177999999939931950010109005+36000-094167SAO  +038399999V02018059000550003049N001600599+01114+01115102444ADDGF108995999999999999999999MA1999999097904MW1455MW2515EQDN01 03370JPWTH 1QNNG11 1 00001K11 1 00052L11 1 00100M11 1 28910N11 1 03370Q11 1 10244S11 1 00052V11 1 01099W11 1 55001
0177999999939931950010110005+36000-094167SAO  +038399999V02015859001050000049N000000599+01064+01065102374ADDGF108995999999999999999999MA1999999097834MW1455MW2515EQDN01 03370JPWTH 1QNNG11 1 00000K11 1 00051L11 1 00000M11 1 28890N11 1 03370Q11 1 10237S11 1 00051V11 1 01099W11 1 54002
0177999999939931950010111005+36000-094167SAO  +038399999V02013559001550000049N000000599+01064+01065102344ADDGF108995999999999999999999MA1999999097834MW1455MW2515EQDN01 03370JPWTH 1QNNG11 1 00000K11 1 00051L11 1 00000M11 1 28890N11 1 03370Q11 1 10234S11 1 00051V11 1 01099W11 1 44003
0171999999939931950010112005+36000-094167SAO  +038399999V02018059001550003049N001600599+01114+01115102374ADDGF108995999999999999999999MA1999999097834MW1455EQDN01 07000JPWTH 1QNNG11 1 00001K11 1 00052L11 1 00100M11 1 28890N11 1 07000Q11 1 10237S11 1 00052V11 1 01099W11 1 55003
0171999999939931950010113005+36000-094167SAO  +038399999V02022559001550012249N003200599+01174+01115102404ADDGF108995999999999999999999MA1999999097874MW1455EQDN01 07000JPWTH 1QNNG11 1 00004K11 1 00052L11 1 00200M11 1 28900N11 1 07000Q11 1 10240S11 1 00053V11 1 01099W11 1 66003
0171999999939931950010114005+36000-094167SAO  +038399999V02099999000050012249N003200599+01224+01115102474ADDGF108995999999999999999999MA1999999097934MW1455EQDN01 07000JPWTH 1QNNG11 1 00004K11 1 00052L11 1 00200M11 1 28920N11 1 07000Q11 1 10247S11 1 00054V11 1 01099W11 1 00000
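The FTP fetcher itself is not listed in this article. As a rough sketch only (assuming Apache Commons Net is on the classpath; the class name NoaaFtpFetcher, the anonymous login and the local data/noaa/<year> layout are illustrative, not the original program), it could look like this:

package com.jc.demo.hadoop.yarn.mr;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;

public class NoaaFtpFetcher {
    public static void main(String[] args) throws Exception {
        FTPClient ftp = new FTPClient();
        ftp.connect("ftp.ncdc.noaa.gov");
        ftp.login("anonymous", "anonymous@example.com");   // the NOAA archive is public, so anonymous login
        ftp.enterLocalPassiveMode();
        ftp.setFileType(FTP.BINARY_FILE_TYPE);              // the .gz files are binary

        for (int year = 1901; year <= 1999; year++) {
            String remoteDir = "/pub/data/noaa/" + year;
            File localDir = new File("data/noaa/" + year);
            localDir.mkdirs();
            for (FTPFile remote : ftp.listFiles(remoteDir)) {
                if (!remote.isFile()) {
                    continue;
                }
                // download each station file, e.g. 029070-99999-1901.gz
                File localFile = new File(localDir, remote.getName());
                OutputStream out = new FileOutputStream(localFile);
                try {
                    ftp.retrieveFile(remoteDir + "/" + remote.getName(), out);
                } finally {
                    out.close();
                }
            }
        }
        ftp.logout();
        ftp.disconnect();
    }
}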

What actually gets downloaded looks like this:

[root@amd data]# tree |more
.
└── noaa
    ├── 1901
    │   ├── 029070-99999-1901.gz
    │   ├── 029500-99999-1901.gz
    │   ├── 029600-99999-1901.gz
    │   ├── 029720-99999-1901.gz
    │   ├── 029810-99999-1901.gz
    │   └── 227070-99999-1901.gz
    ├── 1902
    │   ├── 029070-99999-1902.gz
    │   ├── 029500-99999-1902.gz
    │   ├── 029600-99999-1902.gz
    │   ├── 029720-99999-1902.gz
    │   ├── 029810-99999-1902.gz
    │   └── 227070-99999-1902.gz

So I use a shell script to decompress all of the data into a single txt file:
zcat_all.sh

#!/bin/bash
# Decompress every yearly directory under ./data/noaa into a single all.txt
for file in ./data/noaa/*
do
    if test -f "$file"
    then
        echo "$file is a regular file"
    fi
    if test -d "$file"
    then
        beginSecond=$(date "+%s")
        echo "traversing $file"

        for gzfile in "$file"/*
        do
            zcat "$gzfile" >> all.txt
        done
        endTime=$(date +%Y%m%d%H%M%S)
        endSecond=$(date "+%s")

        echo "$endTime finished $file, cost $((endSecond-beginSecond)) s"
    fi
done

This leaves us with a single file, all.txt, containing a century of weather data. Next we write a MapReduce (MR) program to find the maximum temperature of each year: the mapper emits (year, temperature) pairs and the reducer keeps the maximum for each year.

The MapReduce (MR) program for finding the maximum temperature

When writing an MR program, the usual approach is to get it passing on a small amount of data locally before running it on the cluster; testing directly on the cluster wastes a lot of time.

The project's pom file

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jc.demo</groupId>
    <artifactId>jc-demo-hadoop</artifactId>
    <version>0.0.1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>oneapp-archetype-test</name>
    <url>http://www.jevoncode.com</url>




    <repositories>
        <repository>
            <id>aliyun repository</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun plugin repository</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </pluginRepository>
    </pluginRepositories>
    <properties>
        <!-- Encoding for every file in the project -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- Compile-time encoding -->
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>

        <!-- Compiling Time JDK Version -->
        <java.version>1.7</java.version>

        <!-- Test -->
        <junit.version>4.12</junit.version>


        <!-- Logging -->
        <slf4j.version>1.7.21</slf4j.version>
        <logback.version>1.1.7</logback.version>


    </properties>


    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.hamcrest/hamcrest-all -->
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>

        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- logback -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>


        <!-- MRUnit: MapReduce unit-testing toolkit -->
        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <classifier>hadoop2</classifier>
            <scope>test</scope>
        </dependency>


    </dependencies>

    <!-- Per-environment packaging -->
    <profiles>
        <profile>
            <!-- Package with: mvn package -Pdev -->
            <id>dev</id> <!-- development environment -->
            <properties>
                <env>development</env>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <!-- Package with: mvn package -Ptest -->
            <id>test</id> <!-- test environment -->
            <properties>
                <env>test</env>
            </properties>
        </profile>
        <profile>
            <!-- Package with: mvn package -Pprod -->
            <id>prod</id> <!-- production environment -->
            <properties>
                <env>prod</env>
            </properties>
        </profile>
    </profiles>

    <build>
        <finalName>${project.artifactId}-${project.version}-${env}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptor>src/main/assembly/dep.xml</descriptor>
                </configuration>
                <executions>
                    <execution>
                        <id>create-archive</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <filters>
            <filter>src/main/environment/${env}.properties</filter>
        </filters>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.ftl</include>
                    <include>**/*.xml</include>
                </includes>
                <!-- Whether to filter properties in this resource -->
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
                <!-- Whether to filter properties in this resource -->
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>system.properties</include>
                </includes>
                <!-- Whether to filter properties in this resource -->
                <filtering>true</filtering>
            </resource>
        </resources>

    </build>
</project>

Writing the mapper

The weather data parser class

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.io.Text;

public class NcdcRecordParser {

    private static final int MISSING_TEMPERATURE=9999;
    private String year;
    private int airTemperature;
    private String quality;

    public void parse(String record){
        // NCDC records are fixed-width: the year sits at columns 15-18
        year = record.substring(15,19);
        String airTemperatureString;
        // the air temperature starts at column 87 with a sign; parseInt cannot handle a leading '+', so skip it
        if(record.charAt(87) == '+'){
            airTemperatureString = record.substring(88,92);
        }else {
            airTemperatureString = record.substring(87,92);
        }
        airTemperature = Integer.parseInt(airTemperatureString);
        // column 92 is the quality flag for the temperature reading
        quality = record.substring(92,93);
    }

    public void parse(Text record){
        parse(record.toString());
    }

    public boolean isValidTemperature(){
        // 9999 means the reading is missing; only quality flags 0, 1, 4, 5 and 9 are accepted
        return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
    }

    public String getYear() {
        return year;
    }

    public int getAirTemperature() {
        return airTemperature;
    }
}
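As a quick illustration (not part of the original article; the wrapper class NcdcRecordParserDemo is made up for this example), feeding the record that the mapper unit test below uses through the parser gives:

package com.jc.demo.hadoop.yarn.mr;

public class NcdcRecordParserDemo {
    public static void main(String[] args) {
        NcdcRecordParser parser = new NcdcRecordParser();
        parser.parse("0097999999949101950010121005+42550-092400SAO  +026599999V02022559004650030549N004800599+00724+00395999999ADDMW1055EQDN01" +
                " 08100JPWTH 1QNNG11 1 00010K11 1 00039L11 1 00300N11 1 08100S11 1 00045W11 1 66009");
        System.out.println(parser.getYear());            // 1950
        System.out.println(parser.getAirTemperature());  // 72, i.e. 7.2°C (readings are stored as tenths of a degree Celsius)
        System.out.println(parser.isValidTemperature()); // true: the value is not 9999 and the quality flag '4' is accepted
    }
}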

The mapper

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private NcdcRecordParser parser = new NcdcRecordParser();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        String line = value.toString();
//        String year = line.substring(15, 19);
//        int airTemperature = Integer.parseInt(line.substring(87, 92));
//        context.write(new Text(year), new IntWritable(airTemperature));
        parser.parse(value);
        if(parser.isValidTemperature()){
            context.write(new Text(parser.getYear()),new IntWritable(parser.getAirTemperature()));
        }
    }
}

Unit test for the mapper

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

import java.io.IOException;

public class MaxTemperatureMapperTest {

    @Test
    public void processesValidRecord() throws IOException, InterruptedException {
        Text value = new Text("0097999999949101950010121005+42550-092400SAO  +026599999V02022559004650030549N004800599+00724+00395999999ADDMW1055EQDN01" +
                " 08100JPWTH 1QNNG11 1 00010K11 1 00039L11 1 00300N11 1 08100S11 1 00045W11 1 66009");
        new MapDriver<LongWritable, Text, Text, IntWritable>()//
                .withMapper(new MaxTemperatureMapper())//
                .withInput(new LongWritable(0), value)//
                .withOutput(new Text("1950"), new IntWritable(72))      // expected output: the reading at 21:00 on 1950-01-01 is 72, i.e. 7.2°C
                .runTest();
    }

    @Test
    public void ignoresMissingTemperatureRecord() throws IOException, InterruptedException {
        Text value = new Text("0036999999949101950030716005+42550-092400SAO  +026599999V02015859008850228649N016000599+99999+99999999999QNNG11 1 00075L" +
                "11 1 01000W11 1 54017");
        new MapDriver<LongWritable, Text, Text, IntWritable>()//
                .withMapper(new MaxTemperatureMapper())//
                .withInput(new LongWritable(0), value)//
//                .withOutput(new Text("1950"), new IntWritable(9999))      // no output expected, because this record is invalid
                .runTest();
    }
}

Writing the reducer

The reducer

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for(IntWritable value:values){
            maxValue = Math.max(maxValue,value.get());
        }
        context.write(key,new IntWritable(maxValue));
    }
}

Unit test for the reducer

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;

public class MaxTemperatureReducerTest {

    @Test
    public void returnsMaximumIntegerInValues() throws IOException {
        new ReduceDriver<Text, IntWritable, Text, IntWritable>()//
                .withReducer(new MaxTemperatureReducer())//
                .withInput(new Text("1950"), Arrays.asList(new IntWritable(10), new IntWritable(5), new IntWritable(34)))//
                .withOutput(new Text("1950"), new IntWritable(34))//
                .runTest();
    }
}

Writing the Driver

Hadoop has a lot of configuration, so we launch the MR program through ToolRunner, which uses GenericOptionsParser to take care of all the configuration. You can launch it with or without explicit configuration options.

The Driver

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Launched from the command line, e.g.:
 * hadoop jar jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar com.jc.demo.hadoop.yarn.mr.MaxTemperatureDriver -conf hadoop-cluster.xml /user/jevoncode/input/all.txt max-temp
 * -conf supplies configuration dynamically; by default the configuration is read from $HADOOP_HOME/etc/hadoop
 */
public class MaxTemperatureDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output> \n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }


        Job job = Job.getInstance(getConf(), "Max temperature");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        // taking a maximum is associative and commutative, so the reducer can also act as the combiner
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
        System.exit(exitCode);
    }
}

Unit test for the Driver

package com.jc.demo.hadoop.yarn.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

public class MaxTemperatureDriverTest {

    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        conf.set("mapreduce.framework.name","local");
        conf.setInt("mapreduce.task.io.sort.mb",1);

        Path input = new Path("src/test/resources/mr/ncdc/input/all.txt");
        Path output = new Path("src/test/resources/mr/ncdc/output");

        FileSystem fs = FileSystem.getLocal(conf);
        fs.delete(output, true);                              // delete the old output

        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        driver.setConf(conf);

        int exitCode = driver.run(new String[]{input.toString(), output.toString()});
        assertThat(exitCode,is(0));

    }
}

Here we test with a small amount of data in all.txt (the test data is attached at the end of this article).
The run produces a part-r-00000 file whose content is (the value is in tenths of a degree, i.e. a 1950 maximum of 35.0°C):

1950    350

Running the MR Program

After writing and testing the MR program, you can package it (jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar) and run it on a machine. There are two ways to run it:

Local execution

This runs against local files: hadoop is used, but not the Hadoop cluster, so no YARN application is created. all.txt is a file in a local directory, and when the run finishes a max-temp directory is created to hold the result.
The configuration file hadoop-local.xml is as follows:

<?xml version="1.0" ?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>file:///</value>
    </property>

    <property>
        <name>mapreduce.framework.name</name>
        <value>local</value>
    </property>

</configuration>
The files on the machine, the run, and the output:

[cdata@s1 ~]$ ll
total 26480
-rw-r--r-- 1 cdata cdata  2385449 Jun 24 00:30 all.txt
-rw-r--r-- 1 cdata cdata     1482 Jun 24 00:26 hadoop-cluster.xml
-rw-r--r-- 1 cdata cdata      388 Jun 24 00:26 hadoop-localhost.xml
-rw-r--r-- 1 cdata cdata      260 Jun 24 00:26 hadoop-local.xml
-rw-r--r-- 1 cdata cdata    17655 Jun 24 00:24 jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar
[cdata@s1 ~]$ hadoop jar jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar com.jc.demo.hadoop.yarn.mr.MaxTemperatureDriver -conf hadoop-local.xml all.txt max-temp
[cdata@s1 ~]$ ls -al max-temp/
total 16
drwxrwxr-x 2 cdata cdata   84 Jun 24 00:31 .
drwx------ 5 cdata cdata 4096 Jun 24 00:31 ..
-rw-r--r-- 1 cdata cdata    9 Jun 24 00:31 part-r-00000
-rw-r--r-- 1 cdata cdata   12 Jun 24 00:31 .part-r-00000.crc
-rw-r--r-- 1 cdata cdata    0 Jun 24 00:31 _SUCCESS
-rw-r--r-- 1 cdata cdata    8 Jun 24 00:31 ._SUCCESS.crc

On a single virtual machine (the s1 server) the job ran for almost a week and had processed only 93784637440+33554432 bytes (about 87 GB), out of a total file size of 279970000130 bytes (about 260 GB).

Cluster execution

This mode does create a YARN application, and high availability is configured as well (for HDFS; YARN itself is not HA here).
The configuration file hadoop-cluster.xml is as follows:

<?xml version="1.0" ?>
<configuration>
    <!-- Specifies the HDFS nameservice; ns is the logical name that covers the two namenodes -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://ns</value>
    </property>
    <!-- The HDFS nameservice is ns, consistent with core-site.xml -->
    <property>
        <name>dfs.nameservices</name>
        <value>ns</value>
    </property>
    <!-- ns contains two namenodes, nn1 and nn2, which are defined in detail below -->
    <property>
        <name>dfs.ha.namenodes.ns</name>
        <value>nn1,nn2</value>
    </property>
    <!-- RPC address of nn1 -->
    <property>
        <name>dfs.namenode.rpc-address.ns.nn1</name>
        <value>s1.jevoncode.com:9000</value>
    </property>
    <!-- HTTP address of nn1 -->
    <property>
        <name>dfs.namenode.http-address.ns.nn1</name>
        <value>s1.jevoncode.com:50070</value>
    </property>
    <!-- RPC address of nn2 -->
    <property>
        <name>dfs.namenode.rpc-address.ns.nn2</name>
        <value>s2.jevoncode.com:9000</value>
    </property>
    <!-- HTTP address of nn2 -->
    <property>
        <name>dfs.namenode.http-address.ns.nn2</name>
        <value>s2.jevoncode.com:50070</value>
    </property>

    <!-- HA must be configured to use hdfs://ns, otherwise an UnknownHost exception is thrown -->
    <!-- Enable automatic failover when a namenode fails -->
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
    <!-- The failover proxy provider implementation -->
    <property>
        <name>dfs.client.failover.proxy.provider.ns</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>



    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>

    <property>
        <name>yarn.resourcemanager.address</name>
        <value>s1.jevoncode.com:8032</value>
    </property>

</configuration>

The run goes like this (the file must first be uploaded to HDFS, then the job is launched with hadoop jar):

[cdata@s1 ~]$ hdfs dfs -mkdir /user/
[cdata@s1 ~]$ hdfs dfs -mkdir /user/jevoncode/ 
[cdata@s1 ~]$ hdfs dfs -mkdir /user/jevoncode/input 
[cdata@s1 ~]$ hdfs dfs -put all.txt /user/jevoncode/input/
[cdata@s1 ~]$ hadoop jar jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar com.jc.demo.hadoop.yarn.mr.MaxTemperatureDriver -conf hadoop-cluster.xml /user/jevoncode/input/all.txt max-temp
18/06/24 00:45:13 INFO input.FileInputFormat: Total input paths to process : 1
18/06/24 00:45:15 INFO mapreduce.JobSubmitter: number of splits:1
18/06/24 00:45:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1529756862370_0003
18/06/24 00:45:17 INFO impl.YarnClientImpl: Submitted application application_1529756862370_0003
18/06/24 00:45:17 INFO mapreduce.Job: The url to track the job: http://s1.jevoncode.com:8088/proxy/application_1529756862370_0003/
18/06/24 00:45:17 INFO mapreduce.Job: Running job: job_1529756862370_0003
...

The result file part-r-00000 ends up under the HDFS directory /user/cdata/max-temp.
The input /user/jevoncode/input/all.txt holds weather data for the entire 20th century, about 260 GB, and the job took roughly two hours; I had added an extra disk, so parallel disk I/O was faster.
Then visualize the result, for example by converting the format with awk and sed:

cat part-r-00000|awk '{for(i=0;++i<=NF;)a[i]=a[i]?a[i] FS $i:$i}END{for(i=0;i++<NF;)print a[i]}' |sed 's/ /,/g'

Result:

1901,1902,1903,1904,1905,1906,1907,1908,1909,1910,1911,1912,1913,1914,1915,1916,1917,1918,1919,1920,1921,1922,1923,1924,1925,1926,1927,1928,1929,1930,1931,1932,1933,1934,1935,1936,1937,1938,1939,1940,1941,1942,1943,1944,1945,1946,1947,1948,1949,1950,1951,1952,1953,1954,1955,1956,1957,1958,1959,1960,1961,1962,1963,1964,1965,1966,1967,1968,1969,1970,1971,1972,1973,1974,1975,1976,1977,1978,1979,1980,1981,1982,1983,1984,1985,1986,1987,1988,1989,1990,1991,1992,1993,1994,1995,1996,1997,1998,1999
317,244,289,256,283,294,283,289,278,294,306,322,300,333,294,278,317,322,378,294,283,278,294,294,317,489,489,378,328,400,461,489,489,478,478,550,489,489,489,489,462,479,485,507,496,494,490,601,511,494,511,544,506,506,500,511,489,489,500,490,600,489,489,489,489,506,489,489,474,489,478,483,580,560,561,570,580,550,600,600,580,617,616,617,611,607,607,607,600,607,607,605,567,568,567,561,564,568,568
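The awk/sed one-liner above simply transposes the two-column (year, temperature) output into the two comma-separated rows shown. The same transposition in plain Java, as a rough equivalent sketch (the class name TransposeMaxTemp and the hard-coded file name are just for illustration):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class TransposeMaxTemp {
    public static void main(String[] args) throws IOException {
        StringBuilder years = new StringBuilder();
        StringBuilder temps = new StringBuilder();
        // part-r-00000 holds one tab-separated "year<TAB>maxTemperature" pair per line
        BufferedReader reader = new BufferedReader(new FileReader("part-r-00000"));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.trim().split("\\s+");
                if (years.length() > 0) {
                    years.append(",");
                    temps.append(",");
                }
                years.append(fields[0]);
                temps.append(fields[1]);
            }
        } finally {
            reader.close();
        }
        System.out.println(years);  // 1901,1902,1903,...
        System.out.println(temps);  // 317,244,289,...
    }
}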

Put it into ECharts:

option = {
    xAxis: {
        type: 'category',
        name:'year',
        data: [1901,1902,1903,1904,1905,1906,1907,1908,1909,1910,1911,1912,1913,1914,1915,1916,1917,1918,1919,1920,1921,1922,1923,1924,
1925,1926,1927,1928,1929,1930,1931,1932,1933,1934,1935,1936,1937,1938,1939,1940,1941,1942,1943,1944,1945,1946,1947,1948,
1949,1950,1951,1952,1953,1954,1955,1956,1957,1958,1959,1960,1961,1962,1963,1964,1965,1966,1967,1968,1969,1970,1971,1972,
1973,1974,1975,1976,1977,1978,1979,1980,1981,1982,1983,1984,1985,1986,1987,1988,1989,1990,1991,1992,1993,1994,1995,1996,
1997,1998,1999]
    },
    yAxis: {
        type: 'value',
        name:'x10°C'
    },
    series: [{
        data: [317,244,289,256,283,294,283,289,278,294,306,322,300,333,294,278,317,322,378,294,283,278,294,294,317,489,489,378,328,400,
461,489,489,478,478,550,489,489,489,489,462,479,485,507,496,494,490,601,511,494,511,544,506,506,500,511,489,489,500,490,
600,489,489,489,489,506,489,489,474,489,478,483,580,560,561,570,580,550,600,600,580,617,616,617,611,607,607,607,600,607,
607,605,567,568,567,561,564,568,568],
        type: 'line',
        smooth: true
    }]
};

(Chart: yearly maximum temperature, 1901-1999, rendered with ECharts)

The 317 for 1901 is not 317°C; the value has to be divided by 10, so the 1901 maximum was 31.7°C. The highest maximum of the 20th century was 61.7°C in 1982, and the lowest was 24.4°C in 1902. You can see the temperature trending upward over the century. Greenhouse effect, or just more precise measurements? That is beside the point.

Troubleshooting

1. The application status stayed the same and never progressed.

It turned out that the nodemanager had not come up; the error message was this:

org.apache.hadoop.yarn.exceptions.YarnRuntimeException: org.apache.hadoop.yarn.exceptions.YarnRuntimeException: Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, Message from ResourceManager: NodeManager from  s4.jevoncode.com doesn't satisfy minimum allocations, Sending SHUTDOWN signal to the NodeManager.
    at org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.serviceStart(NodeStatusUpdaterImpl.java:203)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
    at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceStart(NodeManager.java:272)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartNodeManager(NodeManager.java:496)
    at org.apache.hadoop.yarn.server.nodemanager.NodeManager.main(NodeManager.java:543)
Caused by: org.apache.hadoop.yarn.exceptions.YarnRuntimeException: Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, Message from ResourceManager: NodeManager from  s4.jevoncode.com doesn't satisfy minimum allocations, Sending SHUTDOWN signal to the NodeManager.
    at org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.registerWithRM(NodeStatusUpdaterImpl.java:278)
    at org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.serviceStart(NodeStatusUpdaterImpl.java:197)
    ... 6 more
2018-06-24 23:16:58,432 INFO org.apache.hadoop.yarn.server.nodemanager.NodeManager: SHUTDOWN_MSG:

At first glance it looked like a memory shortage, but yarn.scheduler.minimum-allocation-mb was already set to 512, and mapreduce.reduce.memory.mb and mapreduce.map.memory.mb were 512 as well. That was not it; in the end the culprit was this CPU setting:

<!-- Number of CPU vcores -->
    <property>
        <!--     Number of vcores that can be allocated for containers. This is used by the RM scheduler when allocating resources for containers. This is not used to limit the number of CPUs used by YARN containers. If it is set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically determined from the hardware in case of Windows and Linux. In other cases, number of vcores is 8 by default.-->
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>-1</value>
    </property>

It defaults to 8 vcores, but each of my virtual machines has only 2 cores, so the nodemanager failed to start; YARN does not intelligently detect the node's total memory or number of CPUs. It needs to be configured as:

<!-- Number of CPU vcores -->
    <property>
        <!--     Number of vcores that can be allocated for containers. This is used by the RM scheduler when allocating resources for containers. This is not used to limit the number of CPUs used by YARN containers. If it is set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically determined from the hardware in case of Windows and Linux. In other cases, number of vcores is 8 by default.-->
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>1</value>
    </property>

Even with that change it still failed; there was a memory problem after all, namely this:

<property>
        <!--    Amount of physical memory, in MB, that can be allocated for containers. If set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically calculated(in case of Windows and Linux). In other cases, the default is 8192MB.-->
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>-1</value>
      </property>

This is the total amount of physical memory that can be allocated to containers. My physical machine only has 8 GB in total and each virtual machine only 2 GB, so I set it to:

<property>
        <!--    Amount of physical memory, in MB, that can be allocated for containers. If set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically calculated(in case of Windows and Linux). In other cases, the default is 8192MB.-->
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>1024</value>
      </property>
