网络爬虫学习:借助DeepSeek完善爬虫软件,增加停止任务功能

时间:2025-02-05 06:58:57

一、引言

我从24年11月份开始学习网络爬虫应用开发,经过2个来月的努力,终于完成了开发一款网络爬虫软件的学习目标。这几天对本次学习及应用开发进行一下回顾总结。前面已经发布了两篇日志:

网络爬虫学习:应用selenium从搜*狐搜索爬取新闻结果的数据

网络爬虫学习:应用selenium获取Edge浏览器版本号,自动下载对应版本msedgedriver,确保Edge浏览器顺利打开

这是第三篇日志,记录如何为软件增加停止任务功能。

二、问题描述

软件初步开发完成后,我打包成exe文件提供给小伙伴试用,有小伙伴问我,软件怎么没有停止任务的功能?如果点击开始按钮后,又不想爬取了,想换个关键字再爬,但没有停止功能,就只能等着软件完成当前的所有任务或者关闭软件,这样不合理,建议我增加一个停止功能。

这个需求是合情合理的,但奈何我的能力有限,对爬虫知识学得不够深入,关于怎么停止当前正在执行的任务还真不会。但小伙伴提出的改进意见,那么我就要努力去完善,这样也是对自己能力的提升。

三、借助DeepSeek完善软件

在前一篇日志里,我记录了通过向DeepSeek提问,获得了自动下载对应版本msedgedriver的方法,让我的爬虫软件可以在小伙伴的电脑上正常运行。这次我决定继续向DeepSeek提问,获得实现停止爬虫任务的方法。

我一共向DeepSeek提了3个问题,最终获得了我想要的答案。

第1问:“我使用python语言开发爬虫软件,用到了threading.Thread和concurrent.futures,使爬虫在多线程下运行。我现在遇到了一个问题,当开始执行爬虫任务后,我该如何终止爬虫任务?

DeepSeek思考了22秒给了我答案,答案里有两个方法。这两个方法都是通过捕获 KeyboardInterrupt(Ctrl+C)或注册信号处理器,来停止所有线程。我在pycharm上尝试运行了这两个方法中的代码,发现都需要通过按下Ctrl+F2来停止爬虫任务,而我是希望在GUI上添加一个停止按钮,点击按钮后,停止爬虫任务,显然DeepSeek提供的答案并没有达到我的目的。

第1问的答案不符合我的需求,应该是我的问题描述还不够详细,我继续提问。

第2问:“上面给出的答案需要按Ctrl+C才能终止多线程爬虫任务,而我要开发的是带GUI的爬虫软件,该软件使用到了wxpython,我希望在GUI中设置一个停止按钮,当用户点击这个按钮后,能够停止多线程爬虫任务。

这一次,DeepSeek给了我一个基本符合要求的答案,不过DeepSeek提供的Python代码直接运行会报错,需要对错误的地方做一下修改。在pycharm上实现了点击停止按钮后中断爬虫任务的目的。

(DeepSeek的解答)

(DeepSeek提供的代码运行会报错,需要修改)

(在pycharm上运行代码,可以实现停止爬虫任务)

不过我在对上述代码进行深入研究,并尝试按照我的编程风格和实际需求进行修改后发现了其中的不足。由此我提出了第3问。

第3问:“上面的答案有效,不过有一个不足,就是最终实现代码中的CrawlerTask方法里模拟爬取操作代码做得很简单,且又是在while循环中,导致如果不点击中断会无限循环下去。而实际上,爬虫任务基本是有限次数的,任务完成后会通过break退出while循环。我向知道的是在爬虫任务完成后通过break退出while循环后,如何将self.start_btn设置为Enable,将self.stop_btn设置为Disable

我的这一问虽然把需求都提出来了,但是语句不够精炼,并且还要错别字。DeepSeek的回答真是让人惊艳,它用更精炼的文字,将我的问题准确的描述了出来。

好的,我现在需要解决用户的问题。用户之前已经得到了如何在GUI中通过停止按钮终止多线程爬虫的解决方案,但现在他指出了代码中的一个不足:CrawlerTask中的模拟爬取操作过于简单,且使用了无限循环,而实际爬虫任务应该是有限次数的,完成后自动停止。用户想知道当任务自然完成时,如何正确更新按钮状态(启用开始按钮,禁用停止按钮)。

DeepSeek给出了详细的分析和具体的实现步骤,真的很赞。

