diff --git a/scheduler/pkg/scheduler/filters/serverreplicas.go b/scheduler/pkg/scheduler/filters/serverreplicas.go new file mode 100644 index 0000000000..7c932dabcd --- /dev/null +++ b/scheduler/pkg/scheduler/filters/serverreplicas.go @@ -0,0 +1,21 @@ +package filters + +import ( + "fmt" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" +) + +type ServerReplicaFilter struct{} + +func (r ServerReplicaFilter) Name() string { + return "ServerReplicaFilter" +} + +func (r ServerReplicaFilter) Filter(model *store.ModelVersion, server *store.ServerSnapshot) bool { + return len(server.Replicas) > 0 +} + +func (r ServerReplicaFilter) Description(model *store.ModelVersion, server *store.ServerSnapshot) string { + return fmt.Sprintf("%d server replicas (waiting for server replicas to connect)", len(server.Replicas)) +} diff --git a/scheduler/pkg/scheduler/filters/serverreplicas_test.go b/scheduler/pkg/scheduler/filters/serverreplicas_test.go new file mode 100644 index 0000000000..d538ded922 --- /dev/null +++ b/scheduler/pkg/scheduler/filters/serverreplicas_test.go @@ -0,0 +1,61 @@ +package filters + +import ( + "testing" + + . "github.com/onsi/gomega" + + pb "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" +) + +func TestServerReplicasFilter(t *testing.T) { + g := NewGomegaWithT(t) + + type test struct { + name string + model *store.ModelVersion + server *store.ServerSnapshot + expected bool + } + serverName := "server1" + model := store.NewModelVersion( + &pb.Model{ModelSpec: &pb.ModelSpec{}, DeploymentSpec: &pb.DeploymentSpec{Replicas: 1}}, + 1, + serverName, + map[int]store.ReplicaStatus{3: {State: store.Loading}}, + false, + store.ModelProgressing) + tests := []test{ + { + name: "No Replicas", + model: model, + server: &store.ServerSnapshot{Name: serverName, + Shared: true, + ExpectedReplicas: 0, + }, + expected: false, + }, + { + name: "Replicas", + model: model, + server: &store.ServerSnapshot{Name: serverName, + Shared: true, + ExpectedReplicas: 0, + Replicas: map[int]*store.ServerReplica{ + 0: &store.ServerReplica{}, + }, + }, + expected: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + filter := ServerReplicaFilter{} + ok := filter.Filter(test.model, test.server) + g.Expect(ok).To(Equal(test.expected)) + }) + } +} diff --git a/scheduler/pkg/scheduler/scheduler.go b/scheduler/pkg/scheduler/scheduler.go index 416260f80f..431749b205 100644 --- a/scheduler/pkg/scheduler/scheduler.go +++ b/scheduler/pkg/scheduler/scheduler.go @@ -46,7 +46,7 @@ type SchedulerConfig struct { func DefaultSchedulerConfig(store store.ModelStore) SchedulerConfig { return SchedulerConfig{ - serverFilters: []filters.ServerFilter{filters.SharingServerFilter{}, filters.DeletedServerFilter{}}, + serverFilters: []filters.ServerFilter{filters.ServerReplicaFilter{}, filters.SharingServerFilter{}, filters.DeletedServerFilter{}}, replicaFilters: []filters.ReplicaFilter{filters.RequirementsReplicaFilter{}, filters.AvailableMemoryReplicaFilter{}, filters.ExplainerFilter{}, filters.ReplicaDrainingFilter{}}, serverSorts: []sorters.ServerSorter{}, replicaSorts: []sorters.ReplicaSorter{sorters.ReplicaIndexSorter{}, sorters.AvailableMemorySorter{}, sorters.ModelAlreadyLoadedSorter{}},