Skip to content

Commit

Permalink
feat: add CreateIndex commit type to python API (#2883)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiachengdb committed Sep 16, 2024
1 parent f3eef61 commit 4ce680b
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 0 deletions.
22 changes: 22 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Literal,
NamedTuple,
Optional,
Set,
TypedDict,
Union,
)
Expand Down Expand Up @@ -2233,6 +2234,27 @@ def _to_inner(self):
rewritten_indices = [index._to_inner() for index in self.rewritten_indices]
return _Operation.rewrite(groups, rewritten_indices)

@dataclass
class CreateIndex(BaseOperation):
    """
    Operation that creates an index on the dataset.

    Attributes
    ----------
    uuid: str
        The UUID of the index, as a string. The native layer parses it
        as a UUID and rejects malformed values.
    name: str
        The name of the index.
    fields: List[int]
        The ids of the fields (columns) covered by the index.
    dataset_version: int
        The dataset version recorded on the index.
    fragment_ids: Set[int]
        The ids of the fragments covered by the index.
    """

    uuid: str
    name: str
    fields: List[int]
    dataset_version: int
    fragment_ids: Set[int]

    def _to_inner(self):
        return _Operation.create_index(
            self.uuid,
            self.name,
            self.fields,
            self.dataset_version,
            # The native binding is declared to take a set; normalise so
            # that a list, tuple or frozenset of ids is accepted as well.
            set(self.fragment_ids),
        )


class ScannerBuilder:
def __init__(self, ds: LanceDataset):
Expand Down
86 changes: 86 additions & 0 deletions python/python/tests/test_commit_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import random
import shutil
import string
from pathlib import Path

import lance
import numpy as np
import pyarrow as pa
import pytest


@pytest.fixture()
def test_table():
    """Build a 1000-row table with columns ``price``, ``meta``, ``doc``, ``id``.

    ``price`` is random floats in [0, 100), ``meta`` is random 100-character
    alphanumeric strings, ``doc`` is random 10-letter strings stored as
    ``large_string``, and ``id`` is the row number. The random generators are
    not seeded, so the contents differ between runs.
    """
    num_rows = 1000
    price = np.random.rand(num_rows) * 100

    def gen_str(n, char_set=string.ascii_letters + string.digits):
        # Random string of length ``n`` drawn (with replacement) from ``char_set``.
        return "".join(random.choices(char_set, k=n))

    meta = np.array([gen_str(100) for _ in range(num_rows)])
    doc = [gen_str(10, string.ascii_letters) for _ in range(num_rows)]
    tbl = pa.Table.from_arrays(
        [
            pa.array(price),
            pa.array(meta),
            pa.array(doc, pa.large_string()),
            pa.array(range(num_rows)),
        ],
        names=["price", "meta", "doc", "id"],
    )
    return tbl


@pytest.fixture()
def dataset_with_index(test_table, tmp_path):
    """Write ``test_table`` to disk and build a BTREE scalar index on ``meta``.

    The dataset lives in its own subdirectory of ``tmp_path`` so that other
    datasets created under ``tmp_path`` by a test do not end up inside this
    dataset's root directory.
    """
    dataset = lance.write_dataset(test_table, tmp_path / "dataset_with_index")
    dataset.create_scalar_index("meta", index_type="BTREE")
    return dataset


def test_commit_index(dataset_with_index, test_table, tmp_path):
    """Commit a copied index to a second dataset via ``CreateIndex``.

    The index files built for ``dataset_with_index`` are copied into a fresh
    dataset holding the same table, then registered there with a
    ``LanceOperation.CreateIndex`` commit. Both datasets must end up listing
    the same single index, and both must use it when scanning.
    """
    index_uuid = dataset_with_index.list_indices()[0]["uuid"]

    # A second dataset with the same rows but no index of its own.
    target = lance.write_dataset(test_table, tmp_path / "dataset_without_index")

    # Transplant the index files from the indexed dataset into the new one.
    shutil.copytree(
        Path(dataset_with_index.uri) / "_indices" / index_uuid,
        Path(target.uri) / "_indices" / index_uuid,
    )

    # Register the copied files as an index on the new dataset.
    operation = lance.LanceOperation.CreateIndex(
        uuid=index_uuid,
        name="meta_idx",
        fields=[target.schema.get_field_index("meta")],
        dataset_version=target.version,
        fragment_ids={fragment.fragment_id for fragment in target.get_fragments()},
    )
    target = lance.LanceDataset.commit(
        target.uri,
        operation,
        read_version=target.version,
    )

    # Both datasets now report exactly one, identical, index.
    original_indices = dataset_with_index.list_indices()
    committed_indices = target.list_indices()
    assert len(original_indices) == 1
    assert len(committed_indices) == 1
    assert committed_indices[0] == original_indices[0]

    # The query plan of a filtered scan must go through the index.
    for ds in (dataset_with_index, target):
        plan = ds.scanner(
            fast_search=True, prefilter=True, filter="meta = 'hello'"
        ).explain_plan()
        assert "MaterializeIndex" in plan
27 changes: 27 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use lance_index::{
use lance_io::object_store::ObjectStoreParams;
use lance_linalg::distance::MetricType;
use lance_table::format::Fragment;
use lance_table::format::Index;
use lance_table::io::commit::CommitHandler;
use object_store::path::Path;
use pyo3::exceptions::{PyStopIteration, PyTypeError};
Expand Down Expand Up @@ -320,6 +321,32 @@ impl Operation {
};
Ok(Self(op))
}

#[staticmethod]
/// Build a `CreateIndex` operation that adds a single index and removes none.
///
/// * `uuid` - UUID of the index, in string form.
/// * `name` - name of the index.
/// * `fields` - ids of the fields covered by the index.
/// * `dataset_version` - dataset version recorded on the index.
/// * `fragment_ids` - Python set of fragment ids covered by the index; each
///   element must be convertible to `u32`.
///
/// # Errors
///
/// Propagates the extraction error if an element of `fragment_ids` cannot be
/// converted to `u32`, and returns a `ValueError` if `uuid` is not a valid
/// UUID string.
fn create_index(
    uuid: String,
    name: String,
    fields: Vec<i32>,
    dataset_version: u64,
    fragment_ids: &PySet,
) -> PyResult<Self> {
    // Collect straight into the bitmap instead of going through a temporary
    // `Vec<u32>`; the target type is inferred from `Index::fragment_bitmap`.
    let fragment_bitmap = fragment_ids
        .iter()
        .map(|item| item.extract::<u32>())
        .collect::<PyResult<_>>()?;
    let uuid = Uuid::parse_str(&uuid).map_err(|e| PyValueError::new_err(e.to_string()))?;
    let op = LanceOperation::CreateIndex {
        new_indices: vec![Index {
            uuid,
            name,
            fields,
            dataset_version,
            fragment_bitmap: Some(fragment_bitmap),
        }],
        removed_indices: vec![],
    };
    Ok(Self(op))
}
}

/// Lance Dataset that will be wrapped by another class in Python
Expand Down

0 comments on commit 4ce680b

Please sign in to comment.