Skip to content

Commit

Permalink
Improve MV refresh: let it has exactly 1:1 mapping of snapshot with t…
Browse files Browse the repository at this point in the history
…he source
  • Loading branch information
coufon committed Jan 19, 2024
1 parent c98a95f commit 2bf56a0
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 81 deletions.
28 changes: 20 additions & 8 deletions python/src/space/core/ops/change_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
#
"""Change data feed that computes delta between two snapshots."""

from dataclasses import dataclass
from enum import Enum
from typing import Iterator, Tuple, List
from typing import Iterator, List

import pyarrow as pa
from pyroaring import BitMap # type: ignore[import-not-found]
Expand All @@ -40,9 +41,19 @@ class ChangeType(Enum):
# DELETE, on the same primary key in one snapshot change.


def read_change_data(
storage: Storage, start_snapshot_id: int,
end_snapshot_id: int) -> Iterator[Tuple[ChangeType, pa.Table]]:
@dataclass
class ChangeData:
"""Information and data of a change."""
# Snapshot ID that the change was committed to.
snapshot_id: int
# The change type.
type_: ChangeType
# The change data.
data: pa.Table


def read_change_data(storage: Storage, start_snapshot_id: int,
end_snapshot_id: int) -> Iterator[ChangeData]:
"""Read change data from a start to an end snapshot.
start_snapshot_id is excluded; end_snapshot_id is included.
Expand Down Expand Up @@ -82,6 +93,7 @@ def __init__(self, storage: Storage, snapshot_id: int):

self._storage = storage
self._metadata = self._storage.metadata
self._snapshot_id = snapshot_id

if snapshot_id not in self._metadata.snapshots:
raise errors.VersionNotFoundError(
Expand All @@ -93,8 +105,8 @@ def __init__(self, storage: Storage, snapshot_id: int):
change_log_file = self._storage.full_path(snapshot.change_log_file)
self._change_log = _read_change_log_proto(fs, change_log_file)

def __iter__(self) -> Iterator[Tuple[ChangeType, pa.Table]]:
# TODO: must return deletion first, otherwise when the upstream re-apply
def __iter__(self) -> Iterator[ChangeData]:
# Must return deletion first, otherwise when the upstream re-apply
# deletions and additions, it may delete newly added data.
# TODO: to enforce this check upstream, or merge deletion+addition as a
# update.
Expand All @@ -105,7 +117,7 @@ def __iter__(self) -> Iterator[Tuple[ChangeType, pa.Table]]:
yield self._read_bitmap_rows(ChangeType.ADD, bitmap)

def _read_bitmap_rows(self, change_type: ChangeType,
bitmap: meta.RowBitmap) -> Tuple[ChangeType, pa.Table]:
bitmap: meta.RowBitmap) -> ChangeData:
file_set = rt.FileSet(index_files=[rt.DataFile(path=bitmap.file)])
read_op = FileSetReadOp(self._storage.location, self._metadata, file_set)

Expand All @@ -115,7 +127,7 @@ def _read_bitmap_rows(self, change_type: ChangeType,
data = data.filter(
mask=_bitmap_mask(bitmap.roaring_bitmap, data.num_rows))

return (change_type, data)
return ChangeData(self._snapshot_id, change_type, data)


def _read_change_log_proto(fs: BaseFileSystem,
Expand Down
1 change: 0 additions & 1 deletion python/src/space/core/proto/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ message StorageMetadata {
// All alive refs, with reference name as key. Reference name can be a tag
// or a branch name.
map<string, SnapshotReference> refs = 8;

}

// The storage logical schema where user provided types are persisted instead
Expand Down
11 changes: 4 additions & 7 deletions python/src/space/core/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from functools import wraps
from typing import Callable, Iterator, List, Optional, Tuple, Union
from typing import Callable, Iterator, List, Optional, Union

from absl import logging # type: ignore[import-untyped]
import pyarrow as pa
import pyarrow.compute as pc

Expand All @@ -30,7 +29,7 @@
from space.core.ops.append import LocalAppendOp
from space.core.ops.append import FileOptions
from space.core.ops.base import InputData, InputIteratorFn
from space.core.ops.change_data import ChangeType, read_change_data
from space.core.ops.change_data import ChangeData, read_change_data
from space.core.ops.delete import FileSetDeleteOp
from space.core.ops.insert import InsertOptions, LocalInsertOp
from space.core.ops.read import FileSetReadOp
Expand Down Expand Up @@ -78,7 +77,7 @@ def read_all(

@abstractmethod
def diff(self, start_version: Union[int],
end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]:
end_version: Union[int]) -> Iterator[ChangeData]:
"""Read the change data between two versions.
start_version is excluded; end_version is included.
Expand All @@ -102,11 +101,9 @@ def decorated(self, *args, **kwargs):
with self._storage.transaction() as txn: # pylint: disable=protected-access
txn.commit(fn(self, *args, **kwargs))
r = txn.result()
logging.info(f"Job result:\n{r}")
return r
except (errors.SpaceRuntimeError, errors.UserInputError) as e:
r = JobResult(JobResult.State.FAILED, None, repr(e))
logging.warning(f"Job result:\n{r}")
return r

