import requests
from fake_useragent import UserAgent
from requests.exceptions import Timeout
from urllib.parse import quote, unquote
import re, json, os, hashlib
from lxml import etree
import time
from multiprocessing import Process, Queue, Pool # 之前想使用多进程,通过队列处理图片下载。没有实现
userAgent = UserAgent()
headers = {
"User-Agent": userAgent.random,
"Host": "tuchong.com",
"Referer": "https://tuchong.com/explore/"
}
baseUrl = "https://tuchong.com/rest/tag-categories/"
baseTagUrl = "https://tuchong.com/rest/tags/"
tagReferer = "https://tuchong.com/tags/"
timeout = 5
s = requests.Session()
dic = {
"subject": [],
"style": [],
"equipment": [],
"location": [],
}
categoriesDict = {
"subject": "题材",
"style": "风格",
"equipment": "器材",
"location": "地区",
}
def getCategoryPage(url, category, page=1):
try:
url = url + category
params = {
"page": page,
"count": 20
}
response = s.get(url=url, headers=headers, timeout=timeout, params=params)
if response.status_code == 200:
response.category = category
return response
except Timeout as e:
print(e)
return None
def getTagNameUrl(response):
if not response:
return None
data_dict = response.json()
tag_list = data_dict.get("data").get("tag_list")
tag_name_list = [tag.get("tag_name") for tag in tag_list]
return tag_name_list
def getNextPageUrl(response):
if not response:
return []
data_dict = response.json()
pages = int(data_dict.get("data").get("pages"))
for page in range(2, pages + 1):
yield page
def getAllTag():
global dic
s.get(url="https://tuchong.com/explore/", headers=headers, timeout=timeout)
for category in categoriesDict.keys():
print("获取 -{}- 第 <{}> 页tagName信息.........".format(categoriesDict.get(category), 1))
response = getCategoryPage(url=baseUrl, category=category)
tag_name_list = getTagNameUrl(response) or []
dic.get(category).extend(tag_name_list)
time.sleep(1)
for page in getNextPageUrl(response):
print("获取 -{}- 第 <{}> 页tagName信息.........".format(categoriesDict.get(category), page))
response = getCategoryPage(url=baseUrl, category=category, page=page)
tag_name_list = getTagNameUrl(response) or []
dic.get(category).extend(tag_name_list)
time.sleep(1)
def getTagPage(url, tag, page):
tag = quote(tag)
url = url + tag + "/posts"
params = {
"page": page,
"count": 20,
"order": "weekly"
}
headers["Referer"] = tagReferer + tag + "/"
try:
response = requests.get(url=url, params=params, headers=headers, timeout=timeout)
if response.status_code == 200:
return response
except Timeout as e:
print(e)
return None
def getImagesInfo(response):
print('---')
if not response:
return None
result = response.json().get("result")
if result == "INVALID":
print("数据取完了")
return None
postList = response.json().get("postList")
imageUrlList = [dic.get("url") for dic in postList]
titleList = [dic.get("title").strip() for dic in postList]
for img_url_title in zip(titleList, imageUrlList):
img_url_title = list(img_url_title)
yield img_url_title
def get_md5(img_url):
m = hashlib.md5()
m.update(bytes(img_url, encoding="utf-8"))
return m.hexdigest()
def download(imgsUrl):
if imgsUrl:
for img_url in imgsUrl:
response = requests.get(url=img_url)
name = get_md5(img_url)
print("正在下载{}...".format(img_url))
with open(os.path.join(BASE_PATH, name) + ".jpg", "wb") as f:
f.write(response.content)
def gogo(tagname):
page = 1
while True:
response = getTagPage(url=baseTagUrl, tag=tagname, page=page)
print("开始爬取 {} 第 {} 页...".format(tagname, page))
info = getImagesInfo(response) or []
if not response:
return
for info_tuple in info:
imgsUrl = putImageUrl(info_tuple)
download(imgsUrl)
page += 1
time.sleep(5)
def putImageUrl(img_url_title_list):
if img_url_title_list:
img_url = img_url_title_list[1]
try:
response = s.get(url=img_url, headers=headers, timeout=timeout)
html = etree.HTML(response.text)
imgsUrl = html.xpath("//article[@class='post-content']/img/@src")
return imgsUrl
except requests.exceptions.ConnectionError as e:
print(e)
return None
def downloadImage():
for key in dic:
tagname_list = dic.get(key)
for tagname in tagname_list:
gogo(tagname)
def run():
getAllTag()
print("所有tag信息获取完毕.........")
print("开始获取每个tag的内容.........")
downloadImage()
if __name__ == '__main__':
BASE_PATH = r"D:\tuchong"
run()