使用平台特定功能
概览
KFP 的优势之一是跨平台可移植性。KFP SDK 将管道定义编译为 IR YAML,不同的后端都可以读取和执行它,包括 Kubeflow Pipelines 开源后端 和 Vertex AI Pipelines。
对于跨平台不可移植的功能,用户可以通过 KFP SDK 平台特定插件库编写具有平台特定功能的管道。通常,平台特定插件库提供的函数与 KFP SDK 直接提供的 任务级配置方法 类似,都作用于任务。
kfp-kubernetes
目前,唯一的 KFP SDK 平台特定插件库是 kfp-kubernetes
,它受 Kubeflow Pipelines 开源后端 支持,并可以直接访问一些 Kubernetes 资源和功能。
更多信息,请参阅 kfp-kubernetes
文档。
Kubernetes PersistentVolumeClaims
在本例中,我们将使用 kfp-kubernetes
创建一个 PersistentVolumeClaim (PVC),使用该 PVC 在任务之间传递数据,然后删除该 PVC。
我们假设您熟悉 Kubernetes 中的 PersistentVolume
和 PersistentVolumeClaim
资源,以及 KFP 中的 编写组件 和 编写管道。
步骤 1:安装 kfp-kubernetes
库
运行以下命令安装 kfp-kubernetes
库
pip install kfp[kubernetes]
步骤 2:创建读/写到挂载路径的组件
创建两个简单的组件,它们读写 /data
目录下的文件。
在后面的步骤中,我们将把一个 PVC 卷挂载到 /data
目录。
from kfp import dsl
@dsl.component
def producer() -> str:
with open('/data/file.txt', 'w') as file:
file.write('Hello world')
with open('/data/file.txt', 'r') as file:
content = file.read()
print(content)
return content
@dsl.component
def consumer() -> str:
with open('/data/file.txt', 'r') as file:
content = file.read()
print(content)
return content
步骤 3:使用 CreatePVC 动态创建 PVC
现在我们有了组件,可以开始构建管道了。
我们需要一个 PVC 来挂载,所以我们将使用 kubernetes.CreatePVC
预构建组件来创建一个。
from kfp import kubernetes
@dsl.pipeline
def my_pipeline():
pvc1 = kubernetes.CreatePVC(
# can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
pvc_name_suffix='-my-pvc',
access_modes=['ReadWriteMany'],
size='5Gi',
storage_class_name='standard',
)
此组件会从 StorageClass 'standard'
动态创建大小为 5GB 的 PVC,并使用 ReadWriteMany
访问模式。该 PVC 将以创建它的底层 Argo workflow 的名称命名,并附加后缀 -my-pvc
。CreatePVC
组件会将此名称作为输出 'name'
返回。
步骤 4:读写数据到 PVC
接下来,我们将对 producer
和 consumer
组件使用 mount_pvc
任务修饰符。
我们将 task2
安排在 task1
之后运行,这样组件就不会同时读写 PVC。
# write to the PVC
task1 = producer()
kubernetes.mount_pvc(
task1,
pvc_name=pvc1.outputs['name'],
mount_path='/data',
)
# read to the PVC
task2 = consumer()
kubernetes.mount_pvc(
task2,
pvc_name=pvc1.outputs['name'],
mount_path='/reused_data',
)
task2.after(task1)
步骤 5:删除 PVC
最后,我们可以在 task2
完成后安排删除 PVC,以清理我们创建的 Kubernetes 资源。
delete_pvc1 = kubernetes.DeletePVC(
pvc_name=pvc1.outputs['name']
).after(task2)
有关完整的管道和更多信息,请参阅 kfp-kubernetes
文档中的类似示例。