hadoop生态系统学习之路(二)如何编写MR以及运行测试

最近一直太忙,都没时间写博客了。首先是平时需要带我的一个哥们,他底子比我稍弱,于是我便从mybatis、spring、springMVC、html、css、js、jquery一个一个的教他,在教的过程中笔者也发现了很多之前自己没有弄明白的问题,所以说想把一样东西学好并不容易。另外笔者也参与了公司的大数据项目,学会怎么写一个MR,以及hdfs、hbase、hive、impala、zookeeper的基本使用,今天就与大家分享一下MR的编写,之后的博文中再与大家一一进行分享。当然,大数据相关的东西实在太多了,也不可能都会使用,并且用得很深,所以笔者也会再接再厉。同时,由于周末笔者还要学驾照,所以真是身心疲惫,但是也是对自己的锻炼。
好了,不说废话了,直入正题。
首先,笔者给大家介绍一下这个MR的大致业务:其实,就是一个etl过程,对数据进行抽取、转换以及加载到目的端,这里目的端,既可以是hdfs,然后交给下一个MR进行处理,也可以是hbase数据仓库,还可以是hive或者imapla的数据库,这里面hive和impala的数据还可以进行同步。这个MR是从ftp上拉取文件,直接存到hdfs,然后经过MR将数据存到hdfs中,提供给另一个MR进行处理。为了介绍简单,这里笔者将从ftp上拉取数据的过程改为直接从hdfs上读取。关于如果从ftp上拉取文件直接存到hdfs,后面的博文笔者再进行介绍。
好了,笔者将分以下几步进行讲解:

一、文件以及maven环境准备

这里,笔者使用的maven依赖,所有hadoop相关的包通过dependency依赖,pom.xml如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.qiyongkang</groupId>
  <artifactId>mr-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>mr-demo</name>
  <description>mr-demo</description>
  <packaging>jar</packaging>

  <repositories>
      <!-- 注意,这里使用cloudera公司的maven仓库 -->
      <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
      </repository>  
  </repositories>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- hadoop版本 -->
    <hadoop.version>2.3.0-cdh5.0.0</hadoop.version>
    <!-- hbase版本 -->
    <hbase.version>0.96.1.1-cdh5.0.0</hbase.version>
    <!-- hive版本 -->
    <hive.version>0.12.0-cdh5.0.0</hive.version>
    <!-- junit版本 -->
    <junit.version>4.8.1</junit.version>
  </properties>

  <dependencies>
      <!-- hadoop相关依赖 -->
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
          <version>${hadoop.version}</version>
          <exclusions>
              <exclusion>
                  <artifactId>jdk.tools</artifactId>
                  <groupId>jdk.tools</groupId>
              </exclusion>
          </exclusions>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>${hadoop.version}</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
          <exclusions>
              <exclusion>
                  <artifactId>mockito-all</artifactId>
                  <groupId>org.mockito</groupId>
              </exclusion>
          </exclusions>
      </dependency>

      <!-- MRUnit相关依赖 -->
      <dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>0.9.0-incubating</version>
    <classifier>hadoop2</classifier> 
</dependency>

<!-- junit依赖 -->
   <dependency>
     <groupId>junit</groupId>
     <artifactId>junit</artifactId>
     <version>${junit.version}</version>
     <scope>test</scope>
   </dependency>
  </dependencies>

  <build>
    <!-- 这是一个打可执行jar的插件,没有将依赖打进去,执行package命令即可 -->
    <plugins>
      <plugin>
       <groupId>org.apache.maven.plugins</groupId>
       <artifactId>maven-jar-plugin</artifactId>
       <version>2.4</version>
       <configuration>
         <archive>
            <manifest>
              <addClasspath>false</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>org.qiyongkang.mr.parsetofivele.ParseDataToFileElementMR</mainClass>
            </manifest>
          </archive>
       </configuration>
      </plugin>

      <!-- 此插件用于将依赖jar全部打到一个jar包里面去,以免在hadoop运行环境添加依赖包 -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>2.3</version>
          <configuration>
              <descriptorRefs>
                  <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                    <addClasspath>false</addClasspath>
                    <mainClass>org.qiyongkang.mr.parsetofivele.ParseDataToFileElementMR</mainClass>
                </manifest>
              </archive>
          </configuration>
          <executions>
              <execution>
                  <id>make-assembly</id>
                  <phase>package</phase>
                  <goals>
                      <goal>assembly</goal>
                  </goals>
              </execution>
          </executions>
      </plugin>

      <!-- 拷贝依赖包 -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-dependency-plugin</artifactId>
          <executions>
              <execution>
                  <id>copy-dependencies</id>
                  <phase>package</phase>
                  <goals>
                      <goal>copy-dependencies</goal>
                  </goals>
                  <configuration>
                      <outputDirectory>${project.build.directory}/lib</outputDirectory>
                      <overWriteReleases>false</overWriteReleases>
                      <overWriteSnapshots>false</overWriteSnapshots>
                      <overWriteIfNewer>true</overWriteIfNewer>
                  </configuration>
              </execution>
          </executions>
      </plugin>
    </plugins>
  </build>
