Crawling proxy IPs with Python and multithreading

Date: 2024-05-20 08:00:19

Proxy IPs are needed in all kinds of situations, and a simple way to get them is to write a crawler and scrape them off the web. This post uses Xici Proxy (西刺代理), http://www.xicidaili.com/, as the example.

0. A quick look in the browser at how the page loads:
Using Chrome as an example, press F12 to open the developer tools and click Network to start recording requests. Then type http://www.xicidaili.com/nn into the address bar and press Enter; you should see something like the figure below:

[Figure: the Chrome developer tools Network panel recorded while loading the page]
In the Name column you can see the sequence of requests made while the page loads (opening a page involves more than the page itself: it also pulls in the JS, CSS, image files and so on that the page references). Click on nn and the pane on the right shows the overall information for that request (General), the response headers (Response Headers) and the request headers (Request Headers). The browser renders these responses into the page we see. In the simpler cases the page content does not live in extra JS requests; in other words, everything we need is right there in the response to nn. All we have to do is request that content the way a browser does, and then write a program that 'plucks' out the parts we need and stores them.
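For orientation, those same pieces (status, response headers, request headers, body) can all be inspected from Python with the requests library introduced in the next section; this is just a minimal sketch:

import requests

r = requests.get('http://www.xicidaili.com/nn')
print r.status_code        # the status shown under General
print r.headers            # the Response Headers
print r.request.headers    # the Request Headers that were actually sent
print r.content[:200]      # the start of the content we will 'pluck' values from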

1. A quick introduction to the requests library:
For writing crawlers Python has a very powerful library, requests (Chinese tutorial: http://docs.python-requests.org/zh_CN/latest/index.html). It can be installed with pip.

Let's write a simple example:

import requests
r = requests.get('http://119.29.27.158/ip123456789')
print r.content
That link is a page for checking proxies; for now we don't need to worry about how it is used. The program's output is: 1.2.3.4;None;None (assuming your IP is 1.2.3.4). If we open the same URL in a browser, the page content is exactly the same as the program's output. (For convenience this page has no markup structure at all, which is why the program's output matches what the browser renders.)

 

Downloading a page with requests really is that simple. However, many sites are not exactly welcoming to crawlers, and then we need to disguise ourselves as a browser. The simplest disguise is to customise the request headers. The headers are a dict that is passed to requests' get method as the headers argument. Three fields usually need setting: user-agent, host and referer. Here is an example:

d = {}
d['user-agent'] = 'Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1'
d['Host'] = 'www.xicidaili.com'
d['Referer'] = 'http://www.xicidaili.com/nn/'
r = requests.get('http://www.xicidaili.com/nn/1', headers=d)
print r.status_code  # 200


One thing to watch out for: Host has to match the site you are requesting, otherwise the server responds with a 5XX server error. If you are not sure what to fill in, refer to the Request Headers shown in the figure above. Checking r.status_code tells you whether the request went through normally or you have been blacklisted by the server. A status code of 503 means you have been identified as a crawler and it is time to take a break.
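If you want to automate that "take a break" logic, a minimal sketch might look like this (fetch and the 1800-second pause are illustrative choices, not from the original post; d is the header dict from above):

import time
import requests

def fetch(url, headers, wait=1800):
    # Retry a page, backing off whenever the server answers 503
    while True:
        r = requests.get(url, headers=headers)
        if r.status_code == 200:
            return r                 # normal response
        if r.status_code == 503:
            time.sleep(wait)         # identified as a crawler: rest, then try again
            continue
        return None                  # other errors (e.g. 5XX from a wrong Host)

r = fetch('http://www.xicidaili.com/nn/1', d)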

2. Multithreading and the Queue module

Crawling is IO-bound, so it lends itself well to multithreading; threads share data through Queue queues. I'll write that up properly in a separate post, but a minimal sketch follows.
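As a rough illustration of the pattern the full script uses later (this sketch is mine, not from the original post): one producer fills a bounded Queue with page URLs while a few worker threads drain it, and Queue handles the locking:

from Queue import Queue
import threading
import requests

urlQ = Queue(32)              # bounded queue shared by all threads

def producer(urls):
    for u in urls:
        urlQ.put(u, 1)        # blocks when the queue is full

def consumer():
    while True:
        u = urlQ.get(1)       # blocks until an item is available
        try:
            r = requests.get(u, timeout=5)
            print u, r.status_code
        except requests.RequestException:
            pass
        urlQ.task_done()

for _ in range(3):
    t = threading.Thread(target=consumer)
    t.setDaemon(True)         # let the program exit even though consumers loop forever
    t.start()

producer(['http://www.xicidaili.com/nn/%d' % i for i in range(1, 4)])
urlQ.join()                   # wait until every queued URL has been processed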

3. Complete source code

# -*- coding:utf-8 -*-

from bs4 import BeautifulSoup as bs
import time
import requests
import bs4
import pymysql
import random
import json
import os
from Queue import Queue
import threading
# Initialise the user-agent pool file; a closure keeps handing out UAs
def getUa():
    ua = []
    if not os.path.isfile('ua.txt'):
        with open('ua.txt', 'w') as f:
            while True:
                line = raw_input('init ua,press Enter to finish:')
                if line == '':
                    break
                f.write(line + '\n')
    with open('ua.txt', 'r') as f:
        for i in f:
            ua.append(i[:-1])
    lens = len(ua)
    def getUa1(ua=ua, lens=lens):
        index = random.randrange(lens)
        return ua[index]
    return getUa1
# Initialise the database config file; returns a dict of database settings
def getIni():
    if os.path.isfile('shujuku.ini'):
        f = open('shujuku.ini', 'r')
        d = json.loads(f.readline())
        f.close()
    else:
        f = open('shujuku.ini', 'w')
        d = {}
        while True:
            d['host'] = raw_input('host:')
            d['user'] = raw_input('user name:')
            d['passwd'] = raw_input('password:')
            d['type'] = raw_input('mysql?:')
            d['db'] = raw_input('database:')
            d['table'] = raw_input('table:')
            conform = raw_input('press ENTER to confirm:')
            if conform == '':
                break
        f.write(json.dumps(d))
        f.close()
        os.system('chmod 660 shujuku.ini')
    return d
# Initialise the database connection; returns the connection, cursor and table name
def getTable(d):
    conn = pymysql.connect(host=d[u'host'], user=d[u'user'], passwd=d[u'passwd'], db=d[u'type'], charset='utf8')
    cur = conn.cursor()
    cur.execute('USE ' + d[u'db'])
    table = d[u'table']
    return conn, cur, table
# Release the cursor and connection
def closeTable(conn, cur):
    cur.close()
    conn.close()
# Read from the dbQ queue and write to the database, logging along the way
def dbWrite(cur, table, dbQ, logQ):
    while True:
        logQ.put('new db write %s' % time.ctime(), 1)
        d, key = dbQ.get(1)
        try:
            num = cur.execute('SELECT %s FROM %s WHERE %s = "%s"' % (key, table, key, d[key]))
        except:
            continue
        if num != 0:
            continue  # already exists
        keys = [i for i in d.keys()]
        values = [d[i].encode('utf-8') for i in keys]
        keys = unicode(keys)[1:-1].replace("'", '').encode('utf-8')
        values = str(values)[1:-1].replace("'", '"')
        s = 'INSERT INTO %s (%s) VALUES (%s);' % (table, keys, values)
        try:
            cur.execute(s)
            cur.connection.commit()
        except:
            logQ.put("error:insert:%s %s" % (s, time.ctime()), 1)
# A support value of 0 marks an IP that has not been verified yet
def dbRead(cur, table, num):
    num = cur.execute('SELECT ip FROM %s WHERE support = 0 LIMIT %d' % (table, num))
    return cur.fetchall()
# Imitate scrapy: generate the next URL to crawl from the todo list
def getUrl(todo):
    todo = todo
    def iters(todo=todo):
        if todo != []:
            if todo[0][1] == 0:
                todo.pop(0)
            if todo == []:
                return None  # every page range is exhausted
            url = todo[0][0] + str(todo[0][1])
            todo[0][1] -= 1
            return unicode(url)
    return iters
# Thread that produces URLs
def writeUrlQ(urlQ, todo, logQ):
    urlF = getUrl(todo)
    while True:
        logQ.put('new url %s' % time.ctime(), 1)
        urls = urlF()
        if urls == None:
            break
        urlQ.put(urls, 1)
# Thread that produces user-agents
def writeUaQ(uaQ, logQ):
    uas = getUa()
    while True:
        logQ.put('new ua %s' % time.ctime(), 1)
        uaQ.put(uas(), 1)
# Logging thread
def writeLogQ(logQ):
    with open('daili.log', 'w') as f:
        while True:
            logs = logQ.get(1)
            logs = logs + '\n'
            f.write(unicode(logs).encode('utf-8'))
            f.flush()
# At the end of the crawl, queue the pages that failed so they get fetched once more
def solveWrong(urlQ, wrong):
    while wrong != []:
        urlQ.put(wrong.pop(), 1)
# Crawl
def parse(urlQ, uaQ, logQ, cur, table, wrong, dbQ):
    d1 = {}
    d1['host'] = 'www.xicidaili.com'
    d1['user-agent'] = uaQ.get(1)
    d1['Connection'] = 'Keep-alive'
    d1['Cache-Control'] = 'max-age=0'
    d1['Upgrade-Insecure-Requests'] = '1'
    d1['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    d1['Accept-Encoding'] = 'gzip,deflate,sdch'
    d1['Accept-Language'] = 'zh-CN,zh;q=0.8'

    r = requests.Session()
    sleepT = 3600  # how long to sleep once we notice we have been banned
    while True:
        logQ.put('new parse %s' % time.ctime(), 1)
        urls = urlQ.get(1)
        ref = urls.split('/')
        if int(ref[-1]) > 1:
            ref[-1] = unicode(int(ref[-1]) - 1)
        ref = '/'.join(ref)
        d1['referer'] = ref
        try:
            res = r.get(urls, headers=d1, timeout=5)
        except:
            logQ.put("Error:timeout:%s" % urls)
            d1['user-agent'] = uaQ.get(1)
            continue
        # If the page is too short or the status is not 200, something is probably wrong
        if len(res.content) < 1000 or res.status_code != 200:
            logQ.put('Wrong: url is: %s,status is %s,ua is %s,time:%s ' % (urls, str(res.status_code), d1['user-agent'], time.ctime()), 1)
            wrong.append(urls)
            r = requests.Session()
            d1['user-agent'] = uaQ.get(1)
            if res.status_code == 503:
                sleepT += 1800
                time.sleep(sleepT)  # Looks like the IP itself is banned; a new UA won't help, so rest a while
            continue
        # Parse with bs4
        text = ''.join(res.content.split('\n'))
        b = bs(text, 'lxml')
        for i in b.table.children:
            if type(i) is bs4.element.Tag:
                l = i.findAll('td')
                if len(l) < 5:
                    continue
                ip = l[1].get_text() + ':' + l[2].get_text()
                location = ''.join(l[3].get_text().split(' '))
                d = {'ip': ip, 'location': location, 'support': '0'}
                dbQ.put((d, 'ip'))
        time.sleep(3)
# Verify the IPs
def check(cur, table, logQ):
    while True:
        ret = dbRead(cur, table, 20)
        for i in ret:
            ip = i[0]
            proxies = {'http': ip}
            try:
                r = requests.get('http://119.29.27.158/ip123456789', proxies=proxies, timeout=5)
                if (r.content.split(':')[0] == ip.split(':')[0]) and (r.content.split(':')[1] == 'None') and (r.content.split(':')[2] == 'None'):
                    cur.execute('UPDATE ip SET support = "1" WHERE ip = "%s"' % ip)
                    logQ.put("get %s %s" % (ip, time.ctime()))
                else:
                    cur.execute('UPDATE ip SET support = "2" WHERE ip = "%s"' % ip)
                    logQ.put("miss1 %s %s" % (ip, time.ctime()))
            except:
                print 'timeout'
                cur.execute('UPDATE ip SET support = "2" WHERE ip = "%s"' % ip)
                logQ.put("miss2 %s %s" % (ip, time.ctime()))
            finally:
                print cur.fetchone()
                cur.connection.commit()
        if len(ret) < 20:
            print 'check done'
            break
# The list of pages to crawl
todo = [['http://www.xicidaili.com/nn/', 145]]

urlQ = Queue(32)
logQ = Queue(32)
uaQ = Queue(4)
dbQ = Queue(32)
checkQ = Queue(32)
threads = []
wrong = []

d = getIni()
conn,cur,table = getTable(d)

threads.append(threading.Thread(target=writeUrlQ, args=(urlQ, todo, logQ)))
threads.append(threading.Thread(target=writeUaQ, args=(uaQ, logQ)))
threads.append(threading.Thread(target=writeLogQ, args=(logQ,)))
threads.append(threading.Thread(target=dbWrite, args=(cur, table, dbQ, logQ)))
for i in range(3):
    threads.append(threading.Thread(target=parse, args=(urlQ, uaQ, logQ, cur, table, wrong, dbQ)))

for i in threads:
    i.start()

threads[0].join()

threads.append(threading.Thread(target=solveWrong, args=(urlQ, wrong)))
threads[-1].start()

threads.append(threading.Thread(target=check, args=(cur, table, logQ)))
threads[-1].start()
threads[-1].join()

closeTable(conn,cur)

Finally, in the database (table here stands for the table name from your config):

SELECT count(ip) FROM table WHERE support = 1;
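And, as a closing illustration (not part of the original script; the connection parameters are placeholders, while the ip table and the check URL come from the code above), the validated proxies can be read back and plugged straight into requests:

import pymysql
import requests

conn = pymysql.connect(host='localhost', user='root', passwd='xxx', db='daili', charset='utf8')  # placeholder credentials
cur = conn.cursor()
cur.execute('SELECT ip FROM ip WHERE support = "1"')
for (ip,) in cur.fetchall():
    proxies = {'http': 'http://' + ip}     # ip is stored as address:port
    try:
        r = requests.get('http://119.29.27.158/ip123456789', proxies=proxies, timeout=5)
        print ip, 'ok', r.status_code
    except requests.RequestException:
        print ip, 'failed'
cur.close()
conn.close()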