DSL 静态类型检查
旧版本
此页面是关于 Kubeflow Pipelines V1 的内容,请参阅 V2 文档以获取最新信息。
请注意,虽然 V2 后端能够运行由 V1 SDK 提交的 Pipeline,但我们强烈建议迁移到 V2 SDK。作为参考,V1 SDK 的最终版本是 kfp==1.8.22
,其参考文档可在此处获取。
本文档介绍如何在 Pipeline 中集成类型信息,并利用静态类型检查加快开发迭代。
动机
Pipeline 是由组件组成的工作流,每个组件包含输入和输出。DSL 编译器支持静态类型检查,以确保同一 Pipeline 中组件输入/输出的类型一致性。静态类型检查可帮助您在不运行 Pipeline 的情况下识别组件输入/输出不一致问题。它还可以通过尽早捕获错误来缩短开发周期。此功能在以下两种情况下特别有用:
- 当 Pipeline 非常大且手动检查类型不可行时;
- 当某些组件是共享组件且类型信息无法立即获取时。
类型系统
在 Kubeflow Pipeline 中,类型被定义为具有 OpenAPI Schema 属性的类型名称,该属性定义输入参数的模式。警告:Pipeline 系统目前在您提交 Pipeline 运行时不会根据模式检查输入值。但是,此功能将在不久的将来实现。
Pipelines SDK 中定义了一组核心类型,您可以使用这些核心类型或定义自定义类型。
在组件 YAML 中,类型被指定为字符串或包含 OpenAPI Schema 的字典,如下所示。“组件 a”需要一个 Integer 类型的输入,并输出三个类型分别为 GCSPath、customized_type 和 GCRPath 的输出。在这些类型中,Integer、GCSPath 和 GCRPath 是 SDK 中预定义的核心类型,而 customized_type 是用户自定义类型。
name: component a
description: component desc
inputs:
- {name: field_l, type: Integer}
outputs:
- {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
- {name: field_n, type: customized_type}
- {name: field_o, type: GCRPath}
implementation:
container:
image: gcr.io/ml-pipeline/component-a
command: [python3, /pipelines/component/src/train.py]
args: [
--field-l, {inputValue: field_l},
]
fileOutputs:
field_m: /schema.txt
field_n: /feature.txt
field_o: /output.txt
类似地,当您使用装饰器编写组件时,可以在函数签名中用类型注释输入/输出,如下所示。
from kfp.dsl import component
from kfp.dsl.types import Integer, GCRPath
@component
def task_factory_a(field_l: Integer()) -> {
'field_m': {
'GCSPath': {
'openapi_schema_validator':
'{"type": "string", "pattern": "^gs://.*$"}'
}
},
'field_n': 'customized_type',
'field_o': GCRPath()
}:
return ContainerOp(
name='operator a',
image='gcr.io/ml-pipeline/component-a',
command=['python3', '/pipelines/component/src/train.py'],
arguments=[
'--field-l',
field_l,
],
file_outputs={
'field_m': '/schema.txt',
'field_n': '/feature.txt',
'field_o': '/output.txt'
})
您也可以使用类型注释 Pipeline 输入,并且输入也会对照组件输入/输出类型进行检查。例如:
@component
def task_factory_a(
field_m: {
'GCSPath': {
'openapi_schema_validator':
'{"type": "string", "pattern": "^gs://.*$"}'
}
}, field_o: 'Integer'):
return ContainerOp(
name='operator a',
image='gcr.io/ml-pipeline/component-a',
arguments=[
'--field-l',
field_m,
'--field-o',
field_o,
],
)
# Pipeline input types are also checked against the component I/O types.
@dsl.pipeline(name='type_check', description='')
def pipeline(
a: {
'GCSPath': {
'openapi_schema_validator':
'{"type": "string", "pattern": "^gs://.*$"}'
}
} = 'good',
b: Integer() = 12):
task_factory_a(field_m=a, field_o=b)
try:
compiler.Compiler().compile(pipeline, 'pipeline.tar.gz', type_check=True)
except InconsistentTypeException as e:
print(e)
类型检查如何工作?
基本的检查标准是相等性检查。换句话说,只有当类型名称字符串相等且对应的 OpenAPI Schema 属性相等时,类型检查才会通过。类型检查失败的示例包括:
- “GCSPath” 对比 “GCRPath”
- “Integer” 对比 “Float”
- {‘GCSPath’: {‘openapi_schema_validator’: ‘{“type”: “string”, “pattern”: “^gs://.$”}’}} 对比
{‘GCSPath’: {‘openapi_schema_validator’: ‘{“type”: “string”, “pattern”: “^gcs://.$”}’}}
如果检测到类型不一致,将抛出 InconsistentTypeException。
类型检查配置
类型检查默认启用,可以通过两种方式禁用:
如果您通过编程方式编译 Pipeline
compiler.Compiler().compile(pipeline_a, 'pipeline_a.tar.gz', type_check=False)
如果您使用 dsl-compiler 工具编译 Pipeline
dsl-compiler --py pipeline.py --output pipeline.zip --disable-type-check
细粒度配置
有时,您可能希望启用类型检查但禁用某些参数。例如,当上游组件生成类型为“Float”的输出而下游组件可以接收“Float”或“Integer”时,如果您将类型定义为“Float_or_Integer”,则可能会失败。也支持按参数禁用类型检查,如下所示。
@dsl.pipeline(name='type_check_a', description='')
def pipeline():
a = task_factory_a(field_l=12)
# For each of the arguments, you can also ignore the types by calling
# ignore_type function.
b = task_factory_b(
field_x=a.outputs['field_n'],
field_y=a.outputs['field_o'],
field_z=a.outputs['field_m'].ignore_type())
compiler.Compiler().compile(pipeline, 'pipeline.tar.gz', type_check=True)
缺失类型
如果上游或下游组件中的任何一个缺少某些参数的类型信息,DSL 编译器将通过类型检查。效果与忽略类型信息相同。但是,如果某些输入/输出缺少类型信息且某些输入/输出类型不兼容,则类型检查仍然会失败。
下一步
了解如何使用 Python DSL 定义 KubeFlow Pipeline 并使用类型检查编译 Pipeline:一个 Jupyter Notebook 演示。