Python proxy crawler: a BeautifulSoup + requests + MySQL scraping example

Date: 2023-03-09 15:22:51

Approach:

Because the target site has anti-scraping measures, the crawler rotates through proxies when fetching pages; the responses are parsed with BeautifulSoup and the results are written to MySQL.

1. Collect proxy IPs from the Xici free proxy site (www.xicidaili.com) and test each one for usability.

2. Send requests with the requests module through one of the working proxies.

3. Write the parsed content to the database.

Note: the logging module (log_config) is described in the previous post.
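
Since log_config is not reproduced here, a minimal stand-in that matches the getlogger(name, log_file) calls used below might look like the following (a sketch under that assumption, not the module from that post):

# -*- coding: utf-8 -*-
# log_config.py -- minimal sketch; assumes the original simply wraps the
# standard logging module with one file handler per named logger
import logging

def getlogger(name, log_file):
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid attaching duplicate handlers on repeated calls
        logger.setLevel(logging.INFO)
        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
        logger.addHandler(handler)
    return logger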

The full code is below.

1. Collecting usable proxies

# -*- coding: utf-8 -*-
import random
import time
import requests
from bs4 import BeautifulSoup
import log_config
logger = log_config.getlogger('ip_pool', 'ip_pool.log')


class IPProxyPool:
    # Initialization: define an empty list, ip_list, to hold usable proxy IPs
    def __init__(self):
        # pages of the Xici free-proxy site to scrape for candidate IPs
        self.proxy_url_list = ['http://www.xicidaili.com', 'http://www.xicidaili.com/nn',
                               'http://www.xicidaili.com/nn/2']
        self.ip_list = []
        self.headers = {
            'Host': 'www.xicidaili.com',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
        }
        self.user_agent_list = [
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
            "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.9 Safari/536.5",
            "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.36 Safari/536.5",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24"
        ]

    def get_xici_all_ip(self):
        ip_lists = []
        for proxy_url in self.proxy_url_list:
            html = requests.get(proxy_url, headers=self.headers)
            content = html.text
            soup = BeautifulSoup(content, "lxml")
            ip_body = soup.find("table", attrs={"id": "ip_list"})
            ip_page_lists = ip_body.find_all("tr", attrs={"class": "odd"})
            ip_lists = ip_lists + ip_page_lists
        return ip_lists

    def get_ip_list(self):
        ip_lists = self.get_xici_all_ip()
        ip_test_pool = []
        for ip in ip_lists:
            http_type = ip.find_all("td")[5].get_text()
            if http_type == 'HTTP':
                ip_test_account = ip.find_all("td")[1].get_text()
                ip_test_port = ip.find_all("td")[2].get_text()
                ip_port_dict = {ip_test_account: ip_test_port}
                ip_test_pool.append(ip_port_dict)
        for ipn in ip_test_pool:
            ip_addr = "http://"
            for ip, port in ipn.items():
                ip_addr = ip_addr + ip + ':' + port
            # verify that the proxy IP actually works
            status = self.check_ip(ip_addr)
            if status:
                # keep the working proxy in ip_list
                self.ip_list.append(ip_addr.strip())

    def check_ip(self, ip):
        return self.microbell_proxy_ip(ip)

    def microbell_proxy_ip(self, ip):
        try:
            test_url = 'http://www.microbell.com/elitelist.html'
            proxy = {'http': ip}
            user_agent = self.random_agent()
            headers_agent = {'User-Agent': user_agent}
            response_body = requests.get(test_url, headers=headers_agent, proxies=proxy, timeout=5)
            if response_body.status_code == 200:
                # Even with a 200, the page may have been served by the proxy
                # itself rather than the target site, so verify the content too.
                # response_body.encoding = 'gbk'
                content = response_body.text
                soup = BeautifulSoup(content, "lxml")
                body = soup.find("div", attrs={"class": "index_docmain"})
                if body is None:
                    return False
                if body.get_text() != "":
                    logger.info("ok proxy ip %s" % ip)
                    return True
                else:
                    return False
            else:
                return False
        except Exception as e:
            logger.exception(e)
            time.sleep(1)
            return False

    def random_agent(self):
        user_agent = random.choice(self.user_agent_list)
        return user_agent


if __name__ == "__main__":
    ip_proxy_pool = IPProxyPool()
    ip_proxy_pool.get_ip_list()
    print ip_proxy_pool.ip_list

    # proxies = {
    #     "http": "http://118.190.95.35:9001"  # proxy ip
    # }
    #
    # headers = {
    #     'Host': 'www.4399.com',
    #     'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
    # }
    #
    # http_url = "http://www.4399.com/"
    # try:
    #     res = requests.get(url=http_url, headers=headers, proxies=proxies, timeout=3)
    #     if res.status_code == 200:
    #         print u"page fetched successfully"
    #     else:
    #         print "failed"
    # except Exception as e:
    #     print e

2. Parsing the pages, extracting the content, and writing it to the database
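
The script below reads its MySQL connection settings from a mysql.conf file via ConfigParser. A minimal example matching the conf.get("mysql", ...) calls in the code; all values here are placeholders:

[mysql]
user = root
password = your_password
database = spider
host = 127.0.0.1
port = 3306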

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ConfigParser
import datetime
import sys
import pymysql
import requests
from bs4 import BeautifulSoup
import log_config
import time
import random
from agent_and_proxy_ip_pool import IPProxyPool

logger = log_config.getlogger('report', 'report.log')

# Which data to fetch: 0 = only today's data, 1 = all data
# (2 is used internally as a return flag meaning "the page has no data")
get_today_data = 1
if len(sys.argv) != 1:
    if sys.argv[1] in ('0', '1'):  # command-line arguments arrive as strings
        get_today_data = int(sys.argv[1])
    else:
        print 'input error, please input 0->today, 1->all data'
        exit()


class research_report:
    def __init__(self):
        conf = ConfigParser.ConfigParser()
        conf.read("mysql.conf")
        self.ip_proxy_pool = IPProxyPool()
        # self.ip_proxy_pool.get_ip_list()
        # self.ip_pool = self.ip_proxy_pool.ip_list
        # logger.info('You can currently use IP %s' % self.ip_pool)
        # pool of usable cookies
        self.cookies_pool = [
            'c=; ASPSESSIONIDQCQRQQCR=LEOOBOJCBAMFFDHMFBHFJKEE; __guid=188006958.3779224451650617000.1539657585525.2588; ASPSESSIONIDSATRTTDQ=MCDEIPFDLLKBNHPBBEMGBGFC; safedog-flow-item=C07B93F771; UM_distinctid=16680b1e9e411f-0674a4c85ccc2-454c092b-1fa400-16680b1e9e539d; CNZZDATA1752123=cnzz_eid%3D2075545357-1539752826-%26ntime%3D1539752826; Hm_lvt_d554f0f6d738d9e505c72769d450253d=1539757436; robih=vXuWjYMDvV6XmNxOuNmP; MBpermission=0; MBname=sunyue1993; did=67A671BFE; monitor_count=6; Hm_lpvt_d554f0f6d738d9e505c72769d450253d=1539757719'
        ]
        self.get_today = get_today_data
        self.user = conf.get("mysql", "user")
        self.mysql_password = conf.get("mysql", "password")
        self.database_name = conf.get("mysql", "database")
        self.host = conf.get("mysql", "host")
        self.port = conf.get("mysql", "port")
        self.site_url = 'http://www.microbell.com/'
        self.page_url = 'http://www.microbell.com/elitelist_1_0.html'
        self.headers = {
            'Host': 'www.microbell.com',
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Accept-Encoding': 'gzip, deflate, sdch',
            'Accept-Language': 'zh-CN,zh;q=0.8',
            'Connection': 'keep-alive'
        }

    # pick a random User-Agent and cookie for the request headers
    def get_random_headers(self):
        # self.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
        self.headers['User-Agent'] = self.ip_proxy_pool.random_agent()
        self.headers['Cookie'] = random.choice(self.cookies_pool)

    # build a proxies dict from a random IP in the pool
    def get_random_proxy(self):
        proxy_ip = random.choice(self.ip_pool)
        proxies = {'http': proxy_ip}
        return proxies

    # fetch one listing page
    def get_html_content(self, page_num_url):
        try:
            self.get_random_headers()
            req = requests.get(page_num_url, headers=self.headers, timeout=5)
            req.encoding = 'gbk'
            text = req.text
            soup = BeautifulSoup(text, "lxml")
            # soup = body.prettify  # pretty-print the markup
            report_list = soup.find_all("div", attrs={"class": "classbaogao_sousuo_list"})
            list_data = []
            logger.info("%s has %s items" % (page_num_url, len(report_list)))
            if len(report_list) == 0:
                return 2
            for report_item in report_list:
                url = self.site_url + report_item.table.tr.find_all("td")[1].a["href"]
                title = report_item.table.tr.find_all("td")[1].a["title"]
                item_data = {"url": url, "title": title}
                list_data.append(item_data)
            end_flag = self.get_list_page_data(list_data)
            return end_flag
        except Exception as e:
            logger.exception("get list %s page fail error info : %s" % (page_num_url, e))
            return 2

    # fetch the detail page for each of the (up to 38) items on one listing page
    def get_list_page_data(self, list_data):
        try:
            # one page's worth of rows (up to 38), inserted in a single batch
            page_datas = []
            now_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self.get_random_headers()
            # proxy_ip = self.get_random_proxy()
            for item_data in list_data:
                retry_num = 0
                while retry_num < 3:
                    try:
                        # sleep 3-5 seconds at random to avoid getting banned
                        t = random.uniform(3, 5)
                        time.sleep(t)
                        req = requests.get(item_data["url"], headers=self.headers, timeout=5)
                        req.encoding = 'gbk'
                        text = req.text
                        soup = BeautifulSoup(text, "lxml")
                        detail_div = soup.find("div", attrs={"class": "leftn2"})
                        tr_s = detail_div.table.find_all("tr")
                        public_time = tr_s[0].find_all("td")[2].span.get_text()
                        if self.get_today == 0:
                            # When crawling only today's data, stop as soon as an
                            # item published before today turns up.
                            logger.info("now spider today data")
                            today = datetime.date.today()
                            today_time = int(time.mktime(today.timetuple()))
                            time_array = time.strptime(public_time, "%Y-%m-%d %H:%M:%S")
                            pub_time = int(time.mktime(time_array))
                            if pub_time < today_time:
                                break
                        abstract_br_replace = soup.find("div", attrs={"class": "p_main"}).p.span
                        str1 = str(abstract_br_replace).replace("<br/>", r"\r\n")
                        abstract_object = BeautifulSoup(str1, "lxml")
                        [s.extract() for s in abstract_object("font")]
                        abstract = abstract_object.get_text()
                        sec_name = tr_s[0].find_all("td")[0].span.get_text()
                        sec_code = tr_s[0].find_all("td")[1].span.get_text()
                        report_type = tr_s[1].find_all("td")[0].span.get_text()
                        doc_type = tr_s[1].find_all("td")[1].span.get_text()
                        author = tr_s[1].find_all("td")[2].span.get_text()
                        provenance = tr_s[2].find_all("td")[0].span.get_text()
                        pages = tr_s[2].find_all("td")[1].span.get_text()
                        rec_rate = tr_s[2].find_all("td")[2].span.get_text()
                        doc_size = tr_s[3].find_all("td")[0].span.get_text()
                        promulgator = tr_s[3].find_all("td")[1].span.get_text()
                        doc_url_str = soup.find("div", attrs={"class": "anniu_main"}).a["onclick"]
                        doc_url_list = doc_url_str.split(",")
                        doc_url = self.site_url + doc_url_list[2]
                        title = item_data["title"]
                        create_time = now_date
                        update_time = now_date
                        page_data = [title, sec_name, sec_code, public_time, report_type, doc_type,
                                     author, provenance, pages, rec_rate, doc_size, doc_url,
                                     promulgator, abstract, create_time, update_time]
                        page_datas.append(page_data)
                        break
                    except Exception as e:
                        retry_num += 1
                        if retry_num == 3:
                            logger.warning("failed to fetch detail page %s" % item_data)
            if len(page_datas) > 0:
                self.set_data_mysql(page_datas)
                if self.get_today == 0:
                    if len(page_datas) < 38:
                        return 0
                return 1
            else:
                return 2
        except Exception as e:
            logger.exception("get detail page fail %s : %s" % (list_data, e))
            return 2

    # batch-insert one page of rows into MySQL
    def set_data_mysql(self, page_datas):
        # open the connection
        conn = pymysql.connect(host=self.host, port=int(self.port), user=self.user,
                               passwd=self.mysql_password, db=self.database_name)
        try:
            # create a cursor
            cursor = conn.cursor()
            sql = "INSERT INTO report(title,sec_name,sec_code,public_time,report_type,doc_type,author," \
                  "provenance,pages,rec_rate,doc_size,doc_url,promulgator,abstract,create_time,update_time) " \
                  "VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
            effect_row = cursor.executemany(sql, page_datas)
            # commit, or nothing reaches MySQL
            conn.commit()
            logger.info("inserted %s rows into the database" % effect_row)
        finally:
            conn.close()

    # log in and fetch cookies; currently unused
    # def login_in(self):
    #     data = {
    #         'namelogin': self.user_name,
    #         'pwdlogin': self.password
    #     }
    #     req = requests.post(self.login_url, headers=self.headers, data=data)
    #     req.encoding = req.apparent_encoding
    #     cookies = req.cookies.get_dict()
    #     print cookies

    # http://www.microbell.com/elitelist_1_0.html is the first listing page;
    # for later pages only the "1" in the URL changes
    def process(self):
        # walk the listing pages; there are no more than 360 pages in total
        if self.get_today == 0:
            for i in range(1, 20):
                base_url = "http://www.microbell.com/elitelist_%s_0.html" % i
                logger.info("fetching listing page url=%s" % base_url)
                end_flag = self.get_html_content(base_url)
                if end_flag == 0:
                    logger.info("The page %s is already the last page" % base_url)
                    break
        else:
            for i in reversed(range(1, 107)):
                base_url = "http://www.microbell.com/elitelist_%s_0.html" % i
                logger.info("fetching listing page url=%s" % base_url)
                self.get_html_content(base_url)


if __name__ == "__main__":
    research_class = research_report()
    research_class.process()
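
For reference, a possible schema for the report table, inferred from the column list in the INSERT statement above; the column types and lengths are assumptions and should be adjusted to the actual data:

-- sketch of the report table implied by set_data_mysql; types are guesses
CREATE TABLE report (
    id          INT AUTO_INCREMENT PRIMARY KEY,
    title       VARCHAR(255),
    sec_name    VARCHAR(64),
    sec_code    VARCHAR(32),
    public_time DATETIME,
    report_type VARCHAR(64),
    doc_type    VARCHAR(64),
    author      VARCHAR(64),
    provenance  VARCHAR(128),
    pages       VARCHAR(16),
    rec_rate    VARCHAR(32),
    doc_size    VARCHAR(32),
    doc_url     VARCHAR(512),
    promulgator VARCHAR(64),
    abstract    TEXT,
    create_time DATETIME,
    update_time DATETIME
) DEFAULT CHARSET = utf8mb4;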