干货好文|一文洞悉kubernetes资源调度机制

发布时间:2021-11-01 | 信息来源: | 发布作者:沃趣科技


前言

kube-scheduler 在k8s集群中负责pod的调度。他主要的职责是监听pod 资源,对没有绑定node 的pod,根据特定的调度算法与策略选择集群中最优的work node去运行这个pod。

本文基于kube-scheduler v1.21.1版本,对kube-scheduler的运行机制结合代码做一个简单的解读。


基本原理

kube-scheduler在设计上并不复杂,为pod获取最优节点主要分为2个阶段3个步骤

调度阶段

  • predicates,为带调度的pod过滤集群中不适合运行的节点。kube-scheduler提供了一些过滤策略,多个策略可以组合使用。

  • priority,经过predicate过滤后, priority负责对剩余的节点「评分」,比如剩余资源较多的节点会获得较高的评分,符合Pod NodeAffinityPriority的节点会获得较高的评分。 priority阶段只会返回一个节点,当最高分的节点有多个时将按照round-robin的方式选择Node, kube-scheduler 内置了一些「评分策略」,同样也可以组合使用。

绑定阶段

  • bind,为pod绑定提名节点,发送请求到apiserver。



扩展调度策略

当kube-scheduler默认加载的过滤策略与优先级算法不满足我们的需求时,kube-scheduler也提供了接口让我们「扩展」调度策略。


scheduler policy

  • 通过kube-scheduler的配置文件,选择使用哪些过滤策略和评分算法,只允许使用k8s 已实现好的过滤策略和评分策略

  • 在policy配置中定义scheduler extender,通过webhook来扩展调度策略,开发者提供两个接口分别用来处理"过滤"和“评分”。 一个接口用来处理kube-scheduler已经过滤后的节点列表。 一个接口用来处理kube-scheduler已经评分后的节点列表。


scheduler framework

  • pod的调度流程以插件的方式实现,原有的过滤策略和评分策略全部都以in tree plugin的方式实现。对过滤、评分策略的扩展以out of tree plugin方式实现。 整个调度流程提供丰富的扩展点,每个扩展点绑定plugin,在调度流程走到相应的扩展点时,按顺序执行该扩展点的plugin。开发者可以向kube-scheduler注册插件,以介入pod的调度绑定流程。

  • kube-scheduler v1.15版提出,v1.18版release,后续扩展调度策略的主流方案。


Scheduler Framework

调度系统的在调度时的目的往往是动态的,可能是成本优先、质量优先、最大资源利用率优先等等,这与业务场景有关。


正是因为调度系统的调度策略是与业务场景相关联的,很难用一套调度策略满足所有业务场景。越来越多的调度策略加入到kube-scheduler中,使得kube-scheduler的调度逻辑越来越复杂,复杂的调度器是难以维护的,早期的kube-scheduler虽然也具备了扩展能力使得开发者可以为特定的业务场景设计调度策略,但受到如下方面的限制。


  • 扩展点的数量只有两个:过滤后、评分后。扩展程序只能在kube-scheduler过滤后与评分后介入调度流程。

  • kube-scheduler与扩展程序的程序使用HTTP通信,每一次通信都设计json的序列化与反序列化,性能较差。

  • 扩展程序作为一个独立的程序,要么只处理kube-scheduler传递过来的数据,要么自行构建一套k8s资源缓存,存在额外的资源开销

  • 扩展程序无法感知被调度的资源当前处于什么状态。如果一个pod被kube-scheduler判定为不可调度,扩展程序是无法感知的。


因为这些限制无法构建一个高性能、多功能的调度器,为此社区提出scheduler framework来解决kube-scheduler的扩展与性能问题。使得调度程序的核心更简单,方便维护。


接下来我们来聊一聊scheduler framework定义的一些 「对象」分别表示什么,承担什么作用。


ExtensionPoints(扩展点)

