本文实例讲述了php与python实现的线程池多线程爬虫功能。分享给大家供大家参考,具体如下:
多线程爬虫可以并发抓取内容,从而提升抓取性能。下面我们来看 PHP 与 Python 线程池多线程爬虫的例子,代码如下:
php例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
<?php
/**
 * Pool worker that owns one reusable cURL handle.
 *
 * The handle lives in a static property; per the original author's note,
 * static members are thread local under pthreads, so every worker thread
 * ends up with its own handle.
 */
class Connect extends Worker // worker pattern
{
    /**
     * The worker's cURL handle, or null while no handle is open.
     */
    protected static $ch;

    public function __construct()
    {
    }

    /**
     * Return the worker's cURL handle, creating and configuring it on first use.
     *
     * @return mixed the configured cURL handle
     */
    public function getConnection()
    {
        if (!self::$ch) {
            self::$ch = curl_init();
            curl_setopt(self::$ch, CURLOPT_TIMEOUT, 2);
            curl_setopt(self::$ch, CURLOPT_RETURNTRANSFER, 1);
            curl_setopt(self::$ch, CURLOPT_HEADER, 0);
            curl_setopt(self::$ch, CURLOPT_NOSIGNAL, true);
            curl_setopt(self::$ch, CURLOPT_USERAGENT, "Firefox");
            curl_setopt(self::$ch, CURLOPT_FOLLOWLOCATION, 1);
        }
        /* do some exception/error stuff here maybe */
        return self::$ch;
    }

    /**
     * Close the worker's cURL handle if one is open.
     *
     * The property is reset afterwards so that a later getConnection() call
     * creates a fresh handle instead of returning the closed one.
     */
    public function closeConnection()
    {
        if (self::$ch) {
            curl_close(self::$ch);
            self::$ch = null;
        }
    }
}
/**
 * One crawl task: fetch a single URL through the worker's cURL handle,
 * print a status line for it and keep the fetched page as the result.
 */
class Query extends Threaded
{
    /** URL this task fetches. */
    protected $url;

    /** Value returned by curl_exec() for the URL; set by run(). */
    protected $result;

    /**
     * @param string $url URL to fetch
     */
    public function __construct($url)
    {
        $this->url = $url;
    }

    /**
     * Fetch the URL with the handle supplied by the worker, report the
     * outcome and store the fetched page.
     */
    public function run()
    {
        $ch = $this->worker->getConnection();
        curl_setopt($ch, CURLOPT_URL, $this->url);
        $page = curl_exec($ch);
        $info = curl_getinfo($ch);
        $error = curl_error($ch);
        $this->deal_data($this->url, $page, $info, $error);
        $this->result = $page;
    }

    /**
     * Print "OK" for an HTTP 200 response, otherwise the failure reason.
     *
     * @param string $url   fetched URL
     * @param mixed  $page  value returned by curl_exec() (unused here)
     * @param array  $info  curl_getinfo() result; only 'http_code' is read
     * @param string $error curl_error() text for the transfer
     */
    function deal_data($url, $page, $info, $error)
    {
        $parts = explode(".", $url);
        // The label is the second dot-separated piece of the URL; fall back
        // to the whole URL when it contains no "." to avoid an undefined index.
        $id = isset($parts[1]) ? $parts[1] : $url;
        if ($info['http_code'] != 200) {
            // The cURL error text is empty when the transfer itself succeeded
            // but the status was not 200; show the status code in that case.
            $msg = ($error !== '') ? $error : 'HTTP ' . $info['http_code'];
            $this->show_msg($id, $msg);
        } else {
            $this->show_msg($id, "OK");
        }
    }

    /**
     * Print one tab-separated status line.
     *
     * @param string $id  label of the checked URL
     * @param string $msg status text
     */
    function show_msg($id, $msg)
    {
        echo $id . "\t$msg\n";
    }

    /**
     * @return mixed the value stored by run()
     */
    public function getResult()
    {
        return $this->result;
    }
}
/**
 * Submit one Query task per configured URL to a pool of Connect workers,
 * then shut the pool down.
 *
 * Side effect: (re)assigns the global $check_urls map of URL => display name.
 */
function check_urls_multi_pthreads()
{
    global $check_urls; // URLs to crawl

    $check_urls = array(
        'http://xxx.com' => "xx网",
    );

    // Pool of 10 threads, each one a Connect worker.
    $workers = new Pool(10, "Connect", array());

    foreach ($check_urls as $target => $label) {
        $task = new Query($target);
        $workers->submit($task);
    }

    $workers->shutdown();
}
// Script entry point: run the multi-threaded URL check.
check_urls_multi_pthreads();
python 多线程
def handle(sid):
    """Process the crawl job identified by *sid* (placeholder, does nothing)."""
    # The crawler's data processing for one job goes in this function.
    pass
class MyThread(Thread):
    """Thread that calls ``handle`` with a single job id."""

    def __init__(self, sid):
        """Remember the job id *sid* that ``run`` passes to ``handle``."""
        Thread.__init__(self)
        self.sid = sid

    def run(self):
        # ``self`` is required: Thread.start() invokes run() as a bound method.
        handle(self.sid)
