实现

时间:2024-04-04 17:53:12

探针

probe模块主要负责探针功能,侦测要下载的文件是否包含Content-Length的HTTP头部。如果存在,那么会返回分块下载的文件大小,具体代码如下:

package probe

import (
	"fmt"
	"log"
	"net/http"
	"strconv"
)

type Probe struct {
	workers int
	url     string
}

func NewProbe(worker int, url string) *Probe {
	return &Probe{
		workers: worker,
		url:     url,
	}
}

func (p *Probe) GetFileSize() (int, error) {
	var size = -1

	client := &http.Client{}
	req, err := http.NewRequest("HEAD", p.url, nil)
	if err != nil {
		log.Fatal(err)
	}
	resp, err := client.Do(req)
	if err != nil {
		log.Fatal(err)
	}

	if header, ok := resp.Header["Content-Length"]; ok {
		fileSize, err := strconv.Atoi(header[0])
		if err != nil {
			log.Fatal("File size could not be determined : ", err)
		}

		size = fileSize / p.workers
	} else {
		log.Fatal("File size was not provided!")
		return size, fmt.Errorf("file size was not provided.")
	}
	return size, nil
}

通过发送一条HEAD的HTTP请求来拿到目标文件的大小,从而确定并发下载的分块大小。

下载器

接下来是下载器部分,首先定义下载器的结构体

type Downloader struct {
	result  chan Part
	size    int
	workers int
}

下载器包括了一个由文件分块组成的channel,它的定义如下

type Part struct {
	Data  []byte
	Index int
}

包含了文件分块的数据流以及对应索引顺序。同时下载器也定义了分块下载的大小,并发数量。

func (d *Downloader) Download(index int, url string) {
	client := &http.Client{}

	// calculate offset by multiplying
	// index with size
	start := index * d.size

	// Write data range in correct format
	// I'm reducing one from the end size to account for
	// the next chunk starting there
	dataRange := fmt.Sprintf("bytes=%d-%d", start, start+d.size-1)

	// if this is downloading the last chunk
	// rewrite the header. It's an easy way to specify
	// getting the rest of the file
	if index == d.workers-1 {
		dataRange = fmt.Sprintf("bytes=%d-", start)
	}
	log.Println(dataRange)

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		// TODO: restart download
		return
	}
	req.Header.Add("Range", dataRange)
	resp, err := client.Do(req)
	if err != nil {
		// TODO: restart download
		return
	}

	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		// TODO: restart download
		return
	}

	d.result <- Part{Index: index, Data: body}
}

当执行下载操作时,该方法将向下载请求添加标头Range。此标头将指定要获取文件的哪些部分。HTTP请求完成后,数据将写入函数调用时传递的通道。
当下载开始后,不需要等待下载完成,可以直接开始合并分块文件,原理在于golang的channel本身就具有并发的属性。从channel中持续读取已经下载好的分块文件,然后根据索引顺序写入本地文件中。

func (d *Downloader) Merge(filename string) error {
	log.Println("start to merge data")

	parts := make([][]byte, d.workers)
	counter := 0
	for part := range d.result {
		counter++
		parts[part.Index] = part.Data
		if counter == d.workers {
			break
		}
	}
	log.Println("sort data as original order")

	file := []byte{}
	for _, part := range parts {
		file = append(file, part...)
	}
	log.Println("write data into buffer array")
	err := ioutil.WriteFile(filename, file, 0777)
	return err
}

运行

至此,我们可以编写一个main函数来测试并发下载器。下载的目标文件是http://212.183.159.230/512MB.zip ,大小为512MB,我们控制并发数为5,测试下载到本地的时间。

package main

import (
	"flag"
	"log"
	"time"

	"go-store/applications/downloader/download"
	"go-store/applications/downloader/probe"
)

var (
	// to test internet
	url = flag.String("url", "http://212.183.159.230/512MB.zip", "download url")
	// number of goroutines to spawn for download.
	workers = flag.Int("worker", 5, "concurrent downloader number")
	// filename for downloaded file
	filename = flag.String("file", "data.zip", "downloaded filename")
)

func main() {
	flag.Parse()
	start := time.Now()
	probe := probe.NewProbe(*workers, *url)
	size, err := probe.GetFileSize()
	if err != nil {
		panic(err)
	}

	results := make(chan download.Part, *workers)
	downloader := download.NewDownloader(results, size, *workers)
	for i := 0; i < *workers; i++ {
		go downloader.Download(i, *url)
	}

	err = downloader.Merge(*filename)
	end := time.Now()
	if err != nil {
		panic(err)
	}

	log.Println("cost time: ", end.Sub(start))
}

结果如下

song@ubuntu20-04:~/go/src/github.com/surzia/go-store/applications/downloader$ go build main.go 
song@ubuntu20-04:~/go/src/github.com/surzia/go-store/applications/downloader$ ./main 
2023/02/26 12:13:59 bytes=429496728-
2023/02/26 12:13:59 bytes=107374182-214748363
2023/02/26 12:13:59 bytes=214748364-322122545
2023/02/26 12:13:59 bytes=322122546-429496727
2023/02/26 12:13:59 bytes=0-107374181
2023/02/26 12:14:21 start to merge data
2023/02/26 12:14:21 sort data as original order
2023/02/26 12:14:23 write data into buffer array
2023/02/26 12:14:23 cost time:  24.43482453s

用时约25s。对比直接下载该文件

song@ubuntu20-04:~/Downloads$ curl http://212.183.159.230/512MB.zip -o 512M.zip
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  512M  100  512M    0     0  14.6M      0  0:00:34  0:00:34 --:--:-- 17.9M

用时34s,并发下载器的速度提升了10s左右。