Kubernetes GC in V1.3 源码分析

时间:2022-12-19 05:44:26

本文是对Kubernetes V1.3发布的新Garbage Collector模块的源码解读。实际上本文的是基于kubernetes v1.4的代码进行分析的,和V1.3没有大的改动。GC worker的默认值从V1.3中的5调整为V1.4中的20。阅读本文前,请先阅读Kubernetes GC in v1.3。如果可以,先把kubernetes gc pr多看几遍,或许你根本不需要看我的这两篇博客。

Garbage Collector的定义

GarbageCollector is responsible for carrying out cascading deletion, and removing ownerReferences from the dependents if the owner is deleted with DeleteOptions.OrphanDependents=true.

type GarbageCollector struct { restMapper meta.RESTMapper // metaOnlyClientPool uses a special codec, which removes fields except for // apiVersion, kind, and metadata during decoding. metaOnlyClientPool dynamic.ClientPool // clientPool uses the regular dynamicCodec. We need it to update // finalizers. It can be removed if we support patching finalizers. clientPool dynamic.ClientPool dirtyQueue *workqueue.TimedWorkQueue orphanQueue *workqueue.TimedWorkQueue monitors []monitor propagator *Propagator clock clock.Clock registeredRateLimiter *RegisteredRateLimiter registeredRateLimiterForMonitors *RegisteredRateLimiter // GC caches the owners that do not exist according to the API server. absentOwnerCache *UIDCache }

接下来看看创建Garbage Collector的逻辑,主要看Propagator和monitors的构造。

func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, deletableResources map[schema.GroupVersionResource]struct{}) (*GarbageCollector, error) {
    gc := &GarbageCollector{
        metaOnlyClientPool:               metaOnlyClientPool,
        clientPool:                       clientPool,
        restMapper:                       mapper,
        clock:                            clock.RealClock{},
        dirtyQueue:                       workqueue.NewTimedWorkQueue(),
        orphanQueue:                      workqueue.NewTimedWorkQueue(),
        registeredRateLimiter:            NewRegisteredRateLimiter(deletableResources),
        registeredRateLimiterForMonitors: NewRegisteredRateLimiter(deletableResources),
        absentOwnerCache:                 NewUIDCache(500),
    }
    gc.propagator = &Propagator{
        eventQueue: workqueue.NewTimedWorkQueue(),
        uidToNode: &concurrentUIDToNode{
            RWMutex:   &sync.RWMutex{},
            uidToNode: make(map[types.UID]*node),
        },
        gc: gc,
    }
    for resource := range deletableResources {
        if _, ok := ignoredResources[resource]; ok {
            glog.V(6).Infof("ignore resource %#v", resource)
            continue
        }
        kind, err := gc.restMapper.KindFor(resource)
        if err != nil {
            return nil, err
        }
        monitor, err := gc.monitorFor(resource, kind)
        if err != nil {
            return nil, err
        }
        gc.monitors = append(gc.monitors, monitor)
    }
    return gc, nil
}

注意在构建monitors的过程中,需要屏蔽以下Resources的monitor。

var ignoredResources = map[schema.GroupVersionResource]struct{}{
    schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicationcontrollers"}:              {},
    schema.GroupVersionResource{Group: "", Version: "v1", Resource: "bindings"}:                                           {},
    schema.GroupVersionResource{Group: "", Version: "v1", Resource: "componentstatuses"}:                                  {},
    schema.GroupVersionResource{Group: "", Version: "v1", Resource: "events"}:                                             {},
    schema.GroupVersionResource{Group: "authentication.k8s.io", Version: "v1beta1", Resource: "tokenreviews"}:             {},
    schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "subjectaccessreviews"}:      {},
    schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "selfsubjectaccessreviews"}:  {},
    schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "localsubjectaccessreviews"}: {},
}

启动Garbage Collector

  • 每个monitor的controller都启动一个go协程运行,10s遍历一次所有monitor是否已经完成监控资源的同步。
  • 启动go协程运行Propagator的processEvent方法,开始逐条处理Propagator的Event Queue中的Event。
  • 默认启动20个GC worker(GarbageCollector.worker())对Dirty Queue中的Resources进行处理。
  • 默认启动20个GC orphanFinalizer (GarbageCollector.orphanFinalizer())对orphan Queue中的Resources进行处理。
