fix: job queue's allocated slots should be correct after restarts (#9461)
stoksc authored Jun 1, 2024
1 parent c49eeea commit 57bece4
Showing 3 changed files with 187 additions and 121 deletions.
@@ -131,7 +131,7 @@ func New(
t := time.NewTicker(podSubmissionInterval)
defer t.Stop()
for range t.C {
- rp.Schedule()
+ rp.Admit()
}
}()
k.pools[poolConfig.PoolName] = rp
51 changes: 29 additions & 22 deletions master/internal/rm/kubernetesrm/resource_pool.go
@@ -44,8 +44,8 @@ type kubernetesResourcePool struct {

jobsService *jobsService

- queuePositions tasklist.JobSortState
- reschedule bool
+ queuePositions tasklist.JobSortState
+ tryAdmitPendingTasks bool

db *db.PgDB

@@ -77,38 +77,38 @@ func newResourcePool(
func (k *kubernetesResourcePool) SetGroupMaxSlots(msg sproto.SetGroupMaxSlots) {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

k.getOrCreateGroup(msg.JobID).MaxSlots = msg.MaxSlots
}

func (k *kubernetesResourcePool) AllocateRequest(msg sproto.AllocateRequest) {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

k.addTask(msg)
}

func (k *kubernetesResourcePool) ResourcesReleased(msg sproto.ResourcesReleased) {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

k.resourcesReleased(msg)
}

func (k *kubernetesResourcePool) JobSchedulingStateChanged(msg jobSchedulingStateChanged) {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

for it := k.reqList.Iterator(); it.Next(); {
req := it.Value()
if req.AllocationID == msg.AllocationID {
req.State = msg.State
if sproto.ScheduledStates[req.State] {
- k.allocationIDToRunningPods[msg.AllocationID] += msg.NumPods
+ k.allocationIDToRunningPods[msg.AllocationID] = msg.NumPods
}
}
}
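
The += to = change above is the heart of the fix named in the commit title: if the same pod count for an allocation is reported again, for example when scheduling state is re-derived after the master restarts, accumulating would inflate the job queue's allocated-slot accounting, while assigning keeps it idempotent. The following is an illustrative sketch of that difference only, using hypothetical stand-in types rather than the repository's own:

// Illustrative sketch: why assignment (=) stays correct when the same
// NumPods update can be observed more than once, e.g. across a restart.
// These types are hypothetical stand-ins, not the repository's own.
package main

import "fmt"

type allocationID string

type schedulingStateChanged struct {
	AllocationID allocationID
	NumPods      int
}

func main() {
	allocationIDToRunningPods := map[allocationID]int{}

	msgs := []schedulingStateChanged{
		{AllocationID: "alloc-1", NumPods: 4},
		{AllocationID: "alloc-1", NumPods: 4}, // same state observed again after a restart
	}

	for _, msg := range msgs {
		// Old behavior: allocationIDToRunningPods[msg.AllocationID] += msg.NumPods // would report 8
		allocationIDToRunningPods[msg.AllocationID] = msg.NumPods // idempotent: stays 4
	}

	fmt.Println(allocationIDToRunningPods["alloc-1"]) // prints 4
}
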
@@ -121,23 +121,23 @@ func (k *kubernetesResourcePool) PendingPreemption(msg sproto.PendingPreemption)
func (k *kubernetesResourcePool) GetJobQ() map[model.JobID]*sproto.RMJobInfo {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

return k.jobQInfo()
}

func (k *kubernetesResourcePool) GetJobQStats() *jobv1.QueueStats {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

return tasklist.JobStats(k.reqList)
}

func (k *kubernetesResourcePool) GetJobQStatsAPI(msg *apiv1.GetJobQueueStatsRequest) *apiv1.GetJobQueueStatsResponse {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

resp := &apiv1.GetJobQueueStatsResponse{
Results: make([]*apiv1.RPQueueStat, 0),
@@ -152,15 +152,15 @@ func (k *kubernetesResourcePool) GetJobQStatsAPI(msg *apiv1.GetJobQueueStatsRequ
func (k *kubernetesResourcePool) SetGroupWeight(msg sproto.SetGroupWeight) error {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

return rmerrors.UnsupportedError("set group weight is unsupported in k8s")
}

func (k *kubernetesResourcePool) SetGroupPriority(msg sproto.SetGroupPriority) error {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

group := k.getOrCreateGroup(msg.JobID)
// Check if there is already a submitted task in this group for which
@@ -191,15 +191,15 @@ func (k *kubernetesResourcePool) SetGroupPriority(msg sproto.SetGroupPriority) e
func (k *kubernetesResourcePool) MoveJob(msg sproto.MoveJob) error {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

return k.moveJob(msg.ID, msg.Anchor, msg.Ahead)
}

func (k *kubernetesResourcePool) DeleteJob(msg sproto.DeleteJob) sproto.DeleteJobResponse {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

// For now, there is nothing to cleanup in k8s.
return sproto.EmptyDeleteJobResponse()
@@ -208,7 +208,7 @@ func (k *kubernetesResourcePool) DeleteJob(msg sproto.DeleteJob) sproto.DeleteJo
func (k *kubernetesResourcePool) RecoverJobPosition(msg sproto.RecoverJobPosition) {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

k.queuePositions.RecoverJobPosition(msg.JobID, msg.JobPosition)
}
@@ -220,10 +220,17 @@ func (k *kubernetesResourcePool) GetAllocationSummaries() map[model.AllocationID
return k.reqList.TaskSummaries(k.groups, kubernetesScheduler)
}

+ func (k *kubernetesResourcePool) GetAllocationSummary(id model.AllocationID) *sproto.AllocationSummary {
+ k.mu.Lock()
+ defer k.mu.Unlock()
+
+ return k.reqList.TaskSummary(id, k.groups, kubernetesScheduler)
+ }

func (k *kubernetesResourcePool) getResourceSummary() (*resourceSummary, error) {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

slotsUsed := 0
for _, slotsUsedByGroup := range k.slotsUsedPerGroup {
@@ -249,20 +256,20 @@ func (k *kubernetesResourcePool) ValidateResources(
) sproto.ValidateResourcesResponse {
k.mu.Lock()
defer k.mu.Unlock()
- k.reschedule = true
+ k.tryAdmitPendingTasks = true

fulfillable := k.maxSlotsPerPod >= msg.Slots
return sproto.ValidateResourcesResponse{Fulfillable: fulfillable}
}

- func (k *kubernetesResourcePool) Schedule() {
+ func (k *kubernetesResourcePool) Admit() {
k.mu.Lock()
defer k.mu.Unlock()

- if k.reschedule {
- k.schedulePendingTasks()
+ if k.tryAdmitPendingTasks {
+ k.admitPendingTasks()
}
- k.reschedule = false
+ k.tryAdmitPendingTasks = false
}

func (k *kubernetesResourcePool) summarizePods() (*computeUsageSummary, error) {
@@ -593,7 +600,7 @@ func (k *kubernetesResourcePool) getOrCreateGroup(jobID model.JobID) *tasklist.G
return g
}

- func (k *kubernetesResourcePool) schedulePendingTasks() {
+ func (k *kubernetesResourcePool) admitPendingTasks() {
for it := k.reqList.Iterator(); it.Next(); {
req := it.Value()
group := k.groups[req.JobID]
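
Taken together, the renames trace a simple dirty-flag admission loop: every mutating entry point sets tryAdmitPendingTasks, the per-pool ticker started in New calls Admit on an interval, and Admit walks the request list only when the flag is set, clearing it afterwards. Below is a minimal, self-contained sketch of that pattern under the same names; the pool fields and the body of admitPendingTasks are placeholders, not the repository's actual implementation.

// Minimal sketch of the ticker-driven, flag-gated admission pattern shown in
// this commit. Types and the admit body are placeholders, not the real pool.
package main

import (
	"sync"
	"time"
)

type pool struct {
	mu                   sync.Mutex
	tryAdmitPendingTasks bool
	pending              []string
}

// Any state-mutating call marks the pool dirty so the next tick re-admits.
func (p *pool) AllocateRequest(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.tryAdmitPendingTasks = true
	p.pending = append(p.pending, name)
}

// Admit runs on a timer and only does work when something has changed.
func (p *pool) Admit() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.tryAdmitPendingTasks {
		p.admitPendingTasks()
	}
	p.tryAdmitPendingTasks = false
}

func (p *pool) admitPendingTasks() {
	// Placeholder for iterating the request list and launching pods.
	p.pending = nil
}

func main() {
	p := &pool{}
	t := time.NewTicker(50 * time.Millisecond)
	defer t.Stop()
	go func() {
		for range t.C {
			p.Admit()
		}
	}()
	p.AllocateRequest("job-1")
	time.Sleep(120 * time.Millisecond)
}
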