Commit 7115876

Static CPU management policy alongside InPlacePodVerticalScaling
1 parent a5e3df5 commit 7115876

File tree

11 files changed: +2800 -81 lines changed

‎pkg/kubelet/cm/cpumanager/cpu_assignment.go

+63 -11 (63 additions, 11 deletions)

@@ -291,14 +291,51 @@ type cpuAccumulator struct {
     availableCPUSorter availableCPUSorter
 }
 
-func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) *cpuAccumulator {
+func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy, reusableCPUsForResize *cpuset.CPUSet, mustKeepCPUsForScaleDown *cpuset.CPUSet) *cpuAccumulator {
     acc := &cpuAccumulator{
         topo:          topo,
         details:       topo.CPUDetails.KeepOnly(availableCPUs),
         numCPUsNeeded: numCPUs,
         result:        cpuset.New(),
     }
 
+    if reusableCPUsForResize != nil {
+        if !reusableCPUsForResize.IsEmpty() {
+            // Increase of CPU resources (scale up):
+            // take the existing CPUs from the allocated set.
+            if numCPUs > reusableCPUsForResize.Size() {
+                // scale up ...
+                acc.take(reusableCPUsForResize.Clone())
+            }
+
+            // Decrease of CPU resources (scale down):
+            // take the delta from the allocated CPUs. If mustKeepCPUsForScaleDown
+            // is not nil, use explicitly those CPUs. If it is nil,
+            // take the delta starting from the lowest CoreID of the CPUs (TODO esotsal, perhaps not needed).
+            if numCPUs < reusableCPUsForResize.Size() {
+                if mustKeepCPUsForScaleDown != nil {
+                    // The CPUs to keep during scale down are given explicitly
+                    // (this requires an addition in container[].resources ... which
+                    // could possibly be patched? Esotsal note: this means
+                    // modifying API code).
+                    if !(mustKeepCPUsForScaleDown.Intersection(reusableCPUsForResize.Clone())).IsEmpty() {
+                        acc.take(mustKeepCPUsForScaleDown.Clone())
+                    } else {
+                        return acc
+                    }
+                }
+            }
+
+            if numCPUs == reusableCPUsForResize.Size() {
+                // nothing to do, return as is
+                acc.take(reusableCPUsForResize.Clone())
+                return acc
+            }
+        }
+    }
+
     if topo.NumSockets >= topo.NumNUMANodes {
         acc.numaOrSocketsFirst = &numaFirst{acc}
     } else {
@@ -747,15 +784,23 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
 // the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
 // order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
 // the least amount of free CPUs to the one with the highest amount of free CPUs.
-func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy, preferAlignByUncoreCache bool) (cpuset.CPUSet, error) {
-    acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
+func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy, preferAlignByUncoreCache bool, reusableCPUsForResize *cpuset.CPUSet, mustKeepCPUsForScaleDown *cpuset.CPUSet) (cpuset.CPUSet, error) {
+
+    // If the number of CPUs requested to be retained is not a subset
+    // of reusableCPUs, then we fail early
+    if reusableCPUsForResize != nil && mustKeepCPUsForScaleDown != nil {
+        if (mustKeepCPUsForScaleDown.Intersection(reusableCPUsForResize.Clone())).IsEmpty() {
+            return cpuset.New(), fmt.Errorf("requested CPUs to be retained %s are not a subset of reusable CPUs %s", mustKeepCPUsForScaleDown.String(), reusableCPUsForResize.String())
+        }
+    }
+
+    acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy, reusableCPUsForResize, mustKeepCPUsForScaleDown)
     if acc.isSatisfied() {
         return acc.result, nil
     }
     if acc.isFailed() {
         return cpuset.New(), fmt.Errorf("not enough cpus available to satisfy request: requested=%d, available=%d", numCPUs, availableCPUs.Size())
     }
-
     // Algorithm: topology-aware best-fit
     // 1. Acquire whole NUMA nodes and sockets, if available and the container
     //    requires at least a NUMA node or socket's-worth of CPUs. If NUMA
@@ -864,25 +909,32 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
 // of size 'cpuGroupSize' according to the algorithm described above. This is
 // important, for example, to ensure that all CPUs (i.e. all hyperthreads) from
 // a single core are allocated together.
-func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
+func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int, cpuSortingStrategy CPUSortingStrategy, reusableCPUsForResize *cpuset.CPUSet, mustKeepCPUsForScaleDown *cpuset.CPUSet) (cpuset.CPUSet, error) {
     // If the number of CPUs requested cannot be handed out in chunks of
     // 'cpuGroupSize', then we just call out the packing algorithm since we
     // can't distribute CPUs in this chunk size.
     // PreferAlignByUncoreCache feature not implemented here yet and set to false.
     // Support for PreferAlignByUncoreCache to be done at beta release.
     if (numCPUs % cpuGroupSize) != 0 {
-        return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false)
+        return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false, reusableCPUsForResize, mustKeepCPUsForScaleDown)
+    }
+
+    // If the number of CPUs requested to be retained is not a subset
+    // of reusableCPUs, then we fail early
+    if reusableCPUsForResize != nil && mustKeepCPUsForScaleDown != nil {
+        if (mustKeepCPUsForScaleDown.Intersection(reusableCPUsForResize.Clone())).IsEmpty() {
+            return cpuset.New(), fmt.Errorf("requested CPUs to be retained %s are not a subset of reusable CPUs %s", mustKeepCPUsForScaleDown.String(), reusableCPUsForResize.String())
+        }
     }
 
     // Otherwise build an accumulator to start allocating CPUs from.
-    acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
+    acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy, reusableCPUsForResize, mustKeepCPUsForScaleDown)
     if acc.isSatisfied() {
         return acc.result, nil
     }
     if acc.isFailed() {
         return cpuset.New(), fmt.Errorf("not enough cpus available to satisfy request: requested=%d, available=%d", numCPUs, availableCPUs.Size())
     }
-
     // Get the list of NUMA nodes represented by the set of CPUs in 'availableCPUs'.
     numas := acc.sortAvailableNUMANodes()
 
@@ -1054,7 +1106,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
     // size 'cpuGroupSize' from 'bestCombo'.
     distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
     for _, numa := range bestCombo {
-        cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy, false)
+        cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy, false, reusableCPUsForResize, mustKeepCPUsForScaleDown)
         acc.take(cpus)
     }
 
@@ -1069,7 +1121,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
         if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
             continue
         }
-        cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy, false)
+        cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy, false, reusableCPUsForResize, mustKeepCPUsForScaleDown)
         acc.take(cpus)
         remainder -= cpuGroupSize
     }
@@ -1093,5 +1145,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
 
     // If we never found a combination of NUMA nodes that we could properly
     // distribute CPUs across, fall back to the packing algorithm.
-    return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false)
+    return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false, reusableCPUsForResize, mustKeepCPUsForScaleDown)
 }

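Note: the resize seeding added to newCPUAccumulator above boils down to three cases (scale up, scale down, no change). The following standalone sketch, not kubelet code, mirrors that logic using only k8s.io/utils/cpuset; the helper name pickResizeCPUs and the example CPU IDs are illustrative assumptions.

package main

import (
    "fmt"

    "k8s.io/utils/cpuset"
)

// pickResizeCPUs is a hypothetical helper that mirrors the seeding logic this
// commit adds to newCPUAccumulator: it returns the CPUs an accumulator would
// start from when a container is resized in place.
func pickResizeCPUs(numCPUs int, reusable, mustKeep cpuset.CPUSet) cpuset.CPUSet {
    switch {
    case numCPUs > reusable.Size():
        // Scale up: seed with every CPU the container already owns;
        // the accumulator then tops up from the free pool.
        return reusable.Clone()
    case numCPUs < reusable.Size():
        // Scale down: keep the explicitly requested CPUs, provided they
        // overlap the currently allocated set; otherwise keep nothing.
        if !mustKeep.Intersection(reusable).IsEmpty() {
            return mustKeep.Clone()
        }
        return cpuset.New()
    default:
        // Same size: the allocation is unchanged.
        return reusable.Clone()
    }
}

func main() {
    allocated := cpuset.New(2, 3, 10, 11) // CPUs the container holds today
    keep := cpuset.New(2, 10)             // CPUs the resize asks to retain

    fmt.Println(pickResizeCPUs(6, allocated, keep)) // scale up:   2-3,10-11 (topped up later)
    fmt.Println(pickResizeCPUs(2, allocated, keep)) // scale down: 2,10
    fmt.Println(pickResizeCPUs(4, allocated, keep)) // unchanged:  2-3,10-11
}

In the committed code this decision is made inside newCPUAccumulator itself, after which the existing topology-aware best-fit algorithm fills in any remaining CPUs.
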
‎pkg/kubelet/cm/cpumanager/cpu_assignment_test.go

+9 -9 (9 additions, 9 deletions)

@@ -114,7 +114,7 @@ func TestCPUAccumulatorFreeSockets(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.description, func(t *testing.T) {
-            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
+            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked, nil, nil)
             result := acc.freeSockets()
             sort.Ints(result)
             if !reflect.DeepEqual(result, tc.expect) {
@@ -214,7 +214,7 @@ func TestCPUAccumulatorFreeNUMANodes(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.description, func(t *testing.T) {
-            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
+            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked, nil, nil)
             result := acc.freeNUMANodes()
             if !reflect.DeepEqual(result, tc.expect) {
                 t.Errorf("expected %v to equal %v", result, tc.expect)
@@ -263,7 +263,7 @@ func TestCPUAccumulatorFreeSocketsAndNUMANodes(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.description, func(t *testing.T) {
-            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
+            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked, nil, nil)
             resultNUMANodes := acc.freeNUMANodes()
             if !reflect.DeepEqual(resultNUMANodes, tc.expectNUMANodes) {
                 t.Errorf("expected NUMA Nodes %v to equal %v", resultNUMANodes, tc.expectNUMANodes)
@@ -335,7 +335,7 @@ func TestCPUAccumulatorFreeCores(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.description, func(t *testing.T) {
-            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
+            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked, nil, nil)
             result := acc.freeCores()
             if !reflect.DeepEqual(result, tc.expect) {
                 t.Errorf("expected %v to equal %v", result, tc.expect)
@@ -391,7 +391,7 @@ func TestCPUAccumulatorFreeCPUs(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.description, func(t *testing.T) {
-            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked)
+            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0, CPUSortingStrategyPacked, nil, nil)
             result := acc.freeCPUs()
             if !reflect.DeepEqual(result, tc.expect) {
                 t.Errorf("expected %v to equal %v", result, tc.expect)
@@ -477,7 +477,7 @@ func TestCPUAccumulatorTake(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.description, func(t *testing.T) {
-            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, tc.numCPUs, CPUSortingStrategyPacked)
+            acc := newCPUAccumulator(tc.topo, tc.availableCPUs, tc.numCPUs, CPUSortingStrategyPacked, nil, nil)
             totalTaken := 0
             for _, cpus := range tc.takeCPUs {
                 acc.take(cpus)
@@ -750,7 +750,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
                 strategy = CPUSortingStrategySpread
             }
 
-            result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
+            result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption, nil, nil)
             if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
                 t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
             }
@@ -851,7 +851,7 @@ func TestTakeByTopologyWithSpreadPhysicalCPUsPreferredOption(t *testing.T) {
             if tc.opts.DistributeCPUsAcrossCores {
                 strategy = CPUSortingStrategySpread
             }
-            result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
+            result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption, nil, nil)
             if tc.expErr != "" && err.Error() != tc.expErr {
                 t.Errorf("testCase %q failed, expected error to be [%v] but it was [%v]", tc.description, tc.expErr, err)
             }
@@ -1053,7 +1053,7 @@ func TestTakeByTopologyNUMADistributed(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.description, func(t *testing.T) {
-            result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs, tc.cpuGroupSize, CPUSortingStrategyPacked)
+            result, err := takeByTopologyNUMADistributed(tc.topo, tc.availableCPUs, tc.numCPUs, tc.cpuGroupSize, CPUSortingStrategyPacked, nil, nil)
             if err != nil {
                 if tc.expErr == "" {
                     t.Errorf("unexpected error [%v]", err)

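The test changes above pass nil, nil so the existing cases keep their pre-resize behaviour. For the new fail-early check that both takeByTopologyNUMAPacked and takeByTopologyNUMADistributed perform on the retained set, here is a minimal self-contained sketch assuming only k8s.io/utils/cpuset; the function name validateRetainedCPUs is hypothetical.

package main

import (
    "fmt"

    "k8s.io/utils/cpuset"
)

// validateRetainedCPUs is a hypothetical stand-in for the fail-early check this
// commit adds: if a resize asks to retain CPUs that do not overlap the CPUs
// currently reusable for the container, the request is rejected up front.
func validateRetainedCPUs(reusable, mustKeep *cpuset.CPUSet) error {
    if reusable == nil || mustKeep == nil {
        // nil means "no resize in progress" (the nil, nil used by the tests above).
        return nil
    }
    if mustKeep.Intersection(reusable.Clone()).IsEmpty() {
        return fmt.Errorf("requested CPUs to be retained %s are not a subset of reusable CPUs %s",
            mustKeep.String(), reusable.String())
    }
    return nil
}

func main() {
    reusable := cpuset.New(4, 5, 6, 7)
    good := cpuset.New(4, 5)
    bad := cpuset.New(0, 1)

    fmt.Println(validateRetainedCPUs(&reusable, &good)) // <nil>
    fmt.Println(validateRetainedCPUs(&reusable, &bad))  // error: retained CPUs do not overlap
}
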
‎pkg/kubelet/cm/cpumanager/cpu_manager_test.go

+1 -0 (1 addition, 0 deletions)

@@ -167,6 +167,7 @@ func makePod(podUID, containerName, cpuRequest, cpuLimit string) *v1.Pod {
     }
 
     pod.UID = types.UID(podUID)
+    pod.Name = podUID
     pod.Spec.Containers[0].Name = containerName
 
     return pod

0 commit comments
