Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-32276: [C++][FlightRPC] Align RecordBatch buffers given to IPC #44279

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

EnricoMi
Copy link

@EnricoMi EnricoMi commented Oct 1, 2024

Rationale for this change

Data retrieved via IPC is expected to provide memory-aligned arrays, but data retrieved via C++ Flight client is mis-aligned. Datafusion (Rust), which requires proper alignment, cannot handle such data: #43552.

What changes are included in this PR?

This aligns RecordBatch array buffers decoded by IPC if mis-aligned according to the data type byte width.
Implementation mirrors that of align_buffers in arrow-rs (apache/arrow-rs#4681).

Are these changes tested?

Configuration flag tested in unit test.
Manually end-to-end tested that memory alignment fixes issue with reproduction code provided in #43552.

Are there any user-facing changes?

Memory alignment is checked and fixed by default. This is configurable via IpcReadOptions.ensure_memory_alignment.

@EnricoMi EnricoMi changed the title GH-32276: [C++][Flight] Align RecordBatch buffers retrieved via IPC GH-32276: [C++][FlightRPC] Align RecordBatch buffers retrieved via IPC Oct 1, 2024
@github-actions github-actions bot added the awaiting review Awaiting review label Oct 1, 2024
@EnricoMi EnricoMi changed the title GH-32276: [C++][FlightRPC] Align RecordBatch buffers retrieved via IPC GH-32276: [C++][FlightRPC] Align RecordBatch buffers given to IPC Oct 1, 2024
@EnricoMi
Copy link
Author

EnricoMi commented Oct 2, 2024

@pitrou do you think this fix is viable?

@rok rok requested a review from lidavidm October 2, 2024 10:23
Copy link
Member

@lidavidm lidavidm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for picking this up. This seems reasonable to me at a quick glance.

@@ -23,6 +23,7 @@
#include <memory>
#include <utility>
#include <vector>
#include <arrow/util/range.h>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put this include with the rest of the Arrow includes (and use quotes to be consistent)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}
// align children data recursively
for (unsigned int i=0; i<child_data.size(); i++) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could iterate with for (auto& child : child_data) and avoid the explicit index?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much better!

@@ -548,11 +548,16 @@ def test_read_options():
options = pa.ipc.IpcReadOptions()
assert options.use_threads is True
assert options.ensure_native_endian is True
assert options.ensure_memory_alignment is True
assert options.ens is True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did this come from?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting review Awaiting review labels Oct 6, 2024
@EnricoMi
Copy link
Author

EnricoMi commented Oct 7, 2024

While attempting to write some unit tests I found there is util::EnsureAlignment:

Result<std::shared_ptr<ArrayData>> EnsureAlignment(std::shared_ptr<ArrayData> array_data,
int64_t alignment,
MemoryPool* memory_pool) {
if (!CheckAlignment(*array_data, alignment)) {
std::vector<std::shared_ptr<Buffer>> buffers = array_data->buffers;
Type::type type_id = GetTypeForBuffers(*array_data);
for (size_t i = 0; i < buffers.size(); ++i) {
if (buffers[i]) {
int64_t expected_alignment = alignment;
if (alignment == kValueAlignment) {
expected_alignment =
RequiredValueAlignmentForBuffer(type_id, static_cast<int>(i));
}
ARROW_ASSIGN_OR_RAISE(
buffers[i],
EnsureAlignment(std::move(buffers[i]), expected_alignment, memory_pool));
}
}
for (auto& it : array_data->child_data) {
ARROW_ASSIGN_OR_RAISE(it, EnsureAlignment(std::move(it), alignment, memory_pool));
}
if (array_data->type->id() == Type::DICTIONARY) {
ARROW_ASSIGN_OR_RAISE(
array_data->dictionary,
EnsureAlignment(std::move(array_data->dictionary), alignment, memory_pool));
}
auto new_array_data = ArrayData::Make(
array_data->type, array_data->length, std::move(buffers), array_data->child_data,
array_data->dictionary, array_data->GetNullCount(), array_data->offset);
return new_array_data;
} else {
return array_data;
}
}

I will try to reuse that method rather than re-implementing it. There is also test infrastructure for misaligned array data.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants