Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invoke host-select via bash login shell #2555

Merged
merged 5 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions metomi/rose/app_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,12 @@ def _run_poll(
# Return any remaining test-failing files.
return poll_test, poll_any_files, poll_all_files

def _poll_file(self, file_, poll_file_test):
def _poll_file(self, file_: str, poll_file_test: str) -> bool:
"""Poll for existence of a file."""
is_done = False
if poll_file_test:
test = poll_file_test.replace(
"{}", self.popen.list_to_shell_str([file_])
r'{}', self.popen.list_to_shell_str([file_])
)
is_done = (
self.popen.run(
Expand Down
4 changes: 2 additions & 2 deletions metomi/rose/etc/rose-meta/rose-app-conf/rose-meta.conf
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ description=Specify prerequisites to poll for before running the actual applicat
= with delays between them.

[poll=all-files]
help=A space delimited list of file paths.
help=A space delimited list of file paths. Accepts globs.
=
= This test passes only if all file paths in the list exist.
=
Expand All @@ -67,7 +67,7 @@ help=A space delimited list of file paths.
= for all file paths.

[poll=any-files]
help=A space delimited list of file paths.
help=A space delimited list of file paths. Accepts globs.
=
= This test passes if any file path in the list exists.
=
Expand Down
67 changes: 45 additions & 22 deletions metomi/rose/host_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@
gethostname,
)
import sys
import textwrap
from time import sleep, time
import traceback
from typing import List, Optional

from metomi.rose.opt_parse import RoseOptionParser
from metomi.rose.popen import RosePopener
from metomi.rose.reporter import Event, Reporter
from metomi.rose.resource import ResourceLocator
from typing import Optional


class NoHostError(Exception):
Expand All @@ -63,17 +64,22 @@ class HostSelectCommandFailedEvent(Event):

KIND = Event.KIND_ERR

def __init__(self, return_code: int, host: str):
self.return_code = return_code
def __init__(
self, host: str, return_code: int, stderr: Optional[str] = None
):
self.host = host
self.return_code = return_code
self.stderr = stderr
Event.__init__(self)

def __str__(self):
def __str__(self) -> str:
if self.return_code == 255:
msg = 'ssh failed'
else:
msg = f'failed {self.return_code}'
return f'{self.host}: ({msg})'
msg = f"failed ({self.return_code})"
if self.stderr is not None:
msg += f"\n{textwrap.indent(self.stderr, ' ')}"
return f"{self.host}: {msg}"


class HostThresholdNotMetEvent(Event):
Expand Down Expand Up @@ -276,6 +282,18 @@ def expand(self, names=None, rank_method=None, thresholds=None):

return host_names, rank_method, thresholds

@staticmethod
def _bash_login_cmd(cmd: List[str]) -> List[str]:
"""Return the given command as a bash login shell command.

This allows users to set env vars.

Example:
>>> HostSelector._bash_login_cmd(["echo", "-n", "Multiple words"])
['bash', '-l', '-c', "echo -n 'Multiple words'"]
"""
return ['bash', '-l', '-c', RosePopener.shlex_join(cmd)]

def select(
self,
names=None,
Expand Down Expand Up @@ -379,7 +397,7 @@ def select(
elif proc.wait():
self.handle_event(
HostSelectCommandFailedEvent(
proc.returncode, host_name
host_name, proc.returncode
)
)
else:
Expand All @@ -391,24 +409,25 @@ def select(
host_proc_dict = {}
for host_name in sorted(host_names):
# build host-select-client command
command = []
if not self.is_local_host(host_name):
command_args = []
command_args.append(host_name)
command = self.popen.get_cmd("ssh", *command_args)
command: List[str] = []

# pass through CYLC_VERSION to support use of cylc wrapperf script
# pass through CYLC_VERSION to support use of cylc wrapper script
try:
import cylc.flow
except ModuleNotFoundError:
pass
else:
command.extend([
'env',
f'CYLC_VERSION={cylc.flow.__version__}'
f'CYLC_VERSION={cylc.flow.__version__}',
])
cylc_env_name = os.getenv('CYLC_ENV_NAME')
if cylc_env_name:
command.append(f'CYLC_ENV_NAME={cylc_env_name}')

command.extend(["rose", "host-select-client"])
command.extend(
self._bash_login_cmd(['rose', 'host-select-client'])
)

# build list of metrics to obtain for each host
metrics = rank_conf.get_command()
Expand All @@ -420,6 +439,11 @@ def select(
# convert metrics list to JSON stdin
stdin = '\n***start**\n' + json.dumps(metrics) + '\n**end**\n'

if not self.is_local_host(host_name):
command = [
*self.popen.get_cmd('ssh', host_name),
RosePopener.shlex_join(command)
]
# fire off host-select-client processes
proc = self.popen.run_bg(
*command, stdin=stdin, preexec_fn=os.setpgrp
Expand All @@ -434,19 +458,18 @@ def select(
while host_proc_dict:
sleep(self.SSH_CMD_POLL_DELAY)
for host_name, (proc, metrics) in list(host_proc_dict.items()):
if proc.poll() is None:
score = None
elif proc.wait():
stdout, stderr = proc.communicate()
if proc.poll() is None: # still running
continue
stdout, stderr = proc.communicate()
if proc.returncode:
self.handle_event(
HostSelectCommandFailedEvent(
proc.returncode, host_name
host_name, proc.returncode, stderr
)
)
host_proc_dict.pop(host_name)
else:
out = proc.communicate()[0]
out = _deserialise(metrics, json.loads(out.strip()))
out = _deserialise(metrics, json.loads(stdout.strip()))

host_proc_dict.pop(host_name)
for threshold_conf in threshold_confs:
Expand Down
59 changes: 48 additions & 11 deletions metomi/rose/popen.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# -----------------------------------------------------------------------------
"""Wraps Python's subprocess.Popen."""

import asyncio
import os
import re
import select
import shlex
from subprocess import PIPE, Popen
import sys

import asyncio
from typing import Iterable, List

from metomi.rose.reporter import Event
from metomi.rose.resource import ResourceLocator
Expand Down Expand Up @@ -67,7 +67,7 @@ def __str__(self):
if isinstance(command, str):
ret = command
else:
ret = RosePopener.list_to_shell_str(self.command)
ret = RosePopener.shlex_join(self.command)

try:
# real file or real stream
Expand Down Expand Up @@ -128,11 +128,47 @@ class RosePopener:
}
ENVS_OF_CMDS = {"editor": ["VISUAL", "EDITOR"]}

@classmethod
def list_to_shell_str(cls, args):
if not args:
return ""
return " ".join([re.sub(r"([\"'\s])", r"\\\1", arg) for arg in args])
@staticmethod
def shlex_join(args: Iterable[str]) -> str:
"""Convert a list of strings into a shell command, safely quoting
when the strings contain whitespace and special chars.

Basically a back-port of shlex.join(), needed for py 3.7.

Examples:
>>> RosePopener.shlex_join([])
''
>>> RosePopener.shlex_join(["echo", "-n", "Multiple words"])
"echo -n 'Multiple words'"
>>> RosePopener.shlex_join(["ls", "my_dir;foiled_injection"])
"ls 'my_dir;foiled_injection'"
>>> RosePopener.shlex_join(["what", "about", "globs*"])
"what about 'globs*'"
"""
return " ".join(shlex.quote(arg) for arg in args)

@staticmethod
def list_to_shell_str(args: Iterable[str]) -> str:
"""Convert a list of strings into a shell command, escaping whitespace
and quotes, but otherwise allowing special chars so user defined
commands can run without sanitisation.

Examples:
>>> RosePopener.list_to_shell_str([])
''
>>> _ = RosePopener.list_to_shell_str(["it's"])
>>> print(_)
it\\'s
>>> _
"it\\\\'s"
>>> RosePopener.list_to_shell_str(["echo", "-n", "Multiple words"])
'echo -n Multiple\\\\ words'
>>> RosePopener.list_to_shell_str(["ls", "my_dir;allow_this"])
'ls my_dir;allow_this'
>>> RosePopener.list_to_shell_str(["what", "about", "globs*"])
'what about globs*'
"""
return " ".join(re.sub(r"([\"'\s])", r"\\\1", arg) for arg in args)

def __init__(self, event_handler=None):
self.event_handler = event_handler
Expand All @@ -143,7 +179,7 @@ def handle_event(self, *args, **kwargs):
if callable(self.event_handler):
return self.event_handler(*args, **kwargs)

def get_cmd(self, key, *args):
def get_cmd(self, key: str, *args: str) -> List[str]:
"""Return default options and arguments of a known command as a list.

If a setting [external] <key> is defined in the site/user
Expand All @@ -167,8 +203,9 @@ def get_cmd(self, key, *args):
self.cmds[key] = shlex.split(node.value)
if key not in self.cmds:
for name in self.ENVS_OF_CMDS.get(key, []):
if os.getenv(name): # not None, not null str
self.cmds[key] = shlex.split(os.getenv(name))
env_var = os.getenv(name)
if env_var: # not None, not null str
self.cmds[key] = shlex.split(env_var)
break
if key not in self.cmds:
self.cmds[key] = self.CMDS[key]
Expand Down
4 changes: 2 additions & 2 deletions metomi/rose/run_source_vc.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ def write_source_vc_info(run_source_dir, output=None, popen=None):
os.chdir(run_source_dir)
try:
for args in args_list:
cmd = [vcs] + args
cmd = [vcs, *args]
ret_code, out, _ = popen.run(*cmd, env=environ)
if out:
write_safely(("#" * 80 + "\n"), handle)
write_safely(
("# %s\n" % popen.list_to_shell_str(cmd)), handle
("# %s\n" % popen.shlex_join(cmd)), handle
)
write_safely(("#" * 80 + "\n"), handle)
write_safely(out, handle)
Expand Down
4 changes: 2 additions & 2 deletions metomi/rose/suite_engine_procs/cylc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Logic specific to the Cylc suite engine."""
"""Logic specific to the Cylc workflow engine."""

from glob import glob
import os
Expand All @@ -39,7 +39,7 @@

class CylcProcessor(SuiteEngineProcessor):

"""Logic specific to the cylc suite engine."""
"""Logic specific to the Cylc workflow engine."""

REC_CYCLE_TIME = re.compile(
r"\A[\+\-]?\d+(?:W\d+)?(?:T\d+(?:Z|[+-]\d+)?)?\Z"
Expand Down
6 changes: 5 additions & 1 deletion metomi/rose/tests/suite_engine_procs/test_cylc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
"""Tests for functions in the cylc suite engine proc.
"""

import cylc.rose.platform_utils
import pytest
from pytest import param

try:
import cylc.rose.platform_utils
except ImportError:
pytestmark = pytest.mark.skip(reason="cylc-rose not found")

from metomi.rose.suite_engine_procs.cylc import CylcProcessor


Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ tests =
pytest
types-aiofiles
types-setuptools
# Note: some tests also depend on cylc-rose which has to be the
# development version installed manually (because the latest production
# version is pinned to the previous rose release)
all =
%(docs)s
%(graph)s
Expand Down
Loading