扩展点表示在调度绑定周期中的一个“阶段”,kube-scheduler会在每一个阶段执行做一些事情,以完成pod的调度绑定。Scheduler Frameworker的工作流按照以下顺序,执行相应扩展点的插件。每一个扩展点可以绑定多个插件,一个扩展点的每一个插件都需要返回处理结果,如果处理结果为错误,则该pod将直接打回「待调度队列」等待下一次调度。



Predicates

从待调度的pod队列中,拿出一个pod开始实现调度绑定逻辑时,将依次通过预定义的「扩展点」,扩展点对于开发者来说是一个接口,实现这个接口的对象被称之为「插件」。


一个插件可以实现多个扩展点接口。


多个插件实现同一个扩展点接口,Schduler会按序执行每一个插件。

  • 在走到post filter扩展点时,按顺序执行相应的plugin,如果有一个插件返回成功或失败,那么其他的plugin都不会执行。

  • 在走到bind阶段时,按顺序执行相应的plugin,如果有一个plugin执行了绑定,那么其他plugin都不会执行


CycleState

每一个pod的调度绑定流程,都会关联一个CycleState对象,它用于存储当前pod整个调度绑定流程的所有数据,在每一个扩展点中,所有的plugin都可以拿到这个对象,可以从该对象读取或写入一些必要的信息。


CycleState是对所有plugin共享的。


扩展点详解

在每一个扩展点,插件都可以介入进来执行一些“操作”,下面对每一个扩展点做一个简单的说明。

  • queueSort 待调度队列的重排序,plugin可以在这个阶段把优先级比较高的pod放到队列的前面,使得他们可以优先调度。

  • PreFilter ,过滤之前的预处理,检查pod,可以将pod标记为不可调度。这个扩展点framework是不会提供node信息的,只能对pod做检查。虽然可以通过ClientSet去拿所有的node信息,但不推荐,在大规模k8s环境下,会影响效率。

  • Filter  ,检查node是否可以运行待调度的pod。插件在这个扩展点拿到的是node信息,不是node列表。插件仅需检查framework提供的node是否可以运行当前的pod

  • PostFilter,在filter扩展点,无法找到一个合适的node,被判定为无法调度时,就会触发该扩展点插件执行。内置的插件通过这个扩展点,实现了高优先级pod资源抢占机制。

  • PreScore,前置评分主要用于对pod和候选node列表做一个检查,并将处理结果放到CycleState中,以方便在Score扩展为评分逻辑提供一些辅助信息。

  • Score, 给filter后产生的可用节点列表打分,返回可用node列表及其评分。

  • NormalizeScore,他是Score扩展点接口定义的一个方法,在评分完成后,还可以修正每个node分数。它必须要保证评分的范围在0-100以内。

  • framework在拿到评分结果后会选择评分最高的节点返回。

  • Reserve,实现该扩展点的插件,可为pod预留关联资源,如pod关联了pvc资源,那么在kube-scheduler的缓存中提前为pod绑定pvc与pv资源。

  • Reserve扩展点需要实现reserve、unreserve方法,分别是执行资源预留与回滚资源预留。

  • kube-scheduler的VolumeBinding插件在这个扩展点实现了pod关联的pvc、pv资源预留。

  • Permit,可以阻止或延迟pod绑定node

  • PreBind,绑定前执行一些任务,如将pod的关联资源的绑定持久化至k8s。如果PreBind失败会执行Reserve扩展点的Unreserve方法回滚pod的关联资源绑定。

  • Bind, 将pod和node绑定,实现bind扩展点的插件按顺序调用,只要有一个插件完成绑定,后续插件都会全部跳过。Bind失败会执行Reserve扩展点的Unreserve方法回滚pod的关联资源绑定。

  • PostBind, 绑定后执行一些逻辑,可以做一些清理工作。


任意一个扩展点插件返回错误都会中断该pod的调度,并返回到调度队列


Multi Scheduler

Scheduler Framework的扩展能力远远超过之前Scheduler Policy,同时基于Scheduler Framework引入了新的Scheduler配置:KubeSchedulerProfile。

在KubeSchedulerProfile中可以定义每一个扩展点使用哪些插件,禁用哪些插件。

