Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tdbPartitionedMatrix will automatically close Arrays when done reading #448

Merged
merged 8 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 5 additions & 18 deletions apis/python/test/test_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,9 +717,6 @@ def test_ingestion_timetravel(tmp_path):
second_num_edges = num_edges_history[1]

# Clear all history at timestamp 19.
# With type-erased indexes, we cannot call clear_history() while the index is open because they
# open up a TileDB Array during query(). Deleting fragments while the array is open is not allowed.
index = None
Index.clear_history(uri=index_uri, timestamp=19)

with tiledb.Group(index_uri, "r") as group:
Expand Down Expand Up @@ -1118,12 +1115,10 @@ def test_ingestion_with_updates_and_timetravel(tmp_path):
second_num_edges = num_edges_history[1]

# Clear history before the latest ingestion
latest_ingestion_timestamp = index.latest_ingestion_timestamp
assert index.latest_ingestion_timestamp == 102
# With type-erased indexes, we cannot call clear_history() while the index is open because they
# open up a TileDB Array during query(). Deleting fragments while the array is open is not allowed.
index = None
Index.clear_history(uri=index_uri, timestamp=latest_ingestion_timestamp - 1)
Index.clear_history(
uri=index_uri, timestamp=index.latest_ingestion_timestamp - 1
)

with tiledb.Group(index_uri, "r") as group:
assert metadata_to_list(group, "ingestion_timestamps") == [102]
Expand Down Expand Up @@ -1166,12 +1161,8 @@ def test_ingestion_with_updates_and_timetravel(tmp_path):
assert accuracy(result, gt_i, updated_ids=updated_ids) == 1.0

# Clear all history
latest_ingestion_timestamp = index.latest_ingestion_timestamp
assert index.latest_ingestion_timestamp == 102
# With type-erased indexes, we cannot call clear_history() while the index is open because they
# open up a TileDB Array during query(). Deleting fragments while the array is open is not allowed.
index = None
Index.clear_history(uri=index_uri, timestamp=latest_ingestion_timestamp)
Index.clear_history(uri=index_uri, timestamp=index.latest_ingestion_timestamp)
index = index_class(uri=index_uri, timestamp=1)
_, result = index.query(queries, k=k, nprobe=partitions)
assert accuracy(result, gt_i, updated_ids=updated_ids) == 0.0
Expand Down Expand Up @@ -1768,11 +1759,7 @@ def test_ivf_flat_ingestion_with_training_source_uri_tdb(tmp_path):
)

# Clear the index history, load, update, and query.
# With type-erased indexes, we cannot call clear_history() while the index is open because they
# open up a TileDB Array during query(). Deleting fragments while the array is open is not allowed.
latest_ingestion_timestamp = index.latest_ingestion_timestamp
index = None
Index.clear_history(uri=index_uri, timestamp=latest_ingestion_timestamp - 1)
Index.clear_history(uri=index_uri, timestamp=index.latest_ingestion_timestamp - 1)

index = IVFFlatIndex(uri=index_uri)

Expand Down
112 changes: 77 additions & 35 deletions src/include/detail/linalg/tdb_partitioned_matrix.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class tdbPartitionedMatrix

// For now, we assume this is always valid so we don't need to add constructor
// arguments to limit it
size_t num_array_rows_{0};
size_t dimensions_{0};

// We don't actually use this
// size_t num_array_cols_{0};
Expand Down Expand Up @@ -172,20 +172,38 @@ class tdbPartitionedMatrix
* Column information
****************************************************************************/

// The total number of columns (resident plus non-resident)
unsigned long total_max_cols_{0UL};

// The max number of columns that can fit in allocated memory
size_t column_capacity_{0};

// The number of columns that are currently loaded into memory
size_t num_resident_cols_{0};

// The initial and final index numbers of the resident columns
// The final index numbers of the resident columns
index_type last_resident_col_{0};

/*****************************************************************************
* Accounting information
****************************************************************************/
size_t max_resident_parts_{0};

/*****************************************************************************
* Closing the arrays
****************************************************************************/
bool closed_ = false;

/**
 * Mark this matrix as closed and release both backing TileDB arrays.
 * Safe to call more than once: each array is closed only if still open.
 */
void close() {
  closed_ = true;
  auto close_if_open = [](auto& array) {
    if (array->is_open()) {
      array->close();
    }
  };
  close_if_open(partitioned_vectors_array_);
  close_if_open(partitioned_ids_array_);
}

