Skip to content

Commit

Permalink
GH-44334: [C++] Fix S3 error handling in ObjectOutputStream (#44335)
Browse files Browse the repository at this point in the history
### Rationale for this change

See [#GH-44334](#44334). Errors from the AWS SDK are not correctly propagated to the user of the `ObjectOutputStream`, so in some cases no error is reported even though one occurred.

### What changes are included in this PR?

- Directly pass the outcome of the AWS SDK to `HandleUploadUsingSingleRequestOutcome` as well as `HandleUploadPartOutcome`, instead of wrapping it in an Arrow `Result` class that was constructed implicitly and therefore always indicated success.
- Adjust cleanup handling in `Close` so that the output stream is closed if any of the called methods returns an error. Otherwise, destroying the output stream in debug builds fails, because we abort if `Close()` returns anything other than `Status::OK()`. See the [code pointer here](https://github.com/apache/arrow/blob/64891d1d176dd45f3fae574e1bcfac6fee197e5f/cpp/src/arrow/io/interfaces.cc#L293).

### Are these changes tested?

- Added assertions that `Close()` returns an error when `delayed_open` is enabled.

### Are there any user-facing changes?

No.
* GitHub Issue: #44334

Authored-by: Oliver Layer <o.layer@celonis.de>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
OliLay authored Oct 8, 2024
1 parent 0707c04 commit c50c4fa
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
48 changes: 24 additions & 24 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1784,15 +1784,23 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}

// Pass-through helper for the close path: forwards `status`, running
// CleanupAfterClose() first when `status` signals a failure.
//
// Returns Status::OK() when `status` is OK (no cleanup is performed here).
// Otherwise returns the original failing `status`, unless CleanupAfterClose()
// itself reports a failure, in which case RETURN_NOT_OK propagates that
// failure instead.
Status CleanupIfFailed(Status status) {
  // Success: nothing to clean up on this path.
  if (status.ok()) {
    return Status::OK();
  }
  // Failure: clean up before handing the error back to the caller.
  RETURN_NOT_OK(CleanupAfterClose());
  return status;
}

Status Close() override {
if (closed_) return Status::OK();

RETURN_NOT_OK(EnsureReadyToFlushFromClose());
RETURN_NOT_OK(CleanupIfFailed(EnsureReadyToFlushFromClose()));

RETURN_NOT_OK(Flush());
RETURN_NOT_OK(CleanupIfFailed(Flush()));

if (IsMultipartCreated()) {
RETURN_NOT_OK(FinishPartUploadAfterFlush());
RETURN_NOT_OK(CleanupIfFailed(FinishPartUploadAfterFlush()));
}

return CleanupAfterClose();
Expand All @@ -1801,12 +1809,12 @@ class ObjectOutputStream final : public io::OutputStream {
Future<> CloseAsync() override {
if (closed_) return Status::OK();

RETURN_NOT_OK(EnsureReadyToFlushFromClose());
RETURN_NOT_OK(CleanupIfFailed(EnsureReadyToFlushFromClose()));

// Wait for in-progress uploads to finish (if async writes are enabled)
return FlushAsync().Then([self = Self()]() {
if (self->IsMultipartCreated()) {
RETURN_NOT_OK(self->FinishPartUploadAfterFlush());
RETURN_NOT_OK(self->CleanupIfFailed(self->FinishPartUploadAfterFlush()));
}
return self->CleanupAfterClose();
});
Expand Down Expand Up @@ -2021,7 +2029,7 @@ class ObjectOutputStream final : public io::OutputStream {
std::shared_ptr<UploadState> state,
int32_t part_number,
Aws::S3::Model::PutObjectOutcome outcome) {
HandleUploadUsingSingleRequestOutcome(state, request, outcome.GetResult());
HandleUploadUsingSingleRequestOutcome(state, request, outcome);
return Status::OK();
};

Expand Down Expand Up @@ -2072,7 +2080,7 @@ class ObjectOutputStream final : public io::OutputStream {
std::shared_ptr<UploadState> state,
int32_t part_number,
Aws::S3::Model::UploadPartOutcome outcome) {
HandleUploadPartOutcome(state, part_number, request, outcome.GetResult());
HandleUploadPartOutcome(state, part_number, request, outcome);
return Status::OK();
};

Expand All @@ -2083,16 +2091,12 @@ class ObjectOutputStream final : public io::OutputStream {

static void HandleUploadUsingSingleRequestOutcome(
const std::shared_ptr<UploadState>& state, const S3Model::PutObjectRequest& req,
const Result<S3Model::PutObjectOutcome>& result) {
const S3Model::PutObjectOutcome& outcome) {
std::unique_lock<std::mutex> lock(state->mutex);
if (!result.ok()) {
state->status &= result.status();
} else {
const auto& outcome = *result;
if (!outcome.IsSuccess()) {
state->status &= UploadUsingSingleRequestError(req, outcome);
}
if (!outcome.IsSuccess()) {
state->status &= UploadUsingSingleRequestError(req, outcome);
}

// GH-41862: avoid potential deadlock if the Future's callback is called
// with the mutex taken.
auto fut = state->pending_uploads_completed;
Expand All @@ -2103,18 +2107,14 @@ class ObjectOutputStream final : public io::OutputStream {
static void HandleUploadPartOutcome(const std::shared_ptr<UploadState>& state,
int part_number,
const S3Model::UploadPartRequest& req,
const Result<S3Model::UploadPartOutcome>& result) {
const S3Model::UploadPartOutcome& outcome) {
std::unique_lock<std::mutex> lock(state->mutex);
if (!result.ok()) {
state->status &= result.status();
if (!outcome.IsSuccess()) {
state->status &= UploadPartError(req, outcome);
} else {
const auto& outcome = *result;
if (!outcome.IsSuccess()) {
state->status &= UploadPartError(req, outcome);
} else {
AddCompletedPart(state, part_number, outcome.GetResult());
}
AddCompletedPart(state, part_number, outcome.GetResult());
}

// Notify completion
if (--state->uploads_in_progress == 0) {
// GH-41862: avoid potential deadlock if the Future's callback is called
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,10 @@ class TestS3FS : public S3TestMixin {
void TestOpenOutputStream(bool allow_delayed_open) {
std::shared_ptr<io::OutputStream> stream;

if (!allow_delayed_open) {
if (allow_delayed_open) {
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("nonexistent-bucket/somefile"));
ASSERT_RAISES(IOError, stream->Close());
} else {
// Nonexistent
ASSERT_RAISES(IOError, fs_->OpenOutputStream("nonexistent-bucket/somefile"));
}
Expand Down

0 comments on commit c50c4fa

Please sign in to comment.