Kubernetes Scheduler 源码分析
Kubernetes Scheduler
最近在看 k8s 调度相关的一些内容,希望给自己这阵子了解的知识进行一个整理,会连载几篇文章,都会关于调度的,初步的想法是 kubernetes 的默认调度器,然后是新引入的 scheduler framework,然后是 kube-batch 和 volcano, 如果还有时间会对这些内容做一个整体性的总结,和我自己的一些看法。这里是第一篇 kubenetes 的默认调度器。
Kubernetes Scheduler 是 Kubernetes 的核心组件之一,主要根据一些调度算法,将没有调度将 Pod 放置到合适的 Node 上,然后对应 Node 上的 Kubelet 才能够运行这些 pod。可以认为 scheduler 就是集群负载的管理者,针对用户或者组件创建的 pod 的负载,为其在集群中找到一个合适的节点,然后让对应节点的 kubelet 服务进程将其运行起来。这篇文章的结构大概是这样的,我们先会从宏观上去看一下 scheduler 的工作原理,然后到每一个部分看一下对应关键代码的实现,其中会尝试去穿插一些我了解到的 scheduler 的版本迭代的过程,然后是一些思考和展望。为了方便复现和讲解,下面对kube-scheduler 代码的分析使用的是 kubernetes 的 v1.19.1 的版本的代码,如果是谈到其他版本也会在其中标注起来。
工作原理
Kubernetes 默认的调度器就在 kubernetes 的 repos 中,跟其他的组件一样,都是把入口放在 cmd/kube-schduler 中,把实现放在 pkg/scheduler 中,也是用 cobra 命令行工具启动,我们先看一下,整体的架构。然后会从源码的角度一步步看其调用分析。
调度器架构
调度器结构体主要的部分如下:pkg/scheduler/scheduler.go
type Scheduler struct {
// 主要是缓存现在集群调度的状态,如上图所示,会保留集群中所有已调度 pod 的状态,node 的状态,和assumedpod,防止 pod 被重新调度。
SchedulerCache internalcache.Cache
// Algorithm 需要实现 Schedule 的方法,输入一个 pod 可以找到合适 node。默认是通过
// pkg/scheduler/core/generic_scheduler.go 的 generic_scheduler 实现的。
Algorithm core.ScheduleAlgorithm
// 获取下一个 pod
NextPod func() *framework.QueuedPodInfo
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*framework.QueuedPodInfo, error)
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// SchedulingQueue 会保留准备调度的 pod 的队列。
SchedulingQueue internalqueue.SchedulingQueue
// 所有的 plugin 都会以 profile 的形式供默认调度器使用
Profiles profile.Map
scheduledPodsHasSynced func() bool
// 用于和 api-server 的通信
client clientset.Interface
}
下图是我自己对 scheduler 架构的理解,调度主要的工作通过 informer watch 有没有没有调度的 pod,如果有会尝试找一个合适的 node,然后把 pod 和 node 的 binding 写回给 apiserver。集群状态的信息保留在 schedulerCache 中,集群未调度的 pod 在 schedulingQueue 中。这里只拎出Algorithm,SchedulerCache,SchedulingQueue 来介绍,Profiles 主要是跟 plugins 相关的组件,后面会找在 scheduler framework 中介绍。
工作流程
- 首先 SchedulerCache 会通过 Informer 读取现在集群的 node, pod 的状态,缓存在内存中。同时监控集群资源状态(如果有创建 node 等事件发生也会及时同步) ,SchedulingQueue 会同时也会运行两个进程,分别定时把 BackoffQ 和 UnschedulabelQ 的 Pod 刷进去 activeQ 中。其中 activeQ 是通过heap 实现的待调度优先队列,BackoffQ ,UnschedulabelQ 顾名思义,是调度出错和不可调度队列,
- 如果有创建 pod 等事件的发生,且 pod 没有被调度过,会直接进入 SchedulingQueue 的 activeQ 队列中。如果已经调度了,就会进入 schedulerCache 中,把信息缓存起来。
- scheduler 主进程会轮询地执行 scheduleOne 的流程
- 通过 NextPod 接口从 activeQ 中读取入队 pod 的信息。
- 然后通过 Algorithm (通过 pkg/scheduler/core/generic_scheduler 实现) 为这个 pod 尝试找到一个合适 node 进行调度,如果这一步失败了,如果设置了可以抢占会触发抢占,不过这里先不涉及,这样这个 pod 会进入 UnschedulabelQ 的队列中。
- 如果调度成功会触发一次 assume,主要是在 cache 中记录这个 pod 为已经调度了,这样下次调度的时候就不会重新调度一次这个 pod。因为 assume 后面 scheduleOne 会起一个 goroutine 来负责 binding 的工作。scheduleOne 会跑下一个循环,如果没有 assume,可能导致 node 的资源会被重新使用等问题。
- binding 会通知 apiserver pod 和 node 的 binding 已经创建,让 api-server 通知 node 的 kubelet 去配置相应的环境,如(网络,存储等),如果 bind 成功。schedulerCache 会有一个进程 cleanupAssumedPods 不断地看这个pod 是否已经 binding 完成或者超时了,如果 binding 完成会去除这个 assume 的标志。
- 另外在 schedulingQueue 中,如上图的右侧部分,会起两个goroutine,flushUnschedulableQLeftover 会不断定时把之前调度失败的(UnschedulableQ) 送到 activeQ 中,flushUnschedulableQLeftover 会把出错的pod在 backoffQ 中往 activeQ刷。
- schedulerCache 也会起一个 goroutine,会不断地把清理已经调度的,或者超时的 assume pod。
Setup源码分析
这里我会尝试以一种尽可能详尽(啰嗦)的方式去介绍 kube-scheduler 的源码实现,中间会穿插一下,我对 scheduler 版本迭代的一些理解。我们会从 cmd/kube-scheduler/app/server.go 开始说起。
在 cmd/kube-scheduler/app/server.go 中,会通过 cobra 命令行定义 scheduler 的启动命令,最终是运行 cmd/kube-scheduler/app/server.go 中 runCommand 定义的逻辑,调用 Setup 函数,如下:
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
...
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup scheduler
cc, sched, err := Setup(ctx, opts, registryOptions...)
if err != nil {
return err
}
if len(opts.WriteConfigTo) > 0 {...}
// Run scheduler
return Run(ctx, cc, sched)
}
// Setup 实现
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}
// 1.初始化 client, EventBroadcaster,PodInformer,InformerFactory 并通过 config 回传。
c, err := opts.Config()
...
// Get the completed config
cc := c.Complete()
outOfTreeRegistry := make(runtime.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
}
}
recorderFactory := getRecorderFactory(&cc)
// 2.Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)
if err != nil {...}
return &cc, sched, nil
}
Setup 函数主要干了几件事。
opt.config
Setup 函数中,opt.Config() 主要做了下面几件事:
- createClients 启动 clientset
- events.NewEventBroadcasterAdapter(eventClient) 启动 EventBroadcaster
- c.InformerFactory = informers.NewSharedInformerFactory(client, 0) 启动 informer
上面这几步基本所有的 kubernetes 的组件基本都是有的,保证组件跟 api-server 的通信。
scheduler.New
然后 scheduler.New 启动 scheduler 实例:
pkg/scheduler/scheduler.go
// New returns a Scheduler
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {
... // some config
// 初始化 schedulerCache
schedulerCache := internalcache.New(30*time.Second, stopEverything)
// 注册 intree 的 plugins
registry := frameworkplugins.NewInTreeRegistry()
...
// 初始化 snapshot
snapshot := internalcache.NewEmptySnapshot()
configurator := &Configurator{
client: client,
recorderFactory: recorderFactory,
informerFactory: informerFactory,
podInformer: podInformer,
schedulerCache: schedulerCache,
...
registry: registry,
nodeInfoSnapshot: snapshot,
...
}
...
var sched *Scheduler
// 通过上面 configurator 创建 scheduler(sched)
// 可以通过 policy 或者 provider 创建
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.
sc, err := configurator.createFromProvider(*source.Provider)
...
case source.Policy != nil:
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
initPolicyFromFile(source.Policy.File.Path, policy)
}
...
}
...
addAllEventHandlers(sched, informerFactory, podInformer)
return sched, nil
}
-
schedulerCache = internalcache.New(30*time.Second, stopEverything) 启动 scheduler 的缓存,主要是会启动 pkg/scheduler/internal/cache/cache.go:cleanupAssumedPods 这个函数定期清理AssumedPods。
-
registry := frameworkplugins.NewInTreeRegistry() 会读取所有 intree 的 plugins 注册掉,譬如:注册interpodaffinity 的调度算法,这种 plugin 的注册方式是通过 v1.15 开始引入的 scheduler framework 实现的,我们这里不深入。
-
初始化 Snapshot,初始化一个 map 保存 node 信息,是用在 Algorithm.Schedule 的过程中的,主要是保留一份 cache 的备份
-
Configurator 是scheduler 的配置器,创建 scheduler(sched) 可以通过 configurator.createFromConfig(*policy) 或者 configurator.createFromProvider(*source.Provider) 来进行,不过无论使用哪种方式创建都会调用 configurator.create() 函数:
- 初始化profiles(plugin)
- 初始化 SchedulingQueue
- 用 NewGenericScheduler 实例化 Algorithm
至此,scheduler 的初始化工作基本完成了。
pkg/scheduler/core/generic_scheduler.go
// create a scheduler from a set of registered plugins. func (c *Configurator) create() (*Scheduler, error) { var extenders []framework.Extender var ignoredExtendedResources []string if len(c.extenders) != 0 { ... // 如果有 extenders } if len(ignoredExtendedResources) > 0 { ... // 对 ignoredExtendedResources 有一些处理 } ... // 初始化 profiles profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory, frameworkruntime.WithPodNominator(nominator)) // 初始化队列优先级函数 lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc() // 初始化调度队列,需要优先级函数,和定义 backoffQ,UnScheduableQ 刷回 activeQ 的周期参数 podQueue := internalqueue.NewSchedulingQueue( lessFn, internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodNominator(nominator), ) // 使用 NewGenericScheduler 初始化 Algorithm algo := core.NewGenericScheduler( c.schedulerCache, c.nodeInfoSnapshot, extenders, c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(), c.disablePreemption, c.percentageOfNodesToScore, ) return &Scheduler{ SchedulerCache: c.schedulerCache, Algorithm: algo, Profiles: profiles, NextPod: internalqueue.MakeNextPodFunc(podQueue), Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), StopEverything: c.StopEverything, SchedulingQueue: podQueue, }, nil }
-
addAllEventHandlers 为 scheduler 提供 eventHandler,如上面架构图所示:event 的接收者主要有两个,SchedulerCache 和 SchedulingQueue,前者是为了跟踪集群的资源和已调度 Pod 的状态(addNodeToCache,addPodToCache),后者主要是给没有调度的 Pod 入队到 activeQ 中(addPodToSchedulingQueue)。这里重点说一下,scheduler 算是在这里完成第一次的过滤。如果是未调度的pod,会经过 event 的过滤器,其中 assignedPod(t) 会看 pod.Spec.NodeName 判断 pod 是不是已经调度或者 assume 了,responsibleForPod(t, sched.Profiles) 会看 pod.Spec.SchedulerName 看一下调度器名字是不是本调度器。
pkg/scheduler/eventhandlers.go
// addAllEventHandlers is a helper function used in tests and in Scheduler // to add event handlers for various informers. func addAllEventHandlers( sched *Scheduler, informerFactory informers.SharedInformerFactory, podInformer coreinformers.PodInformer, ) { // scheduled pod cache podInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: return assignedPod(t) case cache.DeletedFinalStateUnknown: ... default: ... } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToCache, UpdateFunc: sched.updatePodInCache, DeleteFunc: sched.deletePodFromCache, }, }, ) // unscheduled pod queue podInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: return !assignedPod(t) && responsibleForPod(t, sched.Profiles) case cache.DeletedFinalStateUnknown: ... default: ... } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToSchedulingQueue, UpdateFunc: sched.updatePodInSchedulingQueue, DeleteFunc: sched.deletePodFromSchedulingQueue, }, }, ) informerFactory.Core().V1().Nodes().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: sched.addNodeToCache, UpdateFunc: sched.updateNodeInCache, DeleteFunc: sched.deleteNodeFromCache, }, ) ... informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(...) informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(...) informerFactory.Core().V1().Services().Informer().AddEventHandler(...) informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(...) } // 看一下是不是已经被调度了 func assignedPod(pod *v1.Pod) bool { return len(pod.Spec.NodeName) != 0 } // 看一下是不是我们负责的 func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool { return profiles.HandlesSchedulerName(pod.Spec.SchedulerName) }
总结一下,Setup 就是初始化scheduler 的各个部件,包括 informer, schedulerCache, schedulingQueue,和实例化 scheduler。
Run源码分析
runCommand 在执行完 Setup() 之后会调用Run(),Run 主要分几个部分,首先一般都需要先启动 informer 并等待同步完成,表示集群的状态是已知的,然后启动 scheduler.Run(),会把schedulerCache的清理缓存,schedulingQueue 的定时入队,和 scheduler 主流程 scheduleOne 分别用 goroutine 启动。
cmd/kube-scheduler/app/server.go
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
... // some config
// Prepare the event broadcaster.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
... // setup 健康检查的服务器.
// Start all informers.
go cc.PodInformer.Informer().Run(ctx.Done())
cc.InformerFactory.Start(ctx.Done())
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// 选主逻辑
if cc.LeaderElection != nil {
...
}
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
sched.SchedulingQueue.Run()
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}
SchedulingQueue.Run() 会起两个 goroutine ,flushBackoffQCompleted 主要负责把所有 backoff 计时完毕(duration 会因为失败变长)的 pod 往 activeQ刷。flushUnschedulableQLeftover 把所有在 unschedulableQ 的 pod 计时unschedulableQTimeInterval 完毕后送去 activeQ。
scheduleOnePart1
scheduleOne 是调度的主逻辑,下面会把 scheduleOne 的逻辑分为3部分,第一部分是调度部分,第二部分是对调度结果进行处理部分(抢占),第三部分是绑定等部分。
太长不看版
- 如上图,从schedulingQueue 的待调度队列 activeQ 取出 pod。
- 进行预选执行 findNodesThatFitPod,会起多个 goroutine 去看 nodeInfo 合不合适,NodeAffinity 也是在这里检查,返回预选 nodes 列表。
- 进行优选 prioritizedNodes,也会起多个 goroutine 去尝试把待调度的 pod 放到 node 上,计算每个预选的 nodes 的得分,这里会看 node 上现有的 podInfo,podAffinity 也是在这里检查
- 每个nodes 的得分汇总起来给 selectHost 选出一个合适的 nodes。
具体代码细节
pkg/scheduler/scheduler.go
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
// 根据 prof 确定是不是要 skip 这个 pod
prof, err := sched.profileForPod(pod)
...
// 调度逻辑
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
... // 下面是 Part2
ScheudleOne 第一部分的逻辑如上:
- 首先 sched.NextPod() 会调用 scheduler 的 Pop 接口,后面会直接 PriorityQueue.activeQ.Pop()
- 查看 profile(plugin) 是否负责这个 pod,Profile, err := sched.profileForPod(pod) 【之前版本是 frameworkForPod】 会查看 pod 的 schedulername
- skipPodSchedule 有两种情况 skip,(1) pod 正在被删除,(2) pod 被 assumed
- scheduleResult, err := sched.Algorithm.Schedule这里会根据提供的算法调用 genericScheduler.Schedule 完成调度,这里需要展开一下。
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
... // some preprocess
// 1. 生成这个时刻的 snapshot
if err := g.snapshot(); err != nil {
return result, err
}
...
// 2. 对 node 进行预选,过滤出合适的pod
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
if err != nil {...}
if len(feasibleNodes) == 0 {
return result, &FitError{...)
}
... // some metrics
// 如果只有一个 node 合适就不会进入优选环节,直接返回
if len(feasibleNodes) == 1 {
... // some metrics
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(filteredNodesStatuses),
FeasibleNodes: 1,
}, nil
}
// 3. 对 node 进行优选
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
if err != nil {
return result, err
}
... // some metrics
host, err := g.selectHost(priorityList)
trace.Step("Prioritizing done")
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(feasibleNodes),
}, err
}
FindNodesThatFitPod
-
生成一个这个时刻的 snapshot,主要是这个时刻的 nodeInfo map 生成一份快照
-
findNodesThatFitPod 找什么 node 适合这个 pod,针对 node 的信息进行判断,其代码如下。
//pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
filteredNodesStatuses := make(framework.NodeToStatusMap)
// Run "prefilter" plugins.
// 1. 运行预过滤 plugin 逻辑
s := prof.RunPreFilterPlugins(ctx, state, pod)
... //错误处理
// 2. 对 nodes 进行过滤,返回 nodes 列表
feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
// 3. 运行 extender 过滤逻辑
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
return feasibleNodes, filteredNodesStatuses, nil
}
-
prof.RunPreFilterPlugins(ctx, state, pod) 先进行预过滤,主要对 pod 进行。
-
g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses) 这部分会问这个 pod 调度到该 node 合适吗,主要会跟 node 中的 pod 进行亲和性检查等过滤性操作,这里展开一下:
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() if err != nil { return nil, err } // 1. 选择总共多少个 nodes 尝试去调度 numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes))) // Create feasible list with enough space to avoid growing it // and allow assigning. feasibleNodes := make([]*v1.Node, numNodesToFind) if !prof.HasFilterPlugins() { ... //如果没有 filterPlugins, 直接返回所有的 nodes } errCh := parallelize.NewErrorChannel() var statusesLock sync.Mutex var feasibleNodesLen int32 ctx, cancel := context.WithCancel(ctx) // 2. 启动16个 goroutine 并行过滤, checkNode := func(i int) { nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)] fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo) if err != nil {...} if fits { length := atomic.AddInt32(&feasibleNodesLen, 1) if length > numNodesToFind { cancel() atomic.AddInt32(&feasibleNodesLen, -1) } else { feasibleNodes[length-1] = nodeInfo.Node() } } else { statusesLock.Lock() if !status.IsSuccess() { statuses[nodeInfo.Node().Name] = status } statusesLock.Unlock() } } beginCheckNode := time.Now() statusCode := framework.Success defer func() { ... // some matrics // 起 16 个goroutine 跑 checkNode parallelize.Until(ctx, len(allNodes), checkNode) processedNodes := int(feasibleNodesLen) + len(statuses) g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes) feasibleNodes = feasibleNodes[:feasibleNodesLen] if err := errCh.ReceiveError(); err != nil { statusCode = framework.Error return nil, err } return feasibleNodes, nil }
- 这里会定义 numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes))) 确定遍历 node 的数目,如果集群太大,遍历所有的 node 是比较耗时的,默认是如果 nodes 数目小于100,会遍历所有 node,如果太多会取百分比。
- 然后 会通过 parallelize 提供的并发接口调用 checkNode,一般并发数量为 16。checkNode 会调用 PodPassesFiltersOnNode ,会通过 RunFilterPlugins 运行各个 filterplugins 主要是看一下pod 跟在 node 里面的pod 有没有不兼容的关系,查一下优先级是不是equal 或者有比上面的pod 大,如果可以抢占?
- checkNode 会返回不同node 的适不适合的信息,如果适合,信息放在一个list 中 feasibleNodes[length-1] = nodeInfo.Node();如果不适合,在 statuses[nodeInfo.Node().Name] = status 登记错误信息
- findNodesThatPassExtenders 会遍历不同 extender,针对不同的资源,Filter(pod, feasibleNodes) 主要也是用户写的custom filter,也是返回 feasibleNodes( []v1.Node)。
-
前面 findNodesThatFitPod 返回了合适 []v1.Node,g.prioritizeNodes 会根据一些优先级算法计算不同node的得分给出这些node 的排序。
func (g *genericScheduler) prioritizeNodes( ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, ) (framework.NodeScoreList, error) { ... // Run PreScore plugins. preScoreStatus := prof.RunPreScorePlugins(ctx, state, pod, nodes) if !preScoreStatus.IsSuccess() { return nil, preScoreStatus.AsError() } // Run the Score plugins. scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes) if !scoreStatus.IsSuccess() { return nil, scoreStatus.AsError() } ... // Summarize all scores. 汇总所有 node 的分数 result := make(framework.NodeScoreList, 0, len(nodes)) for i := range nodes { result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) for j := range scoresMap { result[i].Score += scoresMap[j][i].Score } } // 不同的extender 使用不同的goroutine 调用Prioritize函数 if len(g.extenders) != 0 && nodes != nil { ... } ... return result, nil }
- 分别调用不同 plugin 的 RunPreScorePlugins 前置评分
- RunScorePlugins 获取每个 plugin 对每个预选节点的分数,这里主要看一下,pod 调度上去会不会跟上面的 pod 有亲和性的关系(affinity) 等。如果说前面 filter 主要是针对 node 进行评价,这里主要是针对 node 里面的 pod 进行评价。
- 上面默认会起 16 goroutine 来对每个 plugin 进行计分,返回的 scoreMap key 是 plugin, 元素是一个 nodescore list,表示对每个节点的评分。
- 然后把所有的分数汇总在 result 中
- 如果有 extender 会分别起不同的 goroutine 去调用 extender.Prioritize,结果也是汇总到 result 中。
-
然后 host, err := g.selectHost(priorityList) 会根据前面 findNodesThatFitPod 的排序结果选择一个合适的pod。
scheduleOnePart2
继续 scheduleOne 的逻辑,这部分是指调度失败的错误处理。主要是抢占和驱逐,这部分跟上一部分的逻辑比较类似,不过输入的 nodes,会有一些过滤。
太长不看版
- 首先会通过 GetUpdatedPod 更新一下 pod 的信息
- 然后 PodEligibleToPreemptOthers 看一下 pod 是否可以抢占,主要判断是不是已经抢占了
- FindCandidates 会调用 nodesWherePreemptionMightHelp 对 node 进行预选过滤,保留硬性的过滤条件跟前面调度时 predicate 逻辑类似,selectVictims 是对 node 打分,跟前面 prioritize 类似。
- 然后 SelectCandidate 选择合适抢占的 node,这个跟前面 selectHost 类似。
- PrepareCandidate 清出足够的空间给 pod 调度上去。 不过这里在多调度器的时候可能会有一些问题,有可能刚清出资源就被其他调度器抢走了,造成不断循环。
具体代码细节
pkg/scheduler/scheduler.go: scheduleOne
// 接着 part1 部分
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
if err != nil {
nominatedNode := ""
if fitError, ok := err.(*core.FitError); ok {
if !prof.HasPostFilterPlugins() {
klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
} else {
// 后处理逻辑,(preempt)在这里实现。
result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
... // 后处理
}
}
// 记录 SchedulingFailure,pod 入队 UnschedulableQ
sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
return
}
-
如果前面调度失败,会进入的处理错误逻辑,尝试调用 prof.RunPostFilterPlugins。这个是引入 scheduler framework 之后的形式,在 v1.15 之前的版本,是只有抢占的逻辑,如下 (github.com/kubernetes/kubernetes:v1.14.1 )
pkg/scheduler/scheduler.go: scheduleOne
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod) if err != nil { // 如果调度失败 if fitError, ok := err.(*core.FitError); ok { if !util.PodPriorityEnabled() || sched.config.DisablePreemption { ... // 如果不允许抢占 } else { preemptionStartTime := time.Now() sched.preempt(pod, fitError) ... // some metrics } metrics.PodScheduleFailures.Inc() } else { ... // 其他错误 } return }
我们还是以 v1.19.1 plugin 中的抢占逻辑进行分析。
pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) { cs := pl.fh.ClientSet() ph := pl.fh.PreemptHandle() nodeLister := pl.fh.SnapshotSharedLister().NodeInfos() // 0) Fetch the latest version of <pod>. pod, err := util.GetUpdatedPod(cs, pod) if err != nil { klog.Errorf("Error getting the updated preemptor pod object: %v", err) return "", err } // 1) Ensure the preemptor is eligible to preempt other pods. if !PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) {...} // 2) Find all preemption candidates. candidates, err := FindCandidates(ctx, cs, state, pod, m, ph, nodeLister, pl.pdbLister) if err != nil || len(candidates) == 0 {...} // 3) Interact with registered Extenders to filter out some candidates if needed. candidates, err = CallExtenders(ph.Extenders(), pod, nodeLister, candidates) if err != nil { return "", err } // 4) Find the best candidate. bestCandidate := SelectCandidate(candidates) if bestCandidate == nil || len(bestCandidate.Name()) == 0 { return "", nil } // 5) Perform preparation work before nominating the selected candidate. if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod); err != nil { return "", err } return bestCandidate.Name(), nil }
抢占(preempt)的代码也比较清晰,可以参考注释来理解工作原理:
-
首先是通过GetUpdatedPod,确认最新的 pod 的版本(保证新的改动可以被接收到)
-
然后 PodEligibleToPreemptOthers 看一下这个 pod 适不适合执行抢占,因为这个pod 有可能已经抢占过了, 然后被抢占的pod 也在 terminating 了,就不会发生抢占。
-
FindCandidates 用于找到合适被抢占的候选nodes
// Each candidate is executable to make the given <pod> schedulable. func FindCandidates(ctx context.Context, cs kubernetes.Interface, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap, ph framework.PreemptHandle, nodeLister framework.NodeInfoLister, pdbLister policylisters.PodDisruptionBudgetLister) ([]Candidate, error) { allNodes, err := nodeLister.List() if err != nil { return nil, err } if len(allNodes) == 0 { return nil, core.ErrNoNodesAvailable } potentialNodes := nodesWherePreemptionMightHelp(allNodes, m) ... return dryRunPreemption(ctx, ph, state, pod, potentialNodes, pdbs), nil }
-
首先 nodesWherePreemptionMightHelp 会把所有的nodes 重新查看一下,如果是因为 predicates 检查不通过的,尝试在上面去除 pod 看是否可能通过
-
然后把 pod 和 potential node 输入到 dryRunPreemption 中,dryRunPreemption 会起多个进程进行跟前面 schedule 里面 checkNode 类似,通过 checkNode 运行 selectVictimsOnNode,这个函数会尝试找到最小被抢占 pods 的集合(只要低优先级的 pods 够,就不会抢占node 内高优先级的)。
-
返回汇总后的cadidates 信息, 包括node 中被抢占 pods 的信息,
func selectVictimsOnNode( ctx context.Context, ph framework.PreemptHandle, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget, ) ([]*v1.Pod, int, bool){ ... // 筛选被抢占的 pod podPriority := podutil.GetPodPriority(pod) for _, p := range nodeInfo.Pods { if podutil.GetPodPriority(p.Pod) < podPriority { potentialVictims = append(potentialVictims, p.Pod) if err := removePod(p.Pod); err != nil { return nil, 0, false } } } // 保留一些不必被抢占的,把 pod 分两类,一个是 如果抢占会造成违反了 PodDisruptionBudget(PDB) 的 pod(violatingVictims),一个是不会violatingVictims violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs) for _, p := range violatingVictims { if fits, err := reprievePod(p); err != nil { klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err) return nil, 0, false } else if !fits { numViolatingVictim++ } } }
-
-
然后 SelectCandidate 选择最好的candidate,有一系列判断准则:
-
这个 node 是不是满足 Pod 干扰预算的条件。
-
这个 node 是不是只需要驱逐的pod 的优先级是不是最小的
-
驱逐这些pod 这个 node affinity 有没有被影响
-
是不是驱逐的 pod 数目最小
…
-
-
PrepareCandidate 会驱逐 victims pod,如果这些 pod 在waitting,不给调度,清楚他们的nominatedNodeName,如果有比较低优先级的pods 被提名到这个 node 会不合适。
-
scheduleOnePart3
具体代码细节
pkg/scheduler/scheduler.go: scheduleOne
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
if err != nil{...}
...
// 1. assume 设置 schedulerCache
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
...
// 2. reserve 预留 volume 等改动
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {...}
// 3. 是否满足 permit 条件
runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
// 等待 permit 条件完备
waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
if !waitOnPermitStatus.IsSuccess() {...}
// Run "prebind" plugins.
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {...}
// 5. 绑定 pod 和 node
err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
if err != nil {...}
... // some metrics
// Run "postbind" plugins.
prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()
}
-
如果前面 part1 schedule 的 sched.Algorithm.Schedule 调度算法调度成功,scheduleResult 返回一个建议调度的node,然后调用 sched.assume,这里主要是在 cache 中记录这个 pod 为已经调度了,这样下次调度的时候就不会重新调度一次这个 pod。
-
然后会调用,RunReservePluginsReserve,RunPermitPlugins,这些 plugins 设置的位点,我们假设没有plugins 设置了先跳过。
- reserve 会执行像 AssumePodVolumes 等,除了 pod assume 之外需要在 cache 保留的操作,unreserve 类似。
-
RunPermitPlugins 会成为bind 的准入,就是前面reserve 之后相关的资源就不会被使用了,不过还有一些条件没有满足,所以不可以 bind,就是为了占着资源,类似 gang-scheduling 的时候会用到。
-
就会进入绑定的逻辑(bind),bind 会另外起一个 goroutine 所以到这里,scheduleOne 就会到下一个调度循环了,使用这样的做法是 bind 实际上比较耗时,所以使用另外的进程进行,不会阻塞到调度,不过有趣的是在早期的 scheduler,并不是一开始就是这样的结构的,如v1.0.0 的版本中,调度完之后会直接 binding,会有比较大性能的损耗。
plugin/pkg/scheduler/scheduler.go 这时 scheduler 还在 plugins 中
func (s *Scheduler) scheduleOne() { dest, err := s.config.Algorithm.Schedule(pod, s.config.MinionLister) ... b := &api.Binding{ ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name}, Target: api.ObjectReference{ Kind: "Node", Name: dest, }, } s.config.Modeler.LockedAction(func() { bindingStart := time.Now() err := s.config.Binder.Bind(b) metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart)) if err != nil {...} assumed := *pod assumed.Spec.NodeName = dest s.config.Modeler.AssumePod(&assumed) }) }
-
sched.bind 首先 WaitOnPermit 会查看一下 permit 是否完成,前面 permit 实际上是在这里被挡住
-
然后会运行一些 prebind 的操作,像网络配置,存储配置等,像volumeBinder 会
- init volume bind, pv -> pvc
- 触发 volume provisioning
- 等待 pvc binding 完成
-
Prebind 成功就会 Bind, pod -> node,defaultBinder 就很简单
pkg/scheduler/framework/plugins/defaultbinder/default_binder.go
// Bind binds pods to nodes using the k8s client. func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { binding := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID}, Target: v1.ObjectReference{Kind: "Node", Name: nodeName}, } // 创建 bind err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) if err != nil { return framework.NewStatus(framework.Error, err.Error()) } return nil }
如果成功会 RunPostBindPlugin,暂时没有 default 的 postBindPlugin.
总结
总体上说,kube-scheduler 的工作方式和其他的 controller 并没有什么不同。如果把 scheduler 看成是一个黑盒的话, 可以认为 scheduler 的输入是一个个的 Pod,输出就是这个Pod 和某个节点 node 的绑定(Bind),如果是 controller 可能是输入一个资源,出去的是更新之后资源。它不对绑定成功与否负责,只是作为资源的分配者。某程度上,没有 scheduler 的话 kubernetes 也能正常运行,只需要手动创建 pod,然后使用 client 执行 CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
基本等同于实现了scheduler 的功能。
kube-scheduler 的设计比较简单,在多个版本的迭代中,其基本的逻辑没有特别大的变化,都是笨笨地一个个 pod 入队处理,我们在文中也穿插了一些相关的内容。我自己暂时感觉这部分还不是写得特别好,一个自己的功力还不够,一个是自己可以了解到的线索也太少,希望后续的更新可以完善。提出这个部分,我自己的思考是了解一下 kube-scheduler 的发展,顺便窥见软件工程的迭代概貌。我自己的理解是大致上都会遵循这样的一个历程:可用性 -> 性能优化 -> 可扩展性,然后在迭代的过程中保持稳定性。
另外,这里我刻意地把 plugins 和 extender 的部分进行了删减,一个是为了控制篇幅,一个是希望把这些可以 custom 的部分留待下一部分的 scheduler-framework 一并讲述。