在 Kubernetes Master 节点中,有三个重要的组件:ApiServer、ControllerManager 和 Scheduler,它们共同负责整个集群的管理。在本文中,我们尝试梳理一下ControllerManager的工作流程和原理。
什么是控制器管理器
根据官方文档:kube-controller-manager 运行控制器,这是处理集群中常规任务的后台线程。
例如,当通过 Deployment 创建的 Pod 异常退出时,RS Controller 会接受并处理退出并创建新的 Pod 以保持预期的副本数。
几乎每个特定的资源都由特定的 Controller 管理以维持预期的状态,而 Controller Manager 的职责是聚合所有 Controller:
- 提供基础设施以降低控制器实现的复杂性
- 启动和维护控制器的正常运行时间
这样,Controller 保证集群中的资源保持在预期状态,Controller Manager 确保 Controller 保持在预期状态。
控制器工作流程
在我们解释 Controller Manager 如何为 Controller 提供基础架构和运行时环境之前,让我们先了解一下 Controller 工作流程是什么样的。
从高维的角度来看,ControllerManager主要提供了分发事件的能力,而不同的Controller只需要注册相应的Handler即可等待接收和处理事件。
以Deployment Controller为例,其中的NewDeploymentController
方法pkg/controller/deployment/deployment_controller.go
包括Event Handler的注册,对于Deployment Controller,只需要根据不同的事件实现不同的处理逻辑,就可以实现对相应资源的管理。
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: dc.addDeployment,UpdateFunc: dc.updateDeployment,// This will enter the sync loop and no-op, because the deployment has been deleted from the store.DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: dc.addReplicaSet,UpdateFunc: dc.updateReplicaSet,DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{DeleteFunc: dc.deletePod,
})
可以看到,在ControllerManager的帮助下,Controller的逻辑可以很纯粹的通过实现对应的EventHandler来完成,那么ControllerManager具体做了哪些工作呢?
控制器管理器架构
帮助 Controller Manager 进行事件分发的关键模块是 client-go,而更关键的模块之一是 informer。
kubernetes在github上提供了client-go的架构图,从中可以看出Controller是描述的下半部分(CustomController),而Controller Manager主要是完成的上半部分。
Informer Factory
从上图中可以看出,Informer 是一个非常关键的“桥梁”,所以对 Informer 的管理是 Controller Manager 做的第一件事。
由于每个 Informer 都与 Api Server 保持一个 watch long 连接,因此这个单实例工厂通过为所有 Controller 提供一个唯一的入口点来获取 Informer,从而确保每种类型的 Informer 仅实例化一次。
这个单例工厂的初始化逻辑。
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),}
// Apply all optionsfor _, opt := range options {
factory = opt(factory)}
return factory
}
从上面的初始化逻辑可以看出,其中最重要的部分sharedInformerFactory
是名为 的map informers
,其中key是资源类型,value是关心该资源类型的Informer。每种类型的 Informer 只会被实例化一次并存储在map中。不同的 Controller 只有在需要相同的资源时才会得到相同的 Informer 实例。
对于Controller Manager来说,保持所有Informer正常工作是所有Controller正常工作的基本条件。通过这个sharedInformerFactory
map维护所有informer实例,所以sharedInformerFactory
也负责提供一个统一的启动入口。
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {if !f.startedInformers[informerType] {go informer.Run(stopCh)
f.startedInformers[informerType] = true}}
}
Controller Manager启动时,最重要的是通过Start
这个工厂的方法运行所有的Informer。
Informer creation
以下是这些 Informer 的创建方式,Controller ManagerNewControllerInitializers
在cmd/kube-controller-manager/app/controllermanager.go
. 由于代码冗长,这里仅提供部署控制器的示例。
初始化部署控制器的逻辑startDeploymentController
在cmd/kube-controller-manager/app/apps.go
.
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {return nil, false, nil}
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),)if err != nil {return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)}
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)return nil, true, nil
}
最关键的逻辑在 中deployment.NewDeploymentController
,实际上是创建了 Deployment Controller,创建函数的前三个参数分别是 Deployment、ReplicaSet 和 Pod 的 Informer。如您所见,Informer 的单例工厂提供了一个入口点,用于使用 ApiGroup 作为路径创建具有不同资源的 Informer。
但是,重要的是要注意这一点。Apps().V1().Deployments()
返回 type 的实例deploymentInformer
,但deploymentInformer
不是真正的 Informer(尽管它的 Informer 名称)。它只是一个模板类,其主要功能是为创建专注于部署的 Informer 提供模板作为特定资源。
// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
创建 Informer 的真正逻辑在deploymentInformer.Informer()
( client-go/informers/apps/v1/deployment.go
) 中,是默认的 Deployment Informer 创建模板方法,通过将资源实例和该模板方法传递给 Informer 工厂f.defaultInformer
的方法来创建只关注 Deployment 资源的 Informer。InformerFor
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
简要说明。
- 您可以通过 Informer 工厂获取特定类型的 Informer 模板类(即
deploymentInformer
本例中)
-
Informer()
实际上,为特定资源创建 Informer 的是 Informer 模板类的方法。
- 该
Informer()
方法只是通过InformerFor
Informer 工厂创建真正的 Informer
这里使用了模板方法(设计模式),虽然有点绕,但是可以参考下图梳理一下。理解它的关键是 Informer 的差异化创建逻辑委托给了模板类。
最后,命名的结构sharedIndexInformer
将被实例化,并实际承担 Informer 的职责。它也是注册到 Informer 工厂映射的实例。
Informer operation
由于真正的 Informer 实例是一个类型的对象sharedIndexInformer
,当 Informer 工厂启动时(通过执行Start
方法),它sharedIndexInformer
就是实际运行的。
sharedIndexInformer
是client-go中的一个组件,它的方法Run
有几十行,但是工作量很大。这是我们进入控制器管理器最有趣的部分的地方。
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{Queue: fifo,ListerWatcher: s.listerWatcher,ObjectType: s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError: false,ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})var wg wait.Group
defer wg.Wait() // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners}()
s.controller.Run(stopCh)
}
启动逻辑sharedIndexInformer
做了几件事。
- 创建一个名为 的队列
fifo
。
- 创建并运行一个名为
controller
.
- 开始了
cacheMutationDetector
。
- 开始了
processor
。
这些术语(或组件)在上一篇文章中没有提到,但这四件事是 Controller Manager 工作的核心,因此我将在下面逐一介绍。
sharedIndexInformer
sharedIndexInformer
是一个共享的 Informer 框架,不同的 Controller 只需要提供一个模板类(就像deploymentInformer
上面提到的)来创建一个特定于他们需要的 Informer。
sharedIndexInformer
包含一堆工具来完成 Informer 的工作,主要代码在client-go/tools/cache/shared_informer.go
. 它的创建逻辑也在其中。
// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,}return sharedIndexInformer
}
在创建逻辑中,有几点需要注意:
- processor:提供EventHandler注册和事件分发的功能
- indexer:提供资源缓存功能
- listerWatcher:由模板类提供,包含特定资源的List和Watch方法
- objectType:用于标记要关注的具体资源类型
- cacheMutationDetector:监控Informer的缓存
此外,它还包含DeltaFIFO
队列和controller
上面的启动逻辑中提到的,下面分别介绍。
sharedProcessor
处理器是 sharedIndexInformer 中一个非常有趣的组件。ControllerManager通过一个Informer单例工厂保证不同的Controller共享同一个Informer,但是不同的Controller在共享的Informer上注册了不同的Handler。
处理器是管理注册的Handler并将事件分发给不同的Handler的组件。
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
sharedProcessor 工作的核心围绕着listeners
.
当我们向 Informer 注册一个 Handler 时,它最终会被转换为一个名为processorListener
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,}
ret.determineNextResync(now)
return ret
}
该实例主要包含两个通道和外部注册的 Handler 方法。此处实例化的processorListener
对象最终将被添加到sharedProcessor.listeners
列表中。
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)}
}
如图所示,Controller 中的 Handler 方法最终会添加到 Listener 中,Listener 会附加到sharedProcessor
前面说过,启动时sharedIndexInformer
会运行sharedProcessor
,启动的逻辑sharedProcessor
与这些有关listeners
。
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)}
p.listenersStarted = true}()<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
可以看到,启动的时候会依次sharedProcessor
执行 的run
和pop
方法,所以现在来看这两个方法。listener
Starting the listener
由于监听器包含注册到Controller的Handler方法,所以监听器最重要的作用就是在事件发生时触发这些方法,并listener.run
不断从nextCh
通道中获取事件并执行相应的处理程序。
func (p *processorListener) run() {// this call blocks until the channel is closed. When a panic happens during the notification// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)// the next notification will be attempted. This is usually better than the alternative of never// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {for next := range p.nextCh {switch notification := next.(type) {case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)case addNotification:
p.handler.OnAdd(notification.newObj)case deleteNotification:
p.handler.OnDelete(notification.oldObj)default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))}}// the only way to get here is if the p.nextCh is empty and closedreturn true, nil})
// the only way to get here is if the p.nextCh is empty and closedif err == nil {
close(stopCh)}}, 1*time.Minute, stopCh)
}
可以看到listener.run
不断从nextCh
通道中获取事件,但是通道中的事件是nextCh
从哪里来的呢?将listener.pop
事件放入nextCh
.
listener.pop
是一个非常聪明和有趣的逻辑。
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}var notification interface{}for {select {case nextCh <- notification:// Notification dispatchedvar ok bool
notification, ok = p.pendingNotifications.ReadOne()if !ok { // Nothing to pop
nextCh = nil // Disable this select case}case notificationToAdd, ok := <-p.addCh:if !ok {return}if notification == nil { // No notification to pop (and pendingNotifications is empty)// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)}}}
}
listener
包含两个通道addCh
的原因nextCh
是:Informer 无法预测listener.handler
消耗事件的速度是否比产生事件的速度快,因此它添加了一个名为pendingNotifications
. 队列来保存没有被及时消费的事件。
pop
一方面,该方法不断获取最新事件,addCh
以确保生产者不会阻塞。然后它确定缓冲区是否存在,如果存在,则将事件添加到缓冲区,如果不存在,则尝试将其推送到nextCh
.
另一方面,它确定缓冲区中是否还有任何事件,如果还有库存,它会不断将其传递给nextCh
.
该pop
方法实现了一种带有缓冲区的分发机制,该缓冲区允许事件不断地从 传递addCh
到nextCh
。但是问题来了,这些addCh
事件是从哪里来的?
源代码非常简单,listener
有一个add
以事件为输入的方法,它将新事件推送到addCh
. 该add
方法由sharedProcessor
管理所有 s 的listener s
调用。
如上所述,sharedProcessor
负责管理所有的Handler和分发事件,但distribute
真正分发的是方法。
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {for _, listener := range p.syncingListeners {
listener.add(obj)}} else {for _, listener := range p.listeners {
listener.add(obj)}}
}
到目前为止,我们对一个部分有了更清晰的了解:
- Controller 向 Informer 注册 Handler。
- Informer 通过
sharedProcessor
.
- Informer 接收事件并通过
sharedProcessor.distribute
.
- Controller由对应的Handler触发处理自己的逻辑
那么剩下的问题是 Informer 事件从何而来?
DeltaFIFO
在分析 Informer fetch 事件之前,需要提前告知的一个非常有趣的小工具fifo
是sharedIndexInformer.Run
.
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
DeltaFIFO 是一个非常有趣的队列,其代码定义在client-go/tools/cache/delta_fifo.go
. 对于队列来说,最重要的肯定是 Add 和 Pop 方法。DeltaFIFO 提供了几种 Add 方法,虽然根据不同的事件类型(add/update/delete/sync)来区分不同的方法,但最终都是执行queueActionLocked
。
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)if err != nil {return KeyError{obj, err}}
// If object is supposed to be deleted (last event is Deleted),// then we should ignore Sync events, because it would result in// recreation of this object.if actionType == Sync && f.willObjectBeDeletedLocked(id) {return nil}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)}
f.items[id] = newDeltas
f.cond.Broadcast()} else {// We need to remove this from our map (extra items in the queue are// ignored if they are not in the map).delete(f.items, id)}return nil
}
该queueActionLocked
方法的第一个参数 actionType 是事件类型。
const (Added DeltaType = "Added" // watch api 获得的创建事件Updated DeltaType = "Updated" // watch api 获得的更新事件Deleted DeltaType = "Deleted" // watch api 获得的删除事件Sync DeltaType = "Sync" // 触发了 List Api,需要刷新缓存
)
事件类型和入队方式表明这是一个具有业务功能的队列,而不仅仅是“先进先出”,入队方式有两个非常巧妙的设计。
- 队列中的事件会先判断资源是否有未消费的事件,然后进行适当的处理。
- 如果 list 方法发现资源已经被删除,则不处理。
第二点比较容易理解,如果触发了列表请求,发现要处理的资源已经被删除了,那么就不需要再排队了。第一点需要和out of queue方法一起看。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()for {for len(f.queue) == 0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the f.closed is set and the condition is broadcasted.// Which causes this loop to continue and return from the Pop().if f.IsClosed() {return nil, ErrFIFOClosed}
f.cond.Wait()}
id := f.queue[0]
f.queue = f.queue[1:]if f.initialPopulationCount > 0 {
f.initialPopulationCount--}
item, ok := f.items[id]if !ok {// Item may have been deleted subsequently.continue}delete(f.items, id)
err := process(item)if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err}// Don't need to copyDeltas here, because we're transferring// ownership to the caller.return item, err
}
}
DeltaFIFO 的Pop
方法有一个输入,即处理函数。当它从队列中出来时,DeltaFIFO会首先根据资源id获取资源所有的事件,然后交给handler函数。
工作流程如图所示。
一般来说,DeltaFIFO的queue方法首先判断资源是否已经在items
,如果是,则资源还没有被消费(仍然在排队),所以直接将事件追加到items[resource_id]
。如果不在 中items
,则items[resource_id]
创建 then 并将资源 id 附加到queue
.
DeltaFIFO out-of-queue 方法从 获取队列顶部的资源 id queue
,然后从 获取该资源的所有事件items
,最后调用该方法PopProcessFunc
传入的类型处理程序Pop
。
所以,DeltaFIFO 的特点是队列中的(资源的)事件,当它从队列中出来时,它获取队列中第一个资源的所有事件。这种设计确保不会因为某个资源疯狂地创建事件而导致饥饿,从而使其他资源没有机会被处理。
控制器 controller
DeltaFIFO 是一个非常重要的组件,唯一真正使它有价值的是 Informer controller
。
虽然 K8s 源代码确实使用了这个词controller
,但这controller
不是像部署控制器那样的资源控制器。相反,它是一个自上而下的事件控制器(从 API 服务器获取事件并将它们发送到 Informer 进行处理)。
职责controller
是双重的。
- 通过 List-Watch 从 Api Server 获取事件并将事件推送到 DeltaFIFO
-
HandleDeltas
以 的方法sharedIndexInformer
作为参数调用 DeltaFIFO 的 Pop 方法
定义controller
很简单,其核心是Reflector
。
type controller struct {config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
controllerr
的代码Reflector
比较繁琐但是很简单,就是通过listerWatcher
sharedIndexInformer
中定义的list-watch
,将获取到的事件推送到DeltaFIFO
中。
控制器启动后,启动Reflector
,然后执行processLoop
,这是一个死循环,不断从DeltaFIFO中读取资源事件,并交给sharedIndexInformer
(分配给config.Process)的HandleDeltas
方法。
func (c *controller) processLoop() {for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)}}}
}
如果我们看一下 sharedIndexInformer 的 HandleDeltas 方法,我们可以看到整个事件消费过程是有效的。
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {if err := s.indexer.Add(d.Object); err != nil {return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)}case Deleted:if err := s.indexer.Delete(d.Object); err != nil {return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil
}
前面我们了解到,该processor.attribute
方法将事件分发给 all listeners
,并controller
使用Reflector
来从 ApiServer 中获取事件并放入队列中,然后通过processLoop
为要处理的资源从队列中取出事件,最后调用processor.attribute
via的HandleDeltas
方法sharedIndexInformer
。所有事件,最后processor.attribute
是通过 的HandleDeltas
方法调用的sharedIndexInformer
。
因此,我们可以按如下方式组织整个事件流。
Indexer
上面,我们整理了从事件接收到分发的所有逻辑,但是在sharedIndexInformer的HandleDeltas方法中,有一些逻辑比较有意思,就是所有的事件都是s.indexer
先更新再分发。
前文提到,Indexer是一个线程安全的存储,作为缓存来缓解资源控制器(Controller)查询资源时对ApiServer的压力。
当有事件更新时,会先刷新Indexer中的缓存,然后将事件分发给资源控制器,资源控制器会先从Indexer获取资源详情,从而减少对APIServer的不必要查询请求。
Indexer存储的具体实现在client-go/tools/cache/thread_safe_store.go中,数据存储在threadSafeMap
.
type threadSafeMap struct {lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers// indices maps a name to an Index
indices Indices
}
本质上,threadSafeMap
是一个带有读/写锁的映射,除此之外还可以定义索引,有趣的是由两个字段实现。
-
Indexers
是一个定义了多个索引函数的map,key是indexName,value是索引函数(计算资源的索引值)。
-
Indices
保存索引值和数据key的映射关系,Indices
是一个两级的map,第一级的key是indexName,对应Indexers
并决定用什么方法计算索引值,value是一个保存关联的map “索引值-资源键”关联。
相关逻辑比较简单,如下图所示。
MutationDetector
更新数据的HandleDeltas
方法除了.sharedIndexInformer
s.indexer
s.cacheMutationDetector
开头提到,在sharedIndexInformer
启动的时候,也会启动一个cacheMutationDetector
来监控索引器的缓存。
因为 indexer 缓存实际上是一个指针,所以多个 Controller 访问 indexer 的缓存资源实际上得到的是同一个资源实例。如果一个Controller玩不好,修改了一个资源的属性,肯定会影响其他Controller的正确性。
当 Informer 接收到新事件时,MutationDetector 会保存指向资源的指针(索引器也是如此)和资源的深层副本。通过周期性地检查指针指向的资源是否与开头存储的深拷贝相匹配,我们就知道缓存的资源是否被修改过。
但是,是否启用监控会受到环境变量的影响KUBE_CACHE_MUTATION_DETECTOR
。如果未设置此环境变量,sharedIndexInformer
将实例化dummyMutationDetector
并且在启动后不执行任何操作。
如果KUBE_CACHE_MUTATION_DETECTOR
为true
,sharedIndexInformer 实例化defaultCacheMutationDetector
,它以 1s 的间隔执行缓存的定期检查,如果发现缓存被修改,则触发故障处理函数,如果未定义该函数,则触发恐慌。
概括
本文对ControllerManager进行了狭义的解释,因为它不包括具体的资源管理器(Controller),而只是解释了ControllerManager是如何“管理控制器”的。
可以看到ControllerManager做了很多工作来保证Controller可以只关注自己关心的事件,而这项工作的核心就是Informer。当您了解 Informer 如何与其他组件一起工作时,就会清楚控制器管理器为资源管理器铺平了道路。