迁移到 Kubeflow Pipelines v2

迁移到新的 Kubeflow Pipelines v2 后端和 SDK。

概述

Kubeflow Pipelines V2 是 Kubeflow Pipelines (KFP) 平台的一次重大更新。

KFP V2 引入的关键特性包括

  • 更具 Python 风格的 SDK - 使用诸如 (@dsl.pipeline, @dsl.component, @dsl.container_component) 的装饰器
  • 与 Argo Workflows 解耦 - 将 pipeline 编译为通用的 IR YAML,而不是 Argo Workflow YAML
  • 增强的 Workflow GUI - 可视化 pipeline、子 DAG(嵌套 pipeline)、循环和 artifact(数据集、模型和指标),以帮助您理解和调试 pipeline

版本矩阵

包含 Kubeflow Pipelines V2 后端的第一个 Kubeflow Platform 版本是 Kubeflow 1.8

下表显示了每个 Kubeflow Platform 版本包含的 KFP 后端版本

发布日期Kubeflow Platform 版本KFP 后端版本SDK 模式:v1SDK 模式:v2SDK 模式:v2-兼容
2024-07-22Kubeflow 1.92.2.0
2023-11-01Kubeflow 1.82.0.3
2023-03-29Kubeflow 1.72.0.0-alpha.7
2022-10-10Kubeflow 1.62.0.0-alpha.5
2022-06-15Kubeflow 1.51.8.2

向后兼容性

如果您现有的 KFP Pipeline 是使用 V1 SDK 编译的,则无需任何更改即可在新的 KFP V2 后端上运行它们。如果您希望编写新的 pipeline,则需要进行一些推荐和必需的迁移步骤,详见下文。

术语

术语定义
SDK v1kfp Python SDK 的 1.x.x 版本。
SDK v2kfp Python SDK 的 2.x.x 版本。
SDK v1 (v2-namespace)V1 SDK 中提供的 V2 预览模块(例如 from kfp.v2 import *)。
仅由 Google Cloud Vertex AI Pipelines 用户使用。

迁移路径

您如何迁移到 KFP V2 将取决于您当前的 SDK 版本和使用情况。

有两种常见的迁移路径

  1. SDK v1SDK v2
  2. SDK v1 (v2-namespace) → SDK v2

从 ‘SDK v1’ 迁移到 ‘SDK v2’

KFP SDK v2 通常不向后兼容使用 KFP SDK v1 主命名空间的用户代码。本节介绍了一些重要的破坏性变更以及升级到 KFP SDK v2 的迁移步骤。

我们会指明每个破坏性变更影响 KFP OSS 后端用户还是 Google Cloud Vertex AI Pipelines 用户。

破坏性变更

点击展开
create_component_from_func 和 func_to_container_op 支持

影响: KFP OSS 用户和 Vertex AI Pipelines 用户

create_component_from_funcfunc_to_container_op 都用于 KFP SDK v1 中创建轻量级基于 Python 函数的组件。

这两个函数在 KFP SDK v2 中已被移除。

变更: 使用 @dsl.component 装饰器,详见轻量级 Python 组件容器化 Python 组件

旧用法新用法
from kfp.components import create_component_from_func
from kfp.components import func_to_container_op

@create_component_from_func
def component1(...):
    ...

def component2(...):
    ...

component2 = create_component_from_func(component2)

@func_to_container_op
def component3(...):
    ...

@dsl.pipeline(name='my-pipeline')
def pipeline():
    component1(...)
    component2(...)
    component3(...)
from kfp import dsl

@dsl.component
def component1(...):
    ...

@dsl.component
def component2(...):
    ...

@dsl.component
def component3(...):
    ...

@dsl.pipeline(name='my-pipeline')
def pipeline():
    component1(...)
    component2(...)
    component3(...)

必需使用关键字参数

影响: KFP OSS 用户和 Vertex AI Pipelines 用户

在 pipeline 定义中将组件实例化为任务时,必须使用关键字参数。

变更: 使用关键字参数。

旧用法新用法
def my_pipeline():
    trainer_component(100, 0.1)
def my_pipeline():
    trainer_component(epochs=100, learning_rate=0.1)

ContainerOp 支持

影响: KFP OSS 用户

ContainerOp 已于 2020 年年中弃用。ContainerOp 实例不包含输入和输出的描述,因此无法编译为 IR YAML

ContainerOp 在 v2 中被移除。

变更: 使用 @dsl.container_component 装饰器,详见容器组件

旧用法新用法
from kfp import dsl

# v1 ContainerOp will not be supported.
component_op = dsl.ContainerOp(...)

# v1 ContainerOp from class will not be supported.
class FlipCoinOp(dsl.ContainerOp):
from kfp import dsl