</project>

然后,我们准备一份文件,格式如下:

202.102.224.68|53|61.158.148.103|17872|22640|p.tencentmind.com|A|A_125.39.213.86|20160308100839.993|0|r
202.102.224.68|53|61.158.152.97|20366|27048|api.k.sohu.com|A|A_123.126.104.116;A_123.126.104.119;A_123.126.104.114;A_123.126.104.117;A_123.126.104.118;A_123.126.104.120;A_123.126.104.115;A_123.126.104.122|20160308100839.993|0|r
115.60.53.151|7582|202.102.224.68|53|33946|cip4.e1977.com|A||20160308100839.993|0|q
182.119.224.59|14731|202.102.224.68|53|31185|s.jpush.cn|A||20160308100839.993|0|q
202.102.224.68|53|182.118.77.145|22420|19278|file32.mafengwo.net|A|A_182.118.77.145|20160308100839.993|0|r
202.102.224.68|53|115.60.14.138|22929|31604|mmbiz.qpic.cn|A|A_42.236.95.35;A_42.236.95.36;A_42.236.95.34;A_182.118.63.200;A_182.118.63.196;A_42.236.95.33;A_42.236.95.37|20160308100839.993|0|r
115.60.109.162|3760|202.102.224.68|53|8920|a.root-servers.net|A||20160308100839.993|0|q

每一行以|分隔,然后r或者q结尾,这里我们的MR只会取r结尾的数据,并且只会取此行的某几列数据,然后以其中三行为key进行计数,作为reducer的输入,最后将结果写入到hdfs,这样便可极大的祛除无效数据,减小文件大小。
这里,笔者准备了一个1.9大小.txt文件,如:
这里写图片描述
上面的jar就是后面我们要在yarn上执行的包。
然后,执行:

su hdfs

使用hdfs用户。因为这里笔者使用的生态系统环境就是上一篇博文中使用cm搭建的环境。cm会为hdfs创建一个hdfs用户,所以我们必须使用此用户进行hdfs的相关操作。
执行以下命令,将文件上传到hdfs的/test/input目录:

hadoop fs -put testData.txt /test/input

执行hadoop fs -ls /test/input可看到上传到hdfs成功:
这里写图片描述

二、Mapper类编写

Mapper类ParseDataToFileElementMapper:

public static class ParseDataToFileElementMapper extends Mapper<Object, Text, Text, IntWritable> {

        private static final IntWritable one = new IntWritable(1);
        private Text mapKey = new Text();

        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\\|");

            if ("r".equals(values[10])) {

                mapKey.set(values[5] + "\t" + values[0] + "\t" + values[2]);
                System.out.println(mapKey.toString());
                context.write(mapKey, one);
            }
        }

    }

这里,由于代码不多,笔者将Mapper和Reducer作为内部类,大家可以抽离出来。

三、Reducer类编写

Reducer类ParseDataToFileElementReducer:

public static class ParseDataToFileElementReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private Text reduceKey = new Text();
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            //把key相同的统计一下次数
            //cname + topDomain + cip + dip
            int sum = 0;
            for (IntWritable val : values) {
              sum += val.get();
            }
            this.result.set(sum);
            this.reduceKey.set("1.1-1.1" + "\t" + key.toString());

            context.write(this.reduceKey, this.result);
        }

    }

这里,mapper会将txt数据一行行读取解析,经过shuffle后,会对key进行哈希,然后将相同的key交给一个Reducer,然后reducer对相同key进行计数,写入hdfs。

四、main函数调用MR

主类ParseDataToFileElementMR:

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: ParseDataToFileElementMR <in> <out>");
          System.exit(2);
        }
        Job job = Job.getInstance(conf, "ParseDataToFileElementMR");
        job.setJarByClass(ParseDataToFileElementMR.class);
        //Mapper
        job.setMapperClass(ParseDataToFileElementMapper.class);

        //Combiner
//        job.setCombinerClass(ParseDataToFileElementReducer.class);

        //Reducer
        job.setReducerClass(ParseDataToFileElementReducer.class);
        job.setNumReduceTasks(10);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        //将reduce输出文件压缩.gz
        FileOutputFormat.setCompressOutput(job, true);  //job使用压缩  
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); //设置压缩格式

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

