Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

Examples for v1alpha2 version #56

Merged
merged 4 commits into from
Aug 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: "kubeflow.org/v1alpha1"
kind: "PyTorchJob"
metadata:
name: "cifar-job"
name: "pytorch-cifar-job"
spec:
backend: "tcp"
masterPort: "23456"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: "kubeflow.org/v1alpha1"
kind: "PyTorchJob"
metadata:
name: "dist-mnist-for-e2e-test"
name: "pytorch-dist-mnist-for-e2e-test"
spec:
backend: "tcp"
masterPort: "23456"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: "kubeflow.org/v1alpha1"
kind: "PyTorchJob"
metadata:
name: "dist-sendrecv"
name: "pytorch-dist-basic-sendrecv"
spec:
backend: "tcp"
masterPort: "23456"
Expand Down
4 changes: 4 additions & 0 deletions examples/v1alpha2/dist-mnist/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM pytorch/pytorch:v0.2

ADD . /opt/pytorch_dist_mnist
ENTRYPOINT ["python", "/opt/pytorch_dist_mnist/dist_mnist.py"]
55 changes: 55 additions & 0 deletions examples/v1alpha2/dist-mnist/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2017 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requirements:
# https://github.com/mattrobenolt/jinja2-cli
# pip install jinja2-clie
IMG = gcr.io/kubeflow-examples/pytorch-dist-mnist
PUBLIC = gcr.io/kubeflow-examples/pytorch-dist-mnist
DIR := ${CURDIR}

# List any changed files.
CHANGED_FILES := $(shell git diff-files --relative=examples/dist-mnist)

ifeq ($(strip $(CHANGED_FILES)),)
# Changed files is empty; not dirty
# Don't include --dirty because it could be dirty if files outside the ones we care
# about changed.
TAG := $(shell date +v%Y%m%d)-$(shell git describe --always)
else
TAG := $(shell date +v%Y%m%d)-$(shell git describe --always --dirty)-$(shell git diff | shasum -a256 | cut -c -6)
endif

all: build

# To build without the cache set the environment variable
# export DOCKER_BUILD_OPTS=--no-cache
build:
docker build ${DOCKER_BUILD_OPTS} -t $(IMG):$(TAG) .
docker tag ${DOCKER_BUILD_OPTS} $(IMG):$(TAG) $(IMG):latest
@echo Built $(IMG):$(TAG)

# Build but don't attach the latest tag. This allows manual testing/inspection of the image
# first.
push: build
gcloud docker -- push $(IMG):$(TAG)
@echo Pushed $(IMG) with :$(TAG) tags

push-latest: push
gcloud container images add-tag --quiet $(IMG):$(TAG) $(IMG):latest --verbosity=info
echo created $(IMG):latest

push-public: push-latest
gcloud container images add-tag --quiet $(IMG):$(TAG) $(PUBLIC):$(TAG) --verbosity=info
gcloud container images add-tag --quiet $(IMG):$(TAG) $(PUBLIC):latest --verbosity=info
17 changes: 17 additions & 0 deletions examples/v1alpha2/dist-mnist/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
### Distributed mnist model for e2e test

This folder containers Dockerfile and distributed mnist model for e2e test.

**Build Image**

The default image name and tag is `kubeflow/pytorch-dist-mnist-test:1.0`.

```shell
docker build -f Dockerfile -t kubeflow/pytorch-dist-mnist-test:1.0 ./
```

**Create the mnist PyTorch job**

```
kubectl create -f ./pytorch_job_mnist.yaml
```
134 changes: 134 additions & 0 deletions examples/v1alpha2/dist-mnist/dist_mnist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#!/usr/bin/env python

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from math import ceil
from random import Random
from torch.autograd import Variable
from torchvision import datasets, transforms


class Partition(object):
""" Dataset-like object, but only access a subset of it. """

def __init__(self, data, index):
self.data = data
self.index = index

def __len__(self):
return len(self.index)

def __getitem__(self, index):
data_idx = self.index[index]
return self.data[data_idx]


class DataPartitioner(object):
""" Partitions a dataset into different chuncks. """

def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
self.data = data
self.partitions = []
rng = Random()
rng.seed(seed)
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng.shuffle(indexes)

for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]

def use(self, partition):
return Partition(self.data, self.partitions[partition])


class Net(nn.Module):
""" Network architecture. """

def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)

def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)


def partition_dataset():
""" Partitioning MNIST """
dataset = datasets.MNIST(
'./data',
train=True,
download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307, ), (0.3081, ))
]))
size = dist.get_world_size()
bsz = 128 / float(size)
partition_sizes = [1.0 / size for _ in range(size)]
partition = DataPartitioner(dataset, partition_sizes)
partition = partition.use(dist.get_rank())
train_set = torch.utils.data.DataLoader(
partition, batch_size=int(bsz), shuffle=True)
return train_set, bsz


def average_gradients(model):
""" Gradient averaging. """
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM, group=0)
param.grad.data /= size


def run():
""" Distributed Synchronous SGD Example """
rank = dist.get_rank()
torch.manual_seed(1234)
train_set, bsz = partition_dataset()
model = Net()
model = model
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