@dsl.container_component
def flip_coin(rand: int, result: dsl.OutputPath(str)):
  return ContainerSpec(
    image='gcr.io/flip-image'
    command=['flip'],
    arguments=['--seed', rand, '--result-file', result])

VolumeOp 和 ResourceOp 支持

影响: KFP OSS 用户

VolumeOpResourceOp 在 pipeline 定义中直接访问 Kubernetes 资源。在非 Kubernetes 平台上不支持这些功能。

KFP v2 通过 KFP SDK 扩展库支持平台特定功能。Kubernetes 特定功能在 KFP v2 中通过 kfp-kubernetes 扩展库支持。


v1 组件 YAML 支持

影响: KFP OSS 用户和 Vertex AI Pipelines 用户

KFP v1 支持使用 v1 组件 YAML 格式(示例)直接在 YAML 中编写组件。这种编写风格允许组件作者直接设置组件的 image、command 和 args。

在 KFP v2 中,组件和 pipeline 都被编译为相同的 IR YAML 格式,这与 v1 组件 YAML 格式不同。

为了向后兼容,KFP v2 将继续支持使用 components.load_component_from_file 函数和类似函数加载现有的 v1 组件 YAML。

变更: 要通过自定义 image、command 和 args 编写组件,请使用 @dsl.container_component 装饰器,详见容器组件。请注意,与编写 v1 组件 YAML 不同,容器组件不支持在组件本身上设置环境变量。环境变量应在使用 .set_env_variable 任务配置方法时,在 pipeline 定义中通过组件实例化的任务上设置。


v1 轻量级组件类型 InputTextFile、InputBinaryFile、OutputTextFile 和 OutputBinaryFile 支持

影响: KFP OSS 用户和 Vertex AI Pipelines 用户

这些类型确保使用 KFP SDK v1 编写的组件中的文件以文本模式或二进制模式写入。

KFP SDK v2 不支持使用这些类型进行编写,因为用户可以轻松自行实现。

变更: 组件作者应使用 KFP 的artifactparameter类型来处理输入和输出。


AIPlatformClient 支持

影响: Vertex AI Pipelines 用户

KFP SDK v1 包含一个 AIPlatformClient,用于将 pipeline 提交到 Vertex AI Pipelines

KFP SDK v2 不包含此客户端。

变更: 使用官方 Python Vertex SDKPipelineJob 类。

旧用法新用法
from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

response = api_client.create_run_from_job_spec(
    job_spec_path=PACKAGE_PATH, pipeline_root=PIPELINE_ROOT,
)
# pip install google-cloud-aiplatform
from google.cloud import aiplatform

aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
)

job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=PACKAGE_PATH,
    pipeline_root=PIPELINE_ROOT,
)

job.submit()

run_as_aiplatform_custom_job 支持

影响: Vertex AI Pipelines 用户

KFP v1 的 run_as_aiplatform_custom_job 是一个实验性功能,允许将任何组件转换为 Vertex AI CustomJob

KFP v2 不包含此功能。

变更: 使用 Google Cloud Pipeline Componentcreate_custom_training_job_from_component 函数。

旧用法新用法
from kfp import components
from kfp.v2 import dsl
from kfp.v2.google.experimental import run_as_aiplatform_custom_job

training_op = components.load_component_from_url(...)

@dsl.pipeline(name='my-pipeline')
def pipeline():
  training_task = training_op(...)
  run_as_aiplatform_custom_job(
      training_task, ...)
# pip install google-cloud-pipeline-components
from kfp import components
from kfp import dsl
from google_cloud_pipeline_components.v1.custom_job import utils

training_op = components.load_component_from_url(...)

@dsl.pipeline(name='my-pipeline')
def pipeline():
    utils.create_custom_training_job_from_component(training_op, ...)

类型转换行为变更

影响: KFP OSS 用户和 Vertex AI Pipelines 用户

KFP SDK v1 的 pipeline 类型检查比 KFP SDK v2 更宽松。一些利用了这种宽松性的 pipeline 可能无法使用 KFP SDK v2 编译。例如,使用 float 类型定义的参数会接受字符串 "0.1"

from kfp.v2 import compiler
from kfp.v2 import dsl
from kfp import components


@dsl.component
def train(
    number_of_epochs: int,
    learning_rate: float,
):
    print(f"number_of_epochs={number_of_epochs}")
    print(f"learning_rate={learning_rate}")


def training_pipeline(number_of_epochs: int = 1):
    train(
        number_of_epochs=number_of_epochs,
        learning_rate="0.1",  # string cannot be passed to float parameter using KFP SDK v2
    )

变更: 我们建议更新您的组件和 pipeline,严格使用类型。




