Getting Started
Old Version
This page is about Kubeflow Training Operator V1. For the latest information, check the Kubeflow Trainer V2 documentation.
This guide describes how to get started with the Training Operator and run a few simple examples.
Prerequisites
You need to install the Training Operator control plane and the Training Python SDK to run the examples on this page.
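If you do not have the Python SDK yet, it is published on PyPI as kubeflow-training; a typical installation is pip install -U kubeflow-training.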
Getting Started with PyTorchJob
You can use the Python SDK to create your first Training Operator distributed PyTorchJob. Define a training function that implements end-to-end model training. Each worker executes this function on its own Kubernetes Pod. Usually, this function contains the logic to download the dataset, create the model, and train it.
The Training Operator automatically sets WORLD_SIZE and RANK on each PyTorchJob worker so it can perform PyTorch Distributed Data Parallel (DDP) training.
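These are the standard environment variables that PyTorch's env:// initialization reads. As a minimal sketch, a worker can inspect them directly before calling init_process_group:

import os

# Set by the Training Operator on every PyTorchJob worker Pod.
world_size = int(os.environ["WORLD_SIZE"])  # total number of training processes
rank = int(os.environ["RANK"])              # global rank of this process
print(f"Running worker {rank} of {world_size}")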
If you install the Training Operator as part of the Kubeflow Platform, you can open a new Kubeflow Notebook to run this script. If you install the Training Operator standalone, make sure your local kubeconfig is configured to access the Kubernetes cluster where the Training Operator is installed.
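For a standalone installation, here is a minimal sketch of pointing the SDK at a specific cluster; the config_file and namespace constructor arguments reflect recent kubeflow-training SDK versions and may differ in yours:

from kubeflow.training import TrainingClient

# By default the client loads your local kubeconfig (or in-cluster config).
# Pass an explicit kubeconfig and namespace if the Training Operator cluster
# is not your current context.
client = TrainingClient(
    config_file="~/.kube/config",
    namespace="kubeflow",
)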
def train_func():
    import os

    import torch
    import torch.nn.functional as F
    from torch.utils.data import DistributedSampler
    from torchvision import datasets, transforms
    import torch.distributed as dist

    # [1] Setup PyTorch DDP. Distributed environment will be set automatically by Training Operator.
    dist.init_process_group(backend="nccl")
    Distributor = torch.nn.parallel.DistributedDataParallel
    local_rank = int(os.getenv("LOCAL_RANK", 0))
    print(
        "Distributed Training for WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}".format(
            dist.get_world_size(),
            dist.get_rank(),
            local_rank,
        )
    )

    # [2] Create PyTorch CNN Model.
    class Net(torch.nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = torch.nn.Conv2d(1, 20, 5, 1)
            self.conv2 = torch.nn.Conv2d(20, 50, 5, 1)
            self.fc1 = torch.nn.Linear(4 * 4 * 50, 500)
            self.fc2 = torch.nn.Linear(500, 10)

        def forward(self, x):
            x = F.relu(self.conv1(x))
            x = F.max_pool2d(x, 2, 2)
            x = F.relu(self.conv2(x))
            x = F.max_pool2d(x, 2, 2)
            x = x.view(-1, 4 * 4 * 50)
            x = F.relu(self.fc1(x))
            x = self.fc2(x)
            return F.log_softmax(x, dim=1)

    # [3] Attach model to the correct GPU device and distributor.
    device = torch.device(f"cuda:{local_rank}")
    model = Net().to(device)
    model = Distributor(model)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

    # [4] Setup FashionMNIST dataloader and distribute data across PyTorchJob workers.
    dataset = datasets.FashionMNIST(
        "./data",
        download=True,
        train=True,
        transform=transforms.Compose([transforms.ToTensor()]),
    )
    train_loader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=128,
        sampler=DistributedSampler(dataset),
    )

    # [5] Start model Training.
    for epoch in range(3):
        for batch_idx, (data, target) in enumerate(train_loader):
            # Attach Tensors to the device.
            data = data.to(device)
            target = target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 10 == 0 and dist.get_rank() == 0:
                print(
                    "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
                        epoch,
                        batch_idx * len(data),
                        len(train_loader.dataset),
                        100.0 * batch_idx / len(train_loader),
                        loss.item(),
                    )
                )


from kubeflow.training import TrainingClient

# Start PyTorchJob with 3 Workers and 1 GPU per Worker (e.g. multi-node, multi-worker job).
TrainingClient().create_job(
    name="pytorch-ddp",
    train_func=train_func,
    num_procs_per_worker="auto",
    num_workers=3,
    resources_per_worker={"gpu": "1"},
)
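After the job is created, you can stream logs from its workers with the same get_job_logs API shown in the TFJob example below; job_kind defaults to PyTorchJob in the SDK, but it is passed explicitly here for clarity:

# Stream training logs from the PyTorchJob workers.
TrainingClient().get_job_logs(
    name="pytorch-ddp",
    job_kind="PyTorchJob",
    follow=True,
)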
Getting Started with TFJob
Similar to the PyTorchJob example, you can use the Python SDK to create your first distributed TensorFlow job. Run the following script to create a TFJob with the pre-built Docker image docker.io/kubeflow/tf-mnist-with-summaries:latest, which contains distributed TensorFlow code:
from kubeflow.training import TrainingClient

TrainingClient().create_job(
    name="tensorflow-dist",
    job_kind="TFJob",
    base_image="docker.io/kubeflow/tf-mnist-with-summaries:latest",
    num_workers=3,
)
Run the following API to get logs from your TFJob:
TrainingClient().get_job_logs(
    name="tensorflow-dist",
    job_kind="TFJob",
    follow=True,
)
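When you are done, you can delete the jobs to free cluster resources. A minimal sketch, assuming the SDK's delete_job API takes the same name and job_kind arguments as the calls above:

# Clean up the jobs created in this guide.
TrainingClient().delete_job(name="pytorch-ddp", job_kind="PyTorchJob")
TrainingClient().delete_job(name="tensorflow-dist", job_kind="TFJob")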
Next Steps
Run the FashionMNIST example with the Training Operator Python SDK.
Learn more about the PyTorchJob APIs.