Pipelines SDK 简介
旧版本
此页面介绍 Kubeflow Pipelines V1,请参阅 V2 文档获取最新信息。
请注意,虽然 V2 后端能够运行由 V1 SDK 提交的流水线,但我们强烈建议迁移到 V2 SDK。供参考,V1 SDK 的最终版本是 kfp==1.8.22
,其参考文档在此处提供。
与 Kubeflow Pipelines SDK 提供了一组 Python 包,您可以使用它们来指定和运行机器学习 (ML) 工作流。流水线(Pipeline)是对 ML 工作流的描述,包括构成工作流步骤的所有组件(Components)以及组件之间如何交互。
注意:此处的 SDK 文档指的是默认的使用 Argo 的 Kubeflow Pipelines。如果您正在运行使用 Tekton 的 Kubeflow Pipelines,请遵循适用于 Tekton 的 Kubeflow Pipelines SDK 文档。
SDK 包
Kubeflow Pipelines SDK 包括以下软件包
kfp.compiler
包含用于将流水线 Python DSL 编译为工作流 yaml 规范的类和方法 本包中的方法包括但不限于以下内容kfp.compiler.Compiler.compile
将您的 Python DSL 代码编译成单个静态配置(YAML 格式),Kubeflow Pipelines 服务可以处理该配置。Kubeflow Pipelines 服务将静态配置转换为一组 Kubernetes 资源进行执行。
kfp.components
包含用于与流水线组件交互的类和方法 本包中的方法包括但不限于以下内容kfp.components.func_to_container_op
将 Python 函数转换为流水线组件并返回一个工厂函数。然后,您可以调用该工厂函数来构造一个流水线任务 (ContainerOp
) 实例,该任务在容器中运行原始函数。kfp.components.load_component_from_file
从文件中加载流水线组件并返回一个工厂函数。然后,您可以调用该工厂函数来构造一个流水线任务 (ContainerOp
) 实例,该任务运行组件容器镜像。kfp.components.load_component_from_url
从 URL 加载流水线组件并返回一个工厂函数。然后,您可以调用该工厂函数来构造一个流水线任务 (ContainerOp
) 实例,该任务运行组件容器镜像。
kfp.dsl
包含领域特定语言 (DSL),您可以使用它来定义流水线和组件并与之交互。本包中的方法、类和模块包括但不限于以下内容kfp.dsl.PipelineParam
表示可在流水线组件之间传递的流水线参数。请参阅流水线参数指南。kfp.dsl.component
是 DSL 函数的装饰器,它返回一个流水线组件 (ContainerOp
)。kfp.dsl.pipeline
是 Python 函数的装饰器,它返回一个流水线。kfp.dsl.python_component
是 Python 函数的装饰器,它为函数对象添加流水线组件元数据。kfp.dsl.types
包含了 Kubeflow Pipelines SDK 定义的类型列表。类型包括String
、Integer
、Float
和Bool
等基本类型,以及GCPProjectID
和GCRPath
等领域特定类型。请参阅DSL 静态类型检查指南。kfp.dsl.ResourceOp
表示一个流水线任务 (op),允许您直接操作 Kubernetes 资源 (create
,get
,apply
, …)。kfp.dsl.VolumeOp
表示一个流水线任务 (op),用于创建一个新的PersistentVolumeClaim
(PVC)。它旨在快速处理创建PersistentVolumeClaim
的常见情况。kfp.dsl.VolumeSnapshotOp
表示一个流水线任务 (op),用于创建一个新的VolumeSnapshot
。它旨在快速处理创建VolumeSnapshot
的常见情况。kfp.dsl.PipelineVolume
表示用于在流水线步骤之间传递数据的卷。ContainerOp
s 可以通过构造函数的参数pvolumes
或add_pvolumes()
方法挂载PipelineVolume
。kfp.dsl.ParallelFor
表示流水线中对静态或动态项目集的并行 for 循环。for 循环的每次迭代都并行执行。kfp.dsl.ExitHandler
表示退出流水线时调用的退出处理程序。ExitHandler
的典型用法是垃圾收集。kfp.dsl.Condition
表示一组 ops,仅在满足特定条件时执行。指定的条件需要在运行时确定,通过在布尔表达式中包含至少一个任务输出或 PipelineParam。
kfp.Client
包含 Kubeflow Pipelines API 的 Python 客户端库。本包中的方法包括但不限于以下内容kfp.Client.create_experiment
创建一个流水线实验并返回一个实验对象。kfp.Client.run_pipeline
运行流水线并返回一个运行对象。kfp.Client.create_run_from_pipeline_func
编译流水线函数并将其提交到 Kubeflow Pipelines 执行。kfp.Client.create_run_from_pipeline_package
在 Kubeflow Pipelines 上运行本地流水线包。kfp.Client.upload_pipeline
上传本地文件在 Kubeflow Pipelines 中创建新的流水线。kfp.Client.upload_pipeline_version
上传本地文件创建流水线版本。请遵循示例了解如何创建流水线版本。
Kubeflow Pipelines 扩展模块包含可在其上使用 Kubeflow Pipelines 的特定平台的类和函数。示例包括用于本地部署、Google Cloud Platform (GCP)、Amazon Web Services (AWS) 和 Microsoft Azure 的实用函数。
Kubeflow Pipelines diagnose_me 模块包含有助于环境诊断任务的类和函数。
kfp.cli.diagnose_me.dev_env
报告来自开发环境的诊断元数据,例如您的 python 库版本。kfp.cli.diagnose_me.kubernetes_cluster
报告来自 Kubernetes 集群的诊断数据,例如 Kubernetes secrets。kfp.cli.diagnose_me.gcp
报告与您的 GCP 环境相关的诊断数据。
Kubeflow Pipelines CLI 工具
Kubeflow Pipelines CLI 工具使您能够直接从命令行使用 Kubeflow Pipelines SDK 的一个子集。Kubeflow Pipelines CLI 工具提供以下命令
kfp diagnose_me
使用指定参数运行环境诊断。--json
- 表示此命令必须以 JSON 格式返回结果。否则,结果将以人类可读格式返回。--namespace TEXT
- 指定要使用的 Kubernetes 命名空间。默认值为 all-namespaces。--project-id TEXT
- 对于 GCP 部署,此值指定要使用的 GCP 项目。如果未指定此值,则使用环境默认值。
kfp pipeline <COMMAND>
提供以下命令来帮助您管理流水线。get
- 从您的 Kubeflow Pipelines 集群获取有关 Kubeflow 流水线的详细信息。list
- 列出已上传到您的 Kubeflow Pipelines 集群的流水线。upload
- 将流水线上传到您的 Kubeflow Pipelines 集群。
kfp run <COMMAND>
提供以下命令来帮助您管理流水线运行。get
- 显示流水线运行的详细信息。list
- 列出最近的流水线运行。submit
- 提交流水线运行。
kfp --endpoint <ENDPOINT>
- 指定 Kubeflow Pipelines CLI 应连接到的端点。
安装 SDK
遵循指南安装 Kubeflow Pipelines SDK。
构建流水线和组件
本节总结了您可以使用 SDK 构建流水线和组件的方式。
Kubeflow 流水线(pipeline)是对 ML 工作流的可移植和可扩展定义。ML 工作流中的每个步骤(例如准备数据或训练模型)都是流水线组件的一个实例。
流水线组件(component)是一组自包含的代码,用于执行 ML 工作流中的一个步骤。组件在组件规范中定义,该规范定义以下内容
- 组件的接口,包括输入和输出。
- 组件的实现,包括容器镜像和执行命令。
- 组件的元数据,例如组件的名称和描述。
使用以下选项创建或重用流水线组件。
您可以通过为容器化应用程序定义组件规范来构建组件。
轻量级基于 Python 函数的组件通过使用 Kubeflow Pipelines SDK 生成 Python 函数的组件规范,使构建组件更加容易。
您可以在流水线中重用预构建组件。
下一步
- 了解如何在 DSL 中编写递归函数。
- 构建流水线组件。
- 了解如何使用 DSL 在流水线步骤中动态操作 Kubernetes 资源。