Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standalone 12/N: Implement and test ZarrV3ArrayWriter #304

Merged
merged 93 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 91 commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
c7832d7
Move driver source files and tests to a separate directory.
aliddell Sep 17, 2024
a811e22
Define the Zarr streaming API.
aliddell Sep 17, 2024
cf89e13
Define the Zarr logger.
aliddell Sep 17, 2024
fff86c6
Rename zarr.h to acquire.zarr.h.
aliddell Sep 17, 2024
61a53bc
Merge branch 'standalone-sequence-3' into standalone-sequence-4
aliddell Sep 17, 2024
bdd0c51
Instantiate the logger's mutex.
aliddell Sep 17, 2024
d23fe1a
Implement and test stream settings, with API setters and getters.
aliddell Sep 17, 2024
3cd2bb3
Wrap C API functions in extern "C" {}
aliddell Sep 17, 2024
d099795
Document the StreamSettings getters.
aliddell Sep 17, 2024
cfd412e
Merge remote-tracking branch 'upstream/standalone-sequence-3' into st…
aliddell Sep 17, 2024
4e3b470
Merge remote-tracking branch 'upstream/standalone-sequence-4' into st…
aliddell Sep 17, 2024
da98e57
call it 'type'
aliddell Sep 17, 2024
aa3489e
Merge branch 'standalone-sequence-3' into standalone-sequence-4
aliddell Sep 17, 2024
c3e419b
Merge branch 'standalone-sequence-3' into standalone-sequence-4b
aliddell Sep 17, 2024
fdf5b08
No need to double CHECK the settings pointer.
aliddell Sep 17, 2024
cbbc87d
Implement ZarrStream_s.
aliddell Sep 17, 2024
0f28cfc
Test ZarrStream_s.
aliddell Sep 17, 2024
1ca2a1d
Implement the rest of the Zarr API functions.
aliddell Sep 17, 2024
d370e17
Implement and test Zarr common functions.
aliddell Sep 17, 2024
caf295b
Implement and test ThreadPool, S3Connection{Pool}. Also implement Blo…
aliddell Sep 17, 2024
e9102dc
Implement and test Sink and SinkCreator types.
aliddell Sep 17, 2024
9522296
Implement and test base ArrayWriter.
aliddell Sep 17, 2024
d291c53
Implement and test ZarrV2ArrayWriter.
aliddell Sep 17, 2024
082dcd0
Implement and test ZarrV3ArrayWriter.
aliddell Sep 17, 2024
996d22e
Document that ZarrStream_append will block for compression and flushing
aliddell Sep 18, 2024
d6ea43e
Merge branch 'standalone-sequence-3' into standalone-sequence-4
aliddell Sep 18, 2024
46eb93d
Merge branch 'standalone-sequence-4' into standalone-sequence-4b
aliddell Sep 18, 2024
95973d4
Merge branch 'standalone-sequence-4b' into standalone-sequence-5
aliddell Sep 18, 2024
a8b30e1
Merge branch 'standalone-sequence-5' into standalone-sequence-6
aliddell Sep 18, 2024
51f820d
Merge branch 'standalone-sequence-6' into standalone-sequence-7
aliddell Sep 18, 2024
57bc287
Merge branch 'standalone-sequence-7' into standalone-sequence-8
aliddell Sep 18, 2024
46f6cbb
Merge branch 'standalone-sequence-8' into standalone-sequence-9
aliddell Sep 18, 2024
558bbc7
Merge branch 'standalone-sequence-9' into standalone-sequence-10
aliddell Sep 18, 2024
6ae570c
Merge branch 'standalone-sequence-10' into standalone-sequence-11
aliddell Sep 18, 2024
606db8e
Merge branch 'standalone-sequence-11' into standalone-sequence-12
aliddell Sep 18, 2024
e546f7d
Respond to PR comments.
aliddell Sep 18, 2024
2d6aae1
Merge branch 'standalone-sequence-2' into standalone-sequence-3
aliddell Sep 18, 2024
a1b0034
Merge remote-tracking branch 'upstream/main' into standalone-sequence-3
aliddell Sep 18, 2024
cb5414f
Merge remote-tracking branch 'upstream/main' into standalone-sequence-4
aliddell Sep 18, 2024
a4868e8
Merge branch 'standalone-sequence-4' into standalone-sequence-4b
aliddell Sep 18, 2024
784aae6
Merge branch 'standalone-sequence-4b' into standalone-sequence-5
aliddell Sep 18, 2024
eb5b0fa
Merge branch 'standalone-sequence-5' into standalone-sequence-6
aliddell Sep 18, 2024
a25a239
Merge branch 'standalone-sequence-6' into standalone-sequence-7
aliddell Sep 18, 2024
8ce6f8f
Merge branch 'standalone-sequence-7' into standalone-sequence-8
aliddell Sep 18, 2024
c1f5207
Merge branch 'standalone-sequence-8' into standalone-sequence-9
aliddell Sep 18, 2024
c0f1fd1
Merge branch 'standalone-sequence-9' into standalone-sequence-10
aliddell Sep 18, 2024
d272794
Merge branch 'standalone-sequence-10' into standalone-sequence-11
aliddell Sep 18, 2024
82938a3
Merge branch 'standalone-sequence-11' into standalone-sequence-12
aliddell Sep 18, 2024
42a4940
Respond to PR comments.
aliddell Sep 20, 2024
9a6212f
Merge branch 'standalone-sequence-3' into standalone-sequence-4
aliddell Sep 20, 2024
e5f54ec
Merge branch 'standalone-sequence-4' into standalone-sequence-5
aliddell Sep 20, 2024
5076311
Merge branch 'standalone-sequence-6' into standalone-sequence-5
aliddell Sep 20, 2024
2853ad1
wip
aliddell Sep 20, 2024
25bfb8b
Reorder and document settings fields
aliddell Sep 20, 2024
e65f889
Merge branch 'standalone-sequence-3' into standalone-sequence-5
aliddell Sep 20, 2024
b7aeda7
wip
aliddell Sep 20, 2024
e3e6e3f
Remove version specifier from ZarrStream_create.
aliddell Sep 20, 2024
44fe333
Merge branch 'standalone-sequence-3' into standalone-sequence-5
aliddell Sep 20, 2024
401231b
wip
aliddell Sep 20, 2024
d15f431
Document the settings struct a bit.
aliddell Sep 20, 2024
10a6318
Merge branch 'standalone-sequence-3' into standalone-sequence-5
aliddell Sep 20, 2024
e6ad827
wip
aliddell Sep 20, 2024
59bea6c
Fix up some parameters.
aliddell Sep 20, 2024
e984c90
Merge branch 'standalone-sequence-3' into standalone-sequence-5
aliddell Sep 20, 2024
3a0b63a
Update ZarrStream implementation to use settings struct.
aliddell Sep 20, 2024
68dcb03
Remove some redundant code
aliddell Sep 20, 2024
78f9344
Merge branch 'standalone-sequence-5' into standalone-sequence-7
aliddell Sep 20, 2024
3c265da
Merge branch 'standalone-sequence-7' into standalone-sequence-8
aliddell Sep 20, 2024
689f1fe
Merge branch 'standalone-sequence-8' into standalone-sequence-9
aliddell Sep 20, 2024
053e238
Merge branch 'standalone-sequence-9' into standalone-sequence-10
aliddell Sep 20, 2024
4a02046
Merge branch 'standalone-sequence-10' into standalone-sequence-11
aliddell Sep 20, 2024
7fc19e2
Merge branch 'standalone-sequence-11' into standalone-sequence-12
aliddell Sep 20, 2024
87b318a
Merge branch 'standalone-sequence-8' into standalone-sequence-9
aliddell Sep 20, 2024
e76199d
Merge branch 'standalone-sequence-9' into standalone-sequence-10
aliddell Sep 20, 2024
d934260
Use string_view
aliddell Sep 23, 2024
697ef21
Merge branch 'standalone-sequence-10' into standalone-sequence-11
aliddell Sep 23, 2024
7141919
Merge branch 'standalone-sequence-11' into standalone-sequence-12
aliddell Sep 23, 2024
f342aeb
Merge remote-tracking branch 'upstream/main' into standalone-sequence-9
aliddell Sep 27, 2024
9c19086
Merge branch 'standalone-sequence-9' into standalone-sequence-10
aliddell Sep 27, 2024
0dadfa6
Merge branch 'standalone-sequence-10' into standalone-sequence-11
aliddell Sep 27, 2024
9873832
Merge branch 'standalone-sequence-11' into standalone-sequence-12
aliddell Sep 27, 2024
6bf9234
Merge remote-tracking branch 'upstream/main' into standalone-sequence-11
aliddell Oct 3, 2024
38c033f
Merge remote-tracking branch 'upstream/main' into standalone-sequence-11
aliddell Oct 3, 2024
7a61ded
Respond to PR comments
aliddell Oct 3, 2024
c35d0c4
Merge branch 'standalone-sequence-11' into standalone-sequence-12
aliddell Oct 3, 2024
c455ad7
Put #include <string> back
aliddell Oct 3, 2024
6b2e79f
Merge branch 'standalone-sequence-11' into standalone-sequence-12
aliddell Oct 3, 2024
e3ea0d3
Respond to PR comments.
aliddell Oct 3, 2024
0e1482a
Remove dead files
aliddell Oct 3, 2024
1a662c5
Merge branch 'standalone-sequence-11' into standalone-sequence-12
aliddell Oct 3, 2024
a61be1c
Some cleanup.
aliddell Oct 3, 2024
073657b
Merge remote-tracking branch 'upstream/main' into standalone-sequence-12
aliddell Oct 4, 2024
9b53bb8
Respond to PR comments.
aliddell Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/streaming/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ add_library(${tgt}
sink.creator.cpp
array.writer.hh
array.writer.cpp
zarrv2.array.writer.hh
zarrv2.array.writer.cpp
zarrv3.array.writer.hh
zarrv3.array.writer.cpp
)