return decorated
Expand Down Expand Up @@ -215,7 +212,7 @@ def read(

@StorageMixin.reload
def diff(self, start_version: Version,
end_version: Version) -> Iterator[Tuple[ChangeType, pa.Table]]:
end_version: Version) -> Iterator[ChangeData]:
return read_change_data(self._storage,
self._storage.version_to_snapshot_id(start_version),
self._storage.version_to_snapshot_id(end_version))
Expand Down
68 changes: 48 additions & 20 deletions python/src/space/ray/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from typing import Iterator, List, Optional, Tuple, Union
from typing import Iterator, List, Optional, Union

import pyarrow as pa
import pyarrow.compute as pc
Expand All @@ -30,7 +30,7 @@
from space.core.ops.utils import FileOptions
from space.core.ops.append import LocalAppendOp
from space.core.ops.base import InputData, InputIteratorFn
from space.core.ops.change_data import ChangeType, read_change_data
from space.core.ops.change_data import ChangeData, ChangeType, read_change_data
from space.core.ops.delete import FileSetDeleteOp
from space.core.ops.insert import InsertOptions
from space.core.options import JoinOptions, ReadOptions
Expand All @@ -44,7 +44,7 @@

if TYPE_CHECKING:
from space.core.datasets import Dataset
from space.core.storage import Storage, Version
from space.core.storage import Storage, Transaction, Version
from space.core.views import MaterializedView, View


Expand Down Expand Up @@ -88,21 +88,21 @@ def read(
join_options).to_arrow_refs():
yield ray.get(ref)

def diff(
self, start_version: Union[Version],
end_version: Union[Version]) -> Iterator[Tuple[ChangeType, pa.Table]]:
def diff(self, start_version: Union[Version],
end_version: Union[Version]) -> Iterator[ChangeData]:
self._source_storage.reload()
source_changes = read_change_data(
self._source_storage,
self._source_storage.version_to_snapshot_id(start_version),
self._source_storage.version_to_snapshot_id(end_version))

for change_type, data in source_changes:
for change in source_changes:
# TODO: skip processing the data for deletions; the caller is usually
# only interested at deleted primary keys.
processed_remote_data = self._view.process_source(data)
processed_remote_data = self._view.process_source(change.data)
processed_data = ray.get(processed_remote_data.to_arrow_refs())
yield change_type, pa.concat_tables(processed_data)
yield ChangeData(change.snapshot_id, change.type_,
pa.concat_tables(processed_data))

@property
def _source_storage(self) -> Storage:
Expand Down Expand Up @@ -164,19 +164,44 @@ def refresh(self,
start_snapshot_id = self._storage.metadata.current_snapshot_id

job_results: List[JobResult] = []
for change_type, data in self.diff(start_snapshot_id, end_snapshot_id):
# In the scope of changes from the same snapshot, must process DELETE
# before ADD.
if change_type == ChangeType.DELETE:
job_results.append(self._process_delete(data))
elif change_type == ChangeType.ADD:
job_results.append(self._process_append(data))
else:
raise NotImplementedError(f"Change type {change_type} not supported")
patches: List[Optional[rt.Patch]] = []
previous_snapshot_id: Optional[int] = None

txn = self._start_txn()
for change in self.diff(start_snapshot_id, end_snapshot_id):
# Commit when changes from the same snapshot end.
if (previous_snapshot_id is not None and
change.snapshot_id != previous_snapshot_id):
txn.commit(utils.merge_patches(patches))
patches.clear()

# Stop early when something is wrong.
r = txn.result()
if r.state == JobResult.State.FAILED:
return job_results + [r]

job_results.append(r)
txn = self._start_txn()

try:
if change.type_ == ChangeType.DELETE:
patches.append(self._process_delete(change.data))
elif change.type_ == ChangeType.ADD:
patches.append(self._process_append(change.data))
else:
raise NotImplementedError(f"Change type {change.type_} not supported")
except (errors.SpaceRuntimeError, errors.UserInputError) as e:
r = JobResult(JobResult.State.FAILED, None, repr(e))
return job_results + [r]

previous_snapshot_id = change.snapshot_id

if patches:
txn.commit(utils.merge_patches(patches))
job_results.append(txn.result())

return job_results

@StorageMixin.transactional
def _process_delete(self, data: pa.Table) -> Optional[rt.Patch]:
filter_ = utils.primary_key_filter(self._storage.primary_keys, data)
if filter_ is None:
Expand All @@ -187,13 +212,16 @@ def _process_delete(self, data: pa.Table) -> Optional[rt.Patch]:
self._file_options)
return op.delete()

