Skip to content

Commit

Permalink
fix: job state shows as scheduled when resources are allocated (#9466)
Browse files Browse the repository at this point in the history
  • Loading branch information
kkunapuli authored Jun 4, 2024
1 parent 8d64508 commit 934aeb6
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 10 deletions.
6 changes: 6 additions & 0 deletions docs/release-notes/job-state.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
:orphan:

**Bug Fixes**

- Kubernetes: Fix an issue where where jobs would remain in "QUEUED" state until all pods were
running. Jobs will now correctly show as "SCHEDULED" once all pods have been assigned to nodes.
16 changes: 12 additions & 4 deletions master/internal/rm/kubernetesrm/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,16 @@ func (j *job) preparePodUpdateMessage(msgText string) string {
return msgText
}

// PodScheduled checks pod conditions to determine if a pod has been scheduled onto a node.
func podScheduled(pod k8sV1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == k8sV1.PodScheduled {
return condition.Status == k8sV1.ConditionTrue
}
}
return false
}

func (j *job) getPodState(pod k8sV1.Pod) (cproto.State, error) {
switch pod.Status.Phase {
case k8sV1.PodPending:
Expand All @@ -599,10 +609,8 @@ func (j *job) getPodState(pod k8sV1.Pod) (cproto.State, error) {
return cproto.Terminated, nil
}

for _, condition := range pod.Status.Conditions {
if condition.Type == k8sV1.PodScheduled && condition.Status == k8sV1.ConditionTrue {
return cproto.Starting, nil
}
if podScheduled(pod) {
return cproto.Starting, nil
}
return cproto.Assigned, nil

Expand Down
9 changes: 5 additions & 4 deletions master/internal/rm/kubernetesrm/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ func (j *jobsService) podStatusCallback(obj any) {
return
}

j.updatePodSchedulingState(jobName, pod)
j.updatePodSchedulingState(jobName, *pod)
if j.jobSchedulingStateCallback != nil {
go j.jobSchedulingStateCallback(jobSchedulingStateChanged{
AllocationID: jobHandler.req.AllocationID,
Expand Down Expand Up @@ -970,15 +970,16 @@ func (j *jobsService) jobSchedulingState(jobName string) sproto.SchedulingState
return sproto.SchedulingStateScheduled
}

// updatePodSchedulingState stores the scheduling state of a pod based on its state (in particular the phase).
func (j *jobsService) updatePodSchedulingState(jobName string, pod *k8sV1.Pod) {
// updatePodSchedulingState stores the scheduling state of a pod based on its state.
func (j *jobsService) updatePodSchedulingState(jobName string, pod k8sV1.Pod) {
states, ok := j.jobNameToPodNameToSchedulingState[jobName]
if !ok {
states = make(map[string]sproto.SchedulingState)
}

// The field pod.Spec.NodeName is a request to be scheduled onto a node but it is not guaranteed.
states[pod.Name] = sproto.SchedulingStateQueued
if pod.Status.Phase == "Running" {
if podScheduled(pod) {
states[pod.Name] = sproto.SchedulingStateScheduled
}
j.jobNameToPodNameToSchedulingState[jobName] = states
Expand Down
63 changes: 61 additions & 2 deletions master/internal/rm/kubernetesrm/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"

"github.com/determined-ai/determined/master/internal/mocks"
"github.com/determined-ai/determined/master/internal/sproto"
)

func TestGetNonDetPods(t *testing.T) {
Expand Down Expand Up @@ -63,18 +64,76 @@ func TestGetNonDetPods(t *testing.T) {
ns2.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[1])}, nil)

p := jobsService{
js := jobsService{
podInterfaces: map[string]typedV1.PodInterface{
"ns1": ns1,
"ns2": ns2,
},
}

actualPods, err := p.getNonDetPods()
actualPods, err := js.getNonDetPods()
require.NoError(t, err)
require.ElementsMatch(t, expectedPods, actualPods)
}

func TestJobScheduledStatus(t *testing.T) {
// Pod has been created, but has zero PodConditions yet.
pendingPod := k8sV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: "test-pod",
},
Status: k8sV1.PodStatus{
Conditions: make([]k8sV1.PodCondition, 0),
},
}
js := jobsService{
jobNameToPodNameToSchedulingState: make(map[string]map[string]sproto.SchedulingState),
}
jobName := "test-job"
js.updatePodSchedulingState(jobName, pendingPod)
actualState := js.jobSchedulingState(jobName)
expectedState := sproto.SchedulingStateQueued
require.Equal(t, expectedState, actualState)

// Pod has been created, but the PodScheduled PodCondition is false.
notScheduledPod := k8sV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: "test-pod",
},
Status: k8sV1.PodStatus{
Conditions: []k8sV1.PodCondition{
{
Type: k8sV1.PodScheduled,
Status: k8sV1.ConditionFalse,
},
},
},
}
js.updatePodSchedulingState(jobName, notScheduledPod)
actualState = js.jobSchedulingState(jobName)
expectedState = sproto.SchedulingStateQueued
require.Equal(t, expectedState, actualState)

// Pod has been created, and the PodScheduled PodCondition is true.
scheduledPod := k8sV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: "test-pod",
},
Status: k8sV1.PodStatus{
Conditions: []k8sV1.PodCondition{
{
Type: k8sV1.PodScheduled,
Status: k8sV1.ConditionTrue,
},
},
},
}
js.updatePodSchedulingState(jobName, scheduledPod)
actualState = js.jobSchedulingState(jobName)
expectedState = sproto.SchedulingStateScheduled
require.Equal(t, expectedState, actualState)
}

func TestTaintTolerated(t *testing.T) {
cases := []struct {
expected bool
Expand Down

0 comments on commit 934aeb6

Please sign in to comment.