func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
    glog.Infof("Garbage Collector: Initializing")
    for _, monitor := range gc.monitors {
        go monitor.controller.Run(stopCh)
    }

    wait.PollInfinite(10*time.Second, func() (bool, error) {
        for _, monitor := range gc.monitors {
            if !monitor.controller.HasSynced() {
                glog.Infof("Garbage Collector: Waiting for resource monitors to be synced...")
                return false, nil
            }
        }
        return true, nil
    })
    glog.Infof("Garbage Collector: All monitored resources synced. Proceeding to collect garbage")

    // worker
    go wait.Until(gc.propagator.processEvent, 0, stopCh)

    for i := 0; i < workers; i++ {
        go wait.Until(gc.worker, 0, stopCh)
        go wait.Until(gc.orphanFinalizer, 0, stopCh)
    }
    Register()
    <-stopCh
    glog.Infof("Garbage Collector: Shutting down")
    gc.dirtyQueue.ShutDown()
    gc.orphanQueue.ShutDown()
    gc.propagator.eventQueue.ShutDown()
}

GC worker(Garbage Processor)

GC worker(Garbage Processor)dirty queue的处理逻辑:

  • Consists of the Dirty Queue and workers.
  • Each worker:
    • Dequeues an item from Dirty Queue.
    • If the item’s OwnerReferences is empty, continues to process the next item in the Dirty Queue.
    • Otherwise checks each entry in the OwnerReferences:
      • If at least one owner exists, do nothing.
      • If none of the owners exist, requests the API server to delete the item.

简单的说,GC worker就是在delete object的时候,先去遍历所有owner,只有当所有owner都已经被delete之后,才会请求api server去delete该object

func (gc *GarbageCollector) worker() {
    timedItem, quit := gc.dirtyQueue.Get()
    if quit {
        return
    }
    defer gc.dirtyQueue.Done(timedItem)
    err := gc.processItem(timedItem.Object.(*node))
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", timedItem.Object, err))
        // retry if garbage collection of an object failed.
        gc.dirtyQueue.Add(timedItem)
        return
    }
    DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime))
}


func (gc *GarbageCollector) processItem(item *node) error {
    // Get the latest item from the API server
    latest, err := gc.getObject(item.identity)
    if err != nil {
        if errors.IsNotFound(err) {
            // the Propagator can add "virtual" node for an owner that doesn't
            // exist yet, so we need to enqueue a virtual Delete event to remove
            // the virtual node from Propagator.uidToNode.
            glog.V(6).Infof("item %v not found, generating a virtual delete event", item.identity)
            event := &event{
                eventType: deleteEvent,
                obj:       objectReferenceToMetadataOnlyObject(item.identity),
            }
            glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
            gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
            return nil
        }
        return err
    }
    if latest.GetUID() != item.identity.UID {
        glog.V(6).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
        event := &event{
            eventType: deleteEvent,
            obj:       objectReferenceToMetadataOnlyObject(item.identity),
        }
        glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
        gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
        return nil
    }
    ownerReferences := latest.GetOwnerReferences()
    if len(ownerReferences) == 0 {
        glog.V(6).Infof("object %s's doesn't have an owner, continue on next item", item.identity)
        return nil
    }
    // TODO: we need to remove dangling references if the object is not to be
    // deleted.
    for _, reference := range ownerReferences {
        if gc.absentOwnerCache.Has(reference.UID) {
            glog.V(6).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
            continue
        }
        // TODO: we need to verify the reference resource is supported by the
        // system. If it's not a valid resource, the garbage collector should i)
        // ignore the reference when decide if the object should be deleted, and
        // ii) should update the object to remove such references. This is to
        // prevent objects having references to an old resource from being
        // deleted during a cluster upgrade.
        fqKind := schema.FromAPIVersionAndKind(reference.APIVersion, reference.Kind)
        client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
        if err != nil {
            return err
        }
        resource, err := gc.apiResource(reference.APIVersion, reference.Kind, len(item.identity.Namespace) != 0)
        if err != nil {
            return err
        }
        owner, err := client.Resource(resource, item.identity.Namespace).Get(reference.Name)
        if err == nil {
            if owner.GetUID() != reference.UID {
                glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
                gc.absentOwnerCache.Add(reference.UID)
                continue
            }
            glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID)
            return nil
        } else if errors.IsNotFound(err) {
            gc.absentOwnerCache.Add(reference.UID)
            glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
        } else {
            return err
        }
    }
    glog.V(2).Infof("none of object %s's owners exist any more, will garbage collect it", item.identity)
    return gc.deleteObject(item.identity)
}

