Skip to content

Commit

Permalink
fix: [RM-6] remove global max-slots-per-pod default when multiple RMs… (
Browse files Browse the repository at this point in the history
#8938)

* fix: [RM-6] remove global max-slots-per-pod default when multiple RMs defined

* undo changed used for debugging
  • Loading branch information
kkunapuli authored Mar 4, 2024
1 parent 0d61d15 commit c404c8e
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 23 deletions.
68 changes: 45 additions & 23 deletions master/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,48 @@ func (c Config) Printable() ([]byte, error) {
return optJSON, nil
}

func k8sValidateMaxSlots(r *ResourceManagerWithPoolsConfig,
taskContainerDefaults model.TaskContainerDefaultsConfig, totalRMs int,
) (model.TaskContainerDefaultsConfig, error) {
if taskContainerDefaults.Kubernetes == nil {
taskContainerDefaults.Kubernetes = &model.KubernetesTaskContainerDefaults{}
}

rmMaxSlots := r.ResourceManager.KubernetesRM.MaxSlotsPerPod
taskMaxSlots := taskContainerDefaults.Kubernetes.MaxSlotsPerPod

// if exactly one resource manager, allow global task default to be used
if totalRMs == 1 {
if (rmMaxSlots != nil) == (taskMaxSlots != nil) {
return taskContainerDefaults, fmt.Errorf("must provide exactly one of " +
"resource_manager.max_slots_per_pod and " +
"task_container_defaults.kubernetes.max_slots_per_pod")
}

if rmMaxSlots != nil {
taskContainerDefaults.Kubernetes.MaxSlotsPerPod = rmMaxSlots
}
if taskMaxSlots != nil {
r.ResourceManager.KubernetesRM.MaxSlotsPerPod = taskMaxSlots
}
} else {
// otherwise, must use max slots defined in resource manager config
if rmMaxSlots == nil {
return taskContainerDefaults, fmt.Errorf("must provide resource_manager.max_slots_per_pod")
}
if taskMaxSlots != nil {
log.Warn("ignoring task_container_defaults.kubernetes.max_slots_per_pod - " +
"must provide resource_manager.max_slots_per_pod " +
"if multiple resource managers are defined")
}
}

if maxSlotsPerPod := *r.ResourceManager.KubernetesRM.MaxSlotsPerPod; maxSlotsPerPod < 0 {
return taskContainerDefaults, fmt.Errorf("max_slots_per_pod must be >= 0 got %d", maxSlotsPerPod)
}
return taskContainerDefaults, nil
}

// Resolve resolves the values in the configuration.
func (c *Config) Resolve() error {
if c.Port == 0 {
Expand Down Expand Up @@ -267,31 +309,11 @@ func (c *Config) Resolve() error {
r.ResourceManager.AgentRM.Scheduler = DefaultSchedulerConfig()
}

// TODO(RM-6) change behavior on max slots per pod.
if r.ResourceManager.KubernetesRM != nil {
if c.TaskContainerDefaults.Kubernetes == nil {
c.TaskContainerDefaults.Kubernetes = &model.KubernetesTaskContainerDefaults{}
c.TaskContainerDefaults, err = k8sValidateMaxSlots(r, c.TaskContainerDefaults, len(c.ResourceManagers()))
if err != nil {
return err
}

rmMaxSlots := r.ResourceManager.KubernetesRM.MaxSlotsPerPod
taskMaxSlots := c.TaskContainerDefaults.Kubernetes.MaxSlotsPerPod
if (rmMaxSlots != nil) == (taskMaxSlots != nil) {
return fmt.Errorf("must provide exactly one of " +
"resource_manager.max_slots_per_pod and " +
"task_container_defaults.kubernetes.max_slots_per_pod")
}

if rmMaxSlots != nil {
c.TaskContainerDefaults.Kubernetes.MaxSlotsPerPod = rmMaxSlots
}
if taskMaxSlots != nil {
r.ResourceManager.KubernetesRM.MaxSlotsPerPod = taskMaxSlots
}
if maxSlotsPerPod := *r.ResourceManager.KubernetesRM.MaxSlotsPerPod; maxSlotsPerPod < 0 {
return fmt.Errorf("max_slots_per_pod must be >= 0 got %d", maxSlotsPerPod)
}

break // TODO(RM-6) remove this break.
}
}

Expand Down
109 changes: 109 additions & 0 deletions master/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,115 @@ task_container_defaults:
}
}

func TestMaxSlotsPerPodConfigMultiRM(t *testing.T) {
t.Run("valid RMs", func(t *testing.T) {
config := `
db:
user: config_file_user
password: password
host: hostname
port: "3000"
resource_manager:
type: kubernetes
max_slots_per_pod: 5
additional_resource_managers:
- resource_manager:
name: test
type: kubernetes
max_slots_per_pod: 65
`
var unmarshaled Config
expected := [2]int{5, 65}
err := yaml.Unmarshal([]byte(config), &unmarshaled, yaml.DisallowUnknownFields)
require.NoError(t, err)
require.NoError(t, unmarshaled.Resolve())
for i, r := range unmarshaled.ResourceConfig.ResourceManagers() {
require.Equal(t, expected[i], *r.ResourceManager.KubernetesRM.MaxSlotsPerPod)
}
})

t.Run("negative max for RM", func(t *testing.T) {
config := `
db:
user: config_file_user
password: password
host: hostname
port: "3000"
resource_manager:
type: kubernetes
max_slots_per_pod: 5
additional_resource_managers:
- resource_manager:
name: test
type: kubernetes
max_slots_per_pod: -65
`
var unmarshaled Config
err := yaml.Unmarshal([]byte(config), &unmarshaled, yaml.DisallowUnknownFields)
require.NoError(t, err)
require.Error(t, unmarshaled.Resolve(), ">= 0")
})

t.Run("max not defined for RM", func(t *testing.T) {
config := `
db:
user: config_file_user
password: password
host: hostname
port: "3000"
resource_manager:
type: kubernetes
max_slots_per_pod: 5
additional_resource_managers:
- resource_manager:
name: test
type: kubernetes
`
var unmarshaled Config
err := yaml.Unmarshal([]byte(config), &unmarshaled, yaml.DisallowUnknownFields)
require.NoError(t, err)
require.Error(t, unmarshaled.Resolve(), "must provide resource_manager.max_slots_per_pod")
})

t.Run("RM and global task max slots defined", func(t *testing.T) {
config := `
db:
user: config_file_user
password: password
host: hostname
port: "3000"
resource_manager:
type: kubernetes
max_slots_per_pod: 5
additional_resource_managers:
- resource_manager:
name: test
type: kubernetes
max_slots_per_pod: 65
task_container_defaults:
kubernetes:
max_slots_per_pod: 0
`
var unmarshaled Config
expected := [2]int{5, 65}
err := yaml.Unmarshal([]byte(config), &unmarshaled, yaml.DisallowUnknownFields)
require.NoError(t, err)
require.NoError(t, unmarshaled.Resolve())
for i, r := range unmarshaled.ResourceConfig.ResourceManagers() {
require.Equal(t, expected[i], *r.ResourceManager.KubernetesRM.MaxSlotsPerPod)
}
})
}

//nolint:gosec // These are not potential hardcoded credentials.
func TestPrintableConfig(t *testing.T) {
s3Key := "my_access_key_secret"
Expand Down

0 comments on commit c404c8e

Please sign in to comment.