构建流水线
旧版本
本页面介绍的是 Kubeflow Pipelines V1,请参阅 V2 文档获取最新信息。
请注意,虽然 V2 后端能够运行由 V1 SDK 提交的流水线,我们仍强烈建议迁移到 V2 SDK。作为参考,V1 SDK 的最终发布版本是 kfp==1.8.22
,其参考文档可在此处获取。
Kubeflow 流水线是机器学习 (ML) 工作流的一种可移植且可扩展的定义。ML 工作流中的每个步骤,例如准备数据或训练模型,都是一个流水线组件的实例。本文档概述了流水线概念和最佳实践,并提供了如何构建 ML 流水线的说明。
开始之前
- 运行以下命令安装 Kubeflow Pipelines SDK。如果你在 Jupyter notebook 中运行此命令,安装 SDK 后请重启内核。
$ pip install kfp --upgrade
- 导入
kfp
和kfp.components
包。
import kfp
import kfp.components as comp
理解流水线
Kubeflow 流水线是基于容器的 ML 工作流的一种可移植且可扩展的定义。流水线由一组输入参数和工作流中的步骤列表组成。流水线中的每个步骤都是一个组件的实例,表示为 ContainerOp
的实例。
你可以使用流水线来:
- 编排可重复的 ML 工作流。
- 通过使用不同的超参数集运行工作流来加速实验。
理解流水线组件
流水线组件是执行流水线工作流中一个步骤的容器化应用程序。流水线组件在组件规范中定义,其中定义了以下内容:
- 组件的接口、其输入和输出。
- 组件的实现、容器镜像和要执行的命令。
- 组件的元数据,例如组件的名称和描述。
你可以通过为容器化应用程序定义组件规范来构建组件,也可以使用 Kubeflow Pipelines SDK 为 Python 函数生成组件规范。你还可以在流水线中重用预构建组件。
理解流水线图
流水线工作流中的每个步骤都是一个组件的实例。定义流水线时,你需要指定每个步骤输入的来源。步骤输入可以来自流水线的输入参数、常量,或者步骤输入可以依赖于此流水线中其他步骤的输出。Kubeflow Pipelines 使用这些依赖关系将你的流水线工作流定义为图。
例如,考虑包含以下步骤的流水线:数据摄取、生成统计信息、预处理数据和训练模型。下面描述了每个步骤之间的数据依赖关系。
- 数据摄取:此步骤从使用流水线参数指定的外部源加载数据,并输出一个数据集。由于此步骤不依赖于任何其他步骤的输出,因此此步骤可以首先运行。
- 生成统计信息:此步骤使用摄取的数据集生成并输出一组统计信息。由于此步骤依赖于数据摄取步骤生成的数据集,因此它必须在数据摄取步骤之后运行。
- 预处理数据:此步骤预处理摄取的数据集,并将数据转换为预处理数据集。由于此步骤依赖于数据摄取步骤生成的数据集,因此它必须在数据摄取步骤之后运行。
- 训练模型:此步骤使用预处理数据集、生成的统计信息以及学习率等流水线参数来训练模型。由于此步骤依赖于预处理数据和生成的统计信息,因此它必须在预处理数据和生成统计信息步骤都完成后运行。
由于生成统计信息和预处理数据步骤都依赖于摄取的数据,因此生成统计信息和预处理数据步骤可以并行运行。一旦数据依赖关系可用,所有其他步骤都会执行。
设计你的流水线
设计流水线时,考虑如何将 ML 工作流拆分为流水线组件。将 ML 工作流拆分为流水线组件的过程类似于将一个整体脚本拆分为可测试函数的过程。以下规则可以帮助你定义构建流水线所需的组件。
组件应具有单一职责。单一职责使得组件更容易测试和重用。例如,如果你有一个加载数据的组件,你可以将其重用于类似的加载数据任务。如果你有一个加载和转换数据集的组件,则该组件的实用性可能会降低,因为你只能在需要加载和转换该数据集时使用它。
尽可能重用组件。Kubeflow Pipelines 提供了用于常见流水线任务和访问云服务的组件。
考虑你需要了解什么来调试流水线并研究流水线生成的模型的谱系。Kubeflow Pipelines 存储每个流水线步骤的输入和输出。通过检查流水线运行产生的 Artifact,你可以更好地理解不同运行之间模型质量的差异或跟踪工作流中的错误。
通常,你应该在设计组件时考虑其可组合性。
流水线由组件实例(也称为步骤)组成。步骤可以将它们的输入定义为依赖于另一个步骤的输出。步骤之间的依赖关系定义了流水线工作流图。
构建流水线组件
Kubeflow 流水线组件是容器化应用程序,用于执行 ML 工作流中的一个步骤。以下是定义流水线组件的方法:
如果你有一个容器化应用程序,希望将其用作流水线组件,请创建一个组件规范来将此容器镜像定义为流水线组件。
此选项提供了将用任何语言编写的代码包含在流水线中的灵活性,只要你能将应用程序打包为容器镜像即可。详细了解构建流水线组件。
如果你的组件代码可以表示为 Python 函数,请评估你的组件是否可以构建为基于 Python 函数的组件。Kubeflow Pipelines SDK 使构建轻量级基于 Python 函数的组件变得更容易,从而为你节省了创建组件规范的工作量。
尽可能重用预构建组件,以节省构建自定义组件的工作量。
本指南中的示例演示了如何构建一个使用基于 Python 函数的组件并重用预构建组件的流水线。
理解数据如何在组件之间传递
当 Kubeflow Pipelines 运行组件时,会在 Kubernetes Pod 中启动一个容器镜像,并将组件的输入作为命令行参数传递进去。组件完成后,组件的输出将作为文件返回。
在组件规范中,你定义了组件的输入和输出,以及如何将输入和输出路径作为命令行参数传递给你的程序。你可以通过值传递少量输入(例如短字符串或数字)给你的组件。大型输入(例如数据集)必须作为文件路径传递给你的组件。输出写入 Kubeflow Pipelines 提供的路径。
基于 Python 函数的组件通过为你构建组件规范,使构建流水线组件变得更容易。基于 Python 函数的组件还处理将输入传递到组件并将函数输出传递回流水线的复杂性。
开始构建流水线
以下部分通过逐步讲解将 Python 脚本转换为流水线的流程,演示了如何开始构建 Kubeflow 流水线。
设计你的流水线
以下步骤将逐步讲解设计流水线时可能面临的一些设计决策。
- 评估流程。在以下示例中,一个 Python 函数从公共网站下载一个包含多个 CSV 文件的压缩 tar 文件(
.tar.gz
)。该函数提取 CSV 文件,然后将它们合并到单个文件中。
import glob
import pandas as pd
import tarfile
import urllib.request
def download_and_merge_csv(url: str, output_csv: str):
with urllib.request.urlopen(url) as res:
tarfile.open(fileobj=res, mode="r|gz").extractall('data')
df = pd.concat(
[pd.read_csv(csv_file, header=None)
for csv_file in glob.glob('data/*.csv')])
df.to_csv(output_csv, index=False, header=False)
- 运行以下 Python 命令测试函数。
download_and_merge_csv(
url='https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz',
output_csv='merged_data.csv')
- 运行以下命令打印合并后 CSV 文件的前几行。
$ head merged_data.csv
设计你的流水线。例如,考虑以下流水线设计。
使用单个步骤实现流水线。在这种情况下,流水线包含一个与示例函数功能相似的组件。这是一个简单的函数,实现一个单步骤流水线在此情况下是合理的方法。
这种方法的缺点是压缩的 tar 文件不会成为流水线运行的 Artifact。如果无法获得此 Artifact,可能会使该组件在生产环境中的调试变得更加困难。
将其实现为两步流水线。第一步从网站下载文件。第二步从压缩的 tar 文件中提取 CSV 文件,并将它们合并到单个文件中。
这种方法有几个好处:
- 你可以重用Web 下载组件来实现第一步。
- 每个步骤都有单一职责,这使得组件更容易重用。
- 压缩的 tar 文件是第一个流水线步骤的 Artifact。这意味着在调试使用此组件的流水线时,你可以检查此 Artifact。
此示例实现了两步流水线。
构建你的流水线组件
构建你的流水线组件。此示例修改了初始脚本,以提取压缩 tar 文件的内容,合并压缩 tar 文件中包含的 CSV 文件,并返回合并后的 CSV 文件。
此示例构建了一个基于 Python 函数的组件。你还可以将组件代码打包为 Docker 容器镜像,并使用 ComponentSpec 定义组件。
在此示例中,需要对原始函数进行以下修改。
- 文件下载逻辑被移除。压缩 tar 文件的路径作为参数传递给此函数。
- import 语句被移到函数内部。基于 Python 函数的组件需要独立的 Python 函数。这意味着任何必需的 import 语句必须在函数内部定义,并且任何辅助函数也必须在函数内部定义。详细了解构建基于 Python 函数的组件。
- 函数的参数使用
kfp.components.InputPath
和kfp.components.OutputPath
注解进行装饰。这些注解让 Kubeflow Pipelines 知道要提供压缩 tar 文件的路径,并创建一个函数存储合并后 CSV 文件的路径。
以下示例显示了更新后的
merge_csv
函数。
def merge_csv(file_path: comp.InputPath('Tarball'),
output_csv: comp.OutputPath('CSV')):
import glob
import pandas as pd
import tarfile
tarfile.open(name=file_path, mode="r|gz").extractall('data')
df = pd.concat(
[pd.read_csv(csv_file, header=None)
for csv_file in glob.glob('data/*.csv')])
df.to_csv(output_csv, index=False, header=False)
- 使用
kfp.components.create_component_from_func
返回一个工厂函数,你可以使用该函数创建流水线步骤。此示例还指定了运行此函数的基础容器镜像、保存组件规范的路径以及需要在运行时安装到容器中的 PyPI 包列表。
create_step_merge_csv = kfp.components.create_component_from_func(
func=merge_csv,
output_component_file='component.yaml', # This is optional. It saves the component spec for future use.
base_image='python:3.7',
packages_to_install=['pandas==1.1.4'])
构建你的流水线
- 使用
kfp.components.load_component_from_url
加载在此流水线中重用的任何组件的组件规范 YAML。
web_downloader_op = kfp.components.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component.yaml')
将你的流水线定义为 Python 函数。
你的流水线函数的参数定义了流水线的参数。使用流水线参数可以尝试不同的超参数(例如用于训练模型的学习率),或将运行级别输入(例如输入文件的路径)传递到流水线运行中。
使用由
kfp.components.create_component_from_func
和kfp.components.load_component_from_url
创建的工厂函数来创建流水线的任务。组件工厂函数的输入可以是流水线参数、其他任务的输出或常量值。在此示例中,web_downloader_task
任务使用了url
流水线参数,而merge_csv_task
使用了web_downloader_task
的data
输出。
# Define a pipeline and create a task from a component:
def my_pipeline(url):
web_downloader_task = web_downloader_op(url=url)
merge_csv_task = create_step_merge_csv(file=web_downloader_task.outputs['data'])
# The outputs of the merge_csv_task can be referenced using the
# merge_csv_task.outputs dictionary: merge_csv_task.outputs['output_csv']
编译并运行你的流水线
如上文所述,在 Python 中定义好流水线后,使用以下任一选项编译流水线并将其提交到 Kubeflow Pipelines 服务。
选项 1:编译然后在 UI 中上传
- 运行以下命令编译流水线并将其保存为
pipeline.yaml
。
kfp.compiler.Compiler().compile(
pipeline_func=my_pipeline,
package_path='pipeline.yaml')
- 使用 Kubeflow Pipelines 用户界面上传并运行你的
pipeline.yaml
。请参阅UI 入门指南。
选项 2:使用 Kubeflow Pipelines SDK 客户端运行流水线
- 按照使用 SDK 客户端连接到 Kubeflow Pipelines 中的步骤创建
kfp.Client
类的实例。
client = kfp.Client() # change arguments accordingly
- 使用
kfp.Client
实例运行流水线
client.create_run_from_pipeline_func(
my_pipeline,
arguments={
'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
})
下一步
- 了解高级流水线功能,例如编写递归组件以及在流水线中使用条件执行。
- 了解如何在流水线中操作 Kubernetes 资源(实验性)。