本文实例为大家分享了用 Python 编写下载器的具体代码，供大家参考，具体内容如下：
#!/bin/python3
# author: lidawei
# create: 2016-07-11
# version: 1.0
# 功能说明:
# 从指定的URL将文件取回本地
#####################################################
import http.client
import os
import threading
import time
import logging
import unittest
from queue import Queue
from urllib.parse import urlparse
# Root logger setup: DEBUG and above are appended to a log file whose name
# carries the date on which the module was loaded.
logging.basicConfig(
    filename='Downloader_%s.log' % (time.strftime('%Y-%m-%d')),
    filemode='a',
    level=logging.DEBUG,
    datefmt='%a, %d %b %Y %H:%M:%S',
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
)
class Downloader(object):
    """Download a single file over HTTP or HTTPS into a local file.

    Usage: ``Downloader(url, filename).download()``.  Progress is reported
    through a ``Progress`` thread while the body is being streamed.
    """

    # Class-level defaults so the attributes exist even on a bare instance.
    url = ''
    filename = ''
    # Number of bytes requested from the response per read.
    chunk_size = 1024

    def __init__(self, full_url_str, filename):
        """Store the parsed URL and the destination path.

        Args:
            full_url_str: Absolute URL of the file, e.g. ``http://host/a?b=1``.
            filename: Local path the response body is written to.
        """
        # urlparse() always returns a result object (never ''), so the real
        # validation happens in download() by checking its ``netloc``.
        self.url = urlparse(full_url_str or '')
        self.filename = filename

    def download(self):
        """Fetch the URL and write the body to ``self.filename``.

        Returns:
            True when the whole body (as announced by ``Content-Length``)
            was written; False on invalid parameters, a non-200 status, a
            missing/invalid/zero ``Content-Length``, a local file error, a
            network error or a truncated body.  Failures are logged.
        """
        netloc = getattr(self.url, 'netloc', '')
        if not netloc or not self.filename:
            logging.error('Invalid parameter for Downloader')
            return False

        # Anything that is not explicitly https is treated as plain http.
        if self.url.scheme == 'https':
            conn = http.client.HTTPSConnection(netloc)
        else:
            conn = http.client.HTTPConnection(netloc)

        try:
            conn.request('GET', self._request_target())
            response = conn.getresponse()
            if response.status != 200:
                logging.error('HTTP/HTTPS request failed, status code:%d' % (response.status))
                return False

            total_size = self._content_length(response)
            if total_size <= 0:
                logging.error('Request file %s size failed' % (self.filename))
                return False

            return self._save_body(response, total_size)
        except (OSError, http.client.HTTPException):
            # Connection, socket, disk and protocol errors all end up here so
            # that the method keeps its documented True/False contract.
            logging.exception('Download of %s failed', self.filename)
            return False
        finally:
            # Always release the socket, including on early returns.
            conn.close()

    def _request_target(self):
        """Build the request target: path (default '/') plus any query string."""
        target = self.url.path or '/'
        if self.url.query:
            target = '%s?%s' % (target, self.url.query)
        return target

    @staticmethod
    def _content_length(response):
        """Return the announced body size, or 0 when absent or not a number."""
        try:
            return int(response.getheader('Content-Length'))
        except (TypeError, ValueError):
            return 0

    def _save_body(self, response, total_size):
        """Stream the body to the local file; return True when it is complete."""
        try:
            out_file = open(self.filename, 'wb')
        except OSError:
            logging.error('Create local file %s failed' % (self.filename))
            return False

        finished_size = 0
        progress = Progress()
        progress.start()
        try:
            with out_file:
                while finished_size < total_size:
                    chunk = response.read(self.chunk_size)
                    if not chunk:
                        # The peer closed the connection before sending
                        # everything it announced.
                        break
                    out_file.write(chunk)
                    finished_size += len(chunk)
                    progress.update(finished_size, total_size)
        finally:
            # Never leave the reporter thread running, even on errors.
            progress.stop()
            progress.join()

        if finished_size < total_size:
            logging.error('Download of %s incomplete: %d of %d bytes'
                          % (self.filename, finished_size, total_size))
            return False
        return True
