Python编写一个优美的下载器

时间:2021-08-16 05:40:18

本文实例为大家分享了Python编写下载器的具体代码,供大家参考,具体内容如下

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/bin/python3
# author: lidawei
# create: 2016-07-11
# version: 1.0
# 功能说明:
#  从指定的URL将文件取回本地
#####################################################
 
import http.client
import os
import threading
import time
import logging
import unittest
from queue import Queue
from urllib.parse import urlparse
 
logging.basicConfig(level = logging.DEBUG,
     format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
     datefmt = '%a, %d %b %Y %H:%M:%S',
     filename = 'Downloader_%s.log' % (time.strftime('%Y-%m-%d')),
     filemode = 'a')
 
class Downloader(object):
 '''''文件下载器'''
 url = ''
 filename = ''
 
 def __init__(self, full_url_str, filename):
  '''''初始化'''
  self.url = urlparse(full_url_str)
  self.filename = filename
 
 def download(self):
  '''''执行下载,返回True或False'''
  if self.url == '' or self.url == None or self.filename == '' or self.filename == None:
   logging.error('Invalid parameter for Downloader')
   return False
 
  successed = False
  conn = None
  if self.url.scheme == 'https':
   conn = http.client.HTTPSConnection(self.url.netloc)
  else:
   conn = http.client.HTTPConnection(self.url.netloc)
  conn.request('GET', self.url.path)
  response = conn.getresponse()
  if response.status == 200:
   total_size = response.getheader('Content-Length')
   total_size = (int)(total_size)
   if total_size > 0:
    finished_size = 0
    file = open(self.filename, 'wb')
    if file:
     progress = Progress()
     progress.start()
     while not response.closed:
      buffers = response.read(1024)
      file.write(buffers)
 
      finished_size += len(buffers)
      progress.update(finished_size, total_size)
      if finished_size >= total_size:
       break
     # ... end while statment
     file.close()
     progress.stop()
     progress.join()
    else:
     logging.error('Create local file %s failed' % (self.filename))
    # ... end if statment
   else:
    logging.error('Request file %s size failed' % (self.filename))
   # ... end if statment
  else:
   logging.error('HTTP/HTTPS request failed, status code:%d' % (response.status))
  # ... end if statment
  conn.close()
 
  return successed
 # ... end download() method
# ... end Downloader class
 
class DataWriter(threading.Thread):
 filename = ''
 data_dict = {'offset' : 0, 'buffers_byte' : b''}
 queue = Queue(128)
 __stop = False
 
 def __init__(self, filename):
  self.filename = filename
  threading.Thread.__init__(self)
 
 #Override
 def run(self):
  while not self.__stop:
   self.queue.get(True, 1)
 
 def put_data(data_dict):
  '''''将data_dict的数据放入队列,data_dict是一个字典,有两个元素:offset是偏移量,buffers_byte是二进制字节串'''
  self.queue.put(data_dict)
 
 def stop(self):
  self.__stop = True
 
class Progress(threading.Thread):
 interval = 1
 total_size = 0
 finished_size = 0
 old_size = 0
 __stop = False
 
 def __init__(self, interval = 0.5):
  self.interval = interval
  threading.Thread.__init__(self)
 
 #Override
 def run(self):
  # logging.info('  Total  Finished  Percent  Speed')
  print('  Total  Finished  Percent  Speed')
  while not self.__stop:
   time.sleep(self.interval)
   if self.total_size > 0:
    percent = self.finished_size / self.total_size * 100
    speed = (self.finished_size - self.old_size) / self.interval
    msg = '%12d %12d %10.2f%% %12d' % (self.total_size, self.finished_size, percent, speed)
    # logging.info(msg)
    print(msg)
 
    self.old_size = self.finished_size
   else:
    logging.error('Total size is zero')
  # ... end while statment
 # ... end run() method
 
 def stop(self):
  self.__stop = True
 
 def update(self, finished_size, total_size):
  self.finished_size = finished_size
  self.total_size = total_size
 
class TestDownloaderFunctions(unittest.TestCase):
 
 def setUp(self):
  print('setUp')
 
 def test_download(self):
  url = 'http://dldir1.qq.com/qqfile/qq/QQ8.4/18376/QQ8.4.exe'
  filename = 'QQ8.4.exe'
  dl = Downloader(url, filename)
  dl.download()
 
 def tearDown(self):
  print('tearDown')
 
if __name__ == '__main__':
 unittest.main()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/ccpw_cn/article/details/51880860