Skip to content

Commit

Permalink
Merge branch 'main' into display-thumbnails
Browse files Browse the repository at this point in the history
* main:
  πŸ„ Mario Pipelines (#133)
  πŸ¦‡ Batch status (#134)
  • Loading branch information
mrharpo committed Aug 29, 2023
2 parents b14a5e7 + 4ba8c93 commit db99ab0
Show file tree
Hide file tree
Showing 13 changed files with 232 additions and 129 deletions.
7 changes: 2 additions & 5 deletions chowda/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from fastapi.staticfiles import StaticFiles
from starlette.middleware import Middleware
from starlette.middleware.sessions import SessionMiddleware
from starlette.responses import RedirectResponse
from starlette.responses import HTMLResponse
from starlette.routing import Route

from chowda._version import __version__
Expand All @@ -18,7 +18,6 @@
from chowda.models import (
Batch,
ClamsApp,
ClamsEvent,
Collection,
MediaFile,
Pipeline,
Expand All @@ -29,7 +28,6 @@
from chowda.views import (
BatchView,
ClamsAppView,
ClamsEventView,
CollectionView,
DashboardView,
MediaFileView,
Expand All @@ -44,7 +42,7 @@
routes=[
Route(
'/',
lambda r: RedirectResponse('/admin'),
lambda r: HTMLResponse('<h1>Chowda!</h1><br><a href="/admin">Login</a>'),
)
],
middleware=[Middleware(SessionMiddleware, secret_key=SECRET)],
Expand Down Expand Up @@ -73,7 +71,6 @@
admin.add_view(BatchView(Batch, icon='fa fa-folder', label='Batches'))
admin.add_view(ClamsAppView(ClamsApp, icon='fa fa-box'))
admin.add_view(PipelineView(Pipeline, icon='fa fa-boxes-stacked'))
admin.add_view(ClamsEventView(ClamsEvent, icon='fa fa-file-lines'))
admin.add_view(UserView(User, icon='fa fa-users'))


Expand Down
86 changes: 86 additions & 0 deletions chowda/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,89 @@ class SonyCiAssetThumbnail(BaseField):

async def parse_obj(self, request: Request, obj: Any) -> Any:
return obj.thumbnails_by_type['standard']


@dataclass
class BatchMediaFilesDisplayField(BaseField):
    """Read-only field that lists a batch's media files with their last run.

    Rendered with ``displays/batch_media_files.html``; excluded from the
    list, create and edit views.

    The ``@dataclass`` decorator is needed so the defaults declared below
    (in particular ``name``) become defaults of the generated initializer,
    which lets the field be created with no arguments, like the other
    batch fields in this module.
    """

    name: str = 'batch_media_files'
    display_template: str = 'displays/batch_media_files.html'
    label: str = 'Media Files'
    exclude_from_edit: bool = True
    exclude_from_create: bool = True
    exclude_from_list: bool = True
    read_only: bool = True

    async def parse_obj(self, request: Request, obj: Any) -> Any:
        """Build one row per media file in the batch ``obj``.

        Args:
            request: The current request (unused).
            obj: The batch being displayed; its ``id`` and ``media_files``
                are read.

        Returns:
            A list of dicts with the keys ``guid``, ``run_id``, ``run_link``,
            ``finished_at`` and ``successful``. The four run keys are ``None``
            when the media file has no run for this batch.
        """
        media_file_rows = []

        for media_file in obj.media_files:
            media_file_row = {'guid': media_file.guid}

            # Lookup the real Metaflow Run using the last Run ID
            run = media_file.last_metaflow_run_for_batch(batch_id=obj.id)
            if run:
                # Resolve `source` once: each access constructs a new
                # Metaflow object, so reading it per attribute doubles the work.
                source = run.source
                media_file_row['run_id'] = run.id
                media_file_row[
                    'run_link'
                ] = f'https://mario.wgbh-mla.org/{run.pathspec}'
                media_file_row['finished_at'] = source.finished_at or ''
                media_file_row['successful'] = source.successful
            else:
                media_file_row['run_id'] = None
                media_file_row['run_link'] = None
                media_file_row['finished_at'] = None
                media_file_row['successful'] = None

            media_file_rows.append(media_file_row)
        return media_file_rows


@dataclass
class BatchPercentCompleted(BaseField):
    """Display the percentage of MediaFiles in a batch that have finished.

    A media file counts as finished when its last run for the batch has a
    truthy ``finished_at``. Media files without a run count as unfinished.
    """

    name: str = 'batch_percent_completed'
    exclude_from_edit: bool = True
    exclude_from_create: bool = True
    label: str = 'Completed %'

    async def parse_obj(self, request: Request, obj: Any) -> Any:
        """Return the finished share of the batch ``obj`` as a percentage string.

        Args:
            request: The current request (unused).
            obj: The batch being displayed; its ``id`` and ``media_files``
                are read.

        Returns:
            A string formatted with one decimal place, e.g. ``'50.0%'``.
            An empty batch yields ``'0.0%'``.
        """
        media_files = obj.media_files
        if not media_files:
            # Nothing to divide by: report zero instead of raising
            # ZeroDivisionError while rendering the view.
            return f'{0.0:.1%}'

        finished_count = 0
        for media_file in media_files:
            last_run = media_file.last_metaflow_run_for_batch(batch_id=obj.id)
            if last_run and last_run.source.finished_at:
                finished_count += 1

        percent_completed = finished_count / len(media_files)

        return f'{percent_completed:.1%}'


@dataclass
class BatchPercentSuccessful(BaseField):
    """Display the percentage of MediaFiles in a batch that finished successfully.

    A media file counts as successful when its last run for the batch has a
    truthy ``successful``. Media files without a run count as unsuccessful.
    """

    name: str = 'batch_percent_successful'
    label: str = 'Successful %'
    exclude_from_create: bool = True
    exclude_from_edit: bool = True

    async def parse_obj(self, request: Request, obj: Any) -> Any:
        """Return the successful share of the batch ``obj`` as a percentage string.

        Args:
            request: The current request (unused).
            obj: The batch being displayed; its ``id`` and ``media_files``
                are read.

        Returns:
            A string formatted with one decimal place, e.g. ``'50.0%'``.
            An empty batch yields ``'0.0%'``.
        """
        media_files = obj.media_files
        if not media_files:
            # Nothing to divide by: report zero instead of raising
            # ZeroDivisionError while rendering the view.
            return f'{0.0:.1%}'

        successful_count = 0
        for media_file in media_files:
            last_run = media_file.last_metaflow_run_for_batch(batch_id=obj.id)
            if last_run and last_run.source.successful:
                successful_count += 1

        percent_successful = successful_count / len(media_files)

        return f'{percent_successful:.1%}'
49 changes: 31 additions & 18 deletions chowda/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import enum
from typing import Any, Dict, List, Optional

from metaflow import Run, namespace
from pydantic import AnyHttpUrl, EmailStr, stricturl
from sqlalchemy import JSON, Column, Enum
from sqlalchemy.dialects import postgresql
Expand Down Expand Up @@ -95,7 +96,7 @@ class MediaFile(SQLModel, table=True):
assets: List of SonyCiAssets
collections: List of Collections
batches: List of Batches
clams_events: List of ClamsEvents
metaflow_runs: List of MetaflowRuns
"""

__tablename__ = 'media_files'
Expand All @@ -110,7 +111,20 @@ class MediaFile(SQLModel, table=True):
batches: List['Batch'] = Relationship(
back_populates='media_files', link_model=MediaFileBatchLink
)
clams_events: List['ClamsEvent'] = Relationship(back_populates='media_file')
metaflow_runs: List['MetaflowRun'] = Relationship(back_populates='media_file')

def metaflow_runs_for_batch(self, batch_id: int):
    """Return the runs of this media file whose ``batch_id`` matches.

    The order of ``self.metaflow_runs`` is preserved. An empty list is
    returned when no run matches.
    """

    def belongs_to_batch(candidate):
        return candidate.batch_id == batch_id

    return list(filter(belongs_to_batch, self.metaflow_runs))

def last_metaflow_run_for_batch(self, batch_id: int):
    """Return the final entry of this media file's runs for ``batch_id``.

    Returns ``None`` when the media file has no run for that batch.
    """
    # TODO: is getting the last one sufficient, or do we need to add sortable
    # timestamps?
    batch_runs = self.metaflow_runs_for_batch(batch_id=batch_id)
    if not batch_runs:
        return None
    return batch_runs[-1]

async def __admin_repr__(self, request: Request):
return self.guid
Expand Down Expand Up @@ -149,6 +163,9 @@ class SonyCiAsset(SQLModel, table=True):
def thumbnails_by_type(self):
return {thumbnail['type']: thumbnail for thumbnail in self.thumbnails}

async def __admin_repr__(self, request: Request):
return self.name


class Collection(SQLModel, table=True):
__tablename__ = 'collections'
Expand Down Expand Up @@ -176,7 +193,7 @@ class Batch(SQLModel, table=True):
media_files: List[MediaFile] = Relationship(
back_populates='batches', link_model=MediaFileBatchLink
)
clams_events: List['ClamsEvent'] = Relationship(back_populates='batch')
metaflow_runs: List['MetaflowRun'] = Relationship(back_populates='batch')

async def __admin_repr__(self, request: Request):
return f'{self.name or self.id}'
Expand All @@ -203,7 +220,6 @@ class ClamsApp(SQLModel, table=True):
pipelines: List['Pipeline'] = Relationship(
back_populates='clams_apps', link_model=ClamsAppPipelineLink
)
clams_events: List['ClamsEvent'] = Relationship(back_populates='clams_app')

async def __admin_repr__(self, request: Request):
return f'{self.name or self.id}'
Expand All @@ -229,20 +245,17 @@ async def __admin_select2_repr__(self, request: Request) -> str:
return f'<span><strong>{self.name or self.id}</span>'


class ClamsEvent(SQLModel, table=True):
__tablename__ = 'clams_events'
id: Optional[int] = Field(primary_key=True, default=None)
status: str
response_json: Dict[str, Any] = Field(sa_column=Column(JSON))
class MetaflowRun(SQLModel, table=True):
__tablename__ = 'metaflow_runs'
id: Optional[str] = Field(primary_key=True, default=None)
pathspec: str
batch_id: Optional[int] = Field(default=None, foreign_key='batches.id')
batch: Optional[Batch] = Relationship(back_populates='clams_events')
clams_app_id: Optional[int] = Field(default=None, foreign_key='clams_apps.id')
clams_app: Optional[ClamsApp] = Relationship(back_populates='clams_events')
batch: Optional[Batch] = Relationship(back_populates='metaflow_runs')
media_file_id: Optional[str] = Field(default=None, foreign_key='media_files.guid')
media_file: Optional[MediaFile] = Relationship(back_populates='clams_events')
media_file: Optional[MediaFile] = Relationship(back_populates='metaflow_runs')

async def __admin_repr__(self, request: Request):
return f'{self.clams_app.name}: {self.status}'

async def __admin_select2_repr__(self, request: Request) -> str:
return f'<span><strong>{self.clams_app.name}:</strong> {self.status}</span>'
@property
def source(self):
    """Return the Metaflow ``Run`` built from this row's ``pathspec``.

    A new ``Run`` is constructed on every access; nothing is cached here.
    """
    # TODO: is setting namespace to None the right way to go here?
    # NOTE(review): presumably namespace(None) makes the lookup ignore the
    # current user's namespace - confirm against the Metaflow client docs.
    namespace(None)
    metaflow_run = Run(self.pathspec)
    return metaflow_run
2 changes: 1 addition & 1 deletion chowda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def chunks_of_size(lst, n):

def chunks_striped(lst, n):
    """Yield n striped chunks from lst.

    Chunk ``i`` contains every n-th element of ``lst`` starting at index
    ``i``, so consecutive elements land in different chunks. Chunks may be
    empty when ``n`` exceeds ``len(lst)``; nothing is yielded when ``n`` is
    zero or negative.
    """
    for i in range(n):
        yield lst[i::n]


Expand Down
90 changes: 61 additions & 29 deletions chowda/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from metaflow.exception import MetaflowNotFound
from metaflow.integrations import ArgoEvent
from sqlmodel import Session, select
from starlette.datastructures import FormData
from starlette.requests import Request
from starlette.responses import Response
from starlette.templating import Jinja2Templates
Expand All @@ -15,8 +16,14 @@

from chowda.auth.utils import get_user
from chowda.db import engine
from chowda.fields import MediaFileCount, MediaFilesGuidsField, SonyCiAssetThumbnail
from chowda.models import Batch, Collection
from chowda.fields import (
BatchMediaFilesDisplayField,
BatchPercentCompleted,
BatchPercentSuccessful,
MediaFileCount,
MediaFilesGuidsField,
)
from chowda.models import Batch, Collection, MediaFile
from chowda.utils import validate_media_file_guids


Expand Down Expand Up @@ -162,21 +169,15 @@ class BatchView(BaseModelView):
'pipeline',
'description',
MediaFileCount(),
BatchPercentCompleted(),
BatchPercentSuccessful(),
MediaFilesGuidsField(
'media_files',
id='media_file_guids',
label='GUIDs',
exclude_from_detail=True,
),
BaseField(
'media_files',
display_template='displays/batch_media_files.html',
label='Media Files',
exclude_from_edit=True,
exclude_from_create=True,
exclude_from_list=True,
read_only=True,
),
BatchMediaFilesDisplayField(),
]

async def validate(self, request: Request, data: Dict[str, Any]):
Expand All @@ -197,19 +198,27 @@ async def is_action_allowed(self, request: Request, name: str) -> bool:
async def start_batches(self, request: Request, pks: List[Any]) -> str:
    """Start Batches by sending one message per media file to the Argo Event Bus.

    For every selected batch, a ``'pipeline'`` event is published for each
    of its media files. The payload carries the batch id, the media file
    GUID and the comma-joined endpoints of the batch pipeline's CLAMS apps.

    Args:
        request: The current request (unused).
        pks: Primary keys of the selected batches.

    Returns:
        A success message for the admin UI.

    Raises:
        ActionFailed: If a batch or its pipeline is missing, or if
            publishing fails. Events published before the failure are
            not rolled back.
    """
    try:
        # A single session serves every selected batch.
        with Session(engine) as db:
            for batch_id in pks:
                batch = db.get(Batch, batch_id)
                # Fail with a readable message instead of an AttributeError
                # on None further down.
                if batch is None:
                    raise ValueError(f'Batch {batch_id} not found')
                if batch.pipeline is None:
                    raise ValueError(f'Batch {batch_id} has no pipeline')

                pipeline = ','.join(
                    app.endpoint for app in batch.pipeline.clams_apps
                )
                for media_file in batch.media_files:
                    ArgoEvent(
                        'pipeline',
                        payload={
                            'batch_id': batch_id,
                            'guid': media_file.guid,
                            'pipeline': pipeline,
                        },
                    ).publish(ignore_errors=False)

    except Exception as error:
        # Action boundary: surface any failure to the admin UI.
        raise ActionFailed(f'{error!s}') from error

    # Display Success message
    return f'Started {len(pks)} Batch(es)'

@action(
name='duplicate_batches',
Expand Down Expand Up @@ -270,21 +279,54 @@ async def combine_batches(self, request: Request, pks: List[Any]) -> str:


class MediaFileView(BaseModelView):
pk_attr = 'guid'
pk_attr: str = 'guid'
actions: ClassVar[List[str]] = ['create_new_batch']

fields: ClassVar[list[Any]] = [
fields: ClassVar[list[str]] = [
'guid',
'collections',
'batches',
'assets',
'mmif_json',
'clams_events',
]
exclude_fields_from_list: ClassVar[list[str]] = ['mmif_json', 'clams_events']
exclude_fields_from_list: ClassVar[list[str]] = ['mmif_json']

def can_create(self, request: Request) -> bool:
return get_user(request).is_admin

@action(
    name='create_new_batch',
    text='Create Batch',
    confirmation='Create a Batch from these Media Files?',
    submit_btn_text='Yasss!',
    submit_btn_class='btn-success',
    form="""
    <form>
        <div class="mt-3">
            <input type="text" class="form-control" name="batch_name"
                placeholder="Batch Name">
            <textarea class="form-control" name="batch_description"
                placeholder="Batch Description"></textarea>
        </div>
    </form>
    """,
)
async def create_new_batch(self, request: Request, pks: List[Any]) -> str:
    """Create a new Batch containing the selected Media Files.

    The batch name and description are read from the action's form fields
    ``batch_name`` and ``batch_description``.

    Args:
        request: The current request; its form data is read.
        pks: GUIDs of the selected media files.

    Returns:
        A success message reporting how many media files were attached.
    """
    # Read the form before opening the session so the session is not
    # held open across the await.
    data: FormData = await request.form()

    with Session(engine) as db:
        media_files = db.exec(
            select(MediaFile).where(MediaFile.guid.in_(pks))
        ).all()
        batch = Batch(
            name=data.get("batch_name"),
            description=data.get("batch_description"),
            media_files=media_files,
        )
        db.add(batch)
        db.commit()

    # Count what was attached, not what was requested: a GUID with no
    # matching row is silently absent from the query result.
    return f"Batch of {len(media_files)} Media Files created"


class UserView(AdminModelView):
fields: ClassVar[list[Any]] = ['first_name', 'last_name', 'email']
Expand All @@ -301,16 +343,6 @@ def is_accessible(self, request: Request) -> bool:
return True


class ClamsEventView(BaseModelView):
fields: ClassVar[list[Any]] = [
'batch',
'media_file',
'clams_app',
'status',
'response_json',
]


class DashboardView(CustomView):
def sync_history(self) -> Dict[str, Any]:
try:
Expand Down
Loading

0 comments on commit db99ab0

Please sign in to comment.