Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental quorum queues #19

Merged
merged 20 commits into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f4e650c
fix(docker): Use proper org-prefix for metricq-python
phijor Nov 17, 2020
f711828
build(setuptools): Move to setup.cfg
phijor Dec 23, 2020
3144bd6
chore(manager): Remove unused imports
phijor Jan 18, 2021
7aea346
feat: Declare queues using QueueManager
phijor Oct 22, 2020
4b25952
feat(queue_manager): Include queue type in queue name
phijor Jan 25, 2021
1e9a4ee
feat(queue_manager): Use a temporary channel for each operation
phijor Jan 21, 2021
aed4e3b
refactor: Move legacy DB-subscribe-on-register behaviour into its own…
phijor Jan 21, 2021
a9aa1c8
chore(queue_manager): Use consistent method names
phijor Jan 21, 2021
725e00f
docs(config_parser): Explain how to use the configuration parser
phijor Jan 21, 2021
1395e0a
feat(tools): Add script to pre-declare database queues
phijor Jan 25, 2021
a671310
fix(docs): Correct some typos
phijor Feb 9, 2021
ce8c037
refactor(config_parser): Optional parameter to queue_name validates
phijor Feb 9, 2021
4cc965e
refactor(manager): Explain parsing of DB bindings
phijor Feb 9, 2021
491a510
test(config_parser): QueueType.to_string() is exchaustive
phijor Feb 9, 2021
b2b2d35
test(config_parser): Assert arguments() is iterable for every QueueType
phijor Feb 9, 2021
bdf039f
fix(queue_manager): Make sure queues/exchanges don't use closed channel
phijor Feb 9, 2021
47ee530
refactor(queue_manager): Deduplicate DB queue binding code
phijor Feb 9, 2021
9feaeb4
build(deps): Require a recent version of metricq (1.4)
phijor Feb 9, 2021
f797b5f
test(queue_manager): Add basic transformer test
phijor Feb 16, 2021
e2e21d7
build(version): Bump version 0.1 -> 0.2.0
phijor Feb 16, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM metricq-python:latest AS builder
FROM metricq/metricq-python:latest AS builder
LABEL maintainer="mario.bielert@tu-dresden.de"

USER root
Expand All @@ -11,7 +11,7 @@ WORKDIR /home/metricq/manager
RUN . /home/metricq/venv/bin/activate && pip install .
RUN wget -O wait-for-it.sh https://github.com/vishnubob/wait-for-it/raw/master/wait-for-it.sh && chmod +x wait-for-it.sh

FROM metricq-python:latest
FROM metricq/metricq-python:latest

USER metricq
COPY --from=builder /home/metricq/venv /home/metricq/venv
Expand Down Expand Up @@ -40,4 +40,12 @@ ENV data_url=$data_url

VOLUME ["/home/metricq/manager/config"]