Propagator

Propagator中的single worker对Event Queue的处理逻辑见如下代码。再对比kubernetes gc pr对Propagator的功能描述。我就不多说废话了!

  • The Propagator is for optimization, not for correctness.
  • Consists of an Event Queue, a single worker, and a DAG of owner-dependent relations.
    • The DAG stores only name/uid/orphan triplets, not the entire body of every item.
  • Watches for create/update/delete events for all resources, enqueues the events to the Event Queue.
  • Worker:
    • Dequeues an item from the Event Queue.
    • If the item is an creation or update, then updates the DAG accordingly.
      • If the object has an owner and the owner doesn’t exist in the DAG yet, then apart from adding the object to the DAG, also enqueues the object to the Dirty Queue.
    • If the item is a deletion, then removes the object from the DAG, and enqueues all its dependent objects to the Dirty Queue.
  • The propagator shouldn’t need to do any RPCs, so a single worker should be sufficient. This makes locking easier.
  • With the Propagator, we only need to run the Scanner when starting the GC to populate the DAG and the Dirty Queue.
// Dequeueing an event from eventQueue, updating graph, populating dirty_queue.
func (p *Propagator) processEvent() {
    timedItem, quit := p.eventQueue.Get()
    if quit {
        return
    }
    defer p.eventQueue.Done(timedItem)
    event, ok := timedItem.Object.(*event)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", timedItem.Object))
        return
    }
    obj := event.obj
    accessor, err := meta.Accessor(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
        return
    }
    typeAccessor, err := meta.TypeAccessor(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
        return
    }
    glog.V(6).Infof("Propagator process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType)
    // Check if the node already exsits
    existingNode, found := p.uidToNode.Read(accessor.GetUID())
    switch {
    case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
        newNode := &node{
            identity: objectReference{
                OwnerReference: metav1.OwnerReference{
                    APIVersion: typeAccessor.GetAPIVersion(),
                    Kind:       typeAccessor.GetKind(),
                    UID:        accessor.GetUID(),
                    Name:       accessor.GetName(),
                },
                Namespace: accessor.GetNamespace(),
            },
            dependents: make(map[*node]struct{}),
            owners:     accessor.GetOwnerReferences(),
        }
        p.insertNode(newNode)
        // the underlying delta_fifo may combine a creation and deletion into one event
        if shouldOrphanDependents(event, accessor) {
            glog.V(6).Infof("add %s to the orphanQueue", newNode.identity)
            p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: newNode})
        }
    case (event.eventType == addEvent || event.eventType == updateEvent) && found:
        // caveat: if GC observes the creation of the dependents later than the
        // deletion of the owner, then the orphaning finalizer won't be effective.
        if shouldOrphanDependents(event, accessor) {
            glog.V(6).Infof("add %s to the orphanQueue", existingNode.identity)
            p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: existingNode})
        }
        // add/remove owner refs
        added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
        if len(added) == 0 && len(removed) == 0 {
            glog.V(6).Infof("The updateEvent %#v doesn't change node references, ignore", event)
            return
        }
        // update the node itself
        existingNode.owners = accessor.GetOwnerReferences()
        // Add the node to its new owners' dependent lists.
        p.addDependentToOwners(existingNode, added)
        // remove the node from the dependent list of node that are no long in
        // the node's owners list.
        p.removeDependentFromOwners(existingNode, removed)
    case event.eventType == deleteEvent:
        if !found {
            glog.V(6).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
            return
        }
        p.removeNode(existingNode)
        existingNode.dependentsLock.RLock()
        defer existingNode.dependentsLock.RUnlock()
        if len(existingNode.dependents) > 0 {
            p.gc.absentOwnerCache.Add(accessor.GetUID())
        }
        for dep := range existingNode.dependents {
            p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: dep})
        }
    }
    EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, timedItem.StartTime))
}

