Kubernetes 学习(九)Kubernetes 源码阅读之正式篇------核心组件之 Scheduler

时间:2021-07-15 17:29:59

0. 前言

  • 继续上一篇博客阅读 Kubernetes 源码,参照《k8s 源码阅读》首先学习 Kubernetes 的一些核心组件,首先是 kube-scheduler
  • 本文严重参考原文:《k8s 源码阅读》之 2.2 章节:scheduler,加入部分自己阅读的体会作为自己的阅读笔记
  • 感谢《k8s 源码阅读》的作者们辛苦编写教材,在此郑重表示感谢,望大家多多支持!~

1. 整体设计

1.1 概述

  • 官网描述:
    • The Kubernetes scheduler runs as a process alongside the other master components such as the API server.
    • Its interface to the API server is to watch for Pods with an empty PodSpec.NodeName, and for each Pod,
    • it posts a binding indicating where the Pod should be scheduled.
  • Scheduler 是相对独立的一个组件,主动访问 API server,寻找等待调度的 Pod(PodSpec.NodeName 为空)
  • 然后通过一系列调度算法寻找哪个 Node 适合跑这个 Pod
  • 然后将这个 Pod 和 Node 的绑定关系发给 API server,即整个调度流程

1.2 源码层次

  • cmd/kube-scheduler/scheduler.go:main() 函数入口位置,在 Scheduler 过程开始前的一系列初始化工作
  • pkg/scheduler/scheduler.go:调度框架整体逻辑,抽象调用各个算法的 Interface
  • pkg/scheduler/core/generic_scheduler.go:计算哪些 Node适合跑哪些 Pod 的具体算法

1.3 调度流程

  • 通过一系列的 Predicates(预选) 过滤掉不能运行 Pod 的 Node
    • 比如一个 Pod 需要 500M 的内存,有些节点剩余内存只有 100M 了,就会被剔除
  • 通过一系列的 Priorities(优选)给剩下的 Node 排一个分数,寻找能够运行 Pod 的 Node 中最合适的一个
  • 得分最高的一个 Node 胜出,获得了运行 Pod 的资格
  • 若是上述预选过程中没有找到任何一个合适的 Node,则进入抢占模式,移除部分低优先级的 Pod,再选择 Node

1.4 Predicates 和 Priorities 策略

  • Predicates 是用于过滤不合适 Node的策略
  • Priorities 是用于计算 Node 分数的策略(作用在通过 Predicates 过滤的 Node上)
  • Kubernetes 默认内建了一些 Predicates 和 Priorities 策略,代码分别在:
    • pkg/scheduler/algorithm/predicates/predicates.go
    • pkg/scheduler/algorithm/priorities/

1.5 调度策略的修改

  • 默认调度策略是通过 defaultPredicates() 和 defaultPriorities() 函数定义的
  • 代码在 pkg/scheduler/algorithmprovider/defaults/defaults.go
  • 可以通过命令行 flag --policy-config-file 来覆盖默认行为
    • 可以修改 pkg/scheduler/algorithm/predicates/predicates.go/pkg/scheduler/algorithm/priorities/,然后注册到 defaultPredicates() 和 defaultPriorities() 来实现
    • 或者通过配置文件:
{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
{"name" : "PodFitsHostPorts"},
{"name" : "PodFitsResources"},
{"name" : "NoDiskConflict"},
{"name" : "NoVolumeZoneConflict"},
{"name" : "MatchNodeSelector"},
{"name" : "HostName"}
],
"priorities" : [
{"name" : "LeastRequestedPriority", "weight" : },
{"name" : "BalancedResourceAllocation", "weight" : },
{"name" : "ServiceSpreadingPriority", "weight" : },
{"name" : "EqualPriority", "weight" : }
],
"hardPodAffinitySymmetricWeight" : ,
"alwaysCheckAllPredicates" : false
}

2. 启动前逻辑

  • Scheduler 可以分为三层,第一层是调度器启动前的逻辑,包括:命令行参数解析、参数校验、调度器初始化等一系列逻辑

2.1 Cobra

