NewNodeLifecycleController()-->
nc := &Controller{
kubeClient: kubeClient,
}
podInformer.Informer().AddEventHandler()
if nc.runTaintManager {
nodeInformer.Informer().AddEventHandler()
}
Run(stopCh <-chan struct{})-->
go nc.taintManager.Run(stopCh)-->
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}
item, shutdown := tc.nodeUpdateQueue.Get()
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
tc.nodeUpdateChannels[hash] <- nodeUpdate:
item, shutdown := tc.podUpdateQueue.Get() // that pods are processed by the same worker as nodes
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
case tc.podUpdateChannels[hash] <- podUpdate:
go tc.worker(i, wg.Done, stopCh)-->
tc.handleNodeUpdate(nodeUpdate)-->
taints := getNoExecuteTaints(node.Spec.Taints)
tc.taintedNodes[node.Name] = taints
pods, err := tc.getPodsAssignedToNode(node.Name)
tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)-->
allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
// toleration 不匹配,立即驱逐
if !allTolerated {
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
}
minTolerationTime := getMinTolerationTime(usedTolerations)
startTime := now
triggerTime := startTime.Add(minTolerationTime)
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
tc.handlePodUpdate(podUpdate)-->
tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())-->
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())-->
worker := createWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key), q.clock)-->
delay := fireAt.Sub(createdAt)
if delay <= 0 {
go f(args)
return nil
}
timer := clock.AfterFunc(delay, func() { f(args) })
tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))-->
// delete pod