orphanFinalizer

orphanFinalizer dequeues a node from the orphanQueue, then finds its dependents based on the graph maintained by the GC, then removes it from the OwnerReferences of its dependents, and finally updates the owner to remove the “Orphan” finalizer. The node is add back into the orphanQueue if any of these steps fail.
阅读如下代码的同时,对比kubernetes gc pr对OrphanFinalizer的功能描述。

简单的说,orphanFinalizer最重要的就是通过owners和dependents的对应关系,在delete object的时候,去把自己从它的dependents object的ownerReference中删除。

  • Watches for update events that meet two conditions:

    • the updated object has the identifier of the finalizer in ObjectMeta.Finalizers;
    • ObjectMeta.DeletionTimestamp is updated from nil to non-nil.
  • Removes the object in the event from the OwnerReferences of its dependents.

    • dependent objects can be found via the DAG kept by the GC, or by relisting the dependent resource and checking the OwnerReferences field of each potential dependent object.
  • Also removes any dangling owner references the dependent objects have.
  • At last, removes the itself from the ObjectMeta.Finalizers of the object.
func (gc *GarbageCollector) orphanFinalizer() {
    timedItem, quit := gc.orphanQueue.Get()
    if quit {
        return
    }
    defer gc.orphanQueue.Done(timedItem)
    owner, ok := timedItem.Object.(*node)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", timedItem.Object))
    }
    // we don't need to lock each element, because they never get updated
    owner.dependentsLock.RLock()
    dependents := make([]*node, 0, len(owner.dependents))
    for dependent := range owner.dependents {
        dependents = append(dependents, dependent)
    }
    owner.dependentsLock.RUnlock()

    err := gc.orhpanDependents(owner.identity, dependents)
    if err != nil {
        glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err)
        gc.orphanQueue.Add(timedItem)
        return
    }
    // update the owner, remove "orphaningFinalizer" from its finalizers list
    err = gc.removeOrphanFinalizer(owner)
    if err != nil {
        glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err)
        gc.orphanQueue.Add(timedItem)
    }
    OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime))
}

Gabarbage Collector is a controller started by controller-manager

kube-controller-manager组件启动的时候,会启动所有配置的controller,包括garbagecollector,replicationcontroller,horizontalpodautoscaling等。

/Users/garnett/workspace/go/src/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go

// Run runs the CMServer.  This should never exit.
func Run(s *options.CMServer) error {
    ...
        err := StartControllers(newControllerInitializers(), s, rootClientBuilder, clientBuilder, stop)
        glog.Fatalf("error running controllers: %v", err)
        panic("unreachable")
    ...
}

func StartControllers(controllers map[string]InitFunc, s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
    for controllerName, initFn := range controllers {
            time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
            ...
            started, err := initFn(ctx)
            ...
        }
}

func newControllerInitializers() map[string]InitFunc {
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefuleset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["certificatesigningrequests"] = startCSRController

    return controllers
}

对于garbage collector的启动,则是调用startGarbageCollectorController方法,最终启动一个go协程执行前面分析过的garbageCollector.Run(workers, ctx.Stop)。

/Users/garnett/workspace/go/src/k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go

func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
    ...
    go garbageCollector.Run(workers, ctx.Stop)

    return true, nil
}

前面提到,默认会启动20个Garbage worker & orphanFinalizer,这是在controller manager创建的时候指定的该默认值。注意,在V1.3版本中,默认值为5。

/Users/garnett/workspace/go/src/k8s.io/kubernetes/cmd/kube-controller-manager/app/options/options.go

