本文实例讲述了Python 实现的微信爬虫。分享给大家供大家参考,具体如下:
单线程版:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
import urllib.request
import urllib.parse
import urllib.error
import re,time
headers = ( "User-Agent" ,
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36" )
operner = urllib.request.build_opener()
operner.addheaders = [headers]
urllib.request.install_opener(operner)
list_url = []
###使用代理获取网页url内容
def use_proxy(url):
try :
# proxy = urllib.request.ProxyHandler({'http':proxy_addr}) ##使用代理版
# operner = urllib.request.build_opener()
# urllib.request.install_opener(operner)
headers = ( "User-Agent" ,
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36" )
operner = urllib.request.build_opener()
operner.addheaders = [headers]
urllib.request.install_opener(operner)
data = urllib.request.urlopen(url).read().decode( 'utf-8' )
# print (data)
return data
except urllib.error.URLError as e:
if hasattr (e, "code" ):
print (e.code)
elif hasattr (e, "reason" ):
print (e.reason)
except Exception as e:
print ( "exception" + str (e))
time.sleep( 1 )
##获取要爬取的url
def get_url(key, pagestart, pageend):
try :
keycode = urllib.parse.quote(key)
for page in range (pagestart, pageend + 1 ):
url = "http://weixin.sogou.com/weixin?query=%s&_sug_type_=&s_from=input&_sug_=n&type=%d&page=1&ie=utf8" % (
keycode, page)
data1 = use_proxy(url)
#print("data1的内容是", data1)
listurl_pattern = '<h3>.*?("http://.*?)</h3>'
result = re. compile (listurl_pattern, re.S).findall(data1)
for i in range ( len (result)):
res = result[i].replace( "amp;" , " ").split(" ")[0].replace(" \" ", " ")
list_url.append(res)
#print(list_url)
return list_url
except urllib.error.URLError as e:
if hasattr (e, "code" ):
print (e.code)
elif hasattr (e, "reason" ):
print (e.reason)
except Exception as e:
print ( "exception:" , e)
##通过获取的url爬行内容数据并处理
def get_url_content(list_url):
fh1 = open ( "D:\\python-script\\1.html" , 'wb' )
html1 = '''<!DOCTYPE html>\n<html xmlns="http://www.w3.org/1999/xhmtl">\n<head>\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8">\n<title>微信文章</title></head>\n<body>'''
fh1.write(html1.encode( "utf-8" ))
fh1.close()
fh = open ( "D:\\python-script\\1.html" , 'ab' )
for url in list_url:
data_content = use_proxy(url)
#print (data_content)
#sys.exit()
title_pattern = '<h2.*>.*?</h2>'
result_title = re. compile (title_pattern, re.S).findall(data_content)
##标题(str)
res_title = result_title[ 0 ].replace( "<h2 class=\"rich_media_title\" id=\"activity-name\">" , " ").replace(" < / h2>",
"").strip()
content_pattern = 'id="js_content">(.*?)<div class="rich_media_tool" id="js_sg_bar">'
content = re. compile (content_pattern, re.S).findall(data_content)
try :
fh.write(res_title.encode( "utf-8" ))
for i in content:
fh.write(i.strip().encode( "utf-8" ))
except UnicodeEncodeError as e:
continue
fh.write( "</body></html>" .encode( "utf-8" ))
if __name__ = = '__main__' :
pagestart = 1
pageend = 2
key = "人工智能"
get_url(key, pagestart, pageend)
get_url_content(list_url)
|
多线程版:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import urllib.request
import urllib.parse
import urllib.error
import re,time
import queue
import threading
headers = ( "User-Agent" , "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36" )
operner = urllib.request.build_opener()
operner.addheaders = [headers]
urllib.request.install_opener(operner)
urlque = queue.Queue()
list_url = []
###使用代理获取网页url内容
def use_proxy(url):
try :
# proxy = urllib.request.ProxyHandler({'http':proxy_addr})
# operner = urllib.request.build_opener()
# urllib.request.install_opener(operner)
headers = ( "User-Agent" ,
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36" )
operner = urllib.request.build_opener()
operner.addheaders = [headers]
urllib.request.install_opener(operner)
data = urllib.request.urlopen(url).read().decode( 'utf-8' )
#print (data)
return data
except urllib.error.URLError as e:
if hasattr (e, "code" ):
print (e.code)
elif hasattr (e, "reason" ):
print (e.reason)
except Exception as e:
print ( "exception" + str (e))
time.sleep( 1 )
###获取文章的url连接,并将连接加入到队列
class get_url(threading.Thread):
def __init__( self ,key,pagestart,pageend,urlque):
threading.Thread.__init__( self )
self .pagestart = pagestart
self .pageend = pageend
self .key = key
self .urlque = urlque
def run( self ):
try :
keycode = urllib.parse.quote( self .key)
for page in range ( self .pagestart, self .pageend + 1 ):
url = "http://weixin.sogou.com/weixin?query=%s&_sug_type_=&s_from=input&_sug_=n&type=%d&page=1&ie=utf8" % (keycode,page)
data = use_proxy(url)
print ( "data1的内容是" ,data)
listurl_pattern = '<h3>.*?("http://.*?)</h3>'
result = re. compile (listurl_pattern,re.S).findall(data)
print (result)
if len (result) = = 0 :
print ( "没有可用的url" )
sys.exit()
for i in range ( len (result)):
res = result[i].replace( "amp;" ," ").split(" ")[0].replace(" \" " ," ")
#list_url.append(res) #加入列表
self .urlque.put(res) ##加入队列
self .urlque.task_done()
#return list_url
except urllib.error.URLError as e:
if hasattr (e, "code" ):
print (e.code)
elif hasattr (e, "reason" ):
print (e.reason)
except Exception as e:
print ( "exception:" ,e)
##根据url获取文章内容
class get_url_content(threading.Thread):
def __init__( self ,urlque):
threading.Thread.__init__( self )
self .urlque = urlque
def run( self ):
fh1 = open ( "D:\\python-script\\1.html" , 'wb' )
html1 = '''<!DOCTYPE html>\n<html xmlns="http://www.w3.org/1999/xhmtl">\n<head>\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8">\n<title>微信文章</title></head>\n<body>'''
fh1.write(html1.encode( "utf-8" ))
fh1.close()
fh = open ( "D:\\python-script\\1.html" , 'ab' )
while True :
try :
url = self .urlque.get()
data_content = use_proxy(url)
title_pattern = '<h2.*>.*?</h2>'
result_title = re. compile (title_pattern, re.S).findall(data_content)
##标题
res_title = result_title[ 0 ].replace( "<h2 class=\"rich_media_title\" id=\"activity-name\">" , " ").replace(" < / h2> "," ").strip()
content_pattern = 'id="js_content">(.*?)<div class="rich_media_tool" id="js_sg_bar">'
content = re. compile (content_pattern, re.S).findall(data_content)
#c = '<p style="max-width: 100%;box-sizing: border-box;min-height: 1em;text-indent: 2em;word-wrap: break-word !important;">'
# for i in content:
# ##内容
# c_content=i.replace(c, "").replace("<br /></p>", "").replace("</p>", "")
fh.write(res_title.encode( "utf-8" ))
for i in content:
fh.write(i.strip().encode( "utf-8" ))
except UnicodeEncodeError as e:
continue
fh.close()
class contrl(threading.Thread):
def __init__( self ,urlqueue):
threading.Thread.__init__( self )
self .urlqueue = urlqueue
while True :
print ( "程序正在执行" )
if self .urlqueue.empty():
time.sleep( 3 )
print ( "程序执行完毕" )
exit()
if __name__ = = '__main__' :
pagestart = 1
pageend = 2
key = "人工智能"
get_url = get_url(key,pagestart,pageend,urlque)
get_url.start()
get_content = get_url_content(urlque)
get_content.start()
cntrol = contrl(urlque)
cntrol.start()
|
希望本文所述对大家Python程序设计有所帮助。
原文链接:https://www.cnblogs.com/FRESHMANS/p/8125594.html