在KubeSchedulerProfile中可以定义多个Profile。一个Profile表示一个调度器,在kube-scheduler初始化时会读取KubeSchedulerProfile创建多个framework对象,kube scheduler 通过pod.spec.schedulerName,找到对应的framework对象,使用该对象为pod执行调度绑定流程


如下定义了两个调度器:default-scheduler、no-scoring-scheduler。


apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
profiles:
  - schedulerName: default-scheduler
  - schedulerName: no-scoring-scheduler
    plugins:
      preScore:
        disabled:
        - name: "*"
      score:
        disabled:
        - name: "*"



基于Scheduler Framework的调度绑定实现


调度队列的设计

kube-scheduler的pod调度队列由PriorityQueue对象实现。他最核心的数据结构主要是3个子队列


activeQ

activeQ子队列包含正在等待调度的 pod,由「堆」数据结构实现。queueSort扩展点的插件可以对该队列中的pod 做排序,以实现高优先级的pod优先调度

kube-scheduler内置的queueSort扩展点插件 queueSortPlugin会将activeQ队列中的pod,按照优先级与创建时间排序。将高优先级、创建时间早的pod放在队列头部。

同时activeQ中的pod  被拿出来时,会关联一个SchedulingCycle。他是调度队列里面的一个计数器,每次pod被拿出来就加1。


podBackoffQ

backoff 是并发编程中常见的一种机制,就是如果一个任务重复执行,但依旧失败,则会按照失败的次数提高重试等待时间,避免频繁重试浪费资源。

运行失败的pod都会放到backoff队列中,并在一段时间后移至activeQ中,这个”一段时间“具体是多久,则取决于它的失败次数,最大不会超过10s(DefaultPodMaxBackoffDuration)


当你观察k8s集群里一个 一直crash的pod,他的status会变成crashLoopBackoff,像这种pod都会进入podBackoffQ队列


unschedulableQ

在调度时,被判定为无法调度的pod,都将存放至该队列。 无法调度是指pod的要求,当前集群无法满足,如pod的cpu要求高,当前集群所有节点都不满足。此时pod就会被判定为无法调度进入unschedulableQ队列。


调度队列的使用

调度队列在运行期间:

每隔1s检查podBackOffQ队列中是否有pod可以放进activeQ

每隔30s把unschedulableQ中长时间(默认60s)处于不可调度的pod移至backoff队列。

每隔0s,scheduler对象从activeQ获取待调度的pod

当一个pod在调度周期被判定为不可调度进入到unschedulerQ队列后,如果集群资源发生了变化,比如新增了node,删除了pod等。一个不可调度的pod就有了调度成功的可能性。它将等待60s后,定时任务触发将这个pod移动至backoffQ队列,并再次从backoffQ队列移动至activeQ队列,准备下一次调度。

这样子对于pod的调度会是一个效率的很低的事情,因为他需要的时间太长了。为了提高效率,scheduler对象监听pod与node资源发生变化时,都会调用PriorityQueue对象的movePodsToActiveOrBackoffQueue方法。该方法会将不可调度的pod会被重新放进activeQ或者backoffQ,同时moveRequestCycle会设置为当前的schedulingCycle。

至于具体移动到哪个队列,则根据moveRequestCycle是否大于等于SchedulingCycle,如果大于等于则放到backOff队列,否则放到unschedulerQ队列。结合前面的分析,只有资源发生变化时,moveRequestCycle才有可能大于等于当前的SchedulingCycle。那么就说明,在判断pod无法调度后,k8s集群环境已经发生了变化。那么此时判定无法调的pod在集群变化后,还是有可能可以调度的,所以放到backOff队列中,是为了让他尽快发起调度重试。


最终工作流程如下:



