Kubernetes 1.8 Source Code Analysis: Resource Scheduling


The earlier 1.4 source code analysis already walked through the scheduling flow; this post fills in some of the details. First, which scheduling algorithms does the scheduler load? It supports two sources: a policy configuration file, and the defaults registered in code. Let's look at the defaults in plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go, starting with the filtering (predicate) set:

func defaultPredicates() sets.String {
    return sets.NewString(
        // Fit is determined by volume zone requirements.
        factory.RegisterFitPredicateFactory(
            "NoVolumeZoneConflict",
            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
                return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo)
            },
        ),
        // Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
        factory.RegisterFitPredicateFactory(
            "MaxEBSVolumeCount",
            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
                // TODO: allow for generically parameterized scheduler predicates, because this is a bit ugly
                maxVols := getMaxVols(aws.DefaultMaxEBSVolumes)
                return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilter, maxVols, args.PVInfo, args.PVCInfo)
            },
        ),
        // Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
        factory.RegisterFitPredicateFactory(
            "MaxGCEPDVolumeCount",
            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
                // TODO: allow for generically parameterized scheduler predicates, because this is a bit ugly
                maxVols := getMaxVols(DefaultMaxGCEPDVolumes)
                return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilter, maxVols, args.PVInfo, args.PVCInfo)
            },
        ),
        // Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node
        factory.RegisterFitPredicateFactory(
            "MaxAzureDiskVolumeCount",
            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
                // TODO: allow for generically parameterized scheduler predicates, because this is a bit ugly
                maxVols := getMaxVols(DefaultMaxAzureDiskVolumes)
                return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilter, maxVols, args.PVInfo, args.PVCInfo)
            },
        ),
        // Fit is determined by inter-pod affinity.
        factory.RegisterFitPredicateFactory(
            "MatchInterPodAffinity",
            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
                return predicates.NewPodAffinityPredicate(args.NodeInfo, args.PodLister)
            },
        ),

        // Fit is determined by non-conflicting disk volumes.
        factory.RegisterFitPredicate("NoDiskConflict", predicates.NoDiskConflict),

        // GeneralPredicates are the predicates that are enforced by all Kubernetes components
        // (e.g. kubelet and all schedulers)
        factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates),

        // Fit is determined based on whether a pod can tolerate all of the node's taints
        factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints),

        // Fit is determined by node memory pressure condition.
        factory.RegisterFitPredicate("CheckNodeMemoryPressure", predicates.CheckNodeMemoryPressurePredicate),

        // Fit is determined by node disk pressure condition.
        factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),

        // Fit is determined by node disk mount condition.
        factory.RegisterFitPredicate("CheckNodeDiskMountPressure", predicates.CheckNodeDiskMountPressurePredicate),

        // Fit is determined by volume zone requirements.
        factory.RegisterFitPredicateFactory(
            "NoVolumeNodeConflict",
            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
                return predicates.NewVolumeNodePredicate(args.PVInfo, args.PVCInfo, nil)
            },
        ),
    )
}
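Each entry above either registers a predicate directly (RegisterFitPredicate) or through a factory that receives listers and other plumbing (RegisterFitPredicateFactory). To make the shape of a predicate concrete, here is a small sketch of a hypothetical one; "AlwaysFit" and alwaysFitPredicate are invented names for illustration, following the algorithm.FitPredicate signature that also appears in the disk-pressure predicate at the end of this post.

// Sketch only: a made-up predicate matching the 1.8 algorithm.FitPredicate
// signature. It is not one of the registered defaults.
func alwaysFitPredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    // Accept every node; a real predicate would inspect pod and nodeInfo here.
    return true, nil, nil
}

// It would be registered the same way as the simple defaults above, e.g.:
//   factory.RegisterFitPredicate("AlwaysFit", alwaysFitPredicate)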

The filtering stage thus covers disk conflicts, memory pressure, node conditions, and also node ports, label selectors, and so on.
The other half of the defaults is the scoring (priority) set:

func defaultPriorities() sets.String {
    return sets.NewString(
        // spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
        factory.RegisterPriorityConfigFactory(
            "SelectorSpreadPriority",
            factory.PriorityConfigFactory{
                Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
                    return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
                },
                Weight: 1,
            },
        ),
        // pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
        // as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
        factory.RegisterPriorityConfigFactory(
            "InterPodAffinityPriority",
            factory.PriorityConfigFactory{
                Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
                    return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
                },
                Weight: 1,
            },
        ),

        // Prioritize nodes by least requested utilization.
        factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),

        // Prioritizes nodes to help achieve balanced resource usage
        factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),

        // Set this weight large enough to override all other priority functions.
        // TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
        factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),

        // Prioritizes nodes that have labels matching NodeAffinity
        factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),

        // TODO: explain what it does.
        factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
    )
}
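The RegisterPriorityFunction2 entries follow the map/reduce style: a map function scores each node, and an optional reduce function normalizes the scores afterwards. As a rough sketch (the name FewerPodsPriority and its scoring rule are invented for illustration, not one of the defaults), a map-style priority could look like this:

// Sketch of a map-style priority, assuming the 1.8 signatures
// (schedulerapi.HostPriority with Host/Score fields).
func fewerPodsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
    node := nodeInfo.Node()
    if node == nil {
        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
    }
    // Prefer nodes that currently run fewer pods (clamped to a 0-10 scale).
    score := 10 - len(nodeInfo.Pods())
    if score < 0 {
        score = 0
    }
    return schedulerapi.HostPriority{Host: node.Name, Score: score}, nil
}

// Registration would mirror the defaults above, e.g.:
//   factory.RegisterPriorityFunction2("FewerPodsPriority", fewerPodsPriorityMap, nil, 1)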

These priorities mostly optimize placement: they balance resource usage across nodes and handle label matching and inter-pod affinity. Once these filtering and scoring algorithms are loaded, how does an incoming pod actually get scheduled? Before walking through the concrete scheduling path, let's look at the cache mechanism. It works like the caches in other Kubernetes components: it avoids hitting the apiserver with repeated calls. Start with the Cache interface:

type Cache interface {
    // AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
    // The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
    // After expiration, its information would be subtracted.
    AssumePod(pod *v1.Pod) error

    // FinishBinding signals that cache for assumed pod can be expired
    FinishBinding(pod *v1.Pod) error

    // ForgetPod removes an assumed pod from cache.
    ForgetPod(pod *v1.Pod) error

    // AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
    // If added back, the pod's information would be added again.
    AddPod(pod *v1.Pod) error

    // UpdatePod removes oldPod's information and adds newPod's information.
    UpdatePod(oldPod, newPod *v1.Pod) error

    // RemovePod removes a pod. The pod's information would be subtracted from assigned node.
    RemovePod(pod *v1.Pod) error

    // AddNode adds overall information about node.
    AddNode(node *v1.Node) error

    // UpdateNode updates overall information about node.
    UpdateNode(oldNode, newNode *v1.Node) error

    // RemoveNode removes overall information about node.
    RemoveNode(node *v1.Node) error

    // UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
    // The node info contains aggregated information of pods scheduled (including assumed to be)
    // on this node.
    UpdateNodeNameToInfoMap(infoMap map[string]*NodeInfo) error

    // List lists all cached pods (including assumed ones).
    List(labels.Selector) ([]*v1.Pod, error)
}
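To see how these methods work together, here is a rough sketch of the assume-then-bind pattern the scheduler follows; assumeThenBind and the bind callback are illustrative names, not scheduler code. The pod is optimistically added with AssumePod so that concurrent scheduling decisions already account for it, the API bind runs asynchronously, and the cache is told how it went:

// Illustrative only: a simplified assume/bind flow built on the Cache
// interface above.
func assumeThenBind(c schedulercache.Cache, pod *v1.Pod, bind func(*v1.Pod) error) error {
    // Optimistically account for the pod on its chosen node so later
    // scheduling decisions already see its resource usage.
    if err := c.AssumePod(pod); err != nil {
        return err
    }
    go func() {
        if err := bind(pod); err != nil {
            // Binding failed: drop the assumed pod so its resources are freed.
            c.ForgetPod(pod)
            return
        }
        // Binding succeeded: the assumed entry may now expire once the
        // confirming Add event arrives from the watch.
        c.FinishBinding(pod)
    }()
    return nil
}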

It covers both pod and node methods; whenever a watch reports a change to one of these resources, the cached data can be updated. Space is limited, so let's follow one path as an example, starting with the event-handler registration in plugin/pkg/scheduler/factory/factory.go:

nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
    cache.ResourceEventHandlerFuncs{
        AddFunc:    c.addNodeToCache,
        UpdateFunc: c.updateNodeInCache,
        DeleteFunc: c.deleteNodeFromCache,
    },
    0,
)
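The handlers wired up here are thin wrappers. As a simplified sketch (not the literal factory.go code; receiver and details trimmed), the Add handler essentially converts the informer object back to a *v1.Node and forwards it to the scheduler cache:

// Simplified sketch of an Add handler for the node informer.
func addNodeToCache(schedulerCache schedulercache.Cache, obj interface{}) {
    node, ok := obj.(*v1.Node)
    if !ok {
        glog.Errorf("cannot convert to *v1.Node: %v", obj)
        return
    }
    if err := schedulerCache.AddNode(node); err != nil {
        glog.Errorf("scheduler cache AddNode failed: %v", err)
    }
}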

So a node Add event ends up triggering the cache's AddNode method:

func (cache *schedulerCache) AddNode(node *v1.Node) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    n, ok := cache.nodes[node.Name]
    if !ok {
        n = NewNodeInfo()
        cache.nodes[node.Name] = n
    }
    return n.SetNode(node)
}

The cache now holds this node's information. SetNode is what digests the node object into the cached NodeInfo:

func (n *NodeInfo) SetNode(node *v1.Node) error {
    n.node = node
    for rName, rQuant := range node.Status.Allocatable {
        switch rName {
        case v1.ResourceCPU:
            n.allocatableResource.MilliCPU = rQuant.MilliValue()
        case v1.ResourceMemory:
            n.allocatableResource.Memory = rQuant.Value()
        case v1.ResourceNvidiaGPU:
            n.allocatableResource.NvidiaGPU = rQuant.Value()
        case v1.ResourcePods:
            n.allowedPodNumber = int(rQuant.Value())
        case v1.ResourceStorageScratch:
            n.allocatableResource.StorageScratch = rQuant.Value()
        case v1.ResourceStorageOverlay:
            n.allocatableResource.StorageOverlay = rQuant.Value()
        default:
            if v1helper.IsOpaqueIntResourceName(rName) {
                n.allocatableResource.SetOpaque(rName, rQuant.Value())
            }
        }
    }
    n.taints = node.Spec.Taints
    for i := range node.Status.Conditions {
        cond := &node.Status.Conditions[i]
        switch cond.Type {
        case v1.NodeMemoryPressure:
            n.memoryPressureCondition = cond.Status
        case v1.NodeDiskPressure:
            n.diskPressureCondition = cond.Status
        case v1.NodeDiskMountPressure:
            n.diskMountPressureCondition = cond.Status
        default:
            // We ignore other conditions.
        }
    }
    n.generation++
    return nil
}
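Note the distinction between MilliValue() and Value() when reading Allocatable: CPU is tracked in millicores, while memory and the other resources are stored in absolute units. A quick, self-contained illustration with the apimachinery resource package:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    cpu := resource.MustParse("500m") // half a core
    mem := resource.MustParse("2Gi")  // two gibibytes

    fmt.Println(cpu.MilliValue()) // 500        -> what SetNode stores in MilliCPU
    fmt.Println(mem.Value())      // 2147483648 -> what SetNode stores in Memory (bytes)
}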

Of course, besides node state, the cache also tracks pod state (plugin/pkg/scheduler/schedulercache/cache.go):

podStates map[string]*podState
nodes     map[string]*NodeInfo

With the cache mechanism covered, let's return to the question above: how does an incoming pod get scheduled and bound? When a pod needs to be scheduled, Schedule in plugin/pkg/scheduler/core/generic_scheduler.go is executed:

err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
    return "", err
}

trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache)
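For orientation, the rest of Schedule follows the classic two-phase shape: filter, score, then pick a host. Below is a heavily condensed sketch of that flow; prioritize and pickBest are simplified stand-ins for PrioritizeNodes and selectHost, and error handling and tracing are stripped, so treat it as an outline rather than the literal code:

// Condensed, illustrative outline of the Schedule flow (not the real code).
func scheduleSketch(g *genericScheduler, pod *v1.Pod, nodes []*v1.Node) (string, error) {
    // 1. Refresh the snapshot of node information from the scheduler cache.
    if err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap); err != nil {
        return "", err
    }
    // 2. Filtering: run the registered predicates against every candidate node.
    filteredNodes, _, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes,
        g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache)
    if err != nil {
        return "", err
    }
    // 3. Scoring: run the registered priorities over the survivors
    //    (prioritize stands in for PrioritizeNodes).
    priorityList, err := prioritize(pod, g.cachedNodeInfoMap, filteredNodes)
    if err != nil {
        return "", err
    }
    // 4. Selection: return the name of the highest-scoring node
    //    (pickBest stands in for selectHost).
    return pickBest(priorityList)
}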

Two methods here deserve a closer look. The first is UpdateNodeNameToInfoMap, which refreshes the node information used for scheduling (plugin/pkg/scheduler/schedulercache/cache.go):

func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()
    for name, info := range cache.nodes {
        if current, ok := nodeNameToInfo[name]; !ok || current.generation != info.generation {
            nodeNameToInfo[name] = info.Clone()
        }
    }
    for name := range nodeNameToInfo {
        if _, ok := cache.nodes[name]; !ok {
            delete(nodeNameToInfo, name)
        }
    }
    return nil
}

Here info.Clone() copies each node's information out of the cache into nodeNameToInfo, the map the scheduler filters against. The second method is the important one: findNodesThatFit, which does the actual filtering. Stepping into it, the core call is:

fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache)
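This call runs once per candidate node; the real code fans it out through a parallelizer with 16 workers. A simplified, WaitGroup-based equivalent of that fan-out (illustrative only, not the actual findNodesThatFit code; pod, meta, nodeNameToInfo, predicateFuncs and ecache are its surrounding variables) might look like this:

// Simplified illustration of the 16-way fan-out over candidate nodes.
var (
    wg   sync.WaitGroup
    sem  = make(chan struct{}, 16) // at most 16 node checks in flight
    mu   sync.Mutex
    fits []*v1.Node
)
for name := range nodeNameToInfo {
    wg.Add(1)
    go func(name string) {
        defer wg.Done()
        sem <- struct{}{}
        defer func() { <-sem }()
        ok, _, err := podFitsOnNode(pod, meta, nodeNameToInfo[name], predicateFuncs, ecache)
        if err == nil && ok {
            mu.Lock()
            fits = append(fits, nodeNameToInfo[name].Node())
            mu.Unlock()
        }
    }(name)
}
wg.Wait()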

So the node checks run on up to 16 goroutines in parallel, each invoking podFitsOnNode to decide whether the pod fits on that node. Inside podFitsOnNode, the registered predicates are evaluated in a loop:

for predicateKey, predicate := range predicateFuncs {
    // If equivalenceCache is available
    if eCacheAvailable {
        // PredicateWithECache will returns it's cached predicate results
        fit, reasons, invalid = ecache.PredicateWithECache(pod, info.Node().GetName(), predicateKey, equivalenceHash)
    }

    if !eCacheAvailable || invalid {
        // we need to execute predicate functions since equivalence cache does not work
        fit, reasons, err = predicate(pod, meta, info)
        if err != nil {
            return false, []algorithm.PredicateFailureReason{}, err
        }

        if eCacheAvailable {
            // update equivalence cache with newly computed fit & reasons
            // TODO(resouer) should we do this in another thread? any race?
            ecache.UpdateCachedPredicateItem(pod, info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash)
        }
    }

    if !fit {
        // eCache is available and valid, and predicates result is unfit, record the fail reasons
        failedPredicates = append(failedPredicates, reasons...)
    }
}
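The ecache branches above memoize predicate results so that pods with the same scheduling requirements (the same equivalence hash) do not re-run an identical predicate on the same node. The toy type below captures just that idea with a plain map; it is not the real EquivalenceCache implementation:

// Toy illustration of predicate-result memoization, keyed conceptually the
// same way as the equivalence cache: (node, predicate, equivalence hash).
type predKey struct {
    node          string
    predicateName string
    equivHash     uint64
}

type toyECache struct {
    mu      sync.RWMutex
    results map[predKey]bool
}

func (c *toyECache) lookup(k predKey) (fit bool, ok bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    fit, ok = c.results[k]
    return
}

func (c *toyECache) store(k predKey, fit bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.results[k] = fit
}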

The for loop executes each predicate function pointer in turn; these are exactly the filter functions registered earlier. Whether fit comes back true decides whether the node is suitable, and when it is not, the second return value, reasons, carries the failure reasons. Here is one of the simplest predicates, the node disk-pressure check:

func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    // is node under presure?
    if nodeInfo.DiskPressureCondition() == v1.ConditionTrue {
        return false, []algorithm.PredicateFailureReason{ErrNodeUnderDiskPressure}, nil
    }
    return true, nil, nil
}
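To make the contract concrete, here is a small test-style snippet that drives this predicate directly (a sketch only; package paths follow the 1.8 tree layout): build a node whose DiskPressure condition is True, load it into a NodeInfo, and the predicate reports the node as unfit:

// Sketch: exercising the predicate by hand. fit comes back false and
// reasons contains ErrNodeUnderDiskPressure.
node := &v1.Node{
    Status: v1.NodeStatus{
        Conditions: []v1.NodeCondition{
            {Type: v1.NodeDiskPressure, Status: v1.ConditionTrue},
        },
    },
}
nodeInfo := schedulercache.NewNodeInfo()
if err := nodeInfo.SetNode(node); err != nil {
    panic(err)
}

// This predicate only looks at the node condition, so an empty pod and nil
// metadata are enough for the illustration.
fit, reasons, err := predicates.CheckNodeDiskPressurePredicate(&v1.Pod{}, nil, nodeInfo)
fmt.Println(fit, len(reasons), err) // false 1 <nil>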

The predicate returns exactly the fit flag and reasons described above. And with that, the whole filtering flow is complete!