Support materialized views in directory catalog #67

Merged: 1 commit, Jan 19, 2024
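
For context, a minimal end-to-end sketch of the feature added here, pieced together from the new test further below (the catalog location and the identity UDF are illustrative, not part of this change):

import pyarrow as pa
from space import DirCatalog

def _identity_udf(batch):
  # Placeholder UDF for illustration; the test below uses a UDF that adds 1.
  return batch

cat = DirCatalog("/tmp/space_catalog")  # hypothetical location
schema = pa.schema([("f", pa.int64()), ("float64", pa.float64())])
ds = cat.create_dataset("ds", schema, primary_keys=["f"], record_fields=[])

view = ds.map_batches(fn=_identity_udf,
                      input_fields=["f", "float64"],
                      output_schema=schema,
                      output_record_fields=[])
mv = cat.materialize("mv", view)   # new: create a materialized view through the catalog
mv_loaded = cat.dataset("mv")      # new: dataset() now also loads materialized views by name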
10 changes: 9 additions & 1 deletion python/src/space/catalogs/base.py
@@ -22,7 +22,7 @@
import pyarrow as pa

from space.core.datasets import Dataset
from space.core.views import MaterializedView
from space.core.views import MaterializedView, View


class BaseCatalog(ABC):
@@ -44,6 +44,14 @@ def create_dataset(self, name: str, schema: pa.Schema,
record_fields: fields stored in row format (ArrayRecord).
"""

def materialize(self, name: str, view: View):
"""Create a new materialized view.

Args:
name: the materialized view name.
view: the view to be materialized.
"""

@abstractmethod
def delete_dataset(self, name: str) -> None:
"""Delete an existing dataset or materialized view.
26 changes: 19 additions & 7 deletions python/src/space/catalogs/directory.py
@@ -21,8 +21,10 @@

from space.catalogs.base import BaseCatalog, DatasetInfo
from space.core.datasets import Dataset
from space.core.utils import paths
from space.core.views import MaterializedView
import space.core.proto.metadata_pb2 as meta
from space.core.storage import Storage
from space.core.utils import errors, paths
from space.core.views import MaterializedView, View, load_materialized_view


class DirCatalog(BaseCatalog):
@@ -39,18 +41,28 @@ def __init__(self, location):
def create_dataset(self, name: str, schema: pa.Schema,
primary_keys: List[str],
record_fields: List[str]) -> Dataset:
# TODO: should disallow overwriting an entry point file, to avoid creating
# two datasets at the same location.
return Dataset.create(self._dataset_location(name), schema, primary_keys,
record_fields)

def materialize(self, name: str, view: View):
return view.materialize(self._dataset_location(name))

def delete_dataset(self, name: str) -> None:
raise NotImplementedError("delete_dataset has not been implemented")

def dataset(self, name: str) -> Union[Dataset, MaterializedView]:
# TODO: to catch file not found and re-throw a DatasetNotFoundError.
# TODO: to support loading a materialized view.
return Dataset.load(self._dataset_location(name))
try:
storage = Storage.load(self._dataset_location(name))
except FileNotFoundError as e:
raise errors.StorageNotFoundError(str(e)) from None

if storage.metadata.type == meta.StorageMetadata.DATASET:
return Dataset(storage)
elif storage.metadata.type == meta.StorageMetadata.MATERIALIZED_VIEW:
return load_materialized_view(storage)

raise errors.SpaceRuntimeError(
f"Storage type {storage.metadata.type} is not supported")

def datasets(self) -> List[DatasetInfo]:
results = []
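
A rough sketch of the caller-facing behavior of the new dataset() path (the catalog location and storage name are illustrative):

from space import DirCatalog
from space.core.datasets import Dataset
from space.core.utils import errors
from space.core.views import MaterializedView

cat = DirCatalog("/tmp/space_catalog")  # hypothetical location
try:
  loaded = cat.dataset("mv1")
except errors.StorageNotFoundError:
  loaded = None  # no entry point file under the catalog directory

if isinstance(loaded, MaterializedView):
  pass  # e.g. refresh via loaded.ray().refresh(), then read
elif isinstance(loaded, Dataset):
  pass  # plain dataset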
20 changes: 12 additions & 8 deletions python/src/space/core/utils/errors.py
@@ -31,6 +31,18 @@ class PrimaryKeyExistError(UserInputError):
"""Errors caused by duplicated primary keys."""


class FileExistError(UserInputError):
"""Errors caused by a file to create already exists."""


class StorageExistError(UserInputError):
"""Errors caused by a storage to create already exists."""


class StorageNotFoundError(UserInputError):
"""The storage to load is not found."""


class SpaceRuntimeError(RuntimeError):
"""Basic class of errors thrown from Space runtime."""

@@ -41,11 +53,3 @@ class TransactionError(SpaceRuntimeError):

class LogicalPlanError(SpaceRuntimeError):
"""Errors from parsing logical plan."""


class FileExistError(UserInputError):
"""Errors caused by a file to create already exists."""


class StorageExistError(UserInputError):
"""Errors caused by a storage to create already exists."""
21 changes: 12 additions & 9 deletions python/src/space/core/views.py
@@ -205,8 +205,7 @@ def _sanitize_fields(field_names: List[str]) -> None:
from space.core.transform.join import JoinTransform
from space.ray.ops.join import JoinInput
return JoinTransform(join_keys=keys,
left=JoinInput(left, left_fields,
left_reference_read),
left=JoinInput(left, left_fields, left_reference_read),
right=JoinInput(right, right_fields,
right_reference_read))

@@ -293,11 +292,15 @@ def create(cls, location: str, view: View, logical_plan: meta.LogicalPlan,
@classmethod
def load(cls, location: str) -> MaterializedView:
"""Load a materialized view from files."""
storage = Storage.load(location)
metadata = storage.metadata
plan = metadata.logical_plan.logical_plan
return load_materialized_view(Storage.load(location))

# pylint: disable=cyclic-import,import-outside-toplevel
from space.core.transform.udfs import load_view
view = load_view(storage.location, metadata, plan)
return MaterializedView(storage, view)

def load_materialized_view(storage: Storage) -> MaterializedView:
"""Load a materialized view from a storage."""
metadata = storage.metadata
plan = metadata.logical_plan.logical_plan

# pylint: disable=cyclic-import,import-outside-toplevel
from space.core.transform.udfs import load_view
view = load_view(storage.location, metadata, plan)
return MaterializedView(storage, view)
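
For reference, the two load entry points are intended to be equivalent after this refactor; the path below is illustrative:

from space.core.storage import Storage
from space.core.views import MaterializedView, load_materialized_view

# MaterializedView.load(location) is now a thin wrapper over the new helper,
# which lets DirCatalog.dataset() reuse a Storage it has already loaded.
mv_a = MaterializedView.load("/path/to/mv")
mv_b = load_materialized_view(Storage.load("/path/to/mv"))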
52 changes: 51 additions & 1 deletion python/tests/catalogs/test_directory.py
@@ -13,17 +13,25 @@
# limitations under the License.

import os
from typing import Dict

import numpy as np
import pyarrow as pa
import pytest

from space import DatasetInfo, DirCatalog
from space.core.utils import errors


# A sample UDF for testing.
def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["float64"] = batch["float64"] + 1
return batch


class TestDirectoryCatalog:

def test_crud(self, tmp_path):
def test_dataset_crud(self, tmp_path):
schema = pa.schema([("f", pa.int64())])
pks = ["f"]
records = []
@@ -57,3 +65,45 @@ def test_crud(self, tmp_path):
cat.create_dataset("ds2", schema, pks, records)

assert "already exists" in str(excinfo.value)

with pytest.raises(errors.StorageNotFoundError) as excinfo:
cat.dataset("ds_not_exist")

assert "Failed to open local file" in str(excinfo.value)

def test_materialized_view_crud(self, tmp_path):
schema = pa.schema([("f", pa.int64()), ("float64", pa.float64())])
pks = ["f"]
records = []

location = str(tmp_path / "cat")
cat = DirCatalog(location)

ds = cat.create_dataset("ds", schema, pks, records)
view = ds.map_batches(fn=_sample_map_udf,
input_fields=["f", "float64"],
output_schema=schema,
output_record_fields=[])

mv1 = cat.materialize("mv1", view)

ds.local().append({"f": [1, 2, 3], "float64": [0.1, 0.2, 0.3]})
mv1.ray().refresh()
expected_data = {"f": [1, 2, 3], "float64": [1.1, 1.2, 1.3]}
assert mv1.local().read_all().to_pydict() == expected_data

mv1_loaded = cat.dataset("mv1")
assert mv1_loaded.local().read_all().to_pydict() == expected_data

with pytest.raises(errors.StorageExistError):
cat.materialize("mv1", view)

with pytest.raises(errors.StorageExistError):
cat.materialize("ds", view)

key_fn = lambda ds: ds.location # pylint: disable=unnecessary-lambda-assignment
assert sorted(cat.datasets(), key=key_fn) == sorted([
DatasetInfo("ds", ds.storage.location),
DatasetInfo("mv1", mv1.storage.location)
],
key=key_fn)