Skip to content

Commit

Permalink
[blob] Add support for removing holders by tags
Browse files Browse the repository at this point in the history
Summary:
In D13646, a request format was extended to support removing holders by tags. Now server-side support is added.

Depends on D13646, D13618

Test Plan: Played with the `DELETE /holders` endpoint in Postman. Added some prefixed holders and verified that they were removed from DDB

Reviewers: kamil, varun, will

Reviewed By: kamil

Subscribers: ashoat, tomek

Differential Revision: https://phab.comm.dev/D13647
  • Loading branch information
barthap committed Oct 14, 2024
1 parent 6c86404 commit 20d0368
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 9 deletions.
36 changes: 28 additions & 8 deletions services/blob/src/http/handlers/holders.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix_web::error::{ErrorBadRequest, ErrorNotImplemented};
use actix_web::error::{ErrorBadRequest, ErrorForbidden};
use actix_web::{web, HttpResponse};
use comm_lib::auth::AuthorizationCredential;
use comm_lib::blob::types::http::{
AssignHoldersRequest, AssignHoldersResponse, BlobInfo,
HolderAssignmentResult, RemoveHoldersRequest, RemoveHoldersResponse,
Expand Down Expand Up @@ -62,17 +63,24 @@ pub async fn assign_holders_handler(
pub async fn remove_holders_handler(
service: web::Data<BlobService>,
payload: web::Json<RemoveHoldersRequest>,
requesting_identity: AuthorizationCredential,
) -> actix_web::Result<HttpResponse> {
let RemoveHoldersRequest::Items {
requests,
instant_delete,
} = payload.into_inner()
else {
return Err(ErrorNotImplemented("not implemented"));
let (requests, instant_delete) = match payload.into_inner() {
RemoveHoldersRequest::Items {
requests,
instant_delete,
} => (requests, instant_delete),
RemoveHoldersRequest::ByIndexedTags { tags } => {
verify_caller_is_service(&requesting_identity)?;

tracing::debug!("Querying holders for {} tags", tags.len());
let requests = service.query_holders_by_tags(tags).await?;
(requests, false)
}
};
info!(
instant_delete,
"Remove request for {} holders.",
"Requested removal of {} holders.",
requests.len()
);
validate_request(&requests)?;
Expand Down Expand Up @@ -114,3 +122,15 @@ fn validate_request(items: &[BlobInfo]) -> actix_web::Result<()> {

Ok(())
}

/// Returns HTTP 403 if caller is not a Comm service
fn verify_caller_is_service(
requesting_identity: &AuthorizationCredential,
) -> actix_web::Result<()> {
match requesting_identity {
AuthorizationCredential::ServicesToken(_) => Ok(()),
_ => Err(ErrorForbidden(
"This endpoint can only be called by other services",
)),
}
}
39 changes: 38 additions & 1 deletion services/blob/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use comm_lib::tools::BoxedError;
use once_cell::sync::Lazy;
use tokio_stream::StreamExt;
use tonic::codegen::futures_core::Stream;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, trace, warn, Instrument};

use crate::config::{CONFIG, OFFENSIVE_INVITE_LINKS};
use crate::constants::{
Expand Down Expand Up @@ -47,8 +47,11 @@ pub enum BlobServiceError {
InvalidState,
DB(DBError),
S3(S3Error),
#[from(ignore)]
InputError(#[error(ignore)] BoxedError),
InviteLinkError(InviteLinkError),
#[from(ignore)]
UnexpectedError(#[error(ignore)] BoxedError),
}

type BlobServiceResult<T> = Result<T, BlobServiceError>;
Expand Down Expand Up @@ -307,6 +310,40 @@ impl BlobService {
Ok(results)
}

pub async fn query_holders_by_tags(
&self,
tags: Vec<String>,
) -> BlobServiceResult<Vec<BlobInfo>> {
let mut tasks = tokio::task::JoinSet::new();

for tag in tags {
let db = self.db.clone();
let task = async move { db.query_indexed_holders(tag).await };
tasks.spawn(task.in_current_span());
}

let mut results = Vec::new();
while let Some(result) = tasks.join_next().await {
match result {
Ok(Ok(items)) => results.extend(items),
Ok(Err(db_error)) => {
tasks.abort_all();
return Err(db_error.into());
}
Err(join_error) => {
error!(
errorType = error_types::OTHER_ERROR,
"Holder query task failed: {:?}", join_error
);
tasks.abort_all();
return Err(BlobServiceError::UnexpectedError(Box::new(join_error)));
}
}
}

Ok(results)
}

pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
info!("Starting cleanup...");
// 1. Fetch blobs and holders marked as "unchecked"
Expand Down

0 comments on commit 20d0368

Please sign in to comment.