本文为大家分享了用 Python 实现的一个多线程网页下载器,供大家参考,具体内容如下:
这是一个有着真实需求的实现,我的用途是拿它来通过 HTTP 方式向服务器提交游戏数据。把它放上来也是想大家帮忙挑刺,找找 bug,让它工作得更好。
keywords:python,http,multi-threads,thread,threading,httplib,urllib,urllib2,Queue,http pool,httppool
废话少说,上源码:
|
# -*- coding:utf-8 -*-
import urllib, httplib
import thread
import time
from Queue import Queue, Empty, Full
# Default request headers; get_remote_data starts from these and merges the
# per-task headers on top.
HEADERS = {
    'Content-type': 'application/x-www-form-urlencoded',
    'Accept-Language': 'zh-cn',
    'User-Agent': 'Mozilla/4.0 (compatible; MSIE 6.0;Windows NT 5.0)',
    'Accept': 'text/plain',
}

# Status value handed to the failure callback when the request raised an
# exception instead of producing an HTTP status.
UNEXPECTED_ERROR = -1

# HTTP method names.
POST = 'POST'
GET = 'GET'
def base_log(msg):
    """Default logger: print ``msg`` on standard output."""
    # Parenthesised single-argument form prints the same text as `print msg`.
    print(msg)
def base_fail_op(task, status, log):
    """Default failure callback: report a failed task through ``log``.

    Args:
        task: The task that failed; it is rendered with ``str()``.
        status: Integer status to report (formatted with ``%d``).
        log: Callable taking the single message string.
    """
    message = 'fail op. task = %s, status = %d' % (str(task), status)
    log(message)
def _build_get_url(url, query):
    """Return ``url`` with the already-urlencoded ``query`` appended."""
    if not query:
        return url
    if url.endswith('?') or url.endswith('&'):
        # Caller already supplied the separator (e.g. '/search?').
        return url + query
    separator = '&' if '?' in url else '?'
    return url + separator + query


def _perform_request(conn_args, method, url, params, headers):
    """Send one request and return ``(status, body)``; the connection is always closed.

    ``body`` is None when the status is not ``httplib.OK`` (the body is not
    read in that case).
    """
    conn = httplib.HTTPConnection(**conn_args)
    try:
        if method == POST:
            conn.request(method, url, params, headers)
        else:
            # No request body is sent here, so the entity headers are left
            # out; every other default/per-task header is still sent.
            query_headers = dict(
                (name, value) for name, value in headers.items()
                if name.lower() not in ('content-type', 'content-length'))
            conn.request(method, _build_get_url(url, params),
                         headers=query_headers)
        response = conn.getresponse()
        if response.status != httplib.OK:
            return response.status, None
        return response.status, response.read()
    finally:
        conn.close()


def get_remote_data(tasks, results, fail_op=base_fail_op, log=base_log):
    """Worker loop: take tasks from ``tasks`` forever and perform their HTTP requests.

    Each task is a dict. ``'id'`` and ``'conn_args'`` (keyword arguments for
    ``httplib.HTTPConnection``) are required; ``'params'`` (default ``{}``),
    ``'method'`` (default ``'GET'``), ``'url'`` (default ``'/'``) and
    ``'headers'`` (default ``{}``) are optional.

    Args:
        tasks: Queue the tasks are read from (blocking ``get()``).
        results: Queue that receives ``(task_id, body)`` for every response
            whose status is ``httplib.OK``.
        fail_op: Called as ``fail_op(task, status, log)`` when the request
            raises (``status`` is ``UNEXPECTED_ERROR``) or when the response
            status is not ``httplib.OK``.
        log: Callable taking one message string.

    This function never returns. A task missing ``'id'`` or ``'conn_args'``
    is logged and skipped without calling ``fail_op``.
    """
    while True:
        task = tasks.get()
        try:
            tid = task['id']
            hpt = task['conn_args']  # hpt <= host:port, timeout
        except KeyError as e:
            log(str(e))
            continue
        # %s rather than %d so a non-integer task id cannot raise here.
        log('thread_%s doing task %s' % (thread.get_ident(), tid))

        params = urllib.urlencode(task.get('params') or {})
        method = task.get('method', GET)
        url = task.get('url', '/')
        # Work on a copy: updating HEADERS itself would leak one task's
        # headers into every later request made by any worker.
        headers = dict(HEADERS)
        headers.update(task.get('headers') or {})
        headers['Content-Length'] = len(params)

        try:
            # Connecting, sending and reading the body all happen inside the
            # try so that a failure in any of them is reported via fail_op
            # instead of ending this worker.
            status, data = _perform_request(hpt, method, url, params, headers)
        except Exception as e:
            log('request failed. method = %s, url = %s, params = %s headers = %s' % (
                method, url, params, headers))
            log(str(e))
            fail_op(task, UNEXPECTED_ERROR, log)
            continue
        if status != httplib.OK:
            fail_op(task, status, log)
            continue
        results.put((tid, data), True)