2.1.1 简介

  • Cobra 既是一个创建强大的现代化命令行程序的库,又是一个用于生成应用和命令行文件的程序。有很多流行的 Golang 项目用了 Cobra(Kubernetes 和 Docker)

2.1.2 具体使用

  • Go 1.11 版本以上先暂时关闭 go modules,安装 Cobra:go get -u github.com/spf13/cobra/cobra
    • 若是网卡,可能导致安装不成功,可以先下载报错的依赖包,再重新 go get 安装
    • 安装成功后会看到 $GOPATH/
  • 进入 $GOPATH/src 目录,初始化项目:cobra init myapp --pkg-name myapp
$ cobra init myapp --pkg-name myapp
Your Cobra applicaton is ready at
/home/ao/go/src/myapp
$ ls myapp
cmd LICENSE main.go
$ pwd
/home/ao/go/src
  • 本地可以看到一个 main.go 和一个 cmd 目录
  • 查看 cmd/root.go:
package cmd

import (
"fmt"
"os"
"github.com/spf13/cobra" homedir "github.com/mitchellh/go-homedir"
"github.com/spf13/viper" ) var cfgFile string // rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "myapp",
Short: "A brief description of your application",
Long: `A longer description that spans multiple lines and likely contains
examples and usage of using your application. For example: Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
// Uncomment the following line if your bare application
// has an action associated with it:
// Run: func(cmd *cobra.Command, args []string) { },
}
  • 查看 main.go:
package main

import "myapp/cmd"

func main() {
cmd.Execute()
}
  • main.go 里 import了 myapp/cmd(也就是 root.go 文件)
    • 在 Execute 里面调用了 rootCmd.Execute() 方法
    • rootCmd 是*cobra.Command 类型
  • 添加新的命令:cobra add version
$ cobra add version
version created at /home/ao/go/src/myapp%
  • 查看 cmd/version.go:
    • init() 函数里面调用 rootCmd.AddCommand(versionCmd),子命令就是 version
package cmd

import (
"fmt" "github.com/spf13/cobra"
) // versionCmd represents the version command
var versionCmd = &cobra.Command{
Use: "version",
Short: "A brief description of your command",
Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example: Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("version called")
},
} func init() {
rootCmd.AddCommand(versionCmd)
......
}
  • 添加多级子命令:

$ cobra add server
server created at /home/ao/go/src/myapp%
$ cobra add create -p serverCmd
create created at /home/ao/go/src/myapp%
  • 查看 cmd/create.go 文件:
    • init() 函数调用 serverCmd.AddCommand(createCmd),表示 create 为子命令
package cmd

import (
"fmt" "github.com/spf13/cobra"
) // createCmd represents the create command
var createCmd = &cobra.Command{
Use: "create",
Short: "A brief description of your command",
Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example: Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("create called")
},
} func init() {
serverCmd.AddCommand(createCmd)
......
}

2.2 Scheduler 的 main 函数

  • 下面正式进入 Kubernetes 的 Scheduler 源码阅读
  • 查看 cmd/kube-scheduler/scheduler.go,删除非主干代码:
func main() {
command := app.NewSchedulerCommand()
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit()
}
}
  • 查看定义 NewSchedulerCommand 的 cmd/kube-scheduler/app/server.go,删除非主干代码:
/ NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit()
}
},
}
return cmd
}
  • Schduler 启动时调用了 runCommand(cmd, args, opts),查看 cmd/kube-scheduler/app/server.go 关于 runCommand 定义,删除非主干代码:
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
c, err := opts.Config()
stopCh := make(chan struct{})
// Get the completed config
cc := c.Complete()
return Run(cc, stopCh)
}
  • 处理配置问题后调用了一个 Run() 函数,Run() 的作用是基于给定的配置启动 Scheduler,它只会在出错时或者 channel stopCh 被关闭时才退出,查看 cmd/kube-scheduler/app/server.go 关于 Run 定义,删除非主干代码:
// Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
stopCh,
scheduler.WithName(cc.ComponentConfig.SchedulerName)) // Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
} ctx, cancel := context.WithCancel(context.TODO())
defer cancel() go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}() // Leader election is disabled, so runCommand inline until done.
run(ctx)
return fmt.Errorf("finished without leader elect")
}
  • 最终是要跑 sched.Run() 这个方法来启动 Scheduler,sched.Run() 方法就是 Scheduler 框架真正运行的逻辑
  • 上述有一个 sched 变量,linux 里经常会看到一些软件叫 ***d,d 也就是 daemon,守护进程的意思,也就是一直跑在后台的一个程序
    • 这里的 sched 也就是指 “Scheduler Daemon“
    • sched 的其实是 *Scheduler 类型,定义在 pkg/scheduler/scheduler.go
      • Scheduler 监控新创建的未被调度的 Pods,然后尝试寻找合适的 Node,回写一个绑定关系到 API Server
      • 这里也可以体会到 Daemon 的感觉,我们平时搭建的 k8s 集群中运行着一个 Daemon 进程叫做 kube-scheduler,在程序里面也就对应这样一个对象:Scheduler
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
config *factory.Config
}
  • Scheduler 结构体中的 Config 属性对象定义在 pkg/scheduler/factory/factory.go,主要属性如下:

// Config is an implementation of the Scheduler's configured input data.
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
Ecache *equivalence.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor // NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *v1.Pod // SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
}

3. 整体调度框架

3.1 启动

  • 调度的第二层:负责 Scheduler 除了具体 Node 过滤算法外的工作逻辑
  • Scheduler 对象绑定了一个 Run() 方法,定义如下:
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, , sched.config.StopEverything)
}
  • Run 函数在 cmd/kube-scheduler/app/server.go 调用:
    // Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}
  • 调用了 sched.Run() 之后就等待 ctx.Done()
  • wait.Until 实现:每隔 n 时间调用 f 一次,除非 channel c 被关闭
    • 这里的 n 就是 0,也就是一直调用,前一次调用返回下一次调用就开始了
    • f 指 sched.scheduleOne,c 指 sched.config.StopEverything

3.2 一个 Pod 的基本调度流程

  • scheduleOne 实现 1 个 Pod 的完整调度工作流
  • 这个过程是顺序执行的,也就是非并发的
    • 即前一个 Pod 的 scheduleOne 一完成,下一个 Pod 的 ScheduleOne 立马接着执行
  • scheduleOne 的主要逻辑:
func (sched *Scheduler) scheduleOne() {
pod := sched.config.NextPod()
suggestedHost, err := sched.schedule(pod)
if err != nil {
if fitError, ok := err.(*core.FitError); ok {
preemptionStartTime := time.Now()
sched.preempt(pod, fitError)
}
return
}
assumedPod := pod.DeepCopy()
allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
err = sched.assume(assumedPod, suggestedHost)
go func() {
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: suggestedHost,
},
})
}()
}
  • 参考流程如下:

Kubernetes 学习(九)Kubernetes 源码阅读之正式篇------核心组件之 Scheduler

3.3 Schedule 算法计算合适 Node

  • 主流程核心步骤是 suggestedHost, err := sched.schedule(pod)
  • 这里完成了非抢占模式下 Node 的计算,包括预选过程、优选过程等
  • sched.schedule(pod) 方法定义见 pkg/scheduler/scheduler.go
// schedule implements the scheduling algorithm and returns the suggested host.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
if err != nil {
pod = pod.DeepCopy()
sched.config.Error(pod, err)
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
LastProbeTime: metav1.Now(),
Reason: v1.PodReasonUnschedulable,
Message: err.Error(),
})
return "", err
}
return host, err
}
  • 里面调用了 sched.config.Algorithm.Schedule() 方法(本身为接口),定义见 pkg/scheduler/algorithm/scheduler_interface.go
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
Predicates() map[string]FitPredicate
Prioritizers() []PriorityConfig
}
  • 这个接口有 4 个方法,实现 ScheduleAlgorithm 接口的对象实现如何调度 Pods 到 Nodes 上
  • 默认的实现是 pkg/scheduler/core/generic_scheduler.gogenericScheduler 对象
    • Schedule():给定 Pod 和 Nodes,计算出一个适合跑 Pod 的 Node 并返回
    • Preempt():抢占模式
    • Predicates():预选过程
    • Prioritizers():优选过程

4. 一般调度过程

6. 参考文献