refactor: move IVF_HNSW_SQ & IVF_FLAT to new building & search path (#2469)

- IVF_HNSW_SQ on the new search/build path
- IVF_FLAT passes e2e
- support training the quantizer with the new index builder
- fix partition order broken after building
- clean up IVF-related types
- index builder method chaining for customization (sketched below)
- implement merging deltas for the new IVF_HNSW_SQ & IVF_FLAT

---------

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
BubbleCal committed Jun 25, 2024
1 parent 4a114b2 commit f51c5f0
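
The "method chaining" bullet above refers to the reworked index-builder API. The following is a minimal, self-contained sketch of the pattern; all names here (IvfIndexBuilder, num_partitions, sample_rate) are hypothetical stand-ins, not Lance's actual builder.

// Hypothetical sketch of builder-style chaining for index construction.
// None of these names are taken from Lance; they only illustrate the pattern.

#[derive(Debug, Clone, Copy)]
enum DistanceType {
    L2,
    Cosine,
}

#[derive(Debug)]
struct IvfIndexBuilder {
    num_partitions: usize,
    distance_type: DistanceType,
    sample_rate: usize,
}

impl IvfIndexBuilder {
    fn new() -> Self {
        // Defaults that each chained call below can override.
        Self {
            num_partitions: 256,
            distance_type: DistanceType::L2,
            sample_rate: 256,
        }
    }

    // Each setter consumes `self` and returns it, so calls chain fluently.
    fn num_partitions(mut self, n: usize) -> Self {
        self.num_partitions = n;
        self
    }

    fn distance_type(mut self, dt: DistanceType) -> Self {
        self.distance_type = dt;
        self
    }

    fn sample_rate(mut self, rate: usize) -> Self {
        self.sample_rate = rate;
        self
    }

    fn build(self) -> String {
        format!("built index with {:?}", self)
    }
}

fn main() {
    // Customization reads as one fluent expression.
    let index = IvfIndexBuilder::new()
        .num_partitions(1024)
        .distance_type(DistanceType::Cosine)
        .sample_rate(128)
        .build();
    println!("{index}");
}

Consuming-`self` setters keep the defaults in one place and let call sites express only the options they care about.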
Showing 39 changed files with 1,772 additions and 1,186 deletions.
8 changes: 1 addition & 7 deletions python/python/lance/util.py
@@ -11,7 +11,7 @@
from .dependencies import _check_for_numpy, _check_for_pandas
from .dependencies import numpy as np
from .dependencies import pandas as pd
from .lance import _build_sq_storage, _Hnsw, _KMeans
from .lance import _Hnsw, _KMeans

if TYPE_CHECKING:
ts_types = Union[datetime, pd.Timestamp, str]
@@ -245,9 +245,3 @@ def to_lance_file(self, file_path):

def vectors(self) -> pa.Array:
return self._hnsw.vectors()


def build_sq_storage(
row_ids_array: Iterator[pa.Array], vectors_array: pa.Array, dim, bounds: tuple
) -> pa.RecordBatch:
return _build_sq_storage(row_ids_array, vectors_array, dim, bounds)
2 changes: 0 additions & 2 deletions python/src/lib.rs
@@ -68,7 +68,6 @@ pub(crate) mod utils;
pub use crate::arrow::{bfloat16_array, BFloat16};
use crate::fragment::{cleanup_partial_writes, write_fragments};
pub use crate::tracing::{trace_to_chrome, TraceGuard};
use crate::utils::build_sq_storage;
use crate::utils::Hnsw;
use crate::utils::KMeans;
pub use dataset::write_dataset;
@@ -142,7 +141,6 @@ fn lance(py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(cleanup_partial_writes))?;
m.add_wrapped(wrap_pyfunction!(trace_to_chrome))?;
m.add_wrapped(wrap_pyfunction!(manifest_needs_migration))?;
m.add_wrapped(wrap_pyfunction!(build_sq_storage))?;
// Debug functions
m.add_wrapped(wrap_pyfunction!(debug::format_schema))?;
m.add_wrapped(wrap_pyfunction!(debug::format_manifest))?;
52 changes: 5 additions & 47 deletions python/src/utils.rs
@@ -14,23 +14,18 @@

use std::sync::Arc;

use arrow::compute::{concat, concat_batches};
use arrow::compute::concat;
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use arrow_array::{
cast::AsArray, Array, FixedSizeListArray, Float32Array, UInt32Array, UInt64Array,
};
use arrow_array::{cast::AsArray, Array, FixedSizeListArray, Float32Array, UInt32Array};
use arrow_data::ArrayData;
use arrow_schema::DataType;
use lance::Result;
use lance::{datatypes::Schema, index::vector::sq, io::ObjectStore};
use lance::{datatypes::Schema, io::ObjectStore};
use lance_arrow::FixedSizeListArrayExt;
use lance_file::writer::FileWriter;
use lance_index::scalar::IndexWriter;
use lance_index::vector::hnsw::{builder::HnswBuildParams, HNSW};
use lance_index::vector::v3::subindex::IvfSubIndex;
use lance_index::vector::{
hnsw::{builder::HnswBuildParams, HNSW},
storage::VectorStore,
};
use lance_linalg::kmeans::compute_partitions;
use lance_linalg::{
distance::DistanceType,
@@ -41,7 +36,7 @@ use object_store::path::Path;
use pyo3::{
exceptions::{PyIOError, PyRuntimeError, PyValueError},
prelude::*,
types::{PyIterator, PyTuple},
types::PyIterator,
};

use crate::RT;
@@ -220,40 +215,3 @@ impl Hnsw {
self.vectors.to_data().to_pyarrow(py)
}
}

#[pyfunction(name = "_build_sq_storage")]
pub fn build_sq_storage(
py: Python,
row_ids_array: &PyIterator,
vectors: &PyAny,
dim: usize,
bounds: &PyTuple,
) -> PyResult<PyObject> {
let mut row_ids_arr: Vec<Arc<dyn Array>> = Vec::new();
for row_ids in row_ids_array {
let row_ids = ArrayData::from_pyarrow(row_ids?)?;
if !matches!(row_ids.data_type(), DataType::UInt64) {
return Err(PyValueError::new_err("Must be a UInt64"));
}
row_ids_arr.push(Arc::new(UInt64Array::from(row_ids)));
}
let row_ids_refs = row_ids_arr.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
let row_ids = concat(&row_ids_refs).map_err(|e| PyIOError::new_err(e.to_string()))?;
std::mem::drop(row_ids_arr);

let vectors = Arc::new(FixedSizeListArray::from(ArrayData::from_pyarrow(vectors)?));

let lower_bound = bounds.get_item(0)?.extract::<f64>()?;
let upper_bound = bounds.get_item(1)?.extract::<f64>()?;
let quantizer =
lance_index::vector::sq::ScalarQuantizer::with_bounds(8, dim, lower_bound..upper_bound);
let storage = sq::build_sq_storage(DistanceType::L2, row_ids, vectors, quantizer)
.map_err(|e| PyIOError::new_err(e.to_string()))?;
let batches = storage
.to_batches()
.map_err(|e| PyIOError::new_err(e.to_string()))?
.collect::<Vec<_>>();
let batch = concat_batches(&batches[0].schema(), &batches)
.map_err(|e| PyIOError::new_err(e.to_string()))?;
batch.to_pyarrow(py)
}
24 changes: 24 additions & 0 deletions rust/lance-arrow/src/lib.rs
@@ -5,6 +5,7 @@
//!
//! To improve Arrow-RS ergonomic

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{
@@ -374,6 +375,19 @@ pub trait RecordBatchExt {
/// Project the schema over the [RecordBatch].
fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

/// Metadata of the schema.
fn metadata(&self) -> &HashMap<String, String>;

/// Add metadata to the schema.
fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
let mut metadata = self.metadata().clone();
metadata.insert(key, value);
self.with_metadata(metadata)
}

/// Replace the schema metadata with the provided one.
fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

/// Take selected rows from the [RecordBatch].
fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;
}
@@ -460,6 +474,16 @@ impl RecordBatchExt for RecordBatch {
self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
}

fn metadata(&self) -> &HashMap<String, String> {
self.schema_ref().metadata()
}

fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
let mut schema = self.schema_ref().as_ref().clone();
schema.metadata = metadata;
Self::try_new(schema.into(), self.columns().into())
}

fn take(&self, indices: &UInt32Array) -> Result<Self> {
let struct_array: StructArray = self.clone().into();
let taken = take(&struct_array, indices, None)?;
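
A small usage sketch of the new metadata helpers on RecordBatchExt, using only the trait methods shown in the hunks above. It assumes lance-arrow's Result aliases arrow's ArrowError, so `?` propagates directly.

use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use lance_arrow::RecordBatchExt;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

    // add_metadata clones the schema metadata, inserts the pair, and
    // rebuilds the batch; the column data is untouched.
    let tagged = batch.add_metadata("source".to_string(), "ivf_build".to_string())?;
    assert_eq!(
        tagged.metadata().get("source").map(String::as_str),
        Some("ivf_build")
    );

    // with_metadata replaces the whole map at once.
    let cleared = tagged.with_metadata(Default::default())?;
    assert!(cleared.metadata().is_empty());
    Ok(())
}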
2 changes: 1 addition & 1 deletion rust/lance-encoding/src/decoder.rs
@@ -583,7 +583,7 @@ impl CoreFieldDecoderStrategy {
/// Helper method to verify the page encoding of a struct header column
fn check_simple_struct(column_info: &ColumnInfo, path: &VecDeque<u32>) -> Result<()> {
Self::ensure_values_encoded(column_info, path)?;
if !column_info.page_infos.len() == 1 {
if column_info.page_infos.len() != 1 {
return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
}
let encoding = &column_info.page_infos[0].encoding;
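
For context on the decoder.rs fix above: in Rust, unary `!` binds tighter than `==`, and on integers it is bitwise NOT, so the old condition compared `!len` against 1 and was effectively never true. A tiny standalone demonstration:

fn main() {
    let len: usize = 2;
    // Old condition: parsed as (!len) == 1, i.e. the bitwise NOT of 2
    // (0xFFFF_FFFF_FFFF_FFFD on 64-bit) compared to 1 -- always false
    // for realistic page counts, so the error was never raised.
    assert_eq!(!len == 1, false);
    // Fixed condition: fires whenever the page count is not exactly 1.
    assert_eq!(len != 1, true);
}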
8 changes: 4 additions & 4 deletions rust/lance-index/benches/find_partitions.rs
@@ -11,7 +11,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
#[cfg(target_os = "linux")]
use pprof::criterion::{Output, PProfProfiler};

use lance_index::vector::ivf::Ivf;
use lance_index::vector::ivf::IvfTransformer;
use lance_linalg::distance::DistanceType;
use lance_testing::datagen::generate_random_array_with_seed;

@@ -27,14 +27,14 @@ fn bench_partitions(c: &mut Criterion) {
let fsl = FixedSizeListArray::try_new_from_values(centroids, DIMENSION as i32).unwrap();

for k in &[1, 10, 50] {
let ivf = Ivf::new(fsl.clone(), DistanceType::L2, vec![]);
let ivf = IvfTransformer::new(fsl.clone(), DistanceType::L2, vec![]);
c.bench_function(format!("IVF{},k={},L2", num_centroids, k).as_str(), |b| {
b.iter(|| {
let _ = ivf.find_partitions(&query, *k);
})
});

let ivf = Ivf::new(fsl.clone(), DistanceType::Cosine, vec![]);
let ivf = IvfTransformer::new(fsl.clone(), DistanceType::Cosine, vec![]);
c.bench_function(
format!("IVF{},k={},Cosine", num_centroids, k).as_str(),
|b| {
@@ -45,7 +45,7 @@
);
}

let ivf = Ivf::new(fsl.clone(), DistanceType::L2, vec![]);
let ivf = IvfTransformer::new(fsl.clone(), DistanceType::L2, vec![]);
let batch = generate_random_array_with_seed::<Float32Type>(DIMENSION * 4096, SEED);
let fsl = FixedSizeListArray::try_new_from_values(batch, DIMENSION as i32).unwrap();
c.bench_function(
3 changes: 3 additions & 0 deletions rust/lance-index/src/lib.rs
@@ -52,6 +52,9 @@ pub trait Index: Send + Sync + DeepSizeOf {
/// Cast to [Index]
fn as_index(self: Arc<Self>) -> Arc<dyn Index>;

/// Cast to [vector::VectorIndex]
fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn vector::VectorIndex>>;

/// Retrieve index statistics as a JSON Value
fn statistics(&self) -> Result<serde_json::Value>;

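
A hedged caller-side sketch of how the new as_vector_index hook might be used to route a type-erased index handle; the Index and VectorIndex paths come from this diff, while the surrounding function is purely illustrative.

use std::sync::Arc;

use lance_index::{vector::VectorIndex, Index};

// Illustrative only: hand a generic index to the ANN path when possible.
fn as_ann(index: Arc<dyn Index>) -> Option<Arc<dyn VectorIndex>> {
    match index.as_vector_index() {
        Ok(vector_index) => Some(vector_index),
        // Scalar indices (BTreeIndex, FlatIndex, ...) return Error::NotSupported,
        // as the impls below show.
        Err(_) => None,
    }
}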
7 changes: 7 additions & 0 deletions rust/lance-index/src/scalar/btree.rs
@@ -789,6 +789,13 @@ impl Index for BTreeIndex {
self
}

fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
Err(Error::NotSupported {
source: "BTreeIndex is not vector index".into(),
location: location!(),
})
}

fn index_type(&self) -> IndexType {
IndexType::Scalar
}
10 changes: 9 additions & 1 deletion rust/lance-index/src/scalar/flat.rs
@@ -14,8 +14,9 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_physical_expr::expressions::{in_list, lit, Column};
use deepsize::DeepSizeOf;
use lance_core::utils::address::RowAddress;
use lance_core::Result;
use lance_core::{Error, Result};
use roaring::RoaringBitmap;
use snafu::{location, Location};

use crate::{Index, IndexType};

@@ -157,6 +158,13 @@ impl Index for FlatIndex {
self
}

fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
Err(Error::NotSupported {
source: "FlatIndex is not vector index".into(),
location: location!(),
})
}

fn index_type(&self) -> IndexType {
IndexType::Scalar
}
24 changes: 23 additions & 1 deletion rust/lance-index/src/vector.rs
@@ -6,13 +6,16 @@

use std::{collections::HashMap, sync::Arc};

use arrow_array::{ArrayRef, RecordBatch};
use arrow_array::{ArrayRef, RecordBatch, UInt32Array};
use arrow_schema::Field;
use async_trait::async_trait;
use ivf::storage::IvfModel;
use lance_core::{Result, ROW_ID_FIELD};
use lance_io::traits::Reader;
use lance_linalg::distance::DistanceType;
use lazy_static::lazy_static;
use quantizer::{QuantizationType, Quantizer};
use v3::subindex::SubIndexType;

pub mod bq;
pub mod flat;
@@ -102,6 +105,7 @@ impl From<DistanceType> for pb::VectorMetricType {
}

/// Vector Index for (Approximate) Nearest Neighbor (ANN) Search.
/// It's always an IVF index; any index type without partitioning is treated as IVF with a single partition.
#[async_trait]
#[allow(clippy::redundant_pub_crate)]
pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
@@ -125,6 +129,15 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
/// - Only supports `f32` now. Will add f64/f16 later.
async fn search(&self, query: &Query, pre_filter: Arc<dyn PreFilter>) -> Result<RecordBatch>;

fn find_partitions(&self, query: &Query) -> Result<UInt32Array>;

async fn search_in_partition(
&self,
partition_id: usize,
query: &Query,
pre_filter: Arc<dyn PreFilter>,
) -> Result<RecordBatch>;

/// Whether the index is loadable by IVF, i.e. whether it can be a sub-index
/// that is loaded on demand by IVF.
fn is_loadable(&self) -> bool;
@@ -136,6 +149,9 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
/// explaining why not
fn check_can_remap(&self) -> Result<()>;

// async fn append(&self, batches: Vec<RecordBatch>) -> Result<()>;
// async fn merge(&self, indices: Vec<Arc<dyn VectorIndex>>) -> Result<()>;

/// Load the index from the reader on-demand.
async fn load(
&self,
@@ -170,4 +186,10 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {

/// The metric type of this vector index.
fn metric_type(&self) -> DistanceType;

fn ivf_model(&self) -> IvfModel;
fn quantizer(&self) -> Quantizer;

/// The sub-index type and quantization type of this vector index.
fn sub_index_type(&self) -> (SubIndexType, QuantizationType);
}
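
The new find_partitions / search_in_partition pair splits IVF search into a partition-selection phase and a per-partition search phase. A rough sketch of that flow follows; the module paths for Query and PreFilter are best-effort assumptions, and error handling plus the final merge/re-rank of per-partition batches are elided.

use std::sync::Arc;

use arrow_array::RecordBatch;
use lance_core::Result;
use lance_index::prefilter::PreFilter;
use lance_index::vector::{Query, VectorIndex};

// Sketch only: two-phase ANN search over the new trait surface.
async fn ivf_search(
    index: &dyn VectorIndex,
    query: &Query,
    pre_filter: Arc<dyn PreFilter>,
) -> Result<Vec<RecordBatch>> {
    // Phase 1: rank partitions by centroid distance to the query vector.
    let partitions = index.find_partitions(query)?;

    // Phase 2: search each selected partition independently; these calls
    // could also be issued concurrently.
    let mut results = Vec::with_capacity(partitions.len());
    for partition_id in partitions.values().iter().map(|&p| p as usize) {
        let batch = index
            .search_in_partition(partition_id, query, pre_filter.clone())
            .await?;
        results.push(batch);
    }
    // Callers would concatenate these batches and keep the global top-k.
    Ok(results)
}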