主要内容
Python多进程与多线程
Python使用Hadoop分布式计算库mrjob
Python使用Spark分布式计算库PySpark
例子:分别使用MapReduce和Spark实现wordcount
正则表达式简介
日期和时间
常用内建模块: collections; itertools
进程与线程
进程:程序的一次执行(程序装载入内存,系统分配资源运行)
每个进程有自己的内存空间、数据栈等,只能使用进程间通讯,而不能直接共享信息
线程:所有线程运行在同一个进程中,共享相同的运行环境
每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口
线程的运行可以被抢占(中断),或暂时被挂起(睡眠),让其他线程运行(让步)
一个进程中的各个线程间共享同一片数据空间
全局解释器锁GIL
GIL全称全局解释器锁Global Interpreter Lock, GIL并不
是Python的特性,它是在实现Python解析器(CPython)时
所引入的一个概念
GIL是一把全局排他锁,同一时刻只有一个线程在运行
毫无疑问全局锁的存在会对多线程的效率有不小影响。甚至就几乎等于Python是个单线程的程序。
multiprocessing库的出现很大程度上是为了弥补thread库因为
GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方
便迁移。唯一的不同就是它使用了多进程而不是多线程。每个
进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。
顺序执行单线程与同时执行两个并发线程
join阻塞进程直到线程执行完毕
Python 多进程( multiprocessing)
fork操作:
调用一次,返回两次。因为操作系统自动把当前进程(称为父
进程)复制了一份(称为子进程), 然后分别在父进程和子进
程内返回。子进程永远返回0,而父进程返回子进程的ID。子
进程只需要调用getppid()就可以拿到父进程的ID。
由于Windows没有fork调用,上面的代码在Windows上无法运行。
multiprocessing
multiprocessing是跨平台版本的多进程模块,它提供了
一个Process类来代表一个进程对象,下面是示例代码:
这个程序如果用单进程写则需要执行10秒以上的时间,而用多进程则启动10个进程并行执行,只需要用1秒多的时间。
进程间通信Queue
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
进程池Pool
用于批量创建子进程,可以灵活控制子进程的数量
多进程与多线程对比
在一般情况下多个进程的内存资源是相互独立的,而多线程可以共享同一个进程中的内存资源
函数式编程
三大特性:
immutable data 不可变数据
first class functions:函数像变量一样使用
尾递归优化:每次递归都重用stack
好处:
parallelization 并行
lazy evaluation 惰性求值
determinism 确定性
函数式编程http://coolshell.cn/articles/10822.html
函数式编程技术
技术:
map & reduce
pipeline
recursing 递归
currying
higher order function 高阶函数
Python中的lambda和map、 filter、 reduce
lambda:快速定义单行的最小函数, inline的匿名函数
Python中的lambda和map、 filter、 reduce
map(function, sequence) :对sequence中的item依次执行function(item),执行结果组成一个List返回
Python中的lambda和map、 filter、 reduce
filter(function, sequence):对sequence中的item依次执行function(item),将执行结果为
True的item组成一个List/String/Tuple(取决于sequence的类型)返回
Python中的lambda和map、 filter、 reduce
reduce(function, sequence, starting_value):对sequence中的item顺序迭代调用function,
如果有starting_value,还可以作为初始值调用
例子:计算数组中的平均数
正常写法:
函数式编程:
这样的代码是在描述要干什么,而不是怎么干
Hadoop
Hadoop是Apache开源组织的一个分布式计算开源框架。
核心的设计就是: MapReduce和HDFS( HadoopDistributed File System)
MapReducer
思想:任务的分解与结果的汇总
基于Linux管道的MapReducer
import sys for line in sys.stdin: ls = line.split() for word in ls: if len(word.strip()) != 0: print word + ',' + str(1)
import sys word_dict = {} for line in sys.stdin: ls = line.split(',') word_dict.setdefault(ls[0], 0) word_dict[ls[0]] += int(ls[1]) for word in word_dict: print word, word_dict[word]
$ cat wordcount.input | python mapper.py | python reducer.py | sort -k 2r
Output:
n world 3
n hello 2
n hi 1
Hadoop Streaming & mrjob
Hadoop有Java和Streaming两种方式来编写MapReduce任务。
Java的优点是计算效率高,并且部署方便,直接打包成一个jar文件就行了。
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer。
Streaming单机测试:
cat input | mapper | sort | reducer > output
mrjob实质上就是在Hadoop Streaming的命令行上包了一层,有了统一的Python界面,无需你再去直接调用Hadoop Streaming命令。
Mrjob实现wordcount
from mrjob.job import MRJob class MRWordFrequencyCount(MRJob): def mapper(self, _, line): yield "chars", len(line) yield "words", len(line.split()) yield "lines", 1 def reducer(self, key, values): yield key, sum(values) if __name__ == '__main__': MRWordFrequencyCount.run()
Spark
Spark是基于map reduce算法实现的分布式计算框架:
Spark的中间输出和结果输出可以保存在内存中,从而不再需要读写HDFS。
Spark能更好地用于数据挖掘与机器学习等需要迭代的map reduce的算法中。
Spark与Hadoop结合
Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算。
本地模式
Standalone模式
Mesoes模式
yarn模式
RDD
弹性分布式数据集Resilient Distributed Datasets:
集群节点上不可变、已分区对象
可序列化
可以控制存储级别(内存、磁盘等)来进行重用。
计算特性:
血统lineage
惰性计算lazy evaluation
生成方式:
文件读取
来自父RDD
PySpark实现WordCount
正则表达式
两种模式匹配:搜索search()和匹配match()
判断一个字符串是否是合法的Email地址
作业1:电话号码正则匹配
例子:
+008613112345678
+861795101023231212
+8608715432231
01023459764
06346046499
010120
时间和日期
time模块和datetime模块
import time print time.time() print time.localtime() for i in range(3): time.sleep(0.5) print "Tick!"
1479487832.06 time.struct_time(tm_year=2016, tm_mon=11, tm_mday=19, tm_hour=0, tm_min=50, tm_sec=32, tm_wday=5, tm_yday=324, tm_isdst=0) Tick! Tick! Tick!
import datetime print "today is: ", datetime.date.today() print "now is: ", datetime.datetime.now() print datetime.date(2016,6,4) print datetime.time(14,00)
today is: 2016-11-19 now is: 2016-11-19 00:50:38.551905 2016-06-04 14:00:00
# 计算昨天和明天的日期 import datetime today = datetime.date.today() yesterday = today - datetime.timedelta(days=1) tomorrow = today + datetime.timedelta(days=1) print yesterday,today,tomorrow
2016-11-18 2016-11-19 2016-11-20
作业2:计算日期之间的工作日
有用的内建函数
enumerate函数
# 对一个列表或数组既要遍历索引又要遍历元素时 l = [1,2,3] for i in range (len(l)): print i ,l[i]
0 1 1 2 2 3
# enumerate会将数组或列表组成一个索引序列。使我们再获取索引和索引内容的时候更加方便如下: for index,text in enumerate(l): print index ,text
0 1 1 2 2 3
集合模块collections
collections是Python内建的一个集合模块,提供了许多有用的集合类。
deque是为了高效实现插入和删除操作的双向列表,适合用于队列和栈。
OrderedDict的Key会按照插入的顺序排列。 Counter是一个简单的计数器,也是dict的一个子类。
from collections import namedtuple Point = namedtuple('Point', ['x', 'y']) p = Point(1, 2) print p.x print p.y
from collections import deque q = deque(['a', 'b', 'c']) q.append('x') q.appendleft('y') print q deque(['y', 'a', 'b', 'c', 'x'])
from collections import defaultdict dd = defaultdict(lambda: 'N/A') dd['key1'] = 'abc' print dd['key1'] # key1存在 print dd['key2'] # key2不存在,返回默认值 abc N/A
from collections import OrderedDict d = dict([('a', 1), ('b', 2), ('c', 3)]) print d # dict的Key是无序的,{'a': 1, 'c': 3, 'b': 2} od = OrderedDict([('a', 1), ('b', 2), ('c', 3)]) print od # OrderedDict的Key是有序的,OrderedDict([('a', 1), ('b', 2), ('c', 3)]) {'a': 1, 'c': 3, 'b': 2} OrderedDict([('a', 1), ('b', 2), ('c', 3)])
from collections import Counter c = Counter() for ch in 'programming': c[ch] = c[ch] + 1 print c #Counter({'g': 2, 'm': 2, 'r': 2, 'a': 1, 'i': 1, 'o': 1, 'n': 1, 'p': 1}) Counter({'g': 2, 'm': 2, 'r': 2, 'a': 1, 'i': 1, 'o': 1, 'n': 1, 'p': 1})
迭代器itertools
为类序列对象提供了一个类序列接口
无限迭代器:
在最短输入序列终止的迭代器:
组合生成器:
import itertools for i in itertools.izip(itertools.count(1), ['a', 'b', 'c']): print i
(1, 'a') (2, 'b') (3, 'c')
参考:http://python.usyiyi.cn/python_278/library/itertools.html