diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index 0d042950..51d078f7 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -448,10 +448,18 @@ def _create_cos_tokens(self): log.info("Prepare cos tokens") if rel := self.model.get_relation("cos-tokens"): - self.distributor.allocate_tokens(relation=rel, token_strategy=TokenStrategy.COS) + self.distributor.allocate_tokens( + relation=rel, + token_strategy=TokenStrategy.COS, + token_type=ClusterTokenType.CONTROL_PLANE, + ) if rel := self.model.get_relation("cos-worker-tokens"): - self.distributor.allocate_tokens(relation=rel, token_strategy=TokenStrategy.COS) + self.distributor.allocate_tokens( + relation=rel, + token_strategy=TokenStrategy.COS, + token_type=ClusterTokenType.WORKER, + ) @on_error( WaitingStatus("Waiting to enable features"), diff --git a/charms/worker/k8s/src/token_distributor.py b/charms/worker/k8s/src/token_distributor.py index d8ba3ae7..538bfd57 100644 --- a/charms/worker/k8s/src/token_distributor.py +++ b/charms/worker/k8s/src/token_distributor.py @@ -6,7 +6,7 @@ import contextlib import logging from enum import Enum, auto -from typing import Optional +from typing import Dict, Optional import charms.contextual_status as status import ops @@ -16,6 +16,7 @@ K8sdAPIManager, K8sdConnectionError, ) +from pydantic import SecretStr log = logging.getLogger(__name__) @@ -48,6 +49,129 @@ class ClusterTokenType(Enum): NONE = "" +class TokenManager: + """Protocol for managing tokens. + + Attributes: + api_manager (K8sdAPIManager): An K8sdAPIManager object for interacting with k8sd API. + allocator_needs_tokens: Whether or not the allocator needs tokens. + strategy: The strategy for token creation. + revoke_on_join: Whether or not to revoke a token once it's joined. + """ + + allocator_needs_tokens: bool + strategy: TokenStrategy + revoke_on_join: bool + + def __init__(self, api_manager: K8sdAPIManager): + """Initialize a TokenManager instance. 
+ + Args: + api_manager (K8sdAPIManager): An K8sdAPIManager object for interacting with k8sd API. + """ + self.api_manager = api_manager + + def create(self, name: str, token_type: ClusterTokenType) -> SecretStr: + """Create a token. + + Args: + name (str): The name of the node. + token_type (ClusterTokenType): The type of cluster token. + + Returns: + SecretStr: The created token. + """ + return SecretStr("") + + def revoke(self, name: str, ignore_errors: bool): + """Remove a token. + + Args: + name (str): The name of the node. + ignore_errors (bool): Whether or not errors can be ignored + """ + ... + + +class ClusterTokenManager(TokenManager): + """Class for managing cluster tokens. + + Attributes: + allocator_needs_tokens: The allocating node does not need a cluster token to join + strategy: The cluster strategy for token creation. + revoke_on_join: Revoke a token once it's joined. + """ + + allocator_needs_tokens: bool = False + strategy: TokenStrategy = TokenStrategy.CLUSTER + revoke_on_join = True + + def create(self, name: str, token_type: ClusterTokenType) -> SecretStr: + """Create a cluster token. + + Args: + name (str): The name of the node. + token_type (ClusterTokenType): The type of cluster token. + + Returns: + SecretStr: The created cluster token. + """ + worker = token_type == ClusterTokenType.WORKER + return self.api_manager.create_join_token(name, worker=worker) + + def revoke(self, name: str, ignore_errors: bool): + """Remove a cluster token. + + Args: + name (str): The name of the node. 
+ ignore_errors (bool): Whether or not errors can be ignored + + Raises: + K8sdConnectionError: reraises cluster token revoke failures + """ + try: + self.api_manager.remove_node(name) + except (K8sdConnectionError, InvalidResponseError) as e: + if ignore_errors or e.code == ErrorCodes.StatusNodeUnavailable: + # Let's just ignore some of these expected errors: + # "Remote end closed connection without response" + # "Failed to check if node is control-plane" + # Removing a node that doesn't exist + log.warning("Remove_Node %s: but with an expected error: %s", name, e) + else: + raise + + +class CosTokenManager(TokenManager): + """Class for managing COS tokens. + + Attributes: + allocator_needs_tokens: The allocating node needs a cos-token to join + strategy: The cos strategy for token creation. + revoke_on_join: Don't revoke a token once it's joined. + """ + + allocator_needs_tokens: bool = True + strategy: TokenStrategy = TokenStrategy.COS + revoke_on_join = False + + def create(self, name: str, _) -> SecretStr: + """Create a COS token. + + Args: + name (str): The name of the node. + + Returns: + SecretStr: The created COS token. 
+ """ + return self.api_manager.request_auth_token( + username=f"system:cos:{name}", groups=["system:cos"] + ) + + def revoke(self, _: str, __): + """Remove a COS token intentionally left unimplemented.""" + + class TokenCollector: """Helper class for collecting tokens for units in a relation.""" @@ -146,71 +270,10 @@ def __init__(self, charm: ops.CharmBase, node_name: str, api_manager: K8sdAPIMan """ self.charm = charm self.node_name = node_name - self.api_manager = api_manager - self.token_creation_strategies = { - TokenStrategy.CLUSTER: self._create_cluster_token, - TokenStrategy.COS: self._create_cos_token, + self.token_strategies: Dict[TokenStrategy, TokenManager] = { + TokenStrategy.CLUSTER: ClusterTokenManager(api_manager), + TokenStrategy.COS: CosTokenManager(api_manager), } - self.token_revoking_strategies = { - TokenStrategy.CLUSTER: self._revoke_cluster_token, - TokenStrategy.COS: self._revoke_cos_token, - } - - def _create_cluster_token(self, name: str, token_type: ClusterTokenType): - """Create a cluster token. - - Args: - name (str): The name of the node. - token_type (ClusterTokenType): The type of cluster token. - - Returns: - str: The created cluster token. - """ - worker = token_type == ClusterTokenType.WORKER - return self.api_manager.create_join_token(name, worker=worker) - - def _create_cos_token(self, name: str, _): - """Create a COS token. - - Args: - name (str): The name of the node. - - Returns: - str: The created COS token. - """ - return self.api_manager.request_auth_token( - username=f"system:cos:{name}", groups=["system:cos"] - ) - - def _revoke_cluster_token(self, name: str, ignore_errors: bool): - """Remove a cluster token. - - Args: - name (str): The name of the node. 
- ignore_errors (bool): Whether or not errors can be ignored - - Raises: - K8sdConnectionError: reraises cluster token revoke failures - """ - try: - self.api_manager.remove_node(name) - except (K8sdConnectionError, InvalidResponseError) as e: - if ignore_errors or e.code == ErrorCodes.StatusNodeUnavailable: - # Let's just ignore some of these expected errors: - # "Remote end closed connection without response" - # "Failed to check if node is control-plane" - # Removing a node that doesn't exist - log.warning("Remove_Node %s: but with an expected error: %s", name, e) - else: - raise - - def _revoke_cos_token(self, name: str, _): - """Remove a COS token. - - Args: - name (str): The name of the node. - """ - # TODO: implement removing cos token def _get_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> Optional[str]: """Lookup juju secret offered to a unit on this relation. @@ -282,11 +345,7 @@ def allocate_tokens( token_strategy (TokenStrategy): The strategy of token creation. token_type (ClusterTokenType): The type of cluster token. Defaults to ClusterTokenType.NONE. - - Raises: - ValueError: If an invalid token_strategy is provided. 
""" - revoke_on_join = token_strategy == TokenStrategy.CLUSTER units = relation.units if self.charm.app == relation.app: # include self in peer relations @@ -294,24 +353,31 @@ def allocate_tokens( assert relation.app, f"Remote application doesn't exist on {relation.name}" # nosec # Select the appropriate token creation strategy - token_strat = self.token_creation_strategies.get(token_strategy) - if not token_strat: - raise ValueError(f"Invalid token_strategy: {token_strategy}") - - log.info("Allocating %s tokens", token_type.value) - status.add(ops.MaintenanceStatus(f"Allocating {token_type.value} tokens")) + tokenizer = self.token_strategies.get(token_strategy) + assert tokenizer, f"Invalid token_strategy: {token_strategy}" # nosec + + log.info("Allocating %s %s tokens", token_type.name.title(), token_strategy.name.title()) + status.add( + ops.MaintenanceStatus( + f"Allocating {token_type.name.title()} {token_strategy.name.title()} tokens" + ) + ) local_cluster = self.charm.get_cluster_name() relation.data[self.charm.unit]["node-name"] = self.node_name - relation.data[self.charm.unit]["joined"] = local_cluster relation.data[self.charm.unit]["cluster-name"] = local_cluster + if not tokenizer.allocator_needs_tokens: + # the allocator doesn't need a token to join, mark as already joined + relation.data[self.charm.unit]["joined"] = local_cluster + for unit in units: secret_id = SECRET_ID.format(unit.name) remote_cluster = relation.data[unit].get("joined") node = relation.data[unit].get("node-name") if not node: log.info( - "Wait for node-name of %s unit=%s:%s", - token_type.value, + "Wait for %s token allocation of %s with unit=%s:%s", + token_strategy.name.title(), + token_type.name.title(), relation.name, unit.name, ) @@ -319,8 +385,9 @@ def allocate_tokens( if remote_cluster and remote_cluster != local_cluster: # ignore this unit, it's not in our cluster log.info( - "Ignoring token allocation of %s with unit=%s:%s (%s)", - token_type.value, + "Ignoring %s token 
allocation of %s with unit=%s:%s (%s)", + token_strategy.name.title(), + token_type.name.title(), relation.name, unit.name, node, @@ -332,30 +399,38 @@ def allocate_tokens( # if the unit leaves. Let's create a cache in # our app's session of this data. log.info( - "Completed token allocation of %s with unit=%s:%s (%s)", - token_type.value, + "Completed %s token allocation of %s with unit=%s:%s (%s)", + token_strategy.name.title(), + token_type.name.title(), relation.name, unit.name, node, ) self.update_node(relation, unit, f"joined-{node}") - if revoke_on_join: + if tokenizer.revoke_on_join: self._revoke_juju_secret(relation, unit) continue # unit reports its joined already if relation.data[self.charm.unit].get(secret_id): # unit already assigned a token log.info( - "Waiting for token to be recovered %s unit=%s:%s (%s)", - token_type.value, + "Waiting for %s token to be recovered %s unit=%s:%s (%s)", + token_strategy.name.title(), + token_type.name.title(), relation.name, unit.name, node, ) continue - log.info("Creating token for %s unit=%s node=%s", token_type.value, unit.name, node) - token = token_strat(node, token_type) + log.info( + "Creating %s token for %s unit=%s node=%s", + token_strategy.name.title(), + token_type.name.title(), + unit.name, + node, + ) + token = tokenizer.create(node, token_type) content = {"token": token.get_secret_value()} secret = relation.app.add_secret(content) secret.grant(relation, unit=unit) @@ -377,9 +452,6 @@ def revoke_tokens( token_type (ClusterTokenType, optional): The type of cluster token. Defaults to ClusterTokenType.NONE. to_remove (ops.Unit, optional): unit to ensure its token is revoked - - Raises: - ValueError: If an invalid token_strategy is provided. 
""" # any unit currently in the relation all_units = relation.units @@ -399,26 +471,31 @@ def revoke_tokens( return log.info( - "Token report for %s \n\tjoined=%s\n\tremoving=%s\n\tremaining=%s", + "%s Token report for %s \n\tjoined=%s\n\tremoving=%s\n\tremaining=%s", + token_strategy.name.title(), relation.name, ",".join(sorted(u.name for u in joined)), ",".join(sorted(u.name for u in remove)), ",".join(sorted(u.name for u in remaining)), ) - token_strat = self.token_revoking_strategies.get(token_strategy) - if not token_strat: - raise ValueError(f"Invalid token_strategy: {token_strategy}") + tokenizer = self.token_strategies.get(token_strategy) + assert tokenizer, f"Invalid token_strategy: {token_strategy}" # nosec - status.add(ops.MaintenanceStatus(f"Revoking {token_type.value} tokens")) + status.add( + ops.MaintenanceStatus( + f"Revoking {token_type.name.title()} {token_strategy.name.title()} tokens" + ) + ) local_cluster = self.charm.get_cluster_name() for unit in remove: if node_state := app_databag.get(unit): state, node = node_state.split("-", 1) remote_cluster = (data := relation.data.get(unit)) and data.get("joined") log.info( - "Revoking token for %s unit=%s:%s %s node=%s", - token_type.value, + "Revoking %s, token for %s unit=%s:%s %s node=%s", + token_strategy.name.title(), + token_type.name.title(), relation.name, unit.name, state, @@ -427,6 +504,6 @@ def revoke_tokens( ignore_errors = self.node_name == node # removing myself ignore_errors |= state == "pending" # on pending tokens ignore_errors |= local_cluster != remote_cluster # if cluster doesn't match - token_strat(node, ignore_errors) + tokenizer.revoke(node, ignore_errors) self.drop_node(relation, unit) self._revoke_juju_secret(relation, unit) diff --git a/tests/integration/prometheus.py b/tests/integration/prometheus.py index dc90784e..3d34134f 100644 --- a/tests/integration/prometheus.py +++ b/tests/integration/prometheus.py @@ -7,7 +7,7 @@ import logging import urllib.parse import 
urllib.request -from typing import Optional +from typing import List, Optional log = logging.getLogger(__name__) @@ -68,11 +68,14 @@ async def health(self) -> str: return data - async def check_metrics(self, query: str): + async def get_metrics(self, query: str) -> List: """Query Prometheus for metrics. Args: query (str): The Prometheus query to execute. + + Returns: + List: A list of results from the query. """ api_path = "api/v1/query" uri = f"{self.base_uri}/{api_path}" @@ -85,4 +88,4 @@ async def check_metrics(self, query: str): assert response.code == 200, f"Failed to query '{query}': {data}" result = json.loads(data) assert result.get("status") == "success", f"Query failed: {result}" - assert result.get("data", {}).get("result"), "Data not yet available" + return result.get("data", {}).get("result", []) diff --git a/tests/integration/test_k8s.py b/tests/integration/test_k8s.py index b2ae11ec..8d2cf8ea 100644 --- a/tests/integration/test_k8s.py +++ b/tests/integration/test_k8s.py @@ -172,5 +172,6 @@ async def test_prometheus(traefik_url: str, cos_model: model.Model): 'up{job="kube-proxy"} > 0', 'up{job="kube-state-metrics"} > 0', ] - for query in queries: - await prometheus.check_metrics(query) + results = await asyncio.gather(*[prometheus.get_metrics(query) for query in queries]) + failed = [query for query, result in zip(queries, results) if not result] + assert not failed, f"Failed queries: {failed}"