Kubernetes Scheduler 源码分析
Kubernetes Scheduler
最近在看 k8s 调度相关的一些内容,希望给自己这阵子了解的知识进行一个整理,会连载几篇文章,都会关于调度的,初步的想法是 kubernetes 的默认调度器,然后是新引入的 scheduler framework,然后是 kube-batch 和 volcano, 如果还有时间会对这些内容做一个整体性的总结,和我自己的一些看法。这里是第一篇 kubenetes 的默认调度器。
Kubernetes Scheduler 是 Kubernetes 的核心组件之一,主要根据一些调度算法,将没有调度将 Pod 放置到合适的 Node 上,然后对应 Node 上的 Kubelet 才能够运行这些 pod。可以认为 scheduler 就是集群负载的管理者,针对用户或者组件创建的 pod 的负载,为其在集群中找到一个合适的节点,然后让对应节点的 kubelet 服务进程将其运行起来。这篇文章的结构大概是这样的,我们先会从宏观上去看一下 scheduler 的工作原理,然后到每一个部分看一下对应关键代码的实现,其中会尝试去穿插一些我了解到的 scheduler 的版本迭代的过程,然后是一些思考和展望。为了方便复现和讲解,下面对kube-scheduler 代码的分析使用的是 kubernetes 的 v1.19.1 的版本的代码,如果是谈到其他版本也会在其中标注起来。
Kubernetes 默认的调度器就在 kubernetes 的 repos 中,跟其他的组件一样,都是把入口放在 cmd/kube-schduler 中,把实现放在 pkg/scheduler 中,也是用 cobra 命令行工具启动,我们先看一下,整体的架构。然后会从源码的角度一步步看其调用分析。
type Scheduler struct {
// 主要是缓存现在集群调度的状态,如上图所示,会保留集群中所有已调度 pod 的状态,node 的状态,和assumedpod,防止 pod 被重新调度。
SchedulerCache internalcache.Cache
// Algorithm 需要实现 Schedule 的方法,输入一个 pod 可以找到合适 node。默认是通过
// pkg/scheduler/core/generic_scheduler.go 的 generic_scheduler 实现的。
Algorithm core.ScheduleAlgorithm
// 获取下一个 pod
NextPod func() *framework.QueuedPodInfo
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*framework.QueuedPodInfo, error)
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// SchedulingQueue 会保留准备调度的 pod 的队列。
SchedulingQueue internalqueue.SchedulingQueue
// 所有的 plugin 都会以 profile 的形式供默认调度器使用
Profiles profile.Map
scheduledPodsHasSynced func() bool
// 用于和 api-server 的通信
client clientset.Interface
下图是我自己对 scheduler 架构的理解,调度主要的工作通过 informer watch 有没有没有调度的 pod,如果有会尝试找一个合适的 node,然后把 pod 和 node 的 binding 写回给 apiserver。集群状态的信息保留在 schedulerCache 中,集群未调度的 pod 在 schedulingQueue 中。这里只拎出Algorithm,SchedulerCache,SchedulingQueue 来介绍,Profiles 主要是跟 plugins 相关的组件,后面会找在 scheduler framework 中介绍。
- 首先 SchedulerCache 会通过 Informer 读取现在集群的 node, pod 的状态,缓存在内存中。同时监控集群资源状态(如果有创建 node 等事件发生也会及时同步) ,SchedulingQueue 会同时也会运行两个进程,分别定时把 BackoffQ 和 UnschedulabelQ 的 Pod 刷进去 activeQ 中。其中 activeQ 是通过heap 实现的待调度优先队列,BackoffQ ,UnschedulabelQ 顾名思义,是调度出错和不可调度队列,
- 如果有创建 pod 等事件的发生,且 pod 没有被调度过,会直接进入 SchedulingQueue 的 activeQ 队列中。如果已经调度了,就会进入 schedulerCache 中,把信息缓存起来。
- scheduler 主进程会轮询地执行 scheduleOne 的流程
- 通过 NextPod 接口从 activeQ 中读取入队 pod 的信息。
- 然后通过 Algorithm (通过 pkg/scheduler/core/generic_scheduler 实现) 为这个 pod 尝试找到一个合适 node 进行调度,如果这一步失败了,如果设置了可以抢占会触发抢占,不过这里先不涉及,这样这个 pod 会进入 UnschedulabelQ 的队列中。
- 如果调度成功会触发一次 assume,主要是在 cache 中记录这个 pod 为已经调度了,这样下次调度的时候就不会重新调度一次这个 pod。因为 assume 后面 scheduleOne 会起一个 goroutine 来负责 binding 的工作。scheduleOne 会跑下一个循环,如果没有 assume,可能导致 node 的资源会被重新使用等问题。
- binding 会通知 apiserver pod 和 node 的 binding 已经创建,让 api-server 通知 node 的 kubelet 去配置相应的环境,如(网络,存储等),如果 bind 成功。schedulerCache 会有一个进程 cleanupAssumedPods 不断地看这个pod 是否已经 binding 完成或者超时了,如果 binding 完成会去除这个 assume 的标志。
- 另外在 schedulingQueue 中,如上图的右侧部分,会起两个goroutine,flushUnschedulableQLeftover 会不断定时把之前调度失败的(UnschedulableQ) 送到 activeQ 中,flushUnschedulableQLeftover 会把出错的pod在 backoffQ 中往 activeQ刷。
- schedulerCache 也会起一个 goroutine,会不断地把清理已经调度的,或者超时的 assume pod。
这里我会尝试以一种尽可能详尽(啰嗦)的方式去介绍 kube-scheduler 的源码实现,中间会穿插一下,我对 scheduler 版本迭代的一些理解。我们会从 cmd/kube-scheduler/app/server.go 开始说起。
在 cmd/kube-scheduler/app/server.go 中,会通过 cobra 命令行定义 scheduler 的启动命令,最终是运行 cmd/kube-scheduler/app/server.go 中 runCommand 定义的逻辑,调用 Setup 函数,如下:
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup scheduler
cc, sched, err := Setup(ctx, opts, registryOptions...)
if err != nil {
return err
if len(opts.WriteConfigTo) > 0 {...}
// Run scheduler
return Run(ctx, cc, sched)
// Setup 实现
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
// 1.初始化 client, EventBroadcaster,PodInformer,InformerFactory 并通过 config 回传。
c, err := opts.Config()
// Get the completed config
cc := c.Complete()
outOfTreeRegistry := make(runtime.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
recorderFactory := getRecorderFactory(&cc)
// 2.Create the scheduler.
sched, err := scheduler.New(cc.Client,
if err != nil {...}
return &cc, sched, nil
Setup 函数主要干了几件事。
Setup 函数中,opt.Config() 主要做了下面几件事:
- createClients 启动 clientset
- events.NewEventBroadcasterAdapter(eventClient) 启动 EventBroadcaster
- c.InformerFactory = informers.NewSharedInformerFactory(client, 0) 启动 informer
上面这几步基本所有的 kubernetes 的组件基本都是有的,保证组件跟 api-server 的通信。
然后 scheduler.New 启动 scheduler 实例:
// New returns a Scheduler
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {
... // some config
// 初始化 schedulerCache
schedulerCache := internalcache.New(30*time.Second, stopEverything)
// 注册 intree 的 plugins
registry := frameworkplugins.NewInTreeRegistry()
// 初始化 snapshot
snapshot := internalcache.NewEmptySnapshot()
configurator := &Configurator{
client: client,
recorderFactory: recorderFactory,
informerFactory: informerFactory,
podInformer: podInformer,
schedulerCache: schedulerCache,
registry: registry,
nodeInfoSnapshot: snapshot,
var sched *Scheduler
// 通过上面 configurator 创建 scheduler(sched)
// 可以通过 policy 或者 provider 创建
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.
sc, err := configurator.createFromProvider(*source.Provider)
case source.Policy != nil:
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
initPolicyFromFile(source.Policy.File.Path, policy)
addAllEventHandlers(sched, informerFactory, podInformer)
return sched, nil
schedulerCache = internalcache.New(30*time.Second, stopEverything) 启动 scheduler 的缓存,主要是会启动 pkg/scheduler/internal/cache/cache.go:cleanupAssumedPods 这个函数定期清理AssumedPods。
registry := frameworkplugins.NewInTreeRegistry() 会读取所有 intree 的 plugins 注册掉,譬如:注册interpodaffinity 的调度算法,这种 plugin 的注册方式是通过 v1.15 开始引入的 scheduler framework 实现的,我们这里不深入。
初始化 Snapshot,初始化一个 map 保存 node 信息,是用在 Algorithm.Schedule 的过程中的,主要是保留一份 cache 的备份
Configurator 是scheduler 的配置器,创建 scheduler(sched) 可以通过 configurator.createFromConfig(*policy) 或者 configurator.createFromProvider(*source.Provider) 来进行,不过无论使用哪种方式创建都会调用 configurator.create() 函数:
- 初始化profiles(plugin)
- 初始化 SchedulingQueue
- 用 NewGenericScheduler 实例化 Algorithm
至此,scheduler 的初始化工作基本完成了。
// create a scheduler from a set of registered plugins. func (c *Configurator) create() (*Scheduler, error) { var extenders []framework.Extender var ignoredExtendedResources []string if len(c.extenders) != 0 { ... // 如果有 extenders } if len(ignoredExtendedResources) > 0 { ... // 对 ignoredExtendedResources 有一些处理 } ... // 初始化 profiles profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory, frameworkruntime.WithPodNominator(nominator)) // 初始化队列优先级函数 lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc() // 初始化调度队列,需要优先级函数,和定义 backoffQ,UnScheduableQ 刷回 activeQ 的周期参数 podQueue := internalqueue.NewSchedulingQueue( lessFn, internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodNominator(nominator), ) // 使用 NewGenericScheduler 初始化 Algorithm algo := core.NewGenericScheduler( c.schedulerCache, c.nodeInfoSnapshot, extenders, c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(), c.disablePreemption, c.percentageOfNodesToScore, ) return &Scheduler{ SchedulerCache: c.schedulerCache, Algorithm: algo, Profiles: profiles, NextPod: internalqueue.MakeNextPodFunc(podQueue), Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), StopEverything: c.StopEverything, SchedulingQueue: podQueue, }, nil }
addAllEventHandlers 为 scheduler 提供 eventHandler,如上面架构图所示:event 的接收者主要有两个,SchedulerCache 和 SchedulingQueue,前者是为了跟踪集群的资源和已调度 Pod 的状态(addNodeToCache,addPodToCache),后者主要是给没有调度的 Pod 入队到 activeQ 中(addPodToSchedulingQueue)。这里重点说一下,scheduler 算是在这里完成第一次的过滤。如果是未调度的pod,会经过 event 的过滤器,其中 assignedPod(t) 会看 pod.Spec.NodeName 判断 pod 是不是已经调度或者 assume 了,responsibleForPod(t, sched.Profiles) 会看 pod.Spec.SchedulerName 看一下调度器名字是不是本调度器。
// addAllEventHandlers is a helper function used in tests and in Scheduler // to add event handlers for various informers. func addAllEventHandlers( sched *Scheduler, informerFactory informers.SharedInformerFactory, podInformer coreinformers.PodInformer, ) { // scheduled pod cache podInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: return assignedPod(t) case cache.DeletedFinalStateUnknown: ... default: ... } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToCache, UpdateFunc: sched.updatePodInCache, DeleteFunc: sched.deletePodFromCache, }, }, ) // unscheduled pod queue podInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: return !assignedPod(t) && responsibleForPod(t, sched.Profiles) case cache.DeletedFinalStateUnknown: ... default: ... } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToSchedulingQueue, UpdateFunc: sched.updatePodInSchedulingQueue, DeleteFunc: sched.deletePodFromSchedulingQueue, }, }, ) informerFactory.Core().V1().Nodes().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: sched.addNodeToCache, UpdateFunc: sched.updateNodeInCache, DeleteFunc: sched.deleteNodeFromCache, }, ) ... informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(...) informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(...) informerFactory.Core().V1().Services().Informer().AddEventHandler(...) informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(...) } // 看一下是不是已经被调度了 func assignedPod(pod *v1.Pod) bool { return len(pod.Spec.NodeName) != 0 } // 看一下是不是我们负责的 func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool { return profiles.HandlesSchedulerName(pod.Spec.SchedulerName) }
总结一下,Setup 就是初始化scheduler 的各个部件,包括 informer, schedulerCache, schedulingQueue,和实例化 scheduler。
runCommand 在执行完 Setup() 之后会调用Run(),Run 主要分几个部分,首先一般都需要先启动 informer 并等待同步完成,表示集群的状态是已知的,然后启动 scheduler.Run(),会把schedulerCache的清理缓存,schedulingQueue 的定时入队,和 scheduler 主流程 scheduleOne 分别用 goroutine 启动。
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
... // some config
// Prepare the event broadcaster.
... // setup 健康检查的服务器.
// Start all informers.
go cc.PodInformer.Informer().Run(ctx.Done())
// Wait for all caches to sync before scheduling.
// 选主逻辑
if cc.LeaderElection != nil {
return fmt.Errorf("finished without leader elect")
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
SchedulingQueue.Run() 会起两个 goroutine ,flushBackoffQCompleted 主要负责把所有 backoff 计时完毕(duration 会因为失败变长)的 pod 往 activeQ刷。flushUnschedulableQLeftover 把所有在 unschedulableQ 的 pod 计时unschedulableQTimeInterval 完毕后送去 activeQ。
scheduleOne 是调度的主逻辑,下面会把 scheduleOne 的逻辑分为3部分,第一部分是调度部分,第二部分是对调度结果进行处理部分(抢占),第三部分是绑定等部分。
- 如上图,从schedulingQueue 的待调度队列 activeQ 取出 pod。
- 进行预选执行 findNodesThatFitPod,会起多个 goroutine 去看 nodeInfo 合不合适,NodeAffinity 也是在这里检查,返回预选 nodes 列表。
- 进行优选 prioritizedNodes,也会起多个 goroutine 去尝试把待调度的 pod 放到 node 上,计算每个预选的 nodes 的得分,这里会看 node 上现有的 podInfo,podAffinity 也是在这里检查
- 每个nodes 的得分汇总起来给 selectHost 选出一个合适的 nodes。
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
pod := podInfo.Pod
// 根据 prof 确定是不是要 skip 这个 pod
prof, err := sched.profileForPod(pod)
// 调度逻辑
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
... // 下面是 Part2
ScheudleOne 第一部分的逻辑如上:
- 首先 sched.NextPod() 会调用 scheduler 的 Pop 接口,后面会直接 PriorityQueue.activeQ.Pop()
- 查看 profile(plugin) 是否负责这个 pod,Profile, err := sched.profileForPod(pod) 【之前版本是 frameworkForPod】 会查看 pod 的 schedulername
- skipPodSchedule 有两种情况 skip,(1) pod 正在被删除,(2) pod 被 assumed
- scheduleResult, err := sched.Algorithm.Schedule这里会根据提供的算法调用 genericScheduler.Schedule 完成调度,这里需要展开一下。
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
... // some preprocess
// 1. 生成这个时刻的 snapshot
if err := g.snapshot(); err != nil {
return result, err
// 2. 对 node 进行预选,过滤出合适的pod
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
if err != nil {...}
if len(feasibleNodes) == 0 {
return result, &FitError{...)
... // some metrics
// 如果只有一个 node 合适就不会进入优选环节,直接返回
if len(feasibleNodes) == 1 {
... // some metrics
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(filteredNodesStatuses),
FeasibleNodes: 1,
}, nil
// 3. 对 node 进行优选
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
if err != nil {
return result, err
... // some metrics
host, err := g.selectHost(priorityList)
trace.Step("Prioritizing done")
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(feasibleNodes),
}, err
生成一个这个时刻的 snapshot,主要是这个时刻的 nodeInfo map 生成一份快照
findNodesThatFitPod 找什么 node 适合这个 pod,针对 node 的信息进行判断,其代码如下。
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
filteredNodesStatuses := make(framework.NodeToStatusMap)
// Run "prefilter" plugins.
// 1. 运行预过滤 plugin 逻辑
s := prof.RunPreFilterPlugins(ctx, state, pod)
... //错误处理
// 2. 对 nodes 进行过滤,返回 nodes 列表
feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
if err != nil {
return nil, nil, err
// 3. 运行 extender 过滤逻辑
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
if err != nil {
return nil, nil, err
return feasibleNodes, filteredNodesStatuses, nil
prof.RunPreFilterPlugins(ctx, state, pod) 先进行预过滤,主要对 pod 进行。
g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses) 这部分会问这个 pod 调度到该 node 合适吗,主要会跟 node 中的 pod 进行亲和性检查等过滤性操作,这里展开一下:
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() if err != nil { return nil, err } // 1. 选择总共多少个 nodes 尝试去调度 numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes))) // Create feasible list with enough space to avoid growing it // and allow assigning. feasibleNodes := make([]*v1.Node, numNodesToFind) if !prof.HasFilterPlugins() { ... //如果没有 filterPlugins, 直接返回所有的 nodes } errCh := parallelize.NewErrorChannel() var statusesLock sync.Mutex var feasibleNodesLen int32 ctx, cancel := context.WithCancel(ctx) // 2. 启动16个 goroutine 并行过滤, checkNode := func(i int) { nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)] fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo) if err != nil {...} if fits { length := atomic.AddInt32(&feasibleNodesLen, 1) if length > numNodesToFind { cancel() atomic.AddInt32(&feasibleNodesLen, -1) } else { feasibleNodes[length-1] = nodeInfo.Node() } } else { statusesLock.Lock() if !status.IsSuccess() { statuses[nodeInfo.Node().Name] = status } statusesLock.Unlock() } } beginCheckNode := time.Now() statusCode := framework.Success defer func() { ... // some matrics // 起 16 个goroutine 跑 checkNode parallelize.Until(ctx, len(allNodes), checkNode) processedNodes := int(feasibleNodesLen) + len(statuses) g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes) feasibleNodes = feasibleNodes[:feasibleNodesLen] if err := errCh.ReceiveError(); err != nil { statusCode = framework.Error return nil, err } return feasibleNodes, nil }
- 这里会定义 numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes))) 确定遍历 node 的数目,如果集群太大,遍历所有的 node 是比较耗时的,默认是如果 nodes 数目小于100,会遍历所有 node,如果太多会取百分比。
- 然后 会通过 parallelize 提供的并发接口调用 checkNode,一般并发数量为 16。checkNode 会调用 PodPassesFiltersOnNode ,会通过 RunFilterPlugins 运行各个 filterplugins 主要是看一下pod 跟在 node 里面的pod 有没有不兼容的关系,查一下优先级是不是equal 或者有比上面的pod 大,如果可以抢占?
- checkNode 会返回不同node 的适不适合的信息,如果适合,信息放在一个list 中 feasibleNodes[length-1] = nodeInfo.Node();如果不适合,在 statuses[nodeInfo.Node().Name] = status 登记错误信息
- findNodesThatPassExtenders 会遍历不同 extender,针对不同的资源,Filter(pod, feasibleNodes) 主要也是用户写的custom filter,也是返回 feasibleNodes( []v1.Node)。
前面 findNodesThatFitPod 返回了合适 []v1.Node,g.prioritizeNodes 会根据一些优先级算法计算不同node的得分给出这些node 的排序。
func (g *genericScheduler) prioritizeNodes( ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, ) (framework.NodeScoreList, error) { ... // Run PreScore plugins. preScoreStatus := prof.RunPreScorePlugins(ctx, state, pod, nodes) if !preScoreStatus.IsSuccess() { return nil, preScoreStatus.AsError() } // Run the Score plugins. scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes) if !scoreStatus.IsSuccess() { return nil, scoreStatus.AsError() } ... // Summarize all scores. 汇总所有 node 的分数 result := make(framework.NodeScoreList, 0, len(nodes)) for i := range nodes { result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) for j := range scoresMap { result[i].Score += scoresMap[j][i].Score } } // 不同的extender 使用不同的goroutine 调用Prioritize函数 if len(g.extenders) != 0 && nodes != nil { ... } ... return result, nil }
- 分别调用不同 plugin 的 RunPreScorePlugins 前置评分
- RunScorePlugins 获取每个 plugin 对每个预选节点的分数,这里主要看一下,pod 调度上去会不会跟上面的 pod 有亲和性的关系(affinity) 等。如果说前面 filter 主要是针对 node 进行评价,这里主要是针对 node 里面的 pod 进行评价。
- 上面默认会起 16 goroutine 来对每个 plugin 进行计分,返回的 scoreMap key 是 plugin, 元素是一个 nodescore list,表示对每个节点的评分。
- 然后把所有的分数汇总在 result 中
- 如果有 extender 会分别起不同的 goroutine 去调用 extender.Prioritize,结果也是汇总到 result 中。
然后 host, err := g.selectHost(priorityList) 会根据前面 findNodesThatFitPod 的排序结果选择一个合适的pod。
继续 scheduleOne 的逻辑,这部分是指调度失败的错误处理。主要是抢占和驱逐,这部分跟上一部分的逻辑比较类似,不过输入的 nodes,会有一些过滤。
- 首先会通过 GetUpdatedPod 更新一下 pod 的信息
- 然后 PodEligibleToPreemptOthers 看一下 pod 是否可以抢占,主要判断是不是已经抢占了
- FindCandidates 会调用 nodesWherePreemptionMightHelp 对 node 进行预选过滤,保留硬性的过滤条件跟前面调度时 predicate 逻辑类似,selectVictims 是对 node 打分,跟前面 prioritize 类似。
- 然后 SelectCandidate 选择合适抢占的 node,这个跟前面 selectHost 类似。
- PrepareCandidate 清出足够的空间给 pod 调度上去。 不过这里在多调度器的时候可能会有一些问题,有可能刚清出资源就被其他调度器抢走了,造成不断循环。
pkg/scheduler/scheduler.go: scheduleOne
// 接着 part1 部分
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
if err != nil {
nominatedNode := ""
if fitError, ok := err.(*core.FitError); ok {
if !prof.HasPostFilterPlugins() {
klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
} else {
// 后处理逻辑,(preempt)在这里实现。
result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
... // 后处理
// 记录 SchedulingFailure,pod 入队 UnschedulableQ
sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
如果前面调度失败,会进入的处理错误逻辑,尝试调用 prof.RunPostFilterPlugins。这个是引入 scheduler framework 之后的形式,在 v1.15 之前的版本,是只有抢占的逻辑,如下 (github.com/kubernetes/kubernetes:v1.14.1 )
pkg/scheduler/scheduler.go: scheduleOne
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod) if err != nil { // 如果调度失败 if fitError, ok := err.(*core.FitError); ok { if !util.PodPriorityEnabled() || sched.config.DisablePreemption { ... // 如果不允许抢占 } else { preemptionStartTime := time.Now() sched.preempt(pod, fitError) ... // some metrics } metrics.PodScheduleFailures.Inc() } else { ... // 其他错误 } return }
我们还是以 v1.19.1 plugin 中的抢占逻辑进行分析。
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) { cs := pl.fh.ClientSet() ph := pl.fh.PreemptHandle() nodeLister := pl.fh.SnapshotSharedLister().NodeInfos() // 0) Fetch the latest version of <pod>. pod, err := util.GetUpdatedPod(cs, pod) if err != nil { klog.Errorf("Error getting the updated preemptor pod object: %v", err) return "", err } // 1) Ensure the preemptor is eligible to preempt other pods. if !PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) {...} // 2) Find all preemption candidates. candidates, err := FindCandidates(ctx, cs, state, pod, m, ph, nodeLister, pl.pdbLister) if err != nil || len(candidates) == 0 {...} // 3) Interact with registered Extenders to filter out some candidates if needed. candidates, err = CallExtenders(ph.Extenders(), pod, nodeLister, candidates) if err != nil { return "", err } // 4) Find the best candidate. bestCandidate := SelectCandidate(candidates) if bestCandidate == nil || len(bestCandidate.Name()) == 0 { return "", nil } // 5) Perform preparation work before nominating the selected candidate. if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod); err != nil { return "", err } return bestCandidate.Name(), nil }
首先是通过GetUpdatedPod,确认最新的 pod 的版本(保证新的改动可以被接收到)
然后 PodEligibleToPreemptOthers 看一下这个 pod 适不适合执行抢占,因为这个pod 有可能已经抢占过了, 然后被抢占的pod 也在 terminating 了,就不会发生抢占。
FindCandidates 用于找到合适被抢占的候选nodes
// Each candidate is executable to make the given <pod> schedulable. func FindCandidates(ctx context.Context, cs kubernetes.Interface, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap, ph framework.PreemptHandle, nodeLister framework.NodeInfoLister, pdbLister policylisters.PodDisruptionBudgetLister) ([]Candidate, error) { allNodes, err := nodeLister.List() if err != nil { return nil, err } if len(allNodes) == 0 { return nil, core.ErrNoNodesAvailable } potentialNodes := nodesWherePreemptionMightHelp(allNodes, m) ... return dryRunPreemption(ctx, ph, state, pod, potentialNodes, pdbs), nil }
首先 nodesWherePreemptionMightHelp 会把所有的nodes 重新查看一下,如果是因为 predicates 检查不通过的,尝试在上面去除 pod 看是否可能通过
然后把 pod 和 potential node 输入到 dryRunPreemption 中,dryRunPreemption 会起多个进程进行跟前面 schedule 里面 checkNode 类似,通过 checkNode 运行 selectVictimsOnNode,这个函数会尝试找到最小被抢占 pods 的集合(只要低优先级的 pods 够,就不会抢占node 内高优先级的)。
返回汇总后的cadidates 信息, 包括node 中被抢占 pods 的信息,
func selectVictimsOnNode( ctx context.Context, ph framework.PreemptHandle, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget, ) ([]*v1.Pod, int, bool){ ... // 筛选被抢占的 pod podPriority := podutil.GetPodPriority(pod) for _, p := range nodeInfo.Pods { if podutil.GetPodPriority(p.Pod) < podPriority { potentialVictims = append(potentialVictims, p.Pod) if err := removePod(p.Pod); err != nil { return nil, 0, false } } } // 保留一些不必被抢占的,把 pod 分两类,一个是 如果抢占会造成违反了 PodDisruptionBudget(PDB) 的 pod(violatingVictims),一个是不会violatingVictims violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs) for _, p := range violatingVictims { if fits, err := reprievePod(p); err != nil { klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err) return nil, 0, false } else if !fits { numViolatingVictim++ } } }
然后 SelectCandidate 选择最好的candidate,有一系列判断准则:
这个 node 是不是满足 Pod 干扰预算的条件。
这个 node 是不是只需要驱逐的pod 的优先级是不是最小的
驱逐这些pod 这个 node affinity 有没有被影响
是不是驱逐的 pod 数目最小
PrepareCandidate 会驱逐 victims pod,如果这些 pod 在waitting,不给调度,清楚他们的nominatedNodeName,如果有比较低优先级的pods 被提名到这个 node 会不合适。
pkg/scheduler/scheduler.go: scheduleOne
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
if err != nil{...}
// 1. assume 设置 schedulerCache
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
// 2. reserve 预留 volume 等改动
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {...}
// 3. 是否满足 permit 条件
runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
// 等待 permit 条件完备
waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
if !waitOnPermitStatus.IsSuccess() {...}
// Run "prebind" plugins.
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {...}
// 5. 绑定 pod 和 node
err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
if err != nil {...}
... // some metrics
// Run "postbind" plugins.
prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
如果前面 part1 schedule 的 sched.Algorithm.Schedule 调度算法调度成功,scheduleResult 返回一个建议调度的node,然后调用 sched.assume,这里主要是在 cache 中记录这个 pod 为已经调度了,这样下次调度的时候就不会重新调度一次这个 pod。
然后会调用,RunReservePluginsReserve,RunPermitPlugins,这些 plugins 设置的位点,我们假设没有plugins 设置了先跳过。
- reserve 会执行像 AssumePodVolumes 等,除了 pod assume 之外需要在 cache 保留的操作,unreserve 类似。
RunPermitPlugins 会成为bind 的准入,就是前面reserve 之后相关的资源就不会被使用了,不过还有一些条件没有满足,所以不可以 bind,就是为了占着资源,类似 gang-scheduling 的时候会用到。
就会进入绑定的逻辑(bind),bind 会另外起一个 goroutine 所以到这里,scheduleOne 就会到下一个调度循环了,使用这样的做法是 bind 实际上比较耗时,所以使用另外的进程进行,不会阻塞到调度,不过有趣的是在早期的 scheduler,并不是一开始就是这样的结构的,如v1.0.0 的版本中,调度完之后会直接 binding,会有比较大性能的损耗。
plugin/pkg/scheduler/scheduler.go 这时 scheduler 还在 plugins 中
func (s *Scheduler) scheduleOne() { dest, err := s.config.Algorithm.Schedule(pod, s.config.MinionLister) ... b := &api.Binding{ ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name}, Target: api.ObjectReference{ Kind: "Node", Name: dest, }, } s.config.Modeler.LockedAction(func() { bindingStart := time.Now() err := s.config.Binder.Bind(b) metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart)) if err != nil {...} assumed := *pod assumed.Spec.NodeName = dest s.config.Modeler.AssumePod(&assumed) }) }
sched.bind 首先 WaitOnPermit 会查看一下 permit 是否完成,前面 permit 实际上是在这里被挡住
然后会运行一些 prebind 的操作,像网络配置,存储配置等,像volumeBinder 会
- init volume bind, pv -> pvc
- 触发 volume provisioning
- 等待 pvc binding 完成
Prebind 成功就会 Bind, pod -> node,defaultBinder 就很简单
// Bind binds pods to nodes using the k8s client. func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { binding := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID}, Target: v1.ObjectReference{Kind: "Node", Name: nodeName}, } // 创建 bind err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) if err != nil { return framework.NewStatus(framework.Error, err.Error()) } return nil }
如果成功会 RunPostBindPlugin,暂时没有 default 的 postBindPlugin.
总体上说,kube-scheduler 的工作方式和其他的 controller 并没有什么不同。如果把 scheduler 看成是一个黑盒的话, 可以认为 scheduler 的输入是一个个的 Pod,输出就是这个Pod 和某个节点 node 的绑定(Bind),如果是 controller 可能是输入一个资源,出去的是更新之后资源。它不对绑定成功与否负责,只是作为资源的分配者。某程度上,没有 scheduler 的话 kubernetes 也能正常运行,只需要手动创建 pod,然后使用 client 执行 CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
基本等同于实现了scheduler 的功能。
kube-scheduler 的设计比较简单,在多个版本的迭代中,其基本的逻辑没有特别大的变化,都是笨笨地一个个 pod 入队处理,我们在文中也穿插了一些相关的内容。我自己暂时感觉这部分还不是写得特别好,一个自己的功力还不够,一个是自己可以了解到的线索也太少,希望后续的更新可以完善。提出这个部分,我自己的思考是了解一下 kube-scheduler 的发展,顺便窥见软件工程的迭代概貌。我自己的理解是大致上都会遵循这样的一个历程:可用性 -> 性能优化 -> 可扩展性,然后在迭代的过程中保持稳定性。
另外,这里我刻意地把 plugins 和 extender 的部分进行了删减,一个是为了控制篇幅,一个是希望把这些可以 custom 的部分留待下一部分的 scheduler-framework 一并讲述。