(第2问的代码需要修改的地方)

(DeepSeek优化后的代码,依然有bug,需要纠错后才能运行)

(关于优化代码的关键改进说明)

(两种典型场景验证)

我将DeepSeek提供的优化后的代码赋值到pycharm中,修改掉其中的bug,运行代码,让爬虫任务运行完成,完成后软件会自动将开始按钮启用,将停止按钮禁用。

另外,DeepSeek还给出了扩展建议,我对这部分内容也进行了吸收融合。

四、功能实现

我对DeepSeek给出的答案进行了研究和融合,最终得到了具有以下功能的示例:

1.软件的GUI使用wxpython(我基本使用wxpython来制作GUI);

2.使用 threading.Event() 作为全局停止标志,所有爬虫线程定期检查 stop_event.is_set()

3.使用wx.CallAfter实现跨线程安全更新 GUI

4.软件使用的线程池ThreadPoolExecutor,并且界面中添加了滑动条控制线程池的 max_workers 参数实现动态线程数调整,默认值为3,可以在1-6之间调整; 

5.示例模拟爬取6个url,每个url生成一个任务占用一个线程,每个任务随机爬取数量不同的页数,通过异常重试机制让每一页的数据均有3次的重试机会;

6.为每个future添加回调函数,触发状态检查;

7.在CheckThreadsStatus方法中,当所有future完成时,更新按钮状态;

8.支持任务进度显示,添加计数器统计已完成任务;

9.使用Queue线程安全队列收集结果,并能在任务结束后,查看结果。

10.按钮状态逻辑:

        初始状态下:"开始爬取"按钮启用,“停止爬取”和“查看结果”按钮停用;

        执行爬虫任务状态下:"停止爬取"按钮启用,“开始爬取”和“查看结果”按钮停用;

        任务完成或停止状态下:"开始爬取"和“查看结果”按钮按钮启用,“停止爬取”停用;

有了这个模版,就可以参照它对我的爬虫软件进行改进,从而实现增加停止爬虫任务的功能了。

(软件初始状态)

(正常执行完所有爬虫任务)

(强制停止爬虫任务)

(查看爬取的结果)

五、代码展示

最后放上实现上述功能的示例代码供参考,可以直接运行。

import wx
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import time
import random