public:
tdbPartitionedMatrix(const tdbPartitionedMatrix&) = delete;
tdbPartitionedMatrix(tdbPartitionedMatrix&&) = default;
Expand Down Expand Up @@ -307,20 +325,20 @@ class tdbPartitionedMatrix

auto domain_{partitioned_vectors_schema_.domain()};

auto array_rows_{domain_.dimension(0)};
auto array_cols_{domain_.dimension(1)};
auto array_rows{domain_.dimension(0)};
auto array_cols{domain_.dimension(1)};

num_array_rows_ =
(array_rows_.template domain<row_domain_type>().second -
array_rows_.template domain<row_domain_type>().first + 1);
dimensions_ =
(array_rows.template domain<row_domain_type>().second -
array_rows.template domain<row_domain_type>().first + 1);

// We don't use this. The active partitions naturally limits the number of
// columns that we will read in.
// Comment out for now
#if 0
num_array_cols_ =
(array_cols_.template domain<col_domain_type>().second -
array_cols_.template domain<col_domain_type>().first + 1);
(array_cols.template domain<col_domain_type>().second -
array_cols.template domain<col_domain_type>().first + 1);
#endif

if ((matrix_order_ == TILEDB_ROW_MAJOR && cell_order == TILEDB_COL_MAJOR) ||
Expand All @@ -329,12 +347,11 @@ class tdbPartitionedMatrix
}

// indices might not be contiguous, so we need to explicitly add the deltas
auto total_max_cols = 0UL;
size_t max_part_size{0};
for (size_t i = 0; i < total_num_parts_; ++i) {
auto part_size = master_indices_[relevant_parts_[i] + 1] -
master_indices_[relevant_parts_[i]];
total_max_cols += part_size;
total_max_cols_ += part_size;
max_part_size = std::max<size_t>(max_part_size, part_size);
}

Expand All @@ -344,8 +361,8 @@ class tdbPartitionedMatrix
std::to_string(upper_bound) + " < " + std::to_string(max_part_size));
}

if (upper_bound == 0 || upper_bound > total_max_cols) {
column_capacity_ = total_max_cols;
if (upper_bound == 0 || upper_bound > total_max_cols_) {
column_capacity_ = total_max_cols_;
} else {
column_capacity_ = upper_bound;
}
Expand Down Expand Up @@ -397,9 +414,8 @@ class tdbPartitionedMatrix
* resident at any one time. We use this to size the index of the
* partitioned_matrix base class.
*/
size_t dimension = num_array_rows_;
Base::operator=(
std::move(Base{dimension, column_capacity_, max_resident_parts_}));
std::move(Base{dimensions_, column_capacity_, max_resident_parts_}));
this->num_vectors_ = 0;
this->num_parts_ = 0;

Expand All @@ -422,7 +438,8 @@ class tdbPartitionedMatrix

if (this->part_index_.size() != max_resident_parts_ + 1) {
throw std::runtime_error(
"Invalid partitioning, part_index_ size " +
"[tdb_partioned_matrix@load] Invalid partitioning, part_index_ "
"size " +
std::to_string(this->part_index_.size()) +
" != " + std::to_string(max_resident_parts_ + 1));
}
Expand Down Expand Up @@ -458,7 +475,8 @@ class tdbPartitionedMatrix
// for, throw.
if (num_resident_cols_ > column_capacity_) {
throw std::runtime_error(
"Invalid partitioning, num_resident_cols_ (" +
"[tdb_partioned_matrix@load] Invalid partitioning, "
"num_resident_cols_ (" +
std::to_string(num_resident_cols_) + ") > column_capacity_ (" +
std::to_string(column_capacity_) + ")");
}
Expand All @@ -467,7 +485,8 @@ class tdbPartitionedMatrix
num_resident_parts = last_resident_part_ - first_resident_part;
if (num_resident_parts > max_resident_parts_) {
throw std::runtime_error(
"Invalid partitioning, num_resident_parts " +
"[tdb_partioned_matrix@load] Invalid partitioning, "
"num_resident_parts " +
std::to_string(num_resident_parts) + " > " +
std::to_string(max_resident_parts_));
}
Expand All @@ -478,29 +497,41 @@ class tdbPartitionedMatrix
if ((num_resident_cols_ == 0 && num_resident_parts != 0) ||
(num_resident_cols_ != 0 && num_resident_parts == 0)) {
throw std::runtime_error(
"Invalid partitioning, " + std::to_string(num_resident_cols_) +
" resident cols and " + std::to_string(num_resident_parts) +
" resident parts");
"[tdb_partioned_matrix@load] Invalid partitioning, " +
std::to_string(num_resident_cols_) + " resident cols and " +
std::to_string(num_resident_parts) + " resident parts");
}

