1、Hadoop背景介绍
什么是Hadoop:
1. HADOOP是apache旗下的一套开源软件平台
2. HADOOP提供的功能:利用服务器集群,根据用户的自定义业务逻辑,对海量数据进行分布式处理
有个案例:
有个客户端Driver在运行,然后向集群中发送运行的请求,集群中各个服务器名为worker;Driver中存在task任务,打包成一个jar包,Driver需要把程序发到worker上去,在客户端用NIO方式向服务端集群发包,因此task.jar包会发到worker中,然后客户端再发送一个启动脚本,”java -cpcn.itcast.bigdata.Task.task.jar /home/Hadoop/data/1.txxt -Xmx=2000m -Xms=2000m“,在worker中运行exec(“command”)即可运行。
上面描述可知,过程非常复杂,因此需要一个更便捷的方式,即Hadoop:
3. HADOOP的核心组件有:
A. HDFS(分布式文件系统):用来存储相应的数据
B. YARN(运算资源调度系统):用来调度相应的数据
C. MAPREDUCE(分布式运算编程框架)
4. 广义上来说,HADOOP通常是指一个更广泛的概念——HADOOP生态圈:
下图为主要的Hadoop架构:
Hadoop产生背景:
1. HADOOP最早起源于Nutch。百度即为:爬虫+搜索引擎;Nutch的设计目标是构建一个大型的全网搜索引擎,包括网页抓取、索引、查询等功能,但随着抓取网页数量的增加,遇到了严重的可扩展性问题——如何解决数十亿网页的存储和索引问题。
2. 2003年、2004年谷歌发表的两篇论文为该问题提供了可行的解决方案。
——分布式文件系统(GFS),可用于处理海量网页的存储
——分布式计算框架MAPREDUCE,可用于处理海量网页的索引计算问题。
3. Nutch的开发人员完成了相应的开源实现HDFS和MAPREDUCE,并从Nutch中剥离成为独立项目HADOOP,到2008年1月,HADOOP成为Apache*项目,迎来了它的快速发展期。
Hadoop在大数据、云计算中的位置和关系:
1. 云计算是分布式计算、并行计算、网格计算、多核计算、网络存储、虚拟化、负载均衡等传统计算机技术和互联网技术融合发展的产物。借助IaaS(基础设施即服务)、PaaS(平台即服务)、SaaS(软件即服务)等业务模式,把强大的计算能力提供给终端用户。
2. 现阶段,云计算的两大底层支撑技术为“虚拟化”和“大数据技术”
3. 而HADOOP则是云计算的PaaS层的解决方案之一,并不等同于PaaS,更不等同于云计算本身。
国内外Hadoop应用案例介绍:
1、HADOOP应用于数据服务基础平台建设
2、HADOOP用于用户画像
3、HADOOP用于网站点击流日志数据挖掘
1.5、国内HADOOP就业整体情况
A. 大数据产业已纳入国家十三五规划
B. 各大城市都在进行智慧城市项目建设,而智慧城市的根基就是大数据综合平台
C. 互联网时代数据的种类,增长都呈现爆发式增长,各行业对数据的价值日益重视
D. 相对于传统JAVAEE技术领域来说,大数据领域的人才相对稀缺
E. 随着现代社会的发展,数据处理和数据挖掘的重要性只会增不会减,因此,大数据技术是一个尚在蓬勃发展且具有长远前景的领域
2、HADOOP就业职位要求
大数据是个复合专业,包括应用开发、软件平台、算法、数据挖掘等,因此,大数据技术领域的就业选择是多样的,但就HADOOP而言,通常都需要具备以下技能或知识:
A. HADOOP分布式集群的平台搭建
B. HADOOP分布式文件系统HDFS的原理理解及使用
C. HADOOP分布式运算框架MAPREDUCE的原理理解及编程
D. Hive数据仓库工具的熟练应用
E. Flume、sqoop、oozie等辅助工具的熟练使用
F. Shell/python等脚本语言的开发能力
3、Hadoop生态圈以及各组成部分:
HDFS:分布式文件系统
MAPREDUCE:分布式运算程序开发框架
HIVE:基于大数据技术(文件系统+运算框架)的SQL数据仓库工具
HBASE:基于HADOOP的分布式海量数据库
ZOOKEEPER:分布式协调服务基础组件
Mahout:基于mapreduce/spark/flink等分布式运算框架的机器学习算法库
Oozie:工作流调度框架
Sqoop:数据导入导出工具
Flume:日志数据采集框架
2 分布式系统概述
注:由于大数据技术领域的各类技术框架基本上都是分布式系统,因此,理解hadoop、storm、spark等技术框架,都需要具备基本的分布式系统概念
2.1 分布式软件系统(Distributed Software Systems)
² 该软件系统会划分成多个子系统或模块,各自运行在不同的机器上,子系统或模块之间通过网络通信进行协作,实现最终的整体功能
² 比如分布式操作系统、分布式程序设计语言及其编译(解释)系统、分布式文件系统和分布式数据库系统等。
2.2 分布式软件系统举例:solrcloud
A. 一个solrcloud集群通常有多台solr服务器
B. 每一个solr服务器节点负责存储整个索引库的若干个shard(数据分片)
C. 每一个shard又有多台服务器存放若干个副本互为主备用
D. 索引的建立和查询会在整个集群的各个节点上并发执行
E. solrcloud集群作为整体对外服务,而其内部细节可对客户端透明
总结:利用多个节点共同协作完成一项或多项具体业务功能的系统就是分布式系统。
2.3 分布式应用系统模拟开发
需求:可以实现由主节点将运算任务发往从节点,并将各从节点上的任务启动;
程序清单:
AppMaster
AppSlave/APPSlaveThread
Task
程序运行逻辑流程:
3、离线数据分析流程介绍
注:本环节主要感受数据分析系统的宏观概念及处理流程,初步理解hadoop等框架在其中的应用环节,不用过于关注代码细节
一个应用广泛的数据分析系统:“web日志数据挖掘”
3.1、需求分析
案例名称:
“网站或APP点击流日志数据挖掘系统”。
通过js写的网址,同时可以用http://tool.chinaz.com/tools/urlencode.aspx,网站进行相应的转码,看到网站中写了哪些内容。
一般中型的网站(10W的PV以上),每天会产生1G以上Web日志文件。大型或超大型的网站,可能每小时就会产生10G的数据量。
具体来说,比如某电子商务网站,在线团购业务。每日PV数100w,独立IP数5w。用户通常在工作日上午10:00-12:00和下午15:00-18:00访问量最大。日间主要是通过PC端浏览器访问,休息日及夜间通过移动设备访问较多。网站搜索浏量占整个网站的80%,PC用户不足1%的用户会消费,移动用户有5%会消费。
对于日志的这种规模的数据,用HADOOP进行日志分析,是最适合不过的了。
3.1.2、案例需求描述
“Web点击流日志”包含着网站运营很重要的信息,通过日志分析,我们可以知道网站的访问量,哪个网页访问人数最多,哪个网页最有价值,广告转化率、访客的来源信息,访客的终端信息等。
3.1.3 数据来源
本案例的数据主要由用户的点击行为记录
获取方式:在页面预埋一段js程序,为页面上想要监听的标签绑定事件,只要用户点击或移动到标签,即可触发ajax请求到后台servlet程序,用log4j记录下事件信息,从而在web服务器(nginx、tomcat等)上形成不断增长的日志文件。
形如:
58.215.204.118 - - [18/Sep/2013:06:51:35 +0000] "GET /wp-includes/js/jquery/jquery.js?ver=1.10.2 HTTP/1.1" 304 0 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0" |
3.2、数据处理流程
3.2.1 流程图解析
本案例跟典型的BI系统极其类似,整体流程如下:
但是,由于本案例的前提是处理海量数据,因而,流程中各环节所使用的技术则跟传统BI完全不同,后续课程都会一一讲解:
1) 数据采集:定制开发采集程序,或使用开源框架FLUME
2) 数据预处理:定制开发mapreduce程序运行于hadoop集群
3) 数据仓库技术:基于hadoop之上的Hive
4) 数据导出:基于hadoop的sqoop数据导入导出工具
5) 数据可视化:定制开发web程序或使用kettle等产品
6) 整个过程的流程调度:hadoop生态圈中的oozie工具或其他类似开源产品
推荐系统的流程图:
推荐原料中主要存储的是历史数据,很难处理实时的数据;
因此,加上kafka,数据源源不断的获取的同时会进行数据分析,然后推荐相应的产品给用户,统计当前的最新动态,以此更新数据库。
上面的方法是用SQL,从统计的角度来分析,还有机器学习的方式获取,主要是用mahout运算得到。
3.2.2 项目技术架构图
3.2.3 项目相关截图(感性认识,欣赏即可)
a) Mapreudce程序运行
b) 在Hive中查询数据
c) 将统计结果导入mysql
./sqoop export --connect jdbc:mysql://localhost:3306/weblogdb --username root --password root --table t_display_xx --export-dir /user/hive/warehouse/uv/dt=2014-08-03 |
3.3 项目最终效果
经过完整的数据处理流程后,会周期性输出各类统计指标的报表,在生产实践中,最终需要将这些报表数据以可视化的形式展现出来,本案例采用web程序来实现数据可视化
效果如下所示:
4、集群搭建
4.1、Hadoop集群搭建
4.1.1、集群介绍:
HADOOP集群具体来说包含两个集群:HDFS集群和YARN集群,两者逻辑上分离,但物理上常在一起。
HDFS集群:
负责海量数据的存储,集群中的角色主要有 NameNode/ DataNode
YARN集群:
负责海量数据运算时的资源调度,集群中的角色主要有ResourceManager /NodeManager
(那mapreduce是什么呢?它其实是一个应用程序开发包)
分布式的构建,需要两个namenode,在一台服务器上,resource manager可以单独放一台服务器,也可以与name node放在一起,data node需要有三个,且与node manager放在一起,如上图(一共四台服务器)。
本集群搭建案例,以5节点为例进行搭建,角色分配如下:
hdp-node-01 NameNode SecondaryNameNode hdp-node-02 ResourceManager hdp-node-03 DataNode NodeManager hdp-node-04 DataNode NodeManager hdp-node-05 DataNode NodeManager |
Useradd Hadoop;
密码设置为:hadoop;
部署图如下:
4.1.2、服务器准备
本案例使用虚拟机服务器来搭建HADOOP集群,所用软件及版本:
Vmware 12.0
Redhat 6 64bit
4.1.3、网络环境准备
采用NAT方式联网
在Windows10中配置hosts:
Hadoop_env.xml:用来配置hadoop环境参数配置;
Core-site.xml:核心参数配置;
HDFS-site.xml:HDFS配置;
Yarn-site.xml:yarn配置;
4.1.4、服务器系统设置
1)添加HADOOP用户(此为其他账号,暂时使用root,后期研究使用其他账号)
// 切换root用户
$ su root
// 创建hadoop用户组
# groupadd hadoop
// 在hadoop用户组中创建hadoop用户
# useradd -g hadoop hadoop
// 修改用户hadoop密码
# passwd hadoop
2)为HADOOP用户分配sudoer权限
// 修改sudoers配置文件给hadoop用户添加sudo权限
# vim /etc/sudoers
hadoop ALL=(ALL) ALL
// 测试是否添加权限成功
# exit
$ sudo ls /root
3)同步时间
如果集群节点时间不同步,可能会出现节点宕机或引发其它异常问题,所以在生产环境中一般通过配置NTP服务器实现集群时间同步。本集群在hadoop-master1节点设置ntp服务器,具体方法如下:
// 切换root用户
$ su root
// 查看是否安装ntp
# rpm -qa | grep ntp
// 安装ntp
# yum install -y ntp
// 配置时间服务器
# vim /etc/ntp.conf
# 禁止所有机器连接ntp服务器
restrict default ignore
# 允许局域网内的所有机器连接ntp服务器
restrict 172.16.20.0 mask 255.255.255.0nomodify notrap
# 使用本机作为时间服务器
server 127.127.1.0
// 启动ntp服务器
# service ntpd start
// 设置ntp服务器开机自动启动
# chkconfig ntpd on
集群其它节点通过执行crontab定时任务,每天在指定时间向ntp服务器进行时间同步,方法如下:
// 切换root用户
$ su root
// 执行定时任务,每天00:00向服务器同步时间,并写入日志
# crontab -e
0 0 * * * /usr/sbin/ntpdate hadoop-master1>> /home/hadoop/ntpd.log
// 查看任务
# crontab -l
4)设置主机名
// 切换root用户
$ su root
// 修改本机IP地址
# vim/etc/sysconfig/network-scripts/ifcfg-eth0
// 重启网络服务
# service network restart
// 修改主机名
# hostnamectl set-hostname 主机名
// 查看主机名
# hostnamectl status
5)配置内网域名映射:
// 切换root用户
$ su root
// 编辑hosts文件
# vim /etc/hosts
172.16.20.81 hadoop-master1
172.16.20.82 hadoop-master2
172.16.20.83 hadoop-slave1
172.16.20.84 hadoop-slave2
172.16.20.85 hadoop-slave3
6)配置ssh免密登陆
// 在hadoop-master1节点生成SSH**对
$ ssh-****** -t rsa
$ ssh-copy-id hdp-master2
authorized_keys:存放远程免密登录的公钥,主要通过这个文件记录多台机器的公钥
id_rsa : 生成的私钥文件
id_rsa.pub :生成的公钥文件
know_hosts : 已知的主机公钥清单
如果希望ssh公钥生效需满足至少下面两个条件:
1).ssh目录的权限必须是700
2) .ssh/authorized_keys文件权限必须是600
备注:在其余节点上执行同样的操作,确保集群中任意节点都可以ssh免密码登录到其它各节点。
7)配置防火墙
// 切换root用户
$ su root
//重启后永久性生效:
开启:chkconfig iptables on
关闭:chkconfig iptablesoff
//即时生效,重启后失效:
开启:service iptablesstart
关闭:service iptables stop
需要说明的是对于Linux下的其它服务都可以用以上命令执行开启和关闭操作。
在开启了防火墙时,做如下设置,开启相关端口,
修改/etc/sysconfig/iptables文件,添加以下内容:
-ARH-Firewall-1-INPUT -m state ——state NEW -m tcp -p tcp ——dport 80 -j ACCEPT
-ARH-Firewall-1-INPUT -m state ——state NEW -m tcp -p tcp ——dport 22 -j ACCEPT
// 开机关闭Selinux
# vim /etc/selinux/config
SELINUX=disabled
// 重启机器后root用户查看Selinux状态
# getenforce
8)安装jdk:
//卸载系统自带的openjdk
$suroot
#rpm-qa | grep java
#rpm-e --nodeps java-1.7.0-openjdk-1.7.0.75-2.5.4.2.el7_0.x86_64
#rpm-e --nodeps java-1.7.0-openjdk-headless-1.7.0.75-2.5.4.2.el7_0.x86_64
#rpm-e --nodeps tzdata-java-2015a-1.el7_0.noarch
#exit
//解压jdk安装包
$tar-xvf jdk-7u79-linux-x64.tar.gz
//删除安装包
$rmjdk-7u79-linux-x64.tar.gz
//修改用户环境变量
$cd ~
$vim.bash_profile
exportJAVA_HOME=/home/hadoop/app/jdk1.7.0_79
exportPATH=$PATH:$JAVA_HOME/bin
//使修改的环境变量生效
$source.bash_profile
//测试jdk是否安装成功
$java-version
设置开机自启动:
/etc/rc.local添加:
exportJAVA_OPTS=-Xmx1024M
exportJAVA_HOME=/usr/local/jdk
exportPATH=$PATH:$JAVA_HOME/bin
9)Zookeeper集群安装
Zookeeper是一个开源分布式协调服务,其独特的Leader-Follower集群结构,很好的解决了分布式单点问题。目前主要用于诸如:统一命名服务、配置管理、锁服务、集群管理等场景。大数据应用中主要使用Zookeeper的集群管理功能。
本集群使用zookeeper-3.4.5-cdh5.7.1版本。首先在Hadoop-slave1节点安装Zookeeper,方法如下:
//新建目录
$mkdir app/cdh
//解压zookeeper安装包
$tar -xvf zookeeper-3.4.5-cdh5.7.1.tar.gz -C app/cdh/
//删除安装包
$rm -rf zookeeper-3.4.5-cdh5.7.1.tar.gz
//配置用户环境变量
$vim /etc/profile
exportZOOKEEPER_HOME=/home/hadoop/app/cdh/zookeeper-3.4.5-cdh5.7.1
exportPATH=$PATH:$ZOOKEEPER_HOME/bin
//使修改的环境变量生效
$source /etc/profile
//修改zookeeper的配置文件
$cd app/cdh/zookeeper-3.4.5-cdh5.7.1/conf/
$cp zoo_sample.cfg zoo.cfg
$vim zoo.cfg
#客户端心跳时间(毫秒)
tickTime=2000
#允许心跳间隔的最大时间
initLimit=10
#同步时限
syncLimit=5
#数据存储目录
dataDir=/home/hadoop/app/cdh/zookeeper-3.4.5-cdh5.7.1/data
#数据日志存储目录
dataLogDir=/home/hadoop/app/cdh/zookeeper-3.4.5-cdh5.7.1/data/log
#端口号
clientPort=2181
#集群节点和服务端口配置
server.1=hadoop-slave1:2888:3888
server.2=hadoop-slave2:2888:3888
server.3=hadoop-slave3:2888:3888
#以下为优化配置
#服务器最大连接数,默认为10,改为0表示无限制
maxClientCnxns=0
#快照数
autopurge.snapRetainCount=3
#快照清理时间,默认为0
autopurge.purgeInterval=1
//创建zookeeper的数据存储目录和日志存储目录
$cd ..
$mkdir -p data/log
//在data目录中创建一个文件myid,输入内容为1
$echo "1" >> data/myid
//修改zookeeper的日志输出路径(注意CDH版与原生版配置文件不同)
$vim libexec/zkEnv.sh
Vi/usr/apps/zookeeper/zookeeper-3.4.10/bin/zkEnv.sh
if[ "x${ZOO_LOG_DIR}" = "x" ]
then
ZOO_LOG_DIR="$ZOOKEEPER_HOME/logs"
fi
if[ "x${ZOO_LOG4J_PROP}" = "x" ]
then
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
fi
//修改zookeeper的日志配置文件
$vim conf/log4j.properties
zookeeper.root.logger=INFO,ROLLINGFILE
//创建日志目录
$mkdir logs
将hadoop-slave1节点上的Zookeeper目录同步到hadoop-slave2和hadoop-slave3节点,并修改Zookeeper的数据文件。此外,不要忘记设置用户环境变量。
//在hadoop-slave1中将zookeeper目录复制到其它节点
$cd ~
$scp -r app/cdh/zookeeper-3.4.5-cdh5.7.1hadoop-slave2:/home/hadoop/app/cdh
$scp -r app/cdh/zookeeper-3.4.5-cdh5.7.1 hadoop-slave3:/home/hadoop/app/cdh
//在hadoop-slave2中修改data目录中的myid文件
$echo "2" >app/cdh/zookeeper-3.4.5-cdh5.7.1/data/myid
//在hadoop-slave3中修改data目录中的myid文件
$echo "3" >app/cdh/zookeeper-3.4.5-cdh5.7.1/data/myid
最后,在安装了Zookeeper的各节点上启动Zookeeper,并查看节点状态,方法如下:
//启动
启动zookeeper,进入zookeeper的bin目录,执行命令:zkServer.sh start。如果没有执行权限,需要增加权限。
$./zkServer.sh start
//查看状态
如何判断zookeeper有没有启动成功呢?执行命令:zkServer.sh status。如果返回如下信息证明zookeeper启动成功。
$./zkServer.sh status
//关闭
关闭zookeeper,执行命令zkServer.sh stop。如果返回如下信息证明zookeeper启动成功(It is probably not running)。
$./zkServer.shstop
10)设置开机自启动:
首先请登陆你的linux服务器
用cd 命令切换到/etc/rc.d/init.d/目录下
接着用touch zookeeper创建一个文件
然后为这个文件添加可执行权限
chmod+x zookeeper
接着用vi zookeeper来编辑这个文件
接着在zookeeper里面输入如下内容
#!/bin/bash
#chkconfig:234520 90
#description:zookeeper
#processname:zookeeper
case$1 in
start) su root /usr/local/sw/zookeeper/bin/zkServer.shstart;;
stop) su root/usr/local/sw/zookeeper/bin/zkServer.sh stop;;
status) su root/usr/local/sw/zookeeper/bin/zkServer.sh status;;
restart) su root /usr/local/sw/zookeeper/bin/zkServer.shrestart;;
*) echo "requirestart|stop|status|restart" ;;
esac
然后保存退出
先按esc
然后按:键盘
接这个输入wq即可保存退出
这个时候我们就可以用service zookeeper start/stop来启动停止zookeeper服务了
最后一点我们需要开机自动启动
所以需要添加到启动里面
使用chkconfig --add zookeeper命令吧zookeeper添加到开机启动里面
添加完成之后接这个使用chkconfig --list 来看看我们添加的zookeeper是否在里面
如果上面的操作都正常的话;你就可以重启你的linux服务器了
Hadoop HA安装部署:
// 在hadoop-master1节点解压hadoop安装包
$ tar-xvf hadoop-2.6.5.tar.gz -C/home/hadoop/app/
// 删除安装包
$ rm hadoop-2.6.5.tar.gz
// 修改hadoop-env.sh文件
$ cd /home/hadoop/app/ hadoop-2.6.5/etc/hadoop
$ vim hadoop-env.sh
Export JAVA_HOME=/home/hadoop/app/jdk1.7.0_79
// 配置core-site.xml文件
$ vim core-site.xml
此为统一资源定位符,hadoop的运算层与存储层是解耦的,fs为文件系统,此时应用的为fs文件系统。
hdfs://mycluster:9000:统一资源定位符;
hdfs为使用的协议;mycluster为访问的机器名称;9000为对应的端口;当客户端访问server时直接访问namenode,然后由namenode来决定访问哪台server。
上面为缓存数据的存放位置;
<configuration>
<!-- 指定hdfs的nameservices名称为mycluster,与hdfs-site.xml的HA配置相同,统一资源定位符-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://mycluster:9000</value>
</property>
<!-- 指定缓存文件存储的路径-->
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/app/hadoop-2.6.5/tmp</value>
</property>
<!-- 配置hdfs文件被永久删除前保留的时间(单位:分钟),默认值为0表明垃圾回收站功能关闭-->
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>
<!-- 指定zookeeper地址,配置HA时需要 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>hdp-node-1:2181,hdp-node-2:2181,hdp-node-3:2181</value>
</property>
</configuration>
// 配置hdfs-site.xml文件
$ vim hdfs-site.xml
将文件分散了,然后存放在不同的文件系统中,对他格式化就是对他建立一些数据目录。
<configuration>
<!-- 指定hdfs元数据存储的路径-->
<property>
<name>dfs.namenode.name.dir</name>
<value>/home/hadoop/app/hadoop-2.6.5/data/namenode</value>
</property>
<!-- 指定hdfs数据存储的路径-->
<property>
<name>dfs.datanode.data.dir</name>
<value>/home/hadoop/app/hadoop-2.6.5/data/datanode</value>
</property>
<!-- 数据备份的个数,客户端将数据传给HDFS,HDFS会将文件创建三份-->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- 关闭权限验证 -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<!-- 开启WebHDFS功能(基于REST的接口服务)-->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<!-- //////////////以下为HDFS HA的配置//////////////-->
<!-- 指定hdfs的nameservices名称为mycluster-->
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<!--指定mycluster的两个namenode的名称分别为nn1,nn2-->
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>
<!-- 指定hdfs的nameservices名称为mycluster-->
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<!-- 指定mycluster的两个namenode的名称分别为nn1,nn2-->
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>
<!-- 配置nn1,nn2的rpc通信端口 -->
<property>
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>hdp-master1:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>hdp-master2:8020</value>
</property>
<!-- 配置nn1,nn2的http通信端口 -->
<property>
<name>dfs.namenode.http-address.mycluster.nn1</name>
<value>hdp-master1:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.mycluster.nn2</name>
<value>hdp-master2:50070</value>
</property>
<!--指定namenode元数据存储在journalnode中的路径 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hdp-node-1:8485;hdp-node-2:8485;hdp-node-3:8485/mycluster</value>
</property>
<!-- 指定journalnode日志文件存储的路径-->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/home/hadoop/app/hadoop-2.6.5/data/journal</value>
</property>
<!-- 指定HDFS客户端连接activenamenode的java类 -->
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- 配置隔离机制为ssh -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<!-- 指定秘钥的位置 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/hadoop/.ssh/id_rsa</value>
</property>
<!-- 开启自动故障转移 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
</configuration>
// 配置mapred-site.xml文件
$ vim mapred-site.xml
<configuration>
<!-- 指定MapReduce计算框架使用YARN用来定义集群的名称,yarn放在一个平台上跑,就是MapReduce运行的平台名称,如果不写则默认会在local中运行,不会在集群上跑。-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- 指定jobhistory server的rpc地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hdp-master1:10020</value>
</property>
<!-- 指定jobhistory server的http地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hdp-master1:19888</value>
</property>
<!-- 开启uber模式(针对小作业的优化)-->
<property>
<name>mapreduce.job.ubertask.enable</name>
<value>true</value>
</property>
<!-- 配置启动uber模式的最大map数 -->
<property>
<name>mapreduce.job.ubertask.maxmaps</name>
<value>9</value>
</property>
<!-- 配置启动uber模式的最大reduce数 -->
<property>
<name>mapreduce.job.ubertask.maxreduces</name>
<value>1</value>
</property>
</configuration>
// 配置yarn-site.xml文件
$ vim yarn-site.xml
放在资源调度平台yarn上去跑
<configuration>
<!-- Site specific YARN configuration properties -->
<!-- NodeManager上运行的附属服务,需配置成mapreduce_shuffle才可运行MapReduce程序 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 配置WebApplication Proxy安全代理(防止yarn被攻击) -->
<property>
<name>yarn.web-proxy.address</name>
<value>hdp-master2:8888</value>
</property>
<!-- 开启日志-->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 配置日志删除时间为7天,-1为禁用,单位为秒-->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
<!-- 修改日志目录-->
<property>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>/logs</value>
</property>
<!-- 配置nodemanager可用的资源内存-->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>
<!-- 配置nodemanager可用的资源CPU-->
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>
<!-- //////////////以下为YARNHA的配置//////////////-->
<!-- 开启YARNHA -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!-- 启用自动故障转移-->
<property>
<name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<!-- 指定YARNHA的名称 -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yarncluster</value>
</property>
<!-- 指定两个resourcemanager的名称 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<!-- 配置rm1,rm2的主机 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>hdp-master-1</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>hdp-master-2</value>
</property>
<!-- 配置YARN的http端口 -->
<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>hdp-master-1:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>hdp-master-2:8088</value>
</property>
<!-- 配置zookeeper的地址 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>hadp-node-1:2181,hdp-node-2:2181,hdp-node-3:2181</value>
</property>
<!-- 配置zookeeper的存储位置 -->
<property>
<name>yarn.resourcemanager.zk-state-store.parent-path</name>
<value>/rmstore</value>
</property>
<!-- 开启yarnresourcemanager restart -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<!-- 配置resourcemanager的状态存储到zookeeper中 -->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<!-- 开启yarnnodemanager restart -->
<property>
<name>yarn.nodemanager.recovery.enabled</name>
<value>true</value>
</property>
<!-- 配置nodemanagerIPC的通信端口-->
<property>
<name>yarn.nodemanager.address</name>
<value>0.0.0.0:45454</value>
</property>
</configuration>
// 配置slaves文件
$ vi slaves
hdp-node-1
hdp-node-2
hdp-node-3
// 创建配置文件中涉及的目录
$ cd /home/hadoop/app/hadoop-2.6.5/
$ mkdir-p data/tmp
$ mkdir-p data/journal
$ mkdir-p data/namenode
$ mkdir-p data/datanode
// 将hadoop工作目录同步到集群其它节点:
$ scp -r /home/hadoop/app/hadoop-2.6.5/hdp-master2:/home/hadoop/app/
$ scp -r /home/hadoop/app/hadoop-2.6.5/hdp-node-1:/home/hadoop/app/
$ scp -r /home/hadoop/app/hadoop-2.6.5/hdp-node-2:/home/hadoop/app/
$ scp -r /home/hadoop/app/hadoop-2.6.5/hdp-node-3:/home/hadoop/app/
// 在集群各节点上修改用户环境变量
$ vi /etc/profile
exportHADOOP_HOME=/home/hadoop/app/hadoop-2.6.5
exportLD_LIBRARY_PATH=$HADOOP_HOME/lib/native
exportPATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
// 使修改的环境变量生效
$ source /etc/profile
// 解决本地库文件不存在的问题
在apache官网下载hadoop-2.6.5.tar.gz,解压后将lib/native下所有文件复制到$HADOOP_HOME/lib/native中。
4.1.6、Hadoop集群的初始化
// 启动zookeeper集群(分别在slave1、slave2和slave3上执行)
$ ./zkServer.sh start
// 格式化ZKFC(在master1上执行)
$ ./hdfs zkfc -formatZK
// 启动journalnode(分别在slave1、slave2和slave3上执行)
$ ./hadoop-daemon.sh startjournalnode
// 格式化HDFS(在master1上执行)
$ ./hdfs namenode -format (生成一些初始目录)
// 将格式化后master1节点hadoop工作目录中的元数据目录复制到master2节点
$ scp -r app/hadoop-2.6.5/data/namenode/*hdp-master2: /app/hadoop-2.6.5/data/namenode/
// 初始化完毕后可关闭journalnode(分别在slave1、slave2和slave3上执行)
$ ./hadoop-daemon.sh stopjournalnode
4.1.7、启动集群
1)格式化zk
主节点上进入hadoop目录
然后执行:
./bin/hadoop namenode–format
新版本用下面的语句不用hadoop命令了
./bin/hdfs namenode –format
提示:successfully formatted表示格式化成功
2)集群启动步骤
//启动zookeeper集群(分别在slave1、slave2和slave3执行)
$./zkServer.sh start
//启动HDFS(在master1执行)
$./start-dfs.sh
备注:此命令分别在master1/master2节点启动了NameNode和ZKFC,分别在slave1/slave2/slave3节点启动了DataNode和JournalNode,如下图所示。
//启动YARN(在master2执行)
$./start-yarn.sh
备注:此命令在master2节点启动了ResourceManager,分别在slave1/slave2/slave3节点启动了NodeManager。
//启动YARN的另一个ResourceManager(在master1执行,用于容灾)。
$./yarn-daemon.sh start resourcemanager
//启动YARN的安全代理(在master2执行)
$./yarn-daemon.sh start proxyserver
备注:proxyserver充当防火墙的角色,可以提高访问集群的安全性
//启动YARN的历史任务服务(在master1执行)
$./mr-jobhistory-daemon.shstart historyserver
备注:yarn-daemon.sh starthistoryserver已被弃用;CDH版本似乎有个问题,即mapred-site.xml配置的mapreduce.jobhistory.address和mapreduce.jobhistory.webapp.address参数似乎不起作用,实际对应的端口号是10200和8188,而且部需要配置就可以在任意节点上开启历史任务服务。
3)集群启动截图
hadoop-master1开启了NameNode、ResourceManager、HistoryServer和ZKFC,如下图所示:
hadoop-master2开启了NameNode、ResourceManager、ProxyServer和ZKFC,如下图所示:
hadoop-slave1、hadoop-slave2和hadoop-slave3分别开启了DataNode、JournalNode、NodeManager和ZooKeeper,如下图所示:
4)Web UI
下图为http://hadoop-master1:50070,可看到NameNode为active状态:
下图为http://hadoop-master2:50070,可看到NameNode为standby状态:
HDFS还有一个隐藏的UI页面http://hadoop-master1:50070/dfshealth.jsp比较好用:
下图为http://hadoop-master2:8088,可看到ResourceManager为active状态:
下图为http://hadoop-master1:8088,可看到ResourceManager为standby状态,它会自动跳转到http://hadoop-master2:8088:
下图为http://hadoop-master1:19888,可查看历史任务信息:
4.1.8、遇到的问题:
1)解决Unable to load native-hadoop library for your platform
安装hadoop启动之后总有警告:Unable to load native-hadooplibrary for your platform... using builtin-java classes where applicable
原因:
Apache提供的hadoop本地库是32位的,而在64位的服务器上就会有问题,因此需要自己编译64位的版本。
1、首先找到对应自己hadoop版本的64位的lib包,可以自己手动去编译,但比较麻烦,也可以去网上找,好多都有已经编译好了的。
2、可以去网站:http://dl.bintray.com/sequenceiq/sequenceiq-bin/ 下载对应的编译版本
3、将准备好的64位的lib包解压到已经安装好的hadoop安装目录的lib/native 和 lib目录下:
1. [hadoop@hadoopTest ~]$ tar -xvf hadoop-native-64-2.7.0.tar -C hadoop-2.7.2/lib/native
1. [hadoop@hadoopTest ~]$ tar -xvf hadoop-native-64-2.7.0.tar -C hadoop-2.7.2/lib
4、然后增加环境变量:
1. [hadoop@hadoopTest hadoop-2.7.2]$ vi /etc/profile
5、增加下面的内容:
1. export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
2. export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
6、让环境变量生效
1. [hadoop@hadoopTest hadoop-2.7.2]$ source /etc/profile
7、自检hadoop checknative –a 指令检查
1. <pre name="code" class="java">[hadoop@hadoopTest hadoop-2.7.2]$ hadoop checknative –a
4.1.8 测试
9 功能测试
// 向HDFS上传数据
$ hadoop fs -put webcount.txt/input
// 查看HDFS上的数据
$ hadoop fs -ls /input
$ hadoop fs -cat/input/webcount.txt
// 向YARN提交MapReduce任务,该任务用于分析网站日志文件webcount.txt统计每小时的点击次数
$ hadoop jar mr-webcount-0.0.1-SNAPSHOT.jar com.mr.demo.WebCountDriver/input/webcount.txt/output/webcount 1 1
// 在HDFS查看结果
$ hadoopfs -ls /output/webcount
$ hadoopfs -cat/output/webcount/part-r-00000
// 通过Web UI查看任务信息和历史任务信息
5 集群使用初步
5.1 HDFS使用
1、查看集群状态
命令:hdfs dfs admin –report
可以看出,集群共有3个datanode可用
也可打开web控制台查看HDFS集群信息,在浏览器打开http://hdp-node-01:50070/
打开hdp-master-1的hdfs之后,可以看到一个网页,点击utilities,可以进入到hdfs的文件目录系统中,:
这一网页可以显示hdfs中存放的内容,但是无法通过网页进行上传下载,需要使用命令行的客户端,即bin目录下的hadoop。
2、上传文件到HDFS
² 查看HDFS中的目录信息
命令: hadoop fs –ls /:查看hdfs的目录;
² 上传文件
命令: hadoop fs -put ./ scala-2.10.6.tgz /
如果要实现文件的上传下载,可以直接使用hadoop的命令客户端,即在linux中创建要传输的文件,然后再使用hadoop fs -put 文件名 要存放的路径;即可实现文件的存放,存放的文件可以在浏览器中查看:
而后进入到50070端口的网页,刷新页面就能找到上传的文件内容,可以实现下载。
由于上传的文件太小,所以产生了一个数据块,但是如果数据超过128MB,则数据会切分成多个数据块。
数据会对应的存放在服务器上:
² 从HDFS下载文件
命令: hadoop fs -get /yarn-site.xml
当下载文件时,hadoop会自动的将分块的数据拼接成一个完整的数据块,然后下载下来。
Hdfs的工作原理为:当存储一个200MB的文件时需要用到socket以及nio内容,当存储到128MB时,会重新生成一个socket,然后将文件传输到另外一个服务器上,存储剩下的文件内容。
创建文件夹:
5.2MAPREDUCE使用
mapreduce是hadoop中的分布式运算编程框架,只要按照其编程规范,只需要编写少量的业务逻辑代码即可实现一个强大的海量数据并发处理程序
5.2.1 Demo开发——wordcount
1、需求
从大量(比如T级别)文本文件中,统计出每一个单词出现的总次数
2、mapreduce实现思路
Map阶段:
a) 从HDFS的源数据文件中逐行读取数据
b) 将每一行数据切分出单词
c) 为每一个单词构造一个键值对(单词,1)
d) 将键值对发送给reduce
Reduce阶段:
a) 接收map阶段输出的单词键值对
b) 将相同单词的键值对汇聚成一组
c) 对每一组,遍历组中的所有“值”,累加求和,即得到每一个单词的总次数
d) 将(单词,总次数)输出到HDFS的文件中
4、具体编码实现
(1)定义一个mapper类
//首先要定义四个泛型的类型 //keyin: LongWritable valuein: Text //keyout: Text valueout:IntWritable public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ //map方法的生命周期: 框架每传一行数据就被调用一次 //key : 这一行的起始点在文件中的偏移量 //value: 这一行的内容 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //拿到一行数据转换为string String line = value.toString(); //将这一行切分出各个单词 String[] words = line.split(" "); //遍历数组,输出<单词,1> for(String word:words){ context.write(new Text(word), new IntWritable(1)); } } } |
(2)定义一个reducer类
//生命周期:框架每传递进来一个kv 组,reduce方法被调用一次 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //定义一个计数器 int count = 0; //遍历这一组kv的所有v,累加到count中 for(IntWritable value:values){ count += value.get(); } context.write(key, new IntWritable(count)); } } |
(3)定义一个主类,用来描述job并提交job
public class WordCountRunner { //把业务逻辑相关的信息(哪个是mapper,哪个是reducer,要处理的数据在哪里,输出的结果放哪里。。。。。。)描述成一个job对象 //把这个描述好的job提交给集群去运行 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job wcjob = Job.getInstance(conf); //指定我这个job所在的jar包 // wcjob.setJar("/home/hadoop/wordcount.jar"); wcjob.setJarByClass(WordCountRunner.class); wcjob.setMapperClass(WordCountMapper.class); wcjob.setReducerClass(WordCountReducer.class); //设置我们的业务逻辑Mapper类的输出key和value的数据类型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(IntWritable.class); //设置我们的业务逻辑Reducer类的输出key和value的数据类型 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(IntWritable.class); //指定要处理的数据所在的位置 FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt"); //指定处理完成之后的结果所保存的位置 FileOutputFormat.setOutputPath(wcjob,new Path("hdfs://hdp-server01:9000/wordcount/output/")); //向yarn集群提交这个job boolean res = wcjob.waitForCompletion(true); System.exit(res?0:1); } |
5.2.2 程序打包运行
1. 将程序打包
2. 准备输入数据
vi /home/hadoop/test.txt
Hello tom Hello jim Hello ketty Hello world Ketty tom |
在hdfs上创建输入数据文件夹:
hadoop fs -mkdir -p /wordcount/input
将words.txt上传到hdfs上
Hadoop fs –put /home/hadoop/words.txt /wordcount/input
3. 将程序jar包上传到集群的任意一台服务器上
上传的目录为:hadoop下的share文件夹中:
4. 使用命令启动执行wordcount程序jar包,用这一命令来计算刚才上传的word.txt文件的内容。
$ hadoop jarhadoop-mapreduce-examples-2.6.5.jar wordcount /wordcount/input//wordcount/output
5. 查看执行结果
在浏览器中可以看到相应上传的文件。
$ hadoop fs –cat /wordcount/out/part-r-00000