Skip to content

Commit

Permalink
Add beanstalk as a possible queue backend for Teuthology Jobs along w…
Browse files Browse the repository at this point in the history
…ith Paddles

With the use of the --queue-backend argument the user can specify which backend(paddles/beanstalk) they would like to use for maintaining the teuthology Jobs queue.
In order to avoid overlapping Job IDs, when a job is being scheduled in beanstalk it is also written to paddles which returns a unique ID.
This is the ID teuthology will treat as the Job ID throughout the run of the job.

To differentiate between the 2 queue backends, the teuthology-queue command has been split into teuthology-paddles-queue command and teuthology-beanstalk-queue command.

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
  • Loading branch information
amathuria committed Oct 6, 2021
1 parent 88af7b5 commit 61a399c
Show file tree
Hide file tree
Showing 14 changed files with 494 additions and 102 deletions.
35 changes: 35 additions & 0 deletions scripts/beanstalk_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import docopt

import teuthology.config
import teuthology.beanstalk

doc = """
usage: teuthology-beanstalk-queue -h
teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE
teuthology-beanstalk-queue [-r] -m MACHINE_TYPE
teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN
teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE]
List Jobs in queue.
If -D is passed, then jobs with PATTERN in the job name are deleted from the
queue.
Arguments:
-m, --machine_type MACHINE_TYPE [default: multi]
Which machine type queue to work on.
optional arguments:
-h, --help Show this help message and exit
-D, --delete PATTERN Delete Jobs with PATTERN in their name
-d, --description Show job descriptions
-r, --runs Only show run names
-f, --full Print the entire job config. Use with caution.
-s, --status Prints the status of the queue
-p, --pause SECONDS Pause queues for a number of seconds. A value of 0
will unpause. If -m is passed, pause that queue,
otherwise pause all queues.
"""


def main():

