前言
containerd 只是一个守护进程,容器的实际运行时由 runC 控制。containerd 主要职责是镜像管理(镜像、元信息等)、容器执行(调用最终运行时组件执行)
一. containerd 源码编译
需要安装依赖包:btrfs-tools
直接执行 make 即可生成 ctr、containerd、containerd-shim 三个可执行文件
二. containerd main 函数
2.1 入口目录为
cmd/containerd/main.go 中的 main 函数。NewApp 使用了一个第三方命令行库(cli 包),用来设置应用名字、命令行启动参数以及子命令等
// Build the command-line application: name, version, usage text,
// global flags and subcommands.
app := cli.NewApp()
app.Name = "containerd"
app.Version = version.Version
app.Usage = usage
// Global flags. Only the config flag sets an explicit Value here.
app.Flags = []cli.Flag{
	cli.StringFlag{
		Name:  "config,c",
		Usage: "path to the configuration file",
		Value: defaultConfigPath,
	},
	cli.StringFlag{
		Name:  "log-level,l",
		Usage: "set the logging level [debug, info, warn, error, fatal, panic]",
	},
	cli.StringFlag{
		Name:  "address,a",
		Usage: "address for containerd's GRPC server",
	},
	cli.StringFlag{
		Name:  "root",
		Usage: "containerd root directory",
	},
}
// configCommand is the only subcommand registered in this excerpt.
app.Commands = []cli.Command{
	configCommand,
}
2.1 Action
这个函数设置的内容比较多,默认的 config 为:root 为 /var/lib/containerd,默认 socket 地址为 /run/containerd/containerd.sock,server.New 在 2.1.1 讲解
app.Action = func(context *cli.Context) error { if err := server.LoadConfig(context.GlobalString("config"), config); err != nil && !os.IsNotExist(err) { // apply flags to the config if err := applyFlags(context, config); err != nil { address := config.GRPC.Address server, err := server.New(ctx, config) if config.Debug.Address != "" { l, err := sys.GetLocalListener(config.Debug.Address, config.Debug.Uid, config.Debug.Gid) } if config.Metrics.Address != "" { l, err := net.Listen("tcp", config.Metrics.Address) serve(log.WithModule(ctx, "metrics"), l, server.ServeMetrics) } l, err := sys.GetLocalListener(address, config.GRPC.Uid, config.GRPC.Gid) serve(log.WithModule(ctx, "grpc"), l, server.ServeGRPC) return handleSignals(ctx, signals, server) }
2.1.1 New 函数创建以及初始化
containerd server:
- 创建 /var/lib/containerd 目录
- 从 /var/lib/containerd/plugins 加载 plugin
- 建立 GRPC server
- 初始化各个插件
- 最后注册服务,即调用各个插件所实现的 Register 接口
// New creates and initializes a new containerd server func New(ctx context.Context, config *Config) (*Server, error) { if err := os.MkdirAll(config.Root, 0711); err != nil { return nil, err } if err := apply(ctx, config); err != nil { return nil, err } plugins, err := loadPlugins(config) rpc := grpc.NewServer( grpc.UnaryInterceptor(interceptor), grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), ) for _, p := range plugins { id := p.URI() initContext := plugin.NewContext( ctx, initialized, config.Root, id, ) initContext.Events = s.events initContext.Address = config.GRPC.Address // load the plugin specific configuration if it is provided if p.Config != nil { pluginConfig, err := config.Decode(p.ID, p.Config) initContext.Config = pluginConfig } instance, err := p.Init(initContext) if types, ok := initialized[p.Type]; ok { types[p.ID] = instance } else { initialized[p.Type] = map[string]interface{}{ p.ID: instance, } } // check for grpc services that should be registered with the server if service, ok := instance.(plugin.Service); ok { services = append(services, service) } } // register services after all plugins have been initialized for _, service := range services { if err := service.Register(rpc); err != nil { return nil, err } } return s, nil }
Init 函数以及 Register 函数从第四章开始讲解:各个插件初始化、实例化并注册服务
三. plugin
Registration 数据结构:
// Registration describes a plugin: its type and ID, an optional config
// value, the plugin types it requires, and the Init function that
// builds the plugin instance from an InitContext.
type Registration struct {
	Type     PluginType
	ID       string
	Config   interface{}
	Requires []PluginType
	Init     func(*InitContext) (interface{}, error)

	// NOTE(review): usage of added is not visible in this excerpt;
	// presumably internal bookkeeping — confirm against the plugin package.
	added bool
}
init 函数初始化类型包括
// Plugin type identifiers. Each constant is a PluginType string value.
const (
	RuntimePlugin     PluginType = "io.containerd.runtime.v1"
	GRPCPlugin        PluginType = "io.containerd.grpc.v1"
	SnapshotPlugin    PluginType = "io.containerd.snapshotter.v1"
	TaskMonitorPlugin PluginType = "io.containerd.monitor.v1"
	DiffPlugin        PluginType = "io.containerd.differ.v1"
	MetadataPlugin    PluginType = "io.containerd.metadata.v1"
	ContentPlugin     PluginType = "io.containerd.content.v1"
)
四. plugin:io.containerd.grpc.v1.containers
路径: services/containers/service.go,ID 为 containers,提供基础元素的存储
// init registers the "containers" GRPC plugin. The plugin requires the
// metadata plugin; Init fetches it from the InitContext and builds the
// service from that database and ic.Events.
func init() {
	plugin.Register(&plugin.Registration{
		Type: plugin.GRPCPlugin,
		ID:   "containers",
		Requires: []plugin.PluginType{
			plugin.MetadataPlugin,
		},
		Init: func(ic *plugin.InitContext) (interface{}, error) {
			m, err := ic.Get(plugin.MetadataPlugin)
			if err != nil {
				return nil, err
			}
			// The unchecked type assertion panics if m is not a *bolt.DB.
			return NewService(m.(*bolt.DB), ic.Events), nil
		},
	})
}
实例化为 Service 结构体
// Service holds the bolt database handle and the event publisher used
// by the containers service.
type Service struct {
	db        *bolt.DB
	publisher events.Publisher
}

// NewService returns a *Service wrapping db and publisher, typed as
// api.ContainersServer.
func NewService(db *bolt.DB, publisher events.Publisher) api.ContainersServer {
	return &Service{db: db, publisher: publisher}
}
Register 接口实现,注册 GRPC
// Register attaches the service to the given grpc server through
// api.RegisterContainersServer. It always returns nil.
func (s *Service) Register(server *grpc.Server) error {
	api.RegisterContainersServer(server, s)
	return nil
}
可以有如下 GRPC API 方法
// ContainersServer lists the RPC methods of the containers service:
// Get, List, Create, Update and Delete. Each method takes a context and
// a request message; Delete returns an Empty message instead of a
// dedicated response type.
type ContainersServer interface {
	Get(context.Context, *GetContainerRequest) (*GetContainerResponse, error)
	List(context.Context, *ListContainersRequest) (*ListContainersResponse, error)
	Create(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
	Update(context.Context, *UpdateContainerRequest) (*UpdateContainerResponse, error)
	Delete(context.Context, *DeleteContainerRequest) (*google_protobuf2.Empty, error)
}
根据下面的服务描述(ServiceDesc)确定每个方法由哪个 handler 处理
// _Containers_serviceDesc is the grpc service descriptor for the
// containers service. It maps each method name to its handler function
// and declares no streaming methods.
var _Containers_serviceDesc = grpc.ServiceDesc{
	ServiceName: "containerd.services.containers.v1.Containers",
	HandlerType: (*ContainersServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "Get",
			Handler:    _Containers_Get_Handler,
		},
		{
			MethodName: "List",
			Handler:    _Containers_List_Handler,
		},
		{
			MethodName: "Create",
			Handler:    _Containers_Create_Handler,
		},
		{
			MethodName: "Update",
			Handler:    _Containers_Update_Handler,
		},
		{
			MethodName: "Delete",
			Handler:    _Containers_Delete_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "github.com/containerd/containerd/api/services/containers/v1/containers.proto",
}
五. plugin:io.containerd.snapshotter.v1.btrfs
路径:snapshot/btrfs/btrfs.go,ID 为 btrfs
// init registers the "btrfs" snapshot plugin. Its Init function builds
// the snapshotter from the plugin root directory (ic.Root).
func init() {
	plugin.Register(&plugin.Registration{
		ID:   "btrfs",
		Type: plugin.SnapshotPlugin,
		Init: func(ic *plugin.InitContext) (interface{}, error) {
			return NewSnapshotter(ic.Root)
		},
	})
}
实例化为 snapshotter 结构体
type snapshotter struct { device string // device of the root root string // root provides paths for internal storage. ms *storage.MetaStore }
路径 snapshot/overlay/overlay.go,注册插件 overlayfs
// init registers the "overlayfs" snapshot plugin. Its Init function
// builds the snapshotter from the plugin root directory (ic.Root).
func init() {
	plugin.Register(&plugin.Registration{
		Type: plugin.SnapshotPlugin,
		ID:   "overlayfs",
		Init: func(ic *plugin.InitContext) (interface{}, error) {
			return NewSnapshotter(ic.Root)
		},
	})
}
io.containerd.differ.v1 类型
路径 differ/differ.go,注册插件 base-diff
// init registers the "base-diff" diff plugin. The plugin requires the
// content and metadata plugins; Init fetches both and builds the differ
// on a content store created from the metadata database and the
// content store.
func init() {
	plugin.Register(&plugin.Registration{
		Type: plugin.DiffPlugin,
		ID:   "base-diff",
		Requires: []plugin.PluginType{
			plugin.ContentPlugin,
			plugin.MetadataPlugin,
		},
		Init: func(ic *plugin.InitContext) (interface{}, error) {
			c, err := ic.Get(plugin.ContentPlugin)
			if err != nil {
				return nil, err
			}
			md, err := ic.Get(plugin.MetadataPlugin)
			if err != nil {
				return nil, err
			}
			// Both type assertions are unchecked and panic on a mismatch.
			return NewBaseDiff(metadata.NewContentStore(md.(*bolt.DB), c.(content.Store)))
		},
	})
}
六. plugin:io.containerd.runtime.v1.linux
路径:linux/runtime.go,ID 为 linux
// init registers the "linux" runtime plugin. New is its Init function,
// it requires the task-monitor and metadata plugins, and its default
// Config carries defaultShim and defaultRuntime.
func init() {
	plugin.Register(&plugin.Registration{
		Type: plugin.RuntimePlugin,
		ID:   "linux",
		Init: New,
		Requires: []plugin.PluginType{
			plugin.TaskMonitorPlugin,
			plugin.MetadataPlugin,
		},
		Config: &Config{
			Shim:    defaultShim,
			Runtime: defaultRuntime,
		},
	})
}
Init 函数为 New
// New builds the Runtime for the linux plugin. It creates ic.Root with
// mode 0711, fetches the task-monitor and metadata plugins, copies the
// settings from the plugin Config, then restores tasks and adds each
// one to the task list under its namespace. Any error is returned
// unchanged with a nil instance.
func New(ic *plugin.InitContext) (interface{}, error) {
	if err := os.MkdirAll(ic.Root, 0711); err != nil {
		return nil, err
	}
	monitor, err := ic.Get(plugin.TaskMonitorPlugin)
	if err != nil {
		return nil, err
	}
	m, err := ic.Get(plugin.MetadataPlugin)
	if err != nil {
		return nil, err
	}
	// Unchecked assertion: panics if the config is not a *Config.
	cfg := ic.Config.(*Config)
	r := &Runtime{
		root: ic.Root,
		// remote is the inverse of the NoShim setting.
		remote:    !cfg.NoShim,
		shim:      cfg.Shim,
		shimDebug: cfg.ShimDebug,
		runtime:   cfg.Runtime,
		monitor:   monitor.(runtime.TaskMonitor),
		tasks:     runtime.NewTaskList(),
		db:        m.(*bolt.DB),
		address:   ic.Address,
		events:    ic.Events,
	}
	tasks, err := r.restoreTasks(ic.Context)
	if err != nil {
		return nil, err
	}
	for _, t := range tasks {
		if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil {
			return nil, err
		}
	}
	return r, nil
}
返回结构体 Runtime
// Runtime is the linux runtime plugin instance. It stores the root
// directory, shim and runtime settings, the GRPC address, the task
// monitor, the task list, the bolt database and the event exchange.
type Runtime struct {
	root      string
	shim      string
	shimDebug bool
	runtime   string
	remote    bool
	address   string

	monitor runtime.TaskMonitor
	tasks   *runtime.TaskList
	db      *bolt.DB
	events  *events.Exchange
}
所有插件就不一一列举了,都按同样的方法执行。至此 GRPC server 建立并启动完毕。
总结:
加载插件,根据插件建立 GRPC API 服务