1、多线程
#IO密集型程序应该用多线程 import requests from threading import Thread,current_thread def parse_page(res): print('%s 解析 %s' %(current_thread().getName(),len(res))) def get_page(url,callback=parse_page): print('%s 下载 %s' %(current_thread().getName(),url)) response=requests.get(url) if response.status_code == 200: callback(response.text) if __name__ == '__main__': urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org'] for url in urls: t=Thread(target=get_page,args=(url,)) t.start()
#开启多进程或多线程的方式,我们是无法无限制地开启多进程或多线程的:在遇到要同时响应成百上千路的连接请求,
则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。
2、线程池或进程池+异步调用:提交一个任务后并不会等待任务结束,而是继续下一行代码
“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。
“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,
都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。
#IO密集型程序应该用多线程,所以此时我们使用线程池 import requests from threading import current_thread from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def parse_page(res): res=res.result() print('%s 解析 %s' %(current_thread().getName(),len(res))) def get_page(url): print('%s 下载 %s' %(current_thread().getName(),url)) response=requests.get(url) if response.status_code == 200: return response.text if __name__ == '__main__': urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org'] pool=ThreadPoolExecutor(50) # pool=ProcessPoolExecutor(50) for url in urls: pool.submit(get_page,url).add_done_callback(parse_page) pool.shutdown(wait=True)
#“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。
而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。
所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。
3、高性能 以下都是单线程实现并发
上述无论哪种解决方案其实没有解决一个性能相关的问题:IO阻塞,无论是多进程还是多线程,在遇到IO阻塞时都会被操作系统强行剥夺走CPU的执行权限,程序的执行效率因此就降低了下来。解决这一问题的关键在于,我们自己从应用程序级别检测IO阻塞然后切换到我们自己程序的其他任务执行,这样把我们程序的IO降到最低,我们的程序处于就绪态就会增多,以此来迷惑操作系统,操作系统便以为我们的程序是IO比较少的程序,从而会尽可能多的分配CPU给我们,这样也就达到了提升程序执行效率的目的
3.1、在python3.3之后新增了asyncio模块,可以帮我们检测IO(只能是网络IO),实现应用程序级别的切换
import asyncio @asyncio.coroutine def fetch_async(host, url='/'): print(host, url) reader, writer = yield from asyncio.open_connection(host, 80) request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,) request_header_content = bytes(request_header_content, encoding='utf-8') writer.write(request_header_content) yield from writer.drain() text = yield from reader.read() print(host, url, text) writer.close() tasks = [ fetch_async('www.cnblogs.com', '/wupeiqi/'), fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091') ] loop = asyncio.get_event_loop() results = loop.run_until_complete(asyncio.gather(*tasks)) loop.close()
3.2、 asyncio模块只能发tcp级别的请求,不能发http协议,因此,在我们需要发送http请求的时候,需要我们自定义http报头。自定义http报头多少有点麻烦,于是有了aiohttp模块,专门帮我们封装http报头,然后我们还需要用asyncio检测IO实现切换。
import aiohttp import asyncio @asyncio.coroutine def get_page(url): print('GET:%s' %url) response=yield from aiohttp.request('GET',url) data=yield from response.read() print(url,data) response.close() return 1 tasks=[ get_page('https://www.python.org/doc'), get_page('https://www.cnblogs.com/linhaifeng'), get_page('https://www.openstack.org') ] loop=asyncio.get_event_loop() results=loop.run_until_complete(asyncio.gather(*tasks)) loop.close() print('=====>',results) #[1, 1, 1]
3.3、
import asyncio import requests @asyncio.coroutine def fetch_async(func, *args): loop = asyncio.get_event_loop() future = loop.run_in_executor(None, func, *args) response = yield from future print(response.url, response.content) tasks = [ fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'), fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091') ] loop = asyncio.get_event_loop() results = loop.run_until_complete(asyncio.gather(*tasks)) loop.close()
3.4、 协程时介绍的gevent模块 # 实现io多路复用
from gevent import monkey;monkey.patch_all() import gevent import requests def get_page(url): print('GET:%s' %url) response=requests.get(url) print(url,len(response.text)) return 1 # g1=gevent.spawn(get_page,'https://www.python.org/doc') # g2=gevent.spawn(get_page,'https://www.cnblogs.com/linhaifeng') # g3=gevent.spawn(get_page,'https://www.openstack.org') # gevent.joinall([g1,g2,g3,]) # print(g1.value,g2.value,g3.value) #拿到返回值 #协程池 from gevent.pool import Pool pool=Pool(2) g1=pool.spawn(get_page,'https://www.python.org/doc') g2=pool.spawn(get_page,'https://www.cnblogs.com/linhaifeng') g3=pool.spawn(get_page,'https://www.openstack.org') gevent.joinall([g1,g2,g3,]) print(g1.value,g2.value,g3.value) #拿到返回值 gevent+requests
3.5、 封装了gevent+requests模块的grequests模块
#pip3 install grequests import grequests request_list=[ grequests.get('https://wwww.xxxx.org/doc1'), grequests.get('https://www.cnblogs.com/linhaifeng'), grequests.get('https://www.openstack.org') ] ##### 执行并获取响应列表 ##### # response_list = grequests.map(request_list) # print(response_list) ##### 执行并获取响应列表(处理异常) ##### def exception_handler(request, exception): # print(request,exception) print("%s Request failed" %request.url) response_list = grequests.map(request_list, exception_handler=exception_handler) print(response_list)
3.6、twisted:是一个网络框架,其中一个功能是发送异步请求,检测IO并自动切换
from twisted.web.client import getPage, defer from twisted.internet import reactor def all_done(arg): reactor.stop() def callback(contents): print(contents) deferred_list = [] url_list = ['http://www.bing.com', 'http://www.baidu.com', ] for url in url_list: deferred = getPage(bytes(url, encoding='utf8')) deferred.addCallback(callback) deferred_list.append(deferred) dlist = defer.DeferredList(deferred_list) dlist.addBoth(all_done) reactor.run()
3.7、tornado
from tornado.httpclient import AsyncHTTPClient from tornado.httpclient import HTTPRequest from tornado import ioloop def handle_response(response): """ 处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop() :param response: :return: """ if response.error: print("Error:", response.error) else: print(response.body) def func(): url_list = [ 'http://www.baidu.com', 'http://www.bing.com', ] for url in url_list: print(url) http_client = AsyncHTTPClient() http_client.fetch(HTTPRequest(url), handle_response) ioloop.IOLoop.current().add_callback(func) ioloop.IOLoop.current().start() #发现上例在所有任务都完毕后也不能正常结束,为了解决该问题,让我们来加上计数器 from tornado.httpclient import AsyncHTTPClient from tornado.httpclient import HTTPRequest from tornado import ioloop count=0 def handle_response(response): """ 处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop() :param response: :return: """ if response.error: print("Error:", response.error) else: print(len(response.body)) global count count-=1 #完成一次回调,计数减1 if count == 0: ioloop.IOLoop.current().stop() def func(): url_list = [ 'http://www.baidu.com', 'http://www.bing.com', ] global count for url in url_list: print(url) http_client = AsyncHTTPClient() http_client.fetch(HTTPRequest(url), handle_response) count+=1 #计数加1 ioloop.IOLoop.current().add_callback(func) ioloop.IOLoop.current().start() Tornado
4、协程
是“微线程”,不存在;是由程序员人为创造出来并控制程序:先执行某段代码、再跳到某处执行某段代码。
协程是 “微线程” ,让一个线程 先执行某几行代码 再调到某处 执行某几行代码。 - 不一定遇到io再切,想什么时候切就什么时候切。 - 如果遇到非IO请求来回切换:性能更低。 -如果遇到IO(耗时)请求来回切换:性能高、实现并发(本质上利用IO等待的过程,再去干一些其他的事)
def func1(): print('adsfasdf') print('adsfasdf') print('adsfasdf') yield 1 print('adsfasdf') print('adsfasdf') print('adsfasdf') yield 2 yield 3 yield 4 def func2(): print('adsfasdf') print('adsfasdf') print('adsfasdf') yield 11 yield 12 yield 19 g1=func1() g2=func2() v1=g1.send(None) print('adsfasdf') print('adsfasdf') print('adsfasdf') v1=1 v2=g1.send(None) print('adsfasdf') print('adsfasdf') print('adsfasdf') v2=2 v3=g2.send(None)
5、IO多路复用的作用(监听多个socket是否发生变化 ( select ))
- select,内部循环检测socket是否发生变化;1024个socket
- poll,内部循环检测socket是否发生变化; - epoll,回调的方式
import select while True: # 让select模块帮助我们去检测sk1/sk2两个socket对象是否已经发生“变化” # r=[] # 如果r中有值 # r=[sk1,] 表示:sk1这个socket已经获取到响应的内容 # r=[sk1,sk2] 表示:sk1,sk2两个socket已经获取到响应的内容 # w=[],如果w中有值 # w=[sk1,], 表示:sk1这个socket已经连接成功; # w=[sk1,sk2],表示:sk1/sk2两个socket已经连接成功; r,w,e = select.select([sk1,sk2],[sk1,sk2],[],0.5) for client in w: content = "GET /wupeiqi HTTP/1.1\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36\r\n\r\n" client.sendall(content.encode('utf-8')) for client in r: response = client.recv(8096) print(response) client.close()
6、异步非阻塞?
连接阻塞,接收返回数据 - 不等待(如果报错,捕捉异常) - 代码: sk = socket.socket() sk.setblocking(False) socket.socket.setblocking(flag) # 如果flag为0/fasle, 则将套接字设置为非阻塞模式。否则, 套接字将设置为阻塞模式。 #在非阻塞模式下, 如果recv()调用没有发现任何数据或者send()调用无法立即发送数据, 那么将引发socket.error异常。在阻塞模式下, 这些调用在处理之前都将被阻塞。
callback=parse()
- 回调,当达到某个指定的状态之后,自动调用特定函数。
setblocking=false callback=parse
7、 自定义异步非阻塞模块?
基于socket设置setblocking和IO多路复用(select)来实现。 爬虫发送Http请求本质创建socket对象; IO多路复用select "循环"监听socket是否发生变化,一旦发生变化, 我们可以自定义操作(触发某个函数的执行)
import socket import select class Request(object): def __init__(self,sk,callback): self.sk = sk self.callback = callback def fileno(self): return self.sk.fileno() class AsyncHttp(object): def __init__(self): self.fds = [] self.conn = [] def add(self,url,callback): sk = socket.socket() sk.setblocking(False) try: sk.connect((url,80)) except BlockingIOError as e: pass req = Request(sk,callback) self.fds.append(req) self.conn.append(req) def run(self): """ 监听socket是否发生变化 :return: """ while True: """ fds=[req(sk,callback),req,req] conn=[req,req,req] """ r,w,e = select.select(self.fds,self.conn,[],0.05) # sk.fileno() = req.fileno() # w=已经连接成功的socket列表 w=[sk1,sk2] for req in w: req.sk.sendall(b'GET /wupeiqi HTTP/1.1\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36\r\n\r\n') # 已经连接成功的socket,无需再继续监听 self.conn.remove(req) # r=服务端给用户返回数据了 r=[sk1,] for req in r: data = req.sk.recv(8096) req.callback(data) req.sk.close() # 断开连接:短连接、无状态 self.fds.remove(req) # 不再监听 if not self.fds: break ah = AsyncHttp() def callback1(data): print(11111,data) def callback2(data): print(22222,data) def callback3(data): print(333333,data) ah.add('www.cnblogs.com',callback1) # sk1 ah.add('www.baidu.com',callback2) # sk2 ah.add('www.luffycity.com',callback3) # sk3 ah.run()
8、Http请求本质
socket只要拿到数据就断开,无状态的特性
TCP和UDP区别?
tcp 建立连接,发送数据,数据完整
udp 只是发送数据,数据可能丢包 不需要 监听和接收