在本地执行 KFP 流水线
概述
KFP 支持在本地执行组件和流水线,这有助于在远程运行代码之前进行紧密的开发循环。
在本地执行组件和流水线很容易。只需使用 local.init()
初始化一个本地会话,然后像调用普通的 Python 函数一样调用组件或流水线。KFP 将记录有关执行的信息。执行完成后,您可以像组合流水线时一样访问任务输出;唯一的区别在于输出现在是具体的值,而不是未来输出的引用。
限制
本地执行旨在帮助您在远程环境中测试组件和流水线之前,快速在本地对其进行测试。
本地执行存在一些限制
- 本地执行不具备优化和附加功能,例如缓存、重试等。虽然这些功能对于生产流水线很重要,但在本地测试环境中并不那么关键。您会发现诸如
.set_retry
、.set_caching_options
等任务方法在本地不起作用。 - 本地执行对您机器上可用的资源做出了简单的假设。本地执行不支持指定与内存、核心、加速器等相关的资源请求/限制/亲和性。您会发现诸如
.set_memory_limit
、.set_memory_request
、.set_accelerator_type
等任务方法在本地不起作用。 - 本地执行不支持身份验证机制。如果您的组件与云资源交互或需要其他特权操作,您必须在云中测试您的流水线。
- 虽然本地流水线执行完全支持顺序和嵌套流水线,但尚不支持
dsl.Condition
、dsl.ParallelFor
或dsl.ExitHandler
。
基本示例
在下面的示例中,我们使用了 DockerRunner
类型,下面将更详细地介绍运行器类型。
from kfp import local
from kfp import dsl
local.init(runner=local.DockerRunner())
@dsl.component
def add(a: int, b: int) -> int:
return a + b
# run a single component
task = add(a=1, b=2)
assert task.output == 3
# or run it in a pipeline
@dsl.pipeline
def math_pipeline(x: int, y: int, z: int) -> int:
t1 = add(a=x, b=y)
t2 = add(a=t1.output, b=z)
return t2.output
pipeline_task = math_pipeline(x=1, y=2, z=3)
assert pipeline_task.output == 6
类似地,您可以创建制品并读取其内容
from kfp import local
from kfp import dsl
from kfp.dsl import Output, Artifact
import json
local.init(runner=local.SubprocessRunner())
@dsl.component
def add(a: int, b: int, out_artifact: Output[Artifact]):
import json
result = json.dumps(a + b)
with open(out_artifact.path, 'w') as f:
f.write(result)
out_artifact.metadata['operation'] = 'addition'
task = add(a=1, b=2)
# can read artifact contents
with open(task.outputs['out_artifact'].path) as f:
contents = f.read()
assert json.loads(contents) == 3
assert task.outputs['out_artifact'].metadata['operation'] == 'addition'
默认情况下,如果组件以失败状态退出,KFP 将抛出异常。您可以使用 raise_on_error
来切换此行为。您还可以使用 pipeline_root
指定新的本地“流水线根目录”。这是组件输出(包括制品)写入的本地目录。
local.init(runner=...,
raise_on_error=False,
pipeline_root='~/my/component/outputs')
运行器类型
Kubeflow 流水线有两种本地运行器,可用于在本地执行组件和流水线:DockerRunner
和 SubprocessRunner
。
强烈建议尽可能使用 DockerRunner
。
运行器: DockerRunner
DockerRunner
需要安装 Docker,但使用时基本上不需要 Docker 知识。
例如,要使用 DockerRunner
from kfp import local
local.init(runner=local.DockerRunner())
由于本地 DockerRunner
在单独的容器中执行每个任务,因此 DockerRunner
- 提供了最强的本地运行时环境隔离形式
- 最接近远程运行时环境
- 允许执行所有组件类型:轻量级 Python 组件、容器化 Python 组件和容器组件
当您使用 DockerRunner
时,KFP 会将您的本地流水线根目录挂载到容器中,以便将输出写入容器外部。这意味着即使容器退出后,您的组件输出仍然可供检查。
运行器: SubprocessRunner
仅在无法安装 Docker 的情况下才推荐使用 SubprocessRunner
,例如在某些 Notebook 环境中。
例如,要使用 SubprocessRunner
from kfp import local
local.init(runner=local.SubprocessRunner())
由于 SubprocessRunner
在子进程中运行您的代码,因此 SubprocessRunner
- 提供的本地运行时环境隔离性不如
DockerRunner
- 不支持自定义镜像或轻松支持具有复杂环境依赖的任务
- 仅允许执行轻量级 Python 组件
提示
默认情况下,SubprocessRunner
会将您的依赖项安装到虚拟环境中。
这是推荐做法,但可以通过设置 use_venv=False
来禁用
from kfp import local
local.init(runner=local.SubprocessRunner(use_venv=False))