From 2bf56a077bfb8aaa6b8fdef85fc9007ba1ca007f Mon Sep 17 00:00:00 2001 From: coufon Date: Fri, 19 Jan 2024 06:15:49 +0000 Subject: [PATCH] Improve MV refresh: let it has exactly 1:1 mapping of snapshot with the source --- python/src/space/core/ops/change_data.py | 28 +++++-- python/src/space/core/proto/metadata.proto | 1 - python/src/space/core/runners.py | 11 +-- python/src/space/ray/runners.py | 68 ++++++++++++----- python/tests/core/ops/test_change_data.py | 39 +++++----- python/tests/core/test_runners.py | 11 ++- python/tests/ray/test_runners.py | 86 +++++++++++++++------- 7 files changed, 163 insertions(+), 81 deletions(-) diff --git a/python/src/space/core/ops/change_data.py b/python/src/space/core/ops/change_data.py index 0ee9708..1482cad 100644 --- a/python/src/space/core/ops/change_data.py +++ b/python/src/space/core/ops/change_data.py @@ -14,8 +14,9 @@ # """Change data feed that computes delta between two snapshots.""" +from dataclasses import dataclass from enum import Enum -from typing import Iterator, Tuple, List +from typing import Iterator, List import pyarrow as pa from pyroaring import BitMap # type: ignore[import-not-found] @@ -40,9 +41,19 @@ class ChangeType(Enum): # DELETE, on the same primary key in one snapshot change. -def read_change_data( - storage: Storage, start_snapshot_id: int, - end_snapshot_id: int) -> Iterator[Tuple[ChangeType, pa.Table]]: +@dataclass +class ChangeData: + """Information and data of a change.""" + # Snapshot ID that the change was committed to. + snapshot_id: int + # The change type. + type_: ChangeType + # The change data. + data: pa.Table + + +def read_change_data(storage: Storage, start_snapshot_id: int, + end_snapshot_id: int) -> Iterator[ChangeData]: """Read change data from a start to an end snapshot. start_snapshot_id is excluded; end_snapshot_id is included. @@ -82,6 +93,7 @@ def __init__(self, storage: Storage, snapshot_id: int): self._storage = storage self._metadata = self._storage.metadata + self._snapshot_id = snapshot_id if snapshot_id not in self._metadata.snapshots: raise errors.VersionNotFoundError( @@ -93,8 +105,8 @@ def __init__(self, storage: Storage, snapshot_id: int): change_log_file = self._storage.full_path(snapshot.change_log_file) self._change_log = _read_change_log_proto(fs, change_log_file) - def __iter__(self) -> Iterator[Tuple[ChangeType, pa.Table]]: - # TODO: must return deletion first, otherwise when the upstream re-apply + def __iter__(self) -> Iterator[ChangeData]: + # Must return deletion first, otherwise when the upstream re-apply # deletions and additions, it may delete newly added data. # TODO: to enforce this check upstream, or merge deletion+addition as a # update. @@ -105,7 +117,7 @@ def __iter__(self) -> Iterator[Tuple[ChangeType, pa.Table]]: yield self._read_bitmap_rows(ChangeType.ADD, bitmap) def _read_bitmap_rows(self, change_type: ChangeType, - bitmap: meta.RowBitmap) -> Tuple[ChangeType, pa.Table]: + bitmap: meta.RowBitmap) -> ChangeData: file_set = rt.FileSet(index_files=[rt.DataFile(path=bitmap.file)]) read_op = FileSetReadOp(self._storage.location, self._metadata, file_set) @@ -115,7 +127,7 @@ def _read_bitmap_rows(self, change_type: ChangeType, data = data.filter( mask=_bitmap_mask(bitmap.roaring_bitmap, data.num_rows)) - return (change_type, data) + return ChangeData(self._snapshot_id, change_type, data) def _read_change_log_proto(fs: BaseFileSystem, diff --git a/python/src/space/core/proto/metadata.proto b/python/src/space/core/proto/metadata.proto index 642a610..beb4e1d 100644 --- a/python/src/space/core/proto/metadata.proto +++ b/python/src/space/core/proto/metadata.proto @@ -66,7 +66,6 @@ message StorageMetadata { // All alive refs, with reference name as key. Reference name can be a tag // or a branch name. map refs = 8; - } // The storage logical schema where user provided types are persisted instead diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py index 3779289..86cf734 100644 --- a/python/src/space/core/runners.py +++ b/python/src/space/core/runners.py @@ -17,9 +17,8 @@ from __future__ import annotations from abc import ABC, abstractmethod from functools import wraps -from typing import Callable, Iterator, List, Optional, Tuple, Union +from typing import Callable, Iterator, List, Optional, Union -from absl import logging # type: ignore[import-untyped] import pyarrow as pa import pyarrow.compute as pc @@ -30,7 +29,7 @@ from space.core.ops.append import LocalAppendOp from space.core.ops.append import FileOptions from space.core.ops.base import InputData, InputIteratorFn -from space.core.ops.change_data import ChangeType, read_change_data +from space.core.ops.change_data import ChangeData, read_change_data from space.core.ops.delete import FileSetDeleteOp from space.core.ops.insert import InsertOptions, LocalInsertOp from space.core.ops.read import FileSetReadOp @@ -78,7 +77,7 @@ def read_all( @abstractmethod def diff(self, start_version: Union[int], - end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]: + end_version: Union[int]) -> Iterator[ChangeData]: """Read the change data between two versions. start_version is excluded; end_version is included. @@ -102,11 +101,9 @@ def decorated(self, *args, **kwargs): with self._storage.transaction() as txn: # pylint: disable=protected-access txn.commit(fn(self, *args, **kwargs)) r = txn.result() - logging.info(f"Job result:\n{r}") return r except (errors.SpaceRuntimeError, errors.UserInputError) as e: r = JobResult(JobResult.State.FAILED, None, repr(e)) - logging.warning(f"Job result:\n{r}") return r return decorated @@ -215,7 +212,7 @@ def read( @StorageMixin.reload def diff(self, start_version: Version, - end_version: Version) -> Iterator[Tuple[ChangeType, pa.Table]]: + end_version: Version) -> Iterator[ChangeData]: return read_change_data(self._storage, self._storage.version_to_snapshot_id(start_version), self._storage.version_to_snapshot_id(end_version)) diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index a4d57de..0a89d19 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -17,7 +17,7 @@ from __future__ import annotations from dataclasses import dataclass from typing import TYPE_CHECKING -from typing import Iterator, List, Optional, Tuple, Union +from typing import Iterator, List, Optional, Union import pyarrow as pa import pyarrow.compute as pc @@ -30,7 +30,7 @@ from space.core.ops.utils import FileOptions from space.core.ops.append import LocalAppendOp from space.core.ops.base import InputData, InputIteratorFn -from space.core.ops.change_data import ChangeType, read_change_data +from space.core.ops.change_data import ChangeData, ChangeType, read_change_data from space.core.ops.delete import FileSetDeleteOp from space.core.ops.insert import InsertOptions from space.core.options import JoinOptions, ReadOptions @@ -44,7 +44,7 @@ if TYPE_CHECKING: from space.core.datasets import Dataset - from space.core.storage import Storage, Version + from space.core.storage import Storage, Transaction, Version from space.core.views import MaterializedView, View @@ -88,21 +88,21 @@ def read( join_options).to_arrow_refs(): yield ray.get(ref) - def diff( - self, start_version: Union[Version], - end_version: Union[Version]) -> Iterator[Tuple[ChangeType, pa.Table]]: + def diff(self, start_version: Union[Version], + end_version: Union[Version]) -> Iterator[ChangeData]: self._source_storage.reload() source_changes = read_change_data( self._source_storage, self._source_storage.version_to_snapshot_id(start_version), self._source_storage.version_to_snapshot_id(end_version)) - for change_type, data in source_changes: + for change in source_changes: # TODO: skip processing the data for deletions; the caller is usually # only interested at deleted primary keys. - processed_remote_data = self._view.process_source(data) + processed_remote_data = self._view.process_source(change.data) processed_data = ray.get(processed_remote_data.to_arrow_refs()) - yield change_type, pa.concat_tables(processed_data) + yield ChangeData(change.snapshot_id, change.type_, + pa.concat_tables(processed_data)) @property def _source_storage(self) -> Storage: @@ -164,19 +164,44 @@ def refresh(self, start_snapshot_id = self._storage.metadata.current_snapshot_id job_results: List[JobResult] = [] - for change_type, data in self.diff(start_snapshot_id, end_snapshot_id): - # In the scope of changes from the same snapshot, must process DELETE - # before ADD. - if change_type == ChangeType.DELETE: - job_results.append(self._process_delete(data)) - elif change_type == ChangeType.ADD: - job_results.append(self._process_append(data)) - else: - raise NotImplementedError(f"Change type {change_type} not supported") + patches: List[Optional[rt.Patch]] = [] + previous_snapshot_id: Optional[int] = None + + txn = self._start_txn() + for change in self.diff(start_snapshot_id, end_snapshot_id): + # Commit when changes from the same snapshot end. + if (previous_snapshot_id is not None and + change.snapshot_id != previous_snapshot_id): + txn.commit(utils.merge_patches(patches)) + patches.clear() + + # Stop early when something is wrong. + r = txn.result() + if r.state == JobResult.State.FAILED: + return job_results + [r] + + job_results.append(r) + txn = self._start_txn() + + try: + if change.type_ == ChangeType.DELETE: + patches.append(self._process_delete(change.data)) + elif change.type_ == ChangeType.ADD: + patches.append(self._process_append(change.data)) + else: + raise NotImplementedError(f"Change type {change.type_} not supported") + except (errors.SpaceRuntimeError, errors.UserInputError) as e: + r = JobResult(JobResult.State.FAILED, None, repr(e)) + return job_results + [r] + + previous_snapshot_id = change.snapshot_id + + if patches: + txn.commit(utils.merge_patches(patches)) + job_results.append(txn.result()) return job_results - @StorageMixin.transactional def _process_delete(self, data: pa.Table) -> Optional[rt.Patch]: filter_ = utils.primary_key_filter(self._storage.primary_keys, data) if filter_ is None: @@ -187,13 +212,16 @@ def _process_delete(self, data: pa.Table) -> Optional[rt.Patch]: self._file_options) return op.delete() - @StorageMixin.transactional def _process_append(self, data: pa.Table) -> Optional[rt.Patch]: op = LocalAppendOp(self._storage.location, self._storage.metadata, self._file_options) op.write(data) return op.finish() + def _start_txn(self) -> Transaction: + with self._storage.transaction() as txn: + return txn + class RayReadWriterRunner(RayReadOnlyRunner, BaseReadWriteRunner): """Ray read write runner.""" diff --git a/python/tests/core/ops/test_change_data.py b/python/tests/core/ops/test_change_data.py index b0dbf7e..73ec7f3 100644 --- a/python/tests/core/ops/test_change_data.py +++ b/python/tests/core/ops/test_change_data.py @@ -16,7 +16,7 @@ import pyarrow.compute as pc from space import Dataset -from space.core.ops.change_data import ChangeType +from space.core.ops.change_data import ChangeData, ChangeType def test_read_change_data(tmp_path, all_types_schema, all_types_input_data): @@ -32,20 +32,22 @@ def test_read_change_data(tmp_path, all_types_schema, all_types_input_data): changes = list(runner.diff(0, 1)) assert len(changes) == 1 - expected_change0 = (ChangeType.ADD, runner.read_all()) + expected_change0 = ChangeData(ds.storage.metadata.current_snapshot_id, + ChangeType.ADD, runner.read_all()) assert changes[0] == expected_change0 # Validate DELETE changes. runner.delete((pc.field("string") == "a") | (pc.field("string") == "A")) changes = list(runner.diff(1, 2)) assert len(changes) == 1 - expected_change1 = (ChangeType.DELETE, - pa.Table.from_pydict({ - "int64": [1, 0], - "float64": [0.1, -0.1], - "bool": [True, False], - "string": ["a", "A"] - })) + expected_change1 = ChangeData( + ds.storage.metadata.current_snapshot_id, ChangeType.DELETE, + pa.Table.from_pydict({ + "int64": [1, 0], + "float64": [0.1, -0.1], + "bool": [True, False], + "string": ["a", "A"] + })) assert changes[0] == expected_change1 # Validate Upsert operation's changes. @@ -58,14 +60,17 @@ def test_read_change_data(tmp_path, all_types_schema, all_types_input_data): runner.upsert(upsert_data) changes = list(runner.diff(2, 3)) assert len(changes) == 2 - expected_change2 = (ChangeType.DELETE, - pa.Table.from_pydict({ - "int64": [2, 3], - "float64": [0.2, 0.3], - "bool": [False, False], - "string": ["b", "c"] - })) - expected_change3 = (ChangeType.ADD, pa.Table.from_pydict(upsert_data)) + expected_change2 = ChangeData( + ds.storage.metadata.current_snapshot_id, ChangeType.DELETE, + pa.Table.from_pydict({ + "int64": [2, 3], + "float64": [0.2, 0.3], + "bool": [False, False], + "string": ["b", "c"] + })) + expected_change3 = ChangeData(ds.storage.metadata.current_snapshot_id, + ChangeType.ADD, + pa.Table.from_pydict(upsert_data)) assert changes == [expected_change2, expected_change3] # Validate diff with several snapshot in-between diff --git a/python/tests/core/test_runners.py b/python/tests/core/test_runners.py index 25bb435..648c468 100644 --- a/python/tests/core/test_runners.py +++ b/python/tests/core/test_runners.py @@ -26,7 +26,7 @@ from space import Dataset, LocalRunner, TfFeatures from space.core.jobs import JobResult -from space.core.ops.change_data import ChangeType +from space.core.ops.change_data import ChangeData, ChangeType from space.core.schema.types import File from space.core.utils import errors @@ -122,12 +122,17 @@ def test_read_and_write_should_reload_storage(self, sample_dataset): sample_data1 = _generate_data([1, 2]) local_runner1.append(sample_data1) + snapshot_id1 = ds1.storage.metadata.current_snapshot_id assert local_runner2.read_all() == sample_data1 sample_data2 = _generate_data([3, 4]) local_runner1.append(sample_data2) - assert list(local_runner2.diff(0, 2)) == [(ChangeType.ADD, sample_data1), - (ChangeType.ADD, sample_data2)] + snapshot_id2 = ds1.storage.metadata.current_snapshot_id + + assert list(local_runner2.diff(0, 2)) == [ + ChangeData(snapshot_id1, ChangeType.ADD, sample_data1), + ChangeData(snapshot_id2, ChangeType.ADD, sample_data2) + ] sample_data3 = _generate_data([5]) sample_data4 = _generate_data([6]) diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py index 63c15e7..614da7e 100644 --- a/python/tests/ray/test_runners.py +++ b/python/tests/ray/test_runners.py @@ -24,7 +24,7 @@ from space import Dataset, JoinOptions, Range from space.core.jobs import JobResult -from space.core.ops.change_data import ChangeType +from space.core.ops.change_data import ChangeData, ChangeType from space.core.ops.read import read_record_column from space.core.utils import errors from space.core.utils.uuids import random_id @@ -145,6 +145,7 @@ def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: input_fields=["int64", "float64"], output_schema=view_schema) mv = view.materialize(str(tmp_path / "mv")) + mv1 = view.materialize(str(tmp_path / "mv1")) ds_runner = ds.local() view_runner = view.ray() @@ -156,20 +157,22 @@ def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: "binary": [b"b1", b"b2", b"b3"] }) - expected_change0 = (ChangeType.ADD, - pa.Table.from_pydict({ - "int64": [1, 2, 3], - "float64": [1.1, 1.2, 1.3], - })) + expected_change0 = ChangeData( + ds.storage.metadata.current_snapshot_id, ChangeType.ADD, + pa.Table.from_pydict({ + "int64": [1, 2, 3], + "float64": [1.1, 1.2, 1.3], + })) assert list(view_runner.diff(0, 1)) == [expected_change0] # Test deletion. ds_runner.delete(pc.field("int64") == 2) - expected_change1 = (ChangeType.DELETE, - pa.Table.from_pydict({ - "int64": [2], - "float64": [1.2] - })) + expected_change1 = ChangeData( + ds.storage.metadata.current_snapshot_id, ChangeType.DELETE, + pa.Table.from_pydict({ + "int64": [2], + "float64": [1.2] + })) assert list(view_runner.diff(1, 2)) == [expected_change1] # Test that diff supports tags. @@ -185,11 +188,11 @@ def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: ray_runner = mv.ray() local_runner = mv.local() - ray_runner.refresh(1) - assert local_runner.read_all() == expected_change0[1] - assert ray_runner.read_all() == expected_change0[1] + assert len(ray_runner.refresh(1)) == 1 + assert local_runner.read_all() == expected_change0.data + assert ray_runner.read_all() == expected_change0.data - ray_runner.refresh() + assert len(ray_runner.refresh()) == 1 assert local_runner.read_all() == pa.Table.from_pydict({ "int64": [1, 3], "float64": [1.1, 1.3], @@ -204,6 +207,37 @@ def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: match=r".*Target snapshot ID 3 higher than source dataset version 2.*"): ray_runner.refresh(3) + # Test upsert's diff. + ds_runner.upsert({ + "int64": [3, 4], + "float64": [0.33, 0.4], + "binary": [b"b3", b"b4"] + }) + assert list(ds_runner.diff(2, 3)) == [ + ChangeData( + 3, ChangeType.DELETE, + pa.Table.from_pydict({ + "int64": [3], + "float64": [0.3], + "binary": [b"b3"] + })), + ChangeData( + 3, ChangeType.ADD, + pa.Table.from_pydict({ + "int64": [3, 4], + "float64": [0.33, 0.4], + "binary": [b"b3", b"b4"] + })) + ] + + # Test refresh multiple snapshots. + ray_runner = mv1.ray() + assert len(ray_runner.refresh()) == 3 + assert ray_runner.read_all() == pa.Table.from_pydict({ + "int64": [1, 3, 4], + "float64": [1.1, 1.33, 1.4], + }) + def test_diff_filter(self, sample_dataset): # A sample UDF for testing. def _sample_filter_udf(row: Dict[str, Any]) -> Dict[str, Any]: @@ -222,20 +256,22 @@ def _sample_filter_udf(row: Dict[str, Any]) -> Dict[str, Any]: "binary": [b"b1", b"b2", b"b3"] }) - expected_change0 = (ChangeType.ADD, - pa.Table.from_pydict({ - "int64": [2, 3], - "float64": [0.2, 0.3] - })) + expected_change0 = ChangeData( + sample_dataset.storage.metadata.current_snapshot_id, ChangeType.ADD, + pa.Table.from_pydict({ + "int64": [2, 3], + "float64": [0.2, 0.3] + })) assert list(view_runner.diff(0, 1)) == [expected_change0] # Test deletion. ds_runner.delete(pc.field("int64") == 2) - expected_change1 = (ChangeType.DELETE, - pa.Table.from_pydict({ - "int64": [2], - "float64": [0.2] - })) + expected_change1 = ChangeData( + sample_dataset.storage.metadata.current_snapshot_id, ChangeType.DELETE, + pa.Table.from_pydict({ + "int64": [2], + "float64": [0.2] + })) assert list(view_runner.diff(1, 2)) == [expected_change1] # Test several changes.