Skip to content

Commit

Permalink
refactor: use orm in queries
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulFarault committed Jun 30, 2023
1 parent 5ddc290 commit 8b6345a
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 113 deletions.
63 changes: 16 additions & 47 deletions tdp/cli/commands/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
from enum import Enum

import click
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from tabulate import tabulate

from tdp.cli.queries import get_deployment
from tdp.cli.queries import get_deployment, get_deployments, get_operation_log
from tdp.cli.session import get_session_class
from tdp.cli.utils import database_dsn
from tdp.core.models import DeploymentLog, OperationLog, ServiceComponentLog
Expand All @@ -36,52 +34,23 @@
help="At which offset should the database query should start",
)
@database_dsn
def browse(deployment_id, operation, limit, offset, database_dsn):
def browse(
deployment_id: int, operation: str, limit: int, offset: int, database_dsn: str
):
session_class = get_session_class(database_dsn)
try:
if not deployment_id:
print_formatted_deployments(get_deployments(session_class, limit, offset))
else:
if not operation:
print_formatted_deployment(get_deployment(session_class, deployment_id))
else:
print_formatted_operation_log(
get_operation_log(session_class, deployment_id, operation)
)
except Exception as e:
raise click.ClickException(str(e)) from e


def get_deployments(session_class, limit, offset):
query = (
select(DeploymentLog)
.options(joinedload(DeploymentLog.service_components))
.order_by(DeploymentLog.id)
.limit(limit)
.offset(offset)
)
with session_class() as session:
return session.execute(query).unique().scalars().fetchall()


def get_operation_log(session_class, deployment_id, operation):
query = (
select(OperationLog)
.options(
joinedload(OperationLog.deployment),
joinedload("deployment.service_components"),
)
.where(OperationLog.deployment_id == deployment_id)
.where(OperationLog.operation == operation)
.order_by(OperationLog.start_time)
)
with session_class() as session:
operation_log = session.execute(query).unique().scalar_one_or_none()
if operation_log is None:
raise ValueError(
f"Operation {operation} does exist in deployment {deployment_id}"
)
return operation_log
try:
if not deployment_id:
print_formatted_deployments(get_deployments(session, limit, offset))
else:
if not operation:
print_formatted_deployment(get_deployment(session, deployment_id))
else:
print_formatted_operation_log(
get_operation_log(session, deployment_id, operation)
)
except Exception as e:
raise click.ClickException(str(e)) from e


def print_formatted_deployments(deployments):
Expand Down
2 changes: 1 addition & 1 deletion tdp/cli/commands/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import click

from tdp.cli.commands.plan.utils import get_planned_deployment_log
from tdp.cli.queries import get_planned_deployment_log
from tdp.cli.session import get_session_class
from tdp.cli.utils import (
check_services_cleanliness,
Expand Down
2 changes: 1 addition & 1 deletion tdp/cli/commands/plan/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from tdp.core.deployment import DeploymentPlan
from tdp.core.models import FilterTypeEnum

from .utils import get_planned_deployment_log
from tdp.cli.queries import get_planned_deployment_log


def validate_filtertype(ctx, param, value):
Expand Down
8 changes: 4 additions & 4 deletions tdp/cli/commands/plan/reconfigure.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
)
from tdp.core.variables import ClusterVariables

from .utils import get_planned_deployment_log
from tdp.cli.queries import get_planned_deployment_log


