控制流

在 Kubeflow Pipelines 中使用条件、循环和退出处理等控制流。

概述

虽然使用 @dsl.pipeline 装饰器修饰的 KFP 流水线看起来像一个普通的 Python 函数,但它实际上是流水线拓扑和控制流语义的表达式,使用 KFP 领域特定语言 (DSL) 构建。

组件指南展示了流水线拓扑如何通过数据传递和任务依赖关系来表达。本节介绍如何在流水线中引入控制流以创建更复杂的工作流。

KFP 流水线中核心的控制流类型是

  1. 条件
  2. 循环
  3. 退出处理

条件

Kubeflow Pipelines 支持常见的条件控制流结构。您可以使用这些结构根据上游任务的输出或流水线输入参数有条件地执行任务。

dsl.If / dsl.Elif / dsl.Else

dsl.If 上下文管理器根据上游任务的输出或流水线输入参数,允许在其范围内有条件地执行任务。

上下文管理器接受两个参数:必需的 condition 和可选的 namecondition 是一个比较表达式,其中两个操作数中至少有一个是来自上游任务的输出或流水线输入参数。

在以下流水线中,conditional_task 仅在 coin_flip_task 的输出为 'heads' 时执行。

from kfp import dsl

@dsl.component
def flip_coin() -> str:
    import random
    return random.choice(['heads', 'tails'])

#@dsl.component
#def my_comp():
#    print('Conditional task executed!')

@dsl.pipeline
def my_pipeline():
    coin_flip_task = flip_coin()
    with dsl.If(coin_flip_task.output == 'heads'):
        conditional_task = my_comp()

您还可以在 dsl.If 紧随其后使用dsl.Elifdsl.Else 上下文管理器,以实现附加的条件控制流功能。

from kfp import dsl

@dsl.component
def flip_three_sided_coin() -> str:
    import random
    return random.choice(['heads', 'tails', 'draw'])

@dsl.component
def print_comp(text: str):
    print(text)

@dsl.pipeline
def my_pipeline():
    coin_flip_task = flip_three_sided_coin()
    with dsl.If(coin_flip_task.output == 'heads'):
        print_comp(text='Got heads!')
    with dsl.Elif(coin_flip_task.output == 'tails'):
        print_comp(text='Got tails!')
    with dsl.Else():
        print_comp(text='Draw!')

dsl.OneOf

dsl.OneOf 可用于将互斥分支的输出收集到单个任务输出中,该输出可以由下游任务使用或从流水线输出。分支是互斥的,如果只有一个将被执行。为了强制执行此规则,KFP SDK 编译器要求 dsl.OneOf 消费来自逻辑关联的条件分支组中的任务,并且其中一个分支是 dsl.Else 分支。

例如,以下流水线使用 dsl.OneOf 收集互斥分支的输出

from kfp import dsl

@dsl.component
def flip_three_sided_coin() -> str:
    import random
    return random.choice(['heads', 'tails', 'draw'])

@dsl.component
def print_and_return(text: str) -> str:
    print(text)
    return text

@dsl.component
def announce_result(result: str):
    print(f'The result is: {result}')

@dsl.pipeline
def my_pipeline() -> str:
    coin_flip_task = flip_three_sided_coin()
    with dsl.If(coin_flip_task.output == 'heads'):
        t1 = print_and_return(text='Got heads!')
    with dsl.Elif(coin_flip_task.output == 'tails'):
        t2 = print_and_return(text='Got tails!')
    with dsl.Else():
        t3 = print_and_return(text='Draw!')
    
    oneof = dsl.OneOf(t1.output, t2.output, t3.output)
    announce_result(oneof)
    return oneof

您应该使用 .output.outputs[<key>] 将任务输出提供给 dsl.OneOf,就像您将输出传递给下游任务一样。提供给 dsl.OneOf 的输出必须是相同类型,并且不能是其他 dsl.OneOfdsl.Collected 实例。

循环

Kubeflow Pipelines 支持导致任务扇出和扇入的循环。

dsl.ParallelFor

dsl.ParallelFor 上下文管理器允许对静态项目集并行执行任务。

上下文管理器接受三个参数

  • items:要循环遍历的静态项目集
  • name(可选):循环上下文的名称
  • parallelism(可选):执行 dsl.ParallelFor 组时并发迭代的最大数量
    • 注意,parallelism=0 表示无约束并行

在以下流水线中,train_model 将为 1、5、10 和 25 个 epoch 训练模型,同时最多运行两个训练任务

from kfp import dsl

#@dsl.component
#def train_model(epochs: int) -> Model:
#    ...

@dsl.pipeline
def my_pipeline():
    with dsl.ParallelFor(
        items=[1, 5, 10, 25],
        parallelism=2
    ) as epochs:
        train_model(epochs=epochs)

dsl.Collected

dsl.Collecteddsl.ParallelFor 一起使用,以收集并行任务循环的输出。

示例:dsl.Collected 用作下游任务的输入

下游任务可以通过使用 List 参数或 List 制品注释的输入来消费 dsl.Collected 输出。

例如,在以下流水线中,max_accuracy 具有类型为 Input[List[Model]] 的输入 models,并将从并行循环中训练的模型中找到准确率最高的模型

from kfp import dsl
from kfp.dsl import Model, Input

#def score_model(model: Model) -> float:
#    return ...

#@dsl.component
#def train_model(epochs: int) -> Model:
#    ...

@dsl.component
def max_accuracy(models: Input[List[Model]]) -> float:
    return max(score_model(model) for model in models)

