@@ -294,14 +294,51 @@ type cpuAccumulator struct {
294
294
availableCPUSorter availableCPUSorter
295
295
}
296
296
297
- func newCPUAccumulator (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuSortingStrategy CPUSortingStrategy ) * cpuAccumulator {
297
+ func newCPUAccumulator (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuSortingStrategy CPUSortingStrategy , reusableCPUsForResize * cpuset. CPUSet , mustKeepCPUsForScaleDown * cpuset. CPUSet ) * cpuAccumulator {
298
298
acc := & cpuAccumulator {
299
299
topo : topo ,
300
300
details : topo .CPUDetails .KeepOnly (availableCPUs ),
301
301
numCPUsNeeded : numCPUs ,
302
302
result : cpuset .New (),
303
303
}
304
304
305
+ if reusableCPUsForResize != nil {
306
+ if ! reusableCPUsForResize .IsEmpty () {
307
+ // Increase of CPU resources ( scale up )
308
+ // Take existing from allocated
309
+ // CPUs
310
+ if numCPUs > reusableCPUsForResize .Size () {
311
+ // scale up ...
312
+ acc .take (reusableCPUsForResize .Clone ())
313
+ }
314
+
315
+ // Decrease of CPU resources ( scale down )
316
+ // Take delta from allocated CPUs, if mustKeepCPUsForScaleDown
317
+ // is not nil, use explicitly those. If it is nil
318
+ // take delta starting from lowest CoreId of CPUs ( TODO esotsal, perhaps not needed).
319
+ if numCPUs < reusableCPUsForResize .Size () {
320
+ if mustKeepCPUsForScaleDown != nil {
321
+ // If explicitly CPUs to keep
322
+ // during scale down is given ( this requires
323
+ // addition in container[].resources ... which
324
+ // could be possible to patch ? Esotsal Note This means
325
+ // modifying API code
326
+ if ! (mustKeepCPUsForScaleDown .Intersection (reusableCPUsForResize .Clone ())).IsEmpty () {
327
+ acc .take (mustKeepCPUsForScaleDown .Clone ())
328
+ } else {
329
+ return acc
330
+ }
331
+ }
332
+ }
333
+
334
+ if numCPUs == reusableCPUsForResize .Size () {
335
+ // nothing to do, return as is
336
+ acc .take (reusableCPUsForResize .Clone ())
337
+ return acc
338
+ }
339
+ }
340
+ }
341
+
305
342
if topo .NumSockets >= topo .NumNUMANodes {
306
343
acc .numaOrSocketsFirst = & numaFirst {acc }
307
344
} else {
@@ -750,15 +787,23 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
750
787
// the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
751
788
// order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
752
789
// the least amount of free CPUs to the one with the highest amount of free CPUs.
753
- func takeByTopologyNUMAPacked (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuSortingStrategy CPUSortingStrategy , preferAlignByUncoreCache bool ) (cpuset.CPUSet , error ) {
754
- acc := newCPUAccumulator (topo , availableCPUs , numCPUs , cpuSortingStrategy )
790
+ func takeByTopologyNUMAPacked (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuSortingStrategy CPUSortingStrategy , preferAlignByUncoreCache bool , reusableCPUsForResize * cpuset.CPUSet , mustKeepCPUsForScaleDown * cpuset.CPUSet ) (cpuset.CPUSet , error ) {
791
+
792
+ // If the CPUs requested to be retained do not intersect
793
+ // the reusable CPUs at all, we fail early. NOTE(review): this checks for an empty intersection, not a true subset (cpuset.IsSubsetOf) — confirm intended semantics.
794
+ if reusableCPUsForResize != nil && mustKeepCPUsForScaleDown != nil {
795
+ if (mustKeepCPUsForScaleDown .Intersection (reusableCPUsForResize .Clone ())).IsEmpty () {
796
+ return cpuset .New (), fmt .Errorf ("requested CPUs to be retained %s are not a subset of reusable CPUs %s" , mustKeepCPUsForScaleDown .String (), reusableCPUsForResize .String ())
797
+ }
798
+ }
799
+
800
+ acc := newCPUAccumulator (topo , availableCPUs , numCPUs , cpuSortingStrategy , reusableCPUsForResize , mustKeepCPUsForScaleDown )
755
801
if acc .isSatisfied () {
756
802
return acc .result , nil
757
803
}
758
804
if acc .isFailed () {
759
805
return cpuset .New (), fmt .Errorf ("not enough cpus available to satisfy request: requested=%d, available=%d" , numCPUs , availableCPUs .Size ())
760
806
}
761
-
762
807
// Algorithm: topology-aware best-fit
763
808
// 1. Acquire whole NUMA nodes and sockets, if available and the container
764
809
// requires at least a NUMA node or socket's-worth of CPUs. If NUMA
@@ -867,25 +912,32 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
867
912
// of size 'cpuGroupSize' according to the algorithm described above. This is
868
913
// important, for example, to ensure that all CPUs (i.e. all hyperthreads) from
869
914
// a single core are allocated together.
870
- func takeByTopologyNUMADistributed (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuGroupSize int , cpuSortingStrategy CPUSortingStrategy ) (cpuset.CPUSet , error ) {
915
+ func takeByTopologyNUMADistributed (topo * topology.CPUTopology , availableCPUs cpuset.CPUSet , numCPUs int , cpuGroupSize int , cpuSortingStrategy CPUSortingStrategy , reusableCPUsForResize * cpuset. CPUSet , mustKeepCPUsForScaleDown * cpuset. CPUSet ) (cpuset.CPUSet , error ) {
871
916
// If the number of CPUs requested cannot be handed out in chunks of
872
917
// 'cpuGroupSize', then we just call out the packing algorithm since we
873
918
// can't distribute CPUs in this chunk size.
874
919
// PreferAlignByUncoreCache feature not implemented here yet and set to false.
875
920
// Support for PreferAlignByUncoreCache to be done at beta release.
876
921
if (numCPUs % cpuGroupSize ) != 0 {
877
- return takeByTopologyNUMAPacked (topo , availableCPUs , numCPUs , cpuSortingStrategy , false )
922
+ return takeByTopologyNUMAPacked (topo , availableCPUs , numCPUs , cpuSortingStrategy , false , reusableCPUsForResize , mustKeepCPUsForScaleDown )
923
+ }
924
+
925
+ // If the CPUs requested to be retained do not intersect
926
+ // the reusable CPUs at all, we fail early. NOTE(review): this checks for an empty intersection, not a true subset (cpuset.IsSubsetOf) — confirm intended semantics.
927
+ if reusableCPUsForResize != nil && mustKeepCPUsForScaleDown != nil {
928
+ if (mustKeepCPUsForScaleDown .Intersection (reusableCPUsForResize .Clone ())).IsEmpty () {
929
+ return cpuset .New (), fmt .Errorf ("requested CPUs to be retained %s are not a subset of reusable CPUs %s" , mustKeepCPUsForScaleDown .String (), reusableCPUsForResize .String ())
930
+ }
878
931
}
879
932
880
933
// Otherwise build an accumulator to start allocating CPUs from.
881
- acc := newCPUAccumulator (topo , availableCPUs , numCPUs , cpuSortingStrategy )
934
+ acc := newCPUAccumulator (topo , availableCPUs , numCPUs , cpuSortingStrategy , reusableCPUsForResize , mustKeepCPUsForScaleDown )
882
935
if acc .isSatisfied () {
883
936
return acc .result , nil
884
937
}
885
938
if acc .isFailed () {
886
939
return cpuset .New (), fmt .Errorf ("not enough cpus available to satisfy request: requested=%d, available=%d" , numCPUs , availableCPUs .Size ())
887
940
}
888
-
889
941
// Get the list of NUMA nodes represented by the set of CPUs in 'availableCPUs'.
890
942
numas := acc .sortAvailableNUMANodes ()
891
943
@@ -1057,7 +1109,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
1057
1109
// size 'cpuGroupSize' from 'bestCombo'.
1058
1110
distribution := (numCPUs / len (bestCombo ) / cpuGroupSize ) * cpuGroupSize
1059
1111
for _ , numa := range bestCombo {
1060
- cpus , _ := takeByTopologyNUMAPacked (acc .topo , acc .details .CPUsInNUMANodes (numa ), distribution , cpuSortingStrategy , false )
1112
+ cpus , _ := takeByTopologyNUMAPacked (acc .topo , acc .details .CPUsInNUMANodes (numa ), distribution , cpuSortingStrategy , false , reusableCPUsForResize , mustKeepCPUsForScaleDown )
1061
1113
acc .take (cpus )
1062
1114
}
1063
1115
@@ -1072,7 +1124,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
1072
1124
if acc .details .CPUsInNUMANodes (numa ).Size () < cpuGroupSize {
1073
1125
continue
1074
1126
}
1075
- cpus , _ := takeByTopologyNUMAPacked (acc .topo , acc .details .CPUsInNUMANodes (numa ), cpuGroupSize , cpuSortingStrategy , false )
1127
+ cpus , _ := takeByTopologyNUMAPacked (acc .topo , acc .details .CPUsInNUMANodes (numa ), cpuGroupSize , cpuSortingStrategy , false , reusableCPUsForResize , mustKeepCPUsForScaleDown )
1076
1128
acc .take (cpus )
1077
1129
remainder -= cpuGroupSize
1078
1130
}
@@ -1096,5 +1148,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
1096
1148
1097
1149
// If we never found a combination of NUMA nodes that we could properly
1098
1150
// distribute CPUs across, fall back to the packing algorithm.
1099
- return takeByTopologyNUMAPacked (topo , availableCPUs , numCPUs , cpuSortingStrategy , false )
1151
+ return takeByTopologyNUMAPacked (topo , availableCPUs , numCPUs , cpuSortingStrategy , false , reusableCPUsForResize , mustKeepCPUsForScaleDown )
1100
1152
}
0 commit comments