class CrawlerGUI(wx.Frame):
    def __init__(self):
        super().__init__(parent=None, title="多线程爬虫工具", size=wx.Size(480, 560))
        self.stop_event = threading.Event()  # 全局停止标志
        self.executor = None  # 线程池
        self.futures = []  # 记录所有 Future 对象
        self.max_workers = 3  # 同时执行线程数
        self.result_queue = Queue()  # 线程安全队列,用于传递爬取的结果
        self.completed_tasks = 0  # 任务完成数统计

        # 初始化 UI
        self.InitUI()
        self.Centre()
        self.Show()

    def InitUI(self):
        panel = wx.Panel(self)
        vbox = wx.BoxSizer(wx.VERTICAL)

        # 控制按钮区域
        self.start_btn = wx.Button(panel, label="开始爬取")
        self.stop_btn = wx.Button(panel, label="停止爬取")
        self.thread_slider = wx.Slider(panel, value=3, minValue=1, maxValue=6, style=wx.SL_HORIZONTAL | wx.SL_LABELS)
        self.view_btn = wx.Button(panel, label="查看结果")
        self.log_text = wx.TextCtrl(panel, style=wx.TE_MULTILINE | wx.TE_READONLY)
        self.stop_btn.Enable(False)  # 初始状态为不可点击
        self.view_btn.Enable(False)  # 初始状态为不可点击

        # 按钮事件绑定
        self.start_btn.Bind(wx.EVT_BUTTON, self.OnStart)
        self.stop_btn.Bind(wx.EVT_BUTTON, self.OnStop)
        self.thread_slider.Bind(wx.EVT_SLIDER, self.OnThreadChange)
        self.view_btn.Bind(wx.EVT_BUTTON, self.OnView)
        self.Bind(wx.EVT_CLOSE, self.on_close)  # 绑定关闭事件处理器

        # 布局
        hbox = wx.BoxSizer(wx.HORIZONTAL)
        hbox.Add(self.start_btn, 0, wx.ALL, 5)
        hbox.Add(self.stop_btn, 0, wx.ALL, 5)
        hbox.Add(self.view_btn, 0, wx.ALL, 5)
        hbox.Add(self.thread_slider, 0, wx.ALL, 5)
        vbox.Add(hbox, 0, wx.EXPAND)
        vbox.Add(self.log_text, 1, wx.EXPAND | wx.ALL, 5)
        panel.SetSizer(vbox)

    def OnStart(self, event):
        """开始爬取按钮事件"""
        event.Skip()
        self.start_btn.Disable()
        self.stop_btn.Enable()
        self.view_btn.Disable()
        self.log_text.SetLabel("")  # 清空
        self.log_text.AppendText("爬虫已启动...\n")
        self.completed_tasks = 0  # 任务完成数重置

        # 重置停止标志
        self.stop_event.clear()

        # 提交任务(示例URL列表)
        urls = [f"https://example.com/{i}" for i in range(1, 7)]  # 示例6个URL
        # 初始化线程池(默认使用3个线程)
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)

        # 提交任务并绑定回调
        self.futures = []
        for url in urls:
            future = self.executor.submit(self.CrawlerTask, url)
            future.add_done_callback(self.OnTaskDone)  # 关键:添加完成回调
            self.futures.append(future)

    def OnTaskDone(self, future):
        """单个任务完成时的回调"""
        wx.CallAfter(self.CheckThreadsStatus)  # 安全触发状态检查
        self.completed_tasks += 1
        wx.CallAfter(self.UpdateProgress)

    def CrawlerTask(self, url):
        """处理单个URL的爬取任务"""
        if self.stop_event.is_set():
            return

        try:
            # 模拟有限次数的爬取操作(实际替换为真实抓取逻辑)
            pages = random.randint(4, 10)  # 生成随机数,让每次抓取的页数不同
            for page in range(1, pages):  # 假设每个URL需要抓取多个页面(页数小于pages)
                if self.stop_event.is_set():
                    break

                retries = 3
                while retries > 0 and not self.stop_event.is_set():
                    try:
                        # 模拟抓取过程
                        time.sleep(0.5)
                        # 在CrawlerTask中保存结果
                        data = f'{url}  page{page}  data'  # 模拟爬取到的数据
                        self.result_queue.put(data)
                        msg = f"抓取 {url} 第{page}页完成\n"
                        wx.CallAfter(self.log_text.AppendText, msg)

                        # 此处添加实际抓取代码:
                        # response = requests.get(url, timeout=5)
                        # ...

                        break
                    except Exception:
                        retries -= 1
                        time.sleep(1)
            # 任务自然完成时提示
            if not self.stop_event.is_set():
                wx.CallAfter(self.log_text.AppendText, f"{url} 任务完成!\n")
        except Exception as e:
            wx.CallAfter(self.log_text.AppendText, f"错误: {str(e)}\n")

    def CheckThreadsStatus(self):
        """检查线程状态并更新UI"""
        if all(future.done() for future in self.futures):
            self.start_btn.Enable()
            self.stop_btn.Disable()
            self.view_btn.Enable()
            self.log_text.AppendText("所有任务已完成!\n")
            self.executor.shutdown()  # 关闭线程池

    def UpdateProgress(self):
        """更新任务完成进度"""
        progress = f"完成进度: {self.completed_tasks}/{len(self.futures)}"
        self.log_text.AppendText(progress + "\n")

    def OnStop(self, event):
        """停止爬取按钮事件"""
        event.Skip()
        self.stop_btn.Disable()
        self.log_text.AppendText("正在停止爬虫...\n")
        self.stop_event.set()

        # 取消未开始的任务
        for future in self.futures:
            future.cancel()

        # 强制关闭线程池(如果使用Python 3.9+)
        if self.executor:
            self.executor.shutdown(wait=False)

        wx.CallAfter(self.CheckThreadsStatus)

    def OnThreadChange(self, event):
        """变更线程数"""
        event.Skip()
        self.max_workers = self.thread_slider.GetValue()

    def OnView(self, event):
        """查看结果"""
        event.Skip()
        wx.CallAfter(self.ProcessResults)

    def ProcessResults(self):
        """处理结果"""
        self.log_text.SetLabel("")  # 清空
        while not self.result_queue.empty():
            data = self.result_queue.get()
            # 更新GUI或保存到文件
            self.log_text.AppendText(f"{data}\n")



if __name__ == "__main__":
    app = wx.App()
    CrawlerGUI()
    app.MainLoop()