这里我们指定reducer个数为1个,并指定输出格式为.gz。

五、编写MRUnit测试

接下来,我们使用MRUnit对MR进行测试,相关的jar依赖在第一步pom文件已给出,直接贴出测试代码,和junit一样执行:

package org.qiyongkang.mr.parsetofivele;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;
import org.qiyongkang.mr.parsetofivele.ParseDataToFileElementMR.ParseDataToFileElementMapper;
import org.qiyongkang.mr.parsetofivele.ParseDataToFileElementMR.ParseDataToFileElementReducer;

/**
 * ClassName:ParseDataToFileElementMRTest <br/>
 * Function: TODO ADD FUNCTION. <br/>
 * Reason: TODO ADD REASON. <br/>
 * Date: 2016年3月15日 下午12:04:55 <br/>
 * 
 * @author qiyongkang
 * @version
 * @since JDK 1.6
 * @see
 */
public class ParseDataToFileElementMRTest {

    MapDriver<Object, Text, Text, IntWritable> mapDriver;
    ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

    @Before
    public void setUp() throws Exception {
        ParseDataToFileElementMapper mapper = new ParseDataToFileElementMapper();
        ParseDataToFileElementReducer reducer = new ParseDataToFileElementReducer();
        mapDriver = MapDriver.newMapDriver(mapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }

    @Test
    public void testMapper() {
        mapDriver.withInput(new Object(), new Text(
                "202.102.224.68|53|115.60.109.162|3760|8920|a.root-servers.net|A|A_198.41.0.4|20160308100839.993|0|r"));
        mapDriver.withOutput(new Text("a.root-servers.net\t202.102.224.68\t115.60.109.162"), new IntWritable(1));
        mapDriver.runTest();
    }

    @Test
    public void testReducer() {
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(1));
        values.add(new IntWritable(1));
        reduceDriver.withInput(new Text("a.root-servers.net\t202.102.224.68\t115.60.109.162"), values);
        reduceDriver.withOutput(new Text("1.1-1.1\ta.root-servers.net\t202.102.224.68\t115.60.109.162"),
                new IntWritable(2));
        reduceDriver.runTest();
    }

    @Test
    public void testMapReducer() {
        mapReduceDriver.withInput(new Object(), new Text(
                "202.102.224.68|53|115.60.109.162|3760|8920|a.root-servers.net|A|A_198.41.0.4|20160308100839.993|0|r"));
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(1));
        mapReduceDriver.withOutput(new Text("1.1-1.1\ta.root-servers.net\t202.102.224.68\t115.60.109.162"), new IntWritable(1));
        mapReduceDriver.runTest();
    }

}

这里我们可以对文件的单行进行测试,因为mapper本来就类似bufferedReader对文件一行行的读取。

六、打包

这里,笔者使用maven提供的插件进行打包,已在pom文件写出。然后,为了不将依赖包拷到hadoop环境,我们采用jar-with-dependencies这种打包方式,笔者对mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar反编译如下:
这里写图片描述
同时也指定了main函数所在类,大家可以看下pom文件。

七、在yarn上执行(MR2)

MR已写完,下面我们便可以在yarn上执行了。由于hadoop1.x使用的是MR1,而yarn上已经包括了MR2了,关于MR1与MR2的区别,笔者在后面的博文中会进行介绍。
下面开始执行:

yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar /test/input /test/output

这里,我们的输入文件格式是使用的.txt,其实hdfs还支持压缩格式以及其它的格式,后面再进行介绍。
然后,我们在hdfs上查看下输出目录:
这里写图片描述
这里由于reducer只指定了一个,所以只有一个输出文件。
我们把此文件get到本地,解压看看:
这里写图片描述

八、查看运行结果以及日志

这里,我们访问http://massdata8:19888/jobhistory,JobHistory Server的默认端口便可查看MR运行日志:
这里写图片描述
同时,也可以运行yarn application -list,查看正在运行的job。

好了,关于MR的编写就讲到这儿了,希望给刚学hadoop的童鞋提供点帮助,另外,大家也可以看看hadoop提供的mr example,学会如何写一个基本的mr。

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。

用P2P方法快速分发Docker镜像 - 2016-03-26 17:03:17

在部署较大的容器应用集群时,把应用镜像发布到所有节点常常需要大量时间。我们VMware的研发团队测试了P2P的方法,能够较好地解决大规模镜像分发的问题,为运维实践提供了很好的指引。 概述 在使用Docker运行容器化应用时,宿主机通常先要从Registry服务(如Docker Hub)下载相应的镜像(image)。这种镜像机制在开发环境中使用还是很有效的,团队成员之间可以很方便地共享同样的镜像。在实际的生产环境中,从效率和安全角度,往往会部署私有的Registry服务,专供产线机器集群使用。当大量主机需要

