控制流
概述
虽然使用 @dsl.pipeline
装饰器修饰的 KFP 流水线看起来像一个普通的 Python 函数,但它实际上是流水线拓扑和控制流语义的表达式,使用 KFP 领域特定语言 (DSL) 构建。
组件指南展示了流水线拓扑如何通过数据传递和任务依赖关系来表达。本节介绍如何在流水线中引入控制流以创建更复杂的工作流。
KFP 流水线中核心的控制流类型是
条件
Kubeflow Pipelines 支持常见的条件控制流结构。您可以使用这些结构根据上游任务的输出或流水线输入参数有条件地执行任务。
已弃用
dsl.Condition
已被弃用,取而代之的是功能相同的 dsl.If
,后者更简洁、更符合 Python 习惯,并且与 dsl.Elif
和 dsl.Else
对象保持一致。dsl.If / dsl.Elif / dsl.Else
dsl.If
上下文管理器根据上游任务的输出或流水线输入参数,允许在其范围内有条件地执行任务。
上下文管理器接受两个参数:必需的 condition
和可选的 name
。condition
是一个比较表达式,其中两个操作数中至少有一个是来自上游任务的输出或流水线输入参数。
在以下流水线中,conditional_task
仅在 coin_flip_task
的输出为 'heads'
时执行。
from kfp import dsl
@dsl.component
def flip_coin() -> str:
import random
return random.choice(['heads', 'tails'])
#@dsl.component
#def my_comp():
# print('Conditional task executed!')
@dsl.pipeline
def my_pipeline():
coin_flip_task = flip_coin()
with dsl.If(coin_flip_task.output == 'heads'):
conditional_task = my_comp()
您还可以在 dsl.If
紧随其后使用dsl.Elif
和dsl.Else
上下文管理器,以实现附加的条件控制流功能。
from kfp import dsl
@dsl.component
def flip_three_sided_coin() -> str:
import random
return random.choice(['heads', 'tails', 'draw'])
@dsl.component
def print_comp(text: str):
print(text)
@dsl.pipeline
def my_pipeline():
coin_flip_task = flip_three_sided_coin()
with dsl.If(coin_flip_task.output == 'heads'):
print_comp(text='Got heads!')
with dsl.Elif(coin_flip_task.output == 'tails'):
print_comp(text='Got tails!')
with dsl.Else():
print_comp(text='Draw!')
dsl.OneOf
dsl.OneOf
可用于将互斥分支的输出收集到单个任务输出中,该输出可以由下游任务使用或从流水线输出。分支是互斥的,如果只有一个将被执行。为了强制执行此规则,KFP SDK 编译器要求 dsl.OneOf
消费来自逻辑关联的条件分支组中的任务,并且其中一个分支是 dsl.Else
分支。
尚不支持
KFP 编排后端尚不支持 dsl.OneOf
,但其他编排后端可能支持。您可以通过 GitHub issue 跟踪此功能的支持情况。
例如,以下流水线使用 dsl.OneOf
收集互斥分支的输出
from kfp import dsl
@dsl.component
def flip_three_sided_coin() -> str:
import random
return random.choice(['heads', 'tails', 'draw'])
@dsl.component
def print_and_return(text: str) -> str:
print(text)
return text
@dsl.component
def announce_result(result: str):
print(f'The result is: {result}')
@dsl.pipeline
def my_pipeline() -> str:
coin_flip_task = flip_three_sided_coin()
with dsl.If(coin_flip_task.output == 'heads'):
t1 = print_and_return(text='Got heads!')
with dsl.Elif(coin_flip_task.output == 'tails'):
t2 = print_and_return(text='Got tails!')
with dsl.Else():
t3 = print_and_return(text='Draw!')
oneof = dsl.OneOf(t1.output, t2.output, t3.output)
announce_result(oneof)
return oneof
您应该使用 .output
或 .outputs[<key>]
将任务输出提供给 dsl.OneOf
,就像您将输出传递给下游任务一样。提供给 dsl.OneOf
的输出必须是相同类型,并且不能是其他 dsl.OneOf
或 dsl.Collected
实例。
循环
Kubeflow Pipelines 支持导致任务扇出和扇入的循环。
dsl.ParallelFor
dsl.ParallelFor
上下文管理器允许对静态项目集并行执行任务。
上下文管理器接受三个参数
items
:要循环遍历的静态项目集name
(可选):循环上下文的名称parallelism
(可选):执行dsl.ParallelFor
组时并发迭代的最大数量- 注意,
parallelism=0
表示无约束并行
- 注意,
尚不支持
KFP 编排后端尚不支持设置 parallelism
,但其他编排后端可能支持。您可以通过 GitHub issue 跟踪此功能的支持情况。
在以下流水线中,train_model
将为 1、5、10 和 25 个 epoch 训练模型,同时最多运行两个训练任务
from kfp import dsl
#@dsl.component
#def train_model(epochs: int) -> Model:
# ...
@dsl.pipeline
def my_pipeline():
with dsl.ParallelFor(
items=[1, 5, 10, 25],
parallelism=2
) as epochs:
train_model(epochs=epochs)
dsl.Collected
将 dsl.Collected
与 dsl.ParallelFor
一起使用,以收集并行任务循环的输出。
尚不支持
KFP 编排后端尚不支持 dsl.Collected
,但其他编排后端可能支持。您可以通过 GitHub issue 跟踪此功能的支持情况。
示例:将 dsl.Collected
用作下游任务的输入
下游任务可以通过使用 List
参数或 List
制品注释的输入来消费 dsl.Collected
输出。
例如,在以下流水线中,max_accuracy
具有类型为 Input[List[Model]]
的输入 models
,并将从并行循环中训练的模型中找到准确率最高的模型
from kfp import dsl
from kfp.dsl import Model, Input
#def score_model(model: Model) -> float:
# return ...
#@dsl.component
#def train_model(epochs: int) -> Model:
# ...
@dsl.component
def max_accuracy(models: Input[List[Model]]) -> float:
return max(score_model(model) for model in models)
@dsl.pipeline
def my_pipeline():
# Train a model for 1, 5, 10, and 25 epochs
with dsl.ParallelFor(
items=[1, 5, 10, 25],
) as epochs:
train_model_task = train_model(epochs=epochs)
# Find the model with the highest accuracy
max_accuracy(
models=dsl.Collected(train_model_task.outputs['model'])
)
示例:参数的嵌套列表
您可以使用 dsl.Collected
收集嵌套循环中的输出,形成参数的嵌套列表。
例如,来自两个嵌套 dsl.ParallelFor
组的输出参数被收集在一个多级嵌套参数列表中,其中每个嵌套列表包含来自一个 dsl.ParallelFor
组的输出参数。嵌套级别数取决于嵌套 dsl.ParallelFor
上下文的数量。
相比之下,在嵌套循环中创建的制品被收集在扁平列表中。
示例:从流水线返回 dsl.Collected
您还可以从流水线返回一个 dsl.Collected
。
在返回注释中使用 List
参数或 List
制品,如下例所示
from typing import List
from kfp import dsl
from kfp.dsl import Model
#@dsl.component
#def train_model(epochs: int) -> Model:
# ...
@dsl.pipeline
def my_pipeline() -> List[Model]:
with dsl.ParallelFor(
items=[1, 5, 10, 25],
) as epochs:
train_model_task = train_model(epochs=epochs)
return dsl.Collected(train_model_task.outputs['model'])
退出处理
Kubeflow Pipelines 支持退出处理程序,用于实现主流水线任务执行完毕后运行的清理和错误处理任务。
dsl.ExitHandler
dsl.ExitHandler
上下文管理器允许流水线作者指定一个退出任务,该任务将在上下文管理器范围内任务执行完毕后运行,即使其中一个任务失败。这个结构类似于在普通 Python 中使用 try:
块后接 finally:
块,其中退出任务位于 finally:
块中。上下文管理器接受两个参数:必需的 exit_task
和可选的 name
。exit_task
接受一个实例化的 PipelineTask
。
上下文管理器接受两个参数:必需的 exit_task
和可选的 name
。exit_task
接受一个实例化的 PipelineTask
。
示例:基本清理任务
dsl.ExitHandler
最常见的用例是在主流水线任务执行完毕后运行清理任务。
在以下流水线中,clean_up_task
将在 create_dataset
和 train_and_save_models
都完成(无论它们是成功还是失败)后执行
from kfp import dsl
from kfp.dsl import Dataset
#@dsl.component
#def clean_up_resources():
# ...
#@dsl.component
#def create_datasets():
# ...
#@dsl.component
#def train_and_save_models(dataset: Dataset):
# ...
@dsl.pipeline
def my_pipeline():
clean_up_task = clean_up_resources()
with dsl.ExitHandler(exit_task=clean_up_task):
dataset_task = create_datasets()
train_task = train_and_save_models(dataset=dataset_task.output)
示例:访问流水线和任务状态元数据
您用作退出任务的任务可以使用一个特殊输入,该输入提供对流水线和任务状态元数据的访问,包括流水线失败或成功状态。
您可以通过使用 dsl.PipelineTaskFinalStatus
注释来注释您的退出任务来使用此特殊输入。此参数的参数将在运行时由后端自动提供。实例化您的退出任务时,您不应为此注释提供任何输入。
以下流水线使用 dsl.PipelineTaskFinalStatus
来获取关于流水线和任务失败的信息,即使在 fail_op
失败后也是如此
from kfp import dsl
from kfp.dsl import PipelineTaskFinalStatus
@dsl.component
def print_op(message: str):
print(message)
@dsl.component
def exit_op(user_input: str, status: PipelineTaskFinalStatus):
"""Prints pipeline run status."""
print(user_input)
print('Pipeline status: ', status.state)
print('Job resource name: ', status.pipeline_job_resource_name)
print('Pipeline task name: ', status.pipeline_task_name)
print('Error code: ', status.error_code)
print('Error message: ', status.error_message)
@dsl.component
def fail_op():
import sys
sys.exit(1)
@dsl.pipeline
def my_pipeline():
print_op(message='Starting pipeline...')
print_status_task = exit_op(user_input='Task execution status:')
with dsl.ExitHandler(exit_task=print_status_task):
fail_op()
尚不支持
KFP 编排后端尚不支持设置 PipelineTaskFinalStatus
,但其他编排后端可能支持。您可以通过 GitHub issue 跟踪此功能的支持情况。
示例:忽略上游任务失败
.ignore_upstream_failure()
是 PipelineTask
上的任务方法,它提供了另一种编写带有退出处理行为的流水线的方法。调用此方法将导致任务忽略任何指定的上游任务的失败(通过数据交换或使用 .after()
建立)。如果任务没有上游任务,则此方法无效。
在以下流水线定义中,无论 fail_op
是否成功,clean_up_task
都将在 fail_task
之后执行
from kfp import dsl
@dsl.component
def cleanup_op(message: str = 'Cleaning up...'):
print(message)
@dsl.component
def fail_op(message: str):
print(message)
raise ValueError('Task failed!')
@dsl.pipeline()
def my_pipeline(text: str = 'message'):
fail_task = fail_op(message=text)
clean_up_task = cleanup_op(
message=fail_task.output
).ignore_upstream_failure()
请注意,用于调用任务的组件(上例中的 cleanup_op
)需要为其从上游任务消费的所有输入设置默认值。如果在上游任务未能生成传递给调用任务的输出时,将应用默认值。指定默认值可确保调用任务始终成功,无论上游任务的状态如何。