Skip to content

Commit

Permalink
fix: Bulk Action bug (#9255)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnkim-det authored May 2, 2024
1 parent a8d05fa commit f6e42cd
Show file tree
Hide file tree
Showing 32 changed files with 2,248 additions and 1,530 deletions.
2 changes: 1 addition & 1 deletion e2e_tests/tests/cluster/test_job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_job_queue_adjust_weight() -> None:
# Avoid leaking experiments even if this test fails.
# Leaking experiments can block the cluster and other tests from running other tasks
# while the experiments finish.
exp.kill_experiments(sess, exp_ids)
exp.kill_experiments(sess, exp_ids, -1)


def get_raw_data(sess: api.Session) -> Tuple[List[Dict[str, str]], List[str]]:
Expand Down
4 changes: 2 additions & 2 deletions e2e_tests/tests/cluster/test_log_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def test_log_policy_exclude_node_k8s(should_match: bool) -> None:
trial_logs = "\n".join(exp.trial_logs(sess, experiment_trials[0].trial.id))
assert "therefore will not schedule on" in trial_logs

exp.kill_experiments(sess, [exp_id])
exp.kill_experiments(sess, [exp_id], -1)
else:
exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.ERROR)

Expand Down Expand Up @@ -145,7 +145,7 @@ def test_log_policy_exclude_node_single_agent(should_match: bool) -> None:
else:
if should_match:
exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.QUEUED)
exp.kill_experiments(sess, [exp_id])
exp.kill_experiments(sess, [exp_id], -1)
else:
exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.ERROR)

Expand Down
4 changes: 2 additions & 2 deletions e2e_tests/tests/cluster/test_master_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def _test_master_restart_stopping(managed_cluster_restarts: abstract_cluster.Clu
sess, exp_id, bindings.experimentv1State.STOPPING_CANCELED, max_wait_secs=30
)
finally:
exp.kill_experiments(sess, [exp_id])
exp.kill_experiments(sess, [exp_id], -1)
exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.CANCELED)

