Skip to content

Commit

Permalink
[scheduler] Partial flavors assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed May 15, 2023
1 parent 8f7e247 commit 970867e
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 4 deletions.
33 changes: 29 additions & 4 deletions pkg/scheduler/flavorassigner/flavorassigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,39 @@ type FlavorAssignment struct {
// be assigned immediately. Each assigned flavor is accompanied with a
// FlavorAssignmentMode.
func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, cq *cache.ClusterQueue) Assignment {
fullAssignment := assignFlavors(log, wl.TotalRequests, wl.Obj.Spec.PodSets, resourceFlavors, cq)
if fullAssignment.RepresentativeMode() != Fit {
reducer := NewPodSetReducer(wl.Obj.Spec.PodSets)
currentResources := make([]workload.PodSetResources, len(wl.TotalRequests))
for i := range wl.TotalRequests {
currentResources[i] = *wl.TotalRequests[i].DeepCopy()
}

for nextCounts := reducer.GetNext(); nextCounts != nil; nextCounts = reducer.GetNext() {
// create a lower count resource request
for i := range nextCounts {
currentResources[i].ScaleTo(nextCounts[i])
}

partialAssigment := assignFlavors(log, currentResources, wl.Obj.Spec.PodSets, resourceFlavors, cq)
if partialAssigment.RepresentativeMode() == Fit {
return partialAssigment
}

}
}
return fullAssignment
}

