容器组件

通过任意容器定义创建组件

在 KFP 中,每个任务执行都对应一个容器执行。这意味着所有组件,甚至是 Python 组件,都是由 imagecommandargs 定义的。

Python 组件是独特的,因为它们将容器定义的大部分方面抽象出来,方便用户构建使用纯 Python 的组件。在底层,KFP SDK 为用户设置执行 Python 组件所需的 imagecommandsargs 值。

与 Python 组件不同,容器组件允许组件作者直接设置 imagecommandargs 这使得可以在 KFP Python SDK 中创作执行 shell 脚本、使用其他语言和二进制文件等的组件。

一个简单的容器组件

下面从一个简单的 say_hello 容器组件开始,并逐步修改它,直到它与 Hello World 流水线示例 中的 say_hello 组件等效。这是一个简单的容器组件

from kfp import dsl

@dsl.container_component
def say_hello():
    return dsl.ContainerSpec(image='alpine', command=['echo'], args=['Hello'])

要创建容器组件,使用 dsl.container_component 装饰器并创建一个返回 dsl.ContainerSpec 对象的函数。dsl.ContainerSpec 接受三个参数:imagecommandargs。上面的组件在运行镜像 alpine 的容器中运行命令 echo,参数为 Hello

容器组件可以像 Python 组件一样在流水线中使用

from kfp import dsl
from kfp import compiler

@dsl.pipeline
def hello_pipeline():
    say_hello()

compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')

如果你运行此流水线,你会在 say_hello 的日志中看到字符串 Hello

使用组件输入

为了更有用,say_hello 应该能够接受参数。你可以修改 say_hello 使其接受一个输入参数 name

from kfp import dsl

@dsl.container_component
def say_hello(name: str):
    return dsl.ContainerSpec(image='alpine', command=['echo'], args=[f'Hello, {name}!'])

容器组件函数中的参数和注解声明了组件的接口。在这种情况下,组件有一个输入参数 name,没有输出参数。

当你编译此组件时,name 将被一个占位符替换。在运行时,此占位符将被提供给 say_hello 组件的实际值替换。

实现此组件的另一种方法是使用 sh -c 从单个字符串读取命令并将名称作为参数传递。这种方法通常更灵活,因为它很容易链接多个命令。

from kfp import dsl

@dsl.container_component
def say_hello(name: str):
    return dsl.ContainerSpec(image='alpine', command=['sh', '-c', 'echo Hello, $0!'], args=[name])

当你使用参数 name='World' 运行组件时,你会在 say_hello 的日志中看到字符串 'Hello, World!'

创建组件输出

与 Python 函数不同,容器没有标准的返回值机制。为了使容器组件能够产生输出,KFP 要求你将输出写入容器内的一个文件。KFP 将读取此文件并持久化输出。

要从 say_hello 组件返回一个输出字符串,你可以使用 dsl.OutputPath(str) 注解向函数添加一个输出参数。

@dsl.container_component
def say_hello(name: str, greeting: dsl.OutputPath(str)):
    ...

此组件现在有一个名为 name 的输入参数,类型为 str,以及一个名为 greeting 的输出参数,类型也为 str。在运行时,使用 dsl.OutputPath 注解的参数将获得一个系统生成的路径作为参数。你的组件逻辑应将输出值以 JSON 格式写入此路径。greeting: dsl.OutputPath(str) 中的参数 str 描述了输出 greeting 的类型(例如,写入路径 greeting 的 JSON 将是一个字符串)。你可以填充 commandargs 来写入输出。

@dsl.container_component
def say_hello(name: str, greeting: dsl.OutputPath(str)):
    """Log a greeting and return it as an output."""

    return dsl.ContainerSpec(
        image='alpine',
        command=[
            'sh', '-c', '''RESPONSE="Hello, $0!"\
                            && echo $RESPONSE\
                            && mkdir -p $(dirname $1)\
                            && echo $RESPONSE > $1
                            '''
        ],
        args=[name, greeting])

在流水线中使用

最后,你可以在流水线中使用更新后的 say_hello 组件。

from kfp import dsl
from kfp import compiler

@dsl.pipeline
def hello_pipeline(person_to_greet: str) -> str:
    # greeting argument is provided automatically at runtime!
    hello_task = say_hello(name=person_to_greet)
    return hello_task.outputs['greeting']

compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')

注意,在构建流水线时,你无需向组件提供输出参数;输出参数始终由后端在运行时自动提供。

这应该与 Hello World 流水线 非常相似,但有一个关键区别:由于 greeting 是一个命名输出参数,我们使用 hello_task.outputs['greeting'] 而不是 hello_task.output 从流水线中访问并返回它。数据传递在 流水线基础知识 中有更详细的讨论。

特殊占位符

这三种组件编写风格都通过容器 commandargs 中的占位符自动处理数据传递到你的组件。一般来说,你无需了解其工作原理。容器组件还允许你直接利用两个特殊的占位符:dsl.ConcatPlaceholderdsl.IfPresentPlaceholder

你只能在通过 @dsl.container_component 装饰器编写的容器组件所返回的 dsl.ContainerSpec 中使用这些占位符。

dsl.ConcatPlaceholder

当你提供容器 command 或容器 args 作为字符串列表时,列表中的每个元素使用空格分隔符进行连接,然后在运行时发送到容器。在两个字符串之间连接一个输入而没有空格分隔符需要 dsl.ConcatPlaceholder 提供的特殊处理。

dsl.ConcatPlaceholder 接受一个参数 items,它可以是静态字符串、上游输出、流水线参数或 dsl.ConcatPlaceholderdsl.IfPresentPlaceholder 的其他实例的任意组合列表。在运行时,这些字符串将无分隔符地连接在一起。

例如,你可以使用 dsl.ConcatPlaceholder 连接文件路径前缀、后缀和扩展名

from kfp import dsl

@dsl.container_component
def concatenator(prefix: str, suffix: str):
    return dsl.ContainerSpec(
        image='alpine',
        command=[
            'my_program.sh'
        ],
        args=['--input', dsl.ConcatPlaceholder([prefix, suffix, '.txt'])]
    )

dsl.IfPresentPlaceholder

dsl.IfPresentPlaceholder 用于条件性地提供命令行参数。dsl.IfPresentPlaceholder 接受三个参数:input_namethen 和可选的 else_。通过示例最容易理解此占位符

@dsl.container_component
def hello_someone(optional_name: str = None):
    return dsl.ContainerSpec(
        image='python:3.7',
        command=[
            'say_hello',
            dsl.IfPresentPlaceholder(
                input_name='optional_name', then=['--name', optional_name])
        ])

如果将 'world' 作为参数传递给 optional_name 字段的 hello_someone 组件,该组件将把 --name world 传递给可执行文件 say_hello。如果未提供 optional_name,则省略 --name world

第三个参数 else_ 可用于在未提供 input_name 时提供一个默认值作为备用。例如

@dsl.container_component
def hello_someone(optional_name: str = None):
    return dsl.ContainerSpec(
        image='python:3.7',
        command=[
            'say_hello',
            dsl.IfPresentPlaceholder(
                input_name='optional_name',
                then=['--name', optional_name],
                else_=['--name', 'friend'])
        ])

thenelse_ 的参数可以是静态字符串、上游输出、流水线参数或 dsl.ConcatPlaceholderdsl.IfPresentPlaceholder 的其他实例的任意组合列表。

反馈

此页面有帮助吗?


上次修改时间:2024 年 6 月 20 日:重构 Kubeflow Pipelines 文档 (#3737) (8e56df7)