在本地执行 KFP 流水线

了解如何在本地运行 Kubeflow 流水线。

概述

KFP 支持在本地执行组件和流水线,这有助于在远程运行代码之前进行紧密的开发循环。

在本地执行组件和流水线很容易。只需使用 local.init() 初始化一个本地会话,然后像调用普通的 Python 函数一样调用组件或流水线。KFP 将记录有关执行的信息。执行完成后,您可以像组合流水线时一样访问任务输出;唯一的区别在于输出现在是具体的值,而不是未来输出的引用。

限制

本地执行旨在帮助您在远程环境中测试组件和流水线之前,快速在本地对其进行测试

本地执行存在一些限制

  • 本地执行不具备优化和附加功能,例如缓存、重试等。虽然这些功能对于生产流水线很重要,但在本地测试环境中并不那么关键。您会发现诸如 .set_retry.set_caching_options 等任务方法在本地不起作用。
  • 本地执行对您机器上可用的资源做出了简单的假设。本地执行不支持指定与内存、核心、加速器等相关的资源请求/限制/亲和性。您会发现诸如 .set_memory_limit.set_memory_request.set_accelerator_type 等任务方法在本地不起作用。
  • 本地执行不支持身份验证机制。如果您的组件与云资源交互或需要其他特权操作,您必须在云中测试您的流水线。
  • 虽然本地流水线执行完全支持顺序和嵌套流水线,但尚不支持 dsl.Conditiondsl.ParallelFordsl.ExitHandler

基本示例

在下面的示例中,我们使用了 DockerRunner 类型,下面将更详细地介绍运行器类型

from kfp import local
from kfp import dsl

local.init(runner=local.DockerRunner())

@dsl.component
def add(a: int, b: int) -> int:
    return a + b

# run a single component
task = add(a=1, b=2)
assert task.output == 3

# or run it in a pipeline
@dsl.pipeline
def math_pipeline(x: int, y: int, z: int) -> int:
    t1 = add(a=x, b=y)
    t2 = add(a=t1.output, b=z)
    return t2.output

pipeline_task = math_pipeline(x=1, y=2, z=3)
assert pipeline_task.output == 6

类似地,您可以创建制品并读取其内容

from kfp import local
from kfp import dsl
from kfp.dsl import Output, Artifact
import json

local.init(runner=local.SubprocessRunner())

@dsl.component
def add(a: int, b: int, out_artifact: Output[Artifact]):
    import json

    result = json.dumps(a + b)

    with open(out_artifact.path, 'w') as f:
        f.write(result)

    out_artifact.metadata['operation'] = 'addition'


task = add(a=1, b=2)

# can read artifact contents
with open(task.outputs['out_artifact'].path) as f:
    contents = f.read()

assert json.loads(contents) == 3
assert task.outputs['out_artifact'].metadata['operation'] == 'addition'

默认情况下,如果组件以失败状态退出,KFP 将抛出异常。您可以使用 raise_on_error 来切换此行为。您还可以使用 pipeline_root 指定新的本地“流水线根目录”。这是组件输出(包括制品)写入的本地目录。

local.init(runner=...,
           raise_on_error=False,
           pipeline_root='~/my/component/outputs')

运行器类型

Kubeflow 流水线有两种本地运行器,可用于在本地执行组件和流水线:DockerRunnerSubprocessRunner

强烈建议尽可能使用 DockerRunner

运行器: DockerRunner

DockerRunner 需要安装 Docker,但使用时基本上不需要 Docker 知识。

例如,要使用 DockerRunner

from kfp import local

local.init(runner=local.DockerRunner())

由于本地 DockerRunner 在单独的容器中执行每个任务,因此 DockerRunner

当您使用 DockerRunner 时,KFP 会将您的本地流水线根目录挂载到容器中,以便将输出写入容器外部。这意味着即使容器退出后,您的组件输出仍然可供检查。

运行器: SubprocessRunner

仅在无法安装 Docker 的情况下才推荐使用 SubprocessRunner,例如在某些 Notebook 环境中。

例如,要使用 SubprocessRunner

from kfp import local

local.init(runner=local.SubprocessRunner())

由于 SubprocessRunner 在子进程中运行您的代码,因此 SubprocessRunner

  • 提供的本地运行时环境隔离性不如 DockerRunner
  • 不支持自定义镜像或轻松支持具有复杂环境依赖的任务
  • 仅允许执行轻量级 Python 组件

反馈

此页面有帮助吗?


最后修改于 2024年12月16日:更新本地执行限制 (#3944) (d49ed65)