(7) NewMainKubelet函数分析
NewMainKubelet主要作用是创建Kubelet对象需要的Module对象。
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
//
// The function validates rootDirectory and resyncInterval, builds the service
// and node stores (backed by reflectors when kubeClient is non-nil), fills in
// the Kubelet struct from the arguments, and then constructs the dependent
// components in order: network plugin, container runtime ("docker" or "rkt"),
// PLEG, container GC, image manager, status/probe managers, volume plugin and
// volume managers, runtime cache, pod workers, eviction manager and the active
// deadline handler. Finally every functional Option in kubeOptions is applied.
//
// It returns (nil, error) when validation fails, when containerRuntime is not
// supported, or when any component constructor reports an error. An invalid
// hairpin mode is not returned; it terminates the process via glog.Fatalf.
func NewMainKubelet(
	hostname string,
	nodeName string,
	dockerClient dockertools.DockerInterface,
	kubeClient clientset.Interface,
	rootDirectory string,
	seccompProfileRoot string,
	podInfraContainerImage string,
	resyncInterval time.Duration,
	pullQPS float32,
	pullBurst int,
	eventQPS float32,
	eventBurst int,
	containerGCPolicy kubecontainer.ContainerGCPolicy,
	sourcesReadyFn config.SourcesReadyFn,
	registerNode bool,
	registerSchedulable bool,
	standaloneMode bool,
	clusterDomain string,
	clusterDNS net.IP,
	masterServiceNamespace string,
	volumePlugins []volume.VolumePlugin,
	networkPlugins []network.NetworkPlugin,
	networkPluginName string,
	streamingConnectionIdleTimeout time.Duration,
	recorder record.EventRecorder,
	cadvisorInterface cadvisor.Interface,
	imageGCPolicy ImageGCPolicy,
	diskSpacePolicy DiskSpacePolicy,
	cloud cloudprovider.Interface,
	autoDetectCloudProvider bool,
	nodeLabels map[string]string,
	nodeStatusUpdateFrequency time.Duration,
	osInterface kubecontainer.OSInterface,
	CgroupsPerQOS bool,
	cgroupRoot string,
	containerRuntime string,
	runtimeRequestTimeout time.Duration,
	rktPath string,
	rktAPIEndpoint string,
	rktStage1Image string,
	mounter mount.Interface,
	writer kubeio.Writer,
	configureCBR0 bool,
	nonMasqueradeCIDR string,
	podCIDR string,
	reconcileCIDR bool,
	maxPods int,
	podsPerCore int,
	nvidiaGPUs int,
	dockerExecHandler dockertools.ExecHandler,
	resolverConfig string,
	cpuCFSQuota bool,
	daemonEndpoints *api.NodeDaemonEndpoints,
	oomAdjuster *oom.OOMAdjuster,
	serializeImagePulls bool,
	containerManager cm.ContainerManager,
	outOfDiskTransitionFrequency time.Duration,
	flannelExperimentalOverlay bool,
	nodeIP net.IP,
	reservation kubetypes.Reservation,
	enableCustomMetrics bool,
	volumeStatsAggPeriod time.Duration,
	containerRuntimeOptions []kubecontainer.Option,
	hairpinMode string,
	babysitDaemons bool,
	evictionConfig eviction.Config,
	kubeOptions []Option,
	enableControllerAttachDetach bool,
) (*Kubelet, error) {
	// Reject obviously invalid configuration before building anything.
	if rootDirectory == "" {
		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
	}
	// NOTE(review): this trace line is emitted at error severity although it
	// does not report an error; confirm whether Infof was intended.
	glog.Errorf("NewMainKubelet rootDirectory=%s.", rootDirectory)
	if resyncInterval <= 0 {
		return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
	}

	// Service store: populated by a reflector over all namespaces when an API
	// client is available, otherwise left empty.
	serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
	if kubeClient != nil {
		// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
		// than an interface. There is no way to construct a list+watcher using resource name.
		listWatch := &cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return kubeClient.Core().Services(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return kubeClient.Core().Services(api.NamespaceAll).Watch(options)
			},
		}
		cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
	}
	serviceLister := &cache.StoreToServiceLister{Store: serviceStore}

	// Node store: same pattern, but restricted by field selector to the node
	// named nodeName.
	nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
	if kubeClient != nil {
		// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
		// than an interface. There is no way to construct a list+watcher using resource name.
		fieldSelector := fields.Set{api.ObjectNameField: nodeName}.AsSelector()
		listWatch := &cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				options.FieldSelector = fieldSelector
				return kubeClient.Core().Nodes().List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				options.FieldSelector = fieldSelector
				return kubeClient.Core().Nodes().Watch(options)
			},
		}
		cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
	}
	nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
	nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}

	// TODO: get the real node object of ourself,
	// and use the real node name and UID.
	// TODO: what is namespace for node?
	nodeRef := &api.ObjectReference{
		Kind:      "Node",
		Name:      nodeName,
		UID:       types.UID(nodeName),
		Namespace: "",
	}

	diskSpaceManager, err := newDiskSpaceManager(cadvisorInterface, diskSpacePolicy)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
	}
	containerRefManager := kubecontainer.NewRefManager()
	oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)

	// TODO: remove when internal cbr0 implementation gets removed in favor
	// of the kubenet network plugin
	if networkPluginName == "kubenet" {
		configureCBR0 = false
		flannelExperimentalOverlay = false
	}

	klet := &Kubelet{
		hostname: hostname,
		nodeName: nodeName,
		dockerClient: dockerClient,
		kubeClient: kubeClient,
		rootDirectory: rootDirectory,
		resyncInterval: resyncInterval,
		containerRefManager: containerRefManager,
		httpClient: &http.Client{},
		sourcesReady: config.NewSourcesReady(sourcesReadyFn),
		registerNode: registerNode,
		registerSchedulable: registerSchedulable,
		standaloneMode: standaloneMode,
		clusterDomain: clusterDomain,
		clusterDNS: clusterDNS,
		serviceLister: serviceLister,
		nodeLister: nodeLister,
		nodeInfo: nodeInfo,
		masterServiceNamespace: masterServiceNamespace,
		streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
		recorder: recorder,
		cadvisor: cadvisorInterface,
		diskSpaceManager: diskSpaceManager,
		cloud: cloud,
		autoDetectCloudProvider: autoDetectCloudProvider,
		nodeRef: nodeRef,
		nodeLabels: nodeLabels,
		nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
		os: osInterface,
		oomWatcher: oomWatcher,
		CgroupsPerQOS: CgroupsPerQOS,
		cgroupRoot: cgroupRoot,
		mounter: mounter,
		writer: writer,
		configureCBR0: configureCBR0,
		nonMasqueradeCIDR: nonMasqueradeCIDR,
		reconcileCIDR: reconcileCIDR,
		maxPods: maxPods,
		podsPerCore: podsPerCore,
		nvidiaGPUs: nvidiaGPUs,
		syncLoopMonitor: atomic.Value{},
		resolverConfig: resolverConfig,
		cpuCFSQuota: cpuCFSQuota,
		daemonEndpoints: daemonEndpoints,
		containerManager: containerManager,
		flannelExperimentalOverlay: flannelExperimentalOverlay,
		flannelHelper: nil,
		nodeIP: nodeIP,
		clock: util.RealClock{},
		outOfDiskTransitionFrequency: outOfDiskTransitionFrequency,
		reservation: reservation,
		enableCustomMetrics: enableCustomMetrics,
		babysitDaemons: babysitDaemons,
		enableControllerAttachDetach: enableControllerAttachDetach,
	}

	if klet.flannelExperimentalOverlay {
		klet.flannelHelper = NewFlannelHelper()
		glog.Infof("Flannel is in charge of podCIDR and overlay networking.")
	}
	if klet.nodeIP != nil {
		if err := klet.validateNodeIP(); err != nil {
			return nil, err
		}
		glog.Infof("Using node IP: %q", klet.nodeIP.String())
	}

	if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(hairpinMode), containerRuntime, configureCBR0, networkPluginName); err != nil {
		// This is a non-recoverable error. Returning it up the callstack will just
		// lead to retries of the same failure, so just fail hard.
		glog.Fatalf("Invalid hairpin mode: %v", err)
	} else {
		klet.hairpinMode = mode
	}
	glog.Infof("Hairpin mode set to %q", klet.hairpinMode)

	if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR); err != nil {
		return nil, err
	} else {
		klet.networkPlugin = plug
	}

	machineInfo, err := klet.GetCachedMachineInfo()
	if err != nil {
		return nil, err
	}

	procFs := procfs.NewProcFS()
	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
	klet.livenessManager = proberesults.NewManager()
	klet.podCache = kubecontainer.NewCache()
	klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))

	// Initialize the runtime.
	switch containerRuntime {
	case "docker":
		// Only supported one for now, continue.
		klet.containerRuntime = dockertools.NewDockerManager(
			dockerClient,
			kubecontainer.FilterEventRecorder(recorder),
			klet.livenessManager,
			containerRefManager,
			klet.podManager,
			machineInfo,
			podInfraContainerImage,
			pullQPS,
			pullBurst,
			containerLogsDir,
			osInterface,
			klet.networkPlugin,
			klet,
			klet.httpClient,
			dockerExecHandler,
			oomAdjuster,
			procFs,
			klet.cpuCFSQuota,
			imageBackOff,
			serializeImagePulls,
			enableCustomMetrics,
			klet.hairpinMode == componentconfig.HairpinVeth,
			seccompProfileRoot,
			containerRuntimeOptions...,
		)
	case "rkt":
		// TODO: Include hairpin mode settings in rkt?
		conf := &rkt.Config{
			Path:            rktPath,
			Stage1Image:     rktStage1Image,
			InsecureOptions: "image,ondisk",
		}
		rktRuntime, err := rkt.New(
			rktAPIEndpoint,
			conf,
			klet,
			recorder,
			containerRefManager,
			klet.podManager,
			klet.livenessManager,
			klet.httpClient,
			klet.networkPlugin,
			klet.hairpinMode == componentconfig.HairpinVeth,
			utilexec.New(),
			kubecontainer.RealOS{},
			imageBackOff,
			serializeImagePulls,
			runtimeRequestTimeout,
		)
		if err != nil {
			return nil, err
		}
		klet.containerRuntime = rktRuntime
	default:
		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
	}

	// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
	klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, volumeStatsAggPeriod, klet.containerRuntime)
	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, util.RealClock{})
	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
	klet.updatePodCIDR(podCIDR)

	// setup containerGC
	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
	if err != nil {
		return nil, err
	}
	klet.containerGC = containerGC

	// setup imageManager
	imageManager, err := newImageManager(klet.containerRuntime, cadvisorInterface, recorder, nodeRef, imageGCPolicy)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
	}
	klet.imageManager = imageManager

	klet.runner = klet.containerRuntime
	klet.statusManager = status.NewManager(kubeClient, klet.podManager)
	klet.probeManager = prober.NewManager(
		klet.statusManager,
		klet.livenessManager,
		klet.runner,
		containerRefManager,
		recorder)

	klet.volumePluginMgr, err =
		NewInitializedVolumePluginMgr(klet, volumePlugins)
	if err != nil {
		return nil, err
	}

	klet.volumeManager, err = volumemanager.NewVolumeManager(
		enableControllerAttachDetach,
		hostname,
		klet.podManager,
		klet.kubeClient,
		klet.volumePluginMgr,
		klet.containerRuntime)
	// Check the error here: the next statement reuses err, so without this
	// check a volume manager construction failure would be silently dropped.
	if err != nil {
		return nil, fmt.Errorf("failed to initialize volume manager: %v", err)
	}

	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
	if err != nil {
		return nil, err
	}
	klet.runtimeCache = runtimeCache
	klet.reasonCache = NewReasonCache()
	klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
	klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
	klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
	klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

	// setup eviction manager
	evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), recorder, nodeRef, klet.clock)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize eviction manager: %v", err)
	}
	klet.evictionManager = evictionManager
	klet.AddPodAdmitHandler(evictionAdmitHandler)

	// enable active deadline handler
	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, klet.recorder, klet.clock)
	if err != nil {
		return nil, err
	}
	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
	klet.AddPodSyncHandler(activeDeadlineHandler)

	// apply functional Option's
	for _, opt := range kubeOptions {
		opt(klet)
	}
	return klet, nil
}
step1: 创建service的cache.NewStore,设置service的监听函数listWatch,并设置对应的反射NewReflector,并设置serviceLister
step2: 创建node的cache.NewStore,设置fieldSelector,设置监听函数listWatch,设置对应的反射NewReflector,并设置nodeLister,nodeInfo和nodeRef
step3: newDiskSpaceManager
step4: kubecontainer.NewRefManager
step5: NewOOMWatcher(cadvisorInterface, recorder)
step6:Kubelet对象初始化
step7:判断 klet.flannelExperimentalOverlay标志位,NewFlannelHelper
step8: klet.validateNodeIP
step9: effectiveHairpinMode处理和klet.hairpinMode赋值
step10:network.InitNetworkPlugin
step11: machineInfo klet.GetCachedMachineInfo()
step12: procFs := procfs.NewProcFS()
step13: klet.podManager = kubepod.NewBasicPodManager()
step14: klet.containerRuntime = dockertools.NewDockerManager()
step15: klet.resourceAnalyzer = stats.NewResourceAnalyzer()
step16: klet.containerGC = kubecontainer.NewContainerGC()
step17: klet.imageManager = newImageManager()
step18: klet.runtimeCache = runtimeCache , klet.reasonCache = NewReasonCache(), klet.workQueue = queue.NewBasicWorkQueue(klet.clock) , klet.podWorkers = newPodWorkers
step19: klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
step20: setup eviction manager:
evictionManager, evictionAdmitHandler, err := eviction.NewManager
klet.evictionManager = evictionManager
klet.AddPodAdmitHandler(evictionAdmitHandler)
step21: enable active deadline handler:
activeDeadlineHandler = newActiveDeadlineHandler()
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)
至此该函数终于完成,进行了kubelet中大多数对象的创建。从该函数出来后,kubelet进入到真正的run函数,启动对应的协程,进入到主循环
(8) startKubelet函数分析
// startKubelet launches the kubelet and its HTTP endpoints in background
// goroutines and returns immediately.
//
// k.Run is always started with the update channel from podCfg. The main
// server is started only when kc.EnableServer is set, and the read-only
// server only when kc.ReadOnlyPort is greater than zero. Each function is
// wrapped in wait.Until with a zero period and wait.NeverStop.
func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
	// start the kubelet
	runKubelet := func() {
		k.Run(podCfg.Updates())
	}
	go wait.Until(runKubelet, 0, wait.NeverStop)

	// start the kubelet server
	if kc.EnableServer {
		serve := func() {
			k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers)
		}
		go wait.Until(serve, 0, wait.NeverStop)
	}

	// start the read-only server when a port is configured
	if kc.ReadOnlyPort > 0 {
		serveReadOnly := func() {
			k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)
		}
		go wait.Until(serveReadOnly, 0, wait.NeverStop)
	}
}
// Run starts the kubelet reacting to config updates.
//
// It installs a default log file server if none is set, runs
// initializeModules, launches the periodic background goroutines (volume
// manager, node status, network status, runtime-up check, pod killer), starts
// the status, probe and eviction managers and the PLEG, and then calls
// syncLoop on the calling goroutine with the kubelet itself as the handler.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		logFiles := http.FileServer(http.Dir("/var/log/"))
		kl.logServer = http.StripPrefix("/logs/", logFiles)
	}
	if kl.kubeClient == nil {
		glog.Warning("No api server defined - no node status update will be sent.")
	}

	// A module initialization failure is reported as an event, logged and
	// stored in the runtime state; startup continues regardless.
	initErr := kl.initializeModules()
	if initErr != nil {
		kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, kubecontainer.KubeletSetupFailed, initErr.Error())
		glog.Error(initErr)
		kl.runtimeState.setInitError(initErr)
	}

	stopCh := wait.NeverStop

	// Start volume manager
	go kl.volumeManager.Run(stopCh)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, stopCh)
	}
	go wait.Until(kl.syncNetworkStatus, 30*time.Second, stopCh)
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, stopCh)

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, stopCh)

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()
	kl.evictionManager.Start(kl.getActivePods, evictionMonitoringPeriod)

	// Start the pod lifecycle event generator.
	kl.pleg.Start()

	kl.syncLoop(updates, kl)
}
// initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here.
//
// It returns the first error from setting up the data directories or from
// starting the image manager, container manager or OOM watcher. A failure to
// create the container logs directory is only logged.
func (kl *Kubelet) initializeModules() error {
	// Step 1: Prometheus metrics.
	metrics.Register(kl.runtimeCache)

	// Step 2: Setup filesystem directories.
	dirErr := kl.setupDataDirs()
	if dirErr != nil {
		return dirErr
	}

	// Step 3: If the container logs directory does not exist, create it.
	// Any Stat error is treated as "missing"; a MkdirAll failure is not fatal.
	if _, statErr := os.Stat(containerLogsDir); statErr != nil {
		mkdirErr := kl.os.MkdirAll(containerLogsDir, 0755)
		if mkdirErr != nil {
			glog.Errorf("Failed to create directory %q: %v", containerLogsDir, mkdirErr)
		}
	}

	// Step 4: Start the image manager.
	imageErr := kl.imageManager.Start()
	if imageErr != nil {
		return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", imageErr)
	}

	// Step 5: Start container manager.
	cmErr := kl.containerManager.Start()
	if cmErr != nil {
		return fmt.Errorf("Failed to start ContainerManager %v", cmErr)
	}

	// Step 6: Start out of memory watcher.
	oomErr := kl.oomWatcher.Start(kl.nodeRef)
	if oomErr != nil {
		return fmt.Errorf("Failed to start OOM watcher %v", oomErr)
	}

	// Step 7: Start resource analyzer
	kl.resourceAnalyzer.Start()

	return nil
}
Run函数中主要启动工作的协程。其中部分工作在initializeModules中完成
step1: initializeModules处理。
initializeModules中创建日志目录
initializeModules中启动imageManager协程
initializeModules中启动containerManager协程
initializeModules中启动oomWatcher协程( Start out of memory watcher)
initializeModules中启动resourceAnalyzer协程
step2:go kl.volumeManager.Run
step3:启动(synchronizes node status to master)协程
go wait.Until(kl.syncNodeStatus,
kl.registerWithApiserver()
step4: go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
step5: go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
step6:go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
step7: Start component sync loops.
kl.statusManager.Start()
kl.probeManager.Start()
kl.evictionManager.Start(kl.getActivePods, evictionMonitoringPeriod)
step8: kl.pleg.Start()
step9:进入主处理循环 kl.syncLoop(updates, kl)(在Run所在的协程中直接调用,而非新启动的协程)
step10: start the kubelet server(在StartKubelet中执行)
k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers)
(9) Main Loop 协程中处理流程
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds.
//
// The loop runs until syncLoopIteration returns false; at that point the
// function returns and both tickers it created are stopped. While
// kl.runtimeState reports errors, pod synchronization is skipped and the loop
// sleeps five seconds before checking again.
//
// updates is the channel of pod configuration changes and handler is the
// SyncHandler that syncLoopIteration dispatches to.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	glog.Info("Starting kubelet main sync loop.")
	// The syncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	// Release the ticker's resources if the loop ever exits.
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	for {
		if rs := kl.runtimeState.errors(); len(rs) != 0 {
			glog.Infof("skipping pod synchronization - %v", rs)
			time.Sleep(5 * time.Second)
			continue
		}
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
	}
}
/ syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. houseKeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated. In other words, case statements
// are evaluated in random order, and you can not assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * houseKeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
// containers have failed liveness checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
kl.syncLoopMonitor.Store(kl.clock.Now())
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
kl.sourcesReady.AddSource(u.Source)
switch u.Op {
case kubetypes.ADD:
glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejcted. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
glog.Errorf("Kubelet does not support snapshot update")
}
case e := <-plegCh:
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
pod, ok := kl.podManager.GetPodByUID(e.ID)
if !ok {
// If the pod no longer exists, ignore the event.
glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
break
}
glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
handler.HandlePodSyncs([]*api.Pod{pod})
}
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
kl.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.
// We should not use the pod from livenessManager, because it is never updated after
// initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*api.Pod{pod})
}
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready, skip housekeeping, as we may
// accidentally delete pods from unready sources.
glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
} else {
glog.V(4).Infof("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
glog.Errorf("Failed cleaning pods: %v", err)
}
}
}
kl.syncLoopMonitor.Store(kl.clock.Now())
return true
}
至此 kubelet的启动过程完成
参考大牛的博文:k8s源码分析-----kubelet(1)主要流程
http://blog.csdn.net/screscent/article/details/51086684