A very simple multithreaded parallel URL fetcher (without a queue)

Date: 2021-09-11 20:43:00

I spent a whole day looking for the simplest possible multithreaded URL fetcher in Python, but most scripts I found use queues, multiprocessing, or complex libraries.

Finally I wrote one myself, which I am reporting as an answer. Please feel free to suggest any improvements.

I guess other people might have been looking for something similar.

5 Answers

#1


30  

Simplifying your original version as far as possible:

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

def fetch_url(url):
    # fetch the page and report how long it took since the start
    urlHandler = urllib2.urlopen(url)
    html = urlHandler.read()
    print "'%s' fetched in %ss" % (url, (time.time() - start))

threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print "Elapsed Time: %s" % (time.time() - start)

The only new tricks here are:

  • Keep track of the threads you create.
  • Don't bother with a counter of threads if you just want to know when they're all done; join already tells you that.
  • If you don't need any state or external API, you don't need a Thread subclass, just a target function.

#2


17  

multiprocessing has a thread pool that doesn't start other processes:

#!/usr/bin/env python
from multiprocessing.pool import ThreadPool
from time import time as timer
from urllib2 import urlopen

urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

def fetch_url(url):
    try:
        response = urlopen(url)
        return url, response.read(), None
    except Exception as e:
        return url, None, e

start = timer()
results = ThreadPool(20).imap_unordered(fetch_url, urls)
for url, html, error in results:
    if error is None:
        print("%r fetched in %ss" % (url, timer() - start))
    else:
        print("error fetching %r: %s" % (url, error))
print("Elapsed Time: %s" % (timer() - start,))

The advantages compared to the Thread-based solution:

  • ThreadPool allows you to limit the maximum number of concurrent connections (20 in the code example).
  • The output is not garbled, because all output happens in the main thread.
  • Errors are logged.
  • The code works on both Python 2 and 3 without changes (assuming from urllib.request import urlopen on Python 3; a minimal shim for this is sketched just below).
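A hedged sketch of that compatibility import, using the common try/except idiom rather than anything from the original answer:

try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen         # Python 2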

#3


9  

The main example in the concurrent.futures docs does everything you want, a lot more simply. Plus, it can handle huge numbers of URLs by only doing 5 at a time, and it handles errors much more nicely.

Of course, this module is only built in with Python 3.2 or later… but if you're using 2.5-3.1, you can just install the backport, futures, from PyPI. All you need to change from the example code is to search-and-replace concurrent.futures with futures and, for 2.x, urllib.request with urllib2.
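For example, the substitutions could be collapsed into a small compatibility preamble, sketched here under the assumption that the installed backport is importable as futures (as the search-and-replace above implies):

try:
    from concurrent import futures   # Python 3.2+: standard library
except ImportError:
    import futures                   # Python 2.5-3.1: pip install futures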

Here's the sample backported to 2.x, modified to use your URL list and to add the times:

import concurrent.futures
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    conn = urllib2.urlopen(url, timeout=timeout)
    return conn.read()  # urllib2 responses have read(), not readall()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print '%r generated an exception: %s' % (url, exc)
        else:
            print '"%s" fetched in %ss' % (url,(time.time() - start))
print "Elapsed Time: %ss" % (time.time() - start)

But you can make this even simpler. Really, all you need is:

def load_url(url):
    # a 60-second timeout stands in for the undefined `timeout` variable in the
    # original snippet; urllib2 responses expose read(), not readall()
    conn = urllib2.urlopen(url, timeout=60)
    data = conn.read()
    print '"%s" fetched in %ss' % (url, (time.time() - start))
    return data

# reuses the concurrent.futures import from the block above
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    pages = executor.map(load_url, urls)

print "Elapsed Time: %ss" % (time.time() - start)

#4


0  

I am now publishing a different solution: the worker threads are made non-daemon and joined to the main thread (which means blocking the main thread until all worker threads have finished), instead of notifying the end of execution of each worker thread with a callback to a global function (as I did in the previous answer), since some comments noted that that approach is not thread-safe.

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

class FetchUrl(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.url = url

    def run(self):
        urlHandler = urllib2.urlopen(self.url)
        html = urlHandler.read()
        print "'%s\' fetched in %ss" % (self.url,(time.time() - start))

for url in urls:
    FetchUrl(url).start()

#Join all existing threads to main thread.
for thread in threading.enumerate():
    if thread is not threading.currentThread():
        thread.join()

print "Elapsed Time: %s" % (time.time() - start)

#5


-1  

This script fetches the content from a set of URLs defined in an array. It spawns a thread for each URL to be fetched, so it is meant to be used for a limited set of URLs.

Instead of using a queue object, each thread notifies the end of its work with a callback to a global function, which keeps count of the number of threads still running.

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
left_to_fetch = len(urls)

class FetchUrl(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.daemon = True  # the original assigned to setDaemon, shadowing the method instead of calling it
        self.url = url

    def run(self):
        urlHandler = urllib2.urlopen(self.url)
        html = urlHandler.read()
        finished_fetch_url(self.url)


def finished_fetch_url(url):
    "callback function called when a FetchUrl thread ends"
    print "\"%s\" fetched in %ss" % (url, (time.time() - start))
    global left_to_fetch
    left_to_fetch -= 1
    if left_to_fetch == 0:
        # all urls have been fetched
        print "Elapsed Time: %ss" % (time.time() - start)


# spawn a FetchUrl thread for each url to fetch
for url in urls:
    FetchUrl(url).start()
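
As answer #4 notes, the unguarded left_to_fetch -= 1 above is not thread-safe. A minimal way to harden the callback, sketched here as an illustration rather than as part of the original answer, is to guard the shared counter with a threading.Lock:

fetch_lock = threading.Lock()

def finished_fetch_url(url):
    "thread-safe variant: the shared counter is only touched while holding the lock"
    print "\"%s\" fetched in %ss" % (url, (time.time() - start))
    global left_to_fetch
    with fetch_lock:
        left_to_fetch -= 1
        done = (left_to_fetch == 0)
    if done:
        print "Elapsed Time: %ss" % (time.time() - start)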
