GO 实现高并发高可用分布式系统:Log微服务的实现

时间:2022-06-01 20:47:06

GO 实现高并发高可用分布式系统:Log微服务的实现

本文转载自微信公众号「Coding迪斯尼」,作者陈屹。转载本文请联系Coding迪斯尼公众号。

在大数据时代,具备高并发,高可用,理解微服务系统设计的人员需求很大,如果你想从事后台开发,在JD的描述中最常见的要求就是有所谓的“高并发”系统开发经验。但我发现在市面上并没有直接针对“高并发”,“高可用”的教程,你搜到的资料往往都是只言片语,要不就是阐述那些令人摸不着头脑的理论。但是技术的掌握必须从实践中来,我找了很久发现很少有指导人动手实践基于微服务的高并发系统开发,因此我希望结合自己的学习和实践经验跟大家分享一下这方面的技术,特别是要强调具体的动手实践来理解和掌握分布式系统设计的理论和技术。

所谓“微服务”其实没什么神奇的地方,它只不过是把我们原来聚合在一起的模块分解成多个独立的,基于服务器程序存在的形式,假设我们开发的后台系统分为日志,存储,业务逻辑,算法逻辑等模块,以前这些模块会聚合成一个整体形成一个复杂庞大的应用程序:

GO 实现高并发高可用分布式系统:Log微服务的实现

这种方式存在很多问题,第一是过多模块糅合在一起会使得系统设计过于复杂,因为模块直接存在各种逻辑耦合,这使得随着时间的推移,系统的开发和维护变得越来越困难。第二是系统越来越脆弱,只要其中一个模块发送错误或奔溃,整个系统可能就会垮塌。第三是可扩展性不强,系统很难通过硬件性能的增强而实现相应扩展。

要实现高并发,高可用,其基本思路就是将模块拆解,然后让他们成为独立运行的服务器程序,各个模块之间通过消息发送的方式完成配合:

GO 实现高并发高可用分布式系统:Log微服务的实现

这种模式的好处在于:1,模块之间解耦合,一个模块出问题对整个系统影响很小。2,可扩展,高可用,我们可以将模块部署到不同服务器上,当流量增加,我们只要简单的增加服务器数量就能使得系统的响应能力实现同等扩展。3,鲁棒性增强,由于模块能备份多个,其中一个模块出问题,请求可以重定向到其他同样模块,于是系统的可靠性能大大增强。

当然任何收益都有对应代价,分布式系统的设计开发相比于原来的聚合性系统会多出很多难点。例如负载均衡,服务发现,模块协商,共识达成等,分布式算法强调的就是这些问题的解决,但是理论总是抽象难以理解,倘若不能动手实现一个高可用高并发系统,你看多少理论都是雾里看花,越看越糊涂,所以我们必须通过动手实践来理解和掌握理论,首先我们从最简单的服务入手,那就是日志服务,我们将使用GO来实现。

首先创建根目录,可以命名为go_distributed_system,后面所有服务模块都实现在该目录下,然后创建子目录proglog,进去后我们再创建子目录internel/server/在这里我们实现日志服务的逻辑模块,首先在internel/server下面执行初始化命令:

  1. go mod init internal/server

这里开发的模块会被其他模块引用,所以我们需要创建mod文件。首先我们需要完成日志系统所需的底层数据结构,创建log.go文件,相应代码如下:

  1. package server
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type Log struct {
  7. mu sync.Mutex
  8. records [] Record
  9. }
  10. func NewLog() *Log {
  11. return &Log{ch : make(chan Record),}
  12. }
  13. func(c *Log) Append(record Record) (uint64, error) {
  14. c.mu.Lock()
  15. defer c.mu.Unlock()
  16. record.Offset = uint64(len(c.records))
  17. c.records = append(c.records, record)
  18. return record.Offset, nil
  19. }
  20. func (c *Log) Read(offset uint64)(Record, error) {
  21. c.mu.Lock()
  22. defer c.mu.Unlock()
  23. if offset >= uint64(len(c.records)) {
  24. return Record{}, ErrOffsetNotFound
  25. }
  26. return c.records[offset], nil
  27. }
  28. type Record struct {
  29. Value []byte `json:"value"`
  30. Offset uint64 `json:"offset"`
  31. }
  32. var ErrOffsetNotFound = fmt.Errorf("offset not found")

由于我们的日志服务将以http服务器程序的方式接收日志读写请求,因此多个读或写请求会同时执行,所以我们需要对records数组进行互斥操作,因此使用了互斥锁,在每次读取records数组前先获得锁,这样能防止服务在同时接收多个读写请求时破坏掉数据的一致性。

