Notes on high-performance crawlers
The essence
# 1. The essence: an HTTP request is just a socket
import socket

sk = socket.socket()

# Connecting blocks until the TCP handshake completes
sk.connect(('www.cnblogs.com', 80))

# Speak the HTTP protocol by hand
content = "GET /wupeiqi HTTP/1.1\r\nHost: www.cnblogs.com\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36\r\n\r\n"
sk.sendall(content.encode('utf-8'))

# Waiting for the server's response also blocks
data = sk.recv(8096)
print(data.decode('utf-8'))
sk.close()
Non-blocking
# The same request, but with a non-blocking socket
import socket

sk = socket.socket()
sk.setblocking(False)  # non-blocking mode

# connect() blocks by default; in non-blocking mode it raises immediately
sk.connect(('www.cnblogs.com', 80))

# HTTP protocol
content = "GET /wupeiqi HTTP/1.1\r\nHost: www.cnblogs.com\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36\r\n\r\n"
sk.sendall(content.encode('utf-8'))

# Waiting for the server's response: blocking
data = sk.recv(8096)
print(data.decode('utf-8'))
sk.close()
Result: note that the script raises an error (BlockingIOError at connect()), but the connection request has already gone out on the wire.
Non-blocking: don't wait
Catch the exception
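Since a non-blocking connect() always raises BlockingIOError while the TCP handshake is still in flight, the fix is simply to catch and ignore the exception, then let IO multiplexing report when the connection is ready. A minimal sketch of just that step:

import socket

sk = socket.socket()
sk.setblocking(False)

try:
    sk.connect(('www.cnblogs.com', 80))
except BlockingIOError:
    # Expected: the TCP handshake is in progress, not failed.
    pass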
Sending many requests from a single thread
import socket
import select


class AsyncHttp(object):

    def __init__(self):
        self.fds = []    # sockets that have not returned data yet
        self.conn = []   # sockets that have not finished connecting yet
        self.hosts = {}  # socket -> hostname (HTTP/1.1 requires a Host header)

    def add(self, url):
        # Create the socket connection
        sk = socket.socket()
        sk.setblocking(False)  # non-blocking mode
        try:
            # connect() blocks by default; non-blocking mode raises immediately
            sk.connect((url, 80))
        except BlockingIOError:
            pass
        self.fds.append(sk)
        self.conn.append(sk)
        self.hosts[sk] = url

    def run(self):
        # Watch the sockets for state changes
        while True:
            """
            fds  = [sk2, sk3]  # still waiting for response data
            conn = [sk3]       # still waiting to finish connecting
            """
            r, w, e = select.select(self.fds, self.conn, [], 0.05)

            # w = sockets whose connection succeeded, e.g. w = [sk1, sk2]
            for client in w:
                content = ("GET /wupeiqi HTTP/1.1\r\nHost: %s\r\n"
                           "User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36\r\n\r\n"
                           % self.hosts[client])
                client.sendall(content.encode('utf-8'))
                # Once connected, there is no need to keep watching for writability
                self.conn.remove(client)

            # r = sockets whose server returned data, e.g. r = [sk1]
            for cli in r:
                data = cli.recv(8096)
                print('received response data', data)
                cli.close()           # disconnect: short-lived, stateless connection
                self.fds.remove(cli)  # stop watching it

            if not self.fds:  # fds == [] -> leave the while loop
                break


ah = AsyncHttp()
ah.add('www.cnblogs.com')    # sk1
ah.add('www.baidu.com')      # sk2
ah.add('www.luffycity.com')  # sk3
ah.run()
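One detail worth noting before the next version: select.select() accepts not only raw sockets but any object that exposes a fileno() method. The callback version below exploits exactly this, wrapping each socket in a Request object so the socket and its callback travel together.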
Asynchronous: callback functions
This version adds one more parameter to add(): a callback function.
# Non-blocking: don't wait. Asynchronous: callback functions.
import socket
import select


class Request(object):

    def __init__(self, sk, host, callback):
        self.sk = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        # select() only needs a fileno(), so a Request can stand in for its socket
        return self.sk.fileno()


class AsyncHttp(object):

    def __init__(self):
        self.fds = []   # requests that have not returned data yet
        self.conn = []  # requests that have not finished connecting yet

    def add(self, url, callback):
        # Create the socket connection
        sk = socket.socket()   # instantiate the socket
        sk.setblocking(False)  # non-blocking mode
        try:
            # connect() blocks by default; non-blocking mode raises immediately
            sk.connect((url, 80))
        except BlockingIOError:
            pass
        # Wrap the socket (plus its host and callback) in a Request
        req = Request(sk, url, callback)
        self.fds.append(req)
        self.conn.append(req)

    def run(self):
        # Watch the sockets for state changes
        while True:
            """
            fds  = [req(sk, callback), req, req]  # still waiting for response data
            conn = [req, req, req]                # still waiting to finish connecting
            """
            # IO multiplexing: select() watches sk.fileno() via req.fileno()
            r, w, e = select.select(self.fds, self.conn, [], 0.05)

            # w = requests whose connection succeeded, e.g. w = [sk1, sk2]
            for req in w:
                content = ("GET /wupeiqi HTTP/1.1\r\nHost: %s\r\n"
                           "User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36\r\n\r\n"
                           % req.host)
                req.sk.sendall(content.encode('utf-8'))
                # Once connected, there is no need to keep watching for writability
                self.conn.remove(req)

            # r = requests whose server returned data, e.g. r = [sk1]
            for req in r:
                data = req.sk.recv(8096)
                req.callback(data)
                req.sk.close()        # disconnect: short-lived, stateless connection
                self.fds.remove(req)  # stop watching it

            if not self.fds:  # fds == [] -> leave the while loop
                break


ah = AsyncHttp()


def callback1(data):
    print(111, data)


def callback2(data):
    print(222, data)


def callback3(data):
    print(333, data)


ah.add('www.cnblogs.com', callback1)    # sk1
ah.add('www.baidu.com', callback2)      # sk2
ah.add('www.luffycity.com', callback3)  # sk3
ah.run()
Result: each request gets its own parsing logic through its own callback.
Recap:

a. Concurrency options
   - multiprocessing
   - multithreading (a short thread-pool sketch follows below)
   - single-threaded concurrent requests built on an "asynchronous, non-blocking" module

b. "Asynchronous, non-blocking" unpacked
   - non-blocking: don't wait (setblocking(False))
   - asynchronous: callback functions

c. How do you build your own asynchronous non-blocking module?
   Combine socket's setblocking(False) with IO multiplexing.
   A crawler sending an HTTP request boils down to creating a socket object;
   IO multiplexing "loops" over the sockets watching for state changes, and as soon
   as one changes we can run custom logic (trigger some function, i.e. a callback).
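As a point of comparison for option a, here is a minimal sketch of the multithreading approach using the standard-library ThreadPoolExecutor; the URLs and worker count are illustrative, not from the original notes:

import requests
from concurrent.futures import ThreadPoolExecutor


def fetch(url):
    # Each worker thread blocks on its own request
    response = requests.get(url)
    print(url, len(response.content))


urls = ['https://www.cnblogs.com/', 'https://www.baidu.com/']
with ThreadPoolExecutor(max_workers=2) as pool:
    pool.map(fetch, urls)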
Coroutines
What is a coroutine?
A coroutine is a "micro-thread": it lets a single thread execute a few lines of code in one place, then jump somewhere else and execute a few lines there.
def func1():
    yield 1
    yield 2
    yield 3
    yield 4


g = func1()

"""
# Calling style 1:
v = g.__next__()
print(v)  # 1
v = g.__next__()
print(v)  # 2
"""

# Calling style 2:
v = g.send(None)
print(v)  # 1
v = g.send(None)
print(v)  # 2
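The "jump around" behaviour becomes clearer when one thread alternates between two generators; in miniature, this is what a coroutine scheduler does. A sketch with hypothetical task/run_all helpers:

def task(name):
    for i in range(3):
        print(name, 'step', i)
        yield  # hand control back to the scheduler


def run_all(tasks):
    # Round-robin: resume each task until its next yield, then move on
    while tasks:
        t = tasks.pop(0)
        try:
            next(t)
            tasks.append(t)  # not finished yet: reschedule it
        except StopIteration:
            pass  # this task is done


run_all([task('A'), task('B')])
# prints A step 0, B step 0, A step 1, B step 1, ...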
Usage
Case 1
# Case 1: asyncio + requests
# Note: @asyncio.coroutine / yield from is the legacy generator-based style
# (deprecated in Python 3.8, removed in 3.11); a modern sketch follows below.
import asyncio
import requests


@asyncio.coroutine
def fetch_async(func, *args):
    loop = asyncio.get_event_loop()
    # Run the blocking requests call in the default thread-pool executor
    future = loop.run_in_executor(None, func, *args)
    response = yield from future
    print(response.url, len(response.content))


tasks = [
    fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
    fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091'),
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
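On newer Python versions (3.7+) the same pattern is written with async/await; a sketch of the modern equivalent, not part of the original notes:

import asyncio
import requests


async def fetch_async(func, *args):
    loop = asyncio.get_running_loop()
    # await the thread-pool future instead of 'yield from'
    response = await loop.run_in_executor(None, func, *args)
    print(response.url, len(response.content))


async def main():
    await asyncio.gather(
        fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
        fetch_async(requests.get, 'https://www.baidu.com/'),
    )


asyncio.run(main())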
Case 2
# Case 2: gevent + requests
import gevent
from gevent import monkey

monkey.patch_all()  # must run before requests/socket are used
import requests


def fetch_async(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, len(response.content))


# ##### send the requests #####
gevent.joinall([
    gevent.spawn(fetch_async, method='get', url='https://www.cnblogs.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.sogo.com/', req_kwargs={}),
])

# ##### send the requests (a pool caps the number of concurrent greenlets) #####
# from gevent.pool import Pool
# pool = Pool(None)
# gevent.joinall([
#     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
# ])
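The trick that makes plain requests calls cooperative here is monkey.patch_all(): it replaces the blocking standard-library socket machinery with gevent's greenlet-aware equivalents, so each blocking IO call becomes a switch point between greenlets instead of stalling the thread.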
Case 3
# Case 3: Twisted
from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    reactor.stop()


def callback(contents):
    print(contents)


d_list = []
url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    d = getPage(bytes(url, encoding='utf8'))
    d.addCallback(callback)
    d_list.append(d)

# DeferredList fires once every page has finished downloading,
# at which point all_done() stops the event loop.
dlist = defer.DeferredList(d_list)
dlist.addBoth(all_done)

reactor.run()
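Note the control-flow difference from the hand-rolled select loop: Twisted owns the event loop. getPage() only registers the downloads; nothing actually runs until reactor.run() starts the reactor, which is also why all_done() must call reactor.stop() to exit.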