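Preface:
In the previous post I learned to use pandas for simple data analysis. Pandas, however, cannot process or analyze big data at the TB scale, so this time I take a look at Hadoop.
Also attached are my reflections on story one of "人心不足蛇吞象" (a greedy heart is like a snake trying to swallow an elephant):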
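Hadoop background
The data I have worked with falls into 3 categories:
1. Structured data
Data in a relational database, constrained by fields. (Has rules.)
2. Semi-structured data
HTML/XML/JSON... This kind of data has structure, but the constraints are not very strict. (Still some rules.)
3. Unstructured data
Plain-text files/logs... This kind of data has no head, body, or key tags to mark it up, let alone field constraints. (No rules at all.)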
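To make the three categories concrete, here is a small sketch that reads one sample of each with the Python standard library. All sample strings are invented for illustration; the point is only how much structure the parser can rely on in each case.

```python
import csv
import io
import json
import re

# Structured: every row follows the same fixed columns (made-up sample data).
structured = "id,name,age\n1,alice,30\n"
rows = list(csv.DictReader(io.StringIO(structured)))

# Semi-structured: keys exist, but records may carry different or nested fields.
semi_structured = '{"name": "alice", "tags": ["a", "b"], "extra": {"age": 30}}'
record = json.loads(semi_structured)

# Unstructured: free text; any structure has to be imposed by us, e.g. a regex.
unstructured = "2019-03-01 11:31:22 ERROR disk /dev/sda1 is 95% full"
match = re.search(r"(\d+)% full", unstructured)
disk_usage = int(match.group(1)) if match else None

print(rows[0]["name"], record["extra"]["age"], disk_usage)
```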
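4. How to store massive amounts of unstructured data?
So the question is: how do we store large amounts of unstructured/semi-structured data and then analyze and search it efficiently?
Google proposed solutions in the form of papers (without saying how to implement them!):
1. How to store massive data safely?
Store the data in a distributed way across the nodes of a server cluster. (Distributed: growing data is no longer a worry, because servers can be added dynamically to spread the load.)
2. How to analyze and search massive data efficiently?
MapReduce, a programming model, described in "MapReduce: Simplified Data Processing on Large Clusters"
Split a complex computing task --------> into small task units ----------> run them in parallel on the nodes
Collect the results from each node ---------> merge them --------> second map ------> second reduce ........> until the result is computed;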
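The split / run in parallel / merge idea can be tried on a single machine. The sketch below is only an analogy, assuming worker threads stand in for cluster nodes; `split`, `partial_sum` and `parallel_sum` are names I made up, not Hadoop APIs.

```python
from concurrent.futures import ThreadPoolExecutor


def split(data, n_chunks):
    """Cut the input into roughly equal task units."""
    size = max(1, -(-len(data) // n_chunks))  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def partial_sum(chunk):
    """The work one "node" does on its own chunk."""
    return sum(chunk)


def parallel_sum(data, n_workers=4):
    chunks = split(data, n_workers)
    # Each worker thread stands in for a cluster node.
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partials = list(pool.map(partial_sum, chunks))
    # Merge the per-node results into the final answer.
    return sum(partials)


print(parallel_sum(list(range(1, 101))))  # 5050
```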
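5. What is Hadoop?
Doug Cutting, a master-level programmer, was inspired by the Google papers mentioned above and developed Hadoop in Java.
6. How does Python call Hadoop?
Hadoop's MapReduce is this powerful, so how do I, a Python beginner, call it? Hadoop's programming API is also called MapReduce.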
I. Hadoop v2 architecture diagram
II. Hadoop's runtime model
HDFS cluster: DataNode (data storage node), NameNode (name node), SecondaryNameNode (secondary name node)
YARN: cluster resource management
III. Installing Hadoop 2.6.3 on CentOS 7
1. Environment preparation
CentOS 7 usually ships with a JDK already:
[root@localhost zhanggen]# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK -Bit Server VM (build 25.102-b14, mixed mode)
[root@localhost profile.d]# yum -y install java-1.8.0-openjdk*
Disable the firewall on CentOS 7
Check status: systemctl status firewalld
Stop now: systemctl stop firewalld
Disable at boot: systemctl disable firewalld
Enable at boot: systemctl enable firewalld
Disable the SELinux service on CentOS 7
[root@localhost hdfs]# setenforce 1
[root@localhost hdfs]# getenforce
Enforcing
[root@localhost hdfs]# setenforce 0
[root@localhost hdfs]# getenforce
Permissive
3. Unpack Hadoop
[root@localhost bdapps]# mkdir /bdapps/
[root@localhost bdapps]# ls
hadoop-2.6.
[root@localhost bdapps]# tar -zxvf /home/zhanggen/Desktop/hadoop-2.6..tar.gz -C /bdapps/
[root@localhost bdapps]# ln -sv /bdapps/hadoop-2.6. /bdapps/hadoop
‘/bdapps/hadoop’ -> ‘/bdapps/hadoop-2.6.’
4. Set the Java and Hadoop environment variables
Put the Hadoop variables in /etc/profile.d/hadoop.sh:
export HADOOP_PREFIX=/bdapps/hadoop
export PATH=$PATH:${HADOOP_PREFIX}/bin:${HADOOP_PREFIX}/sbin
export HADOOP_YARN_HOME=${HADOOP_PREFIX}
export HADOOP_MAPRED_HOME=${HADOOP_PREFIX}
export HADOOP_COMMON_HOME=${HADOOP_PREFIX}
export HADOOP_HDFS_HOME=${HADOOP_PREFIX}
source /etc/profile.d/hadoop.sh
Put the Java variable in /etc/profile.d/java.sh:
export JAVA_HOME=/usr
source /etc/profile.d/java.sh
5. Create the Hadoop user
groupadd hadoop
useradd -g hadoop hadoop
6. Create the directories where the DataNode and NameNode store data
mkdir -pv /data/hadoop/hdfs/{nn,dn,snn}
chown -R hadoop:hadoop /data/hadoop/hdfs/
7. Create the directory for Hadoop logs
cd /bdapps/hadoop/
mkdir logs
chown -R hadoop:hadoop ./*
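PS: if your MapReduce job fails, get its applicationId and look at the error messages:
yarn logs -applicationId application_1551852706740_0001 # view the job's execution log
The secret to conquering Hadoop: first get the program's run log -------> then analyze the cause of the errors in the log ------> solve the problem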
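Job logs can be long, so a small filter helps with the "analyze the log" step. This is a rough sketch that assumes the log marks severity with the words ERROR or FATAL; `error_lines` is a helper name I invented, and the sample text is made up rather than real Hadoop output.

```python
import re

# Severity markers to look for; the exact log layout is an assumption.
ERROR_PATTERN = re.compile(r"\b(ERROR|FATAL)\b")


def error_lines(log_text, context=1):
    """Return lines that contain an error marker plus `context` lines after each."""
    lines = log_text.splitlines()
    keep = set()
    for i, line in enumerate(lines):
        if ERROR_PATTERN.search(line):
            keep.update(range(i, min(len(lines), i + 1 + context)))
    return [lines[i] for i in sorted(keep)]


# Made-up log excerpt, for illustration only.
sample = """INFO step started
ERROR worker crashed
details: exit code 1
INFO step finished"""

for line in error_lines(sample):
    print(line)
```

On a real cluster the text could come from something like `subprocess.run(["yarn", "logs", "-applicationId", app_id], capture_output=True, text=True).stdout`, where `app_id` is the ID of the failed job.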
8. Hadoop's main configuration files (/bdapps/hadoop/etc/hadoop)
8.0. core-site.xml
Sets the NameNode's IP address and port (8020 by default).
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://0.0.0.0:8020</value>
<final>false</final>
</property>
</configuration>
core-site.xml
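When a daemon does not pick up a setting, it helps to check what the file really contains. The following is a minimal sketch that assumes the usual `<configuration>`/`<property>` layout shown above; `load_properties` is a helper name of my own, and the XML is inlined only so that the example runs on its own.

```python
import xml.etree.ElementTree as ET

# Same shape as the core-site.xml above.
CORE_SITE = """<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://0.0.0.0:8020</value>
    <final>false</final>
  </property>
</configuration>"""


def load_properties(xml_text):
    """Return {name: value} for every <property> in a Hadoop *-site.xml."""
    root = ET.fromstring(xml_text)
    return {
        prop.findtext("name"): prop.findtext("value")
        for prop in root.findall("property")
    }


props = load_properties(CORE_SITE)
print(props["fs.defaultFS"])  # hdfs://0.0.0.0:8020
```

For a file on disk, `ET.parse(path).getroot()` could replace `ET.fromstring`.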
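8.1. hdfs-site.xml
Sets HDFS-related properties: the number of replicas per data block, and the directories where the NN and DN store data (the directories created in step 6).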
<configuration>
<property>
<name>dfs.http.address</name>
<value>0.0.0.0:50070</value>
</property>
<property>
<name>dfs.datanode.http.address</name>
<value>0.0.0.0:50075</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///data/hadoop/hdfs/nn</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///data/hadoop/hdfs/dn</value>
</property>
<property>
<name>fs.checkpoint.dir</name>
<value>file:///data/hadoop/hdfs/snn</value>
</property>
<property>
<name>fs.checkpoint.edits.dir</name>
<value>file:///data/hadoop/hdfs/snn</value>
</property>
</configuration>
hdfs-site.xml
8.2. mapred-site.xml (tells MapReduce to use YARN)
Specifies whether MapReduce runs standalone or on top of YARN. With Hadoop 2 it runs on YARN; see the "Hadoop's runtime model" section above.
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
mapred-site.xml
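8.3. yarn-site.xml
yarn-site.xml configures the YARN daemons and YARN-related properties. First specify the host and listening port of the ResourceManager daemon; in pseudo-distributed mode the host is localhost and the default port is 8032.
Then specify the scheduler the ResourceManager uses and the NodeManager's auxiliary services. A brief sample configuration follows: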
<configuration>
<property>
<name>yarn.resourcemanager.address</name>
<value>0.0.0.0:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>0.0.0.0:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>0.0.0.0:8031</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>0.0.0.0:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>0.0.0.0:8088</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
</configuration>
vim yarn-site.xml
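8.4. slaves file
The slaves file lists all slave nodes of the cluster. In pseudo-distributed mode its content should be just localhost, which is indeed the file's default value, so in pseudo-distributed mode the file can be left as it is.
PS:
If the processes on the server/virtual machine fail to start, check whether the local IP has changed and no longer matches the IP in the configuration files!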
8.5. Format HDFS
Before the HDFS NN starts, the directory it uses to store data must be initialized.
If the directory given by dfs.namenode.name.dir in hdfs-site.xml does not exist, the format command creates it automatically.
If it already exists, make sure its permissions are set correctly; formatting then wipes all data inside it and builds a new file system. Run the following command as the hdfs user:
[hdfs@localhost hadoop]$ hdfs namenode -format
// :: INFO namenode.FSImage: Allocated new BlockPoolId: BP--127.0.0.1-
19/03/01 11:31:22 INFO common.Storage: Storage directory /data/hadoop/hdfs/nn has been successfully formatted.
// :: INFO namenode.NNStorageRetentionManager: Going to retain images with txid >=
// :: INFO util.ExitUtil: Exiting with status
// :: INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/127.0.0.1
************************************************************/
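9. Start Hadoop
Once HDFS has been formatted, go to /bdapps/hadoop/etc/hadoop and start Hadoop's 5 daemons.
9.1. Start the HDFS cluster
HDFS has 3 daemons: namenode, datanode and secondarynamenode. All of them are started or stopped with the hadoop-daemon.sh script. Run the commands as the hadoop user:
hadoop-daemon.sh start namenode
hadoop-daemon.sh start secondarynamenode
hadoop-daemon.sh start datanode
jps # jps lists the Java processes currently running and also works against remote hosts. Does Python have an equivalent?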
NameNode
Jps
SecondaryNameNode
DataNode
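Starting the HDFS cluster
Web interface of the HDFS cluster:
http://127.0.0.1:50070/dfshealth.html#tab-overview
9.2. Start the YARN cluster
Switch to the yarn user. YARN has 2 daemons, resourcemanager and nodemanager, which are started or stopped with the yarn-daemon.sh script. Running the commands as the hadoop user also works.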
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
jps
ResourceManager
NodeManager
Jps
Starting the YARN cluster
Web interface of the YARN cluster:
http://127.0.0.1:8088/cluster
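Rather than eyeballing the jps output after every restart, a few lines of Python can report which of the five daemons is missing. This is a sketch that assumes jps prints one "pid name" pair per line; `missing_daemons` is a made-up helper and the PIDs in the sample are invented.

```python
# The five daemons started in steps 9.1 and 9.2.
EXPECTED_DAEMONS = {
    "NameNode", "SecondaryNameNode", "DataNode",
    "ResourceManager", "NodeManager",
}


def missing_daemons(jps_output):
    """Return the expected daemons that do not appear in `jps` output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        if parts:
            # Take the last field of each line as the process name.
            running.add(parts[-1])
    return sorted(EXPECTED_DAEMONS - running)


# Sample text with made-up PIDs; on a real host it could be captured with
# subprocess.run(["jps"], capture_output=True, text=True).stdout
sample = """2101 NameNode
2245 SecondaryNameNode
2388 DataNode
2590 ResourceManager
2911 Jps"""

print(missing_daemons(sample))  # ['NodeManager']
```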
10. Test
Run a MapReduce job with the hadoop-mapreduce-examples-2.6.2.jar bundled with Hadoop. If it runs normally, the installation succeeded.
Switch to the hdfs user before running the job.
[hdfs@localhost mapreduce]$ yarn jar hadoop-mapreduce-examples-2.6.2.jar wordcount /test/a.txt /test/a.out
Running the job
[hdfs@localhost mapreduce]$ hdfs dfs -ls /test/a.out
Found items
-rw-r--r-- hdfs supergroup -- : /test/a.out/_SUCCESS
-rw-r--r-- hdfs supergroup -- : /test/a.out/part-r-
[hdfs@localhost mapreduce]$ hdfs dfs -cat /test/a.out/part-r-
aaaaaaaaa
aaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaa
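Viewing the job result
11. Calling the HDFS cluster API from Python 3
Hadoop is now installed. (It is pseudo-distributed; for a fully distributed setup, configure passwordless SSH login and distribute the configuration files to the other nodes.)
I read online that Python's pyhdfs module can call the HDFS cluster API to upload, download and look up files, so I am noting it down here; it may be useful for a later Hadoop automation project.
Note: before using pyhdfs, make sure the addresses in the Hadoop configuration files listen on the externally reachable interface, and edit the hosts file.
192.168.226.142 localhost # hosts file path: C:\WINDOWS\system32\drivers\etc\hosts on Windows, /etc/hosts on Linux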
pip install pyhdfs -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
import pyhdfs

fs = pyhdfs.HdfsClient(hosts='192.168.226.142:50070', user_name='hdfs')
fs.get_home_directory()   # returns this user's home directory
fs.get_active_namenode()  # returns the NameNode that is currently active
path = '/zhanggen/'
file = 'myfile.txt'
file_name = path + file
# Before uploading, edit the local hosts file: 192.168.226.142 localhost (C:\WINDOWS\system32\drivers\etc\hosts)
print('path already exists') if fs.exists(path) else fs.mkdirs(path)
print('file already exists') if fs.exists(file_name) else fs.copy_from_local('c.txt', file_name)  # upload a local file to the HDFS cluster
fs.copy_to_local(file_name, 'zhanggen.txt')  # copy a file from the HDFS cluster to the local disk
fs.listdir(path)  # returns everything in the directory as a list, e.g. ['a.out', 'a.txt']
response = fs.open(file_name)  # read the file content
print(response.read())
fs.append(file_name, 'Thanks myself for fighting ')  # append content to a file on the HDFS cluster
response = fs.open(file_name)  # read the file content again
print(response.read())
print(fs.get_file_checksum(file_name))  # checksum of the file
print(fs.list_status(path))       # status of a path
print(fs.list_status(file_name))  # status of a single file
Calling the HDFS cluster API with the pyhdfs module
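As far as I understand, pyhdfs talks to the NameNode through the WebHDFS REST interface on the HTTP port (50070 here), which is why that port must be reachable from the client. The sketch below only builds such a URL so it can be pasted into a browser or curl for debugging; `webhdfs_url` is a helper name I made up, not part of pyhdfs.

```python
from urllib.parse import quote, urlencode


def webhdfs_url(host, path, op, user_name=None, port=50070):
    """Build a WebHDFS REST URL such as .../webhdfs/v1/<path>?op=LISTSTATUS."""
    params = {"op": op}
    if user_name:
        params["user.name"] = user_name
    # HDFS paths are absolute; quote() keeps "/" and escapes spaces etc.
    return "http://{}:{}/webhdfs/v1{}?{}".format(
        host, port, quote(path), urlencode(params)
    )


print(webhdfs_url("192.168.226.142", "/zhanggen/", "LISTSTATUS", "hdfs"))
```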
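IV. Calling the Hadoop MapReduce API from Python 3
pip3 install mrjob -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
hadoop fs -chown -R hadoop:hadoop /tmp # When a MapReduce job runs, the hadoop user creates a socket and accesses it through JDBC, so set the permissions before running the MapReduce job you wrote
MapReduce job workflow (suppose we want word counts for the following 3 lines of data):
a b c
a c
a
Step 1: map turns each string into key/value pairs
(a,1)(b,1)(c,1)
(a,1)(c,1)
(a,1)
Automatic shuffle & sort:
shuffle: groups the values that share a key into one list. (Like sorting a hand of cards: cards with the same number/letter go together, e.g. the three aces together and the two 2s together!)
sort: then sorts by key;
(a,[1,1,1])
(b,[1])
(c,[1,1])
Step 2: reduce. After shuffle and sort, the values with the same key sit in one list, which makes it easy for reduce to compute or aggregate over them (sum, mean, max)!
(a,3)
(b,1)
(c,2)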
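The workflow above can be replayed in plain Python without a cluster. This is just a single-process simulation of the three phases on the same three lines; the function names are my own and nothing here calls Hadoop.

```python
from collections import defaultdict

lines = ["a b c", "a c", "a"]


def map_phase(line):
    # Step 1: emit one (word, 1) pair per word.
    return [(word, 1) for word in line.split()]


def shuffle_and_sort(pairs):
    # Group the values by key, then order the groups by key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(key, values):
    # Step 2: aggregate the list of values for one key.
    return key, sum(values)


mapped = [pair for line in lines for pair in map_phase(line)]
grouped = shuffle_and_sort(mapped)
result = [reduce_phase(key, values) for key, values in grouped]

print(grouped)  # [('a', [1, 1, 1]), ('b', [1]), ('c', [1, 1])]
print(result)   # [('a', 3), ('b', 1), ('c', 2)]
```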
#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import re


class MRwordCount(MRJob):
    '''
    line: one line of input
    (a,1)(b,1)(c,1)
    (a,1)(c,1)
    (a,1)
    '''
    def mapper(self, _, line):
        pattern = re.compile(r'(\W+)')
        for word in re.split(pattern=pattern, string=line):
            if word.isalpha():
                yield (word.lower(), 1)

    def reducer(self, word, count):
        # after shuffle and sort
        '''
        (a,[1,1,1])
        (b,[1])
        (c,[1,1])
        '''
        l = list(count)
        yield (word, sum(l))


if __name__ == '__main__':
    MRwordCount.run()  # run() starts the MapReduce job.
Python 3 version of wordCount
python /MyMapReduce.py /a.txt -r hadoop # run the Python MapReduce job on the Hadoop cluster.
[hdfs@localhost hadoop]$ python /MyMapReduce.py /a.txt -r hadoop
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /bdapps/hadoop/bin...
Found hadoop binary: /bdapps/hadoop/bin/hadoop
Using Hadoop version 2.6.2
Looking for Hadoop streaming jar in /bdapps/hadoop...
Found Hadoop streaming jar: /bdapps/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.2.jar
Creating temp directory /tmp/MyMapReduce.hdfs.20190304.084739.219477
Copying local files to hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/files/...
Running step 1 of 1...
packageJobJar: [/tmp/hadoop-unjar1053011439569578237/] [] /tmp/streamjob2611643769127644921.jar tmpDir=null
Connecting to ResourceManager at /192.168.226.142:8032
Connecting to ResourceManager at /192.168.226.142:8032
Total input paths to process : 1
number of splits:2
Submitting tokens for job: job_1551427459997_0003
Submitted application application_1551427459997_0003
The url to track the job: http://192.168.226.142:8088/proxy/application_1551427459997_0003/
Running job: job_1551427459997_0003
Job job_1551427459997_0003 running in uber mode : false
map 0% reduce 0%
map 50% reduce 0%
map 100% reduce 0%
map 100% reduce 100%
Job job_1551427459997_0003 completed successfully
Output directory: hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output
Counters: 49
File Input Format Counters
Bytes Read=18
File Output Format Counters
Bytes Written=18
File System Counters
FILE: Number of bytes read=54
FILE: Number of bytes written=331118
FILE: Number of large read operations=0
FILE: Number of read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=320
HDFS: Number of bytes written=18
HDFS: Number of large read operations=0
HDFS: Number of read operations=9
HDFS: Number of write operations=2
Job Counters
Data-local map tasks=2
Launched map tasks=2
Launched reduce tasks=1
Total megabyte-seconds taken by all map tasks=20077568
Total megabyte-seconds taken by all reduce tasks=5390336
Total time spent by all map tasks (ms)=19607
Total time spent by all maps in occupied slots (ms)=19607
Total time spent by all reduce tasks (ms)=5264
Total time spent by all reduces in occupied slots (ms)=5264
Total vcore-seconds taken by all map tasks=19607
Total vcore-seconds taken by all reduce tasks=5264
Map-Reduce Framework
CPU time spent (ms)=1990
Combine input records=0
Combine output records=0
Failed Shuffles=0
GC time elapsed (ms)=352
Input split bytes=302
Map input records=3
Map output bytes=36
Map output materialized bytes=60
Map output records=6
Merged Map outputs=2
Physical memory (bytes) snapshot=501116928
Reduce input groups=3
Reduce input records=6
Reduce output records=3
Reduce shuffle bytes=60
Shuffled Maps =2
Spilled Records=12
Total committed heap usage (bytes)=319430656
Virtual memory (bytes) snapshot=6355677184
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
job output is in hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output
Streaming final output from hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output...
"a" 3
"b" 1
"c" 2
Removing HDFS temp directory hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477...
Removing temp directory /tmp/MyMapReduce.hdfs.20190304.084739.219477...
[hdfs@localhost hadoop]$
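Execution result
1. MapReduce examples
Count this week's alerts
Because Zabbix alerts were never categorized, every Zabbix alert -----> converted into a ticket on the ops platform ---------> ends up as a single text field!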
#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import csv

key_list = ['Free disk space', 'Zabbix agent', 'Alive ecerpdb.com', 'Oracle', 'FTP service', 'No data received from Orabbix', 'Alive ecpim']


class MRwordCount(MRJob):
    def mapper(self, _, line):  # mapper runs once per line of input
        row = next(csv.reader([line]))  # parse one CSV line into a list
        for key in key_list:
            if key in row[-1]:
                yield (key, 1)

    # shuffle & sort happen automatically between mapper and reducer
    def reducer(self, word, count):  # reducer runs once per key yielded by mapper
        l = list(count)
        yield (word, sum(l))


if __name__ == '__main__':
    MRwordCount.run()  # run() starts the MapReduce job.
This week's alerts
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob
import csv


class University_top10(MRJob):
    def mapper(self, _, line):
        row = next(csv.reader([line]))  # parse one CSV line into a list
        # skip the header row (rank, school name, total score, type, province, city, orientation, governing body)
        if not row[0].isdigit():
            return
        yield ('top', (float(row[2]), row[1]))  # (total score, school name)

    def reducer(self, top_key, score_and_university_name):
        # highest score first, keep the first 10
        top10 = sorted(score_and_university_name, reverse=True)[:10]
        for key in top10:
            yield key[1], key[0]


if __name__ == '__main__':
    University_top10.run()
Top 10 Chinese universities
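Sorting everything just to keep ten items is fine for a small ranking table. For larger inputs, `heapq.nlargest` from the standard library is one alternative that avoids a full sort. A small sketch with invented (score, name) pairs shaped like the values the mapper yields:

```python
import heapq

# Made-up (score, name) pairs, for illustration only.
scores = [(88.5, "U1"), (99.1, "U2"), (75.0, "U3"), (93.2, "U4"), (60.3, "U5")]


def top_n(pairs, n):
    """Return the n largest (score, name) pairs, best first."""
    return heapq.nlargest(n, pairs)


for score, name in top_n(scores, 3):
    print(name, score)
```

`top_n` accepts any iterable, so inside a reducer the values generator could be passed in directly.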
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob
import csv


class University_top10(MRJob):
    def mapper(self, _, line):
        row = next(csv.reader([line]))  # parse one CSV line into a list
        # skip the header row (rank, school name, total score, type, province, city, orientation, governing body, cost per person)
        if not row[0].isdigit():
            return
        yield ('top', (float(row[2]), row[1]))  # key 'top': (total score, school name)
        if row[-1].isdigit():
            yield ('cost', (float(row[-1]), row[1]))  # key 'cost': (cost per person, school name)

    def reducer(self, key, value):
        # mapper yields 2 keys ('top' and 'cost'), so reducer runs twice
        items = sorted(value)
        if key == 'top':
            top10 = items[-10:][::-1]  # 10 highest scores, best first
        else:
            top10 = items[:10]  # 10 lowest costs, cheapest first
        for list_item in top10:
            yield list_item[1], list_item[0]


if __name__ == '__main__':
    University_top10.run()
Top 10 universities by score, and the 10 universities with the lowest cost
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob


class Max_Mix_Temperature(MRJob):
    def mapper(self, _, line):
        row = line.split(',')
        if row[2] == 'min':
            yield 'min', (float(row[3]), row[1])
        if row[2] == 'max':
            yield 'max', (float(row[3]), row[1])

    def reducer(self, key, value):
        l = list(value)
        if key == 'max':
            yield key, max(l)
        elif key == 'min':
            yield key, min(l)


if __name__ == '__main__':
    Max_Mix_Temperature.run()
Lowest and highest temperatures recorded across the townships of Tang County
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob
from mrjob.step import MRStep


class Top3_Mean_Friends(MRJob):
    def mapper1(self, _, line):
        row = line.split(',')
        if row[2].isdigit() and row[3].isdigit():
            yield (row[2], int(row[3]))  # (age, number of friends)

    def reducer1(self, age, friends):
        friends_count = list(friends)
        yield (age, sum(friends_count) / len(friends_count))  # average number of friends per age

    def mapper2(self, age, average_coun):
        yield (None, (average_coun, str(age) + 'year'))

    def reducer2(self, _, average_list):  # from the averages, pick the 3 ages with the most friends
        l = list(average_list)
        l.sort()
        top3 = l[-3:]
        top3.reverse()
        for i in top3:
            yield (i[0], i[1])

    def steps(self):  # chain several mappers and reducers
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
            MRStep(mapper=self.mapper2, reducer=self.reducer2)
        ]


if __name__ == '__main__':
    Top3_Mean_Friends.run()
Chaining several mapper and reducer functions with MRStep
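To see how the output of step 1 becomes the input of step 2, the chain can be simulated locally. This is a plain-Python sketch, not mrjob itself: `run_step` is a helper I invented that does map, group by key, then reduce, and the input rows are made up (id,name,age,friends).

```python
from collections import defaultdict


def run_step(records, mapper, reducer):
    """Run one map -> shuffle/sort -> reduce step over (key, value) records."""
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            groups[out_key].append(out_value)
    output = []
    for key in sorted(groups, key=str):
        output.extend(reducer(key, groups[key]))
    return output


def mapper1(_, line):
    row = line.split(",")
    yield row[2], int(row[3])  # (age, number of friends)


def reducer1(age, friends):
    yield age, sum(friends) / len(friends)  # average per age


def mapper2(age, average):
    yield None, (average, age)  # one shared key, so a single reducer call


def reducer2(_, pairs):
    for average, age in sorted(pairs, reverse=True)[:3]:
        yield age, average


# Made-up rows: id,name,age,friends
lines = ["0,a,20,10", "1,b,20,30", "2,c,30,5", "3,d,40,50", "4,e,50,1"]
step1 = run_step([(None, line) for line in lines], mapper1, reducer1)
step2 = run_step(step1, mapper2, reducer2)
print(step2)
```

The trick in `mapper2` is the constant key: every average lands in the same group, so one reducer call sees all of them and can rank them.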
from mrjob.job import MRJob
from mrjob.step import MRStep


class Top_AnnualSalary_Job(MRJob):  # columns: ID,Name,JobTitle,AnnualSalary,GrossSpend
    def mapper1(self, _, line):
        row = line.split(',')
        if row[0] == 'ID':  # skip the header row
            return
        yield (row[2], int(row[3]))

    def reducer1(self, jobtitle, annualsalary):
        AnnualSalary = list(annualsalary)
        yield ('job_annualsalary', (sum(AnnualSalary) / len(AnnualSalary), jobtitle))

    def mapper2(self, key, job_annualsalary):
        yield key, job_annualsalary

    def reducer2(self, key, values):
        new_l = sorted(values, reverse=True)[:3]  # the 3 highest average salaries
        for k in new_l:
            yield k[1], k[0]

    def steps(self):  # chain several mappers and reducers
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
            MRStep(mapper=self.mapper2, reducer=self.reducer2),
        ]


if __name__ == '__main__':
    Top_AnnualSalary_Job.run()
Top 3 job titles by average annual salary
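2. map + combine + reduce
If the map nodes ship all of their output over TCP to the reduce node for aggregation, the reduce node becomes a single point of load. That is why combine exists.
A combine is a small reduce: it performs a preliminary aggregation on the map node before the map output is sent to reduce, which relieves the reduce node and speeds up the MapReduce job.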
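The effect of a combiner is easy to measure in a local simulation: count how many pairs leave the map nodes with and without local pre-aggregation. The sketch below is plain Python with made-up input splits; in mrjob the same idea is expressed by adding a `combiner(self, key, values)` method next to `mapper` and `reducer`.

```python
from collections import Counter

# Each inner list is the input split handled by one map node (sample data).
splits = [["a", "b", "a", "a"], ["a", "c", "c"]]


def map_node(words, use_combiner):
    pairs = [(word, 1) for word in words]
    if not use_combiner:
        return pairs
    # Combiner: pre-aggregate locally so fewer pairs leave the node.
    local = Counter()
    for word, count in pairs:
        local[word] += count
    return sorted(local.items())


def reduce_all(pairs):
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


for use_combiner in (False, True):
    shuffled = [p for split in splits for p in map_node(split, use_combiner)]
    print(use_combiner, len(shuffled), reduce_all(shuffled))
```

Both runs produce the same totals, but with the combiner only 4 pairs cross the network instead of 7. This only works for aggregations such as sum, min or max, where combining partial results gives the same answer as combining the raw values.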