python3下载抖音视频的代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
|
# -*- coding:utf-8 -*-
from contextlib import closing
import requests, json, re, os, sys, random
from ipaddress import ip_address
from subprocess import popen, pipe
import urllib
class douyin( object ):
def __init__( self , width = 500 , height = 300 ):
"""
抖音app视频下载
"""
rip = ip_address( '0.0.0.0' )
while rip.is_private:
rip = ip_address( '.' .join( map ( str , (random.randint( 0 , 255 ) for _ in range ( 4 )))))
self .headers = {
'accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8' ,
'accept-encoding' : 'gzip, deflate, br' ,
'accept-language' : 'zh-cn,zh;q=0.9' ,
'pragma' : 'no-cache' ,
'cache-control' : 'no-cache' ,
'upgrade-insecure-requests' : '1' ,
'user-agent' : 'mozilla/5.0 (linux; u; android 5.1.1; zh-cn; mi 4s build/lmy47v) applewebkit/537.36 (khtml, like gecko) version/4.0 chrome/53.0.2785.146 mobile safari/537.36 xiaomi/miuibrowser/9.1.3' ,
'x-real-ip' : str (rip),
'x-forwarded-for' : str (rip),
}
def get_video_urls( self , user_id, type_flag = 'f' ):
"""
获得视频播放地址
parameters:
user_id:查询的用户uid
returns:
video_names: 视频名字列表
video_urls: 视频链接列表
nickname: 用户昵称
"""
video_names = []
video_urls = []
share_urls = []
max_cursor = 0
has_more = 1
i = 0
share_user_url = 'https://www.douyin.com/share/user/%s' % user_id
share_user = requests.get(share_user_url, headers = self .headers)
while share_user.status_code ! = 200 :
share_user = requests.get(share_user_url, headers = self .headers)
_dytk_re = re. compile (r "dytk\s*:\s*'(.+)'" )
dytk = _dytk_re.search(share_user.text).group( 1 )
_nickname_re = re. compile (r '<p class="nickname">(.+?)<\/p>' )
nickname = _nickname_re.search(share_user.text).group( 1 )
urllib.request.urlretrieve( 'https://raw.githubusercontent.com/jack-cherish/python-spider/master/douyin/fuck-byted-acrawler.js' , 'fuck-byted-acrawler.js' )
try :
popen([ 'node' , '-v' ], stdout = pipe, stderr = pipe).communicate()
except (oserror, ioerror) as err:
print ( '请先安装 node.js: https://nodejs.org/' )
sys.exit()
user_url_prefix = 'https://www.douyin.com/aweme/v1/aweme/favorite' if type_flag = = 'f' else 'https://www.douyin.com/aweme/v1/aweme/post'
print ( '解析视频链接中' )
while has_more ! = 0 :
process = popen([ 'node' , 'fuck-byted-acrawler.js' , str (user_id)], stdout = pipe, stderr = pipe)
_sign = process.communicate()[ 0 ].decode().strip( '\n' ).strip( '\r' )
user_url = user_url_prefix + '/?user_id=%s&max_cursor=%s&count=21&aid=1128&_signature=%s&dytk=%s' % (user_id, max_cursor, _sign, dytk)
req = requests.get(user_url, headers = self .headers)
while req.status_code ! = 200 :
req = requests.get(user_url, headers = self .headers)
html = json.loads(req.text)
try :
while html[ 'aweme_list' ] = = []:
i = i + 1
sys.stdout.write( '已重新链接' + str (i) + '次 (若超过100次,请ctrl+c强制停止再重来)' + '\r' )
sys.stdout.flush()
process = popen([ 'node' , 'fuck-byted-acrawler.js' , str (user_id)], stdout = pipe, stderr = pipe)
_sign = process.communicate()[ 0 ].decode().strip( '\n' ).strip( '\r' )
user_url = user_url_prefix + '/?user_id=%s&max_cursor=%s&count=21&aid=1128&_signature=%s&dytk=%s' % (user_id, max_cursor, _sign, dytk)
req = requests.get(user_url, headers = self .headers)
while req.status_code ! = 200 :
req = requests.get(user_url, headers = self .headers)
html = json.loads(req.text)
except :
pass
i = 0
for each in html[ 'aweme_list' ]:
try :
url = 'https://aweme.snssdk.com/aweme/v1/play/?video_id=%s&line=0&ratio=720p&media_type=4&vr_type=0&test_cdn=none&improve_bitrate=0'
uri = each[ 'video' ][ 'play_addr' ][ 'uri' ]
video_url = url % uri
except :
continue
share_desc = each[ 'share_info' ][ 'share_desc' ]
if os.name = = 'nt' :
for c in r '\/:*?"<>|' :
nickname = nickname.replace(c, ' ').strip().strip(' \.')
share_desc = share_desc.replace(c, '').strip()
share_id = each[ 'aweme_id' ]
if share_desc in [ '抖音-原创音乐短视频社区' , 'tiktok' , '']:
video_names.append(share_id + '.mp4' )
else :
video_names.append(share_id + '-' + share_desc + '.mp4' )
share_urls.append(each[ 'share_info' ][ 'share_url' ])
video_urls.append(video_url)
max_cursor = html[ 'max_cursor' ]
has_more = html[ 'has_more' ]
return video_names, video_urls, share_urls, nickname
def get_download_url( self , video_url, watermark_flag):
"""
获得带水印的视频播放地址
parameters:
video_url:带水印的视频播放地址
returns:
download_url: 带水印的视频下载地址
"""
# 带水印视频
if watermark_flag = = true:
download_url = video_url.replace( '/play/' , '/playwm/' )
# 无水印视频
else :
download_url = video_url.replace( '/playwm/' , '/play/' )
return download_url
def video_downloader( self , video_url, video_name, watermark_flag = false):
"""
视频下载
parameters:
video_url: 带水印的视频地址
video_name: 视频名
watermark_flag: 是否下载带水印的视频
returns:
无
"""
size = 0
video_url = self .get_download_url(video_url, watermark_flag = watermark_flag)
with closing(requests.get(video_url, headers = self .headers, stream = true)) as response:
chunk_size = 1024
content_size = int (response.headers[ 'content-length' ])
if response.status_code = = 200 :
sys.stdout.write( ' [文件大小]:%0.2f mb\n' % (content_size / chunk_size / 1024 ))
with open (video_name, 'wb' ) as file :
for data in response.iter_content(chunk_size = chunk_size):
file .write(data)
size + = len (data)
file .flush()
sys.stdout.write( ' [下载进度]:%.2f%%' % float (size / content_size * 100 ) + '\r' )
sys.stdout.flush()
def run( self ):
"""
运行函数
parameters:
none
returns:
none
"""
self .hello()
print ( '搜索api需要登录,暂时使用uid下载\n分享用户页面,用浏览器打开短链接,原始链接中/share/user/后的数字即是uid' )
user_id = input ( '请输入id (例如95006183):' )
user_id = user_id if user_id else '95006183'
watermark_flag = input ( '是否下载带水印的视频 (0-否(默认), 1-是):' )
watermark_flag = watermark_flag if watermark_flag! = ' ' else ' 0 '
watermark_flag = bool ( int (watermark_flag))
type_flag = input ( 'f-收藏的(默认), p-上传的:' )
type_flag = type_flag if type_flag! = ' ' else ' f'
save_dir = input ( '保存路径 (例如"e:/download/", 默认"./download/"):' )
save_dir = save_dir if save_dir else "./download/"
video_names, video_urls, share_urls, nickname = self .get_video_urls(user_id, type_flag)
nickname_dir = os.path.join(save_dir, nickname)
if not os.path.exists(save_dir):
os.makedirs(save_dir)
if nickname not in os.listdir(save_dir):
os.mkdir(nickname_dir)
if type_flag = = 'f' :
if 'favorite' not in os.listdir(nickname_dir):
os.mkdir(os.path.join(nickname_dir, 'favorite' ))
print ( '视频下载中:共有%d个作品!\n' % len (video_urls))
for num in range ( len (video_urls)):
print ( ' 解析第%d个视频链接 [%s] 中,请稍后!\n' % (num + 1 , share_urls[num]))
if '\\' in video_names[num]:
video_name = video_names[num].replace( '\\', ' ')
elif '/' in video_names[num]:
video_name = video_names[num].replace( '/' , '')
else :
video_name = video_names[num]
video_path = os.path.join(nickname_dir, video_name) if type_flag! = 'f' else os.path.join(nickname_dir, 'favorite' , video_name)
if os.path.isfile(video_path):
print ( '视频已存在' )
else :
self .video_downloader(video_urls[num], video_path, watermark_flag)
print ( '\n' )
print ( '下载完成!' )
def hello( self ):
"""
打印欢迎界面
parameters:
none
returns:
none
"""
print ( '*' * 100 )
print ( '\t\t\t\t抖音app视频下载小助手' )
print ( '\t\t作者:jack cui、steven7851' )
print ( '*' * 100 )
if __name__ = = '__main__' :
douyin = douyin()
douyin.run()
|
总结
以上所述是小编给大家介绍的python3下载抖音视频的完整代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!原文链接:https://blog.csdn.net/qq_44621510/article/details/90693710