使用平台特定功能

了解如何在 Kubeflow Pipelines 中使用平台特定功能。

概览

KFP 的优势之一是跨平台可移植性。KFP SDK 将管道定义编译为 IR YAML,不同的后端都可以读取和执行它,包括 Kubeflow Pipelines 开源后端Vertex AI Pipelines

对于跨平台不可移植的功能,用户可以通过 KFP SDK 平台特定插件库编写具有平台特定功能的管道。通常,平台特定插件库提供的函数与 KFP SDK 直接提供的 任务级配置方法 类似,都作用于任务。

kfp-kubernetes

目前,唯一的 KFP SDK 平台特定插件库是 kfp-kubernetes,它受 Kubeflow Pipelines 开源后端 支持,并可以直接访问一些 Kubernetes 资源和功能。

更多信息,请参阅 kfp-kubernetes 文档

Kubernetes PersistentVolumeClaims

在本例中,我们将使用 kfp-kubernetes 创建一个 PersistentVolumeClaim (PVC),使用该 PVC 在任务之间传递数据,然后删除该 PVC。

我们假设您熟悉 Kubernetes 中的 PersistentVolumePersistentVolumeClaim 资源,以及 KFP 中的 编写组件编写管道

步骤 1:安装 kfp-kubernetes

运行以下命令安装 kfp-kubernetes

pip install kfp[kubernetes]

步骤 2:创建读/写到挂载路径的组件

创建两个简单的组件,它们读写 /data 目录下的文件。

在后面的步骤中,我们将把一个 PVC 卷挂载到 /data 目录。

from kfp import dsl

@dsl.component
def producer() -> str:
    with open('/data/file.txt', 'w') as file:
        file.write('Hello world')
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content

@dsl.component
def consumer() -> str:
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content

步骤 3:使用 CreatePVC 动态创建 PVC

现在我们有了组件,可以开始构建管道了。

我们需要一个 PVC 来挂载,所以我们将使用 kubernetes.CreatePVC 预构建组件来创建一个。

from kfp import kubernetes

@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteMany'],
        size='5Gi',
        storage_class_name='standard',
    )

此组件会从 StorageClass 'standard' 动态创建大小为 5GB 的 PVC,并使用 ReadWriteMany 访问模式。该 PVC 将以创建它的底层 Argo workflow 的名称命名,并附加后缀 -my-pvcCreatePVC 组件会将此名称作为输出 'name' 返回。

步骤 4:读写数据到 PVC

接下来,我们将对 producerconsumer 组件使用 mount_pvc 任务修饰符。

我们将 task2 安排在 task1 之后运行,这样组件就不会同时读写 PVC。

    # write to the PVC
    task1 = producer()
    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )

    # read to the PVC
    task2 = consumer()
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/reused_data',
    )
    task2.after(task1)

步骤 5:删除 PVC

最后,我们可以在 task2 完成后安排删除 PVC,以清理我们创建的 Kubernetes 资源。

    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name']
    ).after(task2)

有关完整的管道和更多信息,请参阅 kfp-kubernetes 文档中的类似示例

反馈

此页面是否有帮助?