从 ‘SDK v1 (v2-namespace)’ 迁移到 ‘SDK v2’

除少数例外,KFP SDK v2 向后兼容使用 KFP SDK v1 v2-namespace 的用户代码。

非破坏性变更

本节记录了 SDK v2 相对于 SDK v1 v2-namespace 的非破坏性变更。我们建议您将代码迁移到“新用法”,即使“旧用法”在带有警告的情况下仍然有效。

点击展开
导入命名空间

KFP SDK v1 v2-namespace 的导入 (from kfp.v2 import *) 应转换为从主命名空间的导入 (from kfp import *)。

变更: 从所有 KFP SDK v1 v2-namespace 导入中移除 .v2 模块。

旧用法新用法
from kfp.v2 import dsl
from kfp.v2 import compiler

@dsl.pipeline(name='my-pipeline')
def pipeline():
  ...

compiler.Compiler().compile(...)
from kfp import dsl
from kfp import compiler

@dsl.pipeline(name='my-pipeline')
def pipeline():
  ...

compiler.Compiler().compile(...)

output_component_file 参数

在 KFP SDK v2 中,组件可以像 pipeline 一样编译IR YAML 并从中加载

KFP SDK v1 v2-namespace 支持通过 @dsl.component 装饰器的 output_component_file 参数编译组件。这在 KFP SDK v2 中已弃用。如果您选择仍然使用此参数,您的 pipeline 将被编译为 IR YAML 而不是 v1 组件 YAML。

变更: 移除 output_component_file 的使用。替换为调用 Compiler().compile()

旧用法新用法
from kfp.v2.dsl import component

@component(output_component_file='my_component.yaml')
def my_component(input: str):
   ...
from kfp.dsl import component
from kfp import compiler

@component()
def my_component(input: str):
   ...

compiler.Compiler().compile(my_component, 'my_component.yaml')

Pipeline 包文件扩展名

KFP 编译器将根据提供给编译器的扩展名(.yaml.json)编译您的 pipeline。

在 KFP SDK v2 中,YAML 是首选的序列化格式。

变更: 将使用 .json 扩展名的 package_path 参数转换为使用 .yaml 扩展名。

旧用法新用法
from kfp.v2 import compiler
# .json extension, deprecated format
compiler.Compiler().compile(pipeline, package_path='my_pipeline.json')
from kfp import compiler
# .yaml extension, preferred format
compiler.Compiler().compile(pipeline, package_path='my_pipeline.yaml')

破坏性变更

SDK v2 相对于 SDK v1 v2-namespace 只有少数细微的破坏性变更。

点击展开
停止支持 Python 3.6

KFP SDK v1 支持 Python 3.6。KFP SDK v2 支持 Python >=3.7.0,<3.12.0。


CLI 输出变更

v2 KFP CLI 更一致、更易读且更易解析。解析 v1 CLI 输出的代码可能无法解析 v2 CLI 输出。


在 dsl.ParallelFor 循环中引用上游任务的 .after

以下 pipeline 无法在 KFP SDK v2 中编译

with dsl.ParallelFor(...):
    t1 = comp()
t2 = comp().after(t1)

此用法主要由实现了自定义 dsl.ParallelFor 扇入的 KFP SDK v1 用户使用。KFP SDK v2 使用 dsl.Collected 本地支持从 dsl.ParallelFor 进行扇入。请参阅控制流用户文档了解说明。


导入器组件导入语句

importer_node 对象的位置已更改。

变更:kfp.dsl 导入。

旧用法新用法
from kfp.components import importer_node
from kfp.dsl import importer_node

添加节点选择器约束/加速器

任务方法 .add_node_selector_constraint 已弃用,转而使用 .set_accelerator_type。与 .add_node_selector_constraint 的先前实现相比,这两个方法都移除了 label_name 参数,并且 value 参数被参数 accelerator 替换。

变更: 使用 task.set_accelerator_type(accelerator=...)。将之前的 value 参数提供给 accelerator 参数。省略 label_name

旧用法新用法
@dsl.pipeline
def my_pipeline():
    task.add_node_selector_constraint(
        label_name='cloud.google.com/gke-accelerator',
        value='NVIDIA_TESLA_A100',
    )
@dsl.pipeline
def my_pipeline():
    task.set_accelerator_type(accelerator="NVIDIA_TESLA_K80")


我们是否遗漏了什么?

如果您认为我们遗漏了重要的破坏性变更或迁移步骤,请在 kubeflow/pipelines 仓库创建一个 issue 来描述该变更。

反馈

此页面是否有帮助?


最后修改于 2025年3月29日:website: 添加深色主题 (#3981) (4f092f1)