Spark Platform (Advanced Edition, Part 6): Tez

For the complete table of contents, platform introduction, installation environment, and software versions, refer to 《Spark平台(高级版)概览》 (Spark Platform, Advanced Edition: Overview).

6. Tez

This chapter covers three parts:

  • Basic concepts: why Tez, what Tez actually is, what its architecture looks like, and what features it provides.
  • Hands-on setup of the Tez integration environment.
  • A walkthrough of a programming example. The example builds on an understanding of MapReduce: if the MapReduce examples in the Hadoop module conveyed the core ideas of that computing framework, this part introduces ideas that many other computing frameworks, such as Spark and Flink, also use when processing data.

6.1 Basic Concepts

6.1.1 Why Tez

To store massive amounts of data in a distributed system, Hadoop provides HDFS, a distributed storage layer that runs on commodity (non-proprietary) hardware, is low cost, and scales well. Many enterprises therefore migrate their large-scale data storage to Hadoop, abandoning the traditional IOE (IBM/Oracle/EMC) stack, and then use the SQL-like languages provided by Hive, Pig, and similar tools to perform large-scale data processing for scenarios such as data mining. The advantages are:

Low cost: storage is cheap, which was the original motivation behind HDFS.

Abundant talent: SQL is easy to learn, so the entry barrier is relatively low and development costs drop.

However, when Hive or Pig queries are executed through MapReduce, interactive response times cannot be met. A Hive SQL statement passes through the parser and is translated into a workflow of one or more MapReduce jobs. This suits batch processing, but higher-level applications need a framework capable of interactive queries, which MapReduce cannot provide.

Between stages, MapReduce writes each intermediate result to HDFS, which adds I/O overhead and undermines the efficiency and interactivity that Hive brings to Hadoop. Tez was created precisely for this situation.

Many frameworks, such as Spark and Flink, also grew out of the need to avoid MapReduce's habit of persisting data and intermediate results to HDFS; Tez likewise needs to avoid this drawback.

As shown in the figure, which depicts where Tez sits in the big-data processing stack, YARN handles cluster management and HDFS handles storage. An interactive query is translated by Hive or Pig into a Tez directed-acyclic-graph execution plan, which is then submitted to YARN. Spark and Flink have their own frameworks and can submit work to YARN themselves. The reason Tez is still chosen is that it can serve as an underlying compute framework that integrates directly with YARN.

There are of course other integration options, for example using Spark as Hive's underlying compute engine, but so far its compatibility and performance are not particularly compelling. Using Tez as the batch-processing engine is a reasonable choice today, especially on Hadoop 3, which provides a series of caching mechanisms, including Hadoop-level and YARN-level caches, that make execution even more efficient.

The figure compares Hive or Pig queries running on MR (MapReduce) versus on Tez, in terms of both architecture and performance. On the left, blue boxes represent Map and green boxes represent Reduce, each pair forming a complete MapReduce job that must write its results to HDFS. The left side runs four MapReduce jobs cascaded in a tree structure, requiring three intermediate HDFS writes. On the right, with Tez, only one write is performed, namely the final result.

The next figure compares performance. When choosing a framework, you evaluate not only its architectural ideas but also its performance; don't just read the marketing, look at the results. The figure shows the speedups Tez brings to Hive queries, with an overall improvement reported as roughly 190-200x; Query indicates the operations performed, and the timings were measured on 200 GB of data across 20 nodes, each with 24 GB of memory.

The table uses the notion of dimensions. Take Excel as an analogy: a single worksheet is two-dimensional (rows and columns); an Excel file contains multiple worksheets such as sheet1 and sheet2, and that sequence of sheets is the third dimension; a project made of multiple xlsx files adds a fourth dimension; and grouping those xlsx files into directories adds a fifth.

6.1.2 Features of Tez

Tez's features can be summarized in seven points:

  1. A distributed execution framework/engine for large-scale data-processing applications. It is a distributed data-processing framework that integrates into YARN and transparently provides the underlying execution engine to applications.
  2. A computing framework that expresses data-processing applications as a DAG (Directed Acyclic Graph). The DAG model is a key idea in modern big-data processing: where you previously had to chain MapReduce jobs by hand, the DAG model optimizes and elevates that work, so you write the upper-level application as a DAG and leave execution to the framework. Tez operates at this level (see the sketch after this list).
  3. Built on YARN. YARN coordinates cluster management, and Tez can be deployed on a YARN cluster very easily, exposing a transparent interface for compute tasks.
  4. Simplified deployment. Deploying on YARN only requires a configuration entry and uploading the Tez libraries to HDFS. Applications may be built against several Tez versions whose library dependencies differ, and they can still run in parallel on the same YARN cluster, so version management is not a problem for Tez. For developers, version management is otherwise complex and stressful, because different versions expose different interfaces and formats; Tez simplifies this.
  5. Automatic adaptation of data types. In big-data processing, data files are mapped into <key, value> pairs, while high-level languages such as Java, Scala, and Python also have tuples, which are conceptually equivalent to key-value pairs but represented differently. A Tez Task is fully compatible with both: the input can be a File, the output a Stream, and the intermediate processing can use either key-value pairs or tuples.
  6. Dynamic optimization of the physical data flow. Depending on the cluster environment, intermediate local files and results can be kept in memory; whether to do so is decided dynamically from the cluster's resource situation, without intervention from the application layer.
  7. A configurable execution plan. Tez has a scheduling/execution engine that builds a dynamic physical plan: the same job gets different physical plans on different data sets, for example shrinking the second Reduce stage from 100 tasks to 10 on a 10 GB input. As in Hadoop, one Reduce consumes output from multiple Maps, the same key is never processed by more than one Reduce, but a single Reduce may process multiple keys.
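To make the DAG programming model concrete before the full walkthrough in section 6.3, here is a minimal, illustrative Java sketch. The NoOpProcessor class and the vertex names are placeholders introduced only for this sketch; the real word-count processors and the data source/sink wiring appear later in this chapter.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;

public class DagSketch {

  // Hypothetical placeholder processor; a real vertex would read its inputs and
  // write its outputs here, as the word-count processors in section 6.3 do.
  public static class NoOpProcessor extends SimpleProcessor {
    public NoOpProcessor(ProcessorContext context) {
      super(context);
    }

    @Override
    public void run() throws Exception {
      // intentionally empty
    }
  }

  // Builds a two-vertex DAG: an upstream "map-like" vertex feeding a
  // downstream "reduce-like" vertex (2 parallel tasks) over a partitioned,
  // sorted key-value edge. Submission via TezClient is shown in section 6.3.2.2.
  public static DAG buildDag(TezConfiguration tezConf) {
    Vertex upstream = Vertex.create("Upstream",
        ProcessorDescriptor.create(NoOpProcessor.class.getName()));
    Vertex downstream = Vertex.create("Downstream",
        ProcessorDescriptor.create(NoOpProcessor.class.getName()), 2);

    // The edge delivers <Text, IntWritable> pairs to the downstream vertex,
    // hash-partitioned by key and sorted.
    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
            HashPartitioner.class.getName())
        .setFromConfiguration(tezConf)
        .build();

    return DAG.create("DagSketch")
        .addVertex(upstream)
        .addVertex(downstream)
        .addEdge(Edge.create(upstream, downstream,
            edgeConf.createDefaultEdgeProperty()));
  }
}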

6.2 Environment Setup

Connect to app-11 via SSH.

6.2.1 Installing Apache Maven

Download apache-maven-3.6.0-bin.tar.gz and upload it to /hadoop/tools.

Note: perform these steps as the root user.

Extract: tar -xf apache-maven-3.6.0-bin.tar.gz

Replace the settings file under /hadoop/tools/apache-maven-3.6.0/conf with the following:

<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<!--
 | This is the configuration file for Maven. It can be specified at two levels:
 |
 |  1. User Level. This settings.xml file provides configuration for a single user,
 |                 and is normally provided in ${user.home}/.m2/settings.xml.
 |
 |                 NOTE: This location can be overridden with the CLI option:
 |
 |                 -s /path/to/user/settings.xml
 |
 |  2. Global Level. This settings.xml file provides configuration for all Maven
 |                 users on a machine (assuming they're all using the same Maven
 |                 installation). It's normally provided in
 |                 ${maven.conf}/settings.xml.
 |
 |                 NOTE: This location can be overridden with the CLI option:
 |
 |                 -gs /path/to/global/settings.xml
 |
 | The sections in this sample file are intended to give you a running start at
 | getting the most out of your Maven installation. Where appropriate, the default
 | values (values used when the setting is not specified) are provided.
 |
 |-->
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
  <!-- localRepository
   | The path to the local repository maven will use to store artifacts.
   |
   | Default: ${user.home}/.m2/repository
  <localRepository>/path/to/local/repo</localRepository>
  -->
<localRepository>hadoop.tools.repo</localRepository>
  <!-- interactiveMode
   | This will determine whether maven prompts you when it needs input. If set to false,
   | maven will use a sensible default value, perhaps based on some other setting, for
   | the parameter in question.
   |
   | Default: true
  <interactiveMode>true</interactiveMode>
  -->

  <!-- offline
   | Determines whether maven should attempt to connect to the network when executing a build.
   | This will have an effect on artifact downloads, artifact deployment, and others.
   |
   | Default: false
  <offline>false</offline>
  -->

  <!-- pluginGroups
   | This is a list of additional group identifiers that will be searched when resolving plugins by their prefix, i.e.
   | when invoking a command line like "mvn prefix:goal". Maven will automatically add the group identifiers
   | "org.apache.maven.plugins" and "org.codehaus.mojo" if these are not already contained in the list.
   |-->
  <pluginGroups>
    <!-- pluginGroup
     | Specifies a further group identifier to use for plugin lookup.
    <pluginGroup>com.your.plugins</pluginGroup>
    -->
  </pluginGroups>

  <!-- proxies
   | This is a list of proxies which can be used on this machine to connect to the network.
   | Unless otherwise specified (by system property or command-line switch), the first proxy
   | specification in this list marked as active will be used.
   |-->
  <proxies>
    <!-- proxy
     | Specification for one proxy, to be used in connecting to the network.
     |
    <proxy>
      <id>optional</id>
      <active>true</active>
      <protocol>http</protocol>
      <username>proxyuser</username>
      <password>proxypass</password>
      <host>proxy.host.net</host>
      <port>80</port>
      <nonProxyHosts>local.net|some.host.com</nonProxyHosts>
    </proxy>
    -->
  </proxies>

  <!-- servers
   | This is a list of authentication profiles, keyed by the server-id used within the system.
   | Authentication profiles can be used whenever maven must make a connection to a remote server.
   |-->
  <servers>
    <!-- server
     | Specifies the authentication information to use when connecting to a particular server, identified by
     | a unique name within the system (referred to by the 'id' attribute below).
     |
     | NOTE: You should either specify username/password OR privateKey/passphrase, since these pairings are
     |       used together.
     |
    <server>
      <id>deploymentRepo</id>
      <username>repouser</username>
      <password>repopwd</password>
    </server>
    -->

    <!-- Another sample, using keys to authenticate.
    <server>
      <id>siteServer</id>
      <privateKey>/path/to/private/key</privateKey>
      <passphrase>optional; leave empty if not used.</passphrase>
    </server>
    -->
  </servers>

  <!-- mirrors
   | This is a list of mirrors to be used in downloading artifacts from remote repositories.
   |
   | It works like this: a POM may declare a repository to use in resolving certain artifacts.
   | However, this repository may have problems with heavy traffic at times, so people have mirrored
   | it to several places.
   |
   | That repository definition will have a unique id, so we can create a mirror reference for that
   | repository, to be used as an alternate download site. The mirror site will be the preferred
   | server for that repository.
   |-->
  <mirrors>
    <!-- mirror
     | Specifies a repository mirror site to use instead of a given repository. The repository that
     | this mirror serves has an ID that matches the mirrorOf element of this mirror. IDs are used
     | for inheritance and direct lookup purposes, and must be unique across the set of mirrors.
     |
    <mirror>
      <id>mirrorId</id>
      <mirrorOf>repositoryId</mirrorOf>
      <name>Human Readable Name for this Mirror.</name>
      <url>http://my.repository.com/repo/path</url>
    </mirror>
     -->

        <mirror>
            <id>alimaven</id>
            <mirrorOf>central</mirrorOf>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
        </mirror>
    
        <mirror>
            <id>repo1</id>
            <mirrorOf>central</mirrorOf>
            <name>Human Readable Name for this Mirror.</name>
            <url>http://repo1.maven.org/maven2/</url>
        </mirror>
    
        <mirror>
            <id>repo2</id>
            <mirrorOf>central</mirrorOf>
            <name>Human Readable Name for this Mirror.</name>
            <url>http://repo2.maven.org/maven2/</url>
        </mirror>

  </mirrors>

  <!-- profiles
   | This is a list of profiles which can be activated in a variety of ways, and which can modify
   | the build process. Profiles provided in the settings.xml are intended to provide local machine-
   | specific paths and repository locations which allow the build to work in the local environment.
   |
   | For example, if you have an integration testing plugin - like cactus - that needs to know where
   | your Tomcat instance is installed, you can provide a variable here such that the variable is
   | dereferenced during the build process to configure the cactus plugin.
   |
   | As noted above, profiles can be activated in a variety of ways. One way - the activeProfiles
   | section of this document (settings.xml) - will be discussed later. Another way essentially
   | relies on the detection of a system property, either matching a particular value for the property,
   | or merely testing its existence. Profiles can also be activated by JDK version prefix, where a
   | value of '1.4' might activate a profile when the build is executed on a JDK version of '1.4.2_07'.
   | Finally, the list of active profiles can be specified directly from the command line.
   |
   | NOTE: For profiles defined in the settings.xml, you are restricted to specifying only artifact
   |       repositories, plugin repositories, and free-form properties to be used as configuration
   |       variables for plugins in the POM.
   |
   |-->
  <profiles>
    <!-- profile
     | Specifies a set of introductions to the build process, to be activated using one or more of the
     | mechanisms described above. For inheritance purposes, and to activate profiles via <activatedProfiles/>
     | or the command line, profiles have to have an ID that is unique.
     |
     | An encouraged best practice for profile identification is to use a consistent naming convention
     | for profiles, such as 'env-dev', 'env-test', 'env-production', 'user-jdcasey', 'user-brett', etc.
     | This will make it more intuitive to understand what the set of introduced profiles is attempting
     | to accomplish, particularly when you only have a list of profile id's for debug.
     |
     | This profile example uses the JDK version to trigger activation, and provides a JDK-specific repo.
    <profile>
      <id>jdk-1.4</id>

      <activation>
        <jdk>1.4</jdk>
      </activation>

      <repositories>
        <repository>
          <id>jdk14</id>
          <name>Repository for JDK 1.4 builds</name>
          <url>http://www.myhost.com/maven/jdk14</url>
          <layout>default</layout>
          <snapshotPolicy>always</snapshotPolicy>
        </repository>
      </repositories>
    </profile>
    -->

    <!--
     | Here is another profile, activated by the system property 'target-env' with a value of 'dev',
     | which provides a specific path to the Tomcat instance. To use this, your plugin configuration
     | might hypothetically look like:
     |
     | ...
     | <plugin>
     |   <groupId>org.myco.myplugins</groupId>
     |   <artifactId>myplugin</artifactId>
     |
     |   <configuration>
     |     <tomcatLocation>${tomcatPath}</tomcatLocation>
     |   </configuration>
     | </plugin>
     | ...
     |
     | NOTE: If you just wanted to inject this configuration whenever someone set 'target-env' to
     |       anything, you could just leave off the <value/> inside the activation-property.
     |
    <profile>
      <id>env-dev</id>

      <activation>
        <property>
          <name>target-env</name>
          <value>dev</value>
        </property>
      </activation>

      <properties>
        <tomcatPath>/path/to/tomcat/instance</tomcatPath>
      </properties>
    </profile>
    -->
  </profiles>

  <!-- activeProfiles
   | List of profiles that are active for all builds.
   |
  <activeProfiles>
    <activeProfile>alwaysActiveProfile</activeProfile>
    <activeProfile>anotherAlwaysActiveProfile</activeProfile>
  </activeProfiles>
  -->
</settings>

The modified part: use the Aliyun mirror of the Maven central repository. This makes the build much faster; otherwise it is very slow, because many packages have to be downloaded from sites outside China.

        <mirror>
            <id>alimaven</id>
            <mirrorOf>central</mirrorOf>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
        </mirror>

Change ownership to the hadoop user: chown -R hadoop:hadoop apache-maven-3.6.0

Switch to the hadoop user:

Set the environment variables: vi ~/.bashrc

export MVN_HOME=/hadoop/tools/apache-maven-3.6.0
export PATH=$PATH:${MVN_HOME}/bin

Apply the environment variables: source ~/.bashrc

Check the version: mvn -version

Note: a dedicated build virtual machine can also be used for compilation; dedicated build VM image:

6.2.2 Building Tez

As the hadoop user, switch to the /tmp directory and create a tez directory: mkdir tez

Upload the files to /tmp/tez:

Extract the sources: tar -xf apache-tez-0.9.0-src.tar.gz

Delete the original pom.xml inside the sources: rm -rf apache-tez-0.9.0-src/pom.xml

Copy in the prepared pom.xml: cp pom.xml apache-tez-0.9.0-src

Purpose: comment out the repositories that download components from sites outside China, to speed up the build.

Modified part: commented out

<!--
  <repositories>
    <repository>
      <id>${distMgmtSnapshotsId}</id>
      <name>${distMgmtSnapshotsName}</name>
      <url>${distMgmtSnapshotsUrl}</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>maven2-repository.atlassian</id>
      <name>Atlassian Maven Repository</name>
      <url>https://maven.atlassian.com/repository/public</url>
      <layout>default</layout>
    </pluginRepository>
    <pluginRepository>
      <id>${distMgmtSnapshotsId}</id>
      <name>${distMgmtSnapshotsName}</name>
      <url>${distMgmtSnapshotsUrl}</url>
      <layout>default</layout>
    </pluginRepository>
  </pluginRepositories>
-->

Switch to the source directory and start the build: /tmp/tez/apache-tez-0.9.0-src

mvn -X clean package -DskipTests=true -Dhadoop.version=3.1.2 -Phadoop28 -P\!hadoop27 -Dprotoc.path=/hadoop/tools/protobuf-2.5.0/bin/protoc -Dmaven.javadoc.skip=true

Download the compiled .tar.gz package to the local machine:

Note: the build produces version 0.9.1. During actual testing, due to version-compatibility issues, the 0.9.0 version provided with the course resources is used for the remaining steps.

All subsequent work is based on these two packages:

6.2.3 Installing Tez

6.2.3.1 Starting the Cluster

Log in to the app-11 node via ssh and switch to the hadoop user: su - hadoop

Start the cluster and check that it is working properly.

6.2.3.2 Uploading and Installing

As the hadoop user, create the Tez installation directory: mkdir /hadoop/Tez

Upload the packages to /hadoop/Tez:

Upload tez-0.9.0.tar.gz to /user/tez in HDFS:

Create the HDFS directory: hdfs dfs -mkdir /user/tez

Upload tez-0.9.0.tar.gz to the /user/tez directory:

hdfs dfs -put tez-0.9.0.tar.gz /user/tez

Check that the file is also visible from app-12 and app-13.

6.2.3.3 Configuration Files

Change directory: cd /hadoop/Hadoop/hadoop-3.1.2/etc/hadoop/

Copy the three configuration files into this directory, replacing any existing ones.

Upload the same three configuration files to app-12 and app-13:

scp hadoop-env.sh mapred-site.xml tez-site.xml app-12:/hadoop/Hadoop/hadoop-3.1.2/etc/hadoop/
scp hadoop-env.sh mapred-site.xml tez-site.xml app-13:/hadoop/Hadoop/hadoop-3.1.2/etc/hadoop/
6.2.3.3.1 tez-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>


<configuration>
	<property>
		<name>tez.lib.uris</name>
		<value>${fs.defaultFS}/user/tez/tez-0.9.0.tar.gz</value>
	</property>
</configuration>
  • This defines a URI pointing to the package uploaded to HDFS; fs.defaultFS is the default HDFS name defined in core-site.xml.
	<property>
		<name>tez.lib.uris</name>
		<value>${fs.defaultFS}/user/tez/tez-0.9.0.tar.gz</value>
	</property>
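As an optional sanity check (not one of the installation steps), the small sketch below reads the property back from client code. TezConfiguration adds tez-site.xml as a default resource, and TezConfiguration.TEZ_LIB_URIS is the constant for the tez.lib.uris key; the class name TezLibUrisCheck is just an illustrative name, and resolving the ${fs.defaultFS} placeholder assumes core-site.xml is also on the classpath.

import org.apache.tez.dag.api.TezConfiguration;

public class TezLibUrisCheck {
  public static void main(String[] args) {
    // Picks up tez-site.xml from the classpath and prints the resolved
    // tez.lib.uris value that the Tez client will ship to YARN.
    TezConfiguration tezConf = new TezConfiguration();
    System.out.println("tez.lib.uris = " + tezConf.get(TezConfiguration.TEZ_LIB_URIS));

    // The same key could also be set in code instead of in tez-site.xml
    // (the URI below is only an example value):
    // tezConf.set(TezConfiguration.TEZ_LIB_URIS, "hdfs:///user/tez/tez-0.9.0.tar.gz");
  }
}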
6.2.3.3.2 hadoop-env.sh
# An additional, custom CLASSPATH. Site-wide configs should be
# handled via the shellprofile functionality, utilizing the
# hadoop_add_classpath function for greater control and much
# harder for apps/end-users to accidentally override.
# Similarly, end users should utilize ${HOME}/.hadooprc .
# This variable should ideally only be used as a short-cut,
# interactive way for temporary additions on the command line.
TEZ_CONF_DIR=/hadoop/Hadoop/hadoop-3.1.2/etc/hadoop/tez-site.xml
TEZ_JARS=/hadoop/Tez/tez-0.9.0-minimal
# export HADOOP_CLASSPATH="/some/cool/path/on/your/machine"
export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:${TEZ_CONF_DIR}:${TEZ_JARS}/*:${TEZ_JARS}/lib/*
  • Compared with the earlier Hadoop cluster configuration, three lines are added:
TEZ_CONF_DIR=/hadoop/Hadoop/hadoop-3.1.2/etc/hadoop/tez-site.xml
TEZ_JARS=/hadoop/Tez/tez-0.9.0-minimal
# export HADOOP_CLASSPATH="/some/cool/path/on/your/machine"
export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:${TEZ_CONF_DIR}:${TEZ_JARS}/*:${TEZ_JARS}/lib/*
6.2.3.3.3 mapred-site.xml
<configuration>
	<!-- Use yarn-tez as the MapReduce execution framework -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn-tez</value>
    </property>
	
	<!-- Classpath used by Tasks -->
	<property>
		 <name>mapreduce.application.classpath</name>
		 <value>
		  /hadoop/Hadoop/hadoop-3.1.2/etc/hadoop,
		  /hadoop/Hadoop/hadoop-3.1.2/share/hadoop/common/*,
		  /hadoop/Hadoop/hadoop-3.1.2/share/hadoop/common/lib/*,
		  /hadoop/Hadoop/hadoop-3.1.2/share/hadoop/hdfs/*,
		  /hadoop/Hadoop/hadoop-3.1.2/share/hadoop/hdfs/lib/*,
		  /hadoop/Hadoop/hadoop-3.1.2/share/hadoop/mapreduce/*,
		  /hadoop/Hadoop/hadoop-3.1.2/share/hadoop/mapreduce/lib/*,
		  /hadoop/Hadoop/hadoop-3.1.2/share/hadoop/yarn/*,
		  /hadoop/Hadoop/hadoop-3.1.2/share/hadoop/yarn/lib/*
		 </value>
	 </property>
	<!-- HDFS directory to which the client uploads the application ID and required jar files -->
	<property>
		<name>yarn.app.mapreduce.am.staging-dir</name>
		<value>/hadoop/Hadoop/hadoop-3.1.2/tmp/hadoop-yarn/staging</value>
	</property>
	
	<!-- MapReduce JobHistory server address -->
	<property>
		<name>mapreduce.jobhistory.address</name>
		<value>app-12:10020</value>
	</property>
	<!-- Web address of the JobHistory server -->
	<property>
		<name>mapreduce.jobhistory.webapp.address</name>
		<value>app-12:19888</value>
	</property>
</configuration>
  • Compared with before, the MapReduce framework is changed from yarn to the yarn-tez execution engine:
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn-tez</value>
    </property>
6.2.3.4 tez-0.9.0-minimal.tar.gz

In the /hadoop/Tez directory, create the directory tez-0.9.0-minimal:

mkdir tez-0.9.0-minimal

Extract into the /hadoop/Tez/tez-0.9.0-minimal directory:

tar -xf tez-0.9.0-minimal.tar.gz -C tez-0.9.0-minimal 

This is because hadoop-env.sh sets TEZ_JARS to this directory, which contains the JAR packages and programs.

6.2.3.5 Restarting Hadoop

Stop the cluster: stop-all.sh

Stop the historyserver service on app-12:

ssh app-12 "mapred --daemon stop historyserver"

Check that the cluster is stopped; ZooKeeper does not need to be restarted here.

Start Hadoop: start-all.sh

Start the historyserver service on app-12:

Hadoop ships with a history server through which you can inspect completed MapReduce jobs, for example how many Maps and Reduces were used and the job submission, start, and completion times. The history server is not started by default; it can be started with the command below.

ssh app-12 "mapred --daemon start historyserver"

The Hadoop web UI is opened at app-12:8088.

Check that the cluster has started successfully.

6.2.3.6 Example

Test: run a MapReduce-style job using the bundled example program.

Program provided: tez-examples-0.9.1.jar

Run the orderedwordcount example inside the jar, which not only counts words but also sorts by count:

hadoop jar tez-examples-0.9.1.jar orderedwordcount /test/data/hdfs-site.xml /test/output2

Input in HDFS: /test/data/hdfs-site.xml
Output in HDFS: /test/output2

List the results: hdfs dfs -ls /test/output2

View the file: hdfs dfs -cat /test/output2/part-v002-o000-r-00000

The output is already sorted in ascending order.

6.2.4 Copying to app-12

Create the Tez directory and copy the files:

ssh hadoop@app-12 "cd /hadoop && mkdir Tez"
scp -r /hadoop/Tez/* hadoop@app-12:/hadoop/Tez/

Check that the copy succeeded.

6.2.5 Copying to app-13

Create the Tez directory and copy the files:

ssh hadoop@app-13 "cd /hadoop && mkdir Tez"
scp -r /hadoop/Tez/* hadoop@app-13:/hadoop/Tez/

Check that the copy succeeded.

6.3 Programming Practice

This section writes a wordcount program to understand how a DAG runs.

6.3.1 Overall Architecture and Construction

The whole flow is shown in the figure. Source is an HDFS file or directory. Source passes through a Map (tokenizer), which emits key-value pairs such as <Dear,1>, <Dear,1>, <World,1>. A Reduce operation (summation) then sums the values per key, after which key and value swap positions, so <Dear,2> becomes <2,Dear>, and the pairs are sorted by the new key, i.e. by the former value. Finally, a sorter stage, whose only job is to swap <2,Dear> back to <Dear,2>, writes the result to the Sink.

In the upper right there is a diagram of physical parallel instances: the steps above are logical, but in practice they execute as many parallel instances, so multiple partitions and multiple Maps feed multiple Reduces.

6.3.2 Code Walkthrough

The code file OrderedWordCount.java is located in the installation resources:

6.3.2.1 DAG Flow
package org.apache.tez.examples;

import java.io.IOException;
import java.io.PrintStream;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.tez.client.CallerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.examples.WordCount.TokenProcessor;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;

import org.apache.commons.cli.Options;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;

import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;

/**
 * Simple example that extends the WordCount example to show a chain of processing.
 * The example extends WordCount by sorting the words by their count.
 */
public class OrderedWordCount extends Configured implements Tool {
  
  private static String SORTER = "Sorter";
  private static final Logger LOG = LoggerFactory.getLogger(OrderedWordCount.class);
  private static String INPUT = "Input";
  private static String OUTPUT = "Output";
  private static String TOKENIZER = "Tokenizer";
  private static String SUMMATION = "Summation";
  private HadoopShim hadoopShim;
  private boolean disableSplitGrouping = false;
  private boolean generateSplitInClient = false;
  /*
   * Example code to write a processor in Tez.
   * Processors typically apply the main application logic to the data.
   * TokenProcessor tokenizes the input data.
   * It uses an input that provide a Key-Value reader and writes
   * output to a Key-Value writer. The processor inherits from SimpleProcessor
   * since it does not need to handle any advanced constructs for Processors.
   */
  public static class TokenProcessor extends SimpleProcessor {
    IntWritable one = new IntWritable(1);
    Text word = new Text();

    public TokenProcessor(ProcessorContext context) {
      super(context);
    }

    @Override
    public void run() throws Exception {
      Preconditions.checkArgument(getInputs().size() == 1);
      Preconditions.checkArgument(getOutputs().size() == 1);
      // the recommended approach is to cast the reader/writer to a specific type instead
      // of casting the input/output. This allows the actual input/output type to be replaced
      // without affecting the semantic guarantees of the data type that are represented by
      // the reader and writer.
      // The inputs/outputs are referenced via the names assigned in the DAG.
      KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(SUMMATION).getWriter();
      while (kvReader.next()) {
        StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
        while (itr.hasMoreTokens()) {
          word.set(itr.nextToken());
          // Count 1 every time a word is observed. Word is the key a 1 is the value
          kvWriter.write(word, one);
        }
      }
    }

  }
  /*
   * SumProcessor similar to WordCount except that it writes the count as key and the 
   * word as value. This is because we can and ordered partitioned key value edge to group the 
   * words with the same count (as key) and order the counts.
   */
  public static class SumProcessor extends SimpleProcessor {
    public SumProcessor(ProcessorContext context) {
      super(context);
    }

    @Override
    public void run() throws Exception {
      Preconditions.checkArgument(getInputs().size() == 1);
      Preconditions.checkArgument(getOutputs().size() == 1);
      // the recommended approach is to cast the reader/writer to a specific type instead
      // of casting the input/output. This allows the actual input/output type to be replaced
      // without affecting the semantic guarantees of the data type that are represented by
      // the reader and writer.
      // The inputs/outputs are referenced via the names assigned in the DAG.
      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(SORTER).getWriter();
      KeyValuesReader kvReader = (KeyValuesReader) getInputs().get(TOKENIZER).getReader();
      while (kvReader.next()) {
        Text word = (Text) kvReader.getCurrentKey();
        int sum = 0;
        for (Object value : kvReader.getCurrentValues()) {
          sum += ((IntWritable) value).get();
        }
        // write the sum as the key and the word as the value
        kvWriter.write(new IntWritable(sum), word);
      }
    }
  }
  
  /**
   * No-op sorter processor. It does not need to apply any logic since the ordered partitioned edge 
   * ensures that we get the data sorted and grouped by the the sum key.
   */
  public static class NoOpSorter extends SimpleMRProcessor {

    public NoOpSorter(ProcessorContext context) {
      super(context);
    }

    @Override
    public void run() throws Exception {
      Preconditions.checkArgument(getInputs().size() == 1);
      Preconditions.checkArgument(getOutputs().size() == 1);
      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
      KeyValuesReader kvReader = (KeyValuesReader) getInputs().get(SUMMATION).getReader();
      while (kvReader.next()) {
        Object sum = kvReader.getCurrentKey();
        for (Object word : kvReader.getCurrentValues()) {
          kvWriter.write(word, sum);
        }
      }
      // deriving from SimpleMRProcessor takes care of committing the output
    }
  }
  
  public static DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath,
      int numPartitions, boolean disableSplitGrouping, boolean isGenerateSplitInClient, String dagName) throws IOException {

    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf),
        TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping)
          .generateSplitsInAM(!isGenerateSplitInClient).build();

    DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf),
        TextOutputFormat.class, outputPath).build();

    Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
        TokenProcessor.class.getName()));
    tokenizerVertex.addDataSource(INPUT, dataSource);

    // Use Text key and IntWritable value to bring counts for each word in the same partition
    // The setFromConfiguration call is optional and allows overriding the config options with
    // command line parameters.
    OrderedPartitionedKVEdgeConfig summationEdgeConf = OrderedPartitionedKVEdgeConfig
        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
            HashPartitioner.class.getName())
        .setFromConfiguration(tezConf)
        .build();

    // This vertex will be reading intermediate data via an input edge and writing intermediate data
    // via an output edge.
    Vertex summationVertex = Vertex.create(SUMMATION, ProcessorDescriptor.create(
        SumProcessor.class.getName()), numPartitions);
    
    // Use IntWritable key and Text value to bring all words with the same count in the same 
    // partition. The data will be ordered by count and words grouped by count. The
    // setFromConfiguration call is optional and allows overriding the config options with
    // command line parameters.
    OrderedPartitionedKVEdgeConfig sorterEdgeConf = OrderedPartitionedKVEdgeConfig
        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
            HashPartitioner.class.getName())
        .setFromConfiguration(tezConf)
        .build();

    // Use 1 task to bring all the data in one place for global sorted order. Essentially the number
    // of partitions is 1. So the NoOpSorter can be used to produce the globally ordered output
    Vertex sorterVertex = Vertex.create(SORTER, ProcessorDescriptor.create(
        NoOpSorter.class.getName()), 1);
    sorterVertex.addDataSink(OUTPUT, dataSink);

    // No need to add jar containing this class as assumed to be part of the tez jars.
    
    DAG dag = DAG.create(dagName);
    dag.addVertex(tokenizerVertex)
        .addVertex(summationVertex)
        .addVertex(sorterVertex)
        .addEdge(
            Edge.create(tokenizerVertex, summationVertex,
                summationEdgeConf.createDefaultEdgeProperty()))
        .addEdge(
            Edge.create(summationVertex, sorterVertex, sorterEdgeConf.createDefaultEdgeProperty()));
    return dag;  
  }
  /**
   * @param dag           the dag to execute
   * @param printCounters whether to print counters or not
   * @param logger        the logger to use while printing diagnostics
   * @return Zero indicates success, non-zero indicates failure
   * @throws TezException
   * @throws InterruptedException
   * @throws IOException
   */
  public int runDag(DAG dag, TezClient tezClient, Logger logger) throws TezException,
      InterruptedException, IOException {
    tezClient.waitTillReady();

    CallerContext callerContext = CallerContext.create("TezExamples",
        "Tez Example DAG: " + dag.getName());
    ApplicationId appId = tezClient.getAppMasterApplicationId();
    if (hadoopShim == null) {
      Configuration conf = (getConf() == null ? new Configuration(false) : getConf());
      hadoopShim = new HadoopShimsLoader(conf).getHadoopShim();
    }

    if (appId != null) {
      TezUtilsInternal.setHadoopCallerContext(hadoopShim, appId);
      callerContext.setCallerIdAndType(appId.toString(), "TezExampleApplication");
    }
    dag.setCallerContext(callerContext);

    DAGClient dagClient = tezClient.submitDAG(dag);
    Set<StatusGetOpts> getOpts = Sets.newHashSet();
    DAGStatus dagStatus;
    dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts);

    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
      logger.info("DAG diagnostics: " + dagStatus.getDiagnostics());
      return -1;
    }
    return 0;
  }
  
  protected int runJob(String[] args, TezConfiguration tezConf,
      TezClient tezClient) throws Exception {
    DAG dag = createDAG(tezConf, args[0], args[1],
        1,
        disableSplitGrouping,
        generateSplitInClient, 
		"OrderedWordCount");
    LOG.info("Running OrderedWordCount");
    return runDag(dag, tezClient, LOG);
  }
  
  @Override
  public final int run(String[] args) throws Exception {
    Configuration conf = getConf();
    String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
	
	// the shim classes provide compatibility with different Hadoop versions
    hadoopShim = new HadoopShimsLoader(conf).getHadoopShim();

    int result = validateArgs(otherArgs);
    if (result != 0) {
	  printUsage();
      return result;
    }

    TezConfiguration tezConf = new TezConfiguration(getConf());
    UserGroupInformation.setConfiguration(tezConf);
	
	// the tez client instance to use to run the DAG if any custom monitoring is required.
	TezClient tezClient = TezClient.create(getClass().getSimpleName(), tezConf);
    tezClient.start();
	
    try {
      return runJob(otherArgs, tezConf, tezClient);
    } finally {
       tezClient.stop();
    }
  }
  
  protected void printUsage() {
    System.err.println("Usage: " + " in out [numPartitions]");
  }

  protected int validateArgs(String[] otherArgs) {
    if (otherArgs.length < 2 || otherArgs.length > 3) {
      return 2;
    }
    return 0;
  }
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new OrderedWordCount(), args);
    System.exit(res);
  }
}

Here we look at the DAG definition, inside the public static DAG createDAG method:

public static DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath,int numPartitions, boolean disableSplitGrouping, boolean isGenerateSplitInClient, String dagName)
  • Define a DataSourceDescriptor whose data comes from inputPath:
DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping) .generateSplitsInAM(!isGenerateSplitInClient).build();
  • Define a DataSinkDescriptor with the output location and output format:
DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outputPath).build();
  • Define the first vertex, which can be thought of as the Map node. The TOKENIZER vertex is created with a ProcessorDescriptor that names its processing logic, TokenProcessor. TokenProcessor does the Map work: it reads the data and writes to the output or to the intermediate transport channel, splitting each line into words and emitting each word with a 1 as a key-value pair. It is a typical Map program, expressed here as part of a DAG.
Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(        TokenProcessor.class.getName()));
tokenizerVertex.addDataSource(INPUT, dataSource);
  • Create the edge to the next vertex, the summation vertex. OrderedPartitionedKVEdgeConfig not only partitions but also sorts. The edge's input is the key-value pairs produced by the Map, of type <Text.class.getName(), IntWritable.class.getName()>, i.e. <string, int>:
OrderedPartitionedKVEdgeConfig summationEdgeConf = OrderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName(), HashPartitioner.class.getName()).setFromConfiguration(tezConf).build();
  • Define the summation vertex, implemented by SumProcessor. SumProcessor is similar to a Reduce: it takes all the Map-side key-value pairs from its input, sums the values for the same key, and writes the result, swapping the positions first so that <key,value> becomes <value,key>. This allows the subsequent stage to sort by the value (the count):
Vertex summationVertex = Vertex.create(SUMMATION, ProcessorDescriptor.create(SumProcessor.class.getName()), numPartitions);
  • Create another edge, similar in function to the one above, again an OrderedPartitionedKVEdgeConfig. Its input type is the swapped key-value pair, i.e. <IntWritable.class.getName(), Text.class.getName()>, in other words <int, value>:
OrderedPartitionedKVEdgeConfig sorterEdgeConf = OrderedPartitionedKVEdgeConfig .newBuilder(IntWritable.class.getName(), Text.class.getName(),HashPartitioner.class.getName()).setFromConfiguration(tezConf).build();
  • Then comes the sorting step. Since the sorting has already been done on the edge, no further sorting is needed here; the pairs only need to be swapped back. NoOpSorter's only job is to swap the key and value back:
Vertex sorterVertex = Vertex.create(SORTER, ProcessorDescriptor.create(NoOpSorter.class.getName()), 1);
sorterVertex.addDataSink(OUTPUT, dataSink);
  • Define the DAG: first add tokenizerVertex, summationVertex, and sorterVertex, then add the edges, each of which records which vertex connects to which:
DAG dag = DAG.create(dagName);
    dag.addVertex(tokenizerVertex).addVertex(summationVertex).addVertex(sorterVertex).addEdge(Edge.create(tokenizerVertex, summationVertex,summationEdgeConf.createDefaultEdgeProperty())).addEdge(Edge.create(summationVertex, sorterVertex, sorterEdgeConf.createDefaultEdgeProperty()));
return dag;  
}
6.3.2.2 Overall Flow

Next we describe how the DAG is submitted to the cluster for execution.

1. main invokes the ToolRunner.run method:

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new OrderedWordCount(), args);
    System.exit(res);
  }

2. The run method (invoked through ToolRunner.run):

  • Get the configuration:
Configuration conf = getConf();
  • Parse the input arguments:
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
  • Compatibility with different Hadoop versions: a series of method names is defined per version, because their names or parameters may differ between versions; the shim absorbs these differences:
hadoopShim = new HadoopShimsLoader(conf).getHadoopShim();
  • Validate the arguments:
    int result = validateArgs(otherArgs);
    if (result != 0) {
	  printUsage();
      return result;
    }
  • The key player: create the TezConfiguration:
    TezConfiguration tezConf = new TezConfiguration(getConf());
    UserGroupInformation.setConfiguration(tezConf);
  • Create the TezClient from the TezConfiguration. The DAG program has to be submitted to the AM (ApplicationMaster), and that submission is performed by the application's client, i.e. the Tez client:
TezClient tezClient = TezClient.create(getClass().getSimpleName(), tezConf);
tezClient.start();
  • Now the job can run:
    try {
      return runJob(otherArgs, tezConf, tezClient);
    } finally {
       tezClient.stop();
    }

3. The runJob method creates the DAG and submits it; see the previous section for how the DAG is created:

DAG dag = createDAG(tezConf, args[0], args[1],1,disableSplitGrouping,generateSplitInClient, "OrderedWordCount");
LOG.info("Running OrderedWordCount");
return runDag(dag, tezClient, LOG);
  • The runDag method submits the DAG through tezClient:
public int runDag(DAG dag, TezClient tezClient, Logger logger) throws TezException,      InterruptedException, IOException {
  • Wait until the client is fully started:
    tezClient.waitTillReady();
  • Create the caller context:
CallerContext callerContext = CallerContext.create("TezExamples", "Tez Example DAG: " + dag.getName());
  • Get the AM's application ID; every application has an appId:
    ApplicationId appId = tezClient.getAppMasterApplicationId();
    if (hadoopShim == null) {
      Configuration conf = (getConf() == null ? new Configuration(false) : getConf());
      hadoopShim = new HadoopShimsLoader(conf).getHadoopShim();
    }
    if (appId != null) {
      TezUtilsInternal.setHadoopCallerContext(hadoopShim, appId);
      callerContext.setCallerIdAndType(appId.toString(), "TezExampleApplication");
    }
    dag.setCallerContext(callerContext);
  • Submit the defined DAG with tezClient:
DAGClient dagClient = tezClient.submitDAG(dag);
Set<StatusGetOpts> getOpts = Sets.newHashSet();
  • After submission, wait for completion:
    DAGStatus dagStatus;
    dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts);
    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
      logger.info("DAG diagnostics: " + dagStatus.getDiagnostics());
      return -1;
    }
    return 0;
  }

That is the whole process.

6.3.3 Building the Example

6.3.3.1 Walking Through the Build Script

The build script buildExamples.sh is located as follows:

#!/bin/sh
	if [ "hadoop" != `whoami` ]; then echo "run in hadoop user" && exit ; fi
	
	SRC_PATH=org/apache/tez/examples
	MAIN_CLASS=org.apache.tez.examples.OrderedWordCount
	SRC_TAR_NAME=org.tar
	BUILD_JAR_NAME=app.jar
	export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar
	rm -rf $BUILD_JAR_NAME
	rm -rf $(echo $SRC_PATH|sed -r 's@/.*@@g')
	tar -xf $SRC_TAR_NAME
	hadoop com.sun.tools.javac.Main $SRC_PATH/*.java
	echo "Manifest-Version: 1.0" > $SRC_PATH/MANIFEST.MF
	echo "Main-Class: $MAIN_CLASS" >> $SRC_PATH/MANIFEST.MF
	echo "Class-Path: " >> $SRC_PATH/MANIFEST.MF
	echo "" >> $SRC_PATH/MANIFEST.MF
	jar cvfm $BUILD_JAR_NAME $SRC_PATH/MANIFEST.MF $SRC_PATH/*.class
	
	# hadoop jar app.jar org.apache.tez.examples.OrderedWordCount /installTest/tez/data /installTest/tez/output2
	# |
	# |
	# V
	# hadoop jar app.jar /installTest/hadoop/data /installTest/tez/output3
	if [ $# -ge 1 ]; then
        case "$1" in
			run)
				hdfs dfs -rm -r -f /installTest/tez
				hdfs dfs -mkdir -p /installTest/tez/data
				hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml  /installTest/tez/data
				hadoop jar $BUILD_JAR_NAME /installTest/tez/data /installTest/tez/output
				hdfs dfs -cat /installTest/tez/output/part-v002-o000-r-00000
				;;
			*)
				echo "unknown parameter $1."
				;;
        esac
	fi
  • Ensure the script is run as the hadoop user:
#!/bin/sh
	if [ "hadoop" != `whoami` ]; then echo "run in hadoop user" && exit ; fi
  • Define the package path:
	SRC_PATH=org/apache/tez/examples
  • The main class to run:
	MAIN_CLASS=org.apache.tez.examples.OrderedWordCount
  • File names located in the same directory, and the classpath needed for compilation:
	SRC_TAR_NAME=org.tar
	BUILD_JAR_NAME=app.jar
	export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar
  • Delete previous outputs to avoid conflicts:
	rm -rf $BUILD_JAR_NAME
	rm -rf $(echo $SRC_PATH|sed -r 's@/.*@@g')
  • Extract the source package:
	tar -xf $SRC_TAR_NAME
  • Compile the program:
	hadoop com.sun.tools.javac.Main $SRC_PATH/*.java
  • Create the manifest file, defining the jar's entry class as org.apache.tez.examples.OrderedWordCount:
	echo "Manifest-Version: 1.0" > $SRC_PATH/MANIFEST.MF
	echo "Main-Class: $MAIN_CLASS" >> $SRC_PATH/MANIFEST.MF
	echo "Class-Path: " >> $SRC_PATH/MANIFEST.MF
	echo "" >> $SRC_PATH/MANIFEST.MF
  • Assemble all the classes, packaging the compiled class files into the jar:
	jar cvfm $BUILD_JAR_NAME $SRC_PATH/MANIFEST.MF $SRC_PATH/*.class
  • If no entry class were defined in the manifest, the entry class org.apache.tez.examples.OrderedWordCount would have to be specified in the hadoop jar command:
	if [ $# -ge 1 ]; then
        case "$1" in
			run)
  • Delete the old directory and create the data directory:
	hdfs dfs -rm -r -f /installTest/tez
	hdfs dfs -mkdir -p /installTest/tez/data
  • Upload the test files:
	hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml  /installTest/tez/data
  • Run:
	hadoop jar $BUILD_JAR_NAME /installTest/tez/data /installTest/tez/output
  • View the run's result:
	hdfs dfs -cat /installTest/tez/output/part-v002-o000-r-00000
				;;
			*)
				echo "unknown parameter $1."
				;;
        esac
	fi
6.3.3.2 Building

Log in to the app-11 node via ssh and switch to the hadoop user: su - hadoop

Start the cluster and check that it is working properly.

Package the org folder into org.tar.

Clear the /tmp/tez directory: rm -rf *

Upload org.tar to the /tmp/tez directory.

The sources are packaged as a tarball to preserve the file structure and paths.

Upload the script file to /tmp/tez.

Make the script executable: chmod a+x buildExamples.sh

6.3.4 Running

Run: ./buildExamples.sh run

  • View the output:
hdfs dfs -cat /installTest/tez/output/part-v002-o000-r-00000

The results are already sorted in ascending order.