# All slots are empty, we don't leave a hanging container.
Expand Down Expand Up @@ -283,7 +283,7 @@ def test_master_restart_stopping_ignore_preemption_still_gets_killed(
trial_id = exp.experiment_first_trial(sess, exp_id)
exp.assert_patterns_in_trial_logs(sess, trial_id, ["137"])
finally:
exp.kill_experiments(sess, [exp_id])
exp.kill_experiments(sess, [exp_id], -1)
exp.wait_for_experiment_state(sess, exp_id, bindings.experimentv1State.CANCELED)


Expand Down
53 changes: 34 additions & 19 deletions e2e_tests/tests/experiment/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,15 @@ def run_autotuning_experiment(


def archive_experiments(
sess: api.Session, experiment_ids: List[int], name: Optional[str] = None
sess: api.Session, experiment_ids: List[int], project_id: int, name: Optional[str] = None
) -> None:
body = bindings.v1ArchiveExperimentsRequest(experimentIds=experiment_ids)
body = bindings.v1ArchiveExperimentsRequest(projectId=project_id, experimentIds=experiment_ids)
if name is not None:
filters = bindings.v1BulkExperimentFilters(name=name)
body = bindings.v1ArchiveExperimentsRequest(experimentIds=[], filters=filters)
bindings.post_ArchiveExperiments(sess, body=body)
body = bindings.v1ArchiveExperimentsRequest(
projectId=project_id, experimentIds=experiment_ids, filters=filters
)
bindings.post_ArchiveExperiments(sess, projectId=project_id, body=body)


def pause_experiment(sess: api.Session, experiment_id: int) -> None:
Expand All @@ -133,13 +135,16 @@ def pause_experiment(sess: api.Session, experiment_id: int) -> None:
def pause_experiments(
sess: api.Session,
experiment_ids: List[int],
project_id: int,
name: Optional[str] = None,
) -> None:
body = bindings.v1PauseExperimentsRequest(experimentIds=experiment_ids)
body = bindings.v1PauseExperimentsRequest(projectId=project_id, experimentIds=experiment_ids)
if name is not None:
filters = bindings.v1BulkExperimentFilters(name=name)
body = bindings.v1PauseExperimentsRequest(experimentIds=[], filters=filters)
bindings.post_PauseExperiments(sess, body=body)
body = bindings.v1PauseExperimentsRequest(
projectId=project_id, experimentIds=experiment_ids, filters=filters
)
bindings.post_PauseExperiments(sess, projectId=project_id, body=body)


def activate_experiment(sess: api.Session, experiment_id: int) -> None:
Expand All @@ -148,14 +153,18 @@ def activate_experiment(sess: api.Session, experiment_id: int) -> None:


def activate_experiments(
sess: api.Session, experiment_ids: List[int], name: Optional[str] = None
sess: api.Session, experiment_ids: List[int], project_id: int, name: Optional[str] = None
) -> None:
if name is None:
body = bindings.v1ActivateExperimentsRequest(experimentIds=experiment_ids)
body = bindings.v1ActivateExperimentsRequest(
projectId=project_id, experimentIds=experiment_ids
)
else:
filters = bindings.v1BulkExperimentFilters(name=name)
body = bindings.v1ActivateExperimentsRequest(experimentIds=[], filters=filters)
bindings.post_ActivateExperiments(sess, body=body)
body = bindings.v1ActivateExperimentsRequest(
projectId=project_id, experimentIds=experiment_ids, filters=filters
)
bindings.post_ActivateExperiments(sess, projectId=project_id, body=body)


def cancel_experiment(sess: api.Session, experiment_id: int) -> None:
Expand All @@ -169,25 +178,31 @@ def kill_experiment(sess: api.Session, experiment_id: int) -> None:


def cancel_experiments(
sess: api.Session, experiment_ids: List[int], name: Optional[str] = None
sess: api.Session, experiment_ids: List[int], project_id: int, name: Optional[str] = None
) -> None:
if name is None:
body = bindings.v1CancelExperimentsRequest(experimentIds=experiment_ids)
body = bindings.v1CancelExperimentsRequest(
projectId=project_id, experimentIds=experiment_ids
)
else:
filters = bindings.v1BulkExperimentFilters(name=name)
body = bindings.v1CancelExperimentsRequest(experimentIds=[], filters=filters)
bindings.post_CancelExperiments(sess, body=body)
body = bindings.v1CancelExperimentsRequest(
projectId=project_id, experimentIds=experiment_ids, filters=filters
)
bindings.post_CancelExperiments(sess, projectId=project_id, body=body)


def kill_experiments(
sess: api.Session, experiment_ids: List[int], name: Optional[str] = None
sess: api.Session, experiment_ids: List[int], project_id: int, name: Optional[str] = None
) -> None:
if name is None:
body = bindings.v1KillExperimentsRequest(experimentIds=experiment_ids)
body = bindings.v1KillExperimentsRequest(projectId=project_id, experimentIds=experiment_ids)
else:
filters = bindings.v1BulkExperimentFilters(name=name)
body = bindings.v1KillExperimentsRequest(experimentIds=[], filters=filters)
bindings.post_KillExperiments(sess, body=body)
body = bindings.v1KillExperimentsRequest(
projectId=project_id, experimentIds=experiment_ids, filters=filters
)
bindings.post_KillExperiments(sess, projectId=project_id, body=body)


def kill_trial(sess: api.Session, trial_id: int) -> None:
Expand Down
91 changes: 34 additions & 57 deletions e2e_tests/tests/experiment/test_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid
from typing import List
from typing import Dict, List

import pytest

Expand All @@ -21,65 +21,42 @@ def test_archived_proj_exp_list() -> None:

projects = []
experiments = []
experimentMap: Dict[int, List[int]] = {}
for wrkspc in workspaces:
body1 = bindings.v1PostProjectRequest(
name=f"p_{uuid.uuid4().hex[:8]}", workspaceId=wrkspc.id
)
pid1 = bindings.post_PostProject(
admin,
body=body1,
workspaceId=wrkspc.id,
).project.id

body2 = bindings.v1PostProjectRequest(
name=f"p_{uuid.uuid4().hex[:8]}", workspaceId=wrkspc.id
)
pid2 = bindings.post_PostProject(
admin,
body=body2,
workspaceId=wrkspc.id,
).project.id

projects.append(pid1)
projects.append(pid2)

experiments.append(
exp.create_experiment(
admin,
conf.fixtures_path("no_op/single.yaml"),
conf.fixtures_path("no_op"),
["--project_id", str(pid1), ("--paused")],
)
)
experiments.append(
exp.create_experiment(
admin,
conf.fixtures_path("no_op/single.yaml"),
conf.fixtures_path("no_op"),
["--project_id", str(pid1), ("--paused")],
workspace_projects = []
for _ in range(count):
proj_body = bindings.v1PostProjectRequest(
name=f"p_{uuid.uuid4().hex[:8]}", workspaceId=wrkspc.id
)
)
experiments.append(
exp.create_experiment(
pid = bindings.post_PostProject(
admin,
conf.fixtures_path("no_op/single.yaml"),
conf.fixtures_path("no_op"),
["--project_id", str(pid2), ("--paused")],
)
)
experiments.append(
exp.create_experiment(
admin,
conf.fixtures_path("no_op/single.yaml"),
conf.fixtures_path("no_op"),
["--project_id", str(pid2), ("--paused")],
)
body=proj_body,
workspaceId=wrkspc.id,
).project.id
workspace_projects.append(pid)

for p in workspace_projects:
for _ in range(count):
expID = exp.create_experiment(
admin,
conf.fixtures_path("no_op/single.yaml"),
conf.fixtures_path("no_op"),
["--project_id", str(p), "--paused"],
)
experimentMap[p] = experimentMap.get(p, []) + [expID]
experiments.append(expID)

projects.extend(workspace_projects)

for proj in experimentMap:
bindings.post_KillExperiments(
admin,
body=bindings.v1KillExperimentsRequest(
projectId=proj, experimentIds=experimentMap[proj]
),
projectId=proj,
)

bindings.post_KillExperiments(
admin, body=bindings.v1KillExperimentsRequest(experimentIds=experiments)
)

for x in experiments:
exp.wait_for_experiment_state(admin, x, bindings.experimentv1State.CANCELED)

Expand All @@ -99,7 +76,7 @@ def test_archived_proj_exp_list() -> None:
archived_exp.append(experiments[2])
archived_exp.append(experiments[4])

# test2: GetExperiments shouldn't return experiements from archived projects when
# test2: GetExperiments shouldn't return experiments from archived projects when
# archived flag is False
r2 = bindings.get_GetExperiments(admin, archived=False)
for e in r2.experiments:
Expand All @@ -109,7 +86,7 @@ def test_archived_proj_exp_list() -> None:

archived_exp.append(experiments[7])

# test3: GetExperiments shouldn't return experiements from archived workspaces when
# test3: GetExperiments shouldn't return experiments from archived workspaces when
# archived flag is False
r3 = bindings.get_GetExperiments(admin, archived=False)
for e in r3.experiments:
Expand Down
12 changes: 6 additions & 6 deletions e2e_tests/tests/experiment/test_noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,12 @@ def test_noop_pause_with_multiexperiment() -> None:
with open(tf.name, "w") as f:
util.yaml_safe_dump(config_obj, f)
experiment_id = exp.create_experiment(sess, tf.name, conf.fixtures_path("no_op"), None)
exp.pause_experiments(sess, [experiment_id])
exp.pause_experiments(sess, [experiment_id], -1)
exp.wait_for_experiment_state(sess, experiment_id, bindings.experimentv1State.PAUSED)

exp.activate_experiments(sess, [experiment_id])
exp.activate_experiments(sess, [experiment_id], -1)
exp.wait_for_experiment_state(sess, experiment_id, bindings.experimentv1State.QUEUED)
exp.kill_experiments(sess, [experiment_id])
exp.kill_experiments(sess, [experiment_id], -1)


@pytest.mark.e2e_cpu
Expand All @@ -156,13 +156,13 @@ def test_noop_pause_with_multiexperiment_filter() -> None:
with open(tf.name, "w") as f:
util.yaml_safe_dump(config_obj, f)
experiment_id = exp.create_experiment(sess, tf.name, conf.fixtures_path("no_op"), None)
exp.pause_experiments(sess, [], name=tf.name)
exp.pause_experiments(sess, [], -1, name=tf.name)
exp.wait_for_experiment_state(sess, experiment_id, bindings.experimentv1State.PAUSED)
# test state=nonTerminalExperimentStates() filter in cancel/kill
exp.kill_experiments(sess, [], name=tf.name)
exp.kill_experiments(sess, [], -1, name=tf.name)
exp.wait_for_experiment_state(sess, experiment_id, bindings.experimentv1State.CANCELED)
# test state=terminalExperimentStates() filter in archive
exp.archive_experiments(sess, [], name=tf.name)
exp.archive_experiments(sess, [], -1, name=tf.name)


@pytest.mark.e2e_cpu
Expand Down
6 changes: 3 additions & 3 deletions e2e_tests/tests/experiment/test_pending_hpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ def test_hpc_job_pending_reason() -> None:
# for the pending job to have a chance to refresh the state and write out the
# state reason in experiment logs
time.sleep(60)
exp.kill_experiments(sess, [running_exp_id])
exp.kill_experiments(sess, [running_exp_id], -1)

# Make sure the second experiment will start running after the first experinemt
# Make sure the second experiment will start running after the first experiment
# releases the CPUs
exp.wait_for_experiment_state(sess, pending_exp_id, bindings.experimentv1State.RUNNING)
print(f"Experiment {pending_exp_id} running")

# Now kill the second experiment to shorten the test run
exp.kill_experiments(sess, [pending_exp_id])
exp.kill_experiments(sess, [pending_exp_id], -1)

trials = exp.experiment_trials(sess, pending_exp_id)
print(f"Check logs for exp {pending_exp_id}")
Expand Down
Loading

0 comments on commit f6e42cd

Please sign in to comment.