Elasticsearch基础教程 - 2016-03-26 17:03:25

http://blog.csdn.net/cnweike/article/details/33736429 基础概念     Elasticsearch有几个核心概念。从一开始理解这些概念会对整个学习过程有莫大的帮助。     接近实时(NRT)         Elasticsearch是一个接近实时的搜索平台。这意味着,从索引一个文档直到这个文档能够被搜索到有一个轻微的延迟(通常是1秒)。             集群(cluster)         一个集群就是由一个或多个节点组织在一起,它们共同

Java并发编程:synchronized - 2016-03-26 17:03:10

虽然多线程编程极大地提高了效率,但是也会带来一定的隐患。比如说两个线程同时往一个数据库表中插入不重复的数据,就可能会导致数据库中插入了相同的数据。今天我们就来一起讨论下线程安全问题,以及Java中提供了什么机制来解决线程安全问题。 [非static的使用的是对象锁,static的使用的是类锁] 以下是本文的目录大纲: 一.什么时候会出现线程安全问题? 二.如何解决线程安全问题? 三.synchronized同步方法或者同步块 一.什么时候会出现线程安全问题? 在单线程中不会出现线程安全问题,而在多线程编程

搭建5个节点的Hadoop集群 - 2016-03-26 17:03:04

搭建5个节点的Hadoop集群 节点 在实验室192.168.100.20:8006,用户名:root,密码:gzudb604,申请了5个节点,分别是: 192.168 . 100.120 #主节点 192.168 . 100.123 #节点1 192.168 . 100.126 #节点2 192.168 . 100.127 #节点3 192.168 . 100.128 #节点4 网络基本配置 255.255 . 255.0 #子网掩码 192.168 . 100.1 #网关 233.6 . 6.6 #使
        Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS、HBase等一样,也是由主从节点构成的。而节点之间的通信,一般都是采用的RPC通讯模型。Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答。         一、Alluxio中RPC实现技术支持         Alluxio中的RPC是依靠Thrift实现的,Apache Thrift 是 Facebook 实现的一种高效的、支持多种编程语言
【原创】k8s源码分析----apiserver之APIGroupVersion 本文中转载自本人空间: http://user.qzone.qq.com/29185807/blog/1458892866         前面3篇文章,主要是根据程序处理流程进行跳转分析。经过这些流程的跳转分析,拨开乌云终见日。                我们剥掉那些不重要的部分,直接进入主要框架。 APIGroupVersion 在master中,api v1的初始化   生成了一个default的apigroupv

理解Java虚拟机体系结构 - 2016-03-26 17:03:20

1 概述 众所周知,Java支持平台无关性、安全性和网络移动性。而Java平台由Java虚拟机和Java核心类所构成,它为纯Java程序提供了统一的编程接口,而不管下层操作系统是什么。正是得益于Java虚拟机,它号称的“一次编译,到处运行”才能有所保障。 1.1 Java程序执行流程 Java程序的执行依赖于编译环境和运行环境。源码代码转变成可执行的机器代码,由下面的流程完成: Java技术的核心就是Java虚拟机,因为所有的Java程序都在虚拟机上运行。Java程序的运行需要Java虚拟机、Java A
1.集群规划:  主机名         IP                  安装的软件                             运行的进程 drguo1  192.168.80.149  jdk、hadoop                          NameNode、DFSZKFailoverController(zkfc)、ResourceManager drguo2   192.168.80.150    jdk、hadoop                      

六、Hadoop安装与启动 - 2016-03-26 14:03:46

注意:[]为替换部分, Hadoop运行模式 单机:Hadoop的默认模式 伪分布:所有守护进程都运行在一个节点上。 完全分布模式:守护进程运行在多个节点上,真正的集群。 以下是完全分布式安装步骤,所有节点均用root用户执行。 准备工作 1.在每个节点新建hadoop用户,相同的密码。 useradd hadoop passwd hadoop 2.修改好主机名 选择一台作为master主机名修改为master,其余作为slave,主机名改为slave1,slave2… 见《云主机主机名修改》 3.配置每
logstash+elastic+kibana日志管理工具介绍及安装 一、 Logstash 安装 1.1 前置条件,安装了官方java 7或更新版本 查看 [ldx @1511 -min ~] $ java -versionopenjdk version "1.8.0_71" OpenJDK Runtime Environment (build 1.8 . 0_71 -b15) OpenJDK 64 - Bit Server VM (build 25.71 -b15, mixed mode) 1.2 在