# Start one MyThread per job id 1..10, then wait for all of them to finish.
threads = []
for i in range(1, 11):  # range works on Python 2 and 3; xrange is Python 2 only
    t = MyThread(i)
    threads.append(t)
    t.start()
for t in threads:
    t.join()
|
python 线程池爬虫:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
from queue import Queue
from threading import Thread, Lock
import urllib.parse
import socket
import re
import time
# Paths already discovered by the crawl; starts with the root path.
seen_urls = {'/'}
# Held by worker threads while they read and update seen_urls.
lock = Lock()
class Fetcher(Thread):
    """Daemon worker thread that crawls URL paths taken from a shared queue.

    A worker starts itself on construction and then loops forever: take a
    path from ``tasks``, fetch it with a raw HTTP/1.0 request to
    ``HOST:PORT``, extract the same-site links and enqueue those not yet in
    the module-level ``seen_urls`` set (accessed under the module-level
    ``lock``).
    """

    # Server every request is sent to; links to any other host are skipped.
    HOST = 'localhost'
    PORT = 3000

    # Captures the target of an href attribute, quoted or unquoted.
    _HREF_RE = re.compile(r'''(?i)href=["']?([^\s"'<>]+)''')

    # Blank line separating the HTTP header section from the body.
    _HEADER_END = b'\r\n\r\n'

    def __init__(self, tasks):
        """Store the shared task queue and start the thread immediately."""
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Process queued paths until the program exits."""
        while True:
            url = self.tasks.get()
            try:
                print(url)
                response = self._fetch(url)
                links = self.parse_links(url, response)
                with lock:
                    for link in links.difference(seen_urls):
                        self.tasks.put(link)
                    seen_urls.update(links)
            except Exception as exc:
                # Worker boundary: one failing page (refused connection, bad
                # URL, ...) must not kill the thread.
                print('error: {} ({})'.format(url, exc))
            finally:
                # Always balance get(); otherwise Queue.join() never returns.
                self.tasks.task_done()

    def _fetch(self, url):
        """Return the raw bytes of the HTTP/1.0 response for path *url*."""
        request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(url, self.HOST)
        chunks = []
        # The context manager closes the socket even when an error occurs.
        with socket.socket() as sock:
            sock.connect((self.HOST, self.PORT))
            # sendall() keeps writing until the whole request is sent.
            sock.sendall(request.encode('ascii'))
            chunk = sock.recv(4096)
            while chunk:  # an empty chunk means the server closed the connection
                chunks.append(chunk)
                chunk = sock.recv(4096)
        return b''.join(chunks)

    def parse_links(self, fetched_url, response):
        """Return the set of same-site paths linked from *response*.

        *fetched_url* is the path the response was fetched from; relative
        links are resolved against it. An empty response prints an error
        line and yields an empty set, as does a non-HTML response.
        """
        if not response:
            print('error: {}'.format(fetched_url))
            return set()
        if not self._is_html(response):
            return set()
        links = set()
        for href in set(self._HREF_RE.findall(self.body(response))):
            try:
                normalized = urllib.parse.urljoin(fetched_url, href)
                parts = urllib.parse.urlparse(normalized)
                host = parts.hostname  # already lower-cased, port removed
            except ValueError:
                # Malformed link (e.g. a broken IPv6 literal): skip it.
                continue
            if parts.scheme not in ('', 'http', 'https'):
                continue
            if host and host != self.HOST.lower():
                continue
            # parts.path carries neither query nor fragment; an empty path
            # means the site root.
            links.add(parts.path or '/')
        return links

    def body(self, response):
        """Return the decoded text after the header section ('' if none)."""
        _head, _sep, payload = response.partition(self._HEADER_END)
        return payload.decode('utf-8', errors='replace')

    def _is_html(self, response):
        """Return True when the Content-Type header starts with text/html."""
        head, _sep, _payload = response.partition(self._HEADER_END)
        headers = {}
        # Skip the status line; ISO-8859-1 decodes any byte sequence.
        for line in head.decode('iso-8859-1').split('\r\n')[1:]:
            name, sep, value = line.partition(':')
            if sep:
                # Header names are compared case-insensitively.
                headers[name.strip().lower()] = value.strip()
        return headers.get('content-type', '').startswith('text/html')
class ThreadPool:
    """Fixed group of Fetcher workers that all consume one shared queue."""

    def __init__(self, num_threads):
        """Create the queue and construct *num_threads* Fetcher workers on it."""
        shared_queue = Queue()
        self.tasks = shared_queue
        for _ in range(num_threads):
            # Fetcher starts its own thread inside its constructor.
            Fetcher(shared_queue)

    def add_task(self, url):
        """Put *url* on the shared queue."""
        self.tasks.put(url)

    def wait_completion(self):
        """Block until every queued item has been marked done."""
        self.tasks.join()
if __name__ == '__main__':
    # Crawl from the root path with 4 workers, then report how many URLs
    # were seen and how long the crawl took.
    start = time.time()
    pool = ThreadPool(4)
    pool.add_task("/")
    pool.wait_completion()
    print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls), time.time() - start))
|
希望本文所述对大家的 PHP 与 Python 程序设计有所帮助。