将组件组成流水线
虽然组件有三种编写方法,但流水线只有一种编写方法:它们是使用带有 @dsl.pipeline 装饰器的流水线函数定义的。以下面的 pythagorean 流水线为例,它通过简单的算术组件将勾股定理实现为流水线
from kfp import dsl
@dsl.component
def square(x: float) -> float:
return x ** 2
@dsl.component
def add(x: float, y: float) -> float:
return x + y
@dsl.component
def square_root(x: float) -> float:
return x ** .5
@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
a_sq_task = square(x=a)
b_sq_task = square(x=b)
sum_task = add(x=a_sq_task.output, y=b_sq_task.output)
return square_root(x=sum_task.output).output
虽然使用 @dsl.pipeline 装饰器装饰的 KFP 流水线看起来像一个普通的 Python 函数,但它实际上是流水线拓扑和控制流语义的表达,使用 KFP 领域特定语言 (DSL) 构建。
流水线定义有四个部分
- 流水线装饰器
- 在函数签名中声明的输入和输出
- 数据传递和任务依赖
- 任务配置
- 流水线控制流
本节介绍前四个部分。控制流在下一节中介绍。
流水线装饰器
KFP 流水线定义在带有 @dsl.pipeline 装饰器的函数内部。该装饰器接受三个可选参数
name是流水线的名称。如果未提供,名称默认为流水线函数名称的规范化版本。description是对流水线的描述。pipeline_root是远程存储目标的根路径,流水线中的任务将在此创建输出。流水线提交客户端也可以设置或覆盖pipeline_root。display_name是流水线的人类可读名称。
您可以修改 pythagorean 的定义以使用这些参数
@dsl.pipeline(name='pythagorean-theorem-pipeline',
description='Solve for the length of a hypotenuse of a triangle with sides length `a` and `b`.',
pipeline_root='gs://my-pipelines-bucket',
display_name='Pythagorean pipeline.')
def pythagorean(a: float, b: float) -> float:
...
另请参阅附加功能:组件文档字符串格式,了解如何通过文档字符串提供流水线元数据。
流水线输入和输出
与组件一样,流水线输入和输出由流水线函数签名中的参数和注解定义。
在前面的示例中,pythagorean 接受输入 a 和 b,每个类型为 float,并创建一个 float 输出。
流水线输入通过函数输入参数/注解声明,流水线输出通过函数输出注解声明。与使用输出 Artifact 或使用 dsl.OutputPath 的容器组件不同,流水线输出*绝不会通过流水线函数输入参数*声明。
有关如何声明流水线函数输入和输出的更多信息,请参阅数据类型。
数据传递和任务依赖
在流水线定义中调用组件时,它会构造一个 PipelineTask 实例。您可以使用 PipelineTask 的 .output 和 .outputs 属性在任务之间传递数据。
对于具有由单个返回注解指示的单个无名输出的任务,请使用 PipelineTask.output 访问输出。组件 square、add 和 square_root 都属于这种情况,它们各自都有一个无名输出。
对于具有多个输出或命名输出的任务,请使用 PipelineTask.outputs['<output-key>'] 访问输出。在数据类型:参数中更详细地描述了使用命名输出参数。
在没有数据交换的情况下,任务将并行运行以实现高效的流水线执行。a_sq_task 和 b_sq_task 就是这种情况,它们不进行数据交换。
当任务交换数据时,这些任务之间会建立执行顺序。这是为了确保上游任务在其输出创建完成后,下游任务再尝试使用这些输出。例如,在 pythagorean 中,后端将在执行 sum_task 之前执行 a_sq_task 和 b_sq_task。同样,它将在执行从 square_root 组件创建的最终任务之前执行 sum_task。
在某些情况下,您可能希望在没有数据交换的情况下建立执行顺序。在这种情况下,您可以在另一个任务上调用一个任务的 .after() 方法。例如,虽然 a_sq_task 和 b_sq_task 不交换数据,但我们可以指定 a_sq_task 在 b_sq_task 之前运行
@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
a_sq_task = square(x=a)
b_sq_task = square(x=b)
b_sq_task.after(a_sq_task)
...
特殊输入类型
您可以将一些特殊的输入值传递给流水线定义中的组件,以使组件能够访问自身的某些元数据。这些值可以传递给类型为 str 的输入参数。
例如,以下 print_op 组件使用 dsl.PIPELINE_JOB_NAME_PLACEHOLDER 在组件运行时打印流水线作业名称
from kfp import dsl
@dsl.pipeline
def my_pipeline():
print_op(text=dsl.PIPELINE_JOB_NAME_PLACEHOLDER)
可以使用这种风格的几个特殊值,包括
dsl.PIPELINE_JOB_NAME_PLACEHOLDERdsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDERdsl.PIPELINE_JOB_ID_PLACEHOLDERdsl.PIPELINE_TASK_NAME_PLACEHOLDERdsl.PIPELINE_TASK_ID_PLACEHOLDERdsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDERdsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDERdsl.PIPELINE_ROOT_PLACEHOLDER
尚不支持
PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER、PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER 和 PIPELINE_ROOT_PLACEHOLDER 尚未被 KFP 编排后端支持,但可能被其他编排后端支持。您可以通过 GitHub issue 跟踪此功能的支持情况。
有关每个特殊输入提供的数据的更多信息,请参阅KFP SDK DSL 参考文档。
任务配置
KFP SDK 通过任务方法公开了几个与平台无关的任务级别配置。与平台无关的配置是指在所有符合 KFP 标准的后端(例如开源 KFP 后端或Google Cloud Vertex AI Pipelines)上预期表现出相似执行行为的配置。
所有与平台无关的任务级别配置都使用 PipelineTask 方法设置。以下是一个环境变量示例
from kfp import dsl
@dsl.component
def print_env_var():
import os
print(os.environ.get('MY_ENV_VAR'))
@dsl.pipeline()
def my_pipeline():
task = print_env_var()
task.set_env_variable('MY_ENV_VAR', 'hello')
执行时,print_env_var 组件应该打印 'hello'。
任务级别配置方法也可以链式调用
print_env_var().set_env_variable('MY_ENV_VAR', 'hello').set_env_variable('OTHER_VAR', 'world')
KFP SDK 提供了以下用于设置任务级别配置的任务方法
.add_accelerator_type.set_accelerator_limit.set_cpu_limit.set_memory_limit.set_env_variable.set_caching_options.set_display_name.set_retry.ignore_upstream_failure
尚不支持
.ignore_upstream_failure 尚未被 KFP 编排后端支持,但可能被其他编排后端支持。您可以通过 GitHub issue 跟踪此功能的支持情况。
有关这些方法的更多信息,请参阅PipelineTask 参考文档。
将流水线用作组件
流水线本身可以在其他流水线中用作组件,就像在流水线中使用任何其他单步骤组件一样。例如,我们可以轻松地重新组织前面的 pythagorean 流水线,以使用内部辅助流水线 square_and_sum
from kfp import dsl
@dsl.component
def square(x: float) -> float:
return x ** 2
@dsl.component
def add(x: float, y: float) -> float:
return x + y
@dsl.component
def square_root(x: float) -> float:
return x ** .5
@dsl.pipeline
def square_and_sum(a: float, b: float) -> float:
a_sq_task = square(x=a)
b_sq_task = square(x=b)
return add(x=a_sq_task.output, y=b_sq_task.output).output
@dsl.pipeline
def pythagorean(a: float = 1.2, b: float = 1.2) -> float:
sq_and_sum_task = square_and_sum(a=a, b=b)
return square_root(x=sq_and_sum_task.output).output