【containerd 1.0 源码分析】containerd 启动流程分析

时间:2024-05-21 21:40:35



前言

    containerd 只是一个守护进程,容器的实际运行由 runC 控制。containerd 的主要职责是镜像管理(镜像、元信息等)和容器执行(调用最终的运行时组件执行)。
【containerd 1.0 源码分析】containerd 启动流程分析



一. containerd 源码编译


   需要安装依赖包:btrfs-tools
    直接执行 make 即可生成 ctr、containerd、containerd-shim 这几个可执行文件(binaries)


二. containerd main 函数


   2.1 入口为 cmd/containerd/main.go 中的 main 函数。NewApp 使用了一个第三方命令行库(cli 包),用于设置应用名称、命令行启动参数以及子命令等
// Create the command-line application and fill in its name, version and usage text.
app := cli.NewApp()
app.Name = "containerd"
app.Version = version.Version
app.Usage = usage
// Global flags. Names such as "config,c" appear to pair a long name with a
// one-letter alias (cli package convention — confirm against the cli docs).
app.Flags = []cli.Flag{
       cli.StringFlag{
              Name:  "config,c",
              Usage: "path to the configuration file",
              // The only flag in this list that declares a default value.
              Value: defaultConfigPath,
       },
       cli.StringFlag{
              Name:  "log-level,l",
              Usage: "set the logging level [debug, info, warn, error, fatal, panic]",
       },
       cli.StringFlag{
              Name:  "address,a",
              Usage: "address for containerd's GRPC server",
       },
       cli.StringFlag{
              Name:  "root",
              Usage: "containerd root directory",
       },
}
// configCommand is the only subcommand registered here.
app.Commands = []cli.Command{
       configCommand,
}

    2.1 Action 这个函数设置的内容比较多。默认配置中 root 为 /var/lib/containerd,默认 socket 地址为 /run/containerd/containerd.sock,server.New 在 2.1.1 讲解