target_include_directories(${tgt}
Expand Down
17 changes: 15 additions & 2 deletions src/streaming/array.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,6 @@ zarr::ArrayWriter::compress_buffers_()
} catch (const std::exception& exc) {
err = "Failed to compress chunk: " +
std::string(exc.what());
} catch (...) {
err = "Failed to compress chunk (unknown)";
}
latch.count_down();

Expand Down Expand Up @@ -436,3 +434,18 @@ zarr::ArrayWriter::rollover_()
close_sinks_();
++append_chunk_index_;
}

bool
zarr::finalize_array(std::unique_ptr<ArrayWriter>&& writer)
{
writer->is_finalizing_ = true;
try {
writer->flush_();
} catch (const std::exception& exc) {
LOG_ERROR("Failed to finalize array writer: ", exc.what());
return false;
}

writer.reset();
return true;
}
4 changes: 4 additions & 0 deletions src/streaming/array.writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,9 @@ class ArrayWriter
[[nodiscard]] virtual bool write_array_metadata_() = 0;

void close_sinks_();

friend bool finalize_array(std::unique_ptr<ArrayWriter>&& writer);
};

bool finalize_array(std::unique_ptr<ArrayWriter>&& writer);
} // namespace zarr
6 changes: 0 additions & 6 deletions src/streaming/sink.creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,6 @@ zarr::SinkCreator::make_files_(std::queue<std::string>& file_paths,
} catch (const std::exception& exc) {
err = "Failed to create file '" + filename +
"': " + exc.what();
} catch (...) {
err = "Failed to create file '" + filename +
"': (unknown).";
}

