高性能爬虫相关

时间:2021-03-11 19:38:47

高性能爬虫相关

本质

高性能爬虫相关高性能爬虫相关
import socket
sk = socket.socket()
sk.setblocking(False) # 不阻塞

# 连接的过程是:阻塞
sk.connect(('www.cnblogs.com', 80))

# HTTP协议
content = "GET /wupeiqi HTTP/1.1\r\nHost: www.cnblogs.com\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36\r\n\r\n"
sk.sendall(content.encode('utf-8'))

# 等待服务端返回内容:阻塞
data = sk.recv(8096)
print(data.decode('utf-8'))
sk.close()
1.本质socket

 

不阻塞

高性能爬虫相关高性能爬虫相关
# 1.本质socket
import socket
sk = socket.socket()
sk.setblocking(False) # 不阻塞

# 连接的过程是:阻塞
sk.connect(('www.cnblogs.com', 80))

# HTTP协议
content = "GET /wupeiqi HTTP/1.1\r\nHost: www.cnblogs.com\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36\r\n\r\n"
sk.sendall(content.encode('utf-8'))

# 等待服务端返回内容:阻塞
data = sk.recv(8096)
print(data.decode('utf-8'))
sk.close()
不阻塞

运行结果:注意会报错,但是请求已经发出去了。

高性能爬虫相关

 

非阻塞:不等待

捕捉异常

单线程发送很多请求

高性能爬虫相关高性能爬虫相关
import socket
import select
class AsyncHttp(object):
    def __init__(self):
        self.fds = []
        self.conn = []

    def add(self,url):  # 创建socket链接
        sk = socket.socket()
        sk.setblocking(False)  # 设置不阻塞
        try:
            # 连接的过程是:默认阻塞
            sk.connect((url,80))
        except BlockingIOError as e:
            pass

        self.fds.append(sk)
        self.conn.append(sk)

    def run(self): # 监听socket是否发生变化
        while True:
            """
            fds=[sk2,sk3] # 未返回数据的
            conn=[sk3]  # 未连接成功的
            """
            r,w,e = select.select(self.fds,self.conn,[],0.05)
            # w = 已经连接成功的socket列表  w=[sk1,sk2]
            for client in w:
                client.sendall(b'GET /wupeiqi HTTP/1.1\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36\r\n\r\n')
                # 已经连接成功的socket,无需再继续监听。
                self.conn.remove(client)

            # r = 服务端给用户返回数据了 r=[sk1,]
            for cli in r:
                data = cli.recv(8096)
                print('获取到相应数据',data)

                cli.close()  # 断开连接:短连接、无状态
                self.fds.remove(cli)  # 不再监听

            if not self.fds:  # fds=[] 跳出while循环
                break

ah = AsyncHttp()

ah.add('www.cnblogs.com') # sk1
ah.add('www.baidu.com') # sk2
ah.add('www.luffycity.com') # sk3

ah.run()
捕捉异常

 

异步:回调函数

 再写一个参数,回调函数

高性能爬虫相关高性能爬虫相关
# 不阻塞 : 不等待.  异步:回调函数
import socket
import select

class Request(object):
    def __init__(self,sk,callback):
        self.sk = sk
        self.callback = callback

    def fileno(self):
        return self.sk.fileno()

class AsyncHttp(object):
    def __init__(self):
        self.fds = []
        self.conn = []

    def add(self,url,callback):  # 创建socket链接
        sk = socket.socket() # 实例化对象
        sk.setblocking(False)  # 设置不阻塞
        try:
            # 连接的过程是:默认阻塞
            sk.connect((url,80))
        except BlockingIOError as e:
            pass

        # 将socket封装到Request里。
        req = Request(sk,callback)
        self.fds.append(req)
        self.conn.append(req)

    def run(self): # 监听socket是否发生变化
        while True:
            """
            fds=[req(sk,callback),req,req] # 未返回数据的
            conn=[req,req,req]  # 未连接成功的
            """

            # IO多路复用  监听sk.fileno()  =  req.fileno()
            r,w,e = select.select(self.fds,self.conn,[],0.05)

            # w = 已经连接成功的socket列表  w=[sk1,sk2]
            for req in w:
                req.sk.sendall(b'GET /wupeiqi HTTP/1.1\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36\r\n\r\n')
                # 已经连接成功的socket,无需再继续监听。
                self.conn.remove(req)

            # r = 服务端给用户返回数据了 r=[sk1,]
            for req in r:
                data = req.sk.recv(8096)
                req.callback(data)

                req.sk.close()  # 断开连接:短连接、无状态
                self.fds.remove(req)  # 不再监听

            if not self.fds:  # fds=[] 跳出while循环
                break

ah = AsyncHttp()

def callback1(data):
    print(111,data)

def callback2(data):
    print(222,data)

def callback3(data):
    print(333,data)

ah.add('www.cnblogs.com',callback1) # sk1
ah.add('www.baidu.com',callback2) # sk2
ah.add('www.luffycity.com',callback3) # sk3

ah.run()
异步:回调函数

运行结果:各自有各自的解析方式

 高性能爬虫相关

 

            内容梳理:
                a. 并发方案
                    - 多进程 
                    - 多线程
                    - 利用“异步非阻塞”模块实现单线程并发请求。
                    
                b. 异步非阻塞
                    - 非阻塞
                    - 异步
                
                c. 如何自定义异步非阻塞模块?
                    基于socket设置setblocking和IO多路复用来实现。
                    爬虫发送Http请求本质创建socket对象;
                    IO多路复用"循环"监听socket是否发生变化,一旦发生变化, 我们可以自定义操作(触发某个函数的执行)

 

协程

什么是协程?   

协程是 “微线程” ,让一个线程 先执行某几行代码 再调到某处 执行某几行代码。


高性能爬虫相关高性能爬虫相关
def func1():
    yield 1
    yield 2
    yield 3
    yield 4


g = func1()

"""
# 调用方式1:
v = g.__next__()
print(v)  # 1
v = g.__next__()
print(v)  # 2
"""
# 调用方式2:

v = g.send(None)
print(v)  #1
v = g.send(None)
print(v)  #2
yield

 

使用

情况1

# 情况一:
import asyncio
import requests


@asyncio.coroutine
def fetch_async(func, *args):
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, func, *args)
    response = yield from future
    print(response.url, len(response.content))


tasks = [
    fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
    fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

 

情况2

# 情况二:
import gevent
from gevent import monkey

monkey.patch_all()
import requests


def fetch_async(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, len(response.content))


# ##### 发送请求 #####
gevent.joinall([
    gevent.spawn(fetch_async, method='get', url='https://www.cnblogs.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.sogo.com/', req_kwargs={}),
])
# ##### 发送请求(协程池控制最大协程数量) #####
# from gevent.pool import Pool
# pool = Pool(None)
# gevent.joinall([
#     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
# ])

 

情况3

 

 

# 情况三:
from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    reactor.stop()


def callback(contents):
    print(contents)


d_list = []

url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    d = getPage(bytes(url, encoding='utf8'))
    d.addCallback(callback)

    d_list.append(d)

# 用于检查是否页面已经全部下载完成,如果已下载完成那么,就停止循环。
dlist = defer.DeferredList(d_list)
dlist.addBoth(all_done)  #

reactor.run()