@StorageMixin.transactional
def _process_append(self, data: pa.Table) -> Optional[rt.Patch]:
op = LocalAppendOp(self._storage.location, self._storage.metadata,
self._file_options)
op.write(data)
return op.finish()

def _start_txn(self) -> Transaction:
with self._storage.transaction() as txn:
return txn


class RayReadWriterRunner(RayReadOnlyRunner, BaseReadWriteRunner):
"""Ray read write runner."""
Expand Down
39 changes: 22 additions & 17 deletions python/tests/core/ops/test_change_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pyarrow.compute as pc

from space import Dataset
from space.core.ops.change_data import ChangeType
from space.core.ops.change_data import ChangeData, ChangeType


def test_read_change_data(tmp_path, all_types_schema, all_types_input_data):
Expand All @@ -32,20 +32,22 @@ def test_read_change_data(tmp_path, all_types_schema, all_types_input_data):

changes = list(runner.diff(0, 1))
assert len(changes) == 1
expected_change0 = (ChangeType.ADD, runner.read_all())
expected_change0 = ChangeData(ds.storage.metadata.current_snapshot_id,
ChangeType.ADD, runner.read_all())
assert changes[0] == expected_change0

# Validate DELETE changes.
runner.delete((pc.field("string") == "a") | (pc.field("string") == "A"))
changes = list(runner.diff(1, 2))
assert len(changes) == 1
expected_change1 = (ChangeType.DELETE,
pa.Table.from_pydict({
"int64": [1, 0],
"float64": [0.1, -0.1],
"bool": [True, False],
"string": ["a", "A"]
}))
expected_change1 = ChangeData(
ds.storage.metadata.current_snapshot_id, ChangeType.DELETE,
pa.Table.from_pydict({
"int64": [1, 0],
"float64": [0.1, -0.1],
"bool": [True, False],
"string": ["a", "A"]
}))
assert changes[0] == expected_change1

# Validate Upsert operation's changes.
Expand All @@ -58,14 +60,17 @@ def test_read_change_data(tmp_path, all_types_schema, all_types_input_data):
runner.upsert(upsert_data)
changes = list(runner.diff(2, 3))
assert len(changes) == 2
expected_change2 = (ChangeType.DELETE,
pa.Table.from_pydict({
"int64": [2, 3],
"float64": [0.2, 0.3],
"bool": [False, False],
"string": ["b", "c"]
}))
expected_change3 = (ChangeType.ADD, pa.Table.from_pydict(upsert_data))
expected_change2 = ChangeData(
ds.storage.metadata.current_snapshot_id, ChangeType.DELETE,
pa.Table.from_pydict({
"int64": [2, 3],
"float64": [0.2, 0.3],
"bool": [False, False],
"string": ["b", "c"]
}))
expected_change3 = ChangeData(ds.storage.metadata.current_snapshot_id,
ChangeType.ADD,
pa.Table.from_pydict(upsert_data))
assert changes == [expected_change2, expected_change3]

# Validate diff with several snapshot in-between
Expand Down
11 changes: 8 additions & 3 deletions python/tests/core/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from space import Dataset, LocalRunner, TfFeatures
from space.core.jobs import JobResult
from space.core.ops.change_data import ChangeType
from space.core.ops.change_data import ChangeData, ChangeType
from space.core.schema.types import File
from space.core.utils import errors

Expand Down Expand Up @@ -122,12 +122,17 @@ def test_read_and_write_should_reload_storage(self, sample_dataset):

sample_data1 = _generate_data([1, 2])
local_runner1.append(sample_data1)
snapshot_id1 = ds1.storage.metadata.current_snapshot_id
assert local_runner2.read_all() == sample_data1

sample_data2 = _generate_data([3, 4])
local_runner1.append(sample_data2)
assert list(local_runner2.diff(0, 2)) == [(ChangeType.ADD, sample_data1),
(ChangeType.ADD, sample_data2)]
snapshot_id2 = ds1.storage.metadata.current_snapshot_id

assert list(local_runner2.diff(0, 2)) == [
ChangeData(snapshot_id1, ChangeType.ADD, sample_data1),
ChangeData(snapshot_id2, ChangeType.ADD, sample_data2)
]

sample_data3 = _generate_data([5])
sample_data4 = _generate_data([6])
Expand Down
Loading

0 comments on commit 2bf56a0

Please sign in to comment.