调度主流程

  • scheduler对象从调度队列的activeQ队列取待调度的pod

  • scheduler对象根据pod的schedule Name 字段获取对应的framework对象。

  • scheduler对象调用generic scheduler对象执行framework的 queue sort、prefilter、filter、prescore、score扩展点。

  • 若generic scheduler在执行时,返回错误

  • 错误类型为FitError,表示过滤阶段失败,触发抢占机制,执行framework的postfilter扩展点

  • 将pod重新加入调度队列

  • scheduler 对象执行assume,对拿到提名节点的pod在缓存中执行节点绑定操作,这样绑定流程可以异步去执行。 同时,在kube-scheduler的pod缓存中,这个pod是已经在node上正常运行的,那么在后续pod调度时,对于节点资源的评估,也是包涵这个pod已经占用节点上的一部分资源的。

  • scheduler对象执行framework的Reserve、Permit扩展点。

  • goroutine异步执行framework的Prebind、Bind、PostBind扩展点。



调度周期是同步执行的,完成调度周期的工作流后,会通过goroutine异步执行绑定周期,这样可以无需等待绑定结果,立刻为下一个pod开启调度。


大规模K8s集群调度瓶颈

当k8s 集群节点规模比较大时,如果每一个pod都需要遍历所有node来判定哪个node是”合适“的。那么这个调度流程效率会变的特别低。

kube-scheduler会控制参与调度的node数量来提高调度效率,在默认情况下,如果k8s节点的数量少于100个,那么所有的节点都会参与调度。否则,将根据设置的节点百分比选择部分节点参与调度。


控制节点数量

  • 调用prefilter 扩展点的插件,检查pod是否可以被调度,如果任一PreFilter插件返回错误,那么pod将打回待调度队列

  • 遍历所有节点,并调用Filter扩展点的插件,并记录适合该pod节点的数量,一旦达到数量限制或plugin的filter方法返回失败,将停止遍历。

  • 调用filter扩展点插件时,可能会执行两遍filter,具体的原因和抢占功能有关,文章末尾解释原因

  • 在寻找适合pod的node列表时,将开启16个(默认16个)goroutine 并行筛选。每个goroutine会各自负责所有节点中的一部分。

  • 记录停止遍历时,node列表遍历到什么位置。在下一个pod的调度周期,将从这个位置开始遍历node列表。这样可以保障集群中每一个节点的都有公平的被调度机会。


代码分析如下:

计算节点数量限制


func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
  //  对于节点数量少于100的,全部节点参与调度
  //  percentageOfNodesToScore是集群所有节点参与数量的百分比,如果设置为100,就是所有节点都参与调度
  if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
    return numAllNodes
  }
  
  adaptivePercentage := g.percentageOfNodesToScore
//当numAllNodes大于100时,且配置的百分比小于等于0,那么这里需要计算出一个百分比
// 计算公式:百分比 = 50 - (总节点数)/125
  if adaptivePercentage <= 0 { basePercentageOfNodesToScore := int32(50) adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125 if adaptivePercentage < minFeasibleNodesPercentageToFind { adaptivePercentage = minFeasibleNodesPercentageToFind } } numNodes = numAllNodes * adaptivePercentage / 100 if numNodes < minFeasibleNodesToFind { return minFeasibleNodesToFind } return numNodes }


获取可调度节点


// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (g *genericScheduler) findNodesThatPassFilters(
  ctx context.Context,
  fwk framework.Framework,
  state *framework.CycleState,
  pod *v1.Pod,
  diagnosis framework.Diagnosis,
  nodes []*framework.NodeInfo) ([]*v1.Node, error) {
  // 计算node数量限制
  numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))

  // 存放合适的node列表
  feasibleNodes := make([]*v1.Node, numNodesToFind)
  // 如果没有插件实现Filter扩展点,就直接截取所有node列表中的一段
  // 从上一次停止查找的node 后面开始截
  if !fwk.HasFilterPlugins() {
    length := len(nodes)
    for i := range feasibleNodes {
      feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
    }
    g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
    return feasibleNodes, nil
  }
  errCh := parallelize.NewErrorChannel()
  var statusesLock sync.Mutex
  var feasibleNodesLen int32
  ctx, cancel := context.WithCancel(ctx)
  // 执行所有插件的Filter,
  checkNode := func(i int) {
    // 从上一个调度周期中离开的节点开始检查节点是否合适,执行所有插件的filter
    nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
    status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
    if status.Code() == framework.Error {
      errCh.SendErrorWithCancel(status.AsError(), cancel)
      return
    }
    // 如果这个节点合适,那么就把他放到feasibleNodes列表中
    if status.IsSuccess() {
      
      length := atomic.AddInt32(&feasibleNodesLen, 1)
      if length > numNodesToFind {
        cancel()
        atomic.AddInt32(&feasibleNodesLen, -1)
      } else {
        feasibleNodes[length-1] = nodeInfo.Node()
      }
    } else {
      statusesLock.Lock()
      diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status
      diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())
      statusesLock.Unlock()
    }
  }

  beginCheckNode := time.Now()
  statusCode := framework.Success
  defer func() {
    // We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
    // function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
    // Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
    metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
  }()

  // 开协程执行filter,直到数量达到限制
  fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
  // 设置下次开始遍历node的位置
  processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
  g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)

  feasibleNodes = feasibleNodes[:feasibleNodesLen]
  if err := errCh.ReceiveError(); err != nil {
    statusCode = framework.Error
    return nil, err
  }
  return feasibleNodes, nil
}






