@@ -291,14 +291,51 @@ type cpuAccumulator struct {
291
291
availableCPUSorter availableCPUSorter
292
292
}
293
293
294
- func newCPUAccumulator (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuSortingStrategy CPUSortingStrategy ) * cpuAccumulator {
294
+ func newCPUAccumulator (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuSortingStrategy CPUSortingStrategy , reusableCPUsForResize * cpuset. CPUSet , mustKeepCPUsForScaleDown * cpuset. CPUSet ) * cpuAccumulator {
295
295
acc := & cpuAccumulator {
296
296
topo : topo ,
297
297
details : topo .CPUDetails .KeepOnly (availableCPUs ),
298
298
numCPUsNeeded : numCPUs ,
299
299
result : cpuset .New (),
300
300
}
301
301
302
+ if reusableCPUsForResize != nil {
303
+ if ! reusableCPUsForResize .IsEmpty () {
304
+ // Increase of CPU resources ( scale up )
305
+ // Take existing from allocated
306
+ // CPUs
307
+ if numCPUs > reusableCPUsForResize .Size () {
308
+ // scale up ...
309
+ acc .take (reusableCPUsForResize .Clone ())
310
+ }
311
+
312
+ // Decrease of CPU resources ( scale down )
313
+ // Take delta from allocated CPUs, if mustKeepCPUsForScaleDown
314
+ // is not nil, use explicitly those. If it is nil
315
+ // take delta starting from lowest CoreId of CPUs ( TODO esotsal, perhaps not needed).
316
+ if numCPUs < reusableCPUsForResize .Size () {
317
+ if mustKeepCPUsForScaleDown != nil {
318
+ // If explicitly the CPUs to keep
319
+ // during scale down is given ( this requires
320
+ // addition in container[].resources ... which
321
+ // could be possible to patch ? Esotsal Note This means
322
+ // modifying API code
323
+ if ! (mustKeepCPUsForScaleDown .Intersection (reusableCPUsForResize .Clone ())).IsEmpty () {
324
+ acc .take (mustKeepCPUsForScaleDown .Clone ())
325
+ } else {
326
+ return acc
327
+ }
328
+ }
329
+ }
330
+
331
+ if numCPUs == reusableCPUsForResize .Size () {
332
+ // nothing to do; take the reusable CPUs and return as-is
333
+ acc .take (reusableCPUsForResize .Clone ())
334
+ return acc
335
+ }
336
+ }
337
+ }
338
+
302
339
if topo .NumSockets >= topo .NumNUMANodes {
303
340
acc .numaOrSocketsFirst = & numaFirst {acc }
304
341
} else {
@@ -747,15 +784,23 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
747
784
// the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
748
785
// order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
749
786
// the least amount of free CPUs to the one with the highest amount of free CPUs.
750
- func takeByTopologyNUMAPacked (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuSortingStrategy CPUSortingStrategy , preferAlignByUncoreCache bool ) (cpuset.CPUSet , error ) {
751
- acc := newCPUAccumulator (topo , availableCPUs , numCPUs , cpuSortingStrategy )
787
+ func takeByTopologyNUMAPacked (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuSortingStrategy CPUSortingStrategy , preferAlignByUncoreCache bool , reusableCPUsForResize * cpuset.CPUSet , mustKeepCPUsForScaleDown * cpuset.CPUSet ) (cpuset.CPUSet , error ) {
788
+
789
+ // If the CPUs requested to be retained share no CPUs with
790
+ // reusableCPUs, then we fail early. NOTE(review): the error text
+ // says "not a subset", but this check only rejects fully disjoint
+ // sets — a partial overlap passes; confirm whether IsSubsetOf was intended.
791
+ if reusableCPUsForResize != nil && mustKeepCPUsForScaleDown != nil {
792
+ if (mustKeepCPUsForScaleDown .Intersection (reusableCPUsForResize .Clone ())).IsEmpty () {
793
+ return cpuset .New (), fmt .Errorf ("requested CPUs to be retained %s are not a subset of reusable CPUs %s" , mustKeepCPUsForScaleDown .String (), reusableCPUsForResize .String ())
794
+ }
795
+ }
796
+
797
+ acc := newCPUAccumulator (topo , availableCPUs , numCPUs , cpuSortingStrategy , reusableCPUsForResize , mustKeepCPUsForScaleDown )
752
798
if acc .isSatisfied () {
753
799
return acc .result , nil
754
800
}
755
801
if acc .isFailed () {
756
802
return cpuset .New (), fmt .Errorf ("not enough cpus available to satisfy request: requested=%d, available=%d" , numCPUs , availableCPUs .Size ())
757
803
}
758
-
759
804
// Algorithm: topology-aware best-fit
760
805
// 1. Acquire whole NUMA nodes and sockets, if available and the container
761
806
// requires at least a NUMA node or socket's-worth of CPUs. If NUMA
@@ -864,25 +909,32 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
864
909
// of size 'cpuGroupSize' according to the algorithm described above. This is
865
910
// important, for example, to ensure that all CPUs (i.e. all hyperthreads) from
866
911
// a single core are allocated together.
867
- func takeByTopologyNUMADistributed (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuGroupSize int , cpuSortingStrategy CPUSortingStrategy ) (cpuset.CPUSet , error ) {
912
+ func takeByTopologyNUMADistributed (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuGroupSize int , cpuSortingStrategy CPUSortingStrategy , reusableCPUsForResize * cpuset. CPUSet , mustKeepCPUsForScaleDown * cpuset. CPUSet ) (cpuset.CPUSet , error ) {
868
913
// If the number of CPUs requested cannot be handed out in chunks of
869
914
// 'cpuGroupSize', then we just call out the packing algorithm since we
870
915
// can't distribute CPUs in this chunk size.
871
916
// PreferAlignByUncoreCache feature not implemented here yet and set to false.
872
917
// Support for PreferAlignByUncoreCache to be done at beta release.
873
918
if (numCPUs % cpuGroupSize ) != 0 {
874
- return takeByTopologyNUMAPacked (topo , availableCPUs , numCPUs , cpuSortingStrategy , false )
919
+ return takeByTopologyNUMAPacked (topo , availableCPUs , numCPUs , cpuSortingStrategy , false , reusableCPUsForResize , mustKeepCPUsForScaleDown )
920
+ }
921
+
922
+ // If the CPUs requested to be retained share no CPUs with
923
+ // reusableCPUs, then we fail early. NOTE(review): the error text
+ // says "not a subset", but this check only rejects fully disjoint
+ // sets — a partial overlap passes; confirm whether IsSubsetOf was intended.
924
+ if reusableCPUsForResize != nil && mustKeepCPUsForScaleDown != nil {
925
+ if (mustKeepCPUsForScaleDown .Intersection (reusableCPUsForResize .Clone ())).IsEmpty () {
926
+ return cpuset .New (), fmt .Errorf ("requested CPUs to be retained %s are not a subset of reusable CPUs %s" , mustKeepCPUsForScaleDown .String (), reusableCPUsForResize .String ())
927
+ }
875
928
}
876
929
877
930
// Otherwise build an accumulator to start allocating CPUs from.
878
- acc := newCPUAccumulator (topo , availableCPUs , numCPUs , cpuSortingStrategy )
931
+ acc := newCPUAccumulator (topo , availableCPUs , numCPUs , cpuSortingStrategy , reusableCPUsForResize , mustKeepCPUsForScaleDown )
879
932
if acc .isSatisfied () {
880
933
return acc .result , nil
881
934
}
882
935
if acc .isFailed () {
883
936
return cpuset .New (), fmt .Errorf ("not enough cpus available to satisfy request: requested=%d, available=%d" , numCPUs , availableCPUs .Size ())
884
937
}
885
-
886
938
// Get the list of NUMA nodes represented by the set of CPUs in 'availableCPUs'.
887
939
numas := acc .sortAvailableNUMANodes ()
888
940
@@ -1054,7 +1106,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
1054
1106
// size 'cpuGroupSize' from 'bestCombo'.
1055
1107
distribution := (numCPUs / len (bestCombo ) / cpuGroupSize ) * cpuGroupSize
1056
1108
for _ , numa := range bestCombo {
1057
- cpus , _ := takeByTopologyNUMAPacked (acc .topo , acc .details .CPUsInNUMANodes (numa ), distribution , cpuSortingStrategy , false )
1109
+ cpus , _ := takeByTopologyNUMAPacked (acc .topo , acc .details .CPUsInNUMANodes (numa ), distribution , cpuSortingStrategy , false , reusableCPUsForResize , mustKeepCPUsForScaleDown )
1058
1110
acc .take (cpus )
1059
1111
}
1060
1112
@@ -1069,7 +1121,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
1069
1121
if acc .details .CPUsInNUMANodes (numa ).Size () < cpuGroupSize {
1070
1122
continue
1071
1123
}
1072
- cpus , _ := takeByTopologyNUMAPacked (acc .topo , acc .details .CPUsInNUMANodes (numa ), cpuGroupSize , cpuSortingStrategy , false )
1124
+ cpus , _ := takeByTopologyNUMAPacked (acc .topo , acc .details .CPUsInNUMANodes (numa ), cpuGroupSize , cpuSortingStrategy , false , reusableCPUsForResize , mustKeepCPUsForScaleDown )
1073
1125
acc .take (cpus )
1074
1126
remainder -= cpuGroupSize
1075
1127
}
@@ -1093,5 +1145,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
1093
1145
1094
1146
// If we never found a combination of NUMA nodes that we could properly
1095
1147
// distribute CPUs across, fall back to the packing algorithm.
1096
- return takeByTopologyNUMAPacked (topo , availableCPUs , numCPUs , cpuSortingStrategy , false )
1148
+ return takeByTopologyNUMAPacked (topo , availableCPUs , numCPUs , cpuSortingStrategy , false , reusableCPUsForResize , mustKeepCPUsForScaleDown )
1097
1149
}
0 commit comments