// NewCMServer creates a new CMServer with a default config.
func NewCMServer() *CMServer {
    s := CMServer{
        KubeControllerManagerConfiguration: componentconfig.KubeControllerManagerConfiguration{
        ...
        ConcurrentGCSyncs:       20,
        ...
    }
    ...
}

End-to-End Examples

最后,最为补充,我不得不把kubernetes gc pr中的example拿出来,这个是理解GC整个流程最好的东西。

This section presents an example of all components working together to enforce the cascading deletion or orphaning.

Life of a Deployment and its descendants

  1. User creates a deployment D1.
  2. The Propagator of the GC observes the creation. It creates an entry of D1 in the DAG.
  3. The deployment controller observes the creation of D1. It creates the replicaset R1, whose OwnerReferences field contains a reference to D1, and has the “orphan” finalizer in its ObjectMeta.Finalizers map.
  4. The Propagator of the GC observes the creation of R1. It creates an entry of R1 in the DAG, with D1 as its owner.
  5. The replicaset controller observes the creation of R1 and creates Pods P1~Pn, all with R1 in their OwnerReferences.
  6. The Propagator of the GC observes the creation of P1~Pn. It creates entries for them in the DAG, with R1 as their owner.

    In case the user wants to cascadingly delete D1’s descendants, then

  7. The user deletes the deployment D1, with DeleteOptions.OrphanDependents=false. API server checks if D1 has “orphan” finalizer in its Finalizers map, if so, it updates D1 to remove the “orphan” finalizer. Then API server deletes D1.

  8. The “orphan” finalizer does not take any action, because the observed deletion shows D1 has an empty Finalizers map.
  9. The Propagator of the GC observes the deletion of D1. It deletes D1 from the DAG. It adds its dependent object, replicaset R1, to the dirty queue.
  10. The Garbage Processor of the GC dequeues R1 from the dirty queue. It finds R1 has an owner reference pointing to D1, and D1 no longer exists, so it requests API server to delete R1, with DeleteOptions.OrphanDependents=false. (The Garbage Processor should always set this field to false.)
  11. The API server updates R1 to remove the “orphan” finalizer if it’s in the R1’s Finalizers map. Then the API server deletes R1, as R1 has an empty Finalizers map.
  12. The Propagator of the GC observes the deletion of R1. It deletes R1 from the DAG. It adds its dependent objects, Pods P1~Pn, to the Dirty Queue.
  13. The Garbage Processor of the GC dequeues Px (1 <= x <= n) from the Dirty Queue. It finds that Px have an owner reference pointing to D1, and D1 no longer exists, so it requests API server to delete Px, with DeleteOptions.OrphanDependents=false.
  14. API server deletes the Pods.

    In case the user wants to orphan D1’s descendants, then

  15. The user deletes the deployment D1, with DeleteOptions.OrphanDependents=true.

  16. The API server first updates D1, with DeletionTimestamp=now and DeletionGracePeriodSeconds=0, increments the Generation by 1, and add the “orphan” finalizer to ObjectMeta.Finalizers if it’s not present yet. The API server does not delete D1, because its Finalizers map is not empty.
  17. The deployment controller observes the update, and acknowledges by updating the D1’s ObservedGeneration. The deployment controller won’t create more replicasets on D1’s behalf.
  18. The “orphan” finalizer observes the update, and notes down the Generation. It waits until the ObservedGeneration becomes equal to or greater than the noted Generation. Then it updates R1 to remove D1 from its OwnerReferences. At last, it updates D1, removing itself from D1’s Finalizers map.
  19. The API server handles the update of D1, because i) DeletionTimestamp is non-nil, ii) the DeletionGracePeriodSeconds is zero, and iii) the last finalizer is removed from the Finalizers map, API server deletes D1.
  20. The Propagator of the GC observes the deletion of D1. It deletes D1 from the DAG. It adds its dependent, replicaset R1, to the Dirty Queue.
  21. The Garbage Processor of the GC dequeues R1 from the Dirty Queue and skips it, because its OwnerReferences is empty.