diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml
index c747ab2..a2f1f89 100644
--- a/.github/workflows/python-ci.yml
+++ b/.github/workflows/python-ci.yml
@@ -25,7 +25,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ["3.8", "3.9", "3.10", "3.11"]
+        python-version: ["3.9", "3.10", "3.11"]
     steps:
     - uses: actions/checkout@v4
     - name: Set up Python ${{ matrix.python-version }}
diff --git a/README.md b/README.md
index c8731ce..db88d0c 100644
--- a/README.md
+++ b/README.md
@@ -24,13 +24,6 @@ Unify data in your entire machine learning lifecycle with **Space**, a comprehen
 
-## Space 101
-
-- Space uses [Arrow](https://arrow.apache.org/docs/python/index.html) in the API surface, e.g., schema, filter, data IO.
-- Data operations in Space can run locally or distributedly in [Ray](https://github.com/ray-project/ray) clusters.
-- All file paths in Space are [relative](./docs/design.md#relative-paths); datasets are immediately usable after downloading or moving.
-- Please read [the design](docs/design.md) for more details.
-
 ## Onboarding Examples
 
 - [Manage Tensorflow COCO dataset](notebooks/tfds_coco_tutorial.ipynb)
@@ -38,10 +31,19 @@ Unify data in your entire machine learning lifecycle with **Space**, a comprehen
 - [Transforms and materialized views: Segment Anything as example](notebooks/segment_anything_tutorial.ipynb)
 - [Incrementally build embedding vector indexes](notebooks/incremental_embedding_index.ipynb)
 
+## Space 101
+
+- Space uses [Arrow](https://arrow.apache.org/docs/python/index.html) in the API surface, e.g., schema, filter, data IO.
+- All file paths in Space are [relative](./docs/design.md#relative-paths); datasets are immediately usable after downloading or moving.
+- Space stores the data itself, or a reference to the data, in Parquet files. The reference can be the address of a row in an ArrayRecord file, or the path of a standalone file (limited support, see `space.core.schema.types.files`).
+- `space.TfFeatures` is a built-in field type providing serializers for nested dicts of numpy arrays, based on [TFDS FeaturesDict](https://www.tensorflow.org/datasets/api_docs/python/tfds/features/FeaturesDict).
+- Please find more information in [the design page](docs/design.md).
+
 ## Quick Start
 
 - [Install](#install)
 - [Cloud Storage](#cloud-storage)
+- [Cluster Setup](#cluster-setup)
 - [Create and Load Datasets](#create-and-load-datasets)
 - [Write and Read](#write-and-read)
 - [Transform and Materialized Views](#transform-and-materialized-views)
@@ -71,6 +73,24 @@ gcsfuse "/path/to/"
 ```
 
 Space has not yet implemented Cloud Storage file systems. FUSE is the current suggested approach.
+
+### Cluster Setup
+
+Optionally, set up a cluster to run Space operations in a distributed manner. Ray clusters are currently supported; on the Ray cluster head/worker nodes:
+```bash
+# Start a Ray head node (IP 123.45.67.89, for example).
+# See https://docs.ray.io/en/latest/ray-core/starting-ray.html for details.
+ray start --head --port=6379
+```
+
+Using [Cloud Storage + FUSE](#cloud-storage) is required in distributed mode, because the Ray cluster and the client machine must operate on the same directory of files. Run `gcsfuse` on all machines; the mapped local directory paths **must be the same**.
+
+Run the following code on the client machine to connect to the Ray cluster:
+```py
+import ray
+# Connect to the Ray cluster.
+ray.init(address="ray://123.45.67.89:10001")
+```
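+
+Optionally, verify the connection before running Space operations. A minimal sanity check (using the standard Ray API, not a Space API) is to print the cluster resources, which should report the whole cluster rather than only the local machine:
+```py
+print(ray.cluster_resources())
+```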
 
 ### Create and Load Datasets
 
 Create a Space dataset with two index fields (`id`, `image_name`) (store in Parquet) and a record field (`feature`) (store in ArrayRecord).
@@ -90,7 +110,7 @@ ds = Dataset.create(
     "/path/to//example_ds",
     schema,
     primary_keys=["id"],
-    record_fields=["feature"])
+    record_fields=["feature"]) # Store this field in ArrayRecord files
 
 # Load the dataset from files later:
 ds = Dataset.load("/path/to//example_ds")
@@ -120,9 +140,13 @@ print(catalog.datasets())
 Append, delete some data. Each mutation generates a new version of data, represented by an increasing integer ID. We expect to support the [Iceberg](https://iceberg.apache.org/docs/latest/branching/) style tags and branches for better version management.
 ```py
 import pyarrow.compute as pc
+from space import RayOptions
+
+# Create a local runner:
+runner = ds.local()
-# Create a local or Ray runner.
-runner = ds.local() # or ds.ray()
+# Or create a Ray runner:
+runner = ds.ray(ray_options=RayOptions(max_parallelism=8))
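+# max_parallelism caps the number of Ray actors a runner creates for one
+# operation (see space/ray/options.py); it need not match the cluster size.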
 
 # Appending data generates a new dataset version `snapshot_id=1`
 # Write methods:
@@ -205,8 +229,8 @@ mv = view.materialize("/path/to//example_mv")
 # mv = catalog.materialize("example_mv", view)
 
 mv_runner = mv.ray()
-# Refresh the MV up to version `1`.
-mv_runner.refresh(1) # mv_runner.refresh() refresh to the latest version
+# Refresh the MV up to version tag `after_add` of the source.
+mv_runner.refresh("after_add") # mv_runner.refresh() refreshes to the latest version
 
 # Use the MV runner instead of view runner to directly read from materialized
 # view files, no data processing any more.
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 5a5db8b..39a96be 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -8,7 +8,6 @@ license = { text = "Apache-2.0" }
 classifiers = [
     "License :: OSI Approved :: Apache Software License",
     "Operating System :: OS Independent",
-    "Programming Language :: Python :: 3.8",
     "Programming Language :: Python :: 3.9",
     "Programming Language :: Python :: 3.10",
     "Programming Language :: Python :: 3.11",
@@ -28,9 +27,9 @@ dependencies = [
 
 [project.optional-dependencies]
 dev = [
-    "pandas",
+    "pandas == 2.1.4",
     "pyarrow-stubs",
-    "ray",
+    "ray == 2.9.1",
     "tensorflow",
     "types-protobuf",
 ]
diff --git a/python/src/space/__init__.py b/python/src/space/__init__.py
index 9f19cce..599adab 100644
--- a/python/src/space/__init__.py
+++ b/python/src/space/__init__.py
@@ -22,3 +22,4 @@
 from space.core.random_access import RandomAccessDataSource
 from space.core.schema.types import File, TfFeatures
 from space.core.views import MaterializedView
+from space.ray.options import RayOptions
diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py
index 7e052af..0a66064 100644
--- a/python/src/space/core/datasets.py
+++ b/python/src/space/core/datasets.py
@@ -27,7 +27,8 @@
 from space.core.transform.plans import LogicalPlanBuilder
 from space.core.utils.lazy_imports_utils import ray
 from space.core.views import View
-from space.ray.runners import RayOptions, RayReadWriterRunner
+from space.ray.options import RayOptions
+from space.ray.runners import RayReadWriterRunner
 
 
 class Dataset(View):
diff --git a/python/src/space/core/options.py b/python/src/space/core/options.py
index d70b8ec..419a8f2 100644
--- a/python/src/space/core/options.py
+++ b/python/src/space/core/options.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-"""APIs of Space core lib."""
+"""Options of Space core lib."""
 
 from dataclasses import dataclass
 from typing import Any, Callable, List, Optional
diff --git a/python/src/space/core/schema/types/files.py b/python/src/space/core/schema/types/files.py
index 2e80ff4..0a3144c 100644
--- a/python/src/space/core/schema/types/files.py
+++ b/python/src/space/core/schema/types/files.py
@@ -27,7 +27,11 @@
 
 
 class File(pa.ExtensionType):
-  """A custom Arrow type for files."""
+  """A custom Arrow type representing data in a standalone file.
+
+  TODO: several features to add, e.g., automatically reading file content,
+  writing new files at data write time, and serializers/deserializers.
+  """
 
   EXTENSION_NAME = "space.file"
 
diff --git a/python/src/space/ray/ops/append.py b/python/src/space/ray/ops/append.py
index b9b6a79..53e20ad 100644
--- a/python/src/space/ray/ops/append.py
+++ b/python/src/space/ray/ops/append.py
@@ -26,6 +26,7 @@
 from space.core.proto import metadata_pb2 as meta
 from space.core.proto import runtime_pb2 as rt
 from space.core.utils.lazy_imports_utils import ray
+from space.ray.options import RayOptions
 
 
 class RayAppendOp(BaseAppendOp):
@@ -35,31 +36,33 @@ class RayAppendOp(BaseAppendOp):
 
   def __init__(self,
                location: str,
                metadata: meta.StorageMetadata,
-               parallelism: int,
+               ray_options: RayOptions,
                file_options: FileOptions,
                record_address_input: bool = False):
     """
     Args:
       record_address_input: if true, input record fields are addresses.
     """
-    self._parallelism = parallelism
+    self._ray_options = ray_options
     self._actors = [
         _AppendActor.remote( # type: ignore[attr-defined] # pylint: disable=no-member
             location, metadata, file_options, record_address_input)
-        for _ in range(parallelism)
+        for _ in range(self._ray_options.max_parallelism)
     ]
 
   def write(self, data: InputData) -> None:
     if not isinstance(data, pa.Table):
       data = pa.Table.from_pydict(data)
 
-    shard_size = data.num_rows // self._parallelism
+    num_shards = self._ray_options.max_parallelism
+
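+    # Shard the input so each append actor writes one contiguous slice; with
+    # fewer rows than actors, fall back to a single row per shard.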
+    shard_size = data.num_rows // num_shards
     if shard_size == 0:
       shard_size = 1
 
     responses = []
     offset = 0
-    for i in range(self._parallelism):
+    for i in range(num_shards):
       shard = data.slice(offset=offset, length=shard_size)
       responses.append(self._actors[i].write.remote(shard))
diff --git a/python/src/space/ray/ops/insert.py b/python/src/space/ray/ops/insert.py
index 7d1d5c0..5b87809 100644
--- a/python/src/space/ray/ops/insert.py
+++ b/python/src/space/ray/ops/insert.py
@@ -28,15 +28,16 @@
 from space.core.storage import Storage
 from space.core.utils import errors
 from space.core.utils.lazy_imports_utils import ray
+from space.ray.options import RayOptions
 
 
 class RayInsertOp(LocalInsertOp):
   """Insert data to a dataset with distributed duplication check."""
 
   def __init__(self, storage: Storage, options: InsertOptions,
-               parallelism: int, file_options: FileOptions):
+               ray_options: RayOptions, file_options: FileOptions):
     LocalInsertOp.__init__(self, storage, options, file_options)
-    self._parallelism = parallelism
+    self._ray_options = ray_options
 
   def _check_duplication(self, data_files: rt.FileSet, filter_: pc.Expression):
     remote_duplicated_values = []
@@ -50,11 +51,10 @@ def _check_duplication(self, data_files: rt.FileSet, filter_: pc.Expression):
 
     for duplicated in ray.get(remote_duplicated_values):
       if duplicated:
-        raise errors.PrimaryKeyExistError(
-            "Primary key to insert already exist")
+        raise errors.PrimaryKeyExistError("Primary key to insert already exists")
 
   def _append(self, data: pa.Table, patches: List[Optional[rt.Patch]]) -> None:
-    append_op = RayAppendOp(self._location, self._metadata, self._parallelism,
+    append_op = RayAppendOp(self._location, self._metadata, self._ray_options,
                             self._file_options)
     append_op.write(data)
     patches.append(append_op.finish())
@@ -64,5 +64,4 @@ def _append(self, data: pa.Table, patches: List[Optional[rt.Patch]]) -> None:
 def _remote_filter_matched(location: str, metadata: meta.StorageMetadata,
                            data_files: rt.FileSet, pk_filter: pc.Expression,
                            primary_keys: List[str]) -> bool:
-  return filter_matched(location, metadata, data_files, pk_filter,
-                        primary_keys)
+  return filter_matched(location, metadata, data_files, pk_filter, primary_keys)
diff --git a/python/src/space/ray/options.py b/python/src/space/ray/options.py
new file mode 100644
index 0000000..992f8fc
--- /dev/null
+++ b/python/src/space/ray/options.py
@@ -0,0 +1,24 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Options of Space Ray lib."""
+
+from dataclasses import dataclass
+
+
+@dataclass
+class RayOptions:
+  """Options of Ray runners."""
+  # The max parallelism of computing resources to use in a Ray cluster.
+  max_parallelism: int = 8
diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py
index 0a89d19..f4cd7b5 100644
--- a/python/src/space/ray/runners.py
+++ b/python/src/space/ray/runners.py
@@ -15,7 +15,7 @@
 """Ray runner implementations."""
 from __future__ import annotations
 
-from dataclasses import dataclass
+import copy
 from typing import TYPE_CHECKING
 from typing import Iterator, List, Optional, Union
 
@@ -41,6 +41,7 @@
 from space.ray.ops.delete import RayDeleteOp
 from space.ray.ops.insert import RayInsertOp
 from space.ray.ops.utils import singleton_storage
+from space.ray.options import RayOptions
 
 if TYPE_CHECKING:
   from space.core.datasets import Dataset
@@ -48,12 +49,6 @@
   from space.core.views import MaterializedView, View
 
 
-@dataclass
-class RayOptions:
-  """Options of Ray runners."""
-  parallelism: int = 2
-
-
 class RayReadOnlyRunner(BaseReadOnlyRunner):
   """A read-only Ray runner."""
 
@@ -237,7 +232,7 @@ def __init__(self,
   @StorageMixin.transactional
   def append(self, data: InputData) -> Optional[rt.Patch]:
     op = RayAppendOp(self._storage.location, self._storage.metadata,
-                     self._ray_options.parallelism, self._file_options)
+                     self._ray_options, self._file_options)
     op.write(data)
     return op.finish()
 
@@ -248,8 +243,12 @@ def append_from(
     if not isinstance(source_fns, list):
       source_fns = [source_fns]
 
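+    # Cap parallelism at the number of input sources; deep-copy the options so
+    # the runner's shared RayOptions instance is not mutated.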
+    ray_options = copy.deepcopy(self._ray_options)
+    ray_options.max_parallelism = min(len(source_fns),
+                                      ray_options.max_parallelism)
+
     op = RayAppendOp(self._storage.location, self._storage.metadata,
-                     self._ray_options.parallelism, self._file_options)
+                     ray_options, self._file_options)
     op.write_from(source_fns)
     return op.finish()
 
@@ -267,8 +266,8 @@ def append_parquet(self, pattern: str) -> Optional[rt.Patch]:
   @StorageMixin.transactional
   def _insert(self, data: InputData,
               mode: InsertOptions.Mode) -> Optional[rt.Patch]:
-    op = RayInsertOp(self._storage, InsertOptions(mode=mode),
-                     self._ray_options.parallelism, self._file_options)
+    op = RayInsertOp(self._storage, InsertOptions(mode=mode), self._ray_options,
+                     self._file_options)
     return op.write(data)
 
   @StorageMixin.transactional