class HttpPool(object):
    """A pool of worker threads that run HTTP request tasks.

    Tasks are queued with ``add_task``; each worker runs ``get_remote_data``
    on the shared task queue and puts results on the shared result queue,
    which ``get_results`` drains.
    """

    def __init__(self, threads_count, fail_op, log):
        """Create the queues and start ``threads_count`` worker threads.

        Args:
            threads_count: Number of worker threads to start.
            fail_op: Failure callback passed through to ``get_remote_data``.
            log: Logging callable passed through to ``get_remote_data``.
        """
        self._tasks = Queue()
        self._results = Queue()
        for _ in range(threads_count):
            thread.start_new_thread(
                get_remote_data,
                (self._tasks, self._results, fail_op, log))

    def add_task(self, tid, host, url, params, headers=None, method='GET', timeout=None):
        """Queue one request.

        Args:
            tid: Task id; it is stored under the task's ``'id'`` key.
            host: Host passed to the connection as ``host``.
            url: Request path.
            params: Request parameters, stored as given.
            headers: Optional extra headers; None means no extra headers.
            method: HTTP method name, ``'GET'`` by default.
            timeout: Optional connection timeout; omitted from the
                connection arguments when None.

        Returns:
            True if the task was queued, False if the task queue was full
            (the queue is created without a size limit).
        """
        conn_args = {'host': host}
        if timeout is not None:
            conn_args['timeout'] = timeout
        task = {
            'id': tid,
            'conn_args': conn_args,
            # Each task gets its own dict: neither a shared default object
            # nor the caller's dict is stored in the task.
            'headers': dict(headers) if headers else {},
            'url': url,
            'params': params,
            'method': method,
        }
        try:
            self._tasks.put_nowait(task)
        except Full:
            return False
        return True

    def get_results(self):
        """Return every result currently available, without blocking.

        Returns:
            A list of the items taken from the result queue, in queue order;
            empty when nothing is ready.
        """
        results = []
        while True:
            try:
                results.append(self._results.get_nowait())
            except Empty:
                return results
def test_google(task_count, threads_count):
    """Demo: send ``task_count`` search requests to www.google.cn using ``threads_count`` workers.

    Prints a line for every queued task and, for every result, the task id
    and the length of the response body. Returns once every queued task has
    either produced a result or been reported to the failure callback.

    Args:
        task_count: Number of requests to queue.
        threads_count: Number of worker threads in the pool.
    """
    # Imported here because the module only imports random inside its
    # __main__ guard, which does not run when this file is imported.
    import random

    failures = []

    def record_failure(task, status, log):
        """Report the failure as usual and remember it so the wait loop can end."""
        base_fail_op(task, status, log)
        # Workers only append; the polling loop below only reads the length.
        failures.append(status)

    hp = HttpPool(threads_count, record_failure, base_log)
    queued = 0
    for i in range(task_count):
        if hp.add_task(i, 'www.google.cn', '/search?', {'q': 'lai'}):
            print('add task successed.')
            queued += 1

    received = 0
    while received + len(failures) < queued:
        results = hp.get_results()
        if not results:
            # Nothing ready yet: back off for a random time below one second.
            time.sleep(1.0 * random.random())
            continue
        for tid, data in results:
            print('%s %s' % (tid, len(data)))
        received += len(results)
if __name__ == '__main__':
    # Script entry point: python <file> TASK_COUNT THREADS_COUNT
    # random stays in this import so it remains a module global for any code
    # in this file that relies on finding it there.
    import sys, random
    if len(sys.argv) < 3:
        # Exit with a usage message instead of an IndexError traceback.
        sys.exit('usage: python %s TASK_COUNT THREADS_COUNT' % sys.argv[0])
    try:
        task_count, threads_count = int(sys.argv[1]), int(sys.argv[2])
    except ValueError:
        sys.exit('TASK_COUNT and THREADS_COUNT must be integers')
    test_google(task_count, threads_count)
|
有兴趣尝试运行的朋友,可以把它保存为 xxxx.py,然后执行 python xxxx.py 10 4,其中 10 表示向 google.cn 请求 10 次查询,4 表示由 4 条线程来执行这些任务。注意:代码使用了 print 语句以及 httplib、Queue、thread 模块,需要用 Python 2 运行。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/gzlaiyonghao/article/details/4083852