From 20d0368eeb539aa2600e73e766bfaaaee343a171 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Klocek?= Date: Sat, 5 Oct 2024 11:22:58 +0200 Subject: [PATCH] [blob] Add support for removing holders by tags Summary: In D13646, a request format was extended to support removing holders by tags. Now server-side support is added. Depends on D13646, D13618 Test Plan: Played with the `DELETE /holders` endpoint in Postman. Added some prefixed holders and verified that they were removed from DDB Reviewers: kamil, varun, will Reviewed By: kamil Subscribers: ashoat, tomek Differential Revision: https://phab.comm.dev/D13647 --- services/blob/src/http/handlers/holders.rs | 36 +++++++++++++++----- services/blob/src/service.rs | 39 +++++++++++++++++++++- 2 files changed, 66 insertions(+), 9 deletions(-) diff --git a/services/blob/src/http/handlers/holders.rs b/services/blob/src/http/handlers/holders.rs index 7ea495d943..7b3a1944cb 100644 --- a/services/blob/src/http/handlers/holders.rs +++ b/services/blob/src/http/handlers/holders.rs @@ -1,5 +1,6 @@ -use actix_web::error::{ErrorBadRequest, ErrorNotImplemented}; +use actix_web::error::{ErrorBadRequest, ErrorForbidden}; use actix_web::{web, HttpResponse}; +use comm_lib::auth::AuthorizationCredential; use comm_lib::blob::types::http::{ AssignHoldersRequest, AssignHoldersResponse, BlobInfo, HolderAssignmentResult, RemoveHoldersRequest, RemoveHoldersResponse, @@ -62,17 +63,24 @@ pub async fn assign_holders_handler( pub async fn remove_holders_handler( service: web::Data, payload: web::Json, + requesting_identity: AuthorizationCredential, ) -> actix_web::Result { - let RemoveHoldersRequest::Items { - requests, - instant_delete, - } = payload.into_inner() - else { - return Err(ErrorNotImplemented("not implemented")); + let (requests, instant_delete) = match payload.into_inner() { + RemoveHoldersRequest::Items { + requests, + instant_delete, + } => (requests, instant_delete), + RemoveHoldersRequest::ByIndexedTags { tags } => { + verify_caller_is_service(&requesting_identity)?; + + tracing::debug!("Querying holders for {} tags", tags.len()); + let requests = service.query_holders_by_tags(tags).await?; + (requests, false) + } }; info!( instant_delete, - "Remove request for {} holders.", + "Requested removal of {} holders.", requests.len() ); validate_request(&requests)?; @@ -114,3 +122,15 @@ fn validate_request(items: &[BlobInfo]) -> actix_web::Result<()> { Ok(()) } + +/// Returns HTTP 403 if caller is not a Comm service +fn verify_caller_is_service( + requesting_identity: &AuthorizationCredential, +) -> actix_web::Result<()> { + match requesting_identity { + AuthorizationCredential::ServicesToken(_) => Ok(()), + _ => Err(ErrorForbidden( + "This endpoint can only be called by other services", + )), + } +} diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs index 0b5524272b..37965e7abd 100644 --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -13,7 +13,7 @@ use comm_lib::tools::BoxedError; use once_cell::sync::Lazy; use tokio_stream::StreamExt; use tonic::codegen::futures_core::Stream; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn, Instrument}; use crate::config::{CONFIG, OFFENSIVE_INVITE_LINKS}; use crate::constants::{ @@ -47,8 +47,11 @@ pub enum BlobServiceError { InvalidState, DB(DBError), S3(S3Error), + #[from(ignore)] InputError(#[error(ignore)] BoxedError), InviteLinkError(InviteLinkError), + #[from(ignore)] + UnexpectedError(#[error(ignore)] BoxedError), } type BlobServiceResult = Result; @@ -307,6 +310,40 @@ impl BlobService { Ok(results) } + pub async fn query_holders_by_tags( + &self, + tags: Vec, + ) -> BlobServiceResult> { + let mut tasks = tokio::task::JoinSet::new(); + + for tag in tags { + let db = self.db.clone(); + let task = async move { db.query_indexed_holders(tag).await }; + tasks.spawn(task.in_current_span()); + } + + let mut results = Vec::new(); + while let Some(result) = tasks.join_next().await { + match result { + Ok(Ok(items)) => results.extend(items), + Ok(Err(db_error)) => { + tasks.abort_all(); + return Err(db_error.into()); + } + Err(join_error) => { + error!( + errorType = error_types::OTHER_ERROR, + "Holder query task failed: {:?}", join_error + ); + tasks.abort_all(); + return Err(BlobServiceError::UnexpectedError(Box::new(join_error))); + } + } + } + + Ok(results) + } + pub async fn perform_cleanup(&self) -> anyhow::Result<()> { info!("Starting cleanup..."); // 1. Fetch blobs and holders marked as "unchecked"