Golang 的类Scrapy爬虫实现

时间:2022-02-03 20:38:47

核心部分

N0.1 Request/Response

type Response struct {
    Url  string
    Body string
    Meta *map[string]string
}

type Request struct {
    Url   string
    Parse func(response *Response)
    Meta  *map[string]string
}

请求与响应的封装结构体,对于 ResponseUrl 为请求时的 UrlBody 为此 Url 的网页源码, Meta 为上一层传过来的数据(之所以用指针是因为指针省内存);

对于 Request 结构, Url 是需要请求的链接, Parse 是该 Url 对应的解析函数, Meta 是传给 Parse 函数参数 response 的数据。

No.2 请求池/数据池

var RequestChan = make(chan *Request, 100)        // 请求池
var DataChan = make(chan *map[string]string, 100) // 数据池

请求池 RequestChan 用于存放等待完成的请求,数据池 DataChan 用于存放还未处理的数据。

No.3 细节部分

var wg sync.WaitGroup                             //定义一个同步等待的组
var swg sync.WaitGroup                            // 等待数据处理结束
var Save = func(data *map[string]string) {        // 保存数据的函数
    fmt.Println(data)
}
var Download = func(url string) (content string, statusCode int) { // 网页源码下载函数

    defer func() { fmt.Printf("get<%d>: %s\n", statusCode, url) }()
    resp, err1 := http.Get(url)
    if err1 != nil {
        statusCode = -100
        return
    }
    defer resp.Body.Close()
    data, err2 := ioutil.ReadAll(resp.Body)
    if err2 != nil {
        statusCode = -200
        return
    }
    statusCode = resp.StatusCode
    content = string(data)
    return
}

func doRequest(request *Request) {
    defer wg.Done()
    html, sta := Download(request.Url)
    if sta != 200 {
        return
    }
    request.Parse(&Response{request.Url, html, request.Meta})
}

func save() {
    defer swg.Done()
    for {
        data := <-DataChan
        if data == nil {
            break
        }
        Save(data)
    }
}

对于请求与数据的处理,我们需要全部完成,且并发数量要限制在一定范围内,因此我们定义了 wgswg 两个协程组, Save 变量是一个函数,之所以定义成一个变量,是因为 Save 是提供给用户操作的,例如用户可能将数据保存在文件、数据库、云盘等等地方,我们只提供一个默认保存方案,即向控制台输出数据。同理 Download 变量也是一个函数,用于处理下载问题。doRequest 函数不提供给用户操作,因此其首字母小写,其作用是处理请求 Requestsave 函数是我们的数据处理函数,从数据池中取出数据,然后调用用户定义的 Save 方法处理数据。

NO.4 调度器Scheduler(核心)

func Scheduler(threads int) {
    go save()
    swg.Add(1)
    for {
        for i := 0; i < threads; i++ { // 每次开启这么多线程
            req := <-RequestChan
            if req == nil {
                DataChan <- nil
                return
            }
            go doRequest(req)
            wg.Add(1)
        }
        wg.Wait()
    }
    swg.Wait() // 必须在这里等待,不然数据有可能还没处理完就挂掉了
}

调度器的作用是调度请求,首先,调度器先开启数据处理线程 save 然后再依次开启 threads 个线程处理请求,当前一批 request 请求完了后,再从请求池里取出 threadsrequest 进行请求,当所有请求完毕后,将数据池的队尾写入 nil 提示所有 request 都处理完毕了, 如果数据也处理完毕,则可结束数据处理线程,此时调度结束,控制权交给用户的 main 函数。

NO.5 完整代码

package myspider

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "sync"
)

type Response struct {
    Url  string
    Body string
    Meta *map[string]string
}

type Request struct {
    Url   string
    Parse func(response *Response)
    Meta  *map[string]string
}

var RequestChan = make(chan *Request, 100)        // 请求池
var DataChan = make(chan *map[string]string, 100) // 数据池
var wg sync.WaitGroup                             //定义一个同步等待的组
var swg sync.WaitGroup                            // 等待数据处理结束
var Save = func(data *map[string]string) {        // 保存数据的函数
    fmt.Println(data)
}
var Download = func(url string) (content string, statusCode int) { // 网页源码下载函数

    defer func() { fmt.Printf("get<%d>: %s\n", statusCode, url) }()
    resp, err1 := http.Get(url)
    if err1 != nil {
        statusCode = -100
        return
    }
    defer resp.Body.Close()
    data, err2 := ioutil.ReadAll(resp.Body)
    if err2 != nil {
        statusCode = -200
        return
    }
    statusCode = resp.StatusCode
    content = string(data)
    return
}

func doRequest(request *Request) {
    defer wg.Done()
    html, sta := Download(request.Url)
    if sta != 200 {
        return
    }
    request.Parse(&Response{request.Url, html, request.Meta})
}

func save() {
    defer swg.Done()
    for {
        data := <-DataChan
        if data == nil {
            break
        }
        Save(data)
    }
}