抢走低优先级pod的资源


kube-scheduler为了保障高优先级的pod可以优先调度,在pod被判定为无法调度时,并不会直接将其放到unschedulerQ队列,而是检查有没有优先级比当前pod低的的pod可以抢占。如果有则执行抢占流程。

pod的priority用来表示pod的优先级,如果没有设置这个字段,那么pod的优先级就是0。k8s控制平面的所有组件全部都是高优先级,其优先级都被设置为2000001000。

”抢占“的逻辑由PostFilter扩展点来实现,后面我们主要分析kube-scheduler内置的DefaultPreemption插件实现PostFilter时,是如何实现抢占逻辑的。


func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
  defer func() {
    metrics.PreemptionAttempts.Inc()
  }()

  nnn, status := pl.preempt(ctx, state, pod, m)
  if !status.IsSuccess() {
    return nil, status
  }
  // This happens when the pod is not eligible for preemption or extenders filtered all candidates.
  if nnn == "" {
    return nil, framework.NewStatus(framework.Unschedulable)
  }
  return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success)
}



DefaultPreemption插件首先会去获取node列表,然后获取最新的要执行抢占的pod信息,接着分下面几步执行抢占:

  1. 检查是pod否可实施抢占。调用PodEligibleToPreemptOthers方法,检查当前pod是否能够进行抢占,如果当前的pod已经抢占了一个node节点且该节点有pod正在执行优雅退出,那么不应该执行抢占。

  2. 查找可抢占的节点。调用FindCandidates找到所有node中能被抢占的node节点,并返回候选node列表以及node节点中需要被删除的pod(牺牲者);

  3. 寻找最佳抢占目标。调用SelectCandidate方法在所有候选列表中找出最合适的node节点执行抢占;

  4. 执行抢占。调用PrepareCandidate方法删除被抢占的node节点中victim pod(牺牲者),以及清除牺牲者的NominatedNodeName字段信息;

  5. 牺牲者pod资源发生变化,被kube-sheculer监听到,重新加入调度队列,等待重新调度。


PodEligibleToPreemptOthers

这个方法会检查该pod是否已经抢占过其他node节点,如果是的话就遍历这个节点上的所有pod对象,如果发现节点上有pod资源对象的优先级小于待调度pod资源对象并处于terminating状态,说明这个node正在执行低优先级pod驱逐,已经有正在删除的pod,等待删除成功后,释放资源,高优先级的pod就会占用这个node。


func PodEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister, nominatedNodeStatus *framework.Status) bool {
  if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
    klog.V(5).InfoS("Pod is not eligible for preemption because it has a preemptionPolicy of Never", "pod", klog.KObj(pod))
    return false
  }
  nomNodeName := pod.Status.NominatedNodeName

  // 检查pod是否已经有提名node,如果有那么说明已经执行过抢占
  if len(nomNodeName) > 0 {
    if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
      return true
    }
    // 获取抢占的node
    if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
    // 检查这个node中是否存在正处于terminating状态的pod,且优先级比当前pod低
      podPriority := corev1helpers.PodPriority(pod)
      for _, p := range nodeInfo.Pods {
        if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority { // There is a terminating pod on the nominated node. return false } } } } return true }



FindCandidates

FindCandidates方法首先会获取node列表,然后调用nodesWherePreemptionMightHelp方法来找出predicates 阶段失败但是通过抢占也许能够调度成功的nodes,因为并不是所有的node都可以通过抢占来调度成功。最后调用dryRunPreemption方法来获取符合条件的node节点。





func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) {
  allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List()
  if err != nil {
    return nil, nil, framework.AsStatus(err)
  }
  if len(allNodes) == 0 {
    return nil, nil, framework.NewStatus(framework.Error, "no nodes available")
  }
  // 找到可以占用的node
  potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)
  if len(potentialNodes) == 0 {
    klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
    // In this case, we should clean-up any existing nominated node name of the pod.
    // 如果当前pod不存在可以抢占的node,那么就把pod的提名node信息给删掉
    if err := util.ClearNominatedNodeName(pl.fh.ClientSet(), pod); err != nil {
      klog.ErrorS(err, "cannot clear "NominatedNodeName" field of pod", "pod", klog.KObj(pod))
      // We do not return as this error is not critical.
    }
    return nil, unschedulableNodeStatus, nil
  }
  // 获取PDB对象,PDB能够限制同时终端的pod资源对象的数量,以保证集群的高可用性
  pdbs, err := getPodDisruptionBudgets(pl.pdbLister)
  if err != nil {
    return nil, nil, framework.AsStatus(err)
  }

  offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(potentialNodes)))
  if klog.V(5).Enabled() {
    var sample []string
    for i := offset; i < offset+10 && i < int32(len(potentialNodes)); i++ { sample = append(sample, potentialNodes[i].Node().Name) } klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates) } // 寻找符合条件的node,并封装成candidate数组返回 candidates, nodeStatuses := dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates) for node, status := range unschedulableNodeStatus { nodeStatuses[node] = status } return candidates, nodeStatuses, nil }


SelectCandidate

这个方法里面会调用candidatesToVictimsMap方法做一个node name和victims映射map,然后调用pickOneNodeForPreemption执行主要过滤逻辑。


func SelectCandidate(candidates []Candidate) Candidate {
  // 如果没有候选node,就直接返回nil
  if len(candidates) == 0 {
    return nil
  }
  // 如果只有一个候选node,就直接返回该node
  if len(candidates) == 1 {
    return candidates[0]
  }
  // 拿到所有候选node里面,需要“牺牲”的pod 映射关系
  victimsMap := candidatesToVictimsMap(candidates)
  // 选择一个候选node
  candidateNode := pickOneNodeForPreemption(victimsMap)
  
  // Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
  // preemption plugins that exercise different candidates on the same nominated node.
  if victims := victimsMap[candidateNode]; victims != nil {
    return &candidate{
      victims: victims,
      name:    candidateNode,
    }
  }

  // We shouldn"t reach here.
  klog.ErrorS(errors.New("no candidate selected"), "should not reach here", "candidates", candidates)
  // To not break the whole flow, return the first candidate.
  return candidates[0]
}



PrepareCandidate

至此拿到了候选node,以及该node上需要牺牲的pod。

  • 删除需要牺牲的pod。

  • 找到所有提名这个node的 pod,且优先级比当前pod低的。

  • 去除这些pod的Nominated信息。并且将这些pod移至activeQ队列,让他们重新调度




func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) *framework.Status {
  // 删除候选node上 需要牺牲的pod
  for _, victim := range c.Victims().Pods {
    // If the victim is a WaitingPod, send a reject message to the PermitPlugin.
    // Otherwise we should delete the victim.
    if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
      waitingPod.Reject(pluginName, "preempted")
    } else if err := util.DeletePod(cs, victim); err != nil {
      klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
      return framework.AsStatus(err)
    }
    fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
      pod.Namespace, pod.Name, c.Name())
  }
  metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
 

  // 找到优先级比当前pod低,且也提名了该候选node的pod。
  nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
  // 删除这些pod的Nominate信息,并移至activeQ队列
  if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
    klog.ErrorS(err, "cannot clear "NominatedNodeName" field")
    // We do not return as this error is not critical.
  }

  return nil
}



