Lecture 02 Infrastructure: RPC & threads
一、多线程挑战
- 共享数据: 使用互斥信号量、或者避免共享
- 线程间协作: 使用channels 或者 waitgroup 来等待所有map线程结束
- 并发粒度:
- 粗粒度: 简单,但是并发性不高
- 细粒度: 更多的并发,但是处理复杂,可能会有更多的冲突和死锁
以下这段代码就能说明并发的粒度问题:
constructTaskArgs := func(phase jobPhase, task int) DoTaskArgs {
debug("task: %d\n", task)
var taskArgs DoTaskArgs
taskArgs.Phase = phase
taskArgs.JobName = jobName
taskArgs.NumOtherPhase = n_other
taskArgs.TaskNumber = task
if phase == mapPhase {
taskArgs.File = mapFiles[task]
}
return taskArgs
}
tasks := make(chan int) // act as task queue
go func() {
for i := 0; i < ntasks; i++ {
tasks <- i
}
}()
successTasks := 0
success := make(chan int)
loop:
for {
select {
case task := <-tasks:
go func() {
worker := <-registerChan
status := call(worker, "Worker.DoTask", constructTaskArgs(phase, task), nil)
if status {
success <- 1
go func() { registerChan <- worker }()
} else {
tasks <- task
}
}()
case <-success:
successTasks += 1
default:
if successTasks == ntasks {
break loop
}
}
}
里面不仅使用了task的channel, 还使用了success (channel) 来控制 successTask 的共享。
二、爬虫并发的问题
网络是一个有环的图,但是我们设计爬虫需要避免环。
- 一方面是因为重复遍历url,没有任何意义
- 另一方面只访问一次url可以减轻目标服务器负担
单线程爬虫:
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
return
}
2.1 并发互斥爬虫
因此需要维护一张visited表来记录是否遍历过url,这里就会出现并发问题。
当T1 检查visited[url] , T2也检查visited[url] 两个线程都会认为没有访问过该url,这时候就会发生冲突,发生WW(write + write) 。解决办法是,维护一个Mutex 互斥信号量来访问visited这张表。
- 判断线程结束
使用sync.WaitGroup来保证线程执行完成
type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}
func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
f.mu.Lock()
if f.fetched[url] {
f.mu.Unlock()
return
}
f.fetched[url] = true
f.mu.Unlock()
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, f)
}(u)
}
done.Wait()
return
}
func makeState() *fetchState {
f := &fetchState{}
f.fetched = make(map[string]bool)
return f
}
2.2 并发通道爬虫
master启动worker去爬取url, worker将url送到同一个通道里面, master从通道获取url去爬取内容
共享的数据:
- 通道
- 发送到 通道的 slices 和 字符串
- 从master发送到worker的参数
//
// Concurrent crawler with channels
//
func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}
func master(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}
func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
master(ch, fetcher)
}
三、什么时候使用共享空间和锁 vs 通道
state -- 共享空间和锁
communication -- 通道
waiting for events -- 通道
使用go 的 race dector
四、Remote Procedure Call(RPC)
4.1 软件架构:
客户端 handlers
stubs dispatcher(调度器)
rpc lib rpc lib
网络 ----- 网络
4.2 rpc过程:
- 首先双方定义发送的参数, 和返回的结构体
- 客户端 Dial()创建tcp连接请求 call() 调用rpc库来执行远程调用
- 服务器 声明一个带返回方法的对象 作为rpc处理器, 然后使用rpc库的Register函数来注册服务, rpc库:
- 读取每一个请求
- 为每一个请求创建一个goroutine
- 反序列化请求
- 调用目标函数
- 序列化返回值
- 将返回值通过tcp连接返回
4.3rpc 示例
client:
//
// Client
//
func connect() *rpc.Client {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
return client
}
func get(key string) string {
client := connect()
args := GetArgs{"subject"}
reply := GetReply{}
err := client.Call("KV.Get", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
return reply.Value
}
func put(key string, val string) {
client := connect()
args := PutArgs{"subject", "6.824"}
reply := PutReply{}
err := client.Call("KV.Put", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
}
server
//
// Server
//
type KV struct {
mu sync.Mutex
data map[string]string
}
func server() {
kv := new(KV)
kv.data = map[string]string{}
rpcs := rpc.NewServer()
rpcs.Register(kv)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err == nil {
go rpcs.ServeConn(conn)
} else {
break
}
}
l.Close()
}()
}
func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()
val, ok := kv.data[args.Key]
if ok {
reply.Err = OK
reply.Value = val
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
return nil
}
func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()
kv.data[args.Key] = args.Value
reply.Err = OK
return nil
}
4.3 rpc怎么处理失败
问题:
- 网络延迟
- 丢包
- 服务器慢或者崩溃
处理办法:
- best effort:
- client调用call( ) 等待响应, 如果过了一会没收到响应那就再发送一个call( )
- 这个过程重复几次,然后放弃并且返回一个错误
- at most once:
- 针对服务端说的:当服务端收到相同的请求时
- 根据xid(client id 判断)如果收到相同请求 返回之前的处理结果
- xid 怎么保证唯一性
- 针对服务端说的:当服务端收到相同的请求时
- exactly once:
- 无限重试
- 冗余检查
- 容错服务