if (this->part_index_.size() != max_resident_parts_ + 1) {
throw std::runtime_error(
"Invalid partitioning, part_index_ size (" +
"[tdb_partioned_matrix@load] Invalid partitioning, part_index_ "
"size (" +
std::to_string(this->part_index_.size()) +
") != max_resident_parts_ + 1 (" +
std::to_string(max_resident_parts_ + 1) + ")");
}
}

// If closed_ is true, it means we have already read all the data and closed
// our arrays. Note that we could add this at the top of `load()` and
// return false, but the `num_resident_cols_ == 0` check already handles
// this case. So instead we leave this here - it should never be hit, but if
// it is, we'll have an error to investigate, rather than just returning
// false incorrectly.
if (closed_) {
throw std::runtime_error(
"[tdb_partioned_matrix@load] Arrays are closed - this should not "
"happen.");
}

// 2. Load the vectors and IDs.
{
// a. Set up the vectors subarray.
auto attr = partitioned_vectors_schema_.attribute(0);
std::string attr_name = attr.name();
tiledb::Subarray subarray(ctx_, *(this->partitioned_vectors_array_));
// For a 128 dimension vector, Dimension 0 will go from 0 to 127.
auto dimension = num_array_rows_;
subarray.add_range(0, 0, (int)dimension - 1);
subarray.add_range(0, 0, static_cast<int>(dimensions_) - 1);

// b. Set up the IDs subarray.
auto ids_attr = ids_schema_.attribute(0);
Expand All @@ -521,22 +552,26 @@ class tdbPartitionedMatrix
ids_subarray.add_range(0, (int)start, (int)stop - 1);
}
if (col_count != last_resident_col_ - first_resident_col) {
throw std::runtime_error("Column count mismatch");
throw std::runtime_error(
"[tdb_partioned_matrix@load] Column count mismatch");
}

// c. Execute the vectors query.
tiledb::Query query(ctx_, *(this->partitioned_vectors_array_));
auto ptr = this->data();
query.set_subarray(subarray)
.set_layout(partitioned_vectors_schema_.cell_order())
.set_data_buffer(attr_name, ptr, col_count * dimension);
.set_data_buffer(attr_name, ptr, col_count * dimensions_);
tiledb_helpers::submit_query(tdb_func__, partitioned_vectors_uri_, query);
_memory_data.insert_entry(tdb_func__, col_count * dimension * sizeof(T));
_memory_data.insert_entry(
tdb_func__, col_count * dimensions_ * sizeof(T));

auto qs = query.query_status();
// @todo Handle incomplete queries.
if (tiledb::Query::Status::COMPLETE != query.query_status()) {
throw std::runtime_error("Query status is not complete -- fix me");
throw std::runtime_error(
"[tdb_partioned_matrix@load] Query status is not complete -- fix "
"me");
}

// d. Execute the IDs query.
Expand All @@ -549,7 +584,9 @@ class tdbPartitionedMatrix

// assert(tiledb::Query::Status::COMPLETE == query.query_status());
if (tiledb::Query::Status::COMPLETE != ids_query.query_status()) {
throw std::runtime_error("Query status is not complete -- fix me");
throw std::runtime_error(
"[tdb_partioned_matrix@load] Query status is not complete -- fix "
"me");
}
}

Expand All @@ -564,6 +601,12 @@ class tdbPartitionedMatrix
this->num_vectors_ = num_resident_cols_;
this->num_parts_ = num_resident_parts;

if (last_resident_part_ == total_num_parts_ &&
last_resident_col_ == total_max_cols_) {
// We have loaded all the data we can, let's close our Array's.
close();
}

return true;
}

Expand All @@ -572,12 +615,7 @@ class tdbPartitionedMatrix
*/
~tdbPartitionedMatrix() {
// Don't really need these since tiledb::Array will close on destruction
if (partitioned_vectors_array_->is_open()) {
partitioned_vectors_array_->close();
}
if (partitioned_ids_array_->is_open()) {
partitioned_ids_array_->close();
}
close();
}