所有的日志读写请求会以http POST 和 GET的方式发起,数据通过json来封装,所以我们下面将创建一个http服务器对象,新建文件http.go,完成如下代码:

  1. package server
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "github.com/gorilla/mux"
  6. )
  7. func NewHttpServer(addr string) *http.Server {
  8. httpsrv := newHttpServer()
  9. r := mux.NewRouter()
  10. r.HandleFunc("/", httpsrv.handleLogWrite).Methods("POST")
  11. r.HandleFunc("/", httpsrv.hadnleLogRead).Methods("GET")
  12. return &http.Server{
  13. Addr : addr,
  14. Handler: r,
  15. }
  16. }
  17. type httpServer struct{
  18. Log *Log
  19. }
  20. func newHttpServer() *httpServer {
  21. return &httpServer {
  22. Log: NewLog(),
  23. }
  24. }
  25. type WriteRequest struct {
  26. Record Record `json:"record"`
  27. }
  28. type WriteResponse struct {
  29. Offset uint64 `json:"offset"`
  30. }
  31. type ReadRequest struct {
  32. Offset uint64 `json:"offset"`
  33. }
  34. type ReadResponse struct {
  35. Record Record `json:"record"`
  36. }
  37. func (s *httpServer) handleLogWrite(w http.ResponseWriter, r * http.Request) {
  38. var req WriteRequest
  39. //服务以json格式接收请求
  40. err := json.NewDecoder(r.Body).Decode(&req)
  41. if err != nil {
  42. http.Error(w, err.Error(), http.StatusBadRequest)
  43. return
  44. }
  45. off, err := s.Log.Append(req.Record)
  46. if err != nil {
  47. http.Error(w, err.Error(), http.StatusInternalServerError)
  48. return
  49. }
  50. res := WriteResponse{Offset: off}
  51. //服务以json格式返回结果
  52. err = json.NewEncoder(w).Encode(res)
  53. if err != nil {
  54. http.Error(w, err.Error(), http.StatusInternalServerError)
  55. return
  56. }
  57. }
  58. func (s *httpServer) hadnleLogRead(w http.ResponseWriter, r *http.Request) {
  59. var req ReadRequest
  60. err := json.NewDecoder(r.Body).Decode(&req)
  61. if err != nil {
  62. http.Error(w, err.Error(), http.StatusBadRequest)
  63. return
  64. }
  65. record, err := s.Log.Read(req.Offset)
  66. if err == ErrOffsetNotFound {
  67. http.Error(w, err.Error(), http.StatusNotFound)
  68. return
  69. }
  70. if err != nil {
  71. http.Error(w, err.Error(), http.StatusInternalServerError)
  72. return
  73. }
  74. res := ReadResponse{Record: record}
  75. err = json.NewEncoder(w).Encode(res)
  76. if err != nil {
  77. http.Error(w, err.Error(), http.StatusInternalServerError)
  78. return
  79. }
  80. }

上面代码显示出“分布式”,“微服务”的特点。相应的功能代码以单独服务器的形式运行,通过网络来接收服务请求,这对应“分布式”,每个独立模块只完成一个特定任务,这就对应“微服务”,由于这种方式可以同时在不同的机器上运行,于是展示了“可扩展性”。

同时服务既然以http 服务器的形式存在,因此服务的请求和返回也要走Http形式,同时数据以Json方式进行封装。同时实现的逻辑很简单,但有日志写请求时,我们把请求解析成Record结构体后加入到队列末尾,当有读取日志的请求时,我们获得客户端发来的读取偏移,然后取出对应的记录,封装成json格式后返回给客户。

完成了服务器的代码后,我们需要将服务器运行起来,为了达到模块化的目的,我们把服务器的启动放置在另一个地方,在proglog根目录下创建cmd/server,在里面添加main.go:

  1. package main
  2. import (
  3. "log"
  4. "internal/server"
  5. )
  6. func main() {
  7. srv := server.NewHttpServer(":8080")
  8. log.Fatal(srv.ListenAndServe())
  9. }

同时为了能够引用internal/server下面的模块,我们需要在cmd/server下先通过go mod init cmd/server进行初始化,然后在go.mod文件中添加如下一行:

  1. replace internal/server => ../../internal/server

