Skip to content

Commit

Permalink
feat: extend python bindings for the v2 reader/writer (lancedb#2800)
Browse files Browse the repository at this point in the history
PR extends Python Bindings for V2 Writer/Reader and adds bindings for
adding global buffers and adding schema metadata both of which are
public functions.

---------

Co-authored-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
2 people authored and gagan-bhullar-tech committed Sep 13, 2024
1 parent a3b0c86 commit dc34e14
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 2 deletions.
1 change: 1 addition & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tracing-chrome = "0.7.1"
tracing-subscriber = "0.3.17"
tracing = "0.1.37"
url = "2.5.0"
bytes = "1.4"

[features]
datagen = ["lance-datagen"]
Expand Down
54 changes: 52 additions & 2 deletions python/python/lance/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,10 @@ def take_rows(
"""
for i in range(len(indices) - 1):
if indices[i] > indices[i + 1]:
raise ValueError(f"Indices must be sorted in ascending order for \
file API, got {indices[i]} > {indices[i+1]}")
raise ValueError(
f"Indices must be sorted in ascending order for \
file API, got {indices[i]} > {indices[i+1]}"
)

return ReaderResults(
self._reader.take_rows(indices, batch_size, batch_readahead)
Expand All @@ -138,6 +140,22 @@ def metadata(self) -> LanceFileMetadata:
"""
return self._reader.metadata()

def read_global_buffer(self, index: int) -> bytes:
"""
Read a global buffer from the file at a given index
Parameters
----------
index: int
The index of the global buffer to read
Returns
-------
bytes
The contents of the global buffer
"""
return self._reader.read_global_buffer(index)


class LanceFileWriter:
"""
Expand Down Expand Up @@ -208,6 +226,38 @@ def close(self) -> int:
self.closed = True
return self._writer.finish()

def add_schema_metadata(self, key: str, value: str) -> None:
"""
Add a metadata (key/value pair) entry to the schema. This method allows you to
alter the schema metadata. It must be called before `close` is called.
Parameters
----------
key: str
The key to add.
value: str
The value to add.
"""
self._writer.add_schema_metadata(key, value)

def add_global_buffer(self, data: bytes) -> int:
"""
Add a global buffer to the file. The global buffer can contain any
arbitrary bytes.
Parameters
----------
data: bytes
The data to write to the file.
Returns
-------
int
The index of the global buffer. This will always start at 1
and increment by 1 each time this method is called.
"""
return self._writer.add_global_buffer(data)

def __enter__(self) -> "LanceFileWriter":
return self

Expand Down
3 changes: 3 additions & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class LanceFileWriter:
): ...
def write_batch(self, batch: pa.RecordBatch) -> None: ...
def finish(self) -> int: ...
def add_schema_metadata(self, key: str, value: str) -> None: ...
def add_global_buffer(self, data: bytes) -> int: ...

class LanceFileReader:
def __init__(self, path: str): ...
Expand All @@ -56,6 +58,7 @@ class LanceFileReader:
def take_rows(
self, indices: List[int], batch_size: int, batch_readahead: int
) -> pa.RecordBatchReader: ...
def read_global_buffer(self, index: int) -> bytes: ...

class LanceBufferDescriptor:
position: int
Expand Down
35 changes: 35 additions & 0 deletions python/python/tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,38 @@ def round_trip(arr):
round_tripped = round_trip(dict_arr)
assert round_tripped == dict_arr
assert round_tripped.type == dict_arr.type


def test_write_read_global_buffer(tmp_path):
table = pa.table({"a": [1, 2, 3]})
path = tmp_path / "foo.lance"
global_buffer_text = "hello"
global_buffer_bytes = bytes(global_buffer_text, "utf-8")
with LanceFileWriter(str(path)) as writer:
writer.write_batch(table)
global_buffer_pos = writer.add_global_buffer(global_buffer_bytes)
reader = LanceFileReader(str(path))
assert reader.read_all().to_table() == table
assert reader.metadata().global_buffers[global_buffer_pos].size == len(
global_buffer_bytes
)
assert (
bytes(reader.read_global_buffer(global_buffer_pos)).decode()
== global_buffer_text
)


def test_write_read_additional_schema_metadata(tmp_path):
table = pa.table({"a": [1, 2, 3]})
path = tmp_path / "foo.lance"
schema_metadata_key = "foo"
schema_metadata_value = "bar"
with LanceFileWriter(str(path)) as writer:
writer.write_batch(table)
writer.add_schema_metadata(schema_metadata_key, schema_metadata_value)
reader = LanceFileReader(str(path))
assert reader.read_all().to_table() == table
assert (
reader.metadata().schema.metadata.get(schema_metadata_key.encode()).decode()
== schema_metadata_value
)
19 changes: 19 additions & 0 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{pin::Pin, sync::Arc};
use arrow::pyarrow::PyArrowType;
use arrow_array::{RecordBatch, RecordBatchReader, UInt32Array};
use arrow_schema::Schema as ArrowSchema;
use bytes::Bytes;
use futures::stream::StreamExt;
use lance::io::{ObjectStore, RecordBatchStream};
use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
Expand Down Expand Up @@ -234,6 +235,16 @@ impl LanceFileWriter {
pub fn finish(&mut self) -> PyResult<u64> {
RT.runtime.block_on(self.inner.finish()).infer_error()
}

pub fn add_global_buffer(&mut self, bytes: Vec<u8>) -> PyResult<u32> {
RT.runtime
.block_on(self.inner.add_global_buffer(Bytes::from(bytes)))
.infer_error()
}

pub fn add_schema_metadata(&mut self, key: String, value: String) {
self.inner.add_schema_metadata(key, value)
}
}

fn path_to_parent(path: &Path) -> PyResult<(Path, String)> {
Expand Down Expand Up @@ -395,4 +406,12 @@ impl LanceFileReader {
let inner_meta = self.inner.metadata();
LanceFileMetadata::new(inner_meta, py)
}

pub fn read_global_buffer(&mut self, index: u32) -> PyResult<Vec<u8>> {
let buffer_bytes = RT
.runtime
.block_on(self.inner.read_global_buffer(index))
.infer_error()?;
Ok(buffer_bytes.to_vec())
}
}

0 comments on commit dc34e14

Please sign in to comment.