@dsl.pipeline
def my_pipeline():
    
    # Train a model for 1, 5, 10, and 25 epochs
    with dsl.ParallelFor(
        items=[1, 5, 10, 25],
    ) as epochs:
        train_model_task = train_model(epochs=epochs)
        
    # Find the model with the highest accuracy
    max_accuracy(
        models=dsl.Collected(train_model_task.outputs['model'])
    )

示例:参数的嵌套列表

您可以使用 dsl.Collected 收集嵌套循环中的输出,形成参数的嵌套列表

例如,来自两个嵌套 dsl.ParallelFor 组的输出参数被收集在一个多级嵌套参数列表中,其中每个嵌套列表包含来自一个 dsl.ParallelFor 组的输出参数。嵌套级别数取决于嵌套 dsl.ParallelFor 上下文的数量。

相比之下,在嵌套循环中创建的制品被收集在扁平列表中。

示例:从流水线返回 dsl.Collected

您还可以从流水线返回一个 dsl.Collected

在返回注释中使用 List 参数或 List 制品,如下例所示

from typing import List

from kfp import dsl
from kfp.dsl import Model

#@dsl.component
#def train_model(epochs: int) -> Model:
#    ...

@dsl.pipeline
def my_pipeline() -> List[Model]:
    with dsl.ParallelFor(
        items=[1, 5, 10, 25],
    ) as epochs:
        train_model_task = train_model(epochs=epochs)
    return dsl.Collected(train_model_task.outputs['model'])

退出处理

Kubeflow Pipelines 支持退出处理程序,用于实现主流水线任务执行完毕后运行的清理和错误处理任务。

dsl.ExitHandler

dsl.ExitHandler 上下文管理器允许流水线作者指定一个退出任务,该任务将在上下文管理器范围内任务执行完毕后运行,即使其中一个任务失败。这个结构类似于在普通 Python 中使用 try: 块后接 finally: 块,其中退出任务位于 finally: 块中。上下文管理器接受两个参数:必需的 exit_task 和可选的 nameexit_task 接受一个实例化的 PipelineTask

上下文管理器接受两个参数:必需的 exit_task 和可选的 nameexit_task 接受一个实例化的 PipelineTask

示例:基本清理任务

dsl.ExitHandler 最常见的用例是在主流水线任务执行完毕后运行清理任务。

在以下流水线中,clean_up_task 将在 create_datasettrain_and_save_models 都完成(无论它们是成功还是失败)后执行

from kfp import dsl
from kfp.dsl import Dataset

#@dsl.component
#def clean_up_resources():
#    ...

#@dsl.component
#def create_datasets():
#    ...

#@dsl.component
#def train_and_save_models(dataset: Dataset):
#    ...

@dsl.pipeline
def my_pipeline():
    clean_up_task = clean_up_resources()
    with dsl.ExitHandler(exit_task=clean_up_task):
        dataset_task = create_datasets()
        train_task = train_and_save_models(dataset=dataset_task.output)

示例:访问流水线和任务状态元数据

您用作退出任务的任务可以使用一个特殊输入,该输入提供对流水线和任务状态元数据的访问,包括流水线失败或成功状态。

您可以通过使用 dsl.PipelineTaskFinalStatus 注释来注释您的退出任务来使用此特殊输入。此参数的参数将在运行时由后端自动提供。实例化您的退出任务时,您不应为此注释提供任何输入。

以下流水线使用 dsl.PipelineTaskFinalStatus 来获取关于流水线和任务失败的信息,即使在 fail_op 失败后也是如此

from kfp import dsl
from kfp.dsl import PipelineTaskFinalStatus

@dsl.component
def print_op(message: str):
    print(message)

@dsl.component
def exit_op(user_input: str, status: PipelineTaskFinalStatus):
    """Prints pipeline run status."""
    print(user_input)
    print('Pipeline status: ', status.state)
    print('Job resource name: ', status.pipeline_job_resource_name)
    print('Pipeline task name: ', status.pipeline_task_name)
    print('Error code: ', status.error_code)
    print('Error message: ', status.error_message)

@dsl.component
def fail_op():
    import sys
    sys.exit(1)

@dsl.pipeline
def my_pipeline():
    print_op(message='Starting pipeline...')
    print_status_task = exit_op(user_input='Task execution status:')
    with dsl.ExitHandler(exit_task=print_status_task):
        fail_op()

示例:忽略上游任务失败

.ignore_upstream_failure()PipelineTask 上的任务方法,它提供了另一种编写带有退出处理行为的流水线的方法。调用此方法将导致任务忽略任何指定的上游任务的失败(通过数据交换或使用 .after() 建立)。如果任务没有上游任务,则此方法无效。

在以下流水线定义中,无论 fail_op 是否成功,clean_up_task 都将在 fail_task 之后执行

from kfp import dsl

@dsl.component
def cleanup_op(message: str = 'Cleaning up...'):
    print(message)

@dsl.component
def fail_op(message: str):
    print(message)
    raise ValueError('Task failed!')

@dsl.pipeline()
def my_pipeline(text: str = 'message'):
    fail_task = fail_op(message=text)
    clean_up_task = cleanup_op(
        message=fail_task.output
    ).ignore_upstream_failure()

请注意,用于调用任务的组件(上例中的 cleanup_op)需要为其从上游任务消费的所有输入设置默认值。如果在上游任务未能生成传递给调用任务的输出时,将应用默认值。指定默认值可确保调用任务始终成功,无论上游任务的状态如何。

反馈

此页面是否有帮助?