latch.count_down();
Expand Down Expand Up @@ -396,9 +393,6 @@ zarr::SinkCreator::make_files_(
} catch (const std::exception& exc) {
err = "Failed to create file '" + filename +
"': " + exc.what();
} catch (...) {
err = "Failed to create file '" + filename +
"': (unknown).";
}

latch.count_down();
Expand Down
8 changes: 4 additions & 4 deletions src/streaming/zarr.dimension.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ struct ZarrDimension
}

std::string name;
ZarrDimensionType type;
ZarrDimensionType type{ ZarrDimensionType_Space };

uint32_t array_size_px;
uint32_t chunk_size_px;
uint32_t shard_size_chunks;
uint32_t array_size_px{ 0 };
uint32_t chunk_size_px{ 0 };
uint32_t shard_size_chunks{ 0 };
};

class ArrayDimensions
Expand Down
8 changes: 1 addition & 7 deletions src/streaming/zarr.stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,7 @@ ZarrStream_s::create_writers_()
void
ZarrStream_s::create_scaled_frames_()
{
if (multiscale_) {
// TODO (aliddell): implement this
}
// TODO (aliddell): implement this
}

bool
Expand Down Expand Up @@ -465,9 +463,5 @@ void
ZarrStream_s::write_multiscale_frames_(const uint8_t* data,
size_t bytes_of_data)
{
if (multiscale_) {
return;
}

// TODO (aliddell): implement this
}
183 changes: 183 additions & 0 deletions src/streaming/zarrv2.array.writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#include "macros.hh"
#include "zarrv2.array.writer.hh"
#include "sink.creator.hh"
#include "zarr.common.hh"

#include <nlohmann/json.hpp>

#include <latch>
#include <stdexcept>

namespace {
[[nodiscard]]
bool
sample_type_to_dtype(ZarrDataType t, std::string& t_str)

{
const std::string dtype_prefix =
std::endian::native == std::endian::big ? ">" : "<";

switch (t) {
case ZarrDataType_uint8:
t_str = dtype_prefix + "u1";
break;
case ZarrDataType_uint16:
t_str = dtype_prefix + "u2";
break;
case ZarrDataType_uint32:
t_str = dtype_prefix + "u4";
break;
case ZarrDataType_uint64:
t_str = dtype_prefix + "u8";
break;
case ZarrDataType_int8:
t_str = dtype_prefix + "i1";
break;
case ZarrDataType_int16:
t_str = dtype_prefix + "i2";
break;
case ZarrDataType_int32:
t_str = dtype_prefix + "i4";
break;
case ZarrDataType_int64:
t_str = dtype_prefix + "i8";
break;
case ZarrDataType_float32:
t_str = dtype_prefix + "f4";
break;
case ZarrDataType_float64:
t_str = dtype_prefix + "f8";
break;
default:
LOG_ERROR("Unsupported sample type: ", t);
return false;
}

return true;
}
} // namespace