args = docopt.docopt(doc)
print(args)
teuthology.beanstalk.main(args)
5 changes: 3 additions & 2 deletions scripts/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""
usage: teuthology-dispatcher --help
teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE
teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND
Start a dispatcher for the specified machine type. Grab jobs from a paddles
Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk
queue and run the teuthology tests they describe as subprocesses. The
subprocess invoked is a teuthology-dispatcher command run in supervisor
mode.
Expand All @@ -21,6 +21,7 @@
--supervisor run dispatcher in job supervisor mode
--bin-path BIN_PATH teuthology bin path
--job-config CONFIG file descriptor of job's config file
--queue-backend BACKEND choose between paddles and beanstalk
"""

import docopt
Expand Down
18 changes: 9 additions & 9 deletions scripts/queue.py → scripts/paddles_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import teuthology.paddles_queue

doc = """
usage: teuthology-queue -h
teuthology-queue -s -m MACHINE_TYPE
teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER
teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
teuthology-queue [-r] -m MACHINE_TYPE -U USER
teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER
teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
teuthology-queue -u -m MACHINE_TYPE -U USER
usage: teuthology-paddles-queue -h
teuthology-paddles-queue -s -m MACHINE_TYPE
teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER
teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER
teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER
teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
teuthology-paddles-queue -u -m MACHINE_TYPE -U USER
List Jobs in queue.
If -D is passed, then jobs with PATTERN in the job name are deleted from the
Expand All @@ -36,7 +36,7 @@
-P, --priority PRIORITY
Change priority of queued jobs
-U, --user USER User who owns the jobs
-R, --run_name RUN_NAME
-R, --run-name RUN_NAME
Used to change priority of all jobs in the run.
"""

Expand Down
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
'orchestra': [
# For apache-libcloud when using python < 2.7.9
'backports.ssl_match_hostname',
'beanstalkc3 >= 0.4.0',
'httplib2',
'ndg-httpsclient', # for requests, urllib3
'pyasn1', # for requests, urllib3
Expand Down Expand Up @@ -125,7 +126,8 @@
'teuthology-results = scripts.results:main',
'teuthology-report = scripts.report:main',
'teuthology-kill = scripts.kill:main',
'teuthology-queue = scripts.queue:main',
'teuthology-paddles-queue = scripts.paddles_queue:main',
'teuthology-beanstalk-queue = scripts.beanstalk_queue:main',
'teuthology-prune-logs = scripts.prune_logs:main',
'teuthology-describe = scripts.describe:main',
'teuthology-reimage = scripts.reimage:main',
Expand Down
214 changes: 214 additions & 0 deletions teuthology/beanstalk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import beanstalkc
import yaml
import logging
import pprint
import sys
from collections import OrderedDict

from teuthology.config import config
from teuthology import report

log = logging.getLogger(__name__)


def connect():
host = config.queue_host
port = config.queue_port
if host is None or port is None:
raise RuntimeError(
'Beanstalk queue information not found in {conf_path}'.format(
conf_path=config.teuthology_yaml))
return beanstalkc.Connection(host=host, port=port)


def watch_tube(connection, tube_name):
"""
Watch a given tube, potentially correcting to 'multi' if necessary. Returns
the tube_name that was actually used.
"""
if ',' in tube_name:
log.debug("Correcting tube name to 'multi'")
tube_name = 'multi'
connection.watch(tube_name)
connection.ignore('default')
return tube_name


def walk_jobs(connection, tube_name, processor, pattern=None):
"""
def callback(jobs_dict)
"""
log.info("Checking Beanstalk Queue...")
job_count = connection.stats_tube(tube_name)['current-jobs-ready']
if job_count == 0:
log.info('No jobs in Beanstalk Queue')
return

# Try to figure out a sane timeout based on how many jobs are in the queue
timeout = job_count / 2000.0 * 60
for i in range(1, job_count + 1):
print_progress(i, job_count, "Loading")
job = connection.reserve(timeout=timeout)
if job is None or job.body is None:
continue
job_config = yaml.safe_load(job.body)
job_name = job_config['name']
job_id = job.stats()['id']
if pattern is not None and pattern not in job_name:
continue
processor.add_job(job_id, job_config, job)
end_progress()
processor.complete()


def print_progress(index, total, message=None):
msg = "{m} ".format(m=message) if message else ''
sys.stderr.write("{msg}{i}/{total}\r".format(
msg=msg, i=index, total=total))
sys.stderr.flush()


def end_progress():
sys.stderr.write('\n')
sys.stderr.flush()


class JobProcessor(object):
def __init__(self):
self.jobs = OrderedDict()

def add_job(self, job_id, job_config, job_obj=None):
job_id = str(job_id)

job_dict = dict(
index=(len(self.jobs) + 1),
job_config=job_config,
)
if job_obj:
job_dict['job_obj'] = job_obj
self.jobs[job_id] = job_dict

self.process_job(job_id)

def process_job(self, job_id):
pass

def complete(self):
pass


class JobPrinter(JobProcessor):
def __init__(self, show_desc=False, full=False):
super(JobPrinter, self).__init__()
self.show_desc = show_desc
self.full = full

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_index = self.jobs[job_id]['index']
job_priority = job_config['priority']
job_name = job_config['name']
job_desc = job_config['description']
print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
i=job_index,
pri=job_priority,
job_id=job_id,
job_name=job_name,
))
if self.full:
pprint.pprint(job_config)
elif job_desc and self.show_desc:
for desc in job_desc.split():
print('\t {}'.format(desc))


class RunPrinter(JobProcessor):
def __init__(self):
super(RunPrinter, self).__init__()
self.runs = list()

def process_job(self, job_id):
run = self.jobs[job_id]['job_config']['name']
if run not in self.runs:
self.runs.append(run)
print(run)


class JobDeleter(JobProcessor):
def __init__(self, pattern):
self.pattern = pattern
super(JobDeleter, self).__init__()

def add_job(self, job_id, job_config, job_obj=None):
job_name = job_config['name']
if self.pattern in job_name:
super(JobDeleter, self).add_job(job_id, job_config, job_obj)

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_name = job_config['name']
print('Deleting {job_name}/{job_id}'.format(
job_id=job_id,
job_name=job_name,
))
job_obj = self.jobs[job_id].get('job_obj')
if job_obj:
job_obj.delete()
report.try_delete_jobs(job_name, job_id)


def pause_tube(connection, tube, duration):
duration = int(duration)
if not tube:
tubes = sorted(connection.tubes())
else:
tubes = [tube]

prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
templ = prefix + ": {tubes}"
log.info(templ.format(dur=duration, tubes=tubes))
for tube in tubes:
connection.pause_tube(tube, duration)


def stats_tube(connection, tube):
stats = connection.stats_tube(tube)
result = dict(
name=tube,
count=stats['current-jobs-ready'],
paused=(stats['pause'] != 0),
)
return result


def main(args):
machine_type = args['--machine_type']
status = args['--status']
delete = args['--delete']
runs = args['--runs']
show_desc = args['--description']
full = args['--full']
pause_duration = args['--pause']
try:
connection = connect()
if machine_type and not pause_duration:
# watch_tube needs to be run before we inspect individual jobs;
# it is not needed for pausing tubes
watch_tube(connection, machine_type)
if status:
print(stats_tube(connection, machine_type))
elif pause_duration:
pause_tube(connection, machine_type, pause_duration)
elif delete:
walk_jobs(connection, machine_type,
JobDeleter(delete))
elif runs:
walk_jobs(connection, machine_type,
RunPrinter())
else:
walk_jobs(connection, machine_type,
JobPrinter(show_desc=show_desc, full=full))
except KeyboardInterrupt:
log.info("Interrupted.")
finally:
connection.close()
1 change: 1 addition & 0 deletions teuthology/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class TeuthologyConfig(YamlConfig):
'archive_upload_key': None,
'archive_upload_url': None,
'automated_scheduling': False,
'backend': 'paddles',
'reserve_machines': 5,
'ceph_git_base_url': 'https://github.com/ceph/',
'ceph_git_url': None,
Expand Down
Loading

0 comments on commit 61a399c

Please sign in to comment.