From 74d1650a6ea15d7bedf4fe7213bc5299d4818eca Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Wed, 4 May 2022 19:37:40 +0530 Subject: [PATCH] teuthology/queue: Single command for queue operations Makes the same teuthology-queue commands work regardless of the queue backend, Paddles or Beanstalk. Signed-off-by: Aishwarya Mathuria --- scripts/beanstalk_queue.py | 35 --------- scripts/dispatcher.py | 9 +-- scripts/kill.py | 2 +- scripts/paddles_queue.py | 45 ------------ scripts/queue.py | 17 +++-- scripts/schedule.py | 2 +- scripts/worker.py | 6 +- teuthology/dispatcher/__init__.py | 31 +++----- teuthology/dispatcher/supervisor.py | 7 +- teuthology/orchestra/run.py | 1 + teuthology/queue/__init__.py | 106 ---------------------------- teuthology/queue/beanstalk.py | 16 ++--- teuthology/queue/paddles.py | 49 ++++++------- teuthology/queue/util.py | 101 ++++++++++++++++++++++++++ teuthology/report.py | 28 ++++---- teuthology/schedule.py | 6 +- teuthology/test/test_dispatcher.py | 73 ------------------- 17 files changed, 182 insertions(+), 352 deletions(-) delete mode 100644 scripts/beanstalk_queue.py delete mode 100644 scripts/paddles_queue.py create mode 100644 teuthology/queue/util.py diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py deleted file mode 100644 index a8a0661ecf..0000000000 --- a/scripts/beanstalk_queue.py +++ /dev/null @@ -1,35 +0,0 @@ -import docopt - -import teuthology.config -import teuthology.queue.beanstalk - -doc = """ -usage: teuthology-beanstalk-queue -h - teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE - teuthology-beanstalk-queue [-r] -m MACHINE_TYPE - teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN - teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE] -List Jobs in queue. -If -D is passed, then jobs with PATTERN in the job name are deleted from the -queue. -Arguments: - -m, --machine_type MACHINE_TYPE [default: multi] - Which machine type queue to work on. -optional arguments: - -h, --help Show this help message and exit - -D, --delete PATTERN Delete Jobs with PATTERN in their name - -d, --description Show job descriptions - -r, --runs Only show run names - -f, --full Print the entire job config. Use with caution. - -s, --status Prints the status of the queue - -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 - will unpause. If -m is passed, pause that queue, - otherwise pause all queues. -""" - - -def main(): - - args = docopt.docopt(doc) - print(args) - teuthology.beanstalk.main(args) diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index 5e64b382d8..ac74b29fa2 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -1,9 +1,9 @@ """ usage: teuthology-dispatcher --help teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR - teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND + teuthology-dispatcher [-v] [--archive-dir DIR] [--exit-on-empty-queue] [--queue-backend BACKEND] --log-dir LOG_DIR --tube TUBE -Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk +Start a dispatcher for the specified tube. Grab jobs from a paddles/beanstalk queue and run the teuthology tests they describe as subprocesses. The subprocess invoked is a teuthology-dispatcher command run in supervisor mode. @@ -17,12 +17,13 @@ -v, --verbose be more verbose -l, --log-dir LOG_DIR path in which to store logs -a DIR, --archive-dir DIR path to archive results in - --machine-type MACHINE_TYPE the machine type for the job + -t, --tube TUBE which queue to read jobs from --supervisor run dispatcher in job supervisor mode --bin-path BIN_PATH teuthology bin path --job-config CONFIG file descriptor of job's config file --exit-on-empty-queue if the queue is empty, exit - --queue-backend BACKEND choose between paddles and beanstalk + --queue-backend BACKEND which backend will be used for the queue + [default: beanstalk] """ import docopt diff --git a/scripts/kill.py b/scripts/kill.py index e2a1a4ef09..a93bcd8629 100644 --- a/scripts/kill.py +++ b/scripts/kill.py @@ -12,7 +12,7 @@ teuthology-kill [-p] -o OWNER -m MACHINE_TYPE -r RUN Kill running teuthology jobs: -1. Removes any queued jobs from the paddles queue +1. Removes any queued jobs from the queue 2. Kills any running jobs 3. Nukes any machines involved diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py deleted file mode 100644 index 8487fd938e..0000000000 --- a/scripts/paddles_queue.py +++ /dev/null @@ -1,45 +0,0 @@ -import docopt - -import teuthology.config -import teuthology.queue.paddles_queue -doc = """ -usage: teuthology-paddles-queue -h - teuthology-paddles-queue -s -m MACHINE_TYPE - teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] - teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER - teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -u -m MACHINE_TYPE -U USER - -List Jobs in queue. -If -D is passed, then jobs with PATTERN in the job name are deleted from the -queue. - -Arguments: - -m, --machine_type MACHINE_TYPE - Which machine type queue to work on. - -optional arguments: - -h, --help Show this help message and exit - -D, --delete PATTERN Delete Jobs with PATTERN in their name - -d, --description Show job descriptions - -r, --runs Only show run names - -f, --full Print the entire job config. Use with caution. - -s, --status Prints the status of the queue - -t, --time SECONDS Pause queues for a number of seconds. - If -m is passed, pause that queue, - otherwise pause all queues. - -p, --pause Pause queue - -u, --unpause Unpause queue - -P, --priority PRIORITY - Change priority of queued jobs - -U, --user USER User who owns the jobs - -R, --run-name RUN_NAME - Used to change priority of all jobs in the run. -""" - - -def main(): - args = docopt.docopt(doc) - teuthology.paddles_queue.main(args) diff --git a/scripts/queue.py b/scripts/queue.py index 2c466a7be9..0c972634ed 100644 --- a/scripts/queue.py +++ b/scripts/queue.py @@ -1,15 +1,16 @@ import docopt -import teuthology.config import teuthology.queue.beanstalk import teuthology.queue.paddles +from teuthology.config import config doc = """ usage: teuthology-queue -h - teuthology-queue [-s|-d|-f] -m MACHINE_TYPE + teuthology-queue [-s|-d|-f] -m MACHINE_TYPE teuthology-queue [-r] -m MACHINE_TYPE teuthology-queue -m MACHINE_TYPE -D PATTERN - teuthology-queue -p SECONDS [-m MACHINE_TYPE] + teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER] + teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] List Jobs in queue. If -D is passed, then jobs with PATTERN in the job name are deleted from the @@ -29,9 +30,17 @@ -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 will unpause. If -m is passed, pause that queue, otherwise pause all queues. + -P, --priority PRIORITY + Change priority of queued jobs (only in Paddles queues) + -U, --user USER User who owns the jobs + -R, --run-name RUN_NAME + Used to change priority of all jobs in the run. """ def main(): args = docopt.docopt(doc) - teuthology.queue.main(args) + if config.backend == 'beanstalk': + teuthology.queue.beanstalk.main(args) + else: + teuthology.queue.paddles.main(args) diff --git a/scripts/schedule.py b/scripts/schedule.py index ee443125ee..59a2cee298 100644 --- a/scripts/schedule.py +++ b/scripts/schedule.py @@ -20,7 +20,7 @@ Queue backend name, use prefix '@' to append job config to the given file path as yaml. - [default: paddles] + [default: beanstalk] -n , --name Name of suite run the job is part of -d , --description Job description -o , --owner Job owner diff --git a/scripts/worker.py b/scripts/worker.py index 8d3228d8d0..a3e12c20d7 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -9,7 +9,7 @@ def main(): def parse_args(): parser = argparse.ArgumentParser(description=""" -Grab jobs from a paddles queue and run the teuthology tests they +Grab jobs from a beanstalk queue and run the teuthology tests they describe. One job is run at a time. """) parser.add_argument( @@ -29,8 +29,8 @@ def parse_args(): required=True, ) parser.add_argument( - '-m', '--machine-type', - help='which machine type the jobs will run on', + '-t', '--tube', + help='which beanstalk tube to read jobs from', required=True, ) diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index b06ef60618..f4c8b9b07f 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -70,23 +70,24 @@ def main(args): return supervisor.main(args) verbose = args["--verbose"] - machine_type = args["--machine-type"] + tube = args["--tube"] log_dir = args["--log-dir"] archive_dir = args["--archive-dir"] exit_on_empty_queue = args["--exit-on-empty-queue"] backend = args['--queue-backend'] + if backend is None: + backend = 'beanstalk' + if archive_dir is None: archive_dir = teuth_config.archive_base - if machine_type is None and teuth_config.machine_type is None: - return # setup logging for disoatcher in {log_dir} loglevel = logging.INFO if verbose: loglevel = logging.DEBUG log.setLevel(loglevel) - log_file_path = os.path.join(log_dir, f"dispatcher.{machine_type}.{os.getpid()}") + log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}") setup_log_file(log_file_path) install_except_hook() @@ -94,7 +95,7 @@ def main(args): if backend == 'beanstalk': connection = beanstalk.connect() - beanstalk.watch_tube(connection, machine_type) + beanstalk.watch_tube(connection, tube) result_proc = None @@ -129,8 +130,11 @@ def main(args): log.info('Reserved job %s', job_id) log.info('Config is: %s', job.body) else: - job = report.get_queued_job(machine_type) + job = report.get_queued_job(tube) if job is None: + if exit_on_empty_queue and not job_procs: + log.info("Queue is empty and no supervisor processes running; exiting!") + break continue job = clean_config(job) report.try_push_job_info(job, dict(status='running')) @@ -220,18 +224,3 @@ def create_job_archive(job_name, job_archive_path, archive_dir): if not os.path.exists(run_archive): safepath.makedirs('/', run_archive) safepath.makedirs('/', job_archive_path) - - -def pause_queue(machine_type, paused, paused_by, pause_duration=None): - if paused: - report.pause_queue(machine_type, paused, paused_by, pause_duration) - ''' - If there is a pause duration specified - un-pause the queue after the time elapses - ''' - if pause_duration is not None: - sleep(int(pause_duration)) - paused = False - report.pause_queue(machine_type, paused, paused_by) - elif not paused: - report.pause_queue(machine_type, paused, paused_by) diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index edb273a199..19883267dc 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -70,9 +70,8 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): if teuth_config.results_server: try: report.try_delete_jobs(job_config['name'], job_config['job_id']) - except Exception as e: - log.warning("Unable to delete job %s, exception occurred: %s", - job_config['job_id'], e) + except Exception: + log.exception("Unable to delete job %s", job_config['job_id']) job_archive = os.path.join(archive_dir, safe_archive) args = [ os.path.join(teuth_bin_path, 'teuthology-results'), @@ -130,7 +129,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): '--archive', job_config['archive_path'], '--name', job_config['name'], ]) - if 'description' in job_config: + if job_config.get('description') is not None: arg.extend(['--description', job_config['description']]) job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml') arg.extend(['--', job_archive]) diff --git a/teuthology/orchestra/run.py b/teuthology/orchestra/run.py index 6235b0d36e..f31dfd0d7f 100644 --- a/teuthology/orchestra/run.py +++ b/teuthology/orchestra/run.py @@ -182,6 +182,7 @@ def _raise_for_status(self): command=self.command, exitstatus=self.returncode, node=self.hostname, label=self.label ) + def _get_exitstatus(self): """ :returns: the remote command's exit status (return code). Note that diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py index 2a0b6ff363..e69de29bb2 100644 --- a/teuthology/queue/__init__.py +++ b/teuthology/queue/__init__.py @@ -1,106 +0,0 @@ -import logging -import pprint -import sys -from collections import OrderedDict - -from teuthology import report -from teuthology.config import config - -log = logging.getLogger(__name__) - -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - report.try_delete_jobs(job_name, job_id) - - -def main(args): - if config.backend == 'paddles': - paddles.main(args) - else: - beanstalk.main(args) \ No newline at end of file diff --git a/teuthology/queue/beanstalk.py b/teuthology/queue/beanstalk.py index 90b1cbd6d3..c668e4f6bc 100644 --- a/teuthology/queue/beanstalk.py +++ b/teuthology/queue/beanstalk.py @@ -1,12 +1,10 @@ import beanstalkc import yaml import logging -import pprint -import sys -from collections import OrderedDict from teuthology.config import config -from teuthology import report +from teuthology.queue import util + log = logging.getLogger(__name__) @@ -47,7 +45,7 @@ def callback(jobs_dict) # Try to figure out a sane timeout based on how many jobs are in the queue timeout = job_count / 2000.0 * 60 for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") + util.print_progress(i, job_count, "Loading") job = connection.reserve(timeout=timeout) if job is None or job.body is None: continue @@ -57,7 +55,7 @@ def callback(jobs_dict) if pattern is not None and pattern not in job_name: continue processor.add_job(job_id, job_config, job) - end_progress() + util.end_progress() processor.complete() @@ -105,13 +103,13 @@ def main(args): pause_tube(connection, machine_type, pause_duration) elif delete: walk_jobs(connection, machine_type, - JobDeleter(delete)) + util.JobDeleter(delete)) elif runs: walk_jobs(connection, machine_type, - RunPrinter()) + util.RunPrinter()) else: walk_jobs(connection, machine_type, - JobPrinter(show_desc=show_desc, full=full)) + util.JobPrinter(show_desc=show_desc, full=full)) except KeyboardInterrupt: log.info("Interrupted.") finally: diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py index f2ea8b84c8..489d638e2f 100644 --- a/teuthology/queue/paddles.py +++ b/teuthology/queue/paddles.py @@ -1,11 +1,7 @@ import logging -import pprint -import sys -from collections import OrderedDict from teuthology import report -from teuthology.dispatcher import pause_queue - +from teuthology.queue import util log = logging.getLogger(__name__) @@ -14,19 +10,17 @@ def stats_queue(machine_type): stats = report.get_queue_stats(machine_type) if stats['paused'] is None: log.info("%s queue is currently running with %s jobs queued", - stats['name'], - stats['count']) + stats['queue'], + stats['queued_jobs']) else: log.info("%s queue is paused with %s jobs queued", - stats['name'], - stats['count']) + stats['queue'], + stats['queued_jobs']) -def update_priority(machine_type, priority, user, run_name=None): +def update_priority(machine_type, priority, run_name=None): if run_name is not None: - jobs = report.get_user_jobs_queue(machine_type, user, run_name) - else: - jobs = report.get_user_jobs_queue(machine_type, user) + jobs = report.get_jobs_by_run(machine_type, run_name) for job in jobs: job['priority'] = priority report.try_push_job_info(job) @@ -34,55 +28,54 @@ def update_priority(machine_type, priority, user, run_name=None): def walk_jobs(machine_type, processor, user): log.info("Checking paddles queue...") - job_count = report.get_queue_stats(machine_type)['count'] + job_count = report.get_queue_stats(machine_type)['queued_jobs'] jobs = report.get_user_jobs_queue(machine_type, user) if job_count == 0: - log.info('No jobs in queue') + log.info('No jobs in Paddles queue') return for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") + util.print_progress(i, job_count, "Loading") job = jobs[i-1] if job is None: continue job_id = job['job_id'] processor.add_job(job_id, job) - end_progress() + util.end_progress() processor.complete() def main(args): machine_type = args['--machine_type'] - #user = args['--user'] - #run_name = args['--run_name'] - #priority = args['--priority'] + user = args['--user'] + run_name = args['--run-name'] status = args['--status'] delete = args['--delete'] runs = args['--runs'] show_desc = args['--description'] full = args['--full'] pause_duration = args['--pause'] - #unpause = args['--unpause'] - #pause_duration = args['--time'] + priority = args['--priority'] try: if status: stats_queue(machine_type) if pause_duration: - pause_queue(machine_type, pause, user, pause_duration) - #else: - #pause_queue(machine_type, pause, user) + if not user: + log.info('Please enter user to pause Paddles queue') + return + report.pause_queue(machine_type, user, pause_duration) elif priority: update_priority(machine_type, priority, run_name) elif delete: walk_jobs(machine_type, - JobDeleter(delete), user) + util.JobDeleter(delete), user) elif runs: walk_jobs(machine_type, - RunPrinter(), user) + util.RunPrinter(), user) else: walk_jobs(machine_type, - JobPrinter(show_desc=show_desc, full=full), + util.JobPrinter(show_desc=show_desc, full=full), user) except KeyboardInterrupt: log.info("Interrupted.") diff --git a/teuthology/queue/util.py b/teuthology/queue/util.py new file mode 100644 index 0000000000..2a7642e726 --- /dev/null +++ b/teuthology/queue/util.py @@ -0,0 +1,101 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report + +log = logging.getLogger(__name__) + + +def print_progress(index, total, message=None): + msg = "{m} ".format(m=message) if message else '' + sys.stderr.write("{msg}{i}/{total}\r".format( + msg=msg, i=index, total=total)) + sys.stderr.flush() + + +def end_progress(): + sys.stderr.write('\n') + sys.stderr.flush() + + +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False, full=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + self.full = full + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] + job_priority = job_config['priority'] + job_name = job_config['name'] + job_desc = job_config['description'] + print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( + i=job_index, + pri=job_priority, + job_id=job_id, + job_name=job_name, + )) + if self.full: + pprint.pprint(job_config) + elif job_desc and self.show_desc: + for desc in job_desc.split(): + print('\t {}'.format(desc)) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print(run) + + +class JobDeleter(JobProcessor): + def __init__(self, pattern): + self.pattern = pattern + super(JobDeleter, self).__init__() + + def add_job(self, job_id, job_config, job_obj=None): + job_name = job_config['name'] + if self.pattern in job_name: + super(JobDeleter, self).add_job(job_id, job_config, job_obj) + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print('Deleting {job_name}/{job_id}'.format( + job_id=job_id, + job_name=job_name, + )) + report.try_delete_jobs(job_name, job_id) diff --git a/teuthology/report.py b/teuthology/report.py index 6796415014..6e2a1993a3 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -282,7 +282,6 @@ def write_new_job(self, run_name, job_info): sleep=1, increment=inc, action=f'write job for {run_name}') as proceed: while proceed(): response = self.session.post(run_uri, data=job_json, headers=headers) - if response.status_code == 200: resp_json = response.json() job_id = resp_json['job_id'] @@ -387,7 +386,7 @@ def last_run(self): def get_top_job(self, queue): - uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri, + uri = "{base}/queue/pop_queue?queue={queue}".format(base=self.base_uri, queue=queue) inc = random.uniform(0, 1) with safe_while( @@ -523,7 +522,6 @@ def create_queue(self, queue): sleep=1, increment=inc, action=f'creating queue {queue}') as proceed: while proceed(): response = self.session.post(uri, data=queue_json, headers=headers) - if response.status_code == 200: self.log.info("Successfully created queue {queue}".format( queue=queue, @@ -546,13 +544,14 @@ def create_queue(self, queue): response.raise_for_status() - def update_queue(self, queue, paused, paused_by, pause_duration=None): + def update_queue(self, queue, paused_by, pause_duration=None): uri = "{base}/queue/".format( base=self.base_uri ) + if pause_duration is not None: pause_duration = int(pause_duration) - queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by, + queue_info = {'queue': queue, 'paused_by': paused_by, 'pause_duration': pause_duration} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} @@ -595,9 +594,6 @@ def queue_stats(self, queue): response = self.session.post(uri, data=queue_json, headers=headers) if response.status_code == 200: - self.log.info("Successfully retrieved stats for queue {queue}".format( - queue=queue, - )) return response.json() else: msg = response.text @@ -669,12 +665,18 @@ def get_user_jobs_queue(queue, user, run_name=None): return return reporter.queued_jobs(queue, user, run_name) +def get_jobs_by_run(queue, run_name): + reporter = ResultsReporter() + if not reporter.base_uri: + return + return reporter.queued_jobs(queue, None, run_name) + -def pause_queue(queue, paused, paused_by, pause_duration=None): +def pause_queue(queue, paused_by, pause_duration=None): reporter = ResultsReporter() if not reporter.base_uri: return - reporter.update_queue(queue, paused, paused_by, pause_duration) + reporter.update_queue(queue, paused_by, pause_duration) def is_queue_paused(queue): @@ -711,7 +713,7 @@ def push_job_info(run_name, job_id, job_info, base_uri=None): reporter.report_job(run_name, job_id, job_info) -def get_queued_job(machine_type): +def get_queued_job(queue): """ Retrieve a job that is queued depending on priority @@ -720,10 +722,6 @@ def get_queued_job(machine_type): reporter = ResultsReporter() if not reporter.base_uri: return - if ',' in machine_type: - queue = 'multi' - else: - queue = machine_type if is_queue_paused(queue) == True: log.info("Teuthology queue %s is currently paused", queue) diff --git a/teuthology/schedule.py b/teuthology/schedule.py index 81dd4d548f..fea64e9b4d 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -1,7 +1,7 @@ import os import yaml -import teuthology.beanstalk +import teuthology.queue.beanstalk from teuthology.misc import get_user, merge_configs from teuthology import report @@ -115,9 +115,9 @@ def beanstalk_schedule_job(job_config, backend, num=1): """ num = int(num) tube = job_config.pop('tube') - beanstalk = teuthology.beanstalk.connect() + beanstalk = teuthology.queue.beanstalk.connect() beanstalk.use(tube) - queue = report.create_machine_type_queue(job_config['machine_type']) + queue = report.create_machine_type_queue(tube) job_config['queue'] = queue while num > 0: job_id = report.try_create_job(job_config, dict(status='queued')) diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py index 9a6d0ff564..ce4e55d851 100644 --- a/teuthology/test/test_dispatcher.py +++ b/teuthology/test/test_dispatcher.py @@ -1,8 +1,6 @@ -from teuthology import dispatcher from unittest.mock import patch, Mock from teuthology import report -import unittest.mock as mock import unittest @@ -35,74 +33,3 @@ def test_mock_get_queue_job(self): self.assertEqual(response.status_code, 200) self.assertEqual(response.json(), job_config) - - - @patch("teuthology.worker.fetch_teuthology") - @patch("teuthology.dispatcher.fetch_qa_suite") - @patch("teuthology.worker.fetch_qa_suite") - @patch("teuthology.dispatcher.report.get_queued_job") - @patch("teuthology.dispatcher.report.try_push_job_info") - @patch("teuthology.dispatcher.setup_log_file") - @patch("os.path.isdir") - @patch("os.getpid") - @patch("teuthology.dispatcher.teuth_config") - @patch("subprocess.Popen") - @patch("os.path.join") - @patch("teuthology.dispatcher.create_job_archive") - @patch("yaml.safe_dump") - def test_dispatcher_main(self, m_fetch_teuthology, m_fetch_qa_suite, - m_worker_fetch_qa_suite, m_get_queued_job, - m_try_push_job_info, - m_setup_log, - m_isdir, m_getpid, - m_t_config, m_popen, m_join, m_create_archive, m_yaml_dump): - - args = { - '--owner': 'the_owner', - '--archive-dir': '/archive/dir', - '--log-dir': '/worker/log', - '--name': 'the_name', - '--description': 'the_description', - '--machine-type': 'test_queue', - '--supervisor': False, - '--verbose': False, - '--queue-backend': 'paddles' - } - - m = mock.MagicMock() - job_id = {'job_id': '1'} - m.__getitem__.side_effect = job_id.__getitem__ - m.__iter__.side_effect = job_id.__iter__ - job = { - 'job_id': '1', - 'description': 'DESC', - 'email': 'EMAIL', - 'first_in_suite': False, - 'last_in_suite': True, - 'machine_type': 'test_queue', - 'name': 'NAME', - 'owner': 'OWNER', - 'priority': 99, - 'results_timeout': '6', - 'verbose': False, - 'stop_worker': True, - 'archive_path': '/archive/dir/NAME/1' - } - - m_fetch_teuthology.return_value = '/teuth/path' - m_fetch_qa_suite.return_value = '/suite/path' - m_isdir.return_value = True - mock_get_patcher = patch('teuthology.dispatcher.report.get_queued_job') - mock_get = mock_get_patcher.start() - mock_get.return_value = job - - mock_prep_job_patcher = patch('teuthology.dispatcher.prep_job') - mock_prep_job = mock_prep_job_patcher.start() - mock_prep_job.return_value = (job, '/teuth/bin/path') - m_yaml_dump.return_value = '' - - m_try_push_job_info.called_once_with(job, dict(status='running')) - dispatcher.main(args) - mock_get_patcher.stop() - -