抢占总结


高优先级的pod进行”抢占“时,会将pod的nominatedNodeName 字段,设置为被抢占的 Node 的名字。然后,在下一周期中决定是不是要运行在被抢占的节点上,当这个Pod在等待的时候,如果有其他更高优先级的 Pod 也要抢占同一个节点,那么调度器就会清空「被抢占者」的spec.nominatedNodeName 字段,从而允许更高优先级的抢占者执行抢占。这也使得「被抢占者」本身也有机会去重新抢占其他节点。

抢占者并不会立刻被调度到被抢占的 node 上,调度器只会将抢占者的 status.nominatedNodeName 字段设置为被抢占的 node 的名字。然后,抢占者会重新进入下一个调度周期,在新的调度周期里来决定是不是要运行在被抢占的节点上,当然,即使在下一个调度周期,调度器也不会保证抢占者一定会运行在被抢占的节点上。这样设计的一个重要原因是调度器只会通过标准的 DELETE API 来删除被抢占的 pod,所以,这些 pod 必然是有一定的“优雅退出”时间(默认是 30s)的。而在这段时间里,其他的节点也是有可能变成可调度的,或者直接有新的节点被添加到这个集群中来。


调度流程的二次过滤

在调度流程中Filter扩展点可能会执行两次,其主要目的是为了考虑高优先级的pod抢占了node的场景。

第一次会调用addNominatedPods方法将调度队列中找到node上优先级大于或等于当前pod的 pod集合加入到nodeInfo对象中,然后执行FilterPlugin列表。第二次则直接执行FilterPlugins列表。之所以第一次要这么做,是因为在pod抢占node的逻辑中,优先级高的pod先抢占node,抢占成功后将pod.Status.NominatedNodeName字段设置成当前的node,设置完成后scheduler就跑去执行下一个pod的调度逻辑了,这时pod很可能还没有真正在node上面跑起来。所以Scheduler的缓存中其实并没有将这类pod的信息,所以在调度当前pod的时候,会受这些高优先级pod的影响(pod和pod之间有pod亲和性、反亲和性等依赖关系),所以要先假设这类高优先级的pod已经在这个node中跑起来了,已经占用了节点上的一部分资源,这样当前pod的调度是以「节点最少剩下多少资源」来执行filter扩展点。

为了确保万无一失(万一这些高优先级的pod最终没在这个node跑起来),还得把这些高优先的pod排除掉再执行一次filter扩展点。 这样,无论其它高优先级的pod在不在这个node上,这个pod都能确保无冲突地调度在这些node上面。

代码实现如下:


func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
  var status *framework.Status

  podsAdded := false
  for i := 0; i < 2; i++ { stateToUse := state nodeInfoToUse := info if i == 0 { var err error podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info) if err != nil { return framework.AsStatus(err) } } else if !podsAdded || !status.IsSuccess() { break } statusMap := f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse) status = statusMap.Merge() if !status.IsSuccess() && !status.IsUnschedulable() { return status } } return status }




结束语

得益于scheduler framework的设计,所有的调度策略都以插件的形式注入kube-scheduler,并协同工作完成pod的调度绑定。调度策略的实现分散在各个插件中,本文主要关注kube-scheduler一些重要插件的调度逻辑实现,起到抛砖引玉的作用。



引用

https://github.com/kubernetes/enhancements/blob/master/keps/sig-scheduling/624-scheduling-framework/README.md framework设计提案

https://draveness.me/system-design-scheduler/ 调度系统设计精要

https://zhuanlan.zhihu.com/p/33823266 集群资源调度系统设计架构

https://www.infoq.cn/article/lYUw79lJH9bZv7HrgGH5 进击的kubernetes调度系统





沃趣科技,让客户用上更好的数据库技术!