void debug_tdb_partitioned_matrix(const std::string& msg, size_t max_size) {
Expand All @@ -587,7 +625,11 @@ class tdbPartitionedMatrix
debug_vector(squashed_indices_, "# squashed_indices_", max_size);
std::cout << "# total_num_parts_: " << total_num_parts_ << std::endl;
std::cout << "# last_resident_part_: " << last_resident_part_ << std::endl;
std::cout << "# num_parts_: " << this->num_parts_ << std::endl;

std::cout << "# total_max_cols_: " << total_max_cols_ << std::endl;
std::cout << "# column_capacity_: " << column_capacity_ << std::endl;
std::cout << "# num_vectors_: " << this->num_vectors_ << std::endl;
std::cout << "# num_resident_cols_: " << num_resident_cols_ << std::endl;
std::cout << "# last_resident_col_: " << last_resident_col_ << std::endl;
std::cout << "# max_resident_parts_: " << max_resident_parts_ << std::endl;
Expand Down
50 changes: 50 additions & 0 deletions src/include/test/unit_api_ivf_pq_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,56 @@ TEST_CASE("storage_version", "[api_ivf_pq_index]") {
}
}

// Regression test: clearing index history must work while an index object is
// still alive in memory. This requires every TileDB Array opened by IndexIVFPQ
// to have been closed once reading finished, otherwise clear_history() throws
// when it calls delete_fragments() on the index arrays.
TEST_CASE("clear history with an open index", "[api_ivf_pq_index]") {
  auto ctx = tiledb::Context{};
  using feature_type_type = uint8_t;
  using id_type_type = uint32_t;
  auto feature_type = "uint8";
  auto id_type = "uint32";
  auto partitioning_index_type = "uint32";
  size_t n_list = 1;
  size_t num_subspaces = 1;
  float convergence_tolerance = 0.00003f;
  size_t max_iterations = 3;

  // Start from a clean index directory under the system temp path.
  std::string index_uri =
      (std::filesystem::temp_directory_path() / "api_ivf_pq_index").string();
  tiledb::VFS vfs(ctx);
  if (vfs.is_dir(index_uri)) {
    vfs.remove_dir(index_uri);
  }

  auto index = IndexIVFPQ(std::make_optional<IndexOptions>(
      {{"feature_type", feature_type},
       {"id_type", id_type},
       {"partitioning_index_type", partitioning_index_type},
       {"n_list", std::to_string(n_list)},
       {"num_subspaces", std::to_string(num_subspaces)},
       {"convergence_tolerance", std::to_string(convergence_tolerance)},
       {"max_iterations", std::to_string(max_iterations)}}));

  // Train and ingest four 3-dimensional vectors, written at timestamp 99.
  auto training = ColMajorMatrixWithIds<feature_type_type, id_type_type>{
      {{1, 1, 1}, {2, 2, 2}, {3, 3, 3}, {4, 4, 4}}, {1, 2, 3, 4}};
  auto training_vector_array = FeatureVectorArray(training);
  index.train(training_vector_array);
  index.add(training_vector_array);
  index.write_index(ctx, index_uri, TemporalPolicy(TimeTravel, 99));

  // Query both in InfiniteRAM mode (first index) and FiniteRAM mode (a second
  // index loaded from the URI) so that both read paths open their arrays.
  auto&& [scores_vector_array, ids_vector_array] =
      index.query(QueryType::InfiniteRAM, training_vector_array, 1, 1);

  auto second_index = IndexIVFPQ(ctx, index_uri);
  auto&& [scores_vector_array_finite, ids_vector_array_finite] =
      second_index.query(QueryType::FiniteRAM, training_vector_array, 1, 1);

  // Here we check that we can clear_history() even with an index in memory.
  // This makes sure that every Array which IndexIVFPQ opens has been closed,
  // otherwise clear_history() will throw when it tries to call
  // delete_fragments() on the index Arrays.
  IndexIVFPQ::clear_history(ctx, index_uri, 99);
}

TEST_CASE("write and load index with timestamps", "[api_ivf_pq_index]") {
auto ctx = tiledb::Context{};
using feature_type_type = uint8_t;
Expand Down
Loading
Loading