func Scheduler(threads int) {
    go save()
    swg.Add(1)
    for {
        for i := 0; i < threads; i++ { // 每次开启这么多线程
            req := <-RequestChan
            if req == nil {
                DataChan <- nil
                return
            }
            go doRequest(req)
            wg.Add(1)
        }
        wg.Wait()
    }
    swg.Wait() // 必须在这里等待,不然数据有可能还没处理完就挂掉了
}

示例

NO.1 古诗词网(纵向结构)

package main

import (
    . "./myspider"
    "regexp"
)

/** aspx">雨打梨花深闭门,忘了青春,误了青春。</a><span style=" color:#65645F; margin-top:-7px; float:left; margin-left:5px; margin-right:10px;">____</span><a style=" float:left;" target="_blank" href="/view_72645.aspx">唐寅《一剪梅·雨打梨花深闭门》</a> </div> */
var textItem = regexp.MustCompile(`aspx">(.*?)<.*?aspx">(.*?)</a>`)

// <a style="width:60px;" href="Default.aspx?p=2&amp;c=&amp;t=">下一页</a>
var nextReg = regexp.MustCompile(`href="(.*?)">下一页</a>`)
var lll = 0

func parse(response *Response) {
    nt := nextReg.FindStringSubmatch(response.Body)
    if len(nt) == 2 && lll < 2 { // 有下一页
        lll++
        RequestChan <- &Request{"http://so.gushiwen.org/mingju/" + nt[1], parse, nil}
    } else { // 最后一页,设置标志
        defer func() { RequestChan <- nil }()
    }
    res := textItem.FindAllStringSubmatch(response.Body, 100)
    for _, it := range res {
        if len(it) == 3 {
            data := make(map[string]string)
            data["text"] = it[1]
            data["title"] = it[2]
            DataChan <- &data
        }
    }

}

func main() {
    RequestChan <- &Request{"http://so.gushiwen.org/mingju/", parse, nil}
    Scheduler(4)
}

执行过程:首先生成第一个请求,扔到请求池,然后开始同时处理4个请求的调度过程。

NO.3 8edy电影网(纵横结构)

// httpwww.8edy.tvkh
package main

import (
    . "./myspider"
    "fmt"
    "os"
    "regexp"
)

// <a href="/movie/28293/" target="_blank">独立日2</a>
var detReg = regexp.MustCompile(`"(/movie/[0-9]*?/)" target="_blank">`)

// <a href="/kh/p2/">下一页</a>
var nt = regexp.MustCompile(`"(/kh/p[0-9]*?)">下一页`)

var cur = 0
var max = 2

func parse1(response *Response) {
    cur++
    for _, link := range detReg.FindAllStringSubmatch(response.Body, 40) { // 具体电影情况
        RequestChan <- &Request{"http://www.8edy.tv" + link[1], parse2, nil}
    }
    next := nt.FindStringSubmatch(response.Body)
    if len(next) > 0 && cur < max { // 下一页链接
        RequestChan <- &Request{"http://www.8edy.tv" + next[1], parse1, nil}
    } else { // 最后一页
        RequestChan <- nil
    }
}

/* <div class="nrjb_tlt"><h1 class="lvzi">末日危城</h1> <br>类型:动作电影 科幻电影 <br> 年份:2008<br> 地区:欧美<br> 语言: <br> 格式:MP4 / 3GP <br> 导演:<a href=" /do/s?wd=乌维·鲍尔" target="_blank">乌维·鲍尔</a><br> <div class="zhuyan"><span class="mvdf">主演:</span><p class="nrnmd"> 杰森·斯坦森 雷·利奥塔 克莱尔·弗兰妮</p></div></div> */
var titleReg = regexp.MustCompile(`<h1 class="lvzi">(.*?)</h1>`)
var ftypeReg = regexp.MustCompile(`类型:(.*?)<br/>`)
var yearReg = regexp.MustCompile(`年份:(.*?)<br/>`)
var areaReg = regexp.MustCompile(`地区:(.*?)<br/>`)
var formReg = regexp.MustCompile(`格式:(.*?)<br/>`)

func parse2(response *Response) {
    html := response.Body
    data := make(map[string]string)
    data["title"] = titleReg.FindStringSubmatch(html)[1]
    data["type"] = ftypeReg.FindStringSubmatch(html)[1]
    data["year"] = yearReg.FindStringSubmatch(html)[1]
    data["area"] = areaReg.FindStringSubmatch(html)[1]
    data["form"] = formReg.FindStringSubmatch(html)[1]
    DataChan <- &data
}

func main() {
    file, err := os.Create("8edy.csv")
    defer file.Close()
    if err != nil {
        fmt.Println(err)
        return
    }
    file.WriteString(fmt.Sprintf("%s,%s,%s,%s,%s\n", "标题", "类型", "年份", "地区", "格式"))
    Save = func(data *map[string]string) {
        file.WriteString(fmt.Sprintf("%s,%s,%s,%s,%s\n", (*data)["title"], (*data)["type"], (*data)["year"], (*data)["area"], (*data)["form"]))
    }
    RequestChan <- &Request{"http://www.8edy.tv/kh/", parse1, nil}
    Scheduler(10)
}

过程与上面的一样,这个程序多一个自定义数据存储过程。

更多更新在这里:https://github.com/ChenL1994/GoScrapy