From a0c08306bcdba55a4894f7a83a9fac45e8b9d1e5 Mon Sep 17 00:00:00 2001 From: Paris Morgan Date: Tue, 16 Jul 2024 19:54:54 +0200 Subject: [PATCH] write feature_vector array when we update the index --- .../src/tiledb/vector_search/ingestion.py | 87 +++++++----- apis/python/test/test_index.py | 72 +++++++++- apis/python/test/test_ingestion.py | 2 +- src/include/index/ivf_pq_index.h | 129 +++++++++++++++--- src/include/test/unit_api_ivf_pq_index.cc | 61 +++++++++ 5 files changed, 293 insertions(+), 58 deletions(-) diff --git a/apis/python/src/tiledb/vector_search/ingestion.py b/apis/python/src/tiledb/vector_search/ingestion.py index 213f82192..32d71faeb 100644 --- a/apis/python/src/tiledb/vector_search/ingestion.py +++ b/apis/python/src/tiledb/vector_search/ingestion.py @@ -1587,6 +1587,10 @@ def ingest_type_erased( verbose: bool = False, trace_id: Optional[str] = None, ): + print("[ingestion@ingest_type_erased] retrain_index", retrain_index) + print("[ingestion@ingest_type_erased] size", size) + print("[ingestion@ingest_type_erased] batch", batch) + print("[ingestion@ingest_type_erased] dimensions", dimensions) import numpy as np import tiledb.cloud @@ -1613,41 +1617,14 @@ def ingest_type_erased( verbose=verbose, trace_id=trace_id, ) - - if not retrain_index and index_type == "IVF_PQ": - print( - "[ingestion@ingest_type_erased] additions_vectors:", - additions_vectors, - ) - print( - "[ingestion@ingest_type_erased] additions_external_ids:", - additions_external_ids, - ) - ctx = vspy.Ctx(config) - index = vspy.IndexIVFPQ(ctx, index_group_uri) - if ( - additions_vectors is not None - or additions_external_ids is not None - or updated_ids is not None - ): - vectors_to_add = vspy.FeatureVectorArray( - np.transpose(additions_vectors) - if additions_vectors is not None - else np.array([[]], dtype=vector_type), - np.transpose(additions_external_ids) - if additions_external_ids is not None - else np.array([], dtype=np.uint64), - ) - vector_ids_to_remove = vspy.FeatureVector( - updated_ids - if updated_ids is not None - else np.array([], np.uint64) - ) - index.update(vectors_to_add, vector_ids_to_remove) - index.write_index( - ctx, index_group_uri, to_temporal_policy(index_timestamp) - ) - return + print( + "[ingestion@ingest_type_erased] additions_vectors:", + additions_vectors, + ) + print( + "[ingestion@ingest_type_erased] additions_external_ids:", + additions_external_ids, + ) temp_data_group_uri = f"{index_group_uri}/{PARTIAL_WRITE_ARRAY_DIR}" temp_data_group = tiledb.Group(temp_data_group_uri, "w") @@ -1674,7 +1651,14 @@ def ingest_type_erased( part_end = part + batch if part_end > size: part_end = size + # First we get each vector and it's external id from the input data. 
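Note: the filtering step a few lines below (updates_filter = np.in1d(...)) keeps only the rows whose external id is not in updated_ids. A minimal numpy sketch of that masking step, with made-up ids standing in for the real read_input_vectors() / read_external_ids() output:

    import numpy as np

    in_vectors = np.zeros((5, 3), dtype=np.float32)            # stand-in for read_input_vectors()
    external_ids = np.array([0, 1, 2, 3, 4], dtype=np.uint64)  # stand-in for read_external_ids()
    updated_ids = np.array([1, 3], dtype=np.uint64)            # ids that were updated or deleted

    # True for rows whose id was NOT updated; only those rows are kept as-is.
    updates_filter = np.in1d(external_ids, updated_ids, invert=True)
    in_vectors = in_vectors[updates_filter]      # 3 rows remain
    external_ids = external_ids[updates_filter]  # -> [0, 2, 4]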
+ print("[ingestion@ingest_type_erased] source_uri:", source_uri) + print("[ingestion@ingest_type_erased] source_type:", source_type) + print("[ingestion@ingest_type_erased] vector_type:", vector_type) + print("[ingestion@ingest_type_erased] dimensions:", dimensions) + print("[ingestion@ingest_type_erased] part:", part) + print("[ingestion@ingest_type_erased] part_end:", part_end) in_vectors = read_input_vectors( source_uri=source_uri, source_type=source_type, @@ -1686,6 +1670,7 @@ def ingest_type_erased( verbose=verbose, trace_id=trace_id, ) + print("[ingestion@ingest_type_erased] in_vectors:", in_vectors) external_ids = read_external_ids( external_ids_uri=external_ids_uri, external_ids_type=external_ids_type, @@ -1695,6 +1680,7 @@ def ingest_type_erased( verbose=verbose, trace_id=trace_id, ) + print("[ingestion@ingest_type_erased] external_ids:", external_ids) # Then check if the external id is in the updated ids. updates_filter = np.in1d( @@ -1703,6 +1689,14 @@ def ingest_type_erased( # We only keep the vectors and external ids that are not in the updated ids. in_vectors = in_vectors[updates_filter] external_ids = external_ids[updates_filter] + print( + "[ingestion@ingest_type_erased] in_vectors after filter:", + in_vectors, + ) + print( + "[ingestion@ingest_type_erased] external_ids after filter:", + external_ids, + ) vector_len = len(in_vectors) if vector_len > 0: end_offset = write_offset + vector_len @@ -1736,6 +1730,29 @@ def ingest_type_erased( parts_array.close() ids_array.close() + if index_type == "IVF_PQ" and not retrain_index: + ctx = vspy.Ctx(config) + index = vspy.IndexIVFPQ(ctx, index_group_uri) + if ( + additions_vectors is not None + or additions_external_ids is not None + or updated_ids is not None + ): + vectors_to_add = vspy.FeatureVectorArray( + np.transpose(additions_vectors) + if additions_vectors is not None + else np.array([[]], dtype=vector_type), + np.transpose(additions_external_ids) + if additions_external_ids is not None + else np.array([], dtype=np.uint64), + ) + vector_ids_to_remove = vspy.FeatureVector( + updated_ids if updated_ids is not None else np.array([], np.uint64) + ) + index.update(vectors_to_add, vector_ids_to_remove) + index.write_index(ctx, index_group_uri, to_temporal_policy(index_timestamp)) + return + # Now that we've ingested the vectors and their IDs, train the index with the data. ctx = vspy.Ctx(config) if index_type == "VAMANA": diff --git a/apis/python/test/test_index.py b/apis/python/test/test_index.py index 4481b73c9..972d103b9 100644 --- a/apis/python/test/test_index.py +++ b/apis/python/test/test_index.py @@ -273,6 +273,7 @@ def test_vamana_index(tmp_path): # During the first ingestion we overwrite the metadata and end up with a single base size and ingestion timestamp. 
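Note on the ingestion change above: for index_type == "IVF_PQ" with retrain_index=False, ingest_type_erased now runs the type-erased update only after the filtered vectors and ids have been written to the temp arrays, instead of returning early before that write. Roughly, inside that function (where vspy, config, index_group_uri, additions_vectors, additions_external_ids, updated_ids and index_timestamp are already in scope; the real code also falls back to empty arrays when any of these are None) the call shape is:

    ctx = vspy.Ctx(config)
    index = vspy.IndexIVFPQ(ctx, index_group_uri)
    vectors_to_add = vspy.FeatureVectorArray(
        np.transpose(additions_vectors),  # (dimensions, num_additions) after transpose
        additions_external_ids,           # one uint64 id per added vector
    )
    vector_ids_to_remove = vspy.FeatureVector(updated_ids)
    index.update(vectors_to_add, vector_ids_to_remove)
    index.write_index(ctx, index_group_uri, to_temporal_policy(index_timestamp))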
ingestion_timestamps, base_sizes = load_metadata(uri) assert base_sizes == [5] + assert len(ingestion_timestamps) == 1 timestamp_5_minutes_from_now = int((time.time() + 5 * 60) * 1000) timestamp_5_minutes_ago = int((time.time() - 5 * 60) * 1000) assert ( @@ -316,6 +317,9 @@ def test_ivf_pq_index(tmp_path): os.rmdir(uri) vector_type = np.float32 + print( + "[test_index] ivf_pq_index.create() --------------------------------------------------------" + ) index = ivf_pq_index.create( uri=uri, dimensions=3, @@ -342,6 +346,9 @@ def test_ivf_pq_index(tmp_path): update_vectors[2] = np.array([2, 2, 2], dtype=np.dtype(np.float32)) update_vectors[3] = np.array([3, 3, 3], dtype=np.dtype(np.float32)) update_vectors[4] = np.array([4, 4, 4], dtype=np.dtype(np.float32)) + print( + "[test_index] index.update_batch() --------------------------------------------------------" + ) index.update_batch( vectors=update_vectors, external_ids=np.array([0, 1, 2, 3, 4], dtype=np.dtype(np.uint32)), @@ -350,7 +357,70 @@ def test_ivf_pq_index(tmp_path): index, np.array([[2, 2, 2]], dtype=np.float32), 2, [[0, 3]], [[2, 1]] ) - # TODO(paris): Add tests for consolidation once we enable it. + # By default we do not re-train the index. This means we won't be able to find any results. + print( + "[test_index] index.consolidate_updates() --------------------------------------------------------" + ) + index = index.consolidate_updates(retrain_index=False) + for i in range(5): + distances, ids = index.query(np.array([[i, i, i]], dtype=np.float32), k=1) + assert np.array_equal(ids, np.array([[MAX_UINT64]], dtype=np.float32)) + assert np.array_equal(distances, np.array([[MAX_FLOAT32]], dtype=np.float32)) + + # We can retrain the index and find the results. Update ID 4 to 44 while we do that. + print( + "[test_index] index.delete() --------------------------------------------------------" + ) + index.delete(external_id=4) + print( + "[test_index] index.update() --------------------------------------------------------" + ) + index.update(vector=np.array([4, 4, 4], dtype=np.dtype(np.float32)), external_id=44) + print( + "[test_index] index.consolidate_updates() --------------------------------------------------------" + ) + index = index.consolidate_updates(retrain_index=True) + return + # During the first ingestion we overwrite the metadata and end up with a single base size and ingestion timestamp. + ingestion_timestamps, base_sizes = load_metadata(uri) + assert base_sizes == [5] + assert len(ingestion_timestamps) == 1 + timestamp_5_minutes_from_now = int((time.time() + 5 * 60) * 1000) + timestamp_5_minutes_ago = int((time.time() - 5 * 60) * 1000) + assert ( + ingestion_timestamps[0] > timestamp_5_minutes_ago + and ingestion_timestamps[0] < timestamp_5_minutes_from_now + ) + + # Test that we can query with multiple query vectors. + for i in range(5): + query_and_check_distances( + index, + np.array([[i, i, i], [i, i, i]], dtype=np.float32), + 1, + [[0], [0]], + [[i], [i]], + ) + + # Test that we can query with k > 1. + query_and_check_distances( + index, np.array([[0, 0, 0]], dtype=np.float32), 2, [[0, 3]], [[0, 1]] + ) + + # Test that we can query with multiple query vectors and k > 1. 
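The checks below are really about result shapes: for m query vectors and k neighbors, query() returns an (m, k) distances array and an (m, k) external-ids array, row-aligned with the queries. An illustrative sketch, using the index created earlier in this test:

    queries = np.array([[0, 0, 0], [4, 4, 4]], dtype=np.float32)
    distances, ids = index.query(queries, k=2)
    assert distances.shape == (2, 2)
    assert ids.shape == (2, 2)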
+ query_and_check_distances( + index, + np.array([[0, 0, 0], [4, 4, 4]], dtype=np.float32), + 2, + [[0, 3], [0, 3]], + [[0, 1], [4, 3]], + ) + + vfs = tiledb.VFS() + + assert vfs.dir_size(uri) > 0 + Index.delete_index(uri=uri, config={}) + assert vfs.dir_size(uri) == 0 def test_delete_invalid_index(tmp_path): diff --git a/apis/python/test/test_ingestion.py b/apis/python/test/test_ingestion.py index dfb144bce..8c67d3181 100644 --- a/apis/python/test/test_ingestion.py +++ b/apis/python/test/test_ingestion.py @@ -681,7 +681,7 @@ def test_ingestion_timetravel(tmp_path): timestamp=20, ) - index = index.consolidate_updates() + index = index.consolidate_updates(retrain_index=True) # We still have no results before timestamp 10. query_and_check_equals( diff --git a/src/include/index/ivf_pq_index.h b/src/include/index/ivf_pq_index.h index 82661ae1d..b8d1a210a 100644 --- a/src/include/index/ivf_pq_index.h +++ b/src/include/index/ivf_pq_index.h @@ -362,6 +362,8 @@ class ivf_pq_index { num_partitions_, 0, temporal_policy_); + debug_matrix( + flat_ivf_centroids_, "[ivf_pq_index@uri ctor] flat_ivf_centroids_"); pq_ivf_centroids_ = tdbPreLoadMatrix( @@ -831,6 +833,14 @@ class ivf_pq_index { const Vector& vectors_to_add_ids, const VectorToRemove& vector_ids_to_remove, Distance distance = Distance{}) { + if (vector_ids_to_remove.size() == 1 && vector_ids_to_remove[0] == 5) { + std::cout << "DEBUG TIME!" << std::endl; + debug = true; + } + + debug_matrix( + flat_ivf_centroids_, "[ivf_pq_index@update] flat_ivf_centroids_"); + debug_matrix(vectors_to_add, "[ivf_pq_index@update] vectors_to_add"); debug_vector( vectors_to_add_ids, "[ivf_pq_index@update] vectors_to_add_ids"); @@ -841,7 +851,6 @@ class ivf_pq_index { debug_partitioned_matrix( *partitioned_pq_vectors_, "[ivf_pq_index@update] partitioned_pq_vectors_"); - std::cout << "[ivf_pq_index@update] num_vectors(*partitioned_pq_vectors_): " << ::num_vectors(*partitioned_pq_vectors_) << std::endl; std::cout << "[ivf_pq_index@update] ::dimensions(vector_ids_to_remove): " @@ -878,14 +887,11 @@ class ivf_pq_index { vector_ids_to_remove, "[ivf_pq_index@update] vector_ids_to_remove"); // 1. Find the vectors in unpartitioned_pq_vectors_ to delete. where the id - // is in vector_ids_to_remove. Instead of deleting outright, we will + // is in vector_ids_to_remove. Instead of deleting outright, we will just + // not copy them. auto part_indices = partitioned_pq_vectors_->indices(); debug_vector(part_indices, "[ivf_pq_index@update] part_indices"); for (int i = 0; i < ::num_vectors(*partitioned_pq_vectors_); ++i) { - // std::cout << "i: " << i - // << " (" + std::to_string((*partitioned_pq_vectors_).ids()[i]) + - // ")~~~" - // << std::endl; if (std::find( vector_ids_to_remove.begin(), vector_ids_to_remove.end(), @@ -929,6 +935,9 @@ class ivf_pq_index { // 2. Add vectors_to_add to unpartitioned_pq_vectors_. 
auto vectors_to_add_partition_labels = detail::flat::qv_partition( flat_ivf_centroids_, vectors_to_add, num_threads_, distance); + debug_vector( + vectors_to_add_partition_labels, + "[ivf_pq_index@update] vectors_to_add_partition_labels"); // auto& pqv = *unpartitioned_pq_vectors; for (int i = 0; i < ::num_vectors(vectors_to_add); ++i) { // pq_encode_one(vectors_to_add[i], pqv[idx++]); @@ -949,7 +958,75 @@ class ivf_pq_index { unpartitioned_pq_vectors_ = std::make_unique>( std::move(unpartitioned_pq_vectors)); - auto num_unique_labels = ::num_vectors(flat_ivf_centroids_); + debug_matrix_with_ids( + *unpartitioned_pq_vectors_, + "[ivf_pq_index@update] unpartitioned_pq_vectors_"); + auto num_unique_labels = + std::max(static_cast(1), ::num_vectors(flat_ivf_centroids_)); + std::cout << "[ivf_pq_index@update] num_unique_labels: " + << num_unique_labels << std::endl; + + // At this point we have updated partitioned_pq_vectors_. But we still need + // to update feature_vectors_ so that if we later want to re-ingest the + // data, we have the full set of input vectors and their IDs. + // 4. Load the current feature_vectors_. + feature_vectors_ = + std::move(tdbColMajorPreLoadMatrixWithIds( + group_->cached_ctx(), + group_->feature_vectors_uri(), + group_->ids_uri(), + dimensions_, + ::num_vectors(*partitioned_pq_vectors_), + 0)); + + auto feature_vectors = ColMajorMatrixWithIds( + ::dimensions(feature_vectors_), final_num_vectors); + + // 5. Copy over the vectors that are not in vector_ids_to_remove + std::set vector_ids_to_remove_set( + vector_ids_to_remove.begin(), vector_ids_to_remove.end()); + debug_matrix( + flat_ivf_centroids_, "[ivf_pq_index@update] flat_ivf_centroids_"); + + idx = 0; + for (int i = 0; i < ::num_vectors(*partitioned_pq_vectors_); ++i) { + if (vector_ids_to_remove_set.find(feature_vectors_.ids()[i]) == + vector_ids_to_remove_set.end()) { + std::copy( + feature_vectors_.data() + i * ::dimensions(feature_vectors_), + feature_vectors_.data() + (i + 1) * ::dimensions(feature_vectors_), + feature_vectors.data() + idx * ::dimensions(feature_vectors)); + feature_vectors.ids()[idx] = feature_vectors_.ids()[i]; + idx++; + } + } + debug_matrix( + flat_ivf_centroids_, "[ivf_pq_index@update] flat_ivf_centroids_"); + + // 6. Add vectors_to_add to feature_vectors + std::cout << "[ivf_pq_index@update] ::num_vectors(vectors_to_add): " + << ::num_vectors(vectors_to_add) << std::endl; + std::cout << "[ivf_pq_index@update] ::dimensions(vectors_to_add): " + << ::dimensions(vectors_to_add) << std::endl; + std::cout << "[ivf_pq_index@update] ::num_vectors(feature_vectors): " + << ::num_vectors(feature_vectors) << std::endl; + std::cout << "[ivf_pq_index@update] ::dimensions(feature_vectors): " + << ::dimensions(feature_vectors) << std::endl; + for (int i = 0; i < ::num_vectors(vectors_to_add); ++i) { + std::copy( + vectors_to_add.data() + i * ::dimensions(vectors_to_add), + vectors_to_add.data() + (i + 1) * ::dimensions(vectors_to_add), + feature_vectors.data() + idx * ::dimensions(feature_vectors)); + feature_vectors.ids()[idx] = vectors_to_add_ids[i]; + idx++; + } + + debug_matrix_with_ids( + feature_vectors, "[ivf_pq_index@update] feature_vectors"); + + // 7. Assign to local member variables. + feature_vectors_ = std::move(feature_vectors); + partitioned_pq_vectors_ = std::make_unique( *unpartitioned_pq_vectors_, partition_labels, num_unique_labels); debug_matrix_with_ids( @@ -1190,6 +1267,7 @@ class ivf_pq_index { * defult version. 
* @return Whether the write was successful */ + bool debug = false; auto write_index( const tiledb::Context& ctx, const std::string& group_uri, @@ -1198,6 +1276,19 @@ class ivf_pq_index { if (temporal_policy.has_value()) { temporal_policy_ = *temporal_policy; } + // if (!partitioned_pq_vectors_) { + // throw std::runtime_error( + // "[ivf_pq_index@write_index] partitioned_pq_vectors_ is not " + // "initialized"); + // } + // if (::num_vectors(feature_vectors_) != + // ::num_vectors(*partitioned_pq_vectors_)) { + // throw std::runtime_error( + // "[ivf_pq_index@write_index] num_vectors(feature_vectors_) (" + + // std::to_string(::num_vectors(feature_vectors_)) + + // ") != num_vectors(*partitioned_pq_vectors_) (" + + // std::to_string(::num_vectors(*partitioned_pq_vectors_)) + ")"); + // } auto write_group = ivf_pq_group( ctx, @@ -1287,6 +1378,10 @@ class ivf_pq_index { false, temporal_policy_); + // debug_matrix(flat_ivf_centroids_, "flat_ivf_centroids_"); + // if (debug) { + // return true; + // } write_matrix( ctx, flat_ivf_centroids_, @@ -1294,7 +1389,9 @@ class ivf_pq_index { 0, false, temporal_policy_); - + // if (debug) { + // return true; + // } write_matrix( ctx, pq_ivf_centroids_, @@ -1425,6 +1522,9 @@ class ivf_pq_index { ::num_vectors(*partitioned_pq_vectors_) == 0) { read_index_infinite(); } + debug_matrix( + flat_ivf_centroids_, + "[ivf_pq_index@query_infinite_ram] flat_ivf_centroids_"); auto&& [active_partitions, active_queries] = detail::ivf::partition_ivf_flat_index( flat_ivf_centroids_, query_vectors, nprobe, num_threads_); @@ -1963,19 +2063,6 @@ class ivf_pq_index { return flat_ivf_centroids_; } - auto set_pq_ivf_centroids(const ColMajorMatrix& centroids) { - flat_ivf_centroids_ = flat_ivf_centroid_storage_type( - ::dimensions(centroids), ::num_vectors(centroids)); - std::copy( - centroids.data(), - centroids.data() + centroids.num_rows() * centroids.num_cols(), - flat_ivf_centroids_.data()); - } - - auto& get_pq_ivf_centroids() { - return flat_ivf_centroids_; - } - /** * @brief Used for evaluating quality of partitioning * @param centroids diff --git a/src/include/test/unit_api_ivf_pq_index.cc b/src/include/test/unit_api_ivf_pq_index.cc index 1eb285bc4..f9580a450 100644 --- a/src/include/test/unit_api_ivf_pq_index.cc +++ b/src/include/test/unit_api_ivf_pq_index.cc @@ -1095,6 +1095,7 @@ TEST_CASE("update index", "[api_ivf_pq_index]") { } // Add a new vector + std::cout << "Add a new vector ------------------------" << std::endl; { auto vectors_to_add = FeatureVectorArray( ColMajorMatrixWithIds{ @@ -1143,3 +1144,63 @@ TEST_CASE("update index", "[api_ivf_pq_index]") { n_list); } } + +TEST_CASE("create an empty index and then update", "[api_ivf_pq_index]") { + auto ctx = tiledb::Context{}; + using feature_type_type = uint8_t; + using id_type_type = uint64_t; + using partitioning_index_type_type = uint64_t; + auto feature_type = "uint8"; + auto id_type = "uint64"; + auto partitioning_index_type = "uint64"; + size_t dimensions = 3; + size_t n_list = 1; + size_t num_subspaces = 3; + float convergence_tolerance = 0.00003f; + size_t max_iterations = 3; + + std::string index_uri = + (std::filesystem::temp_directory_path() / "api_ivf_pq_index_foo") + .string(); + std::cout << "index_uri: " << index_uri << std::endl; + tiledb::VFS vfs(ctx); + if (vfs.is_dir(index_uri)) { + vfs.remove_dir(index_uri); + } + + // First create an empty index. 
+  {
+    auto index = IndexIVFPQ(std::make_optional<IndexOptions>(
+        {{"feature_type", feature_type},
+         {"id_type", id_type},
+         {"partitioning_index_type", partitioning_index_type},
+         {"num_subspaces", "1"}}));
+
+    size_t num_vectors = 0;
+    auto empty_training_vector_array =
+        FeatureVectorArray(dimensions, num_vectors, feature_type, id_type);
+    index.train(empty_training_vector_array);
+    index.add(empty_training_vector_array);
+    index.write_index(ctx, index_uri);
+
+    CHECK(index.feature_type_string() == feature_type);
+    CHECK(index.id_type_string() == id_type);
+    CHECK(index.partitioning_index_type_string() == partitioning_index_type);
+  }
+
+  // Then add two vectors to it, while also testing that we can remove their
+  // IDs (even though they are not present, so the removal is a no-op).
+  {
+    auto vectors_to_add = FeatureVectorArray(
+        ColMajorMatrixWithIds<feature_type_type, id_type_type>{
+            {{0, 0, 0}, {1, 1, 1}}, {0, 1}});
+    auto vector_ids_to_remove = FeatureVector(std::vector<id_type_type>{0, 1});
+
+    auto index = IndexIVFPQ(ctx, index_uri);
+    index.update(vectors_to_add, vector_ids_to_remove);
+    index.write_index(ctx, index_uri);
+
+    // Note that querying here will not work because we have not trained any
+    // centroids. We just test that we don't crash.
+  }
+}
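Taken together with the ivf_pq_index.h changes above (update() now also rewrites the feature_vectors and ids arrays, so that a later re-ingestion sees the full set of input vectors and their IDs), the user-visible switch is the retrain_index flag on consolidate_updates(). A rough Python-level usage sketch, where index is an IVF_PQ index object as in test_index.py above:

    # Default (retrain_index=False): keep the existing centroids and fold the
    # update into the stored vectors. For an index that was created empty there
    # are no trained centroids, so queries return MAX_UINT64 ids and MAX_FLOAT32
    # distances until the index is retrained.
    index = index.consolidate_updates(retrain_index=False)

    # retrain_index=True: re-train the centroids from the stored input vectors,
    # after which added and updated vectors become queryable.
    index = index.consolidate_updates(retrain_index=True)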