num_batches = ceil(len(train_set.dataset) / float(bsz))
for epoch in range(10):
epoch_loss = 0.0
for data, target in train_set:
data, target = Variable(data), Variable(target)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
epoch_loss += loss.data[0]
loss.backward()
average_gradients(model)
optimizer.step()
print('Rank ',
rank, ', epoch ', epoch, ': ',
epoch_loss / num_batches)


def init_processes(fn, backend='tcp'):
""" Initialize the distributed environment. """
dist.init_process_group(backend)
fn()


if __name__ == "__main__":
init_processes(run)
22 changes: 22 additions & 0 deletions examples/v1alpha2/dist-mnist/pytorch_job_mnist.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: "kubeflow.org/v1alpha2"
kind: "PyTorchJob"
metadata:
name: "pytorch-dist-mnist-for-e2e-test"
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: gcr.io/kubeflow-ci/pytorch-dist-mnist_test:1.0
Worker:
replicas: 3
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: gcr.io/kubeflow-ci/pytorch-dist-mnist_test:1.0
5 changes: 5 additions & 0 deletions examples/v1alpha2/dist-sendrecv/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM pytorch/pytorch:v0.2

RUN mkdir -p /opt/mlkube
COPY dist_sendrecv.py /opt/mlkube/
ENTRYPOINT ["python", "/opt/mlkube/dist_sendrecv.py"]
17 changes: 17 additions & 0 deletions examples/v1alpha2/dist-sendrecv/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
### Distributed send/recv e2e test

This folder containers Dockerfile and distributed send/recv test.

**Build Image**

The default image name and tag is `kubeflow/pytorch-dist-sendrecv-test:1.0`.

```shell
docker build -f Dockerfile -t kubeflow/pytorch-dist-sendrecv-test:1.0 ./
```

**Create the PyTorch job**

```
kubectl create -f ./pytorch_job_sendrecv.yaml
```
61 changes: 61 additions & 0 deletions examples/v1alpha2/dist-sendrecv/dist_sendrecv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import logging
import os
import json
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from math import ceil
from random import Random
from torch.autograd import Variable
from torchvision import datasets, transforms

def run():
""" Simple Send/Recv for testing Master <--> Workers communication """
rank = dist.get_rank()
size = dist.get_world_size()
inp = torch.randn(2,2)
result = torch.zeros(2,2)
if rank == 0:
# Send the input tensor to all workers
for i in range(1, size):
dist.send(tensor=inp, dst=i)
# Receive the result tensor from all workers
dist.recv(tensor=result, src=i)
logging.info("Result from worker %d : %s", i, result)
else:
# Receive input tensor from master
dist.recv(tensor=inp, src=0)
# Elementwise tensor multiplication
result = torch.mul(inp,inp)
# Send the result tensor back to master
dist.send(tensor=result, dst=0)

def init_processes(fn, backend='tcp'):
""" Initialize the distributed environment. """
dist.init_process_group(backend)
fn()

def main():
logging.info("Torch version: %s", torch.__version__)

port = os.environ.get("MASTER_PORT", "{}")
logging.info("MASTER_PORT: %s", port)

addr = os.environ.get("MASTER_ADDR", "{}")
logging.info("MASTER_ADDR: %s", addr)

world_size = os.environ.get("WORLD_SIZE", "{}")
logging.info("WORLD_SIZE: %s", world_size)

rank = os.environ.get("RANK", "{}")
logging.info("RANK: %s", rank)

init_processes(run)


if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
main()
22 changes: 22 additions & 0 deletions examples/v1alpha2/dist-sendrecv/pytorch_job_sendrecv.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: "kubeflow.org/v1alpha2"
kind: "PyTorchJob"
metadata:
name: "pytorch-dist-basic-sendrecv"
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: gcr.io/kubeflow-ci/pytorch-dist-sendrecv-test:1.0
Worker:
replicas: 3
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: gcr.io/kubeflow-ci/pytorch-dist-sendrecv-test:1.0
4 changes: 2 additions & 2 deletions scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ gcloud version
gcloud container builds submit . --tag=${REGISTRY}/${REPO_NAME}:${VERSION} --project=${PROJECT}
# build a mnist testing image for our smoke test
MNIST_TEST_IMAGE_TAG="pytorch-dist-mnist_test:1.0"
gcloud container builds submit ./examples/dist-mnist/ --tag=${REGISTRY}/${MNIST_TEST_IMAGE_TAG} --project=${PROJECT}
gcloud container builds submit ./examples/v1alpha1/dist-mnist/ --tag=${REGISTRY}/${MNIST_TEST_IMAGE_TAG} --project=${PROJECT}

#Image for minimal dist sendrecv test
SENDRECV_TEST_IMAGE_TAG="pytorch-dist-sendrecv-test:1.0"
gcloud container builds submit ./examples/dist-sendrecv/ --tag=${REGISTRY}/${SENDRECV_TEST_IMAGE_TAG} --project=${PROJECT}
gcloud container builds submit ./examples/v1alpha1/dist-sendrecv/ --tag=${REGISTRY}/${SENDRECV_TEST_IMAGE_TAG} --project=${PROJECT}