Skip to content

Commit

Permalink
chore: refactor proto, schema, and jobservice for multiRM (#8875)
Browse files Browse the repository at this point in the history
  • Loading branch information
carolinaecalderon authored Feb 27, 2024
1 parent ca96da1 commit 7c6bec9
Show file tree
Hide file tree
Showing 38 changed files with 1,319 additions and 655 deletions.
128 changes: 125 additions & 3 deletions harness/determined/common/api/bindings.py

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion master/internal/api_experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (a *apiServer) GetExperiment(
}

jobID := model.JobID(exp.JobId)
jobSummary, err := jobservice.DefaultService.GetJobSummary(jobID, exp.ResourcePool)
jobSummary, err := jobservice.DefaultService.GetJobSummary(jobID, exp.ResourceManager, exp.ResourcePool)
if err != nil {
// An error here either is real or just that the experiment was not yet terminal in the DB
// when we first queried it but was by the time it got around to handling our ask. We can't
Expand Down Expand Up @@ -499,6 +499,8 @@ func (a *apiServer) deleteExperiments(exps []*model.Experiment, userModel *model
}

// delete jobs per experiment
// TODO (multirm for dispatcherrm): since we're not passing in an RM name here
// you'll have to check ALL RMs and all RPs for the job.
resp, err := a.m.rm.DeleteJob(sproto.DeleteJob{
JobID: exp.JobID,
})
Expand Down
2 changes: 2 additions & 0 deletions master/internal/api_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (a *apiServer) GetJobs(
ctx context.Context, req *apiv1.GetJobsRequest,
) (resp *apiv1.GetJobsResponse, err error) {
jobs, err := jobservice.DefaultService.GetJobs(
req.ResourceManager,
req.ResourcePool,
req.OrderBy == apiv1.OrderBy_ORDER_BY_DESC,
req.States,
Expand Down Expand Up @@ -49,6 +50,7 @@ func (a *apiServer) GetJobsV2(
ctx context.Context, req *apiv1.GetJobsV2Request,
) (resp *apiv1.GetJobsV2Response, err error) {
jobs, err := jobservice.DefaultService.GetJobs(
req.ResourceManager,
req.ResourcePool,
req.OrderBy == apiv1.OrderBy_ORDER_BY_DESC,
req.States,
Expand Down
2 changes: 1 addition & 1 deletion master/internal/api_user_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func MockRM() *mocks.ResourceManager {
nil,
)
mockRM.On("SetGroupMaxSlots", mock.Anything).Return()
mockRM.On("SetGroupWeight", mock.Anything).Return(nil)
mockRM.On("SetGroupWeight", mock.Anything, mock.Anything).Return(nil)
mockRM.On("Allocate", mock.Anything).Return(func(msg sproto.AllocateRequest) *sproto.ResourcesSubscription {
return rmevents.Subscribe(msg.AllocationID)
}, nil)
Expand Down
6 changes: 3 additions & 3 deletions master/internal/checkpoint_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func TestRunCheckpointGCTask(t *testing.T) {
rm: func() *mocks.ResourceManager {
var rm mocks.ResourceManager

rm.On("ResolveResourcePool", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
rm.On("ResolveResourcePool", mock.Anything, mock.Anything, mock.Anything).
Return("default", nil)

rm.On("TaskContainerDefaults", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
rm.On("TaskContainerDefaults", mock.Anything, mock.Anything).
Return(model.TaskContainerDefaultsConfig{}, nil)

return &rm
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestRunCheckpointGCTask(t *testing.T) {
rm: func() *mocks.ResourceManager {
var rm mocks.ResourceManager

rm.On("ResolveResourcePool", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
rm.On("ResolveResourcePool", mock.Anything, mock.Anything, mock.Anything).
Return("", errors.New("rm is down or something"))

return &rm
Expand Down
11 changes: 6 additions & 5 deletions master/internal/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,12 @@ func (c *Command) garbageCollect() {

func (c *Command) setNTSCPriority(priority int, forward bool) error {
if forward {
switch err := c.rm.SetGroupPriority(sproto.SetGroupPriority{
Priority: priority,
ResourcePool: c.Config.Resources.ResourcePool,
JobID: c.jobID,
}).(type) {
switch err := c.rm.SetGroupPriority(c.Config.Resources.ResourceManager,
sproto.SetGroupPriority{
Priority: priority,
ResourcePool: c.Config.Resources.ResourcePool,
JobID: c.jobID,
}).(type) {
case nil:
case rmerrors.UnsupportedError:
c.syslog.WithError(err).Debug("ignoring unsupported call to set group priority")
Expand Down
17 changes: 9 additions & 8 deletions master/internal/command/command_job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ func (c *Command) SetWeight(weight float64) error {
c.mu.Lock()
defer c.mu.Unlock()

switch err := c.rm.SetGroupWeight(sproto.SetGroupWeight{
Weight: weight,
ResourcePool: c.Config.Resources.ResourcePool,
JobID: c.jobID,
}).(type) {
switch err := c.rm.SetGroupWeight(c.Config.Resources.ResourceManager,
sproto.SetGroupWeight{
Weight: weight,
ResourcePool: c.Config.Resources.ResourcePool,
JobID: c.jobID,
}).(type) {
case nil:
case rmerrors.UnsupportedError:
c.syslog.WithError(err).Debug("ignoring unsupported call to set group weight")
Expand All @@ -76,11 +77,11 @@ func (c *Command) SetWeight(weight float64) error {
}

// SetResourcePool is not implemented for commands: it ignores its arguments and
// always returns an error that names the command's job type.
func (c *Command) SetResourcePool(resourcePool string) error {
func (c *Command) SetResourcePool(resourceManager, resourcePool string) error {
return fmt.Errorf("setting resource pool for job type %s is not supported", c.jobType)
}

// ResourcePool gets the command's resource pool. The two-value form returns the
// resource manager first and the resource pool second, both read from the
// command's resources config.
func (c *Command) ResourcePool() string {
return c.Config.Resources.ResourcePool
func (c *Command) ResourcePool() (string, string) {
return c.Config.Resources.ResourceManager, c.Config.Resources.ResourcePool
}
35 changes: 19 additions & 16 deletions master/internal/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,12 @@ func (e *internalExperiment) start() error {
}

if j.QPos.GreaterThan(decimal.Zero) {
e.rm.RecoverJobPosition(sproto.RecoverJobPosition{
JobID: e.JobID,
JobPosition: j.QPos,
ResourcePool: e.activeConfig.Resources().ResourcePool(),
})
e.rm.RecoverJobPosition(e.activeConfig.Resources().ResourceManager(),
sproto.RecoverJobPosition{
JobID: e.JobID,
JobPosition: j.QPos,
ResourcePool: e.activeConfig.Resources().ResourcePool(),
})
}

e.restoreTrials()
Expand Down Expand Up @@ -1046,11 +1047,12 @@ func (e *internalExperiment) setPriority(priority *int, forward bool) (err error
}

if forward {
switch err := e.rm.SetGroupPriority(sproto.SetGroupPriority{
Priority: *priority,
ResourcePool: e.activeConfig.Resources().ResourcePool(),
JobID: e.JobID,
}).(type) {
switch err := e.rm.SetGroupPriority(resources.ResourceManager(),
sproto.SetGroupPriority{
Priority: *priority,
ResourcePool: resources.ResourcePool(),
JobID: e.JobID,
}).(type) {
case nil:
case rmerrors.UnsupportedError:
e.syslog.WithError(err).Debug("ignoring unsupported call to set group priority")
Expand All @@ -1073,11 +1075,12 @@ func (e *internalExperiment) setWeight(weight float64) error {
return fmt.Errorf("setting experiment %d weight: %w", e.ID, err)
}

switch err := e.rm.SetGroupWeight(sproto.SetGroupWeight{
Weight: weight,
ResourcePool: e.activeConfig.Resources().ResourcePool(),
JobID: e.JobID,
}).(type) {
switch err := e.rm.SetGroupWeight(resources.ResourceManager(),
sproto.SetGroupWeight{
Weight: weight,
ResourcePool: resources.ResourcePool(),
JobID: e.JobID,
}).(type) {
case nil:
case rmerrors.UnsupportedError:
e.syslog.WithError(err).Debug("ignoring unsupported call to set group weight")
Expand All @@ -1089,7 +1092,7 @@ func (e *internalExperiment) setWeight(weight float64) error {
return nil
}

func (e *internalExperiment) setRP(resourcePool string) error {
func (e *internalExperiment) setRP(resourceManager string, resourcePool string) error {
resources := e.activeConfig.Resources()
oldRP := resources.ResourcePool()
workspaceModel, err := workspace.WorkspaceByProjectID(context.TODO(), e.ProjectID)
Expand Down
8 changes: 4 additions & 4 deletions master/internal/experiment_job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ func (e *internalExperiment) SetWeight(weight float64) error {
}

// SetResourcePool sets the experiment's resource pool. It holds e.mu for the
// duration of the call and delegates the actual change to setRP.
func (e *internalExperiment) SetResourcePool(resourcePool string) error {
func (e *internalExperiment) SetResourcePool(resourceManager, resourcePool string) error {
e.mu.Lock()
defer e.mu.Unlock()

return e.setRP(resourcePool)
return e.setRP(resourceManager, resourcePool)
}

// ResourcePool gets the experiment's resource pool.
func (e *internalExperiment) ResourcePool() string {
func (e *internalExperiment) ResourcePool() (string, string) {
e.mu.Lock()
defer e.mu.Lock()

return e.activeConfig.Resources().ResourcePool()
return e.activeConfig.Resources().ResourceManager(), e.activeConfig.Resources().ResourcePool()
}
36 changes: 19 additions & 17 deletions master/internal/job/jobservice/jobservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type Job interface {
ToV1Job() (*jobv1.Job, error)
SetJobPriority(priority int) error
SetWeight(weight float64) error
SetResourcePool(resourcePool string) error
ResourcePool() string
SetResourcePool(rm string, rp string) error
ResourcePool() (rm, rp string)
}

// Service manages the job service.
Expand Down Expand Up @@ -90,16 +90,17 @@ func (s *Service) jobQRefs(jobQ map[model.JobID]*sproto.RMJobInfo) (map[model.Jo
return jobRefs, nil
}

// GetJobs returns a list of jobs for a resource pool.
// GetJobs returns a list of jobs for a resource pool under a specific resource manager.
func (s *Service) GetJobs(
resourceManager string,
resourcePool string,
desc bool,
states []jobv1.State,
) ([]*jobv1.Job, error) {
s.mu.Lock()
defer s.mu.Unlock()

jobQ, err := s.rm.GetJobQ(sproto.GetJobQ{ResourcePool: resourcePool})
jobQ, err := s.rm.GetJobQ(resourceManager, resourcePool)
if err != nil {
s.syslog.WithError(err).Error("getting job queue info from RM")
return nil, err
Expand All @@ -113,9 +114,7 @@ func (s *Service) GetJobs(
// If the GetExternalJobs call is supported, RM returns a list of external jobs or
// an error if there is any problem. Otherwise, RM returns rmerrors.ErrNotSupported
// error. In this case, continue without the External jobs.
externalJobs, err := s.rm.GetExternalJobs(sproto.GetExternalJobs{
ResourcePool: resourcePool,
})
externalJobs, err := s.rm.GetExternalJobs(resourceManager, resourcePool)
if err != nil {
// If the error is not 'ErrNotSupported' error, propagate the error upwards.
if err != rmerrors.ErrNotSupported {
Expand Down Expand Up @@ -161,12 +160,13 @@ func (s *Service) GetJobs(
return jobsInRM, nil
}

// GetJobSummary returns a summary of the job given an id and resource pool.
func (s *Service) GetJobSummary(id model.JobID, resourcePool string) (*jobv1.JobSummary, error) {
// GetJobSummary returns a summary of the job given an id and resource pool/resource manager.
func (s *Service) GetJobSummary(id model.JobID, resourceManager string, resourcePool string,
) (*jobv1.JobSummary, error) {
s.mu.Lock()
defer s.mu.Unlock()

jobQ, err := s.rm.GetJobQ(sproto.GetJobQ{ResourcePool: resourcePool})
jobQ, err := s.rm.GetJobQ(resourceManager, resourcePool)
if err != nil {
s.syslog.WithError(err).Error("getting job queue info from RM")
return nil, err
Expand Down Expand Up @@ -202,24 +202,26 @@ func (s *Service) applyUpdate(update *jobv1.QueueControl) error {
s.syslog.WithError(err).Info("setting command job weight")
return err
}
case *jobv1.QueueControl_ResourcePool:
if action.ResourcePool == "" {
case *jobv1.QueueControl_Resources:
if action.Resources.ResourcePool == "" {
s.syslog.Error("resource pool must be set")
}
return j.SetResourcePool(action.ResourcePool)
return j.SetResourcePool(action.Resources.ResourceManager, action.Resources.ResourcePool)
case *jobv1.QueueControl_AheadOf:
return s.rm.MoveJob(sproto.MoveJob{
rmName, rpName := j.ResourcePool()
return s.rm.MoveJob(rmName, sproto.MoveJob{
ID: jobID,
Anchor: model.JobID(action.AheadOf),
Ahead: true,
ResourcePool: j.ResourcePool(),
ResourcePool: rpName,
})
case *jobv1.QueueControl_BehindOf:
return s.rm.MoveJob(sproto.MoveJob{
rmName, rpName := j.ResourcePool()
return s.rm.MoveJob(rmName, sproto.MoveJob{
ID: jobID,
Anchor: model.JobID(action.BehindOf),
Ahead: false,
ResourcePool: j.ResourcePool(),
ResourcePool: rpName,
})
default:
return fmt.Errorf("unexpected action: %v", action)
Expand Down
24 changes: 12 additions & 12 deletions master/internal/rm/agentrm/agent_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,21 +272,21 @@ func (a *ResourceManager) GetDefaultComputeResourcePool(
}

// GetExternalJobs implements rm.ResourceManager. This resource manager does not
// support the call: it ignores its arguments and always returns
// rmerrors.ErrNotSupported.
func (*ResourceManager) GetExternalJobs(sproto.GetExternalJobs) ([]*jobv1.Job, error) {
func (*ResourceManager) GetExternalJobs(_, _ string) ([]*jobv1.Job, error) {
return nil, rmerrors.ErrNotSupported
}

// GetJobQ implements rm.ResourceManager. An empty resource pool falls back to
// the configured default compute resource pool, and an error from the pool
// lookup is returned unchanged. The two-string form ignores its first argument.
func (a *ResourceManager) GetJobQ(msg sproto.GetJobQ) (map[model.JobID]*sproto.RMJobInfo, error) {
if msg.ResourcePool == "" {
msg.ResourcePool = a.config.DefaultComputeResourcePool
func (a *ResourceManager) GetJobQ(_, resourcePool string) (map[model.JobID]*sproto.RMJobInfo, error) {
if resourcePool == "" {
resourcePool = a.config.DefaultComputeResourcePool
}

pool, err := a.poolByName(msg.ResourcePool)
pool, err := a.poolByName(resourcePool)
if err != nil {
return nil, err
}
return pool.GetJobQ(sproto.GetJobQ{ResourcePool: msg.ResourcePool}), nil
return pool.GetJobQ(), nil
}

// GetJobQueueStatsRequest implements rm.ResourceManager.
Expand All @@ -302,7 +302,7 @@ func (a *ResourceManager) GetJobQueueStatsRequest(
continue
}

stats := pool.GetJobQStats(sproto.GetJobQStats{})
stats := pool.GetJobQStats()

aggregates, err := a.fetchAvgQueuedTime(name)
if err != nil {
Expand Down Expand Up @@ -375,7 +375,7 @@ func (*ResourceManager) IsReattachableOnlyAfterStarted() bool {
}

// MoveJob implements rm.ResourceManager.
func (a *ResourceManager) MoveJob(msg sproto.MoveJob) error {
func (a *ResourceManager) MoveJob(_ string, msg sproto.MoveJob) error {
pool, err := a.poolByName(msg.ResourcePool)
if err != nil {
return fmt.Errorf("move job found no resource pool with name %s: %w", msg.ResourcePool, err)
Expand All @@ -392,7 +392,7 @@ func (*ResourceManager) NotifyContainerRunning(sproto.NotifyContainerRunning) er
}

// RecoverJobPosition implements rm.ResourceManager.
func (a *ResourceManager) RecoverJobPosition(msg sproto.RecoverJobPosition) {
func (a *ResourceManager) RecoverJobPosition(_ string, msg sproto.RecoverJobPosition) {
pool, err := a.poolByName(msg.ResourcePool)
if err != nil {
a.syslog.WithError(err).Error("recovering job position")
Expand Down Expand Up @@ -487,7 +487,7 @@ func (a *ResourceManager) SetGroupMaxSlots(msg sproto.SetGroupMaxSlots) {
}

// SetGroupPriority implements rm.ResourceManager.
func (a *ResourceManager) SetGroupPriority(msg sproto.SetGroupPriority) error {
func (a *ResourceManager) SetGroupPriority(_ string, msg sproto.SetGroupPriority) error {
pool, err := a.poolByName(msg.ResourcePool)
if err != nil {
return fmt.Errorf("set group priority found no resource pool with name %s: %w",
Expand All @@ -497,7 +497,7 @@ func (a *ResourceManager) SetGroupPriority(msg sproto.SetGroupPriority) error {
}

// SetGroupWeight implements rm.ResourceManager.
func (a *ResourceManager) SetGroupWeight(msg sproto.SetGroupWeight) error {
func (a *ResourceManager) SetGroupWeight(_ string, msg sproto.SetGroupWeight) error {
pool, err := a.poolByName(msg.ResourcePool)
if err != nil {
return fmt.Errorf("set group weight found no resource pool with name %s: %w",
Expand Down Expand Up @@ -607,7 +607,7 @@ func (a *ResourceManager) getPoolJobStats(poolConfig config.ResourcePoolConfig)
if err != nil {
return nil, err
}
return pool.GetJobQStats(sproto.GetJobQStats{}), nil
return pool.GetJobQStats(), nil
}

func (a *ResourceManager) getResourcePoolConfig(poolName string) (
Expand Down
4 changes: 2 additions & 2 deletions master/internal/rm/agentrm/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,13 +774,13 @@ func (rp *resourcePool) RecoverJobPosition(msg sproto.RecoverJobPosition) {
rp.queuePositions.RecoverJobPosition(msg.JobID, msg.JobPosition)
}

// GetJobQStats returns the queue statistics that tasklist.JobStats computes
// over the pool's task list, holding rp.mu while they are computed.
func (rp *resourcePool) GetJobQStats(msg sproto.GetJobQStats) *jobv1.QueueStats {
func (rp *resourcePool) GetJobQStats() *jobv1.QueueStats {
rp.mu.Lock()
defer rp.mu.Unlock()
return tasklist.JobStats(rp.taskList)
}

func (rp *resourcePool) GetJobQ(msg sproto.GetJobQ) map[model.JobID]*sproto.RMJobInfo {
func (rp *resourcePool) GetJobQ() map[model.JobID]*sproto.RMJobInfo {
rp.mu.Lock()
defer rp.mu.Unlock()
return rp.scheduler.JobQInfo(rp)
Expand Down
Loading

0 comments on commit 7c6bec9

Please sign in to comment.