Skip to content

Commit

Permalink
chore: revert multirm refactors (#8962)
Browse files Browse the repository at this point in the history
* Revert "chore: add multirm module to ResourceManager (#8857)"

This reverts commit c3012ff.

Revert "chore: refactor ResourceManager interface for multirm (#8847)"

This reverts commit 6857ecf.

Revert "chore: refactor proto, schema, and jobservice for multiRM (#8875)"

This reverts commit 7c6bec9.
  • Loading branch information
carolinaecalderon committed Mar 7, 2024
1 parent 4309f7f commit bf21896
Show file tree
Hide file tree
Showing 81 changed files with 1,202 additions and 3,089 deletions.
128 changes: 3 additions & 125 deletions harness/determined/common/api/bindings.py

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions master/internal/api_agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
func (a *apiServer) GetAgents(
ctx context.Context, req *apiv1.GetAgentsRequest,
) (*apiv1.GetAgentsResponse, error) {
resp, err := a.m.rm.GetAgents()
resp, err := a.m.rm.GetAgents(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -53,7 +53,7 @@ func (a *apiServer) GetAgent(
return nil, err
}

resp, err := a.m.rm.GetAgent(req.ResourceManager, req)
resp, err := a.m.rm.GetAgent(req)
if err != nil {
return nil, err
}
Expand All @@ -78,7 +78,7 @@ func (a *apiServer) GetSlots(
return nil, err
}

resp, err := a.m.rm.GetSlots(req.ResourceManager, req)
resp, err := a.m.rm.GetSlots(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (a *apiServer) GetSlot(
return nil, err
}

resp, err := a.m.rm.GetSlot(req.ResourceManager, req)
resp, err := a.m.rm.GetSlot(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func (a *apiServer) EnableAgent(
if err := a.canUpdateAgents(ctx); err != nil {
return nil, err
}
return a.m.rm.EnableAgent(req.ResourceManager, req)
return a.m.rm.EnableAgent(req)
}

func (a *apiServer) DisableAgent(
Expand All @@ -153,7 +153,7 @@ func (a *apiServer) DisableAgent(
if err := a.canUpdateAgents(ctx); err != nil {
return nil, err
}
return a.m.rm.DisableAgent(req.ResourceManager, req)
return a.m.rm.DisableAgent(req)
}

func (a *apiServer) EnableSlot(
Expand All @@ -163,7 +163,7 @@ func (a *apiServer) EnableSlot(
return nil, err
}

resp, err = a.m.rm.EnableSlot(req.ResourceManager, req)
resp, err = a.m.rm.EnableSlot(req)
switch {
case errors.Is(err, rmerrors.ErrNotSupported):
return resp, status.Error(codes.Unimplemented, err.Error())
Expand All @@ -181,7 +181,7 @@ func (a *apiServer) DisableSlot(
return nil, err
}

resp, err = a.m.rm.DisableSlot(req.ResourceManager, req)
resp, err = a.m.rm.DisableSlot(req)
switch {
case errors.Is(err, rmerrors.ErrNotSupported):
return resp, status.Error(codes.Unimplemented, err.Error())
Expand Down
5 changes: 2 additions & 3 deletions master/internal/api_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ func (a *apiServer) getCommandLaunchParams(ctx context.Context, req *protoComman
resources.Slots = 0
}

managerName, poolName, launchWarnings, err := a.m.ResolveResources(
resources.ResourceManager,
poolName, launchWarnings, err := a.m.ResolveResources(
resources.ResourcePool,
resources.Slots,
int(cmdSpec.Metadata.WorkspaceID),
Expand All @@ -108,7 +107,7 @@ func (a *apiServer) getCommandLaunchParams(ctx context.Context, req *protoComman
}

// Get the base TaskSpec.
taskSpec, err := a.m.fillTaskSpec(managerName, poolName, agentUserGroup, userModel)
taskSpec, err := a.m.fillTaskSpec(poolName, agentUserGroup, userModel)
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 1 addition & 3 deletions master/internal/api_experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (a *apiServer) GetExperiment(
}

jobID := model.JobID(exp.JobId)
jobSummary, err := jobservice.DefaultService.GetJobSummary(jobID, exp.ResourceManager, exp.ResourcePool)
jobSummary, err := jobservice.DefaultService.GetJobSummary(jobID, exp.ResourcePool)
if err != nil {
// An error here either is real or just that the experiment was not yet terminal in the DB
// when we first queried it but was by the time it got around to handling out ask. We can't
Expand Down Expand Up @@ -499,8 +499,6 @@ func (a *apiServer) deleteExperiments(exps []*model.Experiment, userModel *model
}

// delete jobs per experiment
// TODO (multirm for dispatcherrm): since we're not passing in an RM name here
// you'll have to check ALL RMs and all RPs for the job.
resp, err := a.m.rm.DeleteJob(sproto.DeleteJob{
JobID: exp.JobID,
})
Expand Down
2 changes: 0 additions & 2 deletions master/internal/api_experiment_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1930,8 +1930,6 @@ func TestDeleteExperimentsFiltered(t *testing.T) {
errC <- errors.New("something real bad")
return sproto.DeleteJobResponse{Err: errC}
}, nil)
mockRM.On("ResolveResourcePool", mock.Anything, mock.Anything).Return(
mock.Anything, mock.Anything, nil)

api, curUser, ctx := setupAPITest(t, nil, &mockRM)

Expand Down
13 changes: 4 additions & 9 deletions master/internal/api_generic_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,15 @@ func (a *apiServer) getGenericTaskLaunchParameters(
return nil, nil, nil, fmt.Errorf("resource slots must be >= 0")
}
isSingleNode := resources.IsSingleNode != nil && *resources.IsSingleNode
managerName, poolName, launchWarnings, err := a.m.ResolveResources(
resources.ResourceManager,
resources.ResourcePool,
poolName, launchWarnings, err := a.m.ResolveResources(resources.ResourcePool,
resources.Slots,
int(proj.WorkspaceId),
isSingleNode)
if err != nil {
return nil, nil, nil, err
}
// Get the base TaskSpec.
taskSpec, err := a.m.fillTaskSpec(managerName, poolName, agentUserGroup, userModel)
taskSpec, err := a.m.fillTaskSpec(poolName, agentUserGroup, userModel)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -128,7 +126,6 @@ func (a *apiServer) getGenericTaskLaunchParameters(
// Copy discovered (default) resource pool name and slot count.

fillTaskConfig(resources.Slots, taskSpec, &taskConfig.Environment)
taskConfig.Resources.RawResourceManager = &resources.ResourceManager
taskConfig.Resources.RawResourcePool = &poolName
taskConfig.Resources.RawSlots = &resources.Slots

Expand Down Expand Up @@ -359,9 +356,8 @@ func (a *apiServer) CreateGenericTask(
IsUserVisible: true,
Name: fmt.Sprintf("Generic Task %s", taskID),

SlotsNeeded: *genericTaskSpec.GenericTaskConfig.Resources.Slots(),
ResourceManager: genericTaskSpec.GenericTaskConfig.Resources.ResourceManager(),
ResourcePool: genericTaskSpec.GenericTaskConfig.Resources.ResourcePool(),
SlotsNeeded: *genericTaskSpec.GenericTaskConfig.Resources.Slots(),
ResourcePool: genericTaskSpec.GenericTaskConfig.Resources.ResourcePool(),
FittingRequirements: sproto.FittingRequirements{
SingleAgent: isSingleNode,
},
Expand Down Expand Up @@ -664,7 +660,6 @@ func (a *apiServer) UnpauseGenericTask(
IsUserVisible: true,
Name: fmt.Sprintf("Generic Task %s", resumingTask.TaskID),
SlotsNeeded: *genericTaskSpec.GenericTaskConfig.Resources.Slots(),
ResourceManager: genericTaskSpec.GenericTaskConfig.Resources.ResourceManager(),
ResourcePool: genericTaskSpec.GenericTaskConfig.Resources.ResourcePool(),
FittingRequirements: sproto.FittingRequirements{
SingleAgent: isSingleNode,
Expand Down
4 changes: 1 addition & 3 deletions master/internal/api_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ func (a *apiServer) GetJobs(
ctx context.Context, req *apiv1.GetJobsRequest,
) (resp *apiv1.GetJobsResponse, err error) {
jobs, err := jobservice.DefaultService.GetJobs(
req.ResourceManager,
req.ResourcePool,
req.OrderBy == apiv1.OrderBy_ORDER_BY_DESC,
req.States,
Expand Down Expand Up @@ -50,7 +49,6 @@ func (a *apiServer) GetJobsV2(
ctx context.Context, req *apiv1.GetJobsV2Request,
) (resp *apiv1.GetJobsV2Response, err error) {
jobs, err := jobservice.DefaultService.GetJobs(
req.ResourceManager,
req.ResourcePool,
req.OrderBy == apiv1.OrderBy_ORDER_BY_DESC,
req.States,
Expand Down Expand Up @@ -99,7 +97,7 @@ func (a *apiServer) GetJobsV2(
func (a *apiServer) GetJobQueueStats(
_ context.Context, req *apiv1.GetJobQueueStatsRequest,
) (*apiv1.GetJobQueueStatsResponse, error) {
resp, err := a.m.rm.GetJobQueueStatsRequest(req.ResourceManager, req)
resp, err := a.m.rm.GetJobQueueStatsRequest(req)
if err != nil {
return nil, err
}
Expand Down
18 changes: 11 additions & 7 deletions master/internal/api_resourcepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/determined-ai/determined/master/internal/db"
"github.com/determined-ai/determined/master/internal/grpcutil"
"github.com/determined-ai/determined/master/internal/rm"
"github.com/determined-ai/determined/master/internal/sproto"
workspaceauth "github.com/determined-ai/determined/master/internal/workspace"
"github.com/determined-ai/determined/master/pkg/set"
"github.com/determined-ai/determined/proto/pkg/apiv1"
Expand Down Expand Up @@ -49,7 +50,7 @@ func (a *apiServer) GetResourcePools(
if err != nil {
return nil, err
}
resp, err := a.m.rm.GetResourcePools()
resp, err := a.m.rm.GetResourcePools(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -89,7 +90,7 @@ func (a *apiServer) GetResourcePools(
func (a *apiServer) BindRPToWorkspace(
ctx context.Context, req *apiv1.BindRPToWorkspaceRequest,
) (*apiv1.BindRPToWorkspaceResponse, error) {
err := a.checkIfPoolIsDefault(req.ResourceManagerName, req.ResourcePoolName)
err := a.checkIfPoolIsDefault(req.ResourcePoolName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -120,7 +121,7 @@ func (a *apiServer) BindRPToWorkspace(
func (a *apiServer) OverwriteRPWorkspaceBindings(
ctx context.Context, req *apiv1.OverwriteRPWorkspaceBindingsRequest,
) (*apiv1.OverwriteRPWorkspaceBindingsResponse, error) {
err := a.checkIfPoolIsDefault(req.ResourceManagerName, req.ResourcePoolName)
err := a.checkIfPoolIsDefault(req.ResourcePoolName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -211,13 +212,16 @@ func (a *apiServer) ListWorkspacesBoundToRP(
}, nil
}

func (a *apiServer) checkIfPoolIsDefault(managerName string, poolName string) error {
defaultComputePool, err := a.m.rm.GetDefaultComputeResourcePool(managerName)
func (a *apiServer) checkIfPoolIsDefault(poolName string) error {
defaultComputePool, err := a.m.rm.GetDefaultComputeResourcePool(
sproto.GetDefaultComputeResourcePoolRequest{})
if err != nil {
return err
}

defaultAuxPool, err := a.m.rm.GetDefaultAuxResourcePool(managerName)
defaultAuxPool, err := a.m.rm.GetDefaultAuxResourcePool(
sproto.GetDefaultAuxResourcePoolRequest{},
)
if err != nil {
return err
}
Expand Down Expand Up @@ -250,7 +254,7 @@ func (a *apiServer) canUserModifyWorkspaces(ctx context.Context, ids []int32) er
}

func (a *apiServer) resourcePoolsAsConfigs() ([]config.ResourcePoolConfig, error) {
resp, err := a.m.rm.GetResourcePools()
resp, err := a.m.rm.GetResourcePools(&apiv1.GetResourcePoolsRequest{})
if err != nil {
return nil, err
}
Expand Down
52 changes: 26 additions & 26 deletions master/internal/api_resourcepool_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ func TestPostBindingFails(t *testing.T) {

// TODO (eliu): add some tests for workspaceIDs
// test resource pools on workspaces that do not exist
mockRM.On("GetDefaultComputeResourcePool", mock.Anything).
mockRM.On("GetDefaultComputeResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultComputeResourcePoolResponse{}, nil).Once()
mockRM.On("GetDefaultAuxResourcePool", mock.Anything).
mockRM.On("GetDefaultAuxResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultAuxResourcePoolResponse{}, nil).Once()
_, err := api.BindRPToWorkspace(ctx, &apiv1.BindRPToWorkspaceRequest{
ResourcePoolName: testPoolName,
Expand All @@ -69,11 +69,11 @@ func TestPostBindingFails(t *testing.T) {
require.ErrorContains(t, err, "the following workspaces do not exist: [nonexistent_workspace]")

// test resource pool doesn't exist
mockRM.On("GetDefaultComputeResourcePool", mock.Anything).
mockRM.On("GetDefaultComputeResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultComputeResourcePoolResponse{}, nil).Once()
mockRM.On("GetDefaultAuxResourcePool", mock.Anything).
mockRM.On("GetDefaultAuxResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultAuxResourcePoolResponse{}, nil).Twice()
mockRM.On("GetResourcePools").
mockRM.On("GetResourcePools", mock.Anything, mock.Anything).
Return(&apiv1.GetResourcePoolsResponse{
ResourcePools: []*resourcepoolv1.ResourcePool{},
}, nil).Once()
Expand All @@ -85,7 +85,7 @@ func TestPostBindingFails(t *testing.T) {
require.ErrorContains(t, err, "pool with name testRP doesn't exist")

// test resource pool is a default resource pool
mockRM.On("GetDefaultComputeResourcePool", mock.Anything).
mockRM.On("GetDefaultComputeResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultComputeResourcePoolResponse{PoolName: testPoolName}, nil).Twice()

_, err = api.BindRPToWorkspace(ctx, &apiv1.BindRPToWorkspaceRequest{
Expand All @@ -95,7 +95,7 @@ func TestPostBindingFails(t *testing.T) {

require.ErrorContains(t, err, "default resource pool testRP cannot be bound to any workspace")

mockRM.On("GetDefaultAuxResourcePool", mock.Anything).
mockRM.On("GetDefaultAuxResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultAuxResourcePoolResponse{PoolName: testPoolName}, nil).Once()

_, err = api.BindRPToWorkspace(ctx, &apiv1.BindRPToWorkspaceRequest{
Expand All @@ -106,11 +106,11 @@ func TestPostBindingFails(t *testing.T) {
require.ErrorContains(t, err, "default resource pool testRP cannot be bound to any workspace")

// test no resource pool specified
mockRM.On("GetDefaultComputeResourcePool", mock.Anything).
mockRM.On("GetDefaultComputeResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultComputeResourcePoolResponse{PoolName: testPoolName}, nil).Once()
mockRM.On("GetDefaultAuxResourcePool", mock.Anything).
mockRM.On("GetDefaultAuxResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultAuxResourcePoolResponse{PoolName: testPoolName}, nil).Once()
mockRM.On("GetResourcePools").
mockRM.On("GetResourcePools", mock.Anything, mock.Anything).
Return(&apiv1.GetResourcePoolsResponse{
ResourcePools: []*resourcepoolv1.ResourcePool{{Name: testPoolName}},
}, nil).Once()
Expand All @@ -134,11 +134,11 @@ func TestPostBindingSucceeds(t *testing.T) {
_ = setupWorkspaces(ctx, t, api)

// bind first resource pool
mockRM.On("GetDefaultComputeResourcePool", mock.Anything).
mockRM.On("GetDefaultComputeResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultComputeResourcePoolResponse{}, nil).Twice()
mockRM.On("GetDefaultAuxResourcePool", mock.Anything).
mockRM.On("GetDefaultAuxResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultAuxResourcePoolResponse{}, nil).Twice()
mockRM.On("GetResourcePools").
mockRM.On("GetResourcePools", mock.Anything, mock.Anything).
Return(&apiv1.GetResourcePoolsResponse{
ResourcePools: []*resourcepoolv1.ResourcePool{{Name: testPoolName}},
}, nil).Twice()
Expand Down Expand Up @@ -170,11 +170,11 @@ func TestListWorkspacesBoundToRPFails(t *testing.T) {
_ = setupWorkspaces(ctx, t, api)

// bind first workspace
mockRM.On("GetDefaultComputeResourcePool", mock.Anything).
mockRM.On("GetDefaultComputeResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultComputeResourcePoolResponse{}, nil).Once()
mockRM.On("GetDefaultAuxResourcePool", mock.Anything).
mockRM.On("GetDefaultAuxResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultAuxResourcePoolResponse{}, nil).Once()
mockRM.On("GetResourcePools").
mockRM.On("GetResourcePools", mock.Anything, mock.Anything).
Return(&apiv1.GetResourcePoolsResponse{
ResourcePools: []*resourcepoolv1.ResourcePool{{Name: testPoolName}},
}, nil).Times(3)
Expand Down Expand Up @@ -207,11 +207,11 @@ func TestListWorkspacesBoundToRPSucceeds(t *testing.T) {
workspaceIDs := setupWorkspaces(ctx, t, api)

// test bind resource pool to workspace
mockRM.On("GetDefaultComputeResourcePool", mock.Anything).
mockRM.On("GetDefaultComputeResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultComputeResourcePoolResponse{}, nil).Once()
mockRM.On("GetDefaultAuxResourcePool", mock.Anything).
mockRM.On("GetDefaultAuxResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultAuxResourcePoolResponse{}, nil).Once()
mockRM.On("GetResourcePools").
mockRM.On("GetResourcePools", mock.Anything, mock.Anything).
Return(&apiv1.GetResourcePoolsResponse{
ResourcePools: []*resourcepoolv1.ResourcePool{{Name: testPoolName}},
}, nil).Twice()
Expand All @@ -231,7 +231,7 @@ func TestListWorkspacesBoundToRPSucceeds(t *testing.T) {
require.Equal(t, workspaceIDs[0], resp.WorkspaceIds[0])

// test listing on resource pool that has no bindings
mockRM.On("GetResourcePools").
mockRM.On("GetResourcePools", mock.Anything, mock.Anything).
Return(&apiv1.GetResourcePoolsResponse{
ResourcePools: []*resourcepoolv1.ResourcePool{
{Name: testPoolName}, {Name: testPool2Name},
Expand All @@ -256,11 +256,11 @@ func TestPatchBindingsSucceeds(t *testing.T) {
workspaceIDs := setupWorkspaces(ctx, t, api)

// setup first binding
mockRM.On("GetDefaultComputeResourcePool", mock.Anything).
mockRM.On("GetDefaultComputeResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultComputeResourcePoolResponse{}, nil).Times(4)
mockRM.On("GetDefaultAuxResourcePool", mock.Anything).
mockRM.On("GetDefaultAuxResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultAuxResourcePoolResponse{}, nil).Times(4)
mockRM.On("GetResourcePools").
mockRM.On("GetResourcePools", mock.Anything, mock.Anything).
Return(&apiv1.GetResourcePoolsResponse{
ResourcePools: []*resourcepoolv1.ResourcePool{{Name: testPoolName}},
}, nil).Times(7)
Expand Down Expand Up @@ -328,11 +328,11 @@ func TestDeleteBindingsSucceeds(t *testing.T) {

// TODO: fix all comments
// setup first binding
mockRM.On("GetDefaultComputeResourcePool", mock.Anything).
mockRM.On("GetDefaultComputeResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultComputeResourcePoolResponse{}, nil).Times(1)
mockRM.On("GetDefaultAuxResourcePool", mock.Anything).
mockRM.On("GetDefaultAuxResourcePool", mock.Anything, mock.Anything).
Return(sproto.GetDefaultAuxResourcePoolResponse{}, nil).Times(1)
mockRM.On("GetResourcePools").
mockRM.On("GetResourcePools", mock.Anything, mock.Anything).
Return(&apiv1.GetResourcePoolsResponse{
ResourcePools: []*resourcepoolv1.ResourcePool{{Name: testPoolName}},
}, nil).Times(3)
Expand Down
Loading

0 comments on commit bf21896

Please sign in to comment.