Skip to content

Commit

Permalink
fix: partially scheduled k8s jobs should display as queued (#9468)
Browse files Browse the repository at this point in the history
  • Loading branch information
stoksc authored Jun 13, 2024
1 parent 32585ad commit 9adc092
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 2 deletions.
5 changes: 3 additions & 2 deletions master/internal/rm/kubernetesrm/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,9 @@ func (j *job) podUpdatedCallback(updatedPod k8sV1.Pod) error {
}
}

allPodsFound := len(j.podStates) == j.numPods
allPodsAtLeastStarting := all(cproto.Starting.Before, maps.Values(j.podStates)...)
if allPodsAtLeastStarting && !j.sentStartingEvent {
if allPodsFound && allPodsAtLeastStarting && !j.sentStartingEvent {
// Kubernetes does not have an explicit state for pulling container images.
// We insert it here because our current implementation of the trial actor requires it.
j.syslog.WithField("pod-name", podName).Info("pod is pulling images and starting")
Expand Down Expand Up @@ -318,7 +319,7 @@ func (j *job) podUpdatedCallback(updatedPod k8sV1.Pod) error {
}

allPodsAtLeastRunning := all(cproto.Running.Before, maps.Values(j.podStates)...)
if allPodsAtLeastRunning && !j.sentRunningEvent {
if allPodsFound && allPodsAtLeastRunning && !j.sentRunningEvent {
j.syslog.WithField("pod-name", podName).Info("pod is running")
j.container.State = cproto.Running
j.informTaskResourcesStarted(sproto.ResourcesStarted{NativeResourcesID: j.jobName})
Expand Down
81 changes: 81 additions & 0 deletions master/internal/rm/kubernetesrm/resource_pool_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,87 @@ func TestJobQueueReattach(t *testing.T) {
require.Equal(t, 2, reattachInfo.RequestedSlots)
}

func TestPartialJobsShowQueuedStates(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

j := newTestJobsService(t)
rp := newTestResourcePool(j)

user := db.RequireMockUser(t, db.SingleDB())
task := db.RequireMockTask(t, db.SingleDB(), &user.ID)
alloc := db.RequireMockAllocation(t, db.SingleDB(), task.TaskID)
allocationID, taskID, jobID := alloc.AllocationID, task.TaskID, *task.JobID
startTime := task.StartTime

err := tasklist.GroupPriorityChangeRegistry.Add(jobID, func(i int) error { return nil })
require.NoError(t, err)

var slots int
for _, n := range rp.jobsService.GetAgents().Agents {
slots += len(n.Slots)
}

sub := rmevents.Subscribe(allocationID)
allocateReq := sproto.AllocateRequest{
AllocationID: allocationID,
TaskID: taskID,
JobID: jobID,
RequestTime: startTime,
JobSubmissionTime: startTime,
IsUserVisible: true,
Name: "test job",
SlotsNeeded: 2 * slots,
ResourcePool: "default",
}
rp.AllocateRequest(allocateReq)
rp.Admit()

allocated := poll[*sproto.ResourcesAllocated](ctx, t, sub)
require.NotNil(t, allocated)
require.Len(t, allocated.Resources, 1)
for _, res := range allocated.Resources {
conf := expconf.ExperimentConfig{ //nolint:exhaustruct
RawEnvironment: &expconf.EnvironmentConfigV0{ //nolint:exhaustruct
RawImage: &expconf.EnvironmentImageMapV0{ //nolint:exhaustruct
RawCPU: ptrs.Ptr("ubuntu:latest"),
},
},
}
conf = schemas.WithDefaults(conf)

err := res.Start(nil, tasks.TaskSpec{
Description: fmt.Sprintf("test-job-%s", uuid.NewString()[:8]),
Entrypoint: []string{"sleep", "99999"},
AgentUserGroup: &model.AgentUserGroup{},
Environment: conf.Environment(),
ResourcesConfig: conf.Resources(),
DontShipLogs: true,
}, sproto.ResourcesRuntimeInfo{})
defer res.Kill(nil)
require.NoError(t, err)
}

shortCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
for {
ev, err := sub.GetWithContext(shortCtx)
if err != nil {
break
}

res, ok := ev.(*sproto.ResourcesStateChanged)
if !ok {
continue
}
if sproto.Pulling.BeforeOrEqual(res.ResourcesState) {
continue
}
t.Error("state went to PULLING or beyond when all pods could not have been scheduled")
t.FailNow()
}
}

func TestNodeWorkflows(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
Expand Down

0 comments on commit 9adc092

Please sign in to comment.