然后执行命令 go mod tidy,这样本地模块就知道根据给定的目录转换去引用模块,最后使用go run main.go启动日志服务,现在我们要做的是测试服务器的可用性,我们同样在目录下创建server_test.go,然后编写测试代码,基本逻辑就是想服务器发送日志写请求,然后再发送读请求,并比较读到的数据是否和我们写入的数据一致,代码如下:

  1. package main
  2. import(
  3. "encoding/json"
  4. "net/http"
  5. "internal/server"
  6. "bytes"
  7. "testing"
  8. "io/ioutil"
  9. )
  10. func TestServerLogWrite(t *testing.T) {
  11. var tests = []struct {
  12. request server.WriteRequest
  13. want_response server.WriteResponse
  14. } {
  15. {request: server.WriteRequest{server.Record{[]byte(`this is log request 1`), 0}},
  16. want_response: server.WriteResponse{Offset: 0, },},
  17. {request: server.WriteRequest{server.Record{[]byte(`this is log request 2`), 0}},
  18. want_response: server.WriteResponse{Offset: 1, },},
  19. {request: server.WriteRequest{server.Record{[]byte(`this is log request 3`), 0}},
  20. want_response: server.WriteResponse{Offset: 2, },},
  21. }
  22. for _, test := range tests {
  23. //将请求转换成json格式并post给日志服务
  24. request := &test.request
  25. request_json, err := json.Marshal(request)
  26. if err != nil {
  27. t.Errorf("convert request to json fail")
  28. return
  29. }
  30. resp, err := http.Post("http://localhost:8080", "application/json",bytes.NewBuffer(request_json))
  31. defer resp.Body.Close()
  32. if err != nil {
  33. t.Errorf("http post request fail: %v", err)
  34. return
  35. }
  36. //解析日志服务返回结果
  37. body, err := ioutil.ReadAll(resp.Body)
  38. var response server.WriteResponse
  39. err = json.Unmarshal([]byte(body), &response)
  40. if err != nil {
  41. t.Errorf("Unmarshal write response fail: %v", err)
  42. }
  43. //检测结果是否与预期一致
  44. if response.Offset != test.want_response.Offset {
  45. t.Errorf("got offset: %d, but want offset: %d", response.Offset, test.want_response.Offset)
  46. }
  47. }
  48. var read_tests = []struct {
  49. request server.ReadRequest
  50. want server.ReadResponse
  51. } {
  52. {request: server.ReadRequest{Offset : 0,},
  53. want: server.ReadResponse{server.Record{[]byte(`this is log request 1`), 0}} },
  54. {request: server.ReadRequest{Offset : 1,},
  55. want: server.ReadResponse{server.Record{[]byte(`this is log request 2`), 0}} },
  56. {request: server.ReadRequest{Offset : 2,},
  57. want: server.ReadResponse{server.Record{[]byte(`this is log request 3`), 0}} },
  58. }
  59. for _, test := range read_tests {
  60. request := test.request
  61. request_json , err := json.Marshal(request)
  62. if err != nil {
  63. t.Errorf("convert read request to json fail")
  64. return
  65. }
  66. //将请求转换为json并放入GET请求体
  67. client := &http.Client{}
  68. req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", bytes.NewBuffer(request_json))
  69. req.Header.Set("Content-Type", "application/json")
  70. resp, err := client.Do(req)
  71. if err != nil {
  72. t.Errorf("read request fail: %v", err)
  73. return
  74. }
  75. //解析读请求返回的结果
  76. defer resp.Body.Close()
  77. body, err := ioutil.ReadAll(resp.Body)
  78. var response server.ReadResponse
  79. err = json.Unmarshal([]byte(body), &response)
  80. if err != nil {
  81. t.Errorf("Unmarshal read response fail: %v", err)
  82. return
  83. }
  84. res := bytes.Compare(response.Record.Value, test.want.Record.Value)
  85. if res != 0 {
  86. t.Errorf("got value: %q, but want value : %q", response.Record.Value, test.want.Record.Value)
  87. }
  88. }
  89. }

完成上面代码后,使用go test运行,结果如下图所示:

GO 实现高并发高可用分布式系统:Log微服务的实现

从结果看到,我们的测试能通过,也就是无论是向日志服务提交写入请求还是读取请求,所得的结果跟我们预想的一致。总结一下,本节我们设计了一个简单的JSON/HTTP 日志服务,它能够接收基于JSON的http写请求和读请求,后面我们还会研究基于gPRC技术的微服务开发技术.

代码获取

https://github.com/wycl16514/golang_distribute_system_log_service.git

原文链接:https://mp.weixin.qq.com/s/gE1-52dvqlzsnBevlT-kFQ