-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Standalone 12/N: Implement and test ZarrV3ArrayWriter #304
Changes from all commits
c7832d7
a811e22
cf89e13
fff86c6
61a53bc
bdd0c51
d23fe1a
3cd2bb3
d099795
cfd412e
4e3b470
da98e57
aa3489e
c3e419b
fdf5b08
cbbc87d
0f28cfc
1ca2a1d
d370e17
caf295b
e9102dc
9522296
d291c53
082dcd0
996d22e
d6ea43e
46eb93d
95973d4
a8b30e1
51f820d
57bc287
46f6cbb
558bbc7
6ae570c
606db8e
e546f7d
2d6aae1
a1b0034
cb5414f
a4868e8
784aae6
eb5b0fa
a25a239
8ce6f8f
c1f5207
c0f1fd1
d272794
82938a3
42a4940
9a6212f
e5f54ec
5076311
2853ad1
25bfb8b
e65f889
b7aeda7
e3e6e3f
44fe333
401231b
d15f431
10a6318
e6ad827
59bea6c
e984c90
3a0b63a
68dcb03
78f9344
3c265da
689f1fe
053e238
4a02046
7fc19e2
87b318a
e76199d
d934260
697ef21
7141919
f342aeb
9c19086
0dadfa6
9873832
6bf9234
38c033f
7a61ded
c35d0c4
c455ad7
6b2e79f
e3ea0d3
0e1482a
1a662c5
a61be1c
073657b
9b53bb8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
+1 −1 | acquire-driver-common/tests/integration/abort-while-waiting-for-trigger.cpp |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,254 @@ | ||
#include "macros.hh" | ||
#include "zarrv3.array.writer.hh" | ||
#include "sink.creator.hh" | ||
#include "zarr.common.hh" | ||
|
||
#include <nlohmann/json.hpp> | ||
|
||
#include <algorithm> // std::fill | ||
#include <latch> | ||
#include <stdexcept> | ||
|
||
#ifdef max | ||
#undef max | ||
#endif | ||
|
||
namespace { | ||
std::string | ||
sample_type_to_dtype(ZarrDataType t) | ||
{ | ||
switch (t) { | ||
case ZarrDataType_uint8: | ||
return "uint8"; | ||
case ZarrDataType_uint16: | ||
return "uint16"; | ||
case ZarrDataType_uint32: | ||
return "uint32"; | ||
case ZarrDataType_uint64: | ||
return "uint64"; | ||
case ZarrDataType_int8: | ||
return "int8"; | ||
case ZarrDataType_int16: | ||
return "int16"; | ||
case ZarrDataType_int32: | ||
return "int32"; | ||
case ZarrDataType_int64: | ||
return "int64"; | ||
case ZarrDataType_float32: | ||
return "float32"; | ||
case ZarrDataType_float64: | ||
return "float64"; | ||
default: | ||
throw std::runtime_error("Invalid ZarrDataType: " + | ||
std::to_string(static_cast<int>(t))); | ||
} | ||
} | ||
} // namespace | ||
|
||
// Convenience constructor for a writer with no S3 connection pool:
// delegates to the full constructor with a null pool.
zarr::ZarrV3ArrayWriter::ZarrV3ArrayWriter(
  ArrayWriterConfig&& config,
  std::shared_ptr<ThreadPool> thread_pool)
  : ZarrV3ArrayWriter(std::move(config), thread_pool, nullptr)
{
}
|
||
/// Construct a Zarr v3 array writer.
/// Sets up one file offset and one chunk-index table per shard; table
/// entries start at uint64 max, marking chunks that have not been written.
zarr::ZarrV3ArrayWriter::ZarrV3ArrayWriter(
  ArrayWriterConfig&& config,
  std::shared_ptr<ThreadPool> thread_pool,
  std::shared_ptr<S3ConnectionPool> s3_connection_pool)
  : ArrayWriter(std::move(config), thread_pool, s3_connection_pool)
{
    const auto n_shards = config_.dimensions->number_of_shards();
    const auto n_chunks = config_.dimensions->chunks_per_shard();

    // One running byte offset per shard, all starting at zero.
    shard_file_offsets_ = std::vector<size_t>(n_shards, 0);

    // Each shard's table holds two uint64 entries (offset, size) per chunk,
    // initialized to the "unwritten" sentinel.
    constexpr auto unwritten = std::numeric_limits<uint64_t>::max();
    shard_tables_ = std::vector<std::vector<uint64_t>>(
      n_shards, std::vector<uint64_t>(2 * n_chunks, unwritten));
}
|
||
// Flush buffered chunks to their shard sinks.
//
// Chunks are grouped by shard, then one thread-pool job per shard appends
// that shard's chunks (and, on rollover/finalization, the shard's index
// table) to its sink. A latch blocks this thread until every job has run.
//
// Returns false only if sink creation fails; per-shard write failures are
// reported through the job's `err` string.
// NOTE(review): this function returns true even if an individual shard job
// failed to write — confirm that is the intended contract with the caller.
bool
zarr::ZarrV3ArrayWriter::flush_impl_()
{
    // create shard files if they don't exist
    if (data_sinks_.empty() && !make_data_sinks_()) {
        return false;
    }

    const auto n_shards = config_.dimensions->number_of_shards();

    CHECK(data_sinks_.size() == n_shards);

    // get shard indices for each chunk
    std::vector<std::vector<size_t>> chunk_in_shards(n_shards);
    for (auto i = 0; i < chunk_buffers_.size(); ++i) {
        const auto index = config_.dimensions->shard_index_for_chunk(i);
        chunk_in_shards.at(index).push_back(i);
    }

    // write out chunks to shards
    // The index table is only appended when the shard is complete (rollover)
    // or the writer is shutting down.
    auto write_table = is_finalizing_ || should_rollover_();
    std::latch latch(n_shards);
    for (auto i = 0; i < n_shards; ++i) {
        const auto& chunks = chunk_in_shards.at(i);
        auto& chunk_table = shard_tables_.at(i);
        auto* file_offset = &shard_file_offsets_.at(i);

        // Captures reference members of *this (chunk_buffers_, config_) and
        // locals that outlive the job because latch.wait() below blocks until
        // every job has finished.
        EXPECT(thread_pool_->push_job([&sink = data_sinks_.at(i),
                                       &chunks,
                                       &chunk_table,
                                       file_offset,
                                       write_table,
                                       &latch,
                                       this](std::string& err) {
            bool success = false;

            try {
                for (const auto& chunk_idx : chunks) {
                    auto& chunk = chunk_buffers_.at(chunk_idx);
                    std::span data{ reinterpret_cast<std::byte*>(chunk.data()),
                                    chunk.size() };
                    success = sink->write(*file_offset, data);
                    if (!success) {
                        break;
                    }

                    // Record (byte offset, byte size) for this chunk in the
                    // shard's index table.
                    const auto internal_idx =
                      config_.dimensions->shard_internal_index(chunk_idx);
                    chunk_table.at(2 * internal_idx) = *file_offset;
                    chunk_table.at(2 * internal_idx + 1) = chunk.size();

                    *file_offset += chunk.size();
                }

                // Append the index table after the last chunk of the shard.
                // NOTE(review): a shard with zero chunks leaves success ==
                // false, so its table is never written even when write_table
                // is set — confirm this cannot occur / is intended.
                if (success && write_table) {
                    auto* table =
                      reinterpret_cast<std::byte*>(chunk_table.data());
                    std::span data{ table,
                                    chunk_table.size() * sizeof(uint64_t) };
                    success = sink->write(*file_offset, data);
                }
            } catch (const std::exception& exc) {
                err = "Failed to write chunk: " + std::string(exc.what());
            }

            // count_down runs whether or not the writes succeeded or threw,
            // so latch.wait() below cannot deadlock.
            latch.count_down();
            return success;
        }),
        "Failed to push job to thread pool");
    }

    // wait for all threads to finish
    latch.wait();

    // reset shard tables and file offsets
    if (write_table) {
        for (auto& table : shard_tables_) {
            std::fill(
              table.begin(), table.end(), std::numeric_limits<uint64_t>::max());
        }

        std::fill(shard_file_offsets_.begin(), shard_file_offsets_.end(), 0);
    }

    return true;
}
|
||
// Serialize this array's Zarr v3 metadata (shape, chunk grid, dtype,
// compressor, sharding storage transformer) as JSON and write it to the
// metadata sink at offset 0. Returns false if the sink cannot be created
// or the write fails.
bool
zarr::ZarrV3ArrayWriter::write_array_metadata_()
{
    if (!make_metadata_sink_()) {
        return false;
    }

    using json = nlohmann::json;

    std::vector<size_t> array_shape, chunk_shape, shard_shape;

    // The append (outermost) dimension's size is derived from the number of
    // frames written, divided (rounding up) by the sizes of the interior
    // dimensions between the append dimension and the final two (y, x).
    // NOTE(review): ndims() - 3 assumes at least 3 dimensions — confirm the
    // config validates this upstream.
    size_t append_size = frames_written_;
    for (auto i = config_.dimensions->ndims() - 3; i > 0; --i) {
        const auto& dim = config_.dimensions->at(i);
        const auto& array_size_px = dim.array_size_px;
        CHECK(array_size_px);
        append_size = (append_size + array_size_px - 1) / array_size_px;
    }
    array_shape.push_back(append_size);

    // Chunk/shard extents along the append dimension come from final_dim();
    // the remaining dimensions are copied through in order.
    const auto& final_dim = config_.dimensions->final_dim();
    chunk_shape.push_back(final_dim.chunk_size_px);
    shard_shape.push_back(final_dim.shard_size_chunks);
    for (auto i = 1; i < config_.dimensions->ndims(); ++i) {
        const auto& dim = config_.dimensions->at(i);
        array_shape.push_back(dim.array_size_px);
        chunk_shape.push_back(dim.chunk_size_px);
        shard_shape.push_back(dim.shard_size_chunks);
    }

    json metadata;
    metadata["attributes"] = json::object();
    metadata["chunk_grid"] = json::object({
      { "chunk_shape", chunk_shape },
      { "separator", "/" },
      { "type", "regular" },
    });

    metadata["chunk_memory_layout"] = "C";
    metadata["data_type"] = sample_type_to_dtype(config_.dtype);
    metadata["extensions"] = json::array();
    metadata["fill_value"] = 0;
    metadata["shape"] = array_shape;

    // Blosc compressor settings, or null when writing uncompressed.
    if (config_.compression_params) {
        const auto params = *config_.compression_params;
        metadata["compressor"] = json::object({
          { "codec", "https://purl.org/zarr/spec/codec/blosc/1.0" },
          { "configuration",
            json::object({
              { "blocksize", 0 },
              { "clevel", params.clevel },
              { "cname", params.codec_id },
              { "shuffle", params.shuffle },
            }) },
        });
    } else {
        metadata["compressor"] = nullptr;
    }

    // sharding storage transformer
    // TODO (aliddell):
    // https://github.com/zarr-developers/zarr-python/issues/877
    metadata["storage_transformers"] = json::array();
    metadata["storage_transformers"][0] = json::object({
      { "type", "indexed" },
      { "extension",
        "https://purl.org/zarr/spec/storage_transformers/sharding/1.0" },
      { "configuration",
        json::object({
          { "chunks_per_shard", shard_shape },
        }) },
    });

    std::string metadata_str = metadata.dump(4);
    std::span data = { reinterpret_cast<std::byte*>(metadata_str.data()),
                       metadata_str.size() };

    return metadata_sink_->write(0, data);
}
|
||
bool | ||
zarr::ZarrV3ArrayWriter::should_rollover_() const | ||
{ | ||
const auto& dims = config_.dimensions; | ||
const auto& append_dim = dims->final_dim(); | ||
size_t frames_before_flush = | ||
append_dim.chunk_size_px * append_dim.shard_size_chunks; | ||
for (auto i = 1; i < dims->ndims() - 2; ++i) { | ||
frames_before_flush *= dims->at(i).array_size_px; | ||
} | ||
|
||
CHECK(frames_before_flush > 0); | ||
return frames_written_ % frames_before_flush == 0; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
#pragma once | ||
|
||
#include "array.writer.hh" | ||
|
||
namespace zarr {
/// Writes a single Zarr v3 array. Chunks are aggregated into shard files,
/// each carrying an index table with an (offset, size) entry pair per chunk.
struct ZarrV3ArrayWriter : public ArrayWriter
{
  public:
    /// Construct a writer with no S3 connection pool (delegates to the
    /// three-argument constructor with a null pool).
    ZarrV3ArrayWriter(ArrayWriterConfig&& config,
                      std::shared_ptr<ThreadPool> thread_pool);
    /// Construct a writer; @p s3_connection_pool may be null, in which case
    /// the writer presumably targets the filesystem — see ArrayWriter.
    ZarrV3ArrayWriter(
      ArrayWriterConfig&& config,
      std::shared_ptr<ThreadPool> thread_pool,
      std::shared_ptr<S3ConnectionPool> s3_connection_pool);

  private:
    // Current write offset (bytes) into each shard, indexed by shard.
    std::vector<size_t> shard_file_offsets_;
    // Per-shard index table: two uint64 entries (offset, size) per chunk.
    std::vector<std::vector<uint64_t>> shard_tables_;

    ZarrVersion version_() const override { return ZarrVersion_3; }
    bool flush_impl_() override;
    bool write_array_metadata_() override;
    bool should_rollover_() const override;
};
} // namespace zarr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is
max
defined?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Somewhere in minio, which is included in s3.connection.hh which is included in sink.creator.hh which is included here. I have an issue here to address it.