CMD /home/metricq/wait-for-it.sh $wait_for_couchdb_url -- /home/metricq/wait-for-it.sh $wait_for_rabbitmq_url -- /home/metricq/venv/bin/metricq-manager --config-path /home/metricq/manager/config --couchdb-url $couchdb_url --couchdb-user $couchdb_user --couchdb-password $couchdb_pw $rpc_url $data_url
CMD /home/metricq/wait-for-it.sh $wait_for_couchdb_url -- \
/home/metricq/wait-for-it.sh $wait_for_rabbitmq_url -- \
/home/metricq/venv/bin/metricq-manager \
--config-path /home/metricq/manager/config \
--couchdb-url $couchdb_url \
--couchdb-user $couchdb_user \
--couchdb-password $couchdb_pw \
$rpc_url \
$data_url
5 changes: 5 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ cd configurations
../manager.py
```

## Database queue maintainance and migration

`./tools/db-predeclare.py` contains a script to declare queues of a database
after their configuration changed without needing to restart the database.

## Deployment with Docker

### Prerequisites
Expand Down
354 changes: 354 additions & 0 deletions metricq_manager/config_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,354 @@
# Copyright (C) 2020, ZIH, Technische Universitaet Dresden, Federal Republic of Germany
#
# All rights reserved.
#
# This file is part of metricq.
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""In order to correctly manage and allocate resources on the MetricQ network,
the manager sometimes needs per-client information. This "client configuration"
is stored in a CouchDB backend, in the database :literal:`"config"`.

For each client, the configuration sits in the same document as the usual
client configuration (as returned by the `config` RPC), but under the special key
`x-metricq`:

.. code-block:: json

{
"foo": "bar",
"baz": 1,
"x-metricq": { "data-queue-type": "quorum" }
}

The :class:`ConfigParser` allows to parse such a client configuration and gives
structured access to the relevant queue-specific configuration keys.
In order to create a new :class:`ConfigParser`:
Since a :class:`ConfigParser` is to be used when declaring a specific queue for
client, a so-called "queue role" has to be supplied: it namespaces different
configuration keys based on the intended role of this queue.

Currently, queue roles with defined meaning are:

* :literal:`"data"`: data queues
* :literal:`"hreq"`: history request queues
* :literal:`"hrsp"`: history response queues

For example, the "queue type" (as given by the AMQP queue argument
:literal:`"x-queue-type"`) is retrieved from the key
:literal:`x-metricq.{role}-queue-type`.

This makes it possible to only set the data queue of a DB client as a "quorum"
queue, leaving the history request queue as a "classic" queue.
For this, the client configuration would look like the following:

.. code-block:: json

{
"x-metricq": { "data-queue-type": "quorum" }
}

Some configuration keys are interdependent.
Most notably, :literal:`x-metricq.{role}-message-ttl` is ignored if the queue
type :literal:`"quorum"` is requested.
See the document of :meth:`ConfigParser.arguments` below.

Currently, the following configuration keys can be accessed:

:literal:`x-metricq.{role}-queue-type`:
The intended AMQP queue type, see :meth:`ConfigParser.queue_type`.

:literal:`x-metricq.{role}-message-ttl` `(classic queues only)`:
The number of seconds after which unconsumed messages will be deleted
from this queue.

If you intended on adding more keys to be parsed, please consider the following:

* Add a method that parses value of this configuration key.
If the key is not set, either return a sensible default or :code:`None`
to mark its absence.
* Parse the value *completely*.
Either handle all edge-cases and raise :exc:`ValueError` and
:exc:`TypeError` as appropriate, or log and ignore invalid values and
treat the key as unset.

Values returned by your method should be usable as queue-argument
as-is, no futher preparation required.

A good example for this is :meth:`ConfigParser.message_ttl`.
It multiplies the value of :literal:`x-metricq.{role}-message-ttl` by
1000 before returning it, since in the client configuration it is
stored as a number of seconds, but the queue argument
:literal:`x-message-ttl` expects a number of milliseconds.
If the value is either *not a number* or it *is negative*,
it will be ignored and :code:`None` is returned.
* Make sure the queue argument is compatible with the intended queue type.
Some arguments cannot be set for all queue types.
Add it to the iterators returned by :meth:`ConfigParser.classic_arguments`
and :meth:`ConfigParser.quorum_arguments` depending on that.
"""

from contextlib import suppress
from enum import Enum, auto
from typing import Any, Dict, Iterator, List, Optional, Tuple, TypeVar
from uuid import uuid4

from metricq import get_logger

T = TypeVar("T")

logger = get_logger(__name__)

ClientConfig = Dict[str, Any]
"""A client configuration object as retrieved from the database.
"""


class QueueType(Enum):
"""The AMQP queue types known to the manager.

By default, queues are declared as "classic" queues.
If desired, "quorum" queues offer higher data safety.
"""

CLASSIC = auto()
QUORUM = auto()

@staticmethod
def default() -> "QueueType":
return QueueType.CLASSIC

@staticmethod
def from_str(queue_type: str) -> "QueueType":
if queue_type == "classic":
return QueueType.CLASSIC
elif queue_type == "quorum":
return QueueType.QUORUM
else:
raise ValueError(f"Invalid queue type {queue_type!r}")

def to_string(self) -> str:
if self is QueueType.CLASSIC:
return "classic"
elif self is QueueType.QUORUM:
return "quorum"
else:
assert False, f"Invalid QueueType {self!r}"
bmario marked this conversation as resolved.
Show resolved Hide resolved


class ConfigParser:
"""High-level structured access to per-client queue configuration.

Args:
config:
A client configuration object.
role:
The intended queue role.
client_token:
The client's token.
top_level_key:
The top-level key under which manager-specific configuration is found.
"""

def __init__(
self,
config: ClientConfig,
role: str,
client_token: str,
top_level_key: str = "x-metricq",
):
self.config = config
self.role = role
self.client_token = client_token
self.top_level_key = top_level_key

def replace(self, role: Optional[str] = None) -> "ConfigParser":
"""Return a copy of this ConfigParser, with the role replaced."""
role = self.role if role is None else role
return ConfigParser(
config=self.config,
role=role,
client_token=self.client_token,
top_level_key=self.top_level_key,
)

def get(
self,
key: str,
*,
deprecated: Optional[List[str]] = None,
default: Optional[T] = None,
) -> Optional[T]:
"""Retrieve a configuration value by key, optionally falling back to a default value.

Args:
key:
The configuration key whose value to retrieve.
This is namespaced under the top-level key, so
:code:`config.get("foo")` retrieves from :literal:`"{config.top_level_key}.foo"`.
deprecated:
A list of `legacy` keys to try `before` accessing the given key.

Note:
These keys are not namespaced under the top-level key.
:code:`config.get("foo", deprecated=["frob"])`
retrieves from :literal:`frob`, *not*
:literal:`{config.top_level_key}.frob`.
default:
A default value to be returned if the key is not set in the configuration.
"""
if deprecated:
for deprecated_key in deprecated:
with suppress(KeyError):
value = self.config[deprecated_key]
logger.warning(
'Client configuration for {!r} has legacy key {!r} set, use "{}.{}" instead!',
self.client_token,
deprecated_key,
self.top_level_key,
key,
)
return value

top_level: Optional[ClientConfig] = self.config.get(self.top_level_key)
if top_level is not None:
return top_level.get(key, default)
else:
return default

def classic_arguments(self) -> Iterator[Tuple[str, Any]]:
"""An iterator over `key-value` pairs of arguments for queues of type
:literal:`"classic"`, as parsed from the configuration object.
"""
if (message_ttl := self.message_ttl()) is not None:
yield ("x-message-ttl", message_ttl)

def message_ttl(self) -> Optional[int]:
"""Parse message `time-to-live <https://www.rabbitmq.com/ttl.html#queue-ttl>`_
argument for messages in a classic queue.

Does not apply to quorum queues.
"""
message_ttl: Any = self.get(
f"{self.role}-message-ttl", deprecated=["message_ttl"]
)

if message_ttl is None:
return None
elif isinstance(message_ttl, (float, int)):
ttl = int(1000 * message_ttl)

if ttl >= 0:
return ttl
else:
logger.warning(
"Client {!r} has per-queue message TTL that is not a positive number of seconds (got {})",
self.client_token,
ttl,
)
return None
else:
logger.warning(
"Client {!r} has message TTL set which is not a number of seconds: got {} of type {!r}",
self.client_token,
message_ttl,
type(message_ttl),
)
return None

def quorum_arguments(self) -> Iterator[Tuple[str, Any]]:
"""An iterator over `key-value` pairs of arguments for queues of type
:literal:`"quorum"`, as parsed from the configuration object.
"""
yield ("x-queue-type", "quorum")

def arguments(self) -> Iterator[Tuple[str, Any]]:
"""An iterator over `key-value` pairs of arguments for queues, as
parsed from the configuration object.

Depending on the queue type set in :literal:`{top_level_key}.{role}-queue-type`,
this yields either "classic" queue arguments
(:meth:`classic_arguments`) or "quorum" queue arguments
(:meth:`quorum_arguments`).
"""
queue_type = self.queue_type()
if queue_type is QueueType.CLASSIC:
return self.classic_arguments()
elif queue_type is QueueType.QUORUM:
return self.quorum_arguments()
else:
assert False, f"Unhandled queue type: {queue_type!r}"
bmario marked this conversation as resolved.
Show resolved Hide resolved

def queue_type(self) -> QueueType:
"""Parse the requested queue type from a configuration."""
queue_type = self.get(f"{self.role}-queue-type")
if queue_type is None:
return QueueType.default()
elif isinstance(queue_type, str):
return QueueType.from_str(queue_type)
else:
raise ValueError(f"Queue type for {self.client_token!r} must be a string")

def queue_name(
self,
*,
unique: bool = True,
validate: Optional[str] = None,
) -> str:
"""Return a suitable name for the queue under construction.

All queue names returned from this method are in the form of
:literal:`{self.client_token}[-*]-{self.role}`.

Args:
unique:
If set, the queue name will contain a random string of letters,
unique for each call of this method.
validate:
If set, the given value will be checked against the format mentioned above.
If it matches, it is returned as-is, otherwise a warning is logged.

Note:
In the future, this might be changed to raise a :exc:`ValueError` instead!

Returns:
The formatted queue name.
"""
if validate:
if validate.startswith(self.client_token) and validate.endswith(self.role):
return validate
else:
logger.warning(
f"Invalid queue name for client {self.client_token!r}: "
f'{validate!r} does not match "{self.client_token}[-*]-{self.role}"'
)
return validate
else:
return "-".join(self._queue_name_parts(unique=unique))

def _queue_name_parts(self, unique: bool):
yield self.client_token
if unique:
yield uuid4().hex

# When changing the queue type of a client queue, its next declaration is
# going to fail with error code 406 (Precondition failed), since it does
# not match the arguments of the existing queue.
#
# We include the queue type in the queue name to sidestep the problem.
# Changing the queue type then results in a new queue (with a predictable
# name) being declared and no conflicts arise. A client re-registering
# itself with a changed queue type then gets assigned the newly declared
# queue. Of course, the old queue still exists and needs to be deleted
# manually.
#
# For backward compatibility, we only include the queue type if it is
# different from the default queue type. In the past, this was the only
# available queue type. This way, all clients that do not declare a
# special queue type keep the old queue names.
queue_type = self.queue_type()
if queue_type != QueueType.default():
yield self.queue_type().to_string()

yield self.role
Loading