前言
简单学习过网络爬虫,只是之前都是照着书上做并发,大概能理解,却还是无法自己用到自己项目中,这里自己研究实现一个网页嗅探html5播放控件中基于m3u8协议ts格式视频资源的项目,并未考虑过复杂情况,毕竟只是练练手.
源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
# coding=utf-8
import asyncio
import multiprocessing
import os
import re
import time
from math import floor
from multiprocessing import manager
import aiohttp
import requests
from lxml import html
import threading
from src.my_lib import retry
from src.my_lib import time_statistics
class m3u8download:
_path = "./resource\\" # 本地文件路径
_url_seed = none # 资源所在链接前缀
_target_url = {} # 资源任务目标字典
_mode = ""
_headers = { "user-agent" : "mozilla/5.0" } # 浏览器代理
_target_num = 100
def __init__( self ):
self ._ml = manager(). list () # 进程通信列表
if not os.path.exists( self ._path): # 检测本地目录存在否
os.makedirs( self ._path)
exec_str = r 'chcp 65001'
os.system(exec_str) # 先切换utf-8输出,防止控制台乱码
def sniffing( self , url):
self ._url = url
print ( "开始嗅探..." )
try :
r = requests.get( self ._url) # 访问嗅探网址,获取网页信息
except :
print ( "嗅探失败,网址不正确" )
os.system( "pause" )
else :
tree = html.fromstring(r.content)
try :
source_url = tree.xpath( '//video//source/@src' )[ 0 ] # 嗅探资源控制文件链接,这里只针对一个资源控制文件
# self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 从资源控制文件链接解析域名
except :
print ( "嗅探失败,未发现资源" )
os.system( "pause" )
else :
self .analysis(source_url)
def analysis( self , source_url):
try :
self ._url_seed = re.split( "/\w+\.m3u8" , source_url)[ 0 ] # 从资源控制文件链接解析域名
with requests.get(source_url) as r: # 访问资源控制文件,获得资源信息
src = re.split( "\n*#.+\n" , r.text) # 解析资源信息
for sub_src in src: # 将资源地址储存到任务字典
if sub_src:
self ._target_url[sub_src] = self ._url_seed + "/" + sub_src
except exception as e:
print ( "资源无法成功解析" , e)
os.system( "pause" )
else :
self ._target_num = len ( self ._target_url)
print ( "sniffing success!!!,found" , self ._target_num, "url." )
self ._mode = input (
"1:-> 单进程(low b)\n2:-> 多进程+多线程(网速开始biubiu飞起!)\n3:-> 多进程+协程(最先进的并发!!!)\n" )
if self ._mode = = "1" :
for path, url in self ._target_url.items():
self ._download(path, url)
elif self ._mode = = "2" or self ._mode = = "3" :
self ._multiprocessing()
def _multiprocessing( self , processing_num = 4 ): # 多进程,多线程
target_list = {} # 进程任务字典,储存每个进程分配的任务
pool = multiprocessing.pool(processes = processing_num) # 开启进程池
i = 0 # 任务分配标识
for path, url in self ._target_url.items(): # 分配进程任务
target_list[path] = url
i + = 1
if i % 10 = = 0 or i = = len ( self ._target_url): # 每个进程分配十个任务
if self ._mode = = "2" :
pool.apply_async( self ._sub_multithreading, kwds = target_list) # 使用多线程驱动方法
else :
pool.apply_async( self ._sub_coroutine, kwds = target_list) # 使用协程驱动方法
target_list = {}
pool.close() # join函数等待所有子进程结束
pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool
while true:
if self ._judge_over():
self ._combine()
break
def _sub_multithreading( self , * * kwargs):
for path, url in kwargs.items(): # 根据进程任务开启线程
t = threading.thread(target = self ._download, args = (path, url,))
t.start()
@retry ()
def _download( self , path, url): # 同步下载方法
with requests.get(url, headers = self ._headers) as r:
if r.status_code = = 200 :
with open ( self ._path + path, "wb" )as file :
file .write(r.content)
self ._ml.append( 0 ) # 每成功一个就往进程通信列表增加一个值
percent = '%.2f' % ( len ( self ._ml) / self ._target_num * 100 )
print ( len ( self ._ml), ": " , path, "->ok" , "\tcomplete:" , percent, "%" ) # 显示下载进度
else :
print (path, r.status_code, r.reason)
def _sub_coroutine( self , * * kwargs):
tasks = []
for path, url in kwargs.items(): # 根据进程任务创建协程任务列表
tasks.append(asyncio.ensure_future( self ._async_download(path, url)))
loop = asyncio.get_event_loop() # 创建异步事件循环
loop.run_until_complete(asyncio.wait(tasks)) # 注册任务列表
async def _async_download( self , path, url): # 异步下载方法
async with aiohttp.clientsession() as session:
async with session.get(url, headers = self ._headers) as resp:
try :
assert resp.status = = 200 , "e" # 断言状态码为200,否则抛异常,触发重试装饰器
with open ( self ._path + path, "wb" )as file :
file .write(await resp.read())
except exception as e:
print (e)
else :
self ._ml.append( 0 ) # 每成功一个就往进程通信列表增加一个值
percent = '%.2f' % ( len ( self ._ml) / self ._target_num * 100 )
print ( len ( self ._ml), ": " , path, "->ok" , "\tcomplete:" , percent, "%" ) # 显示下载进度
def _combine( self ): # 组合资源方法
try :
print ( "开始组合资源..." )
identification = str (floor(time.time()))
exec_str = r 'copy /b "' + self ._path + r '*.ts" "' + self ._path + 'video' + identification + '.mp4"'
os.system(exec_str) # 使用cmd命令将资源整合
exec_str = r 'del "' + self ._path + r '*.ts"'
os.system(exec_str) # 删除原来的文件
except :
print ( "资源组合失败" )
else :
print ( "资源组合成功!" )
def _judge_over( self ): # 判断是否全部下载完成
if len ( self ._ml) = = len ( self ._target_url):
return true
return false
@time_statistics
def app():
multiprocessing.freeze_support()
url = input ( "输入嗅探网址:\n" )
m3u8 = m3u8download()
m3u8.sniffing(url)
# m3u8.analysis(url)
if __name__ = = "__main__" :
app()
|
这里是两个装饰器的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import time
def time_statistics(fun):
def function_timer( * args, * * kwargs):
t0 = time.time()
result = fun( * args, * * kwargs)
t1 = time.time()
print ( "total time running %s: %s seconds" % (fun.__name__, str (t1 - t0)))
return result
return function_timer
def retry(retries = 3 ):
def _retry(fun):
def wrapper( * args, * * kwargs):
for _ in range (retries):
try :
return fun( * args, * * kwargs)
except exception as e:
print ( "@" , fun.__name__, "->" , e)
return wrapper
return _retry
|
打包成exe文件
使用pyinstaller -f download.py将程序打包成单个可执行文件.
这里需要注意一下,因为程序含有多进程,需要在执行前加一句multiprocessing.freeze_support(),不然程序会反复执行多进程前的功能.
关于协程
协程在python3.5进化到了async await版本,用 async 标记异步方法,在异步方法里对耗时操作使用await标记.这里使用了一个进程驱动协程的方法,在进程池创建多个协程任务,使用asyncio.get_event_loop()创建协程事件循环,使用run_until_complete()注册协程任务,asyncio.wait()方法接收一个任务列表进行协程注册.
关于装饰器
装饰器源于闭包原理,这里使用了两种装饰器.
- @time_statistics:统计耗时,装饰器自己无参型
- @retry():设置重试次数,装饰器自己有参型
- 按我理解是有参型是将无参型装饰器包含在内部,而调用是加()的,关于():
- 不带括号时,调用的是这个函数本身
- 带括号(此时必须传入需要的参数),调用的是函数的return结果
关于cmd控制台
程序会使用cmd命令来将下载的ts文件合并.
因为cmd默认使用gb2312编码,调用os.system()需要先切换成通用的utf-8输出,否则系统信息会乱码.
而且使用cmd命令时参数最好加双引号,以避免特殊符号报错.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/qq_37258787/article/details/80298084