RecommenderFactory{}.Make()
**recommender** := routines.NewRecommender(config, *checkpointsGCInterval, useCheckpoints, *vpaObjectNamespace)
PodResourceRecommender: logic.CreatePodResourceRecommender()-->
targetCPUPercentile := 0.9
lowerBoundCPUPercentile := 0.5
upperBoundCPUPercentile := 0.95
targetMemoryPeaksPercentile := 0.9
lowerBoundMemoryPeaksPercentile := 0.5
upperBoundMemoryPeaksPercentile := 0.95
// create one percentileEstimator each for the target, lower bound and upper bound
targetEstimator := NewPercentileEstimator(targetCPUPercentile, targetMemoryPeaksPercentile)
lowerBoundEstimator := NewPercentileEstimator(lowerBoundCPUPercentile, lowerBoundMemoryPeaksPercentile)
upperBoundEstimator := NewPercentileEstimator(upperBoundCPUPercentile, upperBoundMemoryPeaksPercentile)
if useCheckpoints {
recommender.GetClusterStateFeeder().InitFromCheckpoints()-->
feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(context.TODO(), metav1.ListOptions{})
feeder.setVpaCheckpoint(&checkpoint)-->
cs := model.NewAggregateContainerState()-->
// CPUHistogramDecayHalfLife and MemoryHistogramDecayHalfLife are 24h
AggregateCPUUsage: util.**NewDecayingHistogram**(config.CPUHistogramOptions, config.CPUHistogramDecayHalfLife),
AggregateMemoryPeaks: util.**NewDecayingHistogram**(config.MemoryHistogramOptions, config.MemoryHistogramDecayHalfLife),
feeder.LoadVPAs()-->
vpaCRDs, err := feeder.vpaLister.List(labels.Everything())
feeder.clusterState.AddOrUpdateVpa(vpaCRD, selector)--> // build the in-memory VPA cache in cluster state, a map keyed by VPA ID
currentRecommendation = apiObject.Status.Recommendation
feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(context.TODO(), metav1.ListOptions{})
feeder.setVpaCheckpoint(&checkpoint) -->
vpa, exists := feeder.clusterState.Vpas[vpaID]
// load resource histogram from VerticalPodAutoscalerCheckpoint
cs.**LoadFromCheckpoint**(&checkpoint.Status)--> // decayingHistogram.LoadFromCheckpoint()
vpa.ContainersInitialAggregateState[checkpoint.Spec.ContainerName]
feeder.clusterState.ObservedVpas = vpaCRDs
} else {
provider, err := history.NewPrometheusHistoryProvider(config) // history=8d, resolution=1h
recommender.GetClusterStateFeeder().InitFromHistoryProvider(provider)
clusterHistory, err := historyProvider.GetClusterHistory()-->
// rate(container_cpu_usage_seconds_total{%s}[%s]) // resolution=1h
// now - 8d(historyDuration)
p.readResourceHistory(res, historicalCpuQuery, model.ResourceCPU) -->
getContainerUsageSamplesFromSamples(ts.Values, resource)
// container_memory_working_set_bytes{%s}
p.readResourceHistory(res, historicalMemoryQuery, model.ResourceMemory)
getContainerUsageSamplesFromSamples(ts.Values, resource)
feeder.clusterState.AddOrUpdatePod(podID, podHistory.LastLabels, apiv1.PodUnknown)-->
container.aggregator = cluster.findOrCreateAggregateContainerState(containerID)-->
aggregateContainerState, aggregateStateExists := cluster.aggregateStateMap[aggregateStateKey]
vpa.UseAggregationIfMatching(aggregateStateKey, aggregateContainerState)-->
aggregateContainerState = NewAggregateContainerState()
vpa.aggregateContainerStates[aggregationKey] = aggregation
vpa.UseAggregationIfMatching(aggregateStateKey, aggregateContainerState)-->
vpa.aggregateContainerStates[aggregationKey] = aggregation
aggregation.UpdateFromPolicy(vpa_api_util.GetContainerResourcePolicy(aggregationKey.ContainerName(), vpa.ResourcePolicy))
feeder.clusterState.**AddSample**() // 添加半衰期直方图采样点
}
ticker := time.Tick(*metricsFetcherInterval) // 1 minute
for range ticker {
recommender.RunOnce()-->
r.clusterStateFeeder.LoadVPAs()
r.clusterStateFeeder.LoadPods()
r.clusterStateFeeder.LoadRealTimeMetrics()
containersMetrics, err := feeder.metricsClient.GetContainersMetrics()-->// get metrics from metrics-server
c.metricsGetter.PodMetricses(c.namespace)
feeder.clusterState.**AddSample**(sample) // (h *decayingHistogram) AddSample(value float64, weight float64, time time.Time)
// 添加半衰期直方图采样点
h.histogram.AddSample(value, weight*h.decayFactor(time), time)-->
decayFactor(time)-->
h.shiftReferenceTimestamp(timestamp) **// 超出最大允许时间时移动窗口**
return math.Exp2(float64(timestamp.Sub(h.referenceTimestamp)) / float64(h.halfLife)) **// 计算半衰期权重**
**// bucket(value) = floor(log(value/firstBucketSize*(ratio-1)+1) / log(ratio))**
(h *histogram) AddSample(value float64, weight float64, time time.Time)-->
// bucket(value) = floor(log(value/firstBucketSize*(ratio-1)+1) / log(ratio))
h.options.FindBucket(value)
Loop:
select {
case oomInfo := <-feeder.oomChan: // oomInfo := range parseEvictionEvent(event); o.observedOomsChannel <- oomInfo
feeder.clusterState.RecordOOM(oomInfo.ContainerID, oomInfo.Timestamp, oomInfo.Memory)-->
containerState.RecordOOM(timestamp, requestedMemory)-->
container.addMemorySample(&oomMemorySample, true)-->
container.aggregator.AddSample(&newPeak)
}
r.UpdateVPAs()-->
r.podResourceRecommender.GetRecommendedPodResources(GetContainerNameToAggregateStateMap(vpa))-->
GetContainerNameToAggregateStateMap()-->
AggregateStateByContainerName()-->
vpa.MergeCheckpointedState(containerNameToAggregateStateMap)-->
// merge in the initial aggregate state that was restored from checkpoints
for containerName, aggregation := range vpa.ContainersInitialAggregateState {
aggregateContainerState.MergeContainerState(aggregation)
}
GetRecommendedPodResources()-->
fraction := 1.0 / float64(len(containerNameToAggregateStateMap))
minResources := model.Resources{
model.ResourceCPU: model.ScaleResource(model.CPUAmountFromCores(*podMinCPUMillicores*0.001), fraction),
model.ResourceMemory: model.ScaleResource(model.MemoryAmountFromBytes(*podMinMemoryMb*1024*1024), fraction),
}
recommender := &podResourceRecommender{
WithMinResources(minResources, r.targetEstimator),
WithMinResources(minResources, r.lowerBoundEstimator),
WithMinResources(minResources, r.upperBoundEstimator),
}
recommendation[containerName] = recommender.estimateContainerResources(aggregatedContainerState)-->
recommender.estimateContainerResources(aggregatedContainerState)-->
r.targetEstimator.GetResourceEstimation(s)-->
(e *percentileEstimator) GetResourceEstimation(s *model.AggregateContainerState)-->
**// 计算资源建议**
// Return the end of the threshold bucket.
s.AggregateCPUUsage.Percentile(e.cpuPercentile))
s.AggregateMemoryPeaks.Percentile(e.memoryPercentile))-->
// bucket(value) = floor(log(value/firstBucketSize*(ratio-1)+1) / log(ratio))
vpa.UpdateRecommendation(getCappedRecommendation(vpa.ID, resources, observedVpa.Spec.ResourcePolicy))-->
r.MaintainCheckpoints(ctx, *minCheckpointsPerRun)--> // minCheckpointsPerRun=10
r.checkpointWriter.StoreCheckpoints(ctx, now, minCheckpointsPerRun)-->
aggregatedContainerState.SaveToCheckpoint()-->
a.AggregateMemoryPeaks.SaveToChekpoint()
a.AggregateCPUUsage.SaveToChekpoint()--> // decaying_histogram.go:SaveToChekpoint()
h.histogram.SaveToChekpoint()--> // 保存半衰期直方图至checkpoint
r.clusterState.RateLimitedGarbageCollectAggregateCollectionStates(time.Now())
healthCheck.UpdateLastActivity()
}
https://www.infoq.cn/article/z40lmwmtoyvecq6tpoik