Skip to content

Commit

Permalink
test: add e2e_tests for multirm k8s [RM-11] (#8926)
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasBlaskey authored Mar 11, 2024
1 parent 18154f6 commit 1a35e5d
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 13 deletions.
52 changes: 52 additions & 0 deletions .circleci/devcluster/multi-k8s.devcluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Devcluster config for running a Determined master against two minikube
# clusters (a default Kubernetes RM plus one additional RM) for multirm e2e tests.
# NOTE(review): indentation reconstructed from a whitespace-stripped diff —
# verify nesting against a known-good devcluster yaml before use.
stages:
  - db:
      name: db

  - master:
      pre:
        - sh: make -C tools prep-root
        # Slice our kubeconfig up into two different kube configs. Determined should use whatever context is set.
        - sh: cp ~/.kube/config /tmp/defaultrm-kubeconf && kubectl config use-context defaultrm --kubeconfig=/tmp/defaultrm-kubeconf
        - sh: cp ~/.kube/config /tmp/additionalrm-kubeconf && kubectl config use-context additionalrm --kubeconfig=/tmp/additionalrm-kubeconf
      config_file:
        db:
          host: localhost
          port: 5432
          password: postgres
          user: postgres
          name: determined
        checkpoint_storage:
          type: shared_fs
          host_path: /tmp/determined-cp
        cache:
          cache_dir: /tmp/determined-cache
        log:
          level: debug
        enable_cors: true
        root: tools/build

        # Default RM: talks to the "defaultrm" minikube profile.
        resource_manager:
          type: kubernetes
          name: default
          namespace: default
          max_slots_per_pod: 1
          slot_type: "cpu"
          slot_resource_requests:
            cpu: 1
          kubeconfig_path: /tmp/defaultrm-kubeconf
          determined_master_ip: $DOCKER_LOCALHOST
          determined_master_port: 8080

        # Second RM: talks to the "additionalrm" minikube profile and
        # exposes its capacity through the "additional_pool" resource pool.
        additional_resource_managers:
          - resource_manager:
              type: kubernetes
              name: additionalrm
              namespace: default
              max_slots_per_pod: 1
              slot_type: "cpu"
              slot_resource_requests:
                cpu: 1
              kubeconfig_path: /tmp/additionalrm-kubeconf
              determined_master_ip: $DOCKER_LOCALHOST
              determined_master_port: 8080
              resource_pools:
                - pool_name: additional_pool
29 changes: 29 additions & 0 deletions .circleci/real_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2089,6 +2089,9 @@ jobs:
wait-for-master:
type: boolean
default: true
run-minikubes:
type: boolean
default: false
machine:
image: <<pipeline.parameters.machine-image>>
resource_class: xlarge
Expand All @@ -2109,6 +2112,21 @@ jobs:
extra-requirements-file: "e2e_tests/tests/requirements.txt"
executor: <<pipeline.parameters.machine-image>>

- when:
condition: <<parameters.run-minikubes>>
steps:
- kubernetes/install-kubectl
- run:
name: Install minikube
command: |
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && sudo install minikube-linux-amd64 /usr/local/bin/minikube
- run:
name: Start defaultrm minikube
command: minikube start --profile defaultrm
- run:
name: Start additionalrm minikube
command: minikube start --profile additionalrm

- install-devcluster
- unless:
condition: <<parameters.managed-devcluster>>
Expand Down Expand Up @@ -2882,6 +2900,17 @@ workflows:
# so `compare_stats` cannot get the full logs from `det master logs`.
extra-pytest-flags: "--no-compare-stats"

- test-e2e:
name: test-e2e-multi-k8s
requires:
- build-go
parallelism: 1
tf2: true
mark: e2e_multi_k8s
target-stage: master
devcluster-config: multi-k8s.devcluster.yaml
run-minikubes: true

- test-e2e:
name: test-e2e-port-registry
requires:
Expand Down
1 change: 1 addition & 0 deletions e2e_tests/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ markers =
e2e_cpu_cross_version: end to end CPU tests for testing basic cluster functionality with differing master/agent versions
e2e_cpu_agent_connection_loss: end to end CPU tests for testing agent functionality on connection loss
e2e_gpu: end to end GPU tests
e2e_multi_k8s: end to end tests for multiple (multirm) Kubernetes resource managers
e2e_k8s: end to end tests specific to k8s (only used in test-e2e-gke-single-cpu currently)
e2e_pbs: end to end pbs integration tests
e2e_slurm: end to end slurm integration tests
Expand Down
75 changes: 75 additions & 0 deletions e2e_tests/tests/cluster/test_multi_k8s.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from typing import Optional

import pytest

from determined.common import api
from determined.common.api import bindings
from tests import api_utils
from tests import config as conf
from tests import detproc
from tests import experiment as exp
from tests.cluster import utils

MAX_WAIT_TIME = 500 # Really long since minikube will need to pull images.


@pytest.mark.e2e_multi_k8s
@pytest.mark.parametrize(
    "resource_pool, expected_node", [(None, "defaultrm"), ("additional_pool", "additionalrm")]
)
def test_run_experiment_multi_k8s(resource_pool: Optional[str], expected_node: str) -> None:
    """Run a no-op experiment on each RM and verify which node executed it.

    With no pool requested the experiment should land on the default RM's node;
    requesting "additional_pool" should route it to the additional RM's node.
    """
    sess = api_utils.user_session()

    # The entrypoint echoes the agent ID so the trial logs reveal the node.
    overrides = ["--config", "entrypoint=echo RunningOnNode=$DET_AGENT_ID"]
    if resource_pool:
        overrides.extend(["--config", f"resources.resource_pool={resource_pool}"])

    exp_id = exp.create_experiment(
        sess,
        conf.fixtures_path("no_op/single-one-short-step.yaml"),
        conf.fixtures_path("no_op"),
        overrides,
    )
    exp.wait_for_experiment_state(
        sess,
        exp_id,
        bindings.experimentv1State.COMPLETED,
        max_wait_secs=MAX_WAIT_TIME,
    )

    first_trial = exp.experiment_first_trial(sess, exp_id)
    exp.assert_patterns_in_trial_logs(sess, first_trial, [f"RunningOnNode={expected_node}"])


@pytest.mark.e2e_multi_k8s
@pytest.mark.parametrize(
    "resource_pool, expected_node", [(None, "defaultrm"), ("additional_pool", "additionalrm")]
)
def test_run_command_multi_k8s(resource_pool: Optional[str], expected_node: str) -> None:
    """Run a shell command on each RM and verify which node executed it."""
    sess = api_utils.user_session()

    # Only pass config overrides when a specific pool was requested.
    if resource_pool is None:
        args = None
    else:
        args = ["--config", f"resources.resource_pool={resource_pool}"]

    command_id = utils.run_command_args(sess, "echo RunningOnNode=$DET_AGENT_ID", args)
    utils.wait_for_command_state(sess, command_id, "TERMINATED", MAX_WAIT_TIME)
    utils.assert_command_succeeded(sess, command_id)

    # The echoed agent ID in the task logs tells us which node ran the command.
    str_logs = "".join([log.log for log in api.task_logs(sess, command_id)])
    assert f"RunningOnNode={expected_node}" in str_logs, str_logs


@pytest.mark.e2e_multi_k8s
def test_not_found_pool_multi_k8s() -> None:
    """Requesting a pool no RM owns must fail with a clear CLI error."""
    sess = api_utils.user_session()
    cmd = ["det", "cmd", "run", "--config", "resources.resource_pool=missing", "echo"]
    detproc.check_error(sess, cmd, "could not find resource pool missing")


@pytest.mark.e2e_multi_k8s
def test_get_agents_multi_k8s() -> None:
    """GetAgents should report exactly one agent per configured RM."""
    sess = api_utils.user_session()
    resp = bindings.get_GetAgents(sess)
    actual_ids = {agent.id for agent in resp.agents}
    expected_ids = {"defaultrm", "additionalrm"}
    assert actual_ids == expected_ids
14 changes: 13 additions & 1 deletion e2e_tests/tests/cluster/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import subprocess
import threading
import time
from typing import Any, Dict, Tuple, Type
from typing import Any, Dict, List, Optional, Tuple, Type

import pytest
import requests
Expand Down Expand Up @@ -130,6 +130,18 @@ def run_command(sess: api.Session, sleep: int = 30, slots: int = 1) -> str:
return detproc.check_output(sess, cmd).strip()


def run_command_args(sess: api.Session, entrypoint: str, args: Optional[List[str]] = None) -> str:
    """Launch a detached command with extra CLI arguments and return its task ID.

    Args:
        sess: authenticated session used to invoke the CLI.
        entrypoint: shell command the task will run.
        args: optional extra flags for ``det command run`` (e.g. ``--config``
            overrides). Defaults to None, meaning no extra flags.

    Returns:
        The launched command's ID, stripped of surrounding whitespace.
    """
    cmd = ["det", "command", "run", "-d"]
    if args:
        cmd += args
    # The entrypoint must come after all flags.
    cmd.append(entrypoint)
    return detproc.check_output(sess, cmd).strip()


def run_zero_slot_command(sess: api.Session, sleep: int = 30) -> str:
    """Launch a sleep command that requests zero slots; returns its task ID."""
    return run_command(sess, sleep=sleep, slots=0)

Expand Down
1 change: 1 addition & 0 deletions e2e_tests/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"e2e_cpu_elastic",
"e2e_cpu_rbac",
"e2e_gpu",
"e2e_multi_k8s",
"e2e_k8s",
"e2e_pbs",
"e2e_slurm",
Expand Down
3 changes: 0 additions & 3 deletions master/internal/config/resource_manager_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,6 @@ func (a AgentResourceManagerConfig) Validate() []error {
type KubernetesResourceManagerConfig struct {
Namespace string `json:"namespace"`

// Deprecated: this can be per resource pool now on taskContainerDefaults.
// This will always be the same as global
// task_container_defaults.kubernetes.max_slots_per_pod so use that.
MaxSlotsPerPod *int `json:"max_slots_per_pod"`

MasterServiceName string `json:"master_service_name"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func New(

for _, poolConfig := range k.poolsConfig {
maxSlotsPerPod := 0
if k.taskContainerDefaults.Kubernetes.MaxSlotsPerPod != nil {
maxSlotsPerPod = *k.taskContainerDefaults.Kubernetes.MaxSlotsPerPod
if m := k.config.MaxSlotsPerPod; m != nil {
maxSlotsPerPod = *m
}
if poolConfig.TaskContainerDefaults != nil &&
poolConfig.TaskContainerDefaults.Kubernetes != nil &&
Expand Down Expand Up @@ -509,8 +509,8 @@ func (k *ResourceManager) createResourcePoolSummary(

// TODO actor refactor, this is just getting resourcePool[poolName].maxSlotsPerPod
slotsPerAgent := 0
if k.taskContainerDefaults.Kubernetes.MaxSlotsPerPod != nil {
slotsPerAgent = *k.taskContainerDefaults.Kubernetes.MaxSlotsPerPod
if m := k.config.MaxSlotsPerPod; m != nil {
slotsPerAgent = *m
}
if pool.TaskContainerDefaults != nil &&
pool.TaskContainerDefaults.Kubernetes != nil &&
Expand Down
12 changes: 7 additions & 5 deletions master/internal/rm/kubernetesrm/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,13 +923,15 @@ func (p *pods) podStatusCallback(event watch.Event) {
}
}

// clusterID is the cluster's identifier, set at most once via setClusterID.
// NOTE: the rendered diff fused the old panic-on-reset body with the new
// sync.Once body; this is the post-commit version (panic guard removed,
// assignment guarded by once so repeated calls are harmless no-ops).
var (
	clusterID string
	once      sync.Once
)

// setClusterID records the cluster ID the first time it is called;
// subsequent calls leave the original value untouched.
func setClusterID(s string) {
	once.Do(func() {
		clusterID = s
	})
}

func clusterIDNodeLabel() string {
Expand Down

0 comments on commit 1a35e5d

Please sign in to comment.