class DataWriter(threading.Thread):
    """Background thread that writes queued byte chunks into a file.

    Producers call put_data() with a dict holding ``offset`` (position in
    the file) and ``buffers_byte`` (the bytes to store there).  The thread
    opens ``filename`` for binary writing (truncating it), writes each chunk
    at its offset, and exits after stop() once the chunks queued before the
    stop request have been written.
    """

    filename = ''
    # Reference shape of the items accepted by put_data().
    data_dict = {'offset': 0, 'buffers_byte': b''}
    # Maximum number of pending chunks; put_data() blocks when it is reached.
    queue_size = 128
    # Private marker that stop() enqueues to wake the worker immediately.
    _STOP = object()

    def __init__(self, filename):
        """Create the writer for ``filename``; call start() to begin writing."""
        threading.Thread.__init__(self)
        self.filename = filename
        # One queue per instance, so separate writers never share chunks.
        self.queue = Queue(self.queue_size)
        self._stop_event = threading.Event()

    # Override
    def run(self):
        """Write queued chunks to the file until a stop is requested."""
        # Local import: the file's top-level imports only bring in Queue.
        from queue import Empty

        try:
            out_file = open(self.filename, 'wb')
        except OSError:
            logging.error('Create local file %s failed' % (self.filename))
            return

        with out_file:
            while True:
                try:
                    item = self.queue.get(True, 1)
                except Empty:
                    # An idle second is normal; only leave once asked to.
                    if self._stop_event.is_set():
                        break
                    continue
                if item is self._STOP:
                    break
                out_file.seek(item['offset'])
                out_file.write(item['buffers_byte'])

    def put_data(self, data_dict):
        """Queue one chunk for writing.

        Args:
            data_dict: Dict with two entries: ``offset`` (file offset) and
                ``buffers_byte`` (the bytes to write at that offset).
        """
        self.queue.put(data_dict)

    def stop(self):
        """Ask the thread to finish after the chunks queued so far."""
        from queue import Full

        self._stop_event.set()
        try:
            self.queue.put_nowait(self._STOP)
        except Full:
            # The queue is full, so the worker is awake anyway; it will see
            # the stop flag once it has drained the pending chunks.
            pass
class Progress(threading.Thread):
    """Background thread that periodically prints download progress.

    The owner calls update() with the latest byte counts; every ``interval``
    seconds the thread prints total size, finished size, percentage and the
    transfer speed in bytes per second.  stop() ends the thread promptly.
    """

    # Class-level defaults, overridden per instance by __init__()/update().
    interval = 1
    total_size = 0
    finished_size = 0
    old_size = 0

    def __init__(self, interval=0.5):
        """Create the reporter.

        Args:
            interval: Seconds between two progress lines.
        """
        threading.Thread.__init__(self)
        self.interval = interval
        # An Event (rather than a flag plus sleep) lets stop() interrupt the
        # wait, so join() does not block for a whole interval.
        self._stop_event = threading.Event()

    # Override
    def run(self):
        """Print the header, then one progress line per interval until stopped."""
        print(' Total Finished Percent Speed')
        last_tick = time.monotonic()
        while not self._stop_event.wait(self.interval):
            now = time.monotonic()
            elapsed = now - last_tick
            last_tick = now
            # Snapshot the counters once so a single line is self-consistent.
            total = self.total_size
            finished = self.finished_size
            if total > 0:
                percent = finished / total * 100
                # Use the measured time, which also avoids dividing by a
                # zero interval.
                speed = (finished - self.old_size) / elapsed if elapsed > 0 else 0
                msg = '%12d %12d %10.2f%% %12d' % (total, finished, percent, speed)
                print(msg)
                self.old_size = finished
            else:
                logging.error('Total size is zero')

    def stop(self):
        """Request the thread to end; it wakes up immediately."""
        self._stop_event.set()

    def update(self, finished_size, total_size):
        """Record the latest counters.

        Args:
            finished_size: Bytes transferred so far.
            total_size: Total bytes expected.
        """
        self.finished_size = finished_size
        self.total_size = total_size
class TestDownloaderFunctions(unittest.TestCase):
    """Smoke test that runs one real download; needs network access."""

    def setUp(self):
        """Announce the start of a test case."""
        print('setUp')

    def tearDown(self):
        """Announce the end of a test case."""
        print('tearDown')

    def test_download(self):
        """Run a download of a fixed URL into the working directory."""
        source_url = 'http://dldir1.qq.com/qqfile/qq/QQ8.4/18376/QQ8.4.exe'
        target_name = 'QQ8.4.exe'
        Downloader(source_url, target_name).download()
if __name__ == '__main__':
    # Run the unittest cases of this module when it is executed as a script.
    unittest.main()
以上就是本文的全部内容，希望对大家的学习有所帮助，也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/ccpw_cn/article/details/51880860