Kubelet
Every node runs a kubelet service process, which listens on port 10250 by default. It receives and carries out instructions from the master and manages the Pods, and the containers within them, on its node. Each kubelet process registers its node information with the API Server, periodically reports the node's resource usage to the master, and monitors node and container resources through cAdvisor.
Node Management
Node management consists mainly of node self-registration and node status updates:
- The kubelet decides whether to register itself with the API Server via the --register-node startup flag;
- If the kubelet does not use self-registration, the user must configure the Node resource information manually and tell the kubelet where the cluster's API Server is located;
- At startup the kubelet registers its node information through the API Server, and it periodically sends node status updates; on receiving an update, the API Server writes the information to etcd.
- --api-servers: location of the API server
- --kubeconfig: path to the certificates/credentials used to access the API server
- --cloud-provider: how to read the relevant metadata from the cloud provider
- --node-status-update-frequency: interval at which status is reported to the API server (default 10s)
Container Health Checks
The kubelet periodically invokes the container's LivenessProbe to diagnose the container's health. A LivenessProbe can be implemented in three ways (a sketch of a probe definition follows the list):
- ExecAction: run a command inside the container; an exit status code of 0 indicates the container is healthy;
- TCPSocketAction: perform a TCP check against the container's IP address and port; if the port is reachable, the container is considered healthy;
- HTTPGetAction: issue an HTTP GET against the container's IP address, port, and path; a response status code greater than or equal to 200 and less than 400 indicates the container is healthy.
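As a minimal sketch of how these three variants map onto the v1 API types (container name, image, path, and timings are illustrative; pod authors would normally express the same thing in a pod spec):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// HTTPGetAction variant; ExecAction and TCPSocketAction would
	// instead fill in Handler.Exec or Handler.TCPSocket.
	container := v1.Container{
		Name:  "web",   // hypothetical container name
		Image: "nginx", // hypothetical image
		LivenessProbe: &v1.Probe{
			Handler: v1.Handler{
				HTTPGet: &v1.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(80)},
			},
			InitialDelaySeconds: 5,  // wait before the first probe
			PeriodSeconds:       10, // probe interval
		},
	}
	fmt.Printf("%+v\n", container.LivenessProbe)
}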
Introduction
In a Kubernetes cluster, each Node runs a kubelet process. It handles tasks pushed down from the Master and manages Pods and the containers in them. Each kubelet process registers its node information with the APIServer, periodically reports resource usage to the Master, and monitors containers and node resources through cAdvisor.
- --node-sync-period: sets the synchronization period
- --register-node: when set to true (the default), the kubelet registers itself with the apiserver
- --kubeconfig: path to the credentials/certificates for logging in to the apiserver
- --api-servers: the apiserver addresses
Key Structures
type KubeletConfiguration struct is omitted here; it has too many fields to list.
type Pod struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
	// Specification of the desired behavior of the pod.
	// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
	// +optional
	Spec PodSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
	// Most recently observed status of the pod.
	// This data may not be up to date.
	// Populated by the system.
	// Read-only.
	// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
	// +optional
	Status PodStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
A Pod is a collection of containers that run on one host; it is created through the client and scheduled onto hosts.
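To illustrate "created through the client and scheduled onto hosts", a minimal hypothetical sketch that submits a Pod with client-go, reusing the client clientset from the registration sketch above (name, namespace, and image are illustrative):

pod := &v1.Pod{
	ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
	Spec: v1.PodSpec{
		Containers: []v1.Container{{Name: "demo", Image: "nginx"}},
	},
}
created, err := client.CoreV1().Pods("default").Create(pod)
if err != nil {
	panic(err)
}
// The scheduler will now bind the pod to a node, and that node's
// kubelet will pick it up through the sync loop described below.
fmt.Println("created pod", created.Name)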
1. Kubelet Startup Flow
The main entry point
The source in cmd/kubelet/kubelet.go is as follows:
func main() {
	s := options.NewKubeletServer()
	s.AddFlags(pflag.CommandLine)
	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()
	verflag.PrintAndExitIfRequested()
	if s.ExperimentalDockershim {
		if err := app.RunDockershim(&s.KubeletConfiguration, &s.ContainerRuntimeOptions); err != nil {
			fmt.Fprintf(os.Stderr, "error: %v\n", err)
			os.Exit(1)
		}
	}
	if err := app.Run(s, nil); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}
- options.NewKubeletServer(): creates a KubeletServer structure
- initialization of logging and the like
- verflag.PrintAndExitIfRequested(): checks whether the version flag was passed and, if so, prints it and exits
- finally, the key call: app.Run(s, nil)
The Run function
func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
	// TODO: this should be replaced by a --standalone flag
	standaloneMode := (len(s.APIServerList) == 0 && !s.RequireKubeConfig)

	if s.ExitOnLockContention && s.LockFilePath == "" {
		return errors.New("cannot exit on lock file contention: no lock file specified")
	}

	done := make(chan struct{})
	if s.LockFilePath != "" {
		glog.Infof("acquiring file lock on %q", s.LockFilePath)
		if err := flock.Acquire(s.LockFilePath); err != nil {
			return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
		}
		if s.ExitOnLockContention {
			glog.Infof("watching for inotify events for: %v", s.LockFilePath)
			if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
				return err
			}
		}
	}

	// Set feature gates based on the value in KubeletConfiguration
	err = utilfeature.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
	if err != nil {
		return err
	}
	......
All of the above is preparatory work, such as assembling parameters.
The UnsecuredKubeletDeps() function
func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) {
	// Initialize the TLS Options
	tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
	if err != nil {
		return nil, err
	}

	mounter := mount.New(s.ExperimentalMounterPath)
	var writer kubeio.Writer = &kubeio.StdWriter{}
	if s.Containerized {
		glog.V(2).Info("Running kubelet in containerized mode (experimental)")
		mounter = mount.NewNsenterMounter()
		writer = &kubeio.NsenterWriter{}
	}

	var dockerClient libdocker.Interface
	if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
		dockerClient = libdocker.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration,
			s.ImagePullProgressDeadline.Duration)
	} else {
		dockerClient = nil
	}

	return &kubelet.KubeletDeps{
		Auth:               nil, // default does not enforce auth[nz]
		CAdvisorInterface:  nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
		Cloud:              nil, // cloud provider might start background processes
		ContainerManager:   nil,
		DockerClient:       dockerClient,
		KubeClient:         nil,
		ExternalKubeClient: nil,
		Mounter:            mounter,
		NetworkPlugins:     ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir),
		OOMAdjuster:        oom.NewOOMAdjuster(),
		OSInterface:        kubecontainer.RealOS{},
		Writer:             writer,
		VolumePlugins:      ProbeVolumePlugins(s.VolumePluginDir),
		TLSOptions:         tlsOptions,
	}, nil
}
- Initializes the TLS options and certificates
- The kubelet may itself be deployed as a container, so the standard-output writer must be configured. When it runs inside a container, namespace isolation prevents the kubelet from reaching the docker containers' namespaces, so the code checks for containerized mode; in that case it uses nsenter, which lets the kubelet run commands inside a target namespace (see the sketch after this list)
- Returns the KubeletDeps structure
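A minimal sketch of the nsenter idea, roughly what the NsenterMounter relies on for mount operations. The paths and arguments are illustrative; /rootfs is a hypothetical bind mount of the host filesystem into the kubelet container:

package main

import (
	"fmt"
	"os/exec"
)

func main() {
	// Run `df -h` inside the mount namespace of host PID 1,
	// escaping the container's own namespace isolation.
	out, err := exec.Command("nsenter",
		"--mount=/rootfs/proc/1/ns/mnt", "--", "df", "-h").CombinedOutput()
	if err != nil {
		fmt.Println("nsenter failed:", err)
		return
	}
	fmt.Printf("%s", out)
}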
2. Running the Kubelet
RunKubelet
Inside RunKubelet, we can see that it ultimately calls the startKubelet function:
func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
	......
	// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		glog.Infof("Started kubelet %s as runonce", version.Get().String())
	} else {
		startKubelet(k, podCfg, kubeCfg, kubeDeps)
		glog.Infof("Started kubelet %s", version.Get().String())
	}
	return nil
}
Running the kubelet starts two main things: k.Run(podCfg.Updates()) enters the main loop, taking as its input a channel that delivers the latest pod configuration in real time, and k.ListenAndServe() starts the kubelet's API service.
The code of Run is as follows:
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	// init modules, e.g. imageManager, containerManager, oomWatcher, resourceAnalyzer
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		glog.Error(err)
		kl.runtimeState.setInitError(err)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	// goroutine that periodically updates the node status in the APIServer
	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
	}

	// goroutine that periodically syncs the network status
	go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Start loop to sync iptables util rules
	if kl.makeIPTablesUtilChains {
		go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

	// Start goroutine responsible for checking limits in resolv.conf
	if kl.resolverConfig != "" {
		go wait.Until(func() { kl.checkLimitsForResolvConf() }, 30*time.Second, wait.NeverStop)
	}

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start the pod lifecycle event generator.
	kl.pleg.Start()

	// enter the pod event loop, which handles tasks pushed down from the APIServer; updates is a channel
	kl.syncLoop(updates, kl)
}
This is essentially the startup of the kubelet's various components, each running as a goroutine (the wait.Until pattern is sketched below). The final call, syncLoop, is the main loop that handles all pod updates: it receives pod changes (create, update, delete) and calls the corresponding handler so that the containers on the node match the desired pod configuration.
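For readers unfamiliar with the wait.Until pattern used above, a minimal self-contained sketch; the interval and messages are illustrative, and the kubelet passes wait.NeverStop instead of a real stop channel:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})
	// Run the function every 2 seconds until stop is closed.
	go wait.Until(func() { fmt.Println("sync tick") }, 2*time.Second, stop)

	time.Sleep(7 * time.Second)
	close(stop)
}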
The code of syncLoop is as follows:
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	glog.Info("Starting kubelet main sync loop.")
	// The resyncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	for {
		if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
			glog.Infof("skipping pod synchronization - %v", rs)
			time.Sleep(5 * time.Second)
			continue
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
It watches for pod changes coming from three channels: file, apiserver, and http. When a change arrives, it calls the corresponding handler to bring the pod to the desired state. When nothing changes, it periodically ensures that all containers still match the latest desired state.
It repeatedly calls the syncLoopIteration method. Whenever an error occurs, the kubelet records it in runtimeState and waits 5 seconds before continuing the loop.
3. Pod Processing
The code of syncLoopIteration is shown below:
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			glog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}
		switch u.Op {
		case kubetypes.ADD:
			glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			glog.Errorf("Kubelet does not support snapshot update")
		}
		// Mark the source ready after receiving at least one update from the
		// source. Once all the sources are marked ready, various cleanup
		// routines will start reclaiming resources. It is important that this
		// takes place only after kubelet calls the update handler to process
		// the update to ensure the internal pod cache is up-to-date.
		kl.sourcesReady.AddSource(u.Source)
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		}
		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
		kl.HandlePodSyncs(podsToSync)
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			// The liveness manager detected a failure; sync the pod.
			// We should not use the pod from livenessManager, because it is never updated after
			// initialization.
			pod, ok := kl.podManager.GetPodByUID(update.PodUID)
			if !ok {
				// If the pod no longer exists, ignore the update.
				glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
				break
			}
			glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready or volume manager has not yet synced the states,
			// skip housekeeping, as we may accidentally delete pods from unready sources.
			glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			glog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				glog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}
configCh: dispatches pod configuration changes to the appropriate handler callbacks. The event types are:
- kubetypes.ADD
- kubetypes.UPDATE
- kubetypes.REMOVE
- kubetypes.RECONCILE
- kubetypes.DELETE
- kubetypes.SET
Let us analyze the case where a user adds a new pod, that is, handler.HandlePodAdditions(u.Pods):
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)
		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		if !kl.podIsTerminated(pod) {
			// Only go through the admission process if the pod is not
			// terminated.
			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)
			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(pod)
	}
}
- Sort all pods by creation time, so the earliest-created pods are handled first (see the sketch after this list)
- Verify that the pod can run on this node; if not, reject it
- Hand the pod to a worker for asynchronous processing
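A minimal sketch of the sorting step, using sort.Slice in place of the sliceutils.PodsByCreationTime helper the kubelet defines (the k8s.io/api/core/v1 package layout is assumed):

import (
	"sort"

	v1 "k8s.io/api/core/v1"
)

// sortByCreation orders pods with the oldest CreationTimestamp first.
func sortByCreation(pods []*v1.Pod) {
	sort.Slice(pods, func(i, j int) bool {
		return pods[i].CreationTimestamp.Time.Before(pods[j].CreationTimestamp.Time)
	})
}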
The code of dispatchWork is shown below:
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp != nil {
			// If the pod is in a terminated state, there is no pod worker to
			// handle the work item. Check if the DeletionTimestamp has been
			// set, and force a status update to trigger a pod deletion request
			// to the apiserver.
			kl.statusManager.TerminatePod(pod)
		}
		return
	}
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			if err != nil {
				metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
			}
		},
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
dispatchWork's main job is to wrap the received parameters and pass them to the kl.podWorkers.UpdatePod method:
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	pod := options.Pod
	uid := pod.UID
	var podUpdates chan UpdatePodOptions
	var exists bool
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		// We need to have a buffer here, because checkForUpdates() method that
		// puts an update into channel is called from the same goroutine where
		// the channel is consumed. However, it is guaranteed that in such case
		// the channel is empty, so buffer of size 1 is enough.
		podUpdates = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates
		// Creating a new pod worker either means this is a new pod, or that the
		// kubelet just restarted. In either case the kubelet is willing to believe
		// the status of the pod for the first pod worker sync. See corresponding
		// comment in syncPod.
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	if !p.isWorking[pod.UID] {
		p.isWorking[pod.UID] = true
		podUpdates <- *options
	} else {
		// if a request to kill a pod is pending, we do not let anything overwrite that request.
		update, found := p.lastUndeliveredWorkUpdate[pod.UID]
		if !found || update.UpdateType != kubetypes.SyncPodKill {
			p.lastUndeliveredWorkUpdate[pod.UID] = *options
		}
	}
}
It checks the podUpdates map; since this is a newly created pod there is no entry yet, so a goroutine is created to run the managePodLoop function.
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			podUID := update.Pod.UID
			// This is a blocking call that would return only if the cache
			// has an entry for the pod that is newer than minRuntimeCache
			// Time. This ensures the worker doesn't start syncing until
			// after the cache is at least newer than the finished time of
			// the previous sync.
			status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
			if err != nil {
				return err
			}
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		// notify the call-back function if the operation succeeded or not
		if update.OnCompleteFunc != nil {
			update.OnCompleteFunc(err)
		}
		if err != nil {
			glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
			// if we failed sync, we throw more specific events for why it happened.
			// as a result, i question the value of this event.
			// TODO: determine if we can remove this in a future release.
			// do not include descriptive text that can vary on why it failed so in a pathological
			// scenario, kubelet does not create enough discrete events that miss default aggregation
			// window.
			p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "Error syncing pod")
		}
		p.wrapUp(update.Pod.UID, err)
	}
}
managePodLoop calls syncPodFn to synchronize the pod; syncPodFn is in fact the syncPod function.
syncPod is a long function, so let us walk through the parts that are easier to understand.
makePodDataDirs creates the pod's data directories, three in total: pod, volumes, and plugins, i.e. directories under /var/lib/kubelet/pods/<pod-UID>:
func (kl *Kubelet) makePodDataDirs(pod *v1.Pod) error {
	uid := pod.UID
	if err := os.MkdirAll(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	if err := os.MkdirAll(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	if err := os.MkdirAll(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	return nil
}
WaitForAttachAndMount waits until all of the volumes the pod depends on are attached and mounted (the wait.Poll retry pattern it relies on is sketched after the code):
func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
	expectedVolumes := getExpectedVolumes(pod)
	if len(expectedVolumes) == 0 {
		// No volumes to verify
		return nil
	}
	glog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
	uniquePodName := volumehelper.GetUniquePodName(pod)
	// Some pods expect to have Setup called over and over again to update.
	// Remount plugins for which this is true. (Atomically updating volumes,
	// like Downward API, depend on this to update the contents of the volume).
	vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
	vm.actualStateOfWorld.MarkRemountRequired(uniquePodName)
	err := wait.Poll(
		podAttachAndMountRetryInterval,
		podAttachAndMountTimeout,
		vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
	if err != nil {
		// Timeout expired
		unmountedVolumes := vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
		if len(unmountedVolumes) == 0 {
			return nil
		}
		return fmt.Errorf(
			"timeout expired waiting for volumes to attach/mount for pod %q/%q. list of unattached/unmounted volumes=%v",
			pod.Namespace,
			pod.Name,
			unmountedVolumes)
	}
	glog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
	return nil
}
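wait.Poll from k8s.io/apimachinery is the retry primitive used above; a minimal sketch of how it behaves (the interval, timeout, and condition are illustrative):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	start := time.Now()
	// Poll every second, give up after 10 seconds; the condition func
	// returns (true, nil) once the awaited state is reached.
	err := wait.Poll(1*time.Second, 10*time.Second, func() (bool, error) {
		return time.Since(start) > 3*time.Second, nil
	})
	fmt.Println("poll finished, err =", err)
}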
If the pod has image pull secrets, the corresponding secrets data is fetched from the apiserver:
func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
	pullSecrets := []v1.Secret{}
	for _, secretRef := range pod.Spec.ImagePullSecrets {
		secret, err := kl.secretManager.GetSecret(pod.Namespace, secretRef.Name)
		if err != nil {
			glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed.",
				pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err)
			continue
		}
		pullSecrets = append(pullSecrets, *secret)
	}
	return pullSecrets
}
What is ultimately called to create the containers is the containerRuntime.SyncPod function, located at pkg/kubelet/kuberuntime/kuberuntime_manager.go:
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus,
	pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodContainerChanges(pod, podStatus)
	glog.V(3).Infof("computePodContainerChanges got %+v for pod %q", podContainerChanges, format.Pod(pod))
	if podContainerChanges.CreateSandbox {
		ref, err := ref.GetReference(api.Scheme, pod)
		if err != nil {
			glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
		}
		if podContainerChanges.SandboxID != "" {
			m.recorder.Eventf(ref, v1.EventTypeNormal, "SandboxChanged", "Pod sandbox changed, it will be killed and re-created.")
		} else {
			glog.V(4).Infof("SyncPod received new pod %q, will create a new sandbox for it", format.Pod(pod))
		}
	}

	// Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.CreateSandbox || (len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0) {
		if len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0 {
			glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
		} else {
			glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
		}
		killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
		result.AddPodSyncResult(killResult)
		if killResult.Error() != nil {
			glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
			return
		}
	} else {
		// Step 3: kill any running containers in this pod which are not to keep.
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
			result.AddSyncResult(killContainerResult)
			if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
				glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
				return
			}
		}
	}

	// Keep terminated init containers fairly aggressively controlled
	m.pruneInitContainersBeforeStart(pod, podStatus, podContainerChanges.InitContainersToKeep)

	// We pass the value of the podIP down to generatePodSandboxConfig and
	// generateContainerConfig, which in turn passes it to various other
	// functions, in order to facilitate functionality that requires this
	// value (hosts file and downward API) and avoid races determining
	// the pod IP in cases where a container requires restart but the
	// podIP isn't in the status manager yet.
	//
	// We default to the IP in the passed-in pod status, and overwrite it if the
	// sandbox needs to be (re)started.
	podIP := ""
	if podStatus != nil {
		podIP = podStatus.IP
	}

	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox && len(podContainerChanges.ContainersToStart) > 0 {
		var msg string
		var err error
		glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
		createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)
		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
		if err != nil {
			createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
			glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
			return
		}
		glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))
		podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
		if err != nil {
			glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
			result.Fail(err)
			return
		}
		// If we ever allow updating a pod from non-host-network to
		// host-network, we may use a stale IP.
		if !kubecontainer.IsHostNetworkPod(pod) {
			// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
			podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
			glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
		}
	}

	// Get podSandboxConfig for containers to start.
	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		glog.Error(message)
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
		return
	}

	// Step 5: start init containers.
	status, next, done := findNextInitContainerToRun(pod, podStatus)
	if status != nil && status.ExitCode != 0 {
		// container initialization has failed, flag the pod as failed
		initContainerResult := kubecontainer.NewSyncResult(kubecontainer.InitContainer, status.Name)
		initContainerResult.Fail(kubecontainer.ErrRunInitContainer, fmt.Sprintf("init container %q exited with %d", status.Name, status.ExitCode))
		result.AddSyncResult(initContainerResult)
		if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
			utilruntime.HandleError(fmt.Errorf("error running pod %q init container %q, restart=Never: %#v", format.Pod(pod), status.Name, status))
			return
		}
		utilruntime.HandleError(fmt.Errorf("Error running pod %q init container %q, restarting: %#v", format.Pod(pod), status.Name, status))
	}
	if next != nil {
		if len(podContainerChanges.ContainersToStart) == 0 {
			glog.V(4).Infof("No containers to start, stopping at init container %+v in pod %v", next.Name, format.Pod(pod))
			return
		}
		// If we need to start the next container, do so now then exit
		container := next
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)
		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
			return
		}
		glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
			startContainerResult.Fail(err, msg)
			utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
			return
		}
		// Successfully started the container; clear the entry in the failure
		glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
		return
	}
	if !done {
		// init container still running
		glog.V(4).Infof("An init container is still running in pod %v", format.Pod(pod))
		return
	}
	if podContainerChanges.InitFailed {
		glog.V(4).Infof("Not all init containers have succeeded for pod %v", format.Pod(pod))
		return
	}

	// Step 6: start containers in podContainerChanges.ContainersToStart.
	for idx := range podContainerChanges.ContainersToStart {
		container := &pod.Spec.Containers[idx]
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)
		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
			continue
		}
		glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
			startContainerResult.Fail(err, msg)
			utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
			continue
		}
	}
	return
}
- Compute sandbox and container changes.
- Kill pod sandbox if necessary.
- Kill any containers that should not be running.
- Create sandbox if necessary.
- Create init containers.
- Create normal containers.
Let us focus on step 4, "Create sandbox if necessary".
The key function, createPodSandbox, lives at pkg/kubelet/kuberuntime/kuberuntime_sandbox.go; its main call is to RunPodSandbox, located at pkg/kubelet/dockershim/docker_sandbox.go.
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		glog.Error(message)
		return "", message, err
	}
	// Create pod logs directory
	err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
	if err != nil {
		message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
		glog.Errorf(message)
		return "", message, err
	}
	podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig)
	if err != nil {
		message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
		glog.Error(message)
		return "", message, err
	}
	return podSandBoxID, "", nil
}
func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id string, err error) {
	// Step 1: Pull the image for the sandbox.
	image := defaultSandboxImage
	podSandboxImage := ds.podSandboxImage
	if len(podSandboxImage) != 0 {
		image = podSandboxImage
	}
	// NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly.
	// see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository
	// Only pull sandbox image when it's not present - v1.PullIfNotPresent.
	if err := ensureSandboxImageExists(ds.client, image); err != nil {
		return "", err
	}

	// Step 2: Create the sandbox container.
	createConfig, err := ds.makeSandboxDockerConfig(config, image)
	if err != nil {
		return "", fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)
	}
	createResp, err := ds.client.CreateContainer(*createConfig)
	if err != nil {
		createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
	}
	if err != nil || createResp == nil {
		return "", fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)
	}
	ds.setNetworkReady(createResp.ID, false)
	defer func(e *error) {
		// Set networking ready depending on the error return of
		// the parent function
		if *e == nil {
			ds.setNetworkReady(createResp.ID, true)
		}
	}(&err)

	// Step 3: Create Sandbox Checkpoint.
	if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
		return createResp.ID, err
	}

	// Step 4: Start the sandbox container.
	// Assume kubelet's garbage collector would remove the sandbox later, if
	// startContainer failed.
	err = ds.client.StartContainer(createResp.ID)
	if err != nil {
		return createResp.ID, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
	}

	// Rewrite resolv.conf file generated by docker.
	// NOTE: cluster dns settings aren't passed anymore to docker api in all cases,
	// not only for pods with host network: the resolver conf will be overwritten
	// after sandbox creation to override docker's behaviour. This resolv.conf
	// file is shared by all containers of the same pod, and needs to be modified
	// only once per pod.
	if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
		containerInfo, err := ds.client.InspectContainer(createResp.ID)
		if err != nil {
			return createResp.ID, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)
		}
		if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {
			return createResp.ID, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)
		}
	}

	// Do not invoke network plugins if in hostNetwork mode.
	if nsOptions := config.GetLinux().GetSecurityContext().GetNamespaceOptions(); nsOptions != nil && nsOptions.HostNetwork {
		return createResp.ID, nil
	}

	// Step 5: Setup networking for the sandbox.
	// All pod networking is setup by a CNI plugin discovered at startup time.
	// This plugin assigns the pod ip, sets up routes inside the sandbox,
	// creates interfaces etc. In theory, its jurisdiction ends with pod
	// sandbox networking, but it might insert iptables rules or open ports
	// on the host as well, to satisfy parts of the pod spec that aren't
	// recognized by the CNI standard yet.
	cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
	err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations)
	if err != nil {
		// TODO(random-liu): Do we need to teardown network here?
		if err := ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod); err != nil {
			glog.Warningf("Failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err)
		}
	}
	return createResp.ID, err
}
- The default sandbox image is gcr.io/google_containers/pause-amd64:3.0
- ensureSandboxImageExists pulls the image when it is not present
- CreateContainer ultimately calls the docker API via POST /containers/create
- CreateCheckpoint writes a file whose name is the container ID
- StartContainer ultimately calls the docker API via POST /containers/<containerID>/start
- The resolv.conf generated by docker is rewritten; it is shared by all containers in the pod
- InspectContainer ultimately calls the docker API via GET /containers/<containerID>/json
- Networking for the container is set up through CNI: a loopback interface is created, and the bridge is put into promiscuous mode (via the commands ip link show dev ... / ip link set <bridgeName> promisc on); outbound traffic is SNATed using iptables rule chains. The kubenet config template is shown below, followed by a sketch of how its placeholders are filled.
const NET_CONFIG_TEMPLATE = `{
  "cniVersion": "0.1.0",
  "name": "kubenet",
  "type": "bridge",
  "bridge": "%s",
  "mtu": %d,
  "addIf": "%s",
  "isGateway": true,
  "ipMasq": false,
  "hairpinMode": %t,
  "ipam": {
    "type": "host-local",
    "subnet": "%s",
    "gateway": "%s",
    "routes": [
      { "dst": "0.0.0.0/0" }
    ]
  }
}`
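To see how the placeholders line up, a hypothetical instantiation of this template with fmt.Sprintf; the bridge name, MTU, and subnet values are illustrative, and kubenet derives the real ones from the node's pod CIDR and kubelet configuration:

netConfig := fmt.Sprintf(NET_CONFIG_TEMPLATE,
	"cbr0",          // %s: bridge name
	1460,            // %d: mtu
	"eth0",          // %s: addIf, the interface added to the bridge
	true,            // %t: hairpinMode
	"10.244.1.0/24", // %s: subnet handed to the host-local IPAM plugin
	"10.244.1.1",    // %s: gateway
)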