入门

Training Operator 入门

本指南介绍如何开始使用 Training Operator 并运行几个简单示例。

先决条件

您需要安装以下组件才能运行示例

  • 安装 Training Operator 控制平面。
  • 安装 Training Python SDK。

PyTorchJob 入门

您可以使用 Python SDK 创建您的第一个 Training Operator 分布式 PyTorchJob。定义实现端到端模型训练的训练函数。每个 Worker 将在相应的 Kubernetes Pod 上执行此函数。通常,此函数包含下载数据集、创建模型和训练模型的逻辑。

Training Operator 将自动为相应的 PyTorchJob worker 设置 WORLD_SIZERANK,以执行 PyTorch 分布式数据并行 (DDP)

如果您将 Training Operator 作为 Kubeflow 平台的一部分进行安装,可以打开新的Kubeflow Notebook 来运行此脚本。如果您独立安装 Training Operator,请确保配置本地 kubeconfig 以访问您安装了 Training Operator 的 Kubernetes 集群。

def train_func():
    import torch
    import torch.nn.functional as F
    from torch.utils.data import DistributedSampler
    from torchvision import datasets, transforms
    import torch.distributed as dist

    # [1] Setup PyTorch DDP. Distributed environment will be set automatically by Training Operator.
    dist.init_process_group(backend="nccl")
    Distributor = torch.nn.parallel.DistributedDataParallel
    local_rank = int(os.getenv("LOCAL_RANK", 0))
    print(
        "Distributed Training for WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}".format(
            dist.get_world_size(),
            dist.get_rank(),
            local_rank,
        )
    )

    # [2] Create PyTorch CNN Model.
    class Net(torch.nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = torch.nn.Conv2d(1, 20, 5, 1)
            self.conv2 = torch.nn.Conv2d(20, 50, 5, 1)
            self.fc1 = torch.nn.Linear(4 * 4 * 50, 500)
            self.fc2 = torch.nn.Linear(500, 10)

        def forward(self, x):
            x = F.relu(self.conv1(x))
            x = F.max_pool2d(x, 2, 2)
            x = F.relu(self.conv2(x))
            x = F.max_pool2d(x, 2, 2)
            x = x.view(-1, 4 * 4 * 50)
            x = F.relu(self.fc1(x))
            x = self.fc2(x)
            return F.log_softmax(x, dim=1)

    # [3] Attach model to the correct GPU device and distributor.
    device = torch.device(f"cuda:{local_rank}")
    model = Net().to(device)
    model = Distributor(model)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

    # [4] Setup FashionMNIST dataloader and distribute data across PyTorchJob workers.
    dataset = datasets.FashionMNIST(
        "./data",
        download=True,
        train=True,
        transform=transforms.Compose([transforms.ToTensor()]),
    )
    train_loader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=128,
        sampler=DistributedSampler(dataset),
    )

    # [5] Start model Training.
    for epoch in range(3):
        for batch_idx, (data, target) in enumerate(train_loader):
            # Attach Tensors to the device.
            data = data.to(device)
            target = target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 10 == 0 and dist.get_rank() == 0:
                print(
                    "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
                        epoch,
                        batch_idx * len(data),
                        len(train_loader.dataset),
                        100.0 * batch_idx / len(train_loader),
                        loss.item(),
                    )
                )


from kubeflow.training import TrainingClient

# Start PyTorchJob with 3 Workers and 1 GPU per Worker (e.g. multi-node, multi-worker job).
TrainingClient().create_job(
    name="pytorch-ddp",
    train_func=train_func,
    num_procs_per_worker="auto",
    num_workers=3,
    resources_per_worker={"gpu": "1"},
)

TFJob 入门

与 PyTorchJob 示例类似,您可以使用 Python SDK 创建您的第一个分布式 TensorFlow 作业。运行以下脚本创建带有预构建 Docker 镜像:docker.io/kubeflow/tf-mnist-with-summaries:latest 的 TFJob,该镜像包含分布式 TensorFlow 代码

from kubeflow.training import TrainingClient

TrainingClient().create_job(
    name="tensorflow-dist",
    job_kind="TFJob",
    base_image="docker.io/kubeflow/tf-mnist-with-summaries:latest",
    num_workers=3,
)

运行以下 API 获取 TFJob 的日志

TrainingClient().get_job_logs(
    name="tensorflow-dist",
    job_kind="TFJob",
    follow=True,
)

下一步

反馈

此页面有帮助吗?