构建基于 Python 函数的组件
旧版本
本页介绍的是 Kubeflow Pipelines V1,请参阅 V2 文档以获取最新信息。
请注意,虽然 V2 后端能够运行由 V1 SDK 提交的管道,但我们强烈建议迁移到 V2 SDK。作为参考,V1 SDK 的最终版本是 kfp==1.8.22
,其参考文档可在此处获取。
Kubeflow Pipelines 组件是执行机器学习工作流程中一个步骤的自包含代码集。管道组件由以下部分组成:
组件代码,实现了执行机器学习工作流程中一个步骤所需的逻辑。
组件规范,定义了以下内容:
- 组件的元数据,包括其名称和描述。
- 组件的接口,包括组件的输入和输出。
- 组件的实现,包括要运行的 Docker 容器镜像、如何将输入传递给组件代码以及如何获取组件的输出。
基于 Python 函数的组件让您可以通过将组件代码构建为 Python 函数并为您生成组件规范来更轻松地快速迭代。本文档介绍了如何构建基于 Python 函数的组件以及如何在管道中使用它们。
准备工作
- 运行以下命令安装 Kubeflow Pipelines SDK。如果您在 Jupyter Notebook 中运行此命令,安装 SDK 后请重启内核。
$ pip install kfp==1.8
- 导入
kfp
包。
import kfp
from kfp.components import create_component_from_func
- 按照使用 SDK 客户端连接到 Kubeflow Pipelines 中的步骤创建一个
kfp.Client
类实例。
client = kfp.Client() # change arguments accordingly
有关 Kubeflow Pipelines SDK 的更多信息,请参阅SDK 参考指南。
开始使用基于 Python 函数的组件
本节通过创建一个简单组件的过程,演示了如何开始构建基于 Python 函数的组件。
- 将组件代码定义为一个独立的 Python 函数。在此示例中,该函数将两个浮点数相加并返回两个参数的和。
def add(a: float, b: float) -> float:
'''Calculates sum of two arguments'''
return a + b
- 使用
kfp.components.create_component_from_func
生成组件规范 YAML,并返回一个工厂函数,您可以使用该函数为管道创建kfp.dsl.ContainerOp
类实例。组件规范 YAML 是组件的可重用和可共享定义。
add_op = create_component_from_func(
add, output_component_file='add_component.yaml')
- 创建并运行您的管道。了解更多关于创建和运行管道的信息。
import kfp.dsl as dsl
@dsl.pipeline(
name='Addition pipeline',
description='An example pipeline that performs addition calculations.'
)
def add_pipeline(
a='1',
b='7',
):
# Passes a pipeline parameter and a constant value to the `add_op` factory
# function.
first_add_task = add_op(a, 4)
# Passes an output reference from `first_add_task` and a pipeline parameter
# to the `add_op` factory function. For operations with a single return
# value, the output reference can be accessed as `task.output` or
# `task.outputs['output_name']`.
second_add_task = add_op(first_add_task.output, b)
# Specify argument values for your pipeline run.
arguments = {'a': '7', 'b': '8'}
# Create a pipeline run, using the client you initialized in a prior step.
client.create_run_from_pipeline_func(add_pipeline, arguments=arguments)
构建基于 Python 函数的组件
使用以下说明构建基于 Python 函数的组件:
定义一个独立的 Python 函数。此函数必须满足以下要求:
- 不应使用在函数定义之外声明的任何代码。
- 导入语句必须添加在函数内部。了解更多关于在组件中使用和安装 Python 包的信息。
- 辅助函数必须在此函数内部定义。
Kubeflow Pipelines 使用函数的输入和输出来定义组件的接口。了解更多关于在组件之间传递数据的信息。您的函数的输入和输出必须满足以下要求:
- 如果函数接受或返回大量数据或复杂数据类型,则必须将这些数据作为文件传递。了解更多关于使用大量数据作为输入或输出的信息。
- 如果函数接受数字值作为参数,参数必须具有类型提示。支持的类型是
int
和float
。否则,参数将作为字符串传递。 - 如果您的组件返回多个小输出(短字符串、数字或布尔值),请使用
typing.NamedTuple
类型提示来标注您的函数,并使用collections.namedtuple
函数将函数的输出作为 tuple 的新子类返回。有关示例,请阅读按值传递参数。
(可选)如果您的函数具有复杂的依赖关系,请选择或构建一个容器镜像来运行您的 Python 函数。了解更多关于选择或构建组件容器镜像的信息。
调用
kfp.components.create_component_from_func(func)
将函数转换为管道组件。- func:要转换的 Python 函数。
- base_image:(可选)指定用于运行此函数的 Docker 容器镜像。了解更多关于选择或构建容器镜像的信息。
- output_component_file:(可选)将您的组件定义写入文件。您可以使用此文件与同事共享组件,或在不同的管道中重用它。
- packages_to_install:(可选)在运行函数之前要安装的带版本信息的 Python 包列表。
使用和安装 Python 包
当 Kubeflow Pipelines 运行您的管道时,每个组件都会在 Kubernetes Pod 上的 Docker 容器镜像中运行。要加载 Python 函数依赖的包,必须满足以下条件之一:
- 该包必须安装在容器镜像中。
- 必须使用
kfp.components.create_component_from_func(func)
函数的packages_to_install
参数定义该包。 - 您的函数必须安装该包。例如,您的函数可以使用
subprocess
模块来运行pip install
等命令以安装包。
选择或构建容器镜像
目前,如果您未指定容器镜像,基于 Python 函数的组件将使用 python:3.7
容器镜像。如果您的函数具有复杂的依赖关系,使用预装了依赖项的容器镜像或构建自定义容器镜像可能会更有益。预装依赖项可以减少组件的运行时间,因为组件无需在每次运行时下载和安装包。
许多框架(如 TensorFlow 和 PyTorch)以及云服务提供商都提供预构建的容器镜像,其中安装了常见的依赖项。
如果预构建容器不可用,您可以构建一个包含 Python 函数依赖项的自定义容器镜像。有关构建自定义容器的更多信息,请阅读 Docker 文档中的 Dockerfile 参考指南。
如果您构建或选择了容器镜像,而不是使用默认容器镜像,则该容器镜像必须使用 Python 3.5 或更高版本。
理解数据如何在组件之间传递
当 Kubeflow Pipelines 运行您的组件时,会在 Kubernetes Pod 中启动一个容器镜像,并将组件的输入作为命令行参数传递。组件完成后,组件的输出将作为文件返回。
基于 Python 函数的组件通过为您构建组件规范,使得构建管道组件变得更加容易。基于 Python 函数的组件还处理将输入传递到组件以及将函数输出返回到管道的复杂性。
以下章节介绍了如何按值和按文件传递参数。
- 按值传递的参数包括数字、布尔值和短字符串。Kubeflow Pipelines 通过将值作为命令行参数传递,按值将参数传递给组件。
- 按文件传递的参数包括 CSV、图像和复杂类型。这些文件存储在运行在 Kubernetes 上的组件可访问的位置,例如持久卷声明或云存储服务。Kubeflow Pipelines 通过将文件路径作为命令行参数传递,按文件将参数传递给组件。
输入和输出参数名称
当您使用 Kubeflow Pipelines SDK 将 Python 函数转换为管道组件时,Kubeflow Pipelines SDK 会使用函数的接口以以下方式定义组件的接口:
- 某些参数定义输入参数。
- 某些参数定义输出参数。
- 函数的返回值用作输出参数。如果返回值是
collections.namedtuple
,则使用该命名元组返回多个小值。
由于您可以在组件之间按值或按路径传递参数,Kubeflow Pipelines SDK 会移除常见的参数后缀,这些后缀会泄露组件预期的实现方式。例如,一个摄取数据并输出 CSV 数据的基于 Python 函数的组件可能有一个定义为 csv_path: comp.OutputPath(str)
的输出参数。在这种情况下,输出是 CSV 数据,而不是路径。因此,Kubeflow Pipelines SDK 将输出名称简化为 csv
。
Kubeflow Pipelines SDK 使用以下规则定义组件接口中的输入和输出参数名称:
- 如果参数名以
_path
结尾,并且参数被标注为kfp.components.InputPath
或kfp.components.OutputPath
,则参数名是去掉末尾_path
的参数名。 - 如果参数名以
_file
结尾,则参数名是去掉末尾_file
的参数名。 - 如果使用
return
语句从组件返回单个小值,则输出参数命名为output
。 - 如果您通过返回
collections.namedtuple
从组件返回多个小值,Kubeflow Pipelines SDK 会使用元组的字段名作为输出参数名。
否则,Kubeflow Pipelines SDK 将使用参数名作为参数名称。
按值传递参数
基于 Python 函数的组件通过允许您标注 Python 函数来定义组件接口,从而使得在组件之间按值(如数字、布尔值和短字符串)传递参数更加容易。支持的类型包括 int
、float
、bool
和 str
。如果 list
或 dict
实例包含 int
、float
、bool
或 str
等小值,您也可以按值传递它们。如果您未标注函数,这些输入参数将作为字符串传递。
如果您的组件按值返回多个输出,请使用 typing.NamedTuple
类型提示标注您的函数,并使用 collections.namedtuple
函数将函数的输出作为 tuple 的新子类返回。
您还可以从函数返回元数据和指标。
- 元数据有助于您可视化管道结果。了解更多关于可视化管道元数据的信息。
- 指标可帮助您比较管道运行。了解更多关于使用管道指标的信息。
以下示例演示了如何按值返回多个输出,包括组件元数据和指标。
from typing import NamedTuple
def multiple_return_values_example(a: float, b: float) -> NamedTuple(
'ExampleOutputs',
[
('sum', float),
('product', float),
('mlpipeline_ui_metadata', 'UI_metadata'),
('mlpipeline_metrics', 'Metrics')
]):
"""Example function that demonstrates how to return multiple values."""
sum_value = a + b
product_value = a * b
# Export a sample tensorboard
metadata = {
'outputs' : [{
'type': 'tensorboard',
'source': 'gs://ml-pipeline-dataset/tensorboard-train',
}]
}
# Export two metrics
metrics = {
'metrics': [
{
'name': 'sum',
'numberValue': float(sum_value),
},{
'name': 'product',
'numberValue': float(product_value),
}
]
}
from collections import namedtuple
example_output = namedtuple(
'ExampleOutputs',
['sum', 'product', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
return example_output(sum_value, product_value, metadata, metrics)
按文件传递参数
基于 Python 函数的组件通过允许您标注 Python 函数的参数来指定哪些参数引用文件,从而使得向组件传递文件或从组件返回文件变得更加容易。Python 函数的参数可以引用输入文件或输出文件。如果您的参数是输出文件,Kubeflow Pipelines 会向您的函数传递一个可用于存储输出文件的路径或流。
以下示例接受一个文件作为输入,并返回两个文件作为输出。
def split_text_lines(
source_path: comp.InputPath(str),
odd_lines_path: comp.OutputPath(str),
even_lines_path: comp.OutputPath(str)):
"""Splits a text file into two files, with even lines going to one file
and odd lines to the other."""
with open(source_path, 'r') as reader:
with open(odd_lines_path, 'w') as odd_writer:
with open(even_lines_path, 'w') as even_writer:
while True:
line = reader.readline()
if line == "":
break
odd_writer.write(line)
line = reader.readline()
if line == "":
break
even_writer.write(line)
在此示例中,输入和输出被定义为 split_text_lines
函数的参数。这使得 Kubeflow Pipelines 可以将源数据文件的路径以及输出数据文件的路径传递到函数中。
要接受文件作为输入参数,请使用以下类型标注之一:
kfp.components.InputBinaryFile
:使用此标注指定您的函数期望参数是此函数可读取的io.BytesIO
实例。kfp.components.InputPath
:使用此标注指定您的函数期望参数是输入文件的路径,类型为string
。kfp.components.InputTextFile
:使用此标注指定您的函数期望参数是此函数可读取的io.TextIOWrapper
实例。
要将文件作为输出返回,请使用以下类型标注之一:
kfp.components.OutputBinaryFile
:使用此标注指定您的函数期望参数是此函数可写入的io.BytesIO
实例。kfp.components.OutputPath
:使用此标注指定您的函数期望参数是存储输出文件的路径,类型为string
。kfp.components.OutputTextFile
:使用此标注指定您的函数期望参数是此函数可写入的io.TextIOWrapper
实例。
基于 Python 函数的组件示例
本节演示了如何构建一个基于 Python 函数的组件,该组件使用导入、辅助函数并产生多个输出。
- 定义您的函数。此示例函数使用
numpy
包在辅助函数中计算给定被除数和除数的商和余数。除了商和余数,该函数还返回用于可视化的元数据和两个指标。
from typing import NamedTuple
def my_divmod(
dividend: float,
divisor: float) -> NamedTuple(
'MyDivmodOutput',
[
('quotient', float),
('remainder', float),
('mlpipeline_ui_metadata', 'UI_metadata'),
('mlpipeline_metrics', 'Metrics')
]):
'''Divides two numbers and calculate the quotient and remainder'''
# Import the numpy package inside the component function
import numpy as np
# Define a helper function
def divmod_helper(dividend, divisor):
return np.divmod(dividend, divisor)
(quotient, remainder) = divmod_helper(dividend, divisor)
from tensorflow.python.lib.io import file_io
import json
# Export a sample tensorboard
metadata = {
'outputs' : [{
'type': 'tensorboard',
'source': 'gs://ml-pipeline-dataset/tensorboard-train',
}]
}
# Export two metrics
metrics = {
'metrics': [{
'name': 'quotient',
'numberValue': float(quotient),
},{
'name': 'remainder',
'numberValue': float(remainder),
}]}
from collections import namedtuple
divmod_output = namedtuple('MyDivmodOutput',
['quotient', 'remainder', 'mlpipeline_ui_metadata',
'mlpipeline_metrics'])
return divmod_output(quotient, remainder, json.dumps(metadata),
json.dumps(metrics))
- 通过直接运行或使用单元测试来测试您的函数。
my_divmod(100, 7)
这应该返回如下结果:
MyDivmodOutput(quotient=14, remainder=2, mlpipeline_ui_metadata='{"outputs": [{"type": "tensorboard", "source": "gs://ml-pipeline-dataset/tensorboard-train"}]}', mlpipeline_metrics='{"metrics": [{"name": "quotient", "numberValue": 14.0}, {"name": "remainder", "numberValue": 2.0}]}')
使用
kfp.components.create_component_from_func
返回一个工厂函数,您可以使用该函数为管道创建kfp.dsl.ContainerOp
类实例。此示例还指定了用于运行此函数的基础容器镜像。
divmod_op = comp.create_component_from_func(
my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')
- 定义您的管道。此示例使用了
divmod_op
工厂函数和之前示例中的add_op
工厂函数。
import kfp.dsl as dsl
@dsl.pipeline(
name='Calculation pipeline',
description='An example pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
a='1',
b='7',
c='17',
):
# Passes a pipeline parameter and a constant value as operation arguments.
add_task = add_op(a, 4) # The add_op factory function returns
# a dsl.ContainerOp class instance.
# Passes the output of the add_task and a pipeline parameter as operation
# arguments. For an operation with a single return value, the output
# reference is accessed using `task.output` or
# `task.outputs['output_name']`.
divmod_task = divmod_op(add_task.output, b)
# For an operation with multiple return values, output references are
# accessed as `task.outputs['output_name']`.
result_task = add_op(divmod_task.outputs['quotient'], c)
- 编译并运行您的管道。了解更多关于编译和运行管道的信息。
# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}
# Submit a pipeline run
client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)