// Action loads the configuration, creates the server, starts the optional
// debug/metrics listeners and the GRPC listener, then waits in handleSignals.
// NOTE(review): this is an abridged excerpt — several error-handling bodies
// and closing braces of the original source are omitted.
app.Action = func(context *cli.Context) error {
       // Load the file named by the global "config" flag into config; a
       // not-exist error is excluded from this error branch.
       if err := server.LoadConfig(context.GlobalString("config"), config); err != nil && !os.IsNotExist(err) {

       // apply flags to the config
       if err := applyFlags(context, config); err != nil {
    
       // GRPC address taken from the config after flags were applied.
       address := config.GRPC.Address

       // Create the server from the config (see New below in the article).
       server, err := server.New(ctx, config)
   
       // Optional debug listener, created only when a debug address is configured.
       if config.Debug.Address != "" {
              l, err := sys.GetLocalListener(config.Debug.Address, config.Debug.Uid, config.Debug.Gid)
       }
       // Optional metrics endpoint, listening on TCP when an address is configured.
       if config.Metrics.Address != "" {
              l, err := net.Listen("tcp", config.Metrics.Address)
      
              serve(log.WithModule(ctx, "metrics"), l, server.ServeMetrics)
       }

       // Main GRPC listener on the configured address with the configured uid/gid.
       l, err := sys.GetLocalListener(address, config.GRPC.Uid, config.GRPC.Gid)

       serve(log.WithModule(ctx, "grpc"), l, server.ServeGRPC)

       // The closure's result is whatever handleSignals returns.
       return handleSignals(ctx, signals, server)
}


    2.1.1 New 函数创建以及初始化 containerd server:
  • 创建 /var/lib/containerd 目录
  • 从 /var/lib/containerd/plugins 加载 plugin
  • 建立 GRPC server
  • 初始化各个插件
  • 最后注册服务,即调用各个插件实现的 Register 接口
// New creates and initializes a new containerd server
//
// It creates config.Root, calls apply, loads the plugin registrations, builds
// the GRPC server, initializes every plugin in order and finally registers the
// plugins that implement plugin.Service with the GRPC server.
// NOTE(review): abridged excerpt — the declarations of s, initialized and
// services, and several error checks, are omitted from the original source.
func New(ctx context.Context, config *Config) (*Server, error) {
       // Ensure the root directory exists (mode 0711).
       if err := os.MkdirAll(config.Root, 0711); err != nil {
              return nil, err
       }
       if err := apply(ctx, config); err != nil {
              return nil, err
       }
       plugins, err := loadPlugins(config)
  
       // GRPC server with one unary and one stream interceptor installed.
       rpc := grpc.NewServer(
              grpc.UnaryInterceptor(interceptor),
              grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
       )
     
       for _, p := range plugins {
              id := p.URI()

              // Per-plugin init context: carries the already-initialized plugin
              // instances, the root directory and this plugin's URI.
              initContext := plugin.NewContext(
                     ctx,
                     initialized,
                     config.Root,
                     id,
              )
              initContext.Events = s.events
              initContext.Address = config.GRPC.Address

              // load the plugin specific configuration if it is provided
              if p.Config != nil {
                     pluginConfig, err := config.Decode(p.ID, p.Config)
                   
                     initContext.Config = pluginConfig
              }
              instance, err := p.Init(initContext)

              // Record the instance under its plugin type and ID, creating the
              // per-type map on first use.
              if types, ok := initialized[p.Type]; ok {
                     types[p.ID] = instance
              } else {
                     initialized[p.Type] = map[string]interface{}{
                            p.ID: instance,
                     }
              }
              // check for grpc services that should be registered with the server
              if service, ok := instance.(plugin.Service); ok {
                     services = append(services, service)
              }
       }
       // register services after all plugins have been initialized
       for _, service := range services {
              if err := service.Register(rpc); err != nil {
                     return nil, err
              }
       }
       return s, nil
}

     Init 函数以及 Register 函数在第四章之后讲解,各个插件在其中完成初始化、实例化并注册服务


三. plugin

 
Registration 数据结构:
// Registration describes a single plugin; values of this type are passed to
// plugin.Register from each plugin package's init function.
type Registration struct {
       // Type is the plugin category (one of the PluginType constants).
       Type     PluginType
       // ID names the plugin within its type, e.g. "containers" or "linux".
       ID       string
       // Config is an optional plugin-specific configuration value; nil means none.
       Config   interface{}
       // Requires lists the plugin types this plugin declares as dependencies.
       Requires []PluginType
       // Init builds the plugin instance from the given init context.
       Init     func(*InitContext) (interface{}, error)

       // added is unexported bookkeeping; its use is not visible in this excerpt.
       added bool
}
     
各插件在 init 函数中注册,插件类型(PluginType)包括:
// Plugin type identifiers; a Registration carries one of these in its Type field.
const (
       RuntimePlugin     PluginType = "io.containerd.runtime.v1"
       GRPCPlugin        PluginType = "io.containerd.grpc.v1"
       SnapshotPlugin    PluginType = "io.containerd.snapshotter.v1"
       TaskMonitorPlugin PluginType = "io.containerd.monitor.v1"
       DiffPlugin        PluginType = "io.containerd.differ.v1"
       MetadataPlugin    PluginType = "io.containerd.metadata.v1"
       ContentPlugin     PluginType = "io.containerd.content.v1"
)


四. plugin:io.containerd.grpc.v1.containers


路径: services/containers/service.go,ID 为 containers,提供基础元素的存储
// init registers the "containers" GRPC plugin. The plugin declares the
// metadata plugin as a requirement; its Init callback fetches that plugin,
// asserts it to *bolt.DB and wraps it with the init context's Events in a
// new Service.
func init() {
	initContainers := func(ic *plugin.InitContext) (interface{}, error) {
		metadataPlugin, err := ic.Get(plugin.MetadataPlugin)
		if err != nil {
			return nil, err
		}
		db := metadataPlugin.(*bolt.DB)
		return NewService(db, ic.Events), nil
	}

	plugin.Register(&plugin.Registration{
		Type:     plugin.GRPCPlugin,
		ID:       "containers",
		Requires: []plugin.PluginType{plugin.MetadataPlugin},
		Init:     initContainers,
	})
}


实例化为 Service 结构体

// Service is the containers GRPC service instance created by NewService.
type Service struct {
       // db is the bolt database handed to NewService.
       db        *bolt.DB
       // publisher is the events publisher handed to NewService.
       publisher events.Publisher
}

// NewService builds a Service holding the given bolt database and event
// publisher and returns it as an api.ContainersServer.
func NewService(db *bolt.DB, publisher events.Publisher) api.ContainersServer {
	svc := new(Service)
	svc.db = db
	svc.publisher = publisher
	return svc
}

Register 接口实现,注册 GRPC
// Register attaches this Service to the given GRPC server as the
// ContainersServer implementation. It always returns nil.
func (s *Service) Register(server *grpc.Server) error {
       api.RegisterContainersServer(server, s)
       return nil
}

该服务提供如下 GRPC API 方法
// ContainersServer is the server-side interface of the Containers GRPC
// service. Each method takes a context and a request message and returns the
// matching response message (Delete returns an Empty message) or an error.
type ContainersServer interface {
       Get(context.Context, *GetContainerRequest) (*GetContainerResponse, error)
       List(context.Context, *ListContainersRequest) (*ListContainersResponse, error)
       Create(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
       Update(context.Context, *UpdateContainerRequest) (*UpdateContainerResponse, error)
       Delete(context.Context, *DeleteContainerRequest) (*google_protobuf2.Empty, error)
}

每个方法由哪个 handler 处理,由下面的服务描述(ServiceDesc)决定
// _Containers_serviceDesc is the GRPC service descriptor for the Containers
// service: it names the service, its handler interface type, and maps each
// method name to its handler function. No streaming methods are declared.
var _Containers_serviceDesc = grpc.ServiceDesc{
       ServiceName: "containerd.services.containers.v1.Containers",
       HandlerType: (*ContainersServer)(nil),
       Methods: []grpc.MethodDesc{
              {
                     MethodName: "Get",
                     Handler:    _Containers_Get_Handler,
              },
              {
                     MethodName: "List",
                     Handler:    _Containers_List_Handler,
              },
              {
                     MethodName: "Create",
                     Handler:    _Containers_Create_Handler,
              },
              {
                     MethodName: "Update",
                     Handler:    _Containers_Update_Handler,
              },
              {
                     MethodName: "Delete",
                     Handler:    _Containers_Delete_Handler,
              },
       },
       Streams:  []grpc.StreamDesc{},
       Metadata: "github.com/containerd/containerd/api/services/containers/v1/containers.proto",
}


五. plugin:io.containerd.snapshotter.v1.btrfs


路径:snapshot/btrfs/btrfs.go,ID 为 btrfs
// init registers the "btrfs" snapshot plugin; its Init callback creates a
// snapshotter rooted at the init context's Root.
func init() {
	newBtrfs := func(ic *plugin.InitContext) (interface{}, error) {
		return NewSnapshotter(ic.Root)
	}

	plugin.Register(&plugin.Registration{
		Type: plugin.SnapshotPlugin,
		ID:   "btrfs",
		Init: newBtrfs,
	})
}
    
实例化为 snapshotter 结构体
// snapshotter is the btrfs snapshot plugin instance.
type snapshotter struct {
       device string // device of the root
       root   string // root provides paths for internal storage.
       ms     *storage.MetaStore // metadata store; how it is used is not shown in this excerpt
}


路径 snapshot/overlay/overlay.go,注册插件 overlayfs
// init registers the "overlayfs" snapshot plugin; its Init callback creates a
// snapshotter rooted at the init context's Root.
func init() {
	newOverlay := func(ic *plugin.InitContext) (interface{}, error) {
		return NewSnapshotter(ic.Root)
	}

	plugin.Register(&plugin.Registration{
		ID:   "overlayfs",
		Type: plugin.SnapshotPlugin,
		Init: newOverlay,
	})
}

io.containerd.differ.v1 类型
    路径 differ/differ.go,注册插件 base-diff
// init registers the "base-diff" diff plugin. It requires the content and
// metadata plugins; its Init callback fetches both, wraps them in a metadata
// content store and passes that store to NewBaseDiff.
func init() {
	buildBaseDiff := func(ic *plugin.InitContext) (interface{}, error) {
		contentPlugin, err := ic.Get(plugin.ContentPlugin)
		if err != nil {
			return nil, err
		}

		metadataPlugin, err := ic.Get(plugin.MetadataPlugin)
		if err != nil {
			return nil, err
		}

		// Assert the metadata plugin first, then the content plugin.
		store := metadata.NewContentStore(metadataPlugin.(*bolt.DB), contentPlugin.(content.Store))
		return NewBaseDiff(store)
	}

	plugin.Register(&plugin.Registration{
		Type:     plugin.DiffPlugin,
		ID:       "base-diff",
		Requires: []plugin.PluginType{plugin.ContentPlugin, plugin.MetadataPlugin},
		Init:     buildBaseDiff,
	})
}

六. plugin:io.containerd.runtime.v1.linux


路径:linux/runtime.go,ID 为 linux
func init() {
       plugin.Register(&plugin.Registration{
              Type: plugin.RuntimePlugin,
              ID:   "linux",
              Init: New,
              Requires: []plugin.PluginType{
                     plugin.TaskMonitorPlugin,
                     plugin.MetadataPlugin,
              },
              Config: &Config{
                     Shim:    defaultShim,
                     Runtime: defaultRuntime,
              },
       })
}

该插件的 Init 函数为 New
// New is the Init callback of the "linux" runtime plugin. It creates the
// plugin root directory (mode 0711), fetches the task monitor and metadata
// plugins, builds a Runtime from them and the plugin config, restores tasks
// via restoreTasks and adds each one to the runtime's task list under its
// namespace. Any error from these steps is returned unchanged.
func New(ic *plugin.InitContext) (interface{}, error) {
	if err := os.MkdirAll(ic.Root, 0711); err != nil {
		return nil, err
	}

	taskMonitor, err := ic.Get(plugin.TaskMonitorPlugin)
	if err != nil {
		return nil, err
	}

	metadataPlugin, err := ic.Get(plugin.MetadataPlugin)
	if err != nil {
		return nil, err
	}

	conf := ic.Config.(*Config)
	rt := &Runtime{
		root:      ic.Root,
		remote:    !conf.NoShim, // a shim is used unless NoShim is set
		shim:      conf.Shim,
		shimDebug: conf.ShimDebug,
		runtime:   conf.Runtime,
		monitor:   taskMonitor.(runtime.TaskMonitor),
		tasks:     runtime.NewTaskList(),
		db:        metadataPlugin.(*bolt.DB),
		address:   ic.Address,
		events:    ic.Events,
	}

	restored, err := rt.restoreTasks(ic.Context)
	if err != nil {
		return nil, err
	}
	for _, task := range restored {
		if addErr := rt.tasks.AddWithNamespace(task.namespace, task); addErr != nil {
			return nil, addErr
		}
	}

	return rt, nil
}

返回结构体 Runtime
// Runtime is the instance returned by the "linux" runtime plugin's New
// function; every field below is populated there.
type Runtime struct {
       root      string // plugin root directory (ic.Root)
       shim      string // copied from Config.Shim
       shimDebug bool   // copied from Config.ShimDebug
       runtime   string // copied from Config.Runtime
       remote    bool   // set to !Config.NoShim
       address   string // copied from the init context's Address

       monitor runtime.TaskMonitor // task monitor plugin instance
       tasks   *runtime.TaskList   // task list; restored tasks are added to it in New
       db      *bolt.DB            // metadata plugin instance
       events  *events.Exchange    // copied from the init context's Events
}


   其余插件就不一一列举了,都按同样的方式初始化并注册。至此 GRPC server 建立并启动完毕。

总结:
   加载插件,根据插件建立 GRPC API 服务