zarr::ZarrV2ArrayWriter::ZarrV2ArrayWriter(
ArrayWriterConfig&& config,
std::shared_ptr<ThreadPool> thread_pool)
: ArrayWriter(std::move(config), thread_pool)
{
}

zarr::ZarrV2ArrayWriter::ZarrV2ArrayWriter(
ArrayWriterConfig&& config,
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<S3ConnectionPool> s3_connection_pool)
: ArrayWriter(std::move(config), thread_pool, s3_connection_pool)
{
}

bool
zarr::ZarrV2ArrayWriter::flush_impl_()
{
// create chunk files
CHECK(data_sinks_.empty());
if (!make_data_sinks_()) {
return false;
}

CHECK(data_sinks_.size() == chunk_buffers_.size());

std::latch latch(chunk_buffers_.size());
{
std::scoped_lock lock(buffers_mutex_);
for (auto i = 0; i < data_sinks_.size(); ++i) {
auto& chunk = chunk_buffers_.at(i);
EXPECT(thread_pool_->push_job(
std::move([&sink = data_sinks_.at(i),
data_ = chunk.data(),
size = chunk.size(),
&latch](std::string& err) -> bool {
bool success = false;
try {
std::span data{
reinterpret_cast<std::byte*>(data_), size
};
CHECK(sink->write(0, data));
success = true;
} catch (const std::exception& exc) {
err = "Failed to write chunk: " +
std::string(exc.what());
}

latch.count_down();
return success;
})),
"Failed to push job to thread pool");
}
}

// wait for all threads to finish
latch.wait();

return true;
}

bool
zarr::ZarrV2ArrayWriter::write_array_metadata_()
{
if (!make_metadata_sink_()) {
return false;
}

using json = nlohmann::json;

std::string dtype;
if (!sample_type_to_dtype(config_.dtype, dtype)) {
return false;
}

std::vector<size_t> array_shape, chunk_shape;

size_t append_size = frames_written_;
for (auto i = config_.dimensions->ndims() - 3; i > 0; --i) {
const auto& dim = config_.dimensions->at(i);
const auto& array_size_px = dim.array_size_px;
CHECK(array_size_px);
append_size = (append_size + array_size_px - 1) / array_size_px;
}
array_shape.push_back(append_size);

chunk_shape.push_back(config_.dimensions->final_dim().chunk_size_px);
for (auto i = 1; i < config_.dimensions->ndims(); ++i) {
const auto& dim = config_.dimensions->at(i);
array_shape.push_back(dim.array_size_px);
chunk_shape.push_back(dim.chunk_size_px);
}

json metadata;
metadata["zarr_format"] = 2;
metadata["shape"] = array_shape;
metadata["chunks"] = chunk_shape;
metadata["dtype"] = dtype;
metadata["fill_value"] = 0;
metadata["order"] = "C";
metadata["filters"] = nullptr;
metadata["dimension_separator"] = "/";

if (config_.compression_params) {
const BloscCompressionParams bcp = *config_.compression_params;
metadata["compressor"] = json{ { "id", "blosc" },
{ "cname", bcp.codec_id },
{ "clevel", bcp.clevel },
{ "shuffle", bcp.shuffle } };
} else {
metadata["compressor"] = nullptr;
}

std::string metadata_str = metadata.dump(4);
std::span data{ reinterpret_cast<std::byte*>(metadata_str.data()),
metadata_str.size() };
return metadata_sink_->write(0, data);
}

bool
zarr::ZarrV2ArrayWriter::should_rollover_() const
{
return true;
}
22 changes: 22 additions & 0 deletions src/streaming/zarrv2.array.writer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include "array.writer.hh"

namespace zarr {
class ZarrV2ArrayWriter final : public ArrayWriter
{
public:
ZarrV2ArrayWriter(ArrayWriterConfig&& config,
std::shared_ptr<ThreadPool> thread_pool);

ZarrV2ArrayWriter(ArrayWriterConfig&& config,
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<S3ConnectionPool> s3_connection_pool);

private:
ZarrVersion version_() const override { return ZarrVersion_2; };
bool flush_impl_() override;
bool write_array_metadata_() override;
bool should_rollover_() const override;
};
} // namespace zarr
Loading