@click.command(short_help="Restart required TDP services.")
Expand All @@ -40,9 +40,9 @@ def reconfigure(

session_class = get_session_class(database_dsn)
with session_class() as session:
latest_success_service_component_version = session.execute(
get_latest_success_service_component_version_query()
).all()
latest_success_service_component_version = (
get_latest_success_service_component_version_query(session)
)
service_component_deployed_version = map(
lambda result: result[1:], latest_success_service_component_version
)
Expand Down
6 changes: 3 additions & 3 deletions tdp/cli/commands/plan/resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from tdp.core.dag import Dag
from tdp.core.deployment import DeploymentPlan

from .utils import get_planned_deployment_log
from tdp.cli.queries import get_planned_deployment_log


@click.command(short_help="Resume a failed deployment.")
Expand All @@ -31,10 +31,10 @@ def resume(
session_class = get_session_class(database_dsn)
with session_class() as session:
if id is None:
deployment_log_to_resume = get_last_deployment(session_class)
deployment_log_to_resume = get_last_deployment(session)
click.echo(f"Creating a deployment plan to resume latest deployment.")
else:
deployment_log_to_resume = get_deployment(session_class, id)
deployment_log_to_resume = get_deployment(session, id)
click.echo(f"Creating a deployment plan to resume deployment #{id}.")
try:
deployment_log = DeploymentPlan.from_failed_deployment(
Expand Down
2 changes: 1 addition & 1 deletion tdp/cli/commands/plan/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from tdp.core.dag import Dag
from tdp.core.deployment import DeploymentPlan

from .utils import get_planned_deployment_log
from tdp.cli.queries import get_planned_deployment_log


@click.command(short_help="Run single TDP operation.")
Expand Down
10 changes: 0 additions & 10 deletions tdp/cli/commands/plan/utils.py

This file was deleted.

6 changes: 3 additions & 3 deletions tdp/cli/commands/service_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ def service_versions(database_dsn):
session_class = get_session_class(database_dsn)

with session_class() as session:
latest_success_service_version = session.execute(
get_latest_success_service_component_version_query()
).all()
latest_success_service_version = (
get_latest_success_service_component_version_query(session)
)

click.echo(
"Service versions:\n"
Expand Down
176 changes: 133 additions & 43 deletions tdp/cli/queries.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,67 @@
# Copyright 2022 TOSIT.IO
# SPDX-License-Identifier: Apache-2.0

from sqlalchemy import and_, desc, func, or_, select, tuple_
from typing import List

from sqlalchemy import and_, desc, func, or_, tuple_, select
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.session import Session
from sqlalchemy.orm.exc import NoResultFound

from tdp.core.models import DeploymentLog, ServiceComponentLog, OperationLog


def get_latest_success_service_component_version_query(session: Session):
"""Get the latest success service component version.
from tdp.core.models import DeploymentLog, ServiceComponentLog
Args:
session: The database session.
Returns:
Components with the latest success version.
"""
max_deployment_id_label = f"max_{ServiceComponentLog.deployment_id.name}"

def get_latest_success_service_component_version_query():
max_depid_label = f"max_{ServiceComponentLog.deployment_id.name}"
# Latest success deployment for each service/component pair
latest_deployed_service_component = (
session.query(
func.max(ServiceComponentLog.deployment_id).label(max_deployment_id_label),
ServiceComponentLog.service,
ServiceComponentLog.component,
)
.group_by(ServiceComponentLog.service, ServiceComponentLog.component)
.subquery()
)

latest_success_for_each_service = select(
func.max(ServiceComponentLog.deployment_id).label(max_depid_label),
ServiceComponentLog.service,
ServiceComponentLog.component,
).group_by(ServiceComponentLog.service, ServiceComponentLog.component)
# Request with or_ because querying with a tuple of 3 attributes using in_ operator
# does not work when the value can be null (because NULL in_ NULL is translated to `NULL = NULL` which returns NULL)
return (
select(
session.query(
ServiceComponentLog.deployment_id,
ServiceComponentLog.service,
ServiceComponentLog.component,
func.substr(ServiceComponentLog.version, 1, 7),
)
.filter(
or_(
# Components with the latest success deployment
# Filter deployment_id, service and component from the subquery
tuple_(
ServiceComponentLog.deployment_id,
ServiceComponentLog.service,
ServiceComponentLog.component,
).in_(latest_success_for_each_service),
).in_(select(latest_deployed_service_component)),
# Services with the latest success depoyment (no component)
# Filter deployment_id and service from the subquery AND component is null
and_(
tuple_(
ServiceComponentLog.deployment_id,
ServiceComponentLog.service,
).in_(
# Component column is removed from the subquery
select(
latest_success_for_each_service.c[0],
latest_success_for_each_service.c[1],
latest_deployed_service_component.c[
max_deployment_id_label
],
latest_deployed_service_component.c.service,
)
),
ServiceComponentLog.component.is_(None),
Expand All @@ -50,40 +73,107 @@ def get_latest_success_service_component_version_query():
ServiceComponentLog.service,
ServiceComponentLog.component,
)
.all()
)


def get_deployment(session_class, deployment_id):
query = (
select(DeploymentLog)
.options(
joinedload(DeploymentLog.service_components),
joinedload(DeploymentLog.operations),
)
.where(DeploymentLog.id == deployment_id)
.order_by(DeploymentLog.id)
def get_deployments(session: Session, limit: int, offset: int) -> List[DeploymentLog]:
"""Get deployments.
Args:
session: The database session.
limit: The maximum number of deployments to return.
offset: The offset at which to start the query.
Returns:
The deployments.
"""
return (
session.query(DeploymentLog)
.options(joinedload(DeploymentLog.service_components))
.order_by(DeploymentLog.id.desc())
.limit(limit)
.offset(offset)
.all()
)

with session_class() as session:
deployment_log = session.execute(query).unique().scalar_one_or_none()
if deployment_log is None:
raise Exception(f"Deployment id {deployment_id} does not exist")
return deployment_log

def get_deployment(session: Session, deployment_id: int) -> DeploymentLog:
"""Get a deployment by id.
Args:
session: The database session.
deployment_id: The deployment id.
Returns:
The deployment.
Raises:
NoResultFound: If the deployment does not exist."""
try:
return session.query(DeploymentLog).filter_by(id=deployment_id).one()
except NoResultFound as e:
raise Exception(f"Deployment id {deployment_id} does not exist.") from e

def get_last_deployment(session_class):
query = (
select(DeploymentLog)
.options(
joinedload(DeploymentLog.service_components),
joinedload(DeploymentLog.operations),

def get_last_deployment(session: Session) -> DeploymentLog:
"""Get the last deployment.
Args:
session: The database session.
Returns:
The last deployment.
Raises:
NoResultFound: If there is no deployment.
"""
try:
return (
session.query(DeploymentLog)
.order_by(DeploymentLog.id.desc())
.limit(1)
.one()
)
.order_by(DeploymentLog.id.desc())
.limit(1)
)
except NoResultFound as e:
raise Exception(f"No deployments.") from e


def get_planned_deployment_log(session: Session) -> DeploymentLog:
"""Get the planned deployment.
with session_class() as session:
deployment_log = session.execute(query).unique().scalar_one_or_none()
if deployment_log is None:
raise Exception(f"No deployments")
return deployment_log
Args:
session: The database session.
Returns:
The planned deployment or None if there is no planned deployment.
"""
return session.query(DeploymentLog).filter_by(state="PLANNED").one_or_none()


def get_operation_log(
session: Session, deployment_id: int, operation_name: str
) -> OperationLog:
"""Get an operation log.
Args:
session: The database session.
deployment_id: The deployment id.
operation_name: The operation name.
Returns:
The operation log.
Raises:
NoResultFound: If the operation does not exist.
"""
try:
return (
session.query(OperationLog)
.filter_by(deployment_id=deployment_id, operation=operation_name)
.one()
)
except NoResultFound as e:
raise Exception(
f"Operation {operation_name} does not exist in deployment {deployment_id}."
) from e

0 comments on commit 8b6345a

Please sign in to comment.