From cabeef5d7db60012a55024945adcd00373f929c0 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Wed, 19 Jul 2023 10:57:31 +0000 Subject: [PATCH] Split journal basic api and entity api endpoints, added handlers --- sample.env | 2 +- spire/api.py | 7 +- spire/journal/actions.py | 47 +- spire/journal/api_collections.py | 296 +++++++++ spire/journal/{api.py => api_journals.py} | 726 ++++------------------ spire/journal/data.py | 6 +- spire/journal/handlers.py | 609 ++++++++++++++++++ spire/journal/representations.py | 28 +- spire/journal/version.py | 3 +- 9 files changed, 1094 insertions(+), 630 deletions(-) create mode 100644 spire/journal/api_collections.py rename spire/journal/{api.py => api_journals.py} (73%) create mode 100644 spire/journal/handlers.py diff --git a/sample.env b/sample.env index dcc9adf..eaa2711 100644 --- a/sample.env +++ b/sample.env @@ -37,7 +37,7 @@ export BUGOUT_BOT_INSTALLATION_TOKEN="" export BUGOUT_BOT_INSTALLATION_TOKEN_HEADER="" export BUGOUT_DRONES_TOKEN="" export BUGOUT_DRONES_TOKEN_HEADER="" -export SPIRE_OPENAPI_LIST="journals,humbug,preferences,public,go" +export SPIRE_OPENAPI_LIST="journals,collections,humbug,preferences,public,go" export BUGOUT_REDIS_URL="http://127.0.0.1:6379" export BUGOUT_REDIS_PASSWORD="mypassword" export REDIS_REPORTS_QUEUE="" diff --git a/spire/api.py b/spire/api.py index 987c82c..964bb95 100644 --- a/spire/api.py +++ b/spire/api.py @@ -11,7 +11,8 @@ from .github.api import app as github_api from .go.api import app as go_api from .humbug.api import app as humbug_app -from .journal.api import app as journal_api +from .journal.api_journals import app as journals_api +from .journal.api_collections import app as collections_api from .preferences.api import app as preferences_api from .public.api import app_public as public_api from .slack.api import app as slack_api @@ -48,8 +49,8 @@ async def version() -> VersionResponse: app.mount("/go", go_api) app.mount("/slack", slack_api) -app.mount("/journals", journal_api) -app.mount("/collections", journal_api) +app.mount("/journals", journals_api) +app.mount("/collections", collections_api) app.mount("/public", public_api) app.mount("/github", github_api) app.mount("/preferences", preferences_api) diff --git a/spire/journal/actions.py b/spire/journal/actions.py index 7330a02..fe89c7c 100644 --- a/spire/journal/actions.py +++ b/spire/journal/actions.py @@ -11,7 +11,7 @@ from uuid import UUID, uuid4 import boto3 - +from fastapi import Request, HTTPException from sqlalchemy.orm import Session, Query from sqlalchemy import or_, func, text, and_, select from sqlalchemy.dialects import postgresql @@ -54,6 +54,7 @@ ) from .representations import journal_representation_parsers, parse_entity_to_entry from ..utils.confparse import scope_conf +from ..utils.settings import BUGOUT_CLIENT_ID_HEADER from ..broodusers import bugout_api logger = logging.getLogger(__name__) @@ -118,6 +119,18 @@ class CommitFailed(Exception): """ +def bugout_client_id_from_request(request: Request) -> Optional[str]: + """ + Returns Bugout search client ID from request if it has been passed. + """ + bugout_client_id: Optional[str] = request.headers.get(BUGOUT_CLIENT_ID_HEADER) + # We are deprecating the SIMIOTICS_CLIENT_ID_HEADER header in favor of BUGOUT_CLIENT_ID_HEADER, but + # this needs to be here for legacy support. 
+ if bugout_client_id is None: + bugout_client_id = request.headers.get("x-simiotics-client-id") + return bugout_client_id + + def acl_auth( db_session: Session, user_id: str, user_group_id_list: List[str], journal_id: UUID ) -> Tuple[Journal, Dict[HolderType, List[str]]]: @@ -201,6 +214,38 @@ def acl_check( raise PermissionsNotFound("No permissions for requested information") +def ensure_journal_permission( + db_session: Session, + user_id: str, + user_group_ids: List[str], + journal_id: UUID, + required_scopes: Set[Union[JournalScopes, JournalEntryScopes]], +) -> Journal: + """ + Checks if the given user (who is a member of the groups specified by user_group_ids) holds the + given scope on the journal specified by journal_id. + + Returns: None if the user is a holder of that scope, and raises the appropriate HTTPException + otherwise. + """ + try: + journal, acl = acl_auth(db_session, user_id, user_group_ids, journal_id) + acl_check(acl, required_scopes) + except PermissionsNotFound: + logger.error( + f"User (id={user_id}) does not have the appropriate permissions (scopes={required_scopes}) " + f"for journal (id={journal_id})" + ) + raise HTTPException(status_code=404) + except Exception: + logger.error( + f"Error checking permissions for user (id={user_id}) in journal (id={journal_id})" + ) + raise HTTPException(status_code=500) + + return journal + + async def find_journals( db_session: Session, user_id: UUID, user_group_id_list: Optional[List[str]] = None ) -> List[Journal]: diff --git a/spire/journal/api_collections.py b/spire/journal/api_collections.py new file mode 100644 index 0000000..7b28cf3 --- /dev/null +++ b/spire/journal/api_collections.py @@ -0,0 +1,296 @@ +import logging +from typing import List, Optional +from uuid import UUID + +from elasticsearch import Elasticsearch +from fastapi import BackgroundTasks, Body, Depends, FastAPI, Path, Query, Request +from fastapi.middleware.cors import CORSMiddleware +from sqlalchemy.orm import Session + +from .. import db, es +from ..data import VersionResponse +from ..middleware import BroodAuthMiddleware +from ..utils.settings import ( + DOCS_PATHS, + DOCS_TARGET_PATH, + SPIRE_OPENAPI_LIST, + SPIRE_RAW_ORIGINS_LST, +) +from . 
import handlers, search +from .data import ( + EntitiesResponse, + Entity, + EntityCollection, + EntityCollectionResponse, + EntityCollectionsResponse, + EntityList, + EntityResponse, + EntitySearchResponse, + JournalRepresentationTypes, +) +from .version import SPIRE_COLLECTIONS_VERSION + +SUBMODULE_NAME = "collections" + +logger = logging.getLogger(__name__) + +tags_metadata = [ + {"name": "collections", "description": "Operations with collections."}, + {"name": "entities", "description": "Operations with collection entities."}, + {"name": "tags", "description": "Operations with collection entity tags."}, + {"name": "permissions", "description": "Collection access managements."}, + {"name": "search", "description": "Collection search."}, +] + +app = FastAPI( + title=f"Spire {SUBMODULE_NAME} submodule", + description="Spire API endpoints to work with entities, statistics and search in collections.", + version=SPIRE_COLLECTIONS_VERSION, + openapi_tags=tags_metadata, + openapi_url=f"/{DOCS_TARGET_PATH}/openapi.json" + if SUBMODULE_NAME in SPIRE_OPENAPI_LIST + else None, + docs_url=None, + redoc_url=f"/{DOCS_TARGET_PATH}", +) + +# Important to save consistency for middlewares (stack queue) +app.add_middleware( + CORSMiddleware, + allow_origins=SPIRE_RAW_ORIGINS_LST, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) +app.add_middleware(BroodAuthMiddleware, whitelist=DOCS_PATHS) + + +@app.get("/version", response_model=VersionResponse) +async def version() -> VersionResponse: + """ + Spire collections submodule version. + """ + return VersionResponse(version=SPIRE_COLLECTIONS_VERSION) + + +@app.get( + "/", + tags=["collections"], + response_model=EntityCollectionsResponse, +) +async def list_collections( + request: Request, + db_session: Session = Depends(db.yield_connection_from_env), +) -> EntityCollectionsResponse: + """ + List all collections user has access to. + """ + result = await handlers.list_journals_handler( + db_session=db_session, + request=request, + representation=JournalRepresentationTypes.COLLECTION, + ) + + return result + + +@app.post( + "/", + tags=["collections"], + response_model=EntityCollectionResponse, +) +async def create_collection( + request: Request, + create_request: EntityCollection = Body(...), + db_session: Session = Depends(db.yield_connection_from_env), +) -> EntityCollectionResponse: + """ + Creates a collection object for the authenticated user. + """ + result = await handlers.create_journal_handler( + db_session=db_session, + request=request, + create_request=create_request, + representation=JournalRepresentationTypes.COLLECTION, + ) + + return result + + +@app.delete( + "/{collection_id}", + tags=["collections"], + response_model=EntityCollectionResponse, +) +async def delete_collection( + request: Request, + collection_id: UUID = Path(...), + db_session: Session = Depends(db.yield_connection_from_env), + es_client: Elasticsearch = Depends(es.yield_es_client_from_env), +) -> EntityCollectionResponse: + """ + Soft delete the collection with the given ID (assuming the collection was created by the authenticated + user). 
+ """ + result = await handlers.delete_journal_handler( + db_session=db_session, + request=request, + journal_id=collection_id, + es_client=es_client, + representation=JournalRepresentationTypes.COLLECTION, + ) + + return result + + +@app.post( + "/{collection_id}/entities", + tags=["entities"], + response_model=EntityResponse, +) +async def create_collection_entity( + request: Request, + collection_id: UUID = Path(...), + create_request: Entity = Body(...), + db_session: Session = Depends(db.yield_connection_from_env), + es_client: Elasticsearch = Depends(es.yield_es_client_from_env), +) -> EntityResponse: + """ + Creates a collection entity. + """ + result = await handlers.create_journal_entry_handler( + db_session=db_session, + request=request, + journal_id=collection_id, + create_request=create_request, + es_client=es_client, + representation=JournalRepresentationTypes.COLLECTION, + ) + + return result + + +@app.post( + "/{collection_id}/bulk", + tags=["entities"], + response_model=EntitiesResponse, +) +async def create_collection_entities_pack( + request: Request, + collection_id: UUID = Path(...), + create_request: EntityList = Body(...), + db_session: Session = Depends(db.yield_connection_from_env), + es_client: Elasticsearch = Depends(es.yield_es_client_from_env), +) -> EntitiesResponse: + """ + Creates a pack of collection entities. + """ + result = await handlers.create_journal_entries_pack_handler( + db_session=db_session, + request=request, + journal_id=collection_id, + create_request=create_request, + es_client=es_client, + representation=JournalRepresentationTypes.COLLECTION, + ) + + return result + + +@app.get( + "/{collection_id}/entities", + tags=["entities"], + response_model=EntitiesResponse, +) +async def get_entities( + request: Request, + collection_id: UUID = Path(...), + db_session: Session = Depends(db.yield_connection_from_env), + context_type: Optional[str] = Query(None), + context_id: Optional[str] = Query(None), + context_url: Optional[str] = Query(None), + limit: int = Query(10), + offset: int = Query(0), +) -> EntitiesResponse: + """ + List all entities in a collection. + """ + result = await handlers.get_entries_handler( + db_session=db_session, + request=request, + journal_id=collection_id, + limit=limit, + offset=offset, + representation=JournalRepresentationTypes.COLLECTION, + context_type=context_type, + context_id=context_id, + context_url=context_url, + ) + + return result + + +@app.delete( + "/{collection_id}/entities/{entity_id}", + tags=["entities"], + response_model=EntityResponse, +) +async def delete_entity( + request: Request, + collection_id: UUID = Path(...), + entity_id: UUID = Path(...), + db_session: Session = Depends(db.yield_connection_from_env), + es_client: Elasticsearch = Depends(es.yield_es_client_from_env), +) -> EntityResponse: + """ + Deletes collection entity. 
+ """ + result = await handlers.delete_entry_handler( + db_session=db_session, + request=request, + journal_id=collection_id, + entry_id=entity_id, + es_client=es_client, + representation=JournalRepresentationTypes.COLLECTION, + ) + + return result + + +@app.get( + "/{collection_id}/search", + tags=["search"], + response_model=EntitySearchResponse, +) +async def search_journal( + request: Request, + background_tasks: BackgroundTasks, + collection_id: UUID = Path(...), + q: str = Query(""), + filters: Optional[List[str]] = Query(None), + limit: int = Query(10), + offset: int = Query(0), + content: bool = Query(True), + order: search.ResultsOrder = Query(search.ResultsOrder.DESCENDING), + db_session: Session = Depends(db.yield_connection_from_env), + es_client: Elasticsearch = Depends(es.yield_es_client_from_env), +) -> EntitySearchResponse: + """ + Executes a search query against the given collection. + """ + result = await handlers.search_journal_handler( + db_session=db_session, + request=request, + journal_id=collection_id, + es_client=es_client, + background_tasks=background_tasks, + q=q, + limit=limit, + offset=offset, + content=content, + order=order, + representation=JournalRepresentationTypes.COLLECTION, + filters=filters, + ) + + return result diff --git a/spire/journal/api.py b/spire/journal/api_journals.py similarity index 73% rename from spire/journal/api.py rename to spire/journal/api_journals.py index 67050a1..7cf988d 100644 --- a/spire/journal/api.py +++ b/spire/journal/api_journals.py @@ -1,7 +1,5 @@ -import json import logging -from datetime import datetime, timezone -from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast +from typing import Dict, List, Optional, Set, Tuple, Union, cast from uuid import UUID import boto3 @@ -24,11 +22,9 @@ from ..data import VersionResponse from ..middleware import BroodAuthMiddleware from ..utils.settings import ( - BUGOUT_CLIENT_ID_HEADER, BUGOUT_DRONES_TOKEN, BUGOUT_DRONES_TOKEN_HEADER, BULK_CHUNKSIZE, - DEFAULT_JOURNALS_ES_INDEX, DOCS_PATHS, DOCS_TARGET_PATH, DRONES_BUCKET, @@ -38,26 +34,17 @@ SPIRE_RAW_ORIGINS_LST, STATISTICS_S3_PRESIGNED_URL_EXPIRATION_TIME, ) -from . import actions, search +from . import actions, handlers, search from .data import ( - EntityList, - EntityCollectionResponse, - ContextSpec, CreateEntriesTagsRequest, CreateJournalAPIRequest, - CreateJournalEntryRequest, CreateJournalEntryTagRequest, CreateJournalEntryTagsAPIRequest, - CreateJournalRequest, DeleteJournalEntriesByTagsAPIRequest, DeleteJournalEntryTagAPIRequest, DeletingQuery, DronesStatisticsResponce, - EntitiesResponse, - Entity, - EntityCollectionsResponse, - EntityResponse, - EntitySearchResponse, + EntityCollectionResponse, EntryUpdateTagActions, JournalEntriesBySearchDeletionResponse, JournalEntriesByTagsDeletionResponse, @@ -77,7 +64,6 @@ JournalSpec, JournalStatisticsResponse, JournalStatisticsSpecs, - JournalTypes, ListJournalEntriesResponse, ListJournalScopeSpec, ListJournalsResponse, @@ -90,8 +76,7 @@ UpdateJournalSpec, UpdateStatsRequest, ) -from .models import Journal, JournalEntryLock, JournalEntryTag -from .representations import journal_representation_parsers, parse_entity_to_entry +from .models import JournalEntryTag from .version import SPIRE_JOURNALS_VERSION SUBMODULE_NAME = "journals" @@ -130,50 +115,6 @@ app.add_middleware(BroodAuthMiddleware, whitelist=DOCS_PATHS) -def bugout_client_id_from_request(request: Request) -> Optional[str]: - """ - Returns Bugout search client ID from request if it has been passed. 
- """ - bugout_client_id: Optional[str] = request.headers.get(BUGOUT_CLIENT_ID_HEADER) - # We are deprecating the SIMIOTICS_CLIENT_ID_HEADER header in favor of BUGOUT_CLIENT_ID_HEADER, but - # this needs to be here for legacy support. - if bugout_client_id is None: - bugout_client_id = request.headers.get("x-simiotics-client-id") - return bugout_client_id - - -def ensure_journal_permission( - db_session: Session, - user_id: str, - user_group_ids: List[str], - journal_id: UUID, - required_scopes: Set[Union[JournalScopes, JournalEntryScopes]], -) -> Journal: - """ - Checks if the given user (who is a member of the groups specified by user_group_ids) holds the - given scope on the journal specified by journal_id. - - Returns: None if the user is a holder of that scope, and raises the appropriate HTTPException - otherwise. - """ - try: - journal, acl = actions.acl_auth(db_session, user_id, user_group_ids, journal_id) - actions.acl_check(acl, required_scopes) - except actions.PermissionsNotFound: - logger.error( - f"User (id={user_id}) does not have the appropriate permissions (scopes={required_scopes}) " - f"for journal (id={journal_id})" - ) - raise HTTPException(status_code=404) - except Exception: - logger.error( - f"Error checking permissions for user (id={user_id}) in journal (id={journal_id})" - ) - raise HTTPException(status_code=500) - - return journal - - @app.get("/version", response_model=VersionResponse) async def version() -> VersionResponse: """ @@ -232,7 +173,7 @@ async def get_journal_scopes_handler( \f :param journal_id: Journal ID to extract permissions from. """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -291,7 +232,7 @@ async def get_journal_permissions_handler( :param journal_id: Journal ID to extract permissions from. :param holder_ids: Filter our holders (user or group) by ID. """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -338,7 +279,7 @@ async def update_journal_scopes_handler( if JournalEntryScopes.DELETE.value in create_request.permission_list: ensure_permissions_set.add(JournalEntryScopes.DELETE) - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -406,7 +347,7 @@ async def delete_journal_scopes_handler( status_code=400, detail="Only group owner/admin allowed to manage group in journal", ) - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -450,80 +391,44 @@ async def delete_journal_scopes_handler( @app.get( "/", - tags=["journals", "collections"], - response_model=Union[ListJournalsResponse, EntityCollectionsResponse], + tags=["journals"], + response_model=ListJournalsResponse, ) async def list_journals( request: Request, db_session: Session = Depends(db.yield_connection_from_env), - representation: JournalRepresentationTypes = Query( - JournalRepresentationTypes.JOURNAL - ), -) -> Union[ListJournalsResponse, EntityCollectionsResponse]: +) -> ListJournalsResponse: """ List all journals user has access to. 
""" - try: - journals = await actions.find_journals( - db_session=db_session, - user_id=request.state.user_id, - user_group_id_list=request.state.user_group_id_list, - ) - - parsed_journals = [] - for j in journals: - obj = await journal_representation_parsers[representation]["journal"]( - j, j.holders_ids - ) - parsed_journals.append(obj) - - result = await journal_representation_parsers[representation]["journals"]( - parsed_journals - ) - except actions.JournalNotFound: - logger.error(f"Journals not found for user={request.state.user_id}") - raise HTTPException(status_code=404) - except Exception as err: - logger.error(err) - raise HTTPException(status_code=500) + result = await handlers.list_journals_handler( + db_session=db_session, + request=request, + representation=JournalRepresentationTypes.JOURNAL, + ) return result @app.post( "/", - tags=["journals", "collections"], - response_model=Union[JournalResponse, EntityCollectionResponse], + tags=["journals"], + response_model=JournalResponse, ) async def create_journal( - create_request: CreateJournalAPIRequest, request: Request, + create_request: CreateJournalAPIRequest = Body(...), db_session: Session = Depends(db.yield_connection_from_env), - representation: JournalRepresentationTypes = Query( - JournalRepresentationTypes.JOURNAL - ), -) -> Union[JournalResponse, EntityCollectionResponse]: +) -> JournalResponse: """ Creates a journal object for the authenticated user. """ - search_index: Optional[str] = DEFAULT_JOURNALS_ES_INDEX - if create_request.journal_type == JournalTypes.HUMBUG: - search_index = None - - journal_request = CreateJournalRequest( - bugout_user_id=request.state.user_id, - name=create_request.name, - search_index=search_index, + result = await handlers.create_journal_handler( + db_session=db_session, + request=request, + create_request=create_request, + representation=JournalRepresentationTypes.JOURNAL, ) - try: - journal = await actions.create_journal(db_session, journal_request) - - result = await journal_representation_parsers[representation]["journal"]( - journal, {holder.holder_id for holder in journal.permissions} - ) - except Exception as e: - logger.error(f"Error creating journal: {str(e)}") - raise HTTPException(status_code=500) return result @@ -540,7 +445,7 @@ async def get_journal( :param journal_id: Journal ID to extract permissions from """ - journal = ensure_journal_permission( + journal = actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -572,7 +477,7 @@ async def update_journal( :param journal_id: Journal ID to extract permissions from :param update_request: Journal parameters """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -614,49 +519,24 @@ async def update_journal( response_model=Union[JournalResponse, EntityCollectionResponse], ) async def delete_journal( - journal_id: UUID, request: Request, + journal_id: UUID = Path(...), db_session: Session = Depends(db.yield_connection_from_env), es_client: Elasticsearch = Depends(es.yield_es_client_from_env), - representation: JournalRepresentationTypes = Query( - JournalRepresentationTypes.JOURNAL - ), ) -> Union[JournalResponse, EntityCollectionResponse]: """ - Retrieves the journal with the given ID (assuming the journal was created by the authenticated + Soft delete the journal with the given ID (assuming the journal was created by the authenticated user). 
""" - ensure_journal_permission( - db_session, - request.state.user_id, - request.state.user_group_id_list, - journal_id, - {JournalScopes.DELETE}, + result = await handlers.delete_journal_handler( + db_session=db_session, + request=request, + journal_id=journal_id, + es_client=es_client, + representation=JournalRepresentationTypes.JOURNAL, ) - journal_spec = JournalSpec(id=journal_id, bugout_user_id=request.state.user_id) - try: - journal = await actions.delete_journal( - db_session, - journal_spec, - user_group_id_list=request.state.user_group_id_list, - ) - except actions.JournalNotFound: - logger.error( - f"Journal not found with ID={journal_id} for user={request.state.user_id}" - ) - raise HTTPException(status_code=404) - except Exception as e: - logger.error(f"Error deleting journal: {str(e)}") - raise HTTPException(status_code=500) - - es_index = journal.search_index - - search.delete_journal_entries(es_client, es_index=es_index, journal_id=journal_id) - - return await journal_representation_parsers[representation]["journal"]( - journal, {holder.holder_id for holder in journal.permissions} - ) + return result @app.post("/{journal_id}/stats", response_model=DronesStatisticsResponce) @@ -670,7 +550,7 @@ async def update_journal_stats( Return journal statistics journal.read permission required. """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -715,7 +595,7 @@ async def generate_journal_stats( """ Return journal statistics """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -842,285 +722,88 @@ async def generate_journal_stats( @app.post( "/{journal_id}/entries", tags=["entries"], - response_model=Union[JournalEntryResponse, EntityResponse], -) -@app.post( - "/{journal_id}/entities", - tags=["entities"], - response_model=Union[JournalEntryResponse, EntityResponse], + response_model=JournalEntryResponse, ) async def create_journal_entry( request: Request, journal_id: UUID = Path(...), - entry_request: Union[Entity, JournalEntryContent] = Body(...), + create_request: JournalEntryContent = Body(...), db_session: Session = Depends(db.yield_connection_from_env), es_client: Elasticsearch = Depends(es.yield_es_client_from_env), -) -> Union[JournalEntryResponse, EntityResponse]: +) -> JournalEntryResponse: """ Creates a journal entry """ - journal = ensure_journal_permission( - db_session, - request.state.user_id, - request.state.user_group_id_list, - journal_id, - {JournalEntryScopes.CREATE}, + result = await handlers.create_journal_entry_handler( + db_session=db_session, + request=request, + journal_id=journal_id, + create_request=create_request, + es_client=es_client, + representation=JournalRepresentationTypes.JOURNAL, ) - journal_spec = JournalSpec(id=journal_id, bugout_user_id=request.state.user_id) - tags: List[str] - representation: JournalRepresentationTypes - if type(entry_request) == JournalEntryContent: - representation = JournalRepresentationTypes.JOURNAL - creation_request = CreateJournalEntryRequest( - journal_spec=journal_spec, - title=entry_request.title, - content=entry_request.content, - tags=entry_request.tags, - context_type=entry_request.context_type, - context_id=entry_request.context_id, - context_url=entry_request.context_url, - ) - - if entry_request.created_at is not None: - created_at_utc = datetime.astimezone( - entry_request.created_at, tz=timezone.utc - ) - created_at = 
created_at_utc.replace(tzinfo=None) - creation_request.created_at = created_at - - tags = entry_request.tags if entry_request.tags is not None else [] - elif type(entry_request) == Entity: - representation = JournalRepresentationTypes.ENTITY - title, tags, content = parse_entity_to_entry( - create_entity=entry_request, - ) - creation_request = CreateJournalEntryRequest( - journal_spec=journal_spec, - title=title, - content=json.dumps(content), - tags=tags, - context_type="entity", - ) - else: - raise HTTPException(status_code=500) - - es_index = journal.search_index - - try: - journal_entry, entry_lock = await actions.create_journal_entry( - db_session=db_session, - journal=journal, - entry_request=creation_request, - locked_by=request.state.user_id, - ) - except actions.JournalNotFound: - logger.error( - f"Journal not found with ID={journal_id} for user={request.state.user_id}" - ) - raise HTTPException(status_code=404) - except Exception as e: - logger.error(f"Error creating journal entry: {str(e)}") - raise HTTPException(status_code=500) - - if es_index is not None: - try: - search.new_entry( - es_client, - es_index=es_index, - journal_id=journal_entry.journal_id, - entry_id=journal_entry.id, - title=journal_entry.title, - content=journal_entry.content, - tags=tags, - created_at=journal_entry.created_at, - updated_at=journal_entry.updated_at, - context_type=journal_entry.context_type, - context_id=journal_entry.context_id, - context_url=journal_entry.context_url, - ) - except Exception as e: - logger.warning( - f"Error indexing journal entry ({journal_entry.id}) in journal " - f"({journal_entry.journal_id}) for user ({request.state.user_id})" - ) - - return await journal_representation_parsers[representation]["entry"]( - id=journal_entry.id, - journal_id=journal_entry.journal_id, - title=journal_entry.title, - content=journal_entry.content, - url=str(request.url).rstrip("/"), - tags=tags, - created_at=journal_entry.created_at, - updated_at=journal_entry.updated_at, - context_url=journal_entry.context_url, - context_type=journal_entry.context_type, - context_id=journal_entry.context_id, - locked_by=entry_lock.locked_by, - ) + return result @app.post( "/{journal_id}/bulk", - tags=["entries", "entities"], - response_model=Union[ListJournalEntriesResponse, EntitiesResponse], + tags=["entries"], + response_model=ListJournalEntriesResponse, ) async def create_journal_entries_pack( - journal_id: UUID, request: Request, - entries_request: Union[EntityList, JournalEntryListContent] = Body(...), + journal_id: UUID = Path(...), + create_request: JournalEntryListContent = Body(...), db_session: Session = Depends(db.yield_connection_from_env), es_client: Elasticsearch = Depends(es.yield_es_client_from_env), -) -> Union[ListJournalEntriesResponse, EntitiesResponse]: +) -> ListJournalEntriesResponse: """ Creates a pack of journal entries. 
""" - ensure_journal_permission( - db_session, - request.state.user_id, - request.state.user_group_id_list, - journal_id, - {JournalEntryScopes.CREATE}, + result = await handlers.create_journal_entries_pack_handler( + db_session=db_session, + request=request, + journal_id=journal_id, + create_request=create_request, + es_client=es_client, + representation=JournalRepresentationTypes.JOURNAL, ) - journal_spec = JournalSpec(id=journal_id, bugout_user_id=request.state.user_id) - - try: - journal = await actions.find_journal( - db_session=db_session, - journal_spec=journal_spec, - user_group_id_list=request.state.user_group_id_list, - ) - except actions.JournalNotFound: - logger.error( - f"Journal not found with ID={journal_id} for user={request.state.user_id}" - ) - raise HTTPException(status_code=404) - except Exception as e: - logger.error(f"Error retrieving journal: {str(e)}") - raise HTTPException(status_code=500) - - representation: JournalRepresentationTypes - if type(entries_request) == JournalEntryListContent: - representation = JournalRepresentationTypes.JOURNAL - elif type(entries_request) == EntityList: - representation = JournalRepresentationTypes.ENTITY - else: - raise HTTPException(status_code=500) - - try: - response = await actions.create_journal_entries_pack( - db_session, - journal.id, - entries_request, - representation=representation, - ) - except actions.JournalNotFound: - logger.error( - f"Journal not found with ID={journal_id} for user={request.state.user_id}" - ) - raise HTTPException(status_code=404) - except Exception as e: - logger.error(f"Error creating journal entry: {str(e)}") - raise HTTPException(status_code=500) - es_index = journal.search_index - if es_index is not None: - e_list = ( - response.entities if JournalRepresentationTypes.ENTITY else response.entries - ) - search.bulk_create_entries(es_client, es_index, journal_id, e_list) - - return response + return result @app.get( "/{journal_id}/entries", tags=["entries"], - response_model=Union[ListJournalEntriesResponse, EntitiesResponse], -) -@app.get( - "/{journal_id}/entities", - tags=["entities"], - response_model=Union[ListJournalEntriesResponse, EntitiesResponse], + response_model=ListJournalEntriesResponse, ) async def get_entries( - journal_id: UUID, request: Request, + journal_id: UUID = Path(...), db_session: Session = Depends(db.yield_connection_from_env), context_type: Optional[str] = Query(None), context_id: Optional[str] = Query(None), context_url: Optional[str] = Query(None), limit: int = Query(10), offset: int = Query(0), - representation: JournalRepresentationTypes = Query( - JournalRepresentationTypes.JOURNAL - ), -) -> Union[ListJournalEntriesResponse, EntitiesResponse]: +) -> ListJournalEntriesResponse: """ List all entries in a journal. 
""" - ensure_journal_permission( - db_session, - request.state.user_id, - request.state.user_group_id_list, - journal_id, - {JournalEntryScopes.READ}, - ) - - journal_spec = JournalSpec(id=journal_id, bugout_user_id=request.state.user_id) - context_spec = ContextSpec( - context_type=context_type, context_id=context_id, context_url=context_url + result = await handlers.get_entries_handler( + db_session=db_session, + request=request, + journal_id=journal_id, + limit=limit, + offset=offset, + representation=JournalRepresentationTypes.JOURNAL, + context_type=context_type, + context_id=context_id, + context_url=context_url, ) - try: - entries = await actions.get_journal_entries( - db_session, - journal_spec, - None, - user_group_id_list=request.state.user_group_id_list, - context_spec=context_spec, - limit=limit, - offset=offset, - ) - except actions.JournalNotFound: - logger.error( - f"Journal not found with ID={journal_id} for user={request.state.user_id}" - ) - raise HTTPException(status_code=404) - except Exception as e: - logger.error(f"Error listing journal entries: {str(e)}") - raise HTTPException(status_code=500) - - url: str = str(request.url).rstrip("/") - parsed_entries = [] - - for e in entries: - tag_objects = await actions.get_journal_entry_tags( - db_session, - journal_spec, - e.id, - user_group_id_list=request.state.user_group_id_list, - ) - obj = await journal_representation_parsers[representation]["entry"]( - id=e.id, - journal_id=journal_id, - title=e.title, - content=e.content, - url=url, - tags=[tag.tag for tag in tag_objects], - created_at=e.created_at, - updated_at=e.updated_at, - context_url=e.context_url, - context_type=e.context_type, - context_id=e.context_id, - locked_by=None, - ) - parsed_entries.append(obj) - - return await journal_representation_parsers[representation]["entries"]( - parsed_entries - ) + return result @app.get( @@ -1137,7 +820,7 @@ async def get_entry( """ Gets a single journal entry """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -1193,7 +876,7 @@ async def get_entry_content( """ Retrieves the text content of a journal entry """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -1256,7 +939,7 @@ async def update_entry_content( Modifies the content of a journal entry through a simple override. If tags in not empty, update them - delete old and insert new. """ - journal = ensure_journal_permission( + journal = actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -1382,7 +1065,7 @@ async def delete_entry_lock( Releases journal entry lock. Entry may be unlocked by other user. 
""" - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -1421,83 +1104,28 @@ async def delete_entry_lock( @app.delete( "/{journal_id}/entries/{entry_id}", tags=["entries"], - response_model=Union[JournalEntryResponse, EntityResponse], -) -@app.delete( - "/{journal_id}/entities/{entry_id}", - tags=["entities"], - response_model=Union[JournalEntryResponse, EntityResponse], + response_model=JournalEntryResponse, ) async def delete_entry( - journal_id: UUID, - entry_id: UUID, request: Request, + journal_id: UUID = Path(...), + entry_id: UUID = Path(...), db_session: Session = Depends(db.yield_connection_from_env), es_client: Elasticsearch = Depends(es.yield_es_client_from_env), - representation: JournalRepresentationTypes = Query( - JournalRepresentationTypes.JOURNAL - ), -) -> Union[JournalEntryResponse, EntityResponse]: +) -> JournalEntryResponse: """ - Deletes a journal entry + Deletes journal entry. """ - journal = ensure_journal_permission( - db_session, - request.state.user_id, - request.state.user_group_id_list, - journal_id, - {JournalEntryScopes.DELETE}, + result = await handlers.delete_entry_handler( + db_session=db_session, + request=request, + journal_id=journal_id, + entry_id=entry_id, + es_client=es_client, + representation=JournalRepresentationTypes.JOURNAL, ) - try: - journal_entry = await actions.delete_journal_entry( - db_session, - journal, - entry_id, - ) - except actions.JournalNotFound: - logger.error( - f"Journal not found with ID={journal_id} for user={request.state.user_id}" - ) - raise HTTPException(status_code=404, detail="Journal not found") - except actions.EntryNotFound: - logger.error( - f"Entry not found with ID={entry_id} in journal with ID={journal_id}" - ) - raise HTTPException(status_code=404, detail="Entry not found") - except Exception as e: - logger.error(f"Error listing journal entries: {str(e)}") - raise HTTPException(status_code=500) - - es_index = journal.search_index - if es_index is not None: - try: - search.delete_entry( - es_client, - es_index=es_index, - journal_id=journal_entry.journal_id, - entry_id=journal_entry.id, - ) - except Exception as e: - logger.warning( - f"Error deindexing entry ({journal_entry.id}) from index for journal " - f"({journal_entry.journal_id}) for user ({request.state.user_id})" - ) - - return await journal_representation_parsers[representation]["entry"]( - id=journal_entry.id, - journal_id=journal_entry.journal_id, - title=journal_entry.title, - content=journal_entry.content, - url=str(request.url).rstrip("/"), - tags=[], - created_at=journal_entry.created_at, - updated_at=journal_entry.updated_at, - context_url=journal_entry.context_url, - context_type=journal_entry.context_type, - context_id=journal_entry.context_id, - locked_by=None, - ) + return result @app.delete( @@ -1513,7 +1141,7 @@ async def delete_entries( """ Deletes a journal entries """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -1590,7 +1218,7 @@ async def delete_entries_by_search( """ Deletes a journal entries """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -1682,7 +1310,7 @@ async def delete_entries_by_tags( """ Deletes a journal entries by tags list using AND condition """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, 
request.state.user_group_id_list, @@ -1754,7 +1382,7 @@ async def most_used_tags( Get all tags for a journal entry. """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -1795,7 +1423,7 @@ async def create_tags( """ Create tags for a journal entry. """ - journal = ensure_journal_permission( + journal = actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -1894,7 +1522,7 @@ async def get_tags( """ Get all tags for a journal entry. """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -1943,7 +1571,7 @@ async def update_tags( """ Update tags for a journal entry tags. """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -2039,7 +1667,7 @@ async def create_entries_tags( Create tags for multiple journal entries. """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -2123,7 +1751,7 @@ async def delete_entries_tags( Delete tags for multiple journal entries. """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -2206,7 +1834,7 @@ async def delete_tag( journal.read permission required. """ - ensure_journal_permission( + actions.ensure_journal_permission( db_session, request.state.user_id, request.state.user_group_id_list, @@ -2301,157 +1929,37 @@ async def delete_tag( @app.get( "/{journal_id}/search", tags=["search"], - response_model=Union[JournalSearchResultsResponse, EntitySearchResponse], + response_model=JournalSearchResultsResponse, ) async def search_journal( - journal_id: UUID, request: Request, background_tasks: BackgroundTasks, + journal_id: UUID = Path(...), q: str = Query(""), filters: Optional[List[str]] = Query(None), limit: int = Query(10), offset: int = Query(0), - content: Optional[bool] = Query(True), + content: bool = Query(True), order: search.ResultsOrder = Query(search.ResultsOrder.DESCENDING), db_session: Session = Depends(db.yield_connection_from_env), es_client: Elasticsearch = Depends(es.yield_es_client_from_env), - representation: JournalRepresentationTypes = Query( - JournalRepresentationTypes.JOURNAL - ), -) -> Union[JournalSearchResultsResponse, EntitySearchResponse]: +) -> JournalSearchResultsResponse: """ Executes a search query against the given journal. 
""" - ensure_journal_permission( - db_session, - request.state.user_id, - request.state.user_group_id_list, - journal_id, - {JournalEntryScopes.READ}, - ) - - journal_spec = JournalSpec(id=journal_id, bugout_user_id=request.state.user_id) - try: - journal = await actions.find_journal( - db_session=db_session, - journal_spec=journal_spec, - user_group_id_list=request.state.user_group_id_list, - ) - except actions.JournalNotFound: - logger.error( - f"Journal not found with ID={journal_id} for user={request.state.user_id}" - ) - raise HTTPException(status_code=404) - except Exception as e: - logger.error(f"Error retrieving journal: {str(e)}") - raise HTTPException(status_code=500) - - if filters is None: - filters = [] - search_query = search.normalized_search_query(q, filters, strict_filter_mode=False) - - url: str = str(request.url).rstrip("/") - journal_url = "/".join(url.split("/")[:-1]) - - results: List[Any] = [] - - es_index = journal.search_index - if es_index is None: - total_results, rows = search.search_database( - db_session, journal_id, search_query, limit, offset, order=order - ) - max_score: Optional[float] = 1.0 - - for entry in rows: - entry_url = f"{journal_url}/entries/{str(entry.id)}" - content_url = f"{entry_url}/content" - - result = await journal_representation_parsers[representation][ - "search_entry" - ]( - str(entry.id), - str(journal.id), - entry_url, - content_url, - entry.title, - entry.tags, - str(entry.created_at), - str(entry.updated_at), - 1.0, - entry.context_type, - entry.context_id, - entry.context_url, - entry.content, - ) - results.append(result) - else: - search_results = search.search( - es_client, - es_index=es_index, - journal_id=journal_id, - search_query=search_query, - size=limit, - start=offset, - order=order, - ) - - total_results = search_results.get("total", {}).get("value", 0) - max_score = search_results.get("max_score") - if max_score is None: - max_score = 0.0 - - for hit in search_results.get("hits", []): - entry_url = f"{journal_url}/entries/{hit['_id']}" - content_url = f"{entry_url}/content" - source = hit.get("_source", {}) - source_tags: Union[str, List[str]] = source.get("tag", []) - tags = [] - if source_tags == str(source_tags): - source_tags = cast(str, source_tags) - tags = [source_tags] - else: - source_tags = cast(List[str], source_tags) - tags = source_tags - - result = await journal_representation_parsers[representation][ - "search_entry" - ]( - source.get("entry_id"), - str(journal.id), - entry_url, - content_url, - source.get("title", ""), - tags, - datetime.fromtimestamp(source.get("created_at")).isoformat(), - datetime.fromtimestamp(source.get("updated_at")).isoformat(), - hit.get("_score"), - source.get("context_type"), - source.get("context_id"), - source.get("context_url"), - source.get("content", "") if content is True else None, - ) - results.append(result) - - next_offset: Optional[int] = None - if offset + limit < total_results: - next_offset = offset + limit - - response = await journal_representation_parsers[representation]["search_entries"]( - total_results, offset, max_score, next_offset, results - ) - - bugout_client_id = bugout_client_id_from_request(request) - background_tasks.add_task( - actions.store_search_results, - search_url=url, + result = await handlers.search_journal_handler( + db_session=db_session, + request=request, journal_id=journal_id, - bugout_user_id=request.state.user_id, - bugout_client_id=bugout_client_id, + es_client=es_client, + background_tasks=background_tasks, q=q, - filters=filters, 
limit=limit, offset=offset, - response=response, + content=content, + order=order, + representation=JournalRepresentationTypes.JOURNAL, + filters=filters, ) - return response + return result diff --git a/spire/journal/data.py b/spire/journal/data.py index 95d0f4c..f417cc4 100644 --- a/spire/journal/data.py +++ b/spire/journal/data.py @@ -55,7 +55,7 @@ class RuleActions(Enum): class JournalRepresentationTypes(Enum): - ENTITY = "entity" + COLLECTION = "collection" JOURNAL = "journal" @@ -342,6 +342,10 @@ class EntityList(BaseModel): entities: List[Entity] = Field(default_factory=list) +class EntityCollection(BaseModel): + name: str + + class EntityCollectionResponse(BaseModel): collection_id: uuid.UUID bugout_user_id: str diff --git a/spire/journal/handlers.py b/spire/journal/handlers.py new file mode 100644 index 0000000..04d50a0 --- /dev/null +++ b/spire/journal/handlers.py @@ -0,0 +1,609 @@ +import json +import logging +from datetime import datetime, timezone +from typing import Any, List, Optional, Union, cast +from uuid import UUID + +from elasticsearch import Elasticsearch +from fastapi import BackgroundTasks, HTTPException, Request +from sqlalchemy.orm import Session + +from ..utils.settings import DEFAULT_JOURNALS_ES_INDEX +from . import actions, search +from .data import ( + ContextSpec, + CreateJournalAPIRequest, + CreateJournalEntryRequest, + CreateJournalRequest, + Entity, + JournalEntryContent, + JournalEntryListContent, + JournalEntryScopes, + JournalRepresentationTypes, + JournalScopes, + JournalSpec, + JournalTypes, +) +from .representations import journal_representation_parsers, parse_entity_to_entry + +logger = logging.getLogger(__name__) + + +# list_journals_handler operates for api endpoints: +# - list_journals +# - list_collections +async def list_journals_handler( + db_session: Session, request: Request, representation: JournalRepresentationTypes +): + try: + journals = await actions.find_journals( + db_session=db_session, + user_id=request.state.user_id, + user_group_id_list=request.state.user_group_id_list, + ) + + parsed_journals = [] + for j in journals: + obj = await journal_representation_parsers[representation]["journal"]( + j, j.holders_ids + ) + parsed_journals.append(obj) + + result = await journal_representation_parsers[representation]["journals"]( + parsed_journals + ) + except actions.JournalNotFound: + logger.error(f"Journals not found for user={request.state.user_id}") + raise HTTPException(status_code=404) + except Exception as err: + logger.error(err) + raise HTTPException(status_code=500) + + return result + + +# create_journal_handler operates for api endpoints: +# - create_journal +# - create_collection +async def create_journal_handler( + db_session: Session, + request: Request, + create_request: CreateJournalAPIRequest, + representation: JournalRepresentationTypes, +): + search_index: Optional[str] = DEFAULT_JOURNALS_ES_INDEX + if create_request.journal_type == JournalTypes.HUMBUG: + search_index = None + + journal_request = CreateJournalRequest( + bugout_user_id=request.state.user_id, + name=create_request.name, + search_index=search_index, + ) + try: + journal = await actions.create_journal(db_session, journal_request) + + result = await journal_representation_parsers[representation]["journal"]( + journal, {holder.holder_id for holder in journal.permissions} + ) + except Exception as e: + logger.error(f"Error creating journal: {str(e)}") + raise HTTPException(status_code=500) + + return result + + +# delete_journal_handler operates for api 
endpoints: +# - delete_journal +# - delete_collection +async def delete_journal_handler( + db_session: Session, + request: Request, + journal_id: UUID, + es_client: Elasticsearch, + representation: JournalRepresentationTypes, +): + actions.ensure_journal_permission( + db_session, + request.state.user_id, + request.state.user_group_id_list, + journal_id, + {JournalScopes.DELETE}, + ) + + journal_spec = JournalSpec(id=journal_id, bugout_user_id=request.state.user_id) + try: + journal = await actions.delete_journal( + db_session, + journal_spec, + user_group_id_list=request.state.user_group_id_list, + ) + except actions.JournalNotFound: + logger.error( + f"Journal not found with ID={journal_id} for user={request.state.user_id}" + ) + raise HTTPException(status_code=404) + except Exception as e: + logger.error(f"Error deleting journal: {str(e)}") + raise HTTPException(status_code=500) + + es_index = journal.search_index + + search.delete_journal_entries(es_client, es_index=es_index, journal_id=journal_id) + + return await journal_representation_parsers[representation]["journal"]( + journal, {holder.holder_id for holder in journal.permissions} + ) + + +# create_journal_entry_handler operates for api endpoints: +# - create_journal_entry +# - create_collection_entity +async def create_journal_entry_handler( + db_session: Session, + request: Request, + journal_id: UUID, + create_request: Union[JournalEntryContent, Entity], + es_client: Elasticsearch, + representation: JournalRepresentationTypes, +): + journal = actions.ensure_journal_permission( + db_session, + request.state.user_id, + request.state.user_group_id_list, + journal_id, + {JournalEntryScopes.CREATE}, + ) + journal_spec = JournalSpec(id=journal_id, bugout_user_id=request.state.user_id) + + tags: List[str] + representation: JournalRepresentationTypes + if type(create_request) == JournalEntryContent: + representation = JournalRepresentationTypes.JOURNAL + creation_request = CreateJournalEntryRequest( + journal_spec=journal_spec, + title=create_request.title, + content=create_request.content, + tags=create_request.tags, + context_type=create_request.context_type, + context_id=create_request.context_id, + context_url=create_request.context_url, + ) + + if create_request.created_at is not None: + created_at_utc = datetime.astimezone( + create_request.created_at, tz=timezone.utc + ) + created_at = created_at_utc.replace(tzinfo=None) + creation_request.created_at = created_at + + tags = create_request.tags if create_request.tags is not None else [] + elif type(create_request) == Entity: + representation = JournalRepresentationTypes.ENTITY + title, tags, content = parse_entity_to_entry( + create_entity=create_request, + ) + creation_request = CreateJournalEntryRequest( + journal_spec=journal_spec, + title=title, + content=json.dumps(content), + tags=tags, + context_type="entity", + ) + else: + raise HTTPException(status_code=500) + + es_index = journal.search_index + + try: + journal_entry, entry_lock = await actions.create_journal_entry( + db_session=db_session, + journal=journal, + entry_request=creation_request, + locked_by=request.state.user_id, + ) + except actions.JournalNotFound: + logger.error( + f"Journal not found with ID={journal_id} for user={request.state.user_id}" + ) + raise HTTPException(status_code=404) + except Exception as e: + logger.error(f"Error creating journal entry: {str(e)}") + raise HTTPException(status_code=500) + + if es_index is not None: + try: + search.new_entry( + es_client, + es_index=es_index, + 
journal_id=journal_entry.journal_id, + entry_id=journal_entry.id, + title=journal_entry.title, + content=journal_entry.content, + tags=tags, + created_at=journal_entry.created_at, + updated_at=journal_entry.updated_at, + context_type=journal_entry.context_type, + context_id=journal_entry.context_id, + context_url=journal_entry.context_url, + ) + except Exception as e: + logger.warning( + f"Error indexing journal entry ({journal_entry.id}) in journal " + f"({journal_entry.journal_id}) for user ({request.state.user_id})" + ) + + return await journal_representation_parsers[representation]["entry"]( + id=journal_entry.id, + journal_id=journal_entry.journal_id, + title=journal_entry.title, + content=journal_entry.content, + url=str(request.url).rstrip("/"), + tags=tags, + created_at=journal_entry.created_at, + updated_at=journal_entry.updated_at, + context_url=journal_entry.context_url, + context_type=journal_entry.context_type, + context_id=journal_entry.context_id, + locked_by=entry_lock.locked_by, + ) + + +# create_journal_entries_pack_handler operates for api endpoints: +# - create_journal_entries_pack +# - create_collection_entities_pack +async def create_journal_entries_pack_handler( + db_session: Session, + request: Request, + journal_id: UUID, + create_request: Union[JournalEntryListContent, Entity], + es_client: Elasticsearch, + representation: JournalRepresentationTypes, +): + actions.ensure_journal_permission( + db_session, + request.state.user_id, + request.state.user_group_id_list, + journal_id, + {JournalEntryScopes.CREATE}, + ) + journal_spec = JournalSpec(id=journal_id, bugout_user_id=request.state.user_id) + + try: + journal = await actions.find_journal( + db_session=db_session, + journal_spec=journal_spec, + user_group_id_list=request.state.user_group_id_list, + ) + except actions.JournalNotFound: + logger.error( + f"Journal not found with ID={journal_id} for user={request.state.user_id}" + ) + raise HTTPException(status_code=404) + except Exception as e: + logger.error(f"Error retrieving journal: {str(e)}") + raise HTTPException(status_code=500) + + try: + response = await actions.create_journal_entries_pack( + db_session, + journal.id, + create_request, + representation=representation, + ) + except actions.JournalNotFound: + logger.error( + f"Journal not found with ID={journal_id} for user={request.state.user_id}" + ) + raise HTTPException(status_code=404) + except Exception as e: + logger.error(f"Error creating journal entry: {str(e)}") + raise HTTPException(status_code=500) + + es_index = journal.search_index + if es_index is not None: + e_list = ( + response.entities if JournalRepresentationTypes.ENTITY else response.entries + ) + search.bulk_create_entries(es_client, es_index, journal_id, e_list) + + return response + + +# get_entries_handler operates for api endpoints: +# - get_entries +# - get_entities +async def get_entries_handler( + db_session: Session, + request: Request, + journal_id: UUID, + limit: int, + offset: int, + representation: JournalRepresentationTypes, + context_type: Optional[str] = None, + context_id: Optional[str] = None, + context_url: Optional[str] = None, +): + actions.ensure_journal_permission( + db_session, + request.state.user_id, + request.state.user_group_id_list, + journal_id, + {JournalEntryScopes.READ}, + ) + + journal_spec = JournalSpec(id=journal_id, bugout_user_id=request.state.user_id) + context_spec = ContextSpec( + context_type=context_type, context_id=context_id, context_url=context_url + ) + try: + entries = await 
actions.get_journal_entries( + db_session, + journal_spec, + None, + user_group_id_list=request.state.user_group_id_list, + context_spec=context_spec, + limit=limit, + offset=offset, + ) + except actions.JournalNotFound: + logger.error( + f"Journal not found with ID={journal_id} for user={request.state.user_id}" + ) + raise HTTPException(status_code=404) + except Exception as e: + logger.error(f"Error listing journal entries: {str(e)}") + raise HTTPException(status_code=500) + + url: str = str(request.url).rstrip("/") + parsed_entries = [] + + for e in entries: + tag_objects = await actions.get_journal_entry_tags( + db_session, + journal_spec, + e.id, + user_group_id_list=request.state.user_group_id_list, + ) + + obj = await journal_representation_parsers[representation]["entry"]( + id=e.id, + journal_id=journal_id, + title=e.title, + content=e.content, + url=url, + tags=[tag.tag for tag in tag_objects], + created_at=e.created_at, + updated_at=e.updated_at, + context_url=e.context_url, + context_type=e.context_type, + context_id=e.context_id, + locked_by=None, + ) + parsed_entries.append(obj) + + return await journal_representation_parsers[representation]["entries"]( + parsed_entries + ) + + +# delete_entry_handler operates for api endpoints: +# - delete_entry +# - delete_entity +async def delete_entry_handler( + db_session: Session, + request: Request, + journal_id: UUID, + entry_id: UUID, + es_client: Elasticsearch, + representation: JournalRepresentationTypes, +): + journal = actions.ensure_journal_permission( + db_session, + request.state.user_id, + request.state.user_group_id_list, + journal_id, + {JournalEntryScopes.DELETE}, + ) + + try: + journal_entry = await actions.delete_journal_entry( + db_session, + journal, + entry_id, + ) + except actions.JournalNotFound: + logger.error( + f"Journal not found with ID={journal_id} for user={request.state.user_id}" + ) + raise HTTPException(status_code=404, detail="Journal not found") + except actions.EntryNotFound: + logger.error( + f"Entry not found with ID={entry_id} in journal with ID={journal_id}" + ) + raise HTTPException(status_code=404, detail="Entry not found") + except Exception as e: + logger.error(f"Error listing journal entries: {str(e)}") + raise HTTPException(status_code=500) + + es_index = journal.search_index + if es_index is not None: + try: + search.delete_entry( + es_client, + es_index=es_index, + journal_id=journal_entry.journal_id, + entry_id=journal_entry.id, + ) + except Exception as e: + logger.warning( + f"Error deindexing entry ({journal_entry.id}) from index for journal " + f"({journal_entry.journal_id}) for user ({request.state.user_id})" + ) + + return await journal_representation_parsers[representation]["entry"]( + id=journal_entry.id, + journal_id=journal_entry.journal_id, + title=journal_entry.title, + content=journal_entry.content, + url=str(request.url).rstrip("/"), + tags=[], + created_at=journal_entry.created_at, + updated_at=journal_entry.updated_at, + context_url=journal_entry.context_url, + context_type=journal_entry.context_type, + context_id=journal_entry.context_id, + locked_by=None, + ) + + +# search_journal_handler operates for api endpoints: +# - search_journal +# - search_collection +async def search_journal_handler( + db_session: Session, + request: Request, + journal_id: UUID, + es_client: Elasticsearch, + background_tasks: BackgroundTasks, + q: str, + limit: int, + offset: int, + content: bool, + order: search.ResultsOrder, + representation: JournalRepresentationTypes, + filters: 
+):
+    actions.ensure_journal_permission(
+        db_session,
+        request.state.user_id,
+        request.state.user_group_id_list,
+        journal_id,
+        {JournalEntryScopes.READ},
+    )
+
+    journal_spec = JournalSpec(id=journal_id, bugout_user_id=request.state.user_id)
+    try:
+        journal = await actions.find_journal(
+            db_session=db_session,
+            journal_spec=journal_spec,
+            user_group_id_list=request.state.user_group_id_list,
+        )
+    except actions.JournalNotFound:
+        logger.error(
+            f"Journal not found with ID={journal_id} for user={request.state.user_id}"
+        )
+        raise HTTPException(status_code=404)
+    except Exception as e:
+        logger.error(f"Error retrieving journal: {str(e)}")
+        raise HTTPException(status_code=500)
+
+    if filters is None:
+        filters = []
+    search_query = search.normalized_search_query(q, filters, strict_filter_mode=False)
+
+    url: str = str(request.url).rstrip("/")
+    journal_url = "/".join(url.split("/")[:-1])
+
+    results: List[Any] = []
+
+    es_index = journal.search_index
+    if es_index is None:
+        total_results, rows = search.search_database(
+            db_session, journal_id, search_query, limit, offset, order=order
+        )
+        max_score: Optional[float] = 1.0
+
+        for entry in rows:
+            entry_url = f"{journal_url}/entries/{str(entry.id)}"
+            content_url = f"{entry_url}/content"
+
+            result = await journal_representation_parsers[representation][
+                "search_entry"
+            ](
+                str(entry.id),
+                str(journal.id),
+                entry_url,
+                content_url,
+                entry.title,
+                entry.tags,
+                str(entry.created_at),
+                str(entry.updated_at),
+                1.0,
+                entry.context_type,
+                entry.context_id,
+                entry.context_url,
+                entry.content,
+            )
+            results.append(result)
+    else:
+        search_results = search.search(
+            es_client,
+            es_index=es_index,
+            journal_id=journal_id,
+            search_query=search_query,
+            size=limit,
+            start=offset,
+            order=order,
+        )
+
+        total_results = search_results.get("total", {}).get("value", 0)
+        max_score = search_results.get("max_score")
+        if max_score is None:
+            max_score = 0.0
+
+        for hit in search_results.get("hits", []):
+            entry_url = f"{journal_url}/entries/{hit['_id']}"
+            content_url = f"{entry_url}/content"
+            source = hit.get("_source", {})
+            source_tags: Union[str, List[str]] = source.get("tag", [])
+            tags = []
+            if source_tags == str(source_tags):
+                source_tags = cast(str, source_tags)
+                tags = [source_tags]
+            else:
+                source_tags = cast(List[str], source_tags)
+                tags = source_tags
+
+            result = await journal_representation_parsers[representation][
+                "search_entry"
+            ](
+                source.get("entry_id"),
+                str(journal.id),
+                entry_url,
+                content_url,
+                source.get("title", ""),
+                tags,
+                datetime.fromtimestamp(source.get("created_at")).isoformat(),
+                datetime.fromtimestamp(source.get("updated_at")).isoformat(),
+                hit.get("_score"),
+                source.get("context_type"),
+                source.get("context_id"),
+                source.get("context_url"),
+                source.get("content", "") if content is True else None,
+            )
+            results.append(result)
+
+    next_offset: Optional[int] = None
+    if offset + limit < total_results:
+        next_offset = offset + limit
+
+    response = await journal_representation_parsers[representation]["search_entries"](
+        total_results, offset, max_score, next_offset, results
+    )
+
+    bugout_client_id = actions.bugout_client_id_from_request(request)
+    background_tasks.add_task(
+        actions.store_search_results,
+        search_url=url,
+        journal_id=journal_id,
+        bugout_user_id=request.state.user_id,
+        bugout_client_id=bugout_client_id,
+        q=q,
+        filters=filters,
+        limit=limit,
+        offset=offset,
+        response=response,
+    )
+
+    return response
diff --git a/spire/journal/representations.py b/spire/journal/representations.py
index 3ad7dd5..489ccdb 100644
--- a/spire/journal/representations.py
+++ b/spire/journal/representations.py
@@ -7,7 +7,7 @@
 import json
 import logging
 from datetime import datetime
-from typing import Any, Callable, Dict, List, Optional, Tuple, cast, Set
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, cast
 from uuid import UUID
 
 from web3 import Web3
@@ -148,7 +148,7 @@ async def parse_journals_model(journals: List[JournalResponse]) -> ListJournalsR
     return ListJournalsResponse(journals=journals)
 
 
-async def parse_journal_model_entity(
+async def parse_journal_model_collection(
     journal: Journal, holder_ids: Set[str]
 ) -> EntityCollectionResponse:
     return EntityCollectionResponse(
@@ -161,7 +161,7 @@ async def parse_journal_model_entity(
     )
 
 
-async def parse_journals_model_entity(
+async def parse_journals_model_collection(
     journals: List[EntityCollectionResponse],
 ) -> EntityCollectionsResponse:
     return EntityCollectionsResponse(collections=journals)
@@ -206,7 +206,7 @@ async def parse_entries_model(
 
 
 @enforce_same_args
-async def parse_entry_model_entity(
+async def parse_entry_model_collection(
     id: UUID,
     journal_id: UUID,
     title: Optional[str] = None,
@@ -244,7 +244,7 @@ async def parse_entry_model_entity(
     )
 
 
-async def parse_entries_model_entity(
+async def parse_entries_model_collection(
     entries: List[EntityResponse],
 ) -> EntitiesResponse:
     return EntitiesResponse(entities=entries)
@@ -297,7 +297,7 @@ async def parse_search_entries_model(
     )
 
 
-async def parse_search_entry_model_entity(
+async def parse_search_entry_model_collection(
     entry_id: str,
     collection_id: str,
     entry_url: str,
@@ -327,7 +327,7 @@ async def parse_search_entry_model_entity(
     )
 
 
-async def parse_search_entries_model_entity(
+async def parse_search_entries_model_collection(
     total_results: int,
     offset: int,
     max_score: float,
@@ -354,12 +354,12 @@ async def parse_search_entries_model_entity(
         "search_entry": parse_search_entry_model,
         "search_entries": parse_search_entries_model,
     },
-    JournalRepresentationTypes.ENTITY: {
-        "journal": parse_journal_model_entity,
-        "journals": parse_journals_model_entity,
-        "entry": parse_entry_model_entity,
-        "entries": parse_entries_model_entity,
-        "search_entry": parse_search_entry_model_entity,
-        "search_entries": parse_search_entries_model_entity,
+    JournalRepresentationTypes.COLLECTION: {
+        "journal": parse_journal_model_collection,
+        "journals": parse_journals_model_collection,
+        "entry": parse_entry_model_collection,
+        "entries": parse_entries_model_collection,
+        "search_entry": parse_search_entry_model_collection,
+        "search_entries": parse_search_entries_model_collection,
     },
 }
diff --git a/spire/journal/version.py b/spire/journal/version.py
index 58e4dfb..ee02f90 100644
--- a/spire/journal/version.py
+++ b/spire/journal/version.py
@@ -1,5 +1,6 @@
 """
-Journals module of Spire library and API version.
+Journal and collection modules of Spire library and API version.
 """
 
 SPIRE_JOURNALS_VERSION = "0.1.2"
+SPIRE_COLLECTIONS_VERSION = "0.0.7"