forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[air/docs] improve consistency of getting started (ray-project#26247)
- Loading branch information
1 parent
009c65e
commit 1abe908
Showing
7 changed files
with
613 additions
and
121 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
# flake8: noqa
# isort: skip_file

# __air_generic_preprocess_start__
import ray
from ray.data.preprocessors import StandardScaler
from ray.air import train_test_split

# Load data.
import pandas as pd

breast_cancer_df = pd.read_csv(
    "https://air-example-data.s3.us-east-2.amazonaws.com/breast_cancer.csv"
)
full_dataset = ray.data.from_pandas(breast_cancer_df)
# Alternatively, read the CSV straight from S3:
# full_dataset = ray.data.read_csv("s3://air-example-data/breast_cancer.csv")

# Split the data into a training and a validation set.
train_dataset, valid_dataset = train_test_split(full_dataset, test_size=0.3)

# Build a test dataset by removing the label column.
test_dataset = valid_dataset.map_batches(
    lambda batch: batch.drop("target", axis=1), batch_format="pandas"
)

# Standardize a couple of the feature columns.
scaled_columns = ["mean radius", "mean texture"]
preprocessor = StandardScaler(columns=scaled_columns)
# __air_generic_preprocess_end__
|
||
# __air_pytorch_preprocess_start__
import numpy as np
import pandas as pd

from ray.data.preprocessors import BatchMapper, Chain

# Feature columns, in schema order, excluding the label.
schema_order = [col for col in train_dataset.schema().names if col != "target"]


def concat_for_tensor(dataframe):
    # Pack the feature columns of a pandas batch into one tensor column
    # (and, when present, the label column into a second tensor column).
    from ray.data.extensions import TensorArray

    features = dataframe[schema_order].to_numpy(dtype=np.float32)
    packed = {"input": TensorArray(features)}
    if "target" in dataframe:
        labels = dataframe["target"].to_numpy(dtype=np.float32)
        packed["target"] = TensorArray(labels)
    return pd.DataFrame(packed)


# Run the scaler first, then the tensor-packing step.
preprocessor = Chain(preprocessor, BatchMapper(concat_for_tensor))
# __air_pytorch_preprocess_end__
|
||
|
||
# __air_pytorch_train_start__ | ||
import torch | ||
import torch.nn as nn | ||
from torch.nn.modules.utils import consume_prefix_in_state_dict_if_present | ||
|
||
from ray import train | ||
from ray.air import session | ||
from ray.train.torch import TorchTrainer, to_air_checkpoint | ||
|
||
|
||
def create_model(input_features, hidden_size=16):
    """Build a small feed-forward binary classifier.

    Args:
        input_features: Number of input features per sample.
        hidden_size: Width of the two hidden layers. Defaults to 16,
            matching the original example, so existing callers are
            unaffected.

    Returns:
        An ``nn.Sequential`` mapping a ``(batch, input_features)`` float
        tensor to ``(batch, 1)`` sigmoid probabilities in ``[0, 1]``.
    """
    return nn.Sequential(
        nn.Linear(in_features=input_features, out_features=hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, 1),
        nn.Sigmoid(),
    )
|
||
|
||
def train_loop_per_worker(config):
    # Per-worker training loop run by the TorchTrainer: reads the
    # hyperparameters from ``config``, trains on this worker's shard of
    # the "train" dataset, and reports the last batch loss together with
    # a checkpoint once per epoch.
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["num_epochs"]
    num_features = config["num_features"]

    # This data-parallel worker's shard of the training dataset.
    train_shard = train.get_dataset_shard("train")

    def iter_torch_batches(dataset, size):
        # Turn numpy-format batches from the Ray Dataset into float tensors.
        for batch in dataset.iter_batches(batch_format="numpy", batch_size=size):
            inputs = torch.Tensor(batch["input"]).float()
            targets = torch.Tensor(batch["target"]).float()
            yield inputs, targets

    # Build the model and wrap it for distributed training.
    model = train.torch.prepare_model(create_model(num_features))

    loss_fn = nn.BCELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for _ in range(epochs):
        for inputs, labels in iter_torch_batches(train_shard, batch_size):
            optimizer.zero_grad()
            predictions = model(inputs)
            train_loss = loss_fn(predictions, labels.unsqueeze(1))
            train_loss.backward()
            optimizer.step()
        # Report the last batch's loss for this epoch, plus a checkpoint.
        loss = train_loss.item()
        session.report({"loss": loss}, checkpoint=to_air_checkpoint(model))
|
||
|
||
num_features = len(schema_order)

# Hyperparameters passed through to ``train_loop_per_worker``.
train_loop_config = {
    # Training batch size.
    "batch_size": 128,
    # Number of epochs to train each task for.
    "num_epochs": 20,
    # Number of columns of the dataset.
    "num_features": num_features,
    # Optimizer args.
    "lr": 0.001,
}

scaling_config = {
    # Number of workers to use for data parallelism.
    "num_workers": 3,
    # Whether to use GPU acceleration.
    "use_gpu": False,
    # trainer_resources=0 so that the example works on Colab.
    "trainer_resources": {"CPU": 0},
}

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config=train_loop_config,
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
    preprocessor=preprocessor,
)
# Execute training.
result = trainer.fit()
print(f"Last result: {result.metrics}")
# Last result: {'loss': 0.6559339960416158, ...}
# __air_pytorch_train_end__
|
||
# __air_pytorch_tuner_start__
from ray import tune

# Tune will minimize the loss reported from the training loop,
# searching the learning rate on a log scale.
metric = "loss"
param_space = {"train_loop_config": {"lr": tune.loguniform(1e-4, 1e-2)}}
# __air_pytorch_tuner_end__
|
||
# __air_tune_generic_start__
from ray.tune.tuner import Tuner, TuneConfig
from ray.air.config import RunConfig

# Run 5 trials, keeping the one with the lowest reported metric.
tune_config = TuneConfig(num_samples=5, metric=metric, mode="min")
tuner = Tuner(
    trainer,
    param_space=param_space,
    tune_config=tune_config,
)
# Execute tuning.
result_grid = tuner.fit()

# Fetch the best result.
best_result = result_grid.get_best_result()
print("Best Result:", best_result)
# Best Result: Result(metrics={'loss': 0.278409322102863, ...})
# __air_tune_generic_end__
|
||
# __air_pytorch_batchpred_start__
from ray.train.batch_predictor import BatchPredictor
from ray.train.torch import TorchPredictor

# Use the best trial's checkpoint. (A checkpoint can also be created
# from a trained model via `to_air_checkpoint`.)
checkpoint = best_result.checkpoint

predictor = BatchPredictor.from_checkpoint(
    checkpoint, TorchPredictor, model=create_model(num_features)
)

# Score the label-free test dataset.
probabilities = predictor.predict(test_dataset)
print("PREDICTED PROBABILITIES")
probabilities.show()
# {'predictions': array([1.], dtype=float32)}
# {'predictions': array([0.], dtype=float32)}
# __air_pytorch_batchpred_end__
Oops, something went wrong.