Python offers many ways to do concurrent programming: the standard-library threading module for multithreading, concurrent.futures, asyncio coroutines, and async libraries such as grequests. Any of them can satisfy the requirement above. Below, I implement it with each one in turn; the code in this article can be run directly, as a reference for your future concurrent programming:
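Every example below reads its targets from a file named urllist.txt, one URL per line. If you don't have such a file handy, here is a minimal sketch that generates one (the httpbin.org endpoints are placeholders of my choosing, not from the original post):

# Sketch: write a small urllist.txt so the examples below can run as-is.
# The httpbin.org status endpoints are placeholder URLs; substitute your own.
urls = [f"http://httpbin.org/status/{code}" for code in (200, 301, 404, 500)]
with open("urllist.txt", "w") as f:
    f.write("\n".join(urls) + "\n")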
Queue + multithreading
Define a queue of size 400, then start 200 threads, each of which keeps pulling a URL from the queue and fetching it.
The main thread reads the URLs from a file and puts them on the queue, then waits for every item on the queue to be received and processed. The code is as follows:
from threading import Thread
import sys
from queue import Queue
import requests

concurrent = 200  # number of worker threads

def doWork():
    # Each worker loops forever, pulling a URL off the queue and fetching it.
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        res = requests.get(ourl)
        return res.status_code, ourl
    except requests.RequestException:
        return "error", ourl

def doSomethingWithResult(status, url):
    print(status, url)

q = Queue(concurrent * 2)  # bounded queue: the reader can't race too far ahead
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True  # daemon threads exit together with the main thread
    t.start()

try:
    for url in open("urllist.txt"):
        q.put(url.strip())
    q.join()  # wait until every queued URL has been processed
except KeyboardInterrupt:
    sys.exit(1)
Run it and a status code and URL are printed for each request. Did you pick up a new trick?
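One caveat: because the workers are daemon threads, they are killed abruptly the moment the main thread exits after q.join(). If you need the workers to shut down cleanly instead, a common pattern (a sketch, not from the original post) is to push one sentinel per worker and join the threads:

from queue import Queue
from threading import Thread

SENTINEL = None  # marker telling a worker there is no more work

def worker(q):
    while True:
        url = q.get()
        if url is SENTINEL:
            q.task_done()
            break  # exit the loop instead of being killed
        print("would fetch:", url)  # stand-in for the real request
        q.task_done()

q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(4)]
for t in threads:
    t.start()
for url in ("http://example.com/a", "http://example.com/b"):  # placeholder URLs
    q.put(url)
for _ in threads:
    q.put(SENTINEL)  # one sentinel per worker
q.join()
for t in threads:
    t.join()  # every worker has exited on its own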
Thread pool
If you want a thread pool, the higher-level concurrent.futures library is the recommended choice:
import concurrent.futures
import requests

out = []
CONNECTIONS = 100  # maximum number of worker threads
TIMEOUT = 5        # per-request timeout, in seconds

urls = []
with open("urllist.txt") as reader:
    for url in reader:
        urls.append(url.strip())

def load_url(url, timeout):
    ans = requests.get(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))  # record the exception type instead of crashing
        finally:
            out.append(data)
            print(data)
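If you don't need to inspect each future individually, executor.map is an even more compact way to fan the calls out. A sketch reusing load_url, urls, CONNECTIONS and TIMEOUT from the example above; the wrapper is needed because an exception raised inside load_url would otherwise abort the result loop:

import concurrent.futures

def safe_load(url):
    # Catch errors here so one bad URL doesn't stop the iteration below.
    try:
        return load_url(url, TIMEOUT)
    except Exception as exc:
        return str(type(exc))

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    for status in executor.map(safe_load, urls):
        print(status)

Unlike as_completed, executor.map yields results in the same order the URLs were submitted.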
Coroutines + aiohttp
Coroutines are another very common tool for concurrency:
import asyncio
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)  # the original maps connection errors to 404
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        # One task per URL, all sharing a single session.
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f"{result[1]} - {str(result[0])}")

if __name__ == "__main__":
    import sys
    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    with open("urllist.txt") as infile:
        urls = set(map(str.strip, infile))
    asyncio.run(make_requests(urls=urls))
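Note that asyncio.gather fires every request at once, so a large urllist.txt means that many simultaneous connections. A minimal sketch of capping the in-flight requests with asyncio.Semaphore (the limit of 100 is an arbitrary assumption, and the httpbin.org URL is a placeholder):

import asyncio
from aiohttp import ClientSession

LIMIT = 100  # assumed cap on concurrent requests; tune for your targets

async def fetch_limited(url, session, sem):
    async with sem:  # at most LIMIT requests are in flight at any time
        resp = await session.get(url)
        return url, resp.status

async def main(urls):
    sem = asyncio.Semaphore(LIMIT)
    async with ClientSession() as session:
        tasks = [fetch_limited(u, session, sem) for u in urls]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    urls = ["http://httpbin.org/get"] * 5  # placeholder URLs
    for url, status in asyncio.run(main(urls)):
        print(status, url)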
grequests
This is a third-party library, currently at about 3.8K stars. It is simply Requests + Gevent, and it makes asynchronous HTTP requests much easier. Under the hood, Gevent is still coroutines.
Install it first:
pip install grequests
Using it is remarkably simple:
import grequests

urls = []
with open("urllist.txt") as reader:
    for url in reader:
        urls.append(url.strip())

rs = (grequests.get(u) for u in urls)
for result in grequests.map(rs):
    if result is not None:  # a failed request comes back as None
        print(result.status_code, result.url)
Note that grequests.map(rs) executes the requests concurrently; run it and each response's status code and URL are printed.
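grequests.map also accepts a size argument that caps the underlying gevent pool, which is handy when the URL list is long. A sketch (size=20 and the httpbin.org URL are arbitrary choices of mine):

import grequests

urls = ["http://httpbin.org/get"] * 50  # placeholder URLs
rs = (grequests.get(u) for u in urls)
# size=20 keeps at most 20 requests in flight at a time
for result in grequests.map(rs, size=20):
    if result is not None:  # a failed request comes back as None
        print(result.status_code, result.url)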
You can also add exception handling:
>>> def exception_handler(request, exception):
...     print("Request failed")
>>> reqs = [
...     grequests.get('http://httpbin.org/delay/1', timeout=0.001),
...     grequests.get('http://fakedomain/'),
...     grequests.get('http://httpbin.org/status/500')]
>>> grequests.map(reqs, exception_handler=exception_handler)
Request failed
Request failed
[None, None, <Response [500]>]
Final thoughts
Today I shared several ways to send concurrent HTTP requests. Some say async (coroutines) performs better than multithreading, but it really depends on the scenario; no single approach fits every case. In an experiment of my own, also fetching URLs, coroutines became noticeably slower once the concurrency level exceeded 500.
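If you want to check that claim against your own workload, a minimal timing harness is enough. A sketch; run_batch is a hypothetical stand-in for any one of the implementations above applied to the same URL list:

import time

def timed(label, fn, *args):
    # Measure the wall-clock time of one batch of requests.
    start = time.perf_counter()
    fn(*args)
    print(f"{label}: {time.perf_counter() - start:.2f}s")

# e.g. timed("threads", run_batch, urls) vs. timed("asyncio", run_batch_async, urls)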
Original article: https://blog.csdn.net/somenzz/article/details/120030634