func assignFlavors(log logr.Logger, requests []workload.PodSetResources, podSets []kueue.PodSet, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, cq *cache.ClusterQueue) Assignment {
assignment := Assignment{
TotalBorrow: make(cache.FlavorResourceQuantities),
PodSets: make([]PodSetAssignment, 0, len(wl.TotalRequests)),
PodSets: make([]PodSetAssignment, 0, len(requests)),
usage: make(cache.FlavorResourceQuantities),
}
for i, podSet := range wl.TotalRequests {
for i, podSet := range requests {
if _, found := cq.RGByResource[corev1.ResourcePods]; found {
podSet.Requests[corev1.ResourcePods] = int64(wl.Obj.Spec.PodSets[i].Count)
podSet.Requests[corev1.ResourcePods] = int64(podSet.Count)
}

psAssignment := PodSetAssignment{
Expand All @@ -258,7 +283,7 @@ func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[kueue
}
break
}
flavors, status := assignment.findFlavorForResourceGroup(log, rg, podSet.Requests, resourceFlavors, cq, &wl.Obj.Spec.PodSets[i].Template.Spec)
flavors, status := assignment.findFlavorForResourceGroup(log, rg, podSet.Requests, resourceFlavors, cq, &podSets[i].Template.Spec)
if status.IsError() || len(flavors) == 0 {
psAssignment.Flavors = nil
psAssignment.Status = status
Expand Down
97 changes: 97 additions & 0 deletions pkg/scheduler/flavorassigner/flavorassigner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,103 @@ func TestAssignFlavors(t *testing.T) {
}},
},
},
"single flavor, partial fit": {
wlPods: []kueue.PodSet{
*utiltesting.MakePodSet("main", 5).
SetMinimumCount(1).
Request(corev1.ResourceCPU, "1").
Request(corev1.ResourceMemory, "1Mi").
Obj(),
},
clusterQueue: cache.ClusterQueue{
ResourceGroups: []cache.ResourceGroup{{
CoveredResources: sets.New(corev1.ResourceCPU, corev1.ResourceMemory),
Flavors: []cache.FlavorQuotas{{
Name: "default",
Resources: map[corev1.ResourceName]*cache.ResourceQuota{
corev1.ResourceCPU: {Nominal: 1000},
corev1.ResourceMemory: {Nominal: 2 * utiltesting.Mi},
},
}},
}},
},
wantRepMode: Fit,
wantAssignment: Assignment{
PodSets: []PodSetAssignment{{
Name: "main",
Flavors: ResourceAssignment{
corev1.ResourceCPU: {Name: "default", Mode: Fit},
corev1.ResourceMemory: {Name: "default", Mode: Fit},
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
corev1.ResourceMemory: resource.MustParse("1Mi"),
},
Count: 1,
}},
},
},
"partial fit multiple podSets": {
wlPods: []kueue.PodSet{
*utiltesting.MakePodSet("main", 3).
Request(corev1.ResourceCPU, "1").
Obj(),
*utiltesting.MakePodSet("can-louse-2", 4).
SetMinimumCount(2).
Request(corev1.ResourceCPU, "1").
Obj(),
*utiltesting.MakePodSet("can-louse-4-louse-3", 5).
SetMinimumCount(1).
Request(corev1.ResourceCPU, "1").
Obj(),
},
clusterQueue: cache.ClusterQueue{
ResourceGroups: []cache.ResourceGroup{{
CoveredResources: sets.New(corev1.ResourceCPU),
Flavors: []cache.FlavorQuotas{{
Name: "default",
Resources: map[corev1.ResourceName]*cache.ResourceQuota{
corev1.ResourceCPU: {Nominal: 7000},
},
}},
}},
},
wantRepMode: Fit,
wantAssignment: Assignment{
PodSets: []PodSetAssignment{
{
Name: "main",
Flavors: ResourceAssignment{
corev1.ResourceCPU: {Name: "default", Mode: Fit},
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("3000m"),
},
Count: 3,
},
{
Name: "can-louse-2",
Flavors: ResourceAssignment{
corev1.ResourceCPU: {Name: "default", Mode: Fit},
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2000m"),
},
Count: 2,
},
{
Name: "can-louse-4-louse-3",
Flavors: ResourceAssignment{
corev1.ResourceCPU: {Name: "default", Mode: Fit},
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2000m"),
},
Count: 2,
},
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
77 changes: 77 additions & 0 deletions pkg/scheduler/flavorassigner/podSetReducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flavorassigner

import (
"k8s.io/utils/pointer"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// PodSetReducer helper structure used to gradually walk down
// from PodSets[*].Count to *PodSets[*].MinimumCount.
type PodSetReducer struct {
podSets []kueue.PodSet
currentCounts []int32
currentPos int
avaiableDecremets int32
}

func NewPodSetReducer(podSets []kueue.PodSet) *PodSetReducer {
ret := &PodSetReducer{
podSets: podSets,
currentCounts: make([]int32, len(podSets)),
}
ret.Reset()
return ret
}

// Reset brings the PodSetReducer back to its initial state.
func (psr *PodSetReducer) Reset() {
psr.currentCounts = make([]int32, len(psr.podSets))
psr.currentPos = 0
psr.avaiableDecremets = 0

for i := range psr.podSets {
ps := &psr.podSets[i]
psr.currentCounts[i] = ps.Count
psr.avaiableDecremets += ps.Count - pointer.Int32Deref(ps.MinimumCount, ps.Count)
}
}

// GetNext returns the next slice of counts, of nil if the count cannot decrease anymore
func (psr *PodSetReducer) GetNext() []int32 {
if psr.avaiableDecremets == 0 {
return nil
}

for {
if psr.currentCounts[psr.currentPos] > pointer.Int32Deref(psr.podSets[psr.currentPos].MinimumCount, psr.podSets[psr.currentPos].Count) {
psr.currentCounts[psr.currentPos]--
psr.avaiableDecremets--
psr.currentPos = (psr.currentPos + 1) % len(psr.podSets)
return psr.cloneCurrentCounts()
}
psr.currentPos = (psr.currentPos + 1) % len(psr.podSets)
}
}

func (psr *PodSetReducer) cloneCurrentCounts() []int32 {
r := make([]int32, len(psr.currentCounts))
copy(r, psr.currentCounts)
return r
}
63 changes: 63 additions & 0 deletions pkg/scheduler/flavorassigner/podSetReducer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package flavorassigner

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

func TestPodSetReducer(t *testing.T) {
cases := map[string]struct {
podSets []kueue.PodSet
wantSeries [][]int32
}{
"empty": {
podSets: []kueue.PodSet{},
wantSeries: [][]int32{},
},
"partial not available": {
podSets: []kueue.PodSet{
*utiltesting.MakePodSet("ps1", 1).Obj(),
*utiltesting.MakePodSet("ps2", 2).SetMinimumCount(2).Obj(),
},
wantSeries: [][]int32{},
},
"partial available": {
podSets: []kueue.PodSet{
*utiltesting.MakePodSet("ps1", 5).SetMinimumCount(3).Obj(),
*utiltesting.MakePodSet("ps2", 5).SetMinimumCount(4).Obj(),
*utiltesting.MakePodSet("ps3", 5).SetMinimumCount(1).Obj(),
*utiltesting.MakePodSet("ps4", 5).SetMinimumCount(2).Obj(),
},
wantSeries: [][]int32{
{4, 5, 5, 5},
{4, 4, 5, 5},
{4, 4, 4, 5},
{4, 4, 4, 4},
{3, 4, 4, 4},
{3, 4, 3, 4},
{3, 4, 3, 3},
{3, 4, 2, 3},
{3, 4, 2, 2},
{3, 4, 1, 2},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
got := [][]int32{}
red := NewPodSetReducer(tc.podSets)
for current := red.GetNext(); current != nil; current = red.GetNext() {
got = append(got, current)
}

if diff := cmp.Diff(tc.wantSeries, got, cmpopts.IgnoreUnexported(Assignment{}, FlavorAssignment{})); diff != "" {
t.Errorf("Unexpected series (-want,+got):\n%s", diff)
}
})
}

}
5 changes: 5 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ func (p *PodSetWrapper) Request(r corev1.ResourceName, q string) *PodSetWrapper
return p
}

func (p *PodSetWrapper) SetMinimumCount(mc int32) *PodSetWrapper {
p.MinimumCount = &mc
return p
}

func (p *PodSetWrapper) Toleration(t corev1.Toleration) *PodSetWrapper {
p.Template.Spec.Tolerations = append(p.Template.Spec.Tolerations, t)
return p
Expand Down
24 changes: 24 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,30 @@ type PodSetResources struct {
Flavors map[corev1.ResourceName]kueue.ResourceFlavorReference
}

func (psr *PodSetResources) DeepCopy() *PodSetResources {
ret := &PodSetResources{
Name: psr.Name,
Requests: make(Requests, len(psr.Requests)),
Count: psr.Count,
Flavors: make(map[corev1.ResourceName]kueue.ResourceFlavorReference, len(psr.Flavors)),
}
for k, v := range psr.Requests {
ret.Requests[k] = v
}

for k, v := range psr.Flavors {
ret.Flavors[k] = v
}
return ret
}

func (psr *PodSetResources) ScaleTo(newCount int32) *PodSetResources {
psr.Requests.scaleDown(int64(psr.Count))
psr.Requests.scaleUp(int64(newCount))
psr.Count = newCount
return psr
}

func NewInfo(w *kueue.Workload) *Info {
info := &Info{
Obj: w,
Expand Down

0 comments on commit 970867e

Please sign in to comment.