Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

partial catalogue update #101

Merged
merged 9 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import cads_common.logging
import sqlalchemy as sa

import alembic.context
Expand All @@ -33,7 +34,8 @@ def run_migrations_offline() -> None:
Calls to alembic.context.execute() here emit the given string to the
script output.
"""
cads_catalogue.utils.configure_log()
cads_common.logging.structlog_configure()
cads_common.logging.logging_configure()
url = config.get_main_option("sqlalchemy.url")
alembic.context.configure(
url=url,
Expand All @@ -51,7 +53,8 @@ def run_migrations_online() -> None:
In this scenario we need to create an Engine
and associate a connection with the alembic.context.
"""
cads_catalogue.utils.configure_log()
cads_common.logging.structlog_configure()
cads_common.logging.logging_configure()
engine = sa.engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
Expand Down
269 changes: 185 additions & 84 deletions cads_catalogue/entry_points.py

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions cads_catalogue/licence_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,6 @@ def update_catalogue_licences(
-------
list: list of licence uids involved
"""
logger.info("running catalogue db update for licences")

involved_licence_uids = []
licences = load_licences_from_folder(licences_folder_path)
logger.info("loaded %s licences from %s" % (len(licences), licences_folder_path))
Expand All @@ -266,10 +264,10 @@ def update_catalogue_licences(
try:
with session.begin_nested():
licence_sync(session, licence_uid, licences, storage_settings)
logger.info("licence %s db sync successful" % licence_uid)
logger.info("licence '%s' db sync successful" % licence_uid)
except Exception: # noqa
logger.exception(
"db sync for licence %s failed, error follows" % licence_uid
"db sync for licence '%s' failed, error follows" % licence_uid
)
return involved_licence_uids

Expand Down Expand Up @@ -297,7 +295,7 @@ def remove_orphan_licences(
continue
licence_to_delete.resources = [] # type: ignore
session.delete(licence_to_delete)
logger.info("removed licence %s" % licence_to_delete.licence_uid)
logger.info("removed licence '%s'" % licence_to_delete.licence_uid)


def migrate_from_cds_licences(
Expand Down
170 changes: 81 additions & 89 deletions cads_catalogue/manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""utility module to load and store data in the catalogue database."""
import datetime

# Copyright 2022, European Union.
#
Expand All @@ -13,14 +14,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import glob
import hashlib
import itertools
import json
import os
import pathlib
from typing import Any, List, Optional, Sequence, Tuple
from typing import Any, List, Sequence

import sqlalchemy as sa
import structlog
Expand Down Expand Up @@ -68,101 +68,57 @@ def compute_config_hash(resource: dict[str, Any]) -> str:
return ret_value.hexdigest() # type: ignore


def is_db_to_update(
session: sa.orm.session.Session,
resources_folder_path: str | pathlib.Path,
licences_folder_path: str | pathlib.Path,
messages_folder_path: str | pathlib.Path,
cim_folder_path: str | pathlib.Path,
) -> Tuple[
bool,
bool,
Optional[str],
Optional[str],
Optional[str],
Optional[str],
Optional[str],
]:
def get_current_git_hashes(*folders: str | pathlib.Path) -> List[str | None]:
    """
    Return the list of last commit hashes of input folders.

    The result always has one entry per input folder, in order. A folder
    whose hash cannot be computed (e.g. it is not a git repository) yields
    ``None`` in the corresponding position.

    Parameters
    ----------
    folders: list of folders

    Returns
    -------
    List of last commit hashes (``None`` where the hash is not computable)
    """
    current_hashes: List[str | None] = []
    for folder in folders:
        try:
            current_hashes.append(utils.get_last_commit_hash(folder))
        except Exception:  # noqa
            # best effort: log and continue so that one problematic folder
            # does not abort the whole update check
            logger.exception(
                "no check on commit hash for folder %r, error follows" % folder
            )
            current_hashes.append(None)
    return current_hashes


def get_last_git_hashes(
    session: sa.orm.session.Session, *column_names: str | None
) -> List[str | None]:
    """
    Return last stored git hashes of table catalogue_updates.

    Reads the most recent record of table catalogue_updates and returns the
    values of the requested columns, in order. If the table is empty, a list
    of ``None`` values (one per requested column) is returned.

    Parameters
    ----------
    session: opened SQLAlchemy session
    column_names: list of columns of table catalogue_updates to return the values

    Returns
    -------
    The values of input column names for the table catalogue_updates
    """
    last_hashes: List[str | None] = [None] * len(column_names)
    # only the newest record matters: it reflects the last completed update run
    last_update_record = session.scalars(
        sa.select(database.CatalogueUpdate)
        .order_by(database.CatalogueUpdate.update_time.desc())
        .limit(1)
    ).first()
    if not last_update_record:
        logger.warning("table catalogue_updates is currently empty")
        return last_hashes
    for i, last_hash_attr in enumerate(column_names):
        last_hashes[i] = getattr(last_update_record, last_hash_attr)  # type: ignore
    return last_hashes


def is_resource_to_update(session, resource_folder_path):
Expand Down Expand Up @@ -616,6 +572,8 @@ def update_catalogue_resources(
cim_folder_path: str | pathlib.Path,
storage_settings: config.ObjectStorageSettings,
force: bool = False,
include: List[str] = [],
exclude: List[str] = [],
) -> List[str]:
"""
Load metadata of resources from files and sync each resource in the db.
Expand All @@ -627,32 +585,44 @@ def update_catalogue_resources(
storage_settings: object with settings to access the object storage
cim_folder_path: the folder path containing CIM generated Quality Assessment layouts
force: if True, no skipping of dataset update based on detected changes of sources is made
include: list of include patterns for the resource uids
exclude: list of exclude patterns for the resource uids

Returns
-------
list: list of resource uids involved
"""
input_resource_uids = []

logger.info("running catalogue db update for resources")
# load metadata of each resource from files and sync each resource in the db
for resource_folder_path in glob.glob(os.path.join(resources_folder_path, "*/")):
involved_resource_uids = []

# filtering resource uids
folders = set(glob.glob(os.path.join(resources_folder_path, "*/")))
if include:
folders = set()
for pattern in include:
matched = set(glob.glob(os.path.join(resources_folder_path, f"{pattern}/")))
folders |= matched
if exclude:
for pattern in exclude:
matched = set(glob.glob(os.path.join(resources_folder_path, f"{pattern}/")))
folders -= matched

for resource_folder_path in sorted(folders):
resource_uid = os.path.basename(resource_folder_path.rstrip(os.sep))
logger.debug("parsing folder %s" % resource_folder_path)
input_resource_uids.append(resource_uid)
involved_resource_uids.append(resource_uid)
try:
with session.begin_nested():
to_update, sources_hash = is_resource_to_update(
session, resource_folder_path
)
if not to_update and not force:
logger.info(
"resource update %s skipped: no change detected" % resource_uid
"skip updating of '%s': no change detected" % resource_uid
)
continue
resource = load_resource_from_folder(resource_folder_path)
resource["sources_hash"] = sources_hash
logger.info("resource %s loaded successful" % resource_uid)
logger.info("resource '%s' loaded successful" % resource_uid)
resource = layout_manager.transform_layout(
session,
resource_folder_path,
Expand All @@ -665,12 +635,12 @@ def update_catalogue_resources(
)
resource["adaptor_properties_hash"] = compute_config_hash(resource)
resource_sync(session, resource, storage_settings)
logger.info("resource %s db sync successful" % resource_uid)
logger.info("resource '%s' db sync successful" % resource_uid)
except Exception: # noqa
logger.exception(
"db sync for resource %s failed, error follows" % resource_uid
"db sync for resource '%s' failed, error follows" % resource_uid
)
return input_resource_uids
return involved_resource_uids


def remove_datasets(session: sa.orm.session.Session, keep_resource_uids: List[str]):
Expand All @@ -695,4 +665,26 @@ def remove_datasets(session: sa.orm.session.Session, keep_resource_uids: List[st
if dataset_to_delete.resource_data:
session.delete(dataset_to_delete.resource_data)
session.delete(dataset_to_delete)
logger.info("removed resource %s" % dataset_to_delete.resource_uid)
logger.info("removed resource '%s'" % dataset_to_delete.resource_uid)


def update_git_hashes(
    session: sa.orm.session.Session, hashes_dict: dict[str, Any]
) -> None:
    """
    Insert (or update) the record in catalogue_updates according to input dictionary.

    If the table is empty a new record is inserted with the given properties;
    otherwise the existing record is updated and its ``update_time`` is
    refreshed. The caller's ``hashes_dict`` is never modified.

    Parameters
    ----------
    session: opened SQLAlchemy session
    hashes_dict: dictionary of record properties
    """
    last_update_record = session.scalars(
        sa.select(database.CatalogueUpdate)
        .order_by(database.CatalogueUpdate.update_time.desc())
        .limit(1)
    ).first()
    if not last_update_record:
        # no record yet: insert a fresh one (update_time gets its column default)
        session.add(database.CatalogueUpdate(**hashes_dict))
    else:
        # work on a copy so the caller's dictionary is not mutated as a side effect
        values = dict(hashes_dict)
        values["update_time"] = datetime.datetime.now()
        # NOTE(review): this UPDATE has no WHERE clause, so it touches every row
        # of catalogue_updates — assumes the table holds a single record; confirm
        session.execute(sa.update(database.CatalogueUpdate).values(**values))
7 changes: 3 additions & 4 deletions cads_catalogue/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ def update_catalogue_messages(
-------
list: list of message uids involved
"""
logger.info("running catalogue db update for messages")
# load metadata of messages from files and sync each messages in the db
msgs = load_messages(messages_folder_path)
logger.info("loaded %s messages from folder %s" % (len(msgs), messages_folder_path))
Expand All @@ -263,9 +262,9 @@ def update_catalogue_messages(
try:
with session.begin_nested():
message_sync(session, msg)
logger.info("message %s db sync successful" % msg_uid)
logger.info("message '%s' db sync successful" % msg_uid)
except Exception: # noqa
logger.exception("db sync for message %s failed, error follows" % msg_uid)
logger.exception("db sync for message '%s' failed, error follows" % msg_uid)

if not remove_orphans:
return involved_msg_ids
Expand All @@ -283,6 +282,6 @@ def update_catalogue_messages(
for msg_to_delete in msgs_to_delete:
msg_to_delete.resources = []
session.delete(msg_to_delete)
logger.debug("removed old message %s" % msg_to_delete.message_uid)
logger.info("removed old message '%s'" % msg_to_delete.message_uid)

return involved_msg_ids
30 changes: 0 additions & 30 deletions cads_catalogue/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
import hashlib
import html.parser
import json
import logging
import mimetypes
import pathlib
import subprocess
import sys
from typing import Any

import structlog
from sqlalchemy import inspect


Expand Down Expand Up @@ -135,33 +132,6 @@ def object_as_dict(obj: Any) -> dict[str, Any]:
return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}


def configure_log(
    loglevel=logging.INFO, logfmt="%(message)s", timefmt="%Y-%m-%d %H:%M.%S"
):
    """Configure the log for the package.

    Sets up the stdlib root logger to write plain messages to stdout at the
    requested level, then configures structlog to emit JSON records enriched
    with logger name, log level, timestamp and exception/stack information.
    """
    # stdlib logging: plain-format messages on stdout
    logging.basicConfig(level=loglevel, format=logfmt, stream=sys.stdout)

    # structlog pipeline: each processor enriches the event dict in turn,
    # with JSON rendering as the final step
    # NOTE(review): the default timefmt separates minutes and seconds with a
    # period ("%H:%M.%S") — confirm this is intentional and not a typo for ':'
    processors = [
        structlog.stdlib.filter_by_level,
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt=timefmt),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ]
    structlog.configure(
        processors=processors,
        wrapper_class=structlog.stdlib.BoundLogger,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )


def str2bool(value: str, raise_if_unknown=True, default=False):
"""Return boolean parsing of the string."""
if value.lower() in ["t", "true", "1", "yes", "y"]:
Expand Down
Loading
Loading