将组件组成流水线

虽然组件有三种编写方法,但流水线只有一种编写方法:它们是使用带有 @dsl.pipeline 装饰器的流水线函数定义的。以下面的 pythagorean 流水线为例,它通过简单的算术组件将勾股定理实现为流水线

from kfp import dsl

@dsl.component
def square(x: float) -> float:
    return x ** 2

@dsl.component
def add(x: float, y: float) -> float:
    return x + y

@dsl.component
def square_root(x: float) -> float:
    return x ** .5

@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    sum_task = add(x=a_sq_task.output, y=b_sq_task.output)
    return square_root(x=sum_task.output).output

虽然使用 @dsl.pipeline 装饰器装饰的 KFP 流水线看起来像一个普通的 Python 函数,但它实际上是流水线拓扑和控制流语义的表达,使用 KFP 领域特定语言 (DSL) 构建。

流水线定义有四个部分

  1. 流水线装饰器
  2. 在函数签名中声明的输入和输出
  3. 数据传递和任务依赖
  4. 任务配置
  5. 流水线控制流

本节介绍前四个部分。控制流在下一节中介绍。

流水线装饰器

KFP 流水线定义在带有 @dsl.pipeline 装饰器的函数内部。该装饰器接受三个可选参数

  • name 是流水线的名称。如果未提供,名称默认为流水线函数名称的规范化版本。
  • description 是对流水线的描述。
  • pipeline_root 是远程存储目标的根路径,流水线中的任务将在此创建输出。流水线提交客户端也可以设置或覆盖 pipeline_root
  • display_name 是流水线的人类可读名称。

您可以修改 pythagorean 的定义以使用这些参数

@dsl.pipeline(name='pythagorean-theorem-pipeline',
              description='Solve for the length of a hypotenuse of a triangle with sides length `a` and `b`.',
              pipeline_root='gs://my-pipelines-bucket',
              display_name='Pythagorean pipeline.')
def pythagorean(a: float, b: float) -> float:
    ...

另请参阅附加功能:组件文档字符串格式,了解如何通过文档字符串提供流水线元数据。

流水线输入和输出

组件一样,流水线输入和输出由流水线函数签名中的参数和注解定义。

在前面的示例中,pythagorean 接受输入 ab,每个类型为 float,并创建一个 float 输出。

流水线输入通过函数输入参数/注解声明,流水线输出通过函数输出注解声明。与使用输出 Artifact使用 dsl.OutputPath 的容器组件不同,流水线输出*绝不会通过流水线函数输入参数*声明。

有关如何声明流水线函数输入和输出的更多信息,请参阅数据类型

数据传递和任务依赖

在流水线定义中调用组件时,它会构造一个 PipelineTask 实例。您可以使用 PipelineTask.output.outputs 属性在任务之间传递数据。

对于具有由单个返回注解指示的单个无名输出的任务,请使用 PipelineTask.output 访问输出。组件 squareaddsquare_root 都属于这种情况,它们各自都有一个无名输出。

对于具有多个输出或命名输出的任务,请使用 PipelineTask.outputs['<output-key>'] 访问输出。在数据类型:参数中更详细地描述了使用命名输出参数。

在没有数据交换的情况下,任务将并行运行以实现高效的流水线执行。a_sq_taskb_sq_task 就是这种情况,它们不进行数据交换。

当任务交换数据时,这些任务之间会建立执行顺序。这是为了确保上游任务在其输出创建完成后,下游任务再尝试使用这些输出。例如,在 pythagorean 中,后端将在执行 sum_task 之前执行 a_sq_taskb_sq_task。同样,它将在执行从 square_root 组件创建的最终任务之前执行 sum_task

在某些情况下,您可能希望在没有数据交换的情况下建立执行顺序。在这种情况下,您可以在另一个任务上调用一个任务的 .after() 方法。例如,虽然 a_sq_taskb_sq_task 不交换数据,但我们可以指定 a_sq_taskb_sq_task 之前运行

@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    b_sq_task.after(a_sq_task)
    ...

特殊输入类型

您可以将一些特殊的输入值传递给流水线定义中的组件,以使组件能够访问自身的某些元数据。这些值可以传递给类型为 str 的输入参数。

例如,以下 print_op 组件使用 dsl.PIPELINE_JOB_NAME_PLACEHOLDER 在组件运行时打印流水线作业名称

from kfp import dsl

@dsl.pipeline
def my_pipeline():
    print_op(text=dsl.PIPELINE_JOB_NAME_PLACEHOLDER)

可以使用这种风格的几个特殊值,包括

  • dsl.PIPELINE_JOB_NAME_PLACEHOLDER
  • dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER
  • dsl.PIPELINE_JOB_ID_PLACEHOLDER
  • dsl.PIPELINE_TASK_NAME_PLACEHOLDER
  • dsl.PIPELINE_TASK_ID_PLACEHOLDER
  • dsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER
  • dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER
  • dsl.PIPELINE_ROOT_PLACEHOLDER

有关每个特殊输入提供的数据的更多信息,请参阅KFP SDK DSL 参考文档

任务配置

KFP SDK 通过任务方法公开了几个与平台无关的任务级别配置。与平台无关的配置是指在所有符合 KFP 标准的后端(例如开源 KFP 后端Google Cloud Vertex AI Pipelines)上预期表现出相似执行行为的配置。

所有与平台无关的任务级别配置都使用 PipelineTask 方法设置。以下是一个环境变量示例

from kfp import dsl

@dsl.component
def print_env_var():
    import os
    print(os.environ.get('MY_ENV_VAR'))

@dsl.pipeline()
def my_pipeline():
    task = print_env_var()
    task.set_env_variable('MY_ENV_VAR', 'hello')

执行时,print_env_var 组件应该打印 'hello'

任务级别配置方法也可以链式调用

print_env_var().set_env_variable('MY_ENV_VAR', 'hello').set_env_variable('OTHER_VAR', 'world')

KFP SDK 提供了以下用于设置任务级别配置的任务方法

  • .add_accelerator_type
  • .set_accelerator_limit
  • .set_cpu_limit
  • .set_memory_limit
  • .set_env_variable
  • .set_caching_options
  • .set_display_name
  • .set_retry
  • .ignore_upstream_failure

有关这些方法的更多信息,请参阅PipelineTask 参考文档

将流水线用作组件

流水线本身可以在其他流水线中用作组件,就像在流水线中使用任何其他单步骤组件一样。例如,我们可以轻松地重新组织前面的 pythagorean 流水线,以使用内部辅助流水线 square_and_sum

from kfp import dsl

@dsl.component
def square(x: float) -> float:
    return x ** 2

@dsl.component
def add(x: float, y: float) -> float:
    return x + y

@dsl.component
def square_root(x: float) -> float:
    return x ** .5

@dsl.pipeline
def square_and_sum(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    return add(x=a_sq_task.output, y=b_sq_task.output).output

@dsl.pipeline
def pythagorean(a: float = 1.2, b: float = 1.2) -> float:
    sq_and_sum_task = square_and_sum(a=a, b=b)
    return square_root(x=sq_and_sum_task.output).output

反馈

此页面是否有帮助?


最后修改于 2024 年 7 月 5 日:Fixed broken fragments in links (#3790) (36c2ce1)