Skip to content

Commit

Permalink
Support batch inserting album read models
Browse files Browse the repository at this point in the history
  • Loading branch information
shedrachokonofua committed Jun 16, 2024
1 parent 89f3c0e commit 1a9600e
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 251 deletions.
30 changes: 19 additions & 11 deletions core/src/albums/album_event_subscribers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
GroupingStrategy,
},
},
group_event_handler,
helpers::priority::Priority,
parser::parsed_file_data::{ParsedArtistReference, ParsedCredit, ParsedFileData, ParsedTrack},
};
Expand Down Expand Up @@ -48,19 +49,26 @@ impl From<&ParsedCredit> for AlbumReadModelCredit {
}

async fn update_album_read_models(
event_data: EventData,
event_data: Vec<EventData>,
app_context: Arc<ApplicationContext>,
_: Arc<EventSubscriberInteractor>,
) -> Result<()> {
if let Event::FileParsed {
file_id: _,
file_name,
data: ParsedFileData::Album(parsed_album),
} = &event_data.payload.event
{
let album_read_model = AlbumReadModel::from_parsed_album(file_name, parsed_album.clone());
app_context.album_interactor.put(album_read_model).await?;
let albums = event_data
.into_iter()
.filter_map(|event_data| match event_data.payload.event {
Event::FileParsed {
file_id: _,
file_name,
data: ParsedFileData::Album(parsed_album),
} => Some(AlbumReadModel::from_parsed_album(&file_name, parsed_album)),
_ => None,
})
.collect::<Vec<_>>();

if !albums.is_empty() {
app_context.album_interactor.put_many(albums).await?;
}

Ok(())
}

Expand Down Expand Up @@ -148,7 +156,7 @@ pub fn build_album_event_subscribers(
EventSubscriberBuilder::default()
.id("update_album_read_models")
.topic(Topic::Parser)
.batch_size(250)
.batch_size(500)
.app_context(Arc::clone(&app_context))
.grouping_strategy(GroupingStrategy::GroupByKey(Arc::new(|row| {
match &row.payload.event {
Expand All @@ -159,7 +167,7 @@ pub fn build_album_event_subscribers(
_ => "".to_string(),
}
})))
.handler(event_handler!(update_album_read_models))
.handler(group_event_handler!(update_album_read_models))
.build()?,
EventSubscriberBuilder::default()
.id("delete_album_read_models")
Expand Down
55 changes: 38 additions & 17 deletions core/src/albums/album_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,33 +166,54 @@ impl AlbumInteractor {
Ok(())
}

#[instrument(skip(self), name = "AlbumInteractor::put")]
pub async fn put(&self, album: AlbumReadModel) -> Result<()> {
let file_name = album.file_name.clone();
self.album_repository.put(album.clone()).await?;
self.album_search_index.put(album.clone()).await?;
if let Err(err) = self.process_duplicates(&album).await {
error!(
"Failed to process duplicates for {}: {}",
file_name.to_string(),
err
);
#[instrument(skip_all, name = "AlbumInteractor::put_many", fields(count = albums.len()))]
pub async fn put_many(&self, albums: Vec<AlbumReadModel>) -> Result<()> {
let album_file_names = albums
.iter()
.map(|album| album.file_name.clone())
.collect::<Vec<_>>();
self.album_repository.put_many(albums.clone()).await?;
for album in albums.iter() {
if let Err(err) = self.album_search_index.put(album.clone()).await {
error!(
"Failed to put album into search index {}: {}",
album.file_name.to_string(),
err
);
}
if let Err(err) = self.process_duplicates(album).await {
error!(
"Failed to process duplicates for {}: {}",
album.file_name.to_string(),
err
);
}
}
self
.event_publisher
.publish(
.publish_many(
Topic::Album,
EventPayloadBuilder::default()
.key(file_name.clone())
.event(Event::AlbumSaved {
file_name: file_name.clone(),
album_file_names
.into_iter()
.map(|file_name| {
Ok(
EventPayloadBuilder::default()
.key(file_name.clone())
.event(Event::AlbumSaved { file_name })
.build()?,
)
})
.build()?,
.collect::<Result<Vec<_>>>()?,
)
.await?;
Ok(())
}

#[instrument(skip(self), name = "AlbumInteractor::put")]
pub async fn put(&self, album: AlbumReadModel) -> Result<()> {
self.put_many(vec![album]).await
}

async fn process_duplicates_by_file_name(&self, file_name: &FileName) -> Result<()> {
let album = self.album_repository.get(file_name).await?;
self.process_duplicates(&album).await
Expand Down
Loading

0 comments on commit 1a9600e

Please sign in to comment.