本文实例为大家分享了 Python 批量爬取下载抖音视频的具体代码，供大家参考，具体内容如下：
import os
import requests
import re
import sys
import asyncio
import aiohttp
# HTTP headers sent with every request; the client identifies itself as
# mobile Safari on an iPhone.
# NOTE(review): the user-agent value is written with the standard iOS 11
# Safari token casing ("Mozilla", "iPhone", "AppleWebKit", ...), since
# servers commonly match these tokens case-sensitively — confirm against
# the target site if requests start being rejected.
headers = {
    'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) '
                  'Version/11.0 Mobile/15A372 Safari/604.1'
}

# Module-level state:
#   video_urls -- accumulates [description, play_url] pairs for every video found
#   page       -- 1-based counter used only for the "collecting page N" progress output
video_urls, page = [], 1
def get_info(url):
    """Fetch a user's page and extract the identifiers needed to list videos.

    :param url: link to the user's page
    :return: tuple ``(name, user_id, dytk)``; an element that could not be
        extracted is ``None``. A warning is written to stdout when the page
        cannot be fetched or does not have the expected shape.
    """
    name = None
    dytk = None
    user_id = None
    try:
        response = requests.get(url, headers=headers)
        # response.url is the URL of the final response; the user id is its
        # sixth '/'-separated component with any query string stripped.
        user_id = response.url.split('/')[5].split('?')[0]
        # re.search returns None on no match, so subscripting raises
        # TypeError, which is handled below.
        name = re.search(r'class="nickname">(.*?)<', response.text)[1]
        dytk = re.search(r"dytk: '(.*?)'", response.text)[1]
    except (TypeError, IndexError):
        sys.stdout.write('waring:输入的链接错误')
    except requests.exceptions.RequestException:
        # Must name an exception class: `requests.exceptions` itself is a
        # module and cannot be used in an except clause.
        sys.stdout.write('waring:链接错误')
    # Returned after the try statement rather than from a `finally` clause,
    # so unexpected exceptions are not silently discarded.
    return name, user_id, dytk
def make_dir(name):
    """Create the directory *name* unless it already exists.

    :param name: path of the directory to create
    :return: None
    :raises FileExistsError: if *name* exists but is not a directory
    """
    # exist_ok=True removes the race between checking for the directory and
    # creating it; makedirs also creates any missing parent directories.
    os.makedirs(name, exist_ok=True)
def get_all_video(user_id, max_cursor, dytk):
    """Collect the play URLs of a user's videos into ``video_urls``.

    Pages through the post-list endpoint, starting at *max_cursor*, until the
    response reports that there are no more pages.

    :param user_id: value sent as the ``user_id`` query parameter
    :param max_cursor: paging cursor to start from (``0`` for the first page)
    :param dytk: value sent as the ``dytk`` query parameter
    :return: the module-level ``video_urls`` list on success; ``None`` if a
        request returns a non-200 status or any error occurs. Entries
        appended before a failure stay in ``video_urls``.
    """
    global page
    url = "https://www.amemv.com/aweme/v1/aweme/post/?"
    try:
        # Iterate instead of recursing once per page, so a user with many
        # pages cannot exhaust the recursion limit.
        while True:
            params = {'user_id': user_id,
                      'count': 21,
                      'max_cursor': max_cursor,
                      'dytk': dytk}
            response = requests.get(url=url, params=params, headers=headers)
            if response.status_code != 200:
                print('状态码:', response.status_code)
                return None
            datas = response.json()
            for data in datas['aweme_list']:
                name = data.get('share_info').get('share_desc')
                # Rewrite the 'playwm' path segment of the first listed URL to 'play'.
                play_url = data.get('video').get('play_addr').get('url_list')[0].replace('playwm', 'play')
                video_urls.append([name, play_url])
            if datas['has_more'] == 1 and datas.get('max_cursor') != 0:
                print(f'收集第{page}页视频')
                page += 1
                max_cursor = datas.get('max_cursor')
            else:
                print('收集完成')
                return video_urls
    except Exception as e:
        # Best-effort boundary: network errors, invalid JSON and unexpected
        # response shapes are all reported and turned into a None result.
        print('waring:', e)
        return None
async def download_video(index, name, video_name, url):
    """Download one video to ``<name>/<video_name>.mp4``.

    The download is skipped when the target file already exists. Data is
    written to a temporary ``.part`` file that is renamed onto the target
    only after the whole body has been received, so an interrupted download
    never leaves a truncated ``.mp4`` that a later run would skip.

    :param index: 1-based position of the video, used in progress output and
        in the temporary file name
    :param name: directory to save into (the user's name)
    :param video_name: video description, used as the file name
    :param url: download URL
    :return: None; failures are reported on stdout, not raised
    """
    print(f'正在下载第{index}个视频:{video_name}')
    # A path separator inside the description would be treated as a
    # directory boundary, so replace it.
    file_stem = str(video_name)
    for sep in (os.sep, os.altsep):
        if sep:
            file_stem = file_stem.replace(sep, '_')
    video_path = '{}/{}.mp4'.format(name, file_stem)
    if os.path.isfile(video_path):
        print('文件已存在')
        return
    # The index keeps concurrent downloads that share a description from
    # writing into the same temporary file.
    part_path = '{}.{}.part'.format(video_path, index)
    try:
        async with aiohttp.ClientSession() as session:
            # ssl=False: TLS certificate verification is disabled for this request.
            async with session.get(url=url, headers=headers, ssl=False) as response:
                # Do not save an HTTP error body as if it were the video.
                response.raise_for_status()
                with open(part_path, 'wb') as f:
                    while True:
                        chunk = await response.content.read(1024)
                        if not chunk:
                            break
                        f.write(chunk)
        os.replace(part_path, video_path)
        print(f'下载完成第{index}个视频:{video_name}')
    except Exception as e:
        print('waring:download faild', video_name, e)
        # Best-effort cleanup of the partial file; it may never have been created.
        try:
            os.remove(part_path)
        except OSError:
            pass
        return
def main(url='http://v.douyin.com/deorkn/'):
    """Download every video of the user behind *url*.

    Resolves the user, creates a directory named after the user, collects
    all video URLs and then downloads them concurrently.

    :param url: link to the user's page
    :return: None
    """
    name, user_id, dytk = get_info(url)
    # The tuple itself is always truthy, so each element has to be checked;
    # get_info leaves an element as None when it could not be extracted.
    if any(value is None for value in (name, user_id, dytk)):
        return
    make_dir(name)
    get_all_video(user_id, 0, dytk)
    print(f'{name}:总共有{len(video_urls)}个视频')

    async def _download_all():
        """Run one download coroutine per collected video, concurrently."""
        # gather() accepts zero awaitables, so an empty video list is fine.
        await asyncio.gather(*(
            download_video(index, name, item[0], item[1])
            for index, item in enumerate(video_urls, 1)
        ))

    # A dedicated loop avoids relying on an implicit current event loop.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(_download_all())
        # Give the loop one more iteration before closing it, so connections
        # released by the finished client sessions can finish closing.
        loop.run_until_complete(asyncio.sleep(0))
    finally:
        loop.close()
    print(f'{name}视频下载完成!')
# Run the downloader only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()
以上就是本文的全部内容，希望对大家的学习有所帮助，也希望大家多多支持服务器之家。
原文链接：https://blog.csdn.net/qq_43513350/article/details/85692569