Skip to content

Commit

Permalink
fix: match GetJobQueueStats behavior in k8s RM to agent RM [RM-136] (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
carolinaecalderon authored Apr 5, 2024
1 parent 2ef5ab9 commit a0847b8
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 3 deletions.
34 changes: 34 additions & 0 deletions master/internal/rm/agentrm/agent_resource_manager_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/determined-ai/determined/master/internal/sproto"
"github.com/determined-ai/determined/master/internal/user"
"github.com/determined-ai/determined/master/pkg/syncx/queue"
"github.com/determined-ai/determined/proto/pkg/apiv1"
"github.com/determined-ai/determined/proto/pkg/jobv1"
"github.com/determined-ai/determined/proto/pkg/resourcepoolv1"
)
Expand Down Expand Up @@ -245,3 +246,36 @@ func TestGetResourcePools(t *testing.T) {

require.Equal(t, string(expected), string(actual))
}

func TestGetJobQueueStatsRequest(t *testing.T) {
agentRM := &ResourceManager{
pools: map[string]*resourcePool{
"pool1": setupResourcePool(
t, nil, &config.ResourcePoolConfig{PoolName: "pool1"},
nil, nil, []*MockAgent{{ID: "agent1", Slots: 0}},
),
"pool2": setupResourcePool(
t, nil, &config.ResourcePoolConfig{PoolName: "pool2"},
nil, nil, []*MockAgent{{ID: "agent2", Slots: 0}},
),
},
}

cases := []struct {
name string
filteredRPs []string
expected int
}{
{"empty, return all", []string{}, 2},
{"filter 1 in", []string{"pool1"}, 1},
{"filter 2 in", []string{"pool1", "pool2"}, 2},
{"filter undefined in, return none", []string{"bogus"}, 0},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
res, err := agentRM.GetJobQueueStatsRequest(&apiv1.GetJobQueueStatsRequest{ResourcePools: tt.filteredRPs})
require.NoError(t, err)
require.Equal(t, tt.expected, len(res.Results))
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"fmt"
"maps"
"slices"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -239,13 +240,17 @@ func (k *ResourceManager) GetJobQ(rpName rm.ResourcePoolName) (map[model.JobID]*

// GetJobQueueStatsRequest implements rm.ResourceManager.
func (k *ResourceManager) GetJobQueueStatsRequest(
*apiv1.GetJobQueueStatsRequest,
msg *apiv1.GetJobQueueStatsRequest,
) (*apiv1.GetJobQueueStatsResponse, error) {
resp := &apiv1.GetJobQueueStatsResponse{
Results: make([]*apiv1.RPQueueStat, 0),
}

for poolName, rp := range k.pools {
if len(msg.ResourcePools) != 0 && !slices.Contains(msg.ResourcePools, poolName) {
continue
}

qStats := apiv1.RPQueueStat{
ResourcePool: poolName,
Stats: rp.GetJobQStats(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package kubernetesrm
import (
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"testing"
"time"
Expand All @@ -20,13 +22,16 @@ import (
typedV1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/determined-ai/determined/master/internal/config"
"github.com/determined-ai/determined/master/internal/db"
"github.com/determined-ai/determined/master/internal/mocks"
"github.com/determined-ai/determined/master/internal/rm/tasklist"
"github.com/determined-ai/determined/master/internal/sproto"
"github.com/determined-ai/determined/master/pkg/cproto"
"github.com/determined-ai/determined/master/pkg/device"
"github.com/determined-ai/determined/master/pkg/etc"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/master/pkg/ptrs"
"github.com/determined-ai/determined/proto/pkg/apiv1"
"github.com/determined-ai/determined/proto/pkg/jobv1"
"github.com/determined-ai/determined/proto/pkg/resourcepoolv1"
)
Expand All @@ -45,6 +50,26 @@ const (
nonDetNodeName = "NonDetermined"
)

func TestMain(m *testing.M) {
// Need to set up the DB for TestJobQueueStats
pgDB, err := db.ResolveTestPostgres()
if err != nil {
log.Panicln(err)
}

err = db.MigrateTestPostgres(pgDB, "file://../../../static/migrations", "up")
if err != nil {
log.Panicln(err)
}

err = etc.SetRootPath("../../../static/srv")
if err != nil {
log.Panicln(err)
}

os.Exit(m.Run())
}

func TestGetAgents(t *testing.T) {
type AgentsTestCase struct {
Name string
Expand Down Expand Up @@ -567,6 +592,44 @@ func TestGetResourcePools(t *testing.T) {
require.Equal(t, string(expected), string(actual))
}

func TestGetJobQueueStatsRequest(t *testing.T) {
mockPods := createMockPodsService(make(map[string]*k8sV1.Node), device.CUDA, true)
pool1 := &kubernetesResourcePool{
poolConfig: &config.ResourcePoolConfig{PoolName: "pool1"},
podsService: mockPods,
reqList: tasklist.New(),
}
pool2 := &kubernetesResourcePool{
poolConfig: &config.ResourcePoolConfig{PoolName: "pool2"},
podsService: mockPods,
reqList: tasklist.New(),
}
k8sRM := &ResourceManager{
pools: map[string]*kubernetesResourcePool{
"pool1": pool1,
"pool2": pool2,
},
}

cases := []struct {
name string
filteredRPs []string
expected int
}{
{"empty, return all", []string{}, 2},
{"filter 1 in", []string{"pool1"}, 1},
{"filter 2 in", []string{"pool1", "pool2"}, 2},
{"filter undefined in, return none", []string{"bogus"}, 0},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
res, err := k8sRM.GetJobQueueStatsRequest(&apiv1.GetJobQueueStatsRequest{ResourcePools: tt.filteredRPs})
require.NoError(t, err)
require.Equal(t, tt.expected, len(res.Results))
})
}
}

func TestHealthCheck(t *testing.T) {
mockPodInterface := &mocks.PodInterface{}
kubernetesRM := &ResourceManager{
Expand Down
4 changes: 2 additions & 2 deletions master/internal/rm/multirm/multirm_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ func TestGetJobQueueStatsRequest(t *testing.T) {
err error
expectedLen int
}{
// TODO RM-136: revist this.
// Per the mocks set-up, no matter the pools in the request, return the max # of responses.
// Per the mocks set-up, no matter the pools in the request, return the max # of responses because of
// the fan-out call to all RMs.
{"empty request", &apiv1.GetJobQueueStatsRequest{}, nil, 2},
{"empty RP name will default", &apiv1.GetJobQueueStatsRequest{ResourcePools: []string{""}}, nil, 2},
{"defined RP in default", &apiv1.GetJobQueueStatsRequest{ResourcePools: []string{defaultRMName}}, nil, 2},
Expand Down

0 comments on commit a0847b8

Please sign in to comment.