diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml
index c747ab2..a2f1f89 100644
--- a/.github/workflows/python-ci.yml
+++ b/.github/workflows/python-ci.yml
@@ -25,7 +25,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- python-version: ["3.8", "3.9", "3.10", "3.11"]
+ python-version: ["3.9", "3.10", "3.11"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
diff --git a/README.md b/README.md
index c8731ce..db88d0c 100644
--- a/README.md
+++ b/README.md
@@ -24,13 +24,6 @@ Unify data in your entire machine learning lifecycle with **Space**, a comprehen
-## Space 101
-
-- Space uses [Arrow](https://arrow.apache.org/docs/python/index.html) in the API surface, e.g., schema, filter, data IO.
-- Data operations in Space can run locally or distributedly in [Ray](https://github.com/ray-project/ray) clusters.
-- All file paths in Space are [relative](./docs/design.md#relative-paths); datasets are immediately usable after downloading or moving.
-- Please read [the design](docs/design.md) for more details.
-
## Onboarding Examples
- [Manage Tensorflow COCO dataset](notebooks/tfds_coco_tutorial.ipynb)
@@ -38,10 +31,19 @@ Unify data in your entire machine learning lifecycle with **Space**, a comprehen
- [Transforms and materialized views: Segment Anything as example](notebooks/segment_anything_tutorial.ipynb)
- [Incrementally build embedding vector indexes](notebooks/incremental_embedding_index.ipynb)
+## Space 101
+
+- Space uses [Arrow](https://arrow.apache.org/docs/python/index.html) in the API surface, e.g., schema, filter, data IO.
+- All file paths in Space are [relative](./docs/design.md#relative-paths); datasets are immediately usable after downloading or moving.
+- Space stores data itself, or a reference to data, in Parquet files. The reference can be the address of a row in an ArrayRecord file, or the path of a standalone file (limited support, see `space.core.schema.types.files`).
+- `space.TfFeatures` is a built-in field type providing serializers for nested dicts of numpy arrays, based on [TFDS FeaturesDict](https://www.tensorflow.org/datasets/api_docs/python/tfds/features/FeaturesDict); see the schema sketch below.
+- See [the design page](docs/design.md) for more details.
+
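+A condensed sketch of this schema pattern (TFDS must be installed; field names are illustrative, see the Quick Start below for the full example):
+```py
+import pyarrow as pa
+from tensorflow_datasets import features as f
+from space import TfFeatures
+
+# Index fields live in Parquet; the record field is stored in ArrayRecord
+# files and referenced by address from Parquet.
+schema = pa.schema([
+    ("id", pa.int64()),
+    ("feature", TfFeatures(f.FeaturesDict({"image": f.Image()})))
+])
+```
+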
## Quick Start
- [Install](#install)
- [Cloud Storage](#cloud-storage)
+- [Cluster setup](#cluster-setup)
- [Create and Load Datasets](#create-and-load-datasets)
- [Write and Read](#write-and-read)
- [Transform and Materialized Views](#transform-and-materialized-views)
@@ -71,6 +73,24 @@ gcsfuse "/path/to/"
Space has not yet implemented Cloud Storage file systems. FUSE is the current suggested approach.
+### Cluster Setup
+
+Optionally, set up a cluster to run Space operations in distributed mode. Ray clusters are supported; on the Ray cluster head/worker nodes:
+```bash
+# Start a Ray head node (IP 123.45.67.89, for example).
+# See https://docs.ray.io/en/latest/ray-core/starting-ray.html for details.
+ray start --head --port=6379
+```
+
+Using [Cloud Storage + FUSE](#cloud-storage) is required in distributed mode, because the Ray cluster and the client machine must operate on the same directory of files. Run `gcsfuse` on all machines, and the mapped local directory paths **must be the same**.
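+
+For example, mirroring the mount shown in the [Cloud Storage](#cloud-storage) section (the path is a placeholder):
+```bash
+# Run on the Ray head node, every worker node, and the client machine,
+# mounting to the same local path everywhere:
+gcsfuse "/path/to/"
+```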
+
+Run the following code on the client machine to connect to the Ray cluster:
+```py
+import ray
+# Connect to the Ray cluster.
+ray.init(address="ray://123.45.67.89:10001")
+```
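+
+Once connected, distributed operations use a Ray runner created from a dataset (a sketch; `ds` is a dataset created or loaded as shown in the sections below):
+```py
+from space import RayOptions
+
+runner = ds.ray(ray_options=RayOptions(max_parallelism=8))
+```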
+
### Create and Load Datasets
Create a Space dataset with two index fields (`id`, `image_name`) (store in Parquet) and a record field (`feature`) (store in ArrayRecord).
@@ -90,7 +110,7 @@ ds = Dataset.create(
"/path/to//example_ds",
schema,
primary_keys=["id"],
- record_fields=["feature"])
+ record_fields=["feature"]) # Store this field in ArrayRecord files
# Load the dataset from files later:
ds = Dataset.load("/path/to//example_ds")
@@ -120,9 +140,13 @@ print(catalog.datasets())
Append, delete some data. Each mutation generates a new version of data, represented by an increasing integer ID. We expect to support the [Iceberg](https://iceberg.apache.org/docs/latest/branching/) style tags and branches for better version management.
```py
import pyarrow.compute as pc
+from space import RayOptions
+
+# Create a local runner:
+runner = ds.local()
-# Create a local or Ray runner.
-runner = ds.local() # or ds.ray()
+# Or create a Ray runner:
+runner = ds.ray(ray_options=RayOptions(max_parallelism=8))
# Appending data generates a new dataset version `snapshot_id=1`
# Write methods:
@@ -205,8 +229,8 @@ mv = view.materialize("/path/to//example_mv")
# mv = catalog.materialize("example_mv", view)
mv_runner = mv.ray()
-# Refresh the MV up to version `1`.
-mv_runner.refresh(1) # mv_runner.refresh() refresh to the latest version
+# Refresh the MV up to version tag `after_add` of the source.
+mv_runner.refresh("after_add") # mv_runner.refresh() refreshes to the latest version
# Use the MV runner instead of view runner to directly read from materialized
# view files, no data processing any more.
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 5a5db8b..39a96be 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -8,7 +8,6 @@ license = { text = "Apache-2.0" }
classifiers = [
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
- "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
@@ -28,9 +27,9 @@ dependencies = [
[project.optional-dependencies]
dev = [
- "pandas",
+ "pandas == 2.1.4",
"pyarrow-stubs",
- "ray",
+ "ray == 2.9.1",
"tensorflow",
"types-protobuf",
]
diff --git a/python/src/space/__init__.py b/python/src/space/__init__.py
index 9f19cce..599adab 100644
--- a/python/src/space/__init__.py
+++ b/python/src/space/__init__.py
@@ -22,3 +22,4 @@
from space.core.random_access import RandomAccessDataSource
from space.core.schema.types import File, TfFeatures
from space.core.views import MaterializedView
+from space.ray.options import RayOptions
diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py
index 7e052af..0a66064 100644
--- a/python/src/space/core/datasets.py
+++ b/python/src/space/core/datasets.py
@@ -27,7 +27,8 @@
from space.core.transform.plans import LogicalPlanBuilder
from space.core.utils.lazy_imports_utils import ray
from space.core.views import View
-from space.ray.runners import RayOptions, RayReadWriterRunner
+from space.ray.options import RayOptions
+from space.ray.runners import RayReadWriterRunner
class Dataset(View):
diff --git a/python/src/space/core/options.py b/python/src/space/core/options.py
index d70b8ec..419a8f2 100644
--- a/python/src/space/core/options.py
+++ b/python/src/space/core/options.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-"""APIs of Space core lib."""
+"""Options of Space core lib."""
from dataclasses import dataclass
from typing import Any, Callable, List, Optional
diff --git a/python/src/space/core/schema/types/files.py b/python/src/space/core/schema/types/files.py
index 2e80ff4..0a3144c 100644
--- a/python/src/space/core/schema/types/files.py
+++ b/python/src/space/core/schema/types/files.py
@@ -27,7 +27,11 @@
class File(pa.ExtensionType):
- """A custom Arrow type for files."""
+ """A custom Arrow type representing data in a standalone file.
+
+ TODO: several features to add, e.g., automatically reading file content,
+ writing new files at data write time, serializers/deserializers.
+ """
EXTENSION_NAME = "space.file"
diff --git a/python/src/space/ray/ops/append.py b/python/src/space/ray/ops/append.py
index b9b6a79..53e20ad 100644
--- a/python/src/space/ray/ops/append.py
+++ b/python/src/space/ray/ops/append.py
@@ -26,6 +26,7 @@
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.utils.lazy_imports_utils import ray
+from space.ray.options import RayOptions
class RayAppendOp(BaseAppendOp):
@@ -35,31 +36,33 @@ class RayAppendOp(BaseAppendOp):
def __init__(self,
location: str,
metadata: meta.StorageMetadata,
- parallelism: int,
+ ray_options: RayOptions,
file_options: FileOptions,
record_address_input: bool = False):
"""
Args:
record_address_input: if true, input record fields are addresses.
"""
- self._parallelism = parallelism
+ self._ray_options = ray_options
self._actors = [
_AppendActor.remote( # type: ignore[attr-defined] # pylint: disable=no-member
location, metadata, file_options, record_address_input)
- for _ in range(parallelism)
+ for _ in range(self._ray_options.max_parallelism)
]
def write(self, data: InputData) -> None:
if not isinstance(data, pa.Table):
data = pa.Table.from_pydict(data)
- shard_size = data.num_rows // self._parallelism
+ num_shards = self._ray_options.max_parallelism
+
+ shard_size = data.num_rows // num_shards
if shard_size == 0:
shard_size = 1
responses = []
offset = 0
- for i in range(self._parallelism):
+ for i in range(num_shards):
shard = data.slice(offset=offset, length=shard_size)
responses.append(self._actors[i].write.remote(shard))
diff --git a/python/src/space/ray/ops/insert.py b/python/src/space/ray/ops/insert.py
index 7d1d5c0..5b87809 100644
--- a/python/src/space/ray/ops/insert.py
+++ b/python/src/space/ray/ops/insert.py
@@ -28,15 +28,16 @@
from space.core.storage import Storage
from space.core.utils import errors
from space.core.utils.lazy_imports_utils import ray
+from space.ray.options import RayOptions
class RayInsertOp(LocalInsertOp):
"""Insert data to a dataset with distributed duplication check."""
def __init__(self, storage: Storage, options: InsertOptions,
- parallelism: int, file_options: FileOptions):
+ ray_options: RayOptions, file_options: FileOptions):
LocalInsertOp.__init__(self, storage, options, file_options)
- self._parallelism = parallelism
+ self._ray_options = ray_options
def _check_duplication(self, data_files: rt.FileSet, filter_: pc.Expression):
remote_duplicated_values = []
@@ -50,11 +51,10 @@ def _check_duplication(self, data_files: rt.FileSet, filter_: pc.Expression):
for duplicated in ray.get(remote_duplicated_values):
if duplicated:
- raise errors.PrimaryKeyExistError(
- "Primary key to insert already exist")
+ raise errors.PrimaryKeyExistError("Primary key to insert already exist")
def _append(self, data: pa.Table, patches: List[Optional[rt.Patch]]) -> None:
- append_op = RayAppendOp(self._location, self._metadata, self._parallelism,
+ append_op = RayAppendOp(self._location, self._metadata, self._ray_options,
self._file_options)
append_op.write(data)
patches.append(append_op.finish())
@@ -64,5 +64,4 @@ def _append(self, data: pa.Table, patches: List[Optional[rt.Patch]]) -> None:
def _remote_filter_matched(location: str, metadata: meta.StorageMetadata,
data_files: rt.FileSet, pk_filter: pc.Expression,
primary_keys: List[str]) -> bool:
- return filter_matched(location, metadata, data_files, pk_filter,
- primary_keys)
+ return filter_matched(location, metadata, data_files, pk_filter, primary_keys)
diff --git a/python/src/space/ray/options.py b/python/src/space/ray/options.py
new file mode 100644
index 0000000..992f8fc
--- /dev/null
+++ b/python/src/space/ray/options.py
@@ -0,0 +1,24 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Options of Space Ray lib."""
+
+from dataclasses import dataclass
+
+
+@dataclass
+class RayOptions:
+ """Options of Ray runners."""
+ # The maximum parallelism (e.g., number of Ray actors) to use in a Ray cluster.
+ max_parallelism: int = 8
diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py
index 0a89d19..f4cd7b5 100644
--- a/python/src/space/ray/runners.py
+++ b/python/src/space/ray/runners.py
@@ -15,7 +15,7 @@
"""Ray runner implementations."""
from __future__ import annotations
-from dataclasses import dataclass
+import copy
from typing import TYPE_CHECKING
from typing import Iterator, List, Optional, Union
@@ -41,6 +41,7 @@
from space.ray.ops.delete import RayDeleteOp
from space.ray.ops.insert import RayInsertOp
from space.ray.ops.utils import singleton_storage
+from space.ray.options import RayOptions
if TYPE_CHECKING:
from space.core.datasets import Dataset
@@ -48,12 +49,6 @@
from space.core.views import MaterializedView, View
-@dataclass
-class RayOptions:
- """Options of Ray runners."""
- parallelism: int = 2
-
-
class RayReadOnlyRunner(BaseReadOnlyRunner):
"""A read-only Ray runner."""
@@ -237,7 +232,7 @@ def __init__(self,
@StorageMixin.transactional
def append(self, data: InputData) -> Optional[rt.Patch]:
op = RayAppendOp(self._storage.location, self._storage.metadata,
- self._ray_options.parallelism, self._file_options)
+ self._ray_options, self._file_options)
op.write(data)
return op.finish()
@@ -248,8 +243,12 @@ def append_from(
if not isinstance(source_fns, list):
source_fns = [source_fns]
+ ray_options = copy.deepcopy(self._ray_options)
+ ray_options.max_parallelism = min(len(source_fns),
+ ray_options.max_parallelism)
+
op = RayAppendOp(self._storage.location, self._storage.metadata,
- self._ray_options.parallelism, self._file_options)
+ ray_options, self._file_options)
op.write_from(source_fns)
return op.finish()
@@ -267,8 +266,8 @@ def append_parquet(self, pattern: str) -> Optional[rt.Patch]:
@StorageMixin.transactional
def _insert(self, data: InputData,
mode: InsertOptions.Mode) -> Optional[rt.Patch]:
- op = RayInsertOp(self._storage, InsertOptions(mode=mode),
- self._ray_options.parallelism, self._file_options)
+ op = RayInsertOp(self._storage, InsertOptions(mode=mode), self._ray_options,
+ self._file_options)
return op.write(data)
@StorageMixin.transactional