构建基于 Python 函数的组件

使用 Python 构建自己的轻量级管道组件

Kubeflow Pipelines 组件是执行机器学习工作流程中一个步骤的自包含代码集。管道组件由以下部分组成:

  • 组件代码,实现了执行机器学习工作流程中一个步骤所需的逻辑。

  • 组件规范,定义了以下内容:

    • 组件的元数据,包括其名称和描述。
    • 组件的接口,包括组件的输入和输出。
    • 组件的实现,包括要运行的 Docker 容器镜像、如何将输入传递给组件代码以及如何获取组件的输出。

基于 Python 函数的组件让您可以通过将组件代码构建为 Python 函数并为您生成组件规范来更轻松地快速迭代。本文档介绍了如何构建基于 Python 函数的组件以及如何在管道中使用它们。

准备工作

  1. 运行以下命令安装 Kubeflow Pipelines SDK。如果您在 Jupyter Notebook 中运行此命令,安装 SDK 后请重启内核。
$ pip install kfp==1.8
  1. 导入 kfp 包。
import kfp
from kfp.components import create_component_from_func
  1. 按照使用 SDK 客户端连接到 Kubeflow Pipelines 中的步骤创建一个 kfp.Client实例。
client = kfp.Client() # change arguments accordingly

有关 Kubeflow Pipelines SDK 的更多信息,请参阅SDK 参考指南

开始使用基于 Python 函数的组件

本节通过创建一个简单组件的过程,演示了如何开始构建基于 Python 函数的组件。

  1. 将组件代码定义为一个独立的 Python 函数。在此示例中,该函数将两个浮点数相加并返回两个参数的和。
def add(a: float, b: float) -> float:
  '''Calculates sum of two arguments'''
  return a + b
  1. 使用 kfp.components.create_component_from_func 生成组件规范 YAML,并返回一个工厂函数,您可以使用该函数为管道创建 kfp.dsl.ContainerOp实例。组件规范 YAML 是组件的可重用和可共享定义。
add_op = create_component_from_func(
    add, output_component_file='add_component.yaml')
  1. 创建并运行您的管道。了解更多关于创建和运行管道的信息
import kfp.dsl as dsl
@dsl.pipeline(
  name='Addition pipeline',
  description='An example pipeline that performs addition calculations.'
)
def add_pipeline(
  a='1',
  b='7',
):
  # Passes a pipeline parameter and a constant value to the `add_op` factory
  # function.
  first_add_task = add_op(a, 4)
  # Passes an output reference from `first_add_task` and a pipeline parameter
  # to the `add_op` factory function. For operations with a single return
  # value, the output reference can be accessed as `task.output` or
  # `task.outputs['output_name']`.
  second_add_task = add_op(first_add_task.output, b)

# Specify argument values for your pipeline run.
arguments = {'a': '7', 'b': '8'}

# Create a pipeline run, using the client you initialized in a prior step.
client.create_run_from_pipeline_func(add_pipeline, arguments=arguments)

构建基于 Python 函数的组件

使用以下说明构建基于 Python 函数的组件:

  1. 定义一个独立的 Python 函数。此函数必须满足以下要求:

  2. Kubeflow Pipelines 使用函数的输入和输出来定义组件的接口。了解更多关于在组件之间传递数据的信息。您的函数的输入和输出必须满足以下要求:

  3. (可选)如果您的函数具有复杂的依赖关系,请选择或构建一个容器镜像来运行您的 Python 函数。了解更多关于选择或构建组件容器镜像的信息

  4. 调用 kfp.components.create_component_from_func(func) 将函数转换为管道组件。

    • func:要转换的 Python 函数。
    • base_image:(可选)指定用于运行此函数的 Docker 容器镜像。了解更多关于选择或构建容器镜像的信息
    • output_component_file:(可选)将您的组件定义写入文件。您可以使用此文件与同事共享组件,或在不同的管道中重用它。
    • packages_to_install:(可选)在运行函数之前要安装的带版本信息的 Python 包列表。

使用和安装 Python 包

当 Kubeflow Pipelines 运行您的管道时,每个组件都会在 Kubernetes Pod 上的 Docker 容器镜像中运行。要加载 Python 函数依赖的包,必须满足以下条件之一:

  • 该包必须安装在容器镜像中。
  • 必须使用 kfp.components.create_component_from_func(func) 函数的 packages_to_install 参数定义该包。
  • 您的函数必须安装该包。例如,您的函数可以使用 subprocess 模块来运行 pip install 等命令以安装包。

选择或构建容器镜像

目前,如果您未指定容器镜像,基于 Python 函数的组件将使用 python:3.7 容器镜像。如果您的函数具有复杂的依赖关系,使用预装了依赖项的容器镜像或构建自定义容器镜像可能会更有益。预装依赖项可以减少组件的运行时间,因为组件无需在每次运行时下载和安装包。

许多框架(如 TensorFlowPyTorch)以及云服务提供商都提供预构建的容器镜像,其中安装了常见的依赖项。

如果预构建容器不可用,您可以构建一个包含 Python 函数依赖项的自定义容器镜像。有关构建自定义容器的更多信息,请阅读 Docker 文档中的 Dockerfile 参考指南

如果您构建或选择了容器镜像,而不是使用默认容器镜像,则该容器镜像必须使用 Python 3.5 或更高版本。

理解数据如何在组件之间传递

当 Kubeflow Pipelines 运行您的组件时,会在 Kubernetes Pod 中启动一个容器镜像,并将组件的输入作为命令行参数传递。组件完成后,组件的输出将作为文件返回。

基于 Python 函数的组件通过为您构建组件规范,使得构建管道组件变得更加容易。基于 Python 函数的组件还处理将输入传递到组件以及将函数输出返回到管道的复杂性。

以下章节介绍了如何按值和按文件传递参数。

  • 按值传递的参数包括数字、布尔值和短字符串。Kubeflow Pipelines 通过将值作为命令行参数传递,按值将参数传递给组件。
  • 按文件传递的参数包括 CSV、图像和复杂类型。这些文件存储在运行在 Kubernetes 上的组件可访问的位置,例如持久卷声明或云存储服务。Kubeflow Pipelines 通过将文件路径作为命令行参数传递,按文件将参数传递给组件。

输入和输出参数名称

当您使用 Kubeflow Pipelines SDK 将 Python 函数转换为管道组件时,Kubeflow Pipelines SDK 会使用函数的接口以以下方式定义组件的接口:

  • 某些参数定义输入参数。
  • 某些参数定义输出参数。
  • 函数的返回值用作输出参数。如果返回值是 collections.namedtuple,则使用该命名元组返回多个小值。

由于您可以在组件之间按值或按路径传递参数,Kubeflow Pipelines SDK 会移除常见的参数后缀,这些后缀会泄露组件预期的实现方式。例如,一个摄取数据并输出 CSV 数据的基于 Python 函数的组件可能有一个定义为 csv_path: comp.OutputPath(str) 的输出参数。在这种情况下,输出是 CSV 数据,而不是路径。因此,Kubeflow Pipelines SDK 将输出名称简化为 csv

Kubeflow Pipelines SDK 使用以下规则定义组件接口中的输入和输出参数名称:

  • 如果参数名以 _path 结尾,并且参数被标注为 kfp.components.InputPathkfp.components.OutputPath,则参数名是去掉末尾 _path 的参数名。
  • 如果参数名以 _file 结尾,则参数名是去掉末尾 _file 的参数名。
  • 如果使用 return 语句从组件返回单个小值,则输出参数命名为 output
  • 如果您通过返回 collections.namedtuple 从组件返回多个小值,Kubeflow Pipelines SDK 会使用元组的字段名作为输出参数名。

否则,Kubeflow Pipelines SDK 将使用参数名作为参数名称。

按值传递参数

基于 Python 函数的组件通过允许您标注 Python 函数来定义组件接口,从而使得在组件之间按值(如数字、布尔值和短字符串)传递参数更加容易。支持的类型包括 intfloatboolstr。如果 listdict 实例包含 intfloatboolstr 等小值,您也可以按值传递它们。如果您未标注函数,这些输入参数将作为字符串传递。

如果您的组件按值返回多个输出,请使用 typing.NamedTuple 类型提示标注您的函数,并使用 collections.namedtuple 函数将函数的输出作为 tuple 的新子类返回。

您还可以从函数返回元数据和指标。

以下示例演示了如何按值返回多个输出,包括组件元数据和指标。

from typing import NamedTuple
def multiple_return_values_example(a: float, b: float) -> NamedTuple(
  'ExampleOutputs',
  [
    ('sum', float),
    ('product', float),
    ('mlpipeline_ui_metadata', 'UI_metadata'),
    ('mlpipeline_metrics', 'Metrics')
  ]):
  """Example function that demonstrates how to return multiple values."""  
  sum_value = a + b
  product_value = a * b

  # Export a sample tensorboard
  metadata = {
    'outputs' : [{
      'type': 'tensorboard',
      'source': 'gs://ml-pipeline-dataset/tensorboard-train',
    }]
  }

  # Export two metrics
  metrics = {
    'metrics': [
      {
        'name': 'sum',
        'numberValue':  float(sum_value),
      },{
        'name': 'product',
        'numberValue':  float(product_value),
      }
    ]  
  }

  from collections import namedtuple
  example_output = namedtuple(
      'ExampleOutputs',
      ['sum', 'product', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
  return example_output(sum_value, product_value, metadata, metrics)

按文件传递参数

基于 Python 函数的组件通过允许您标注 Python 函数的参数来指定哪些参数引用文件,从而使得向组件传递文件或从组件返回文件变得更加容易。Python 函数的参数可以引用输入文件或输出文件。如果您的参数是输出文件,Kubeflow Pipelines 会向您的函数传递一个可用于存储输出文件的路径或流。

以下示例接受一个文件作为输入,并返回两个文件作为输出。

def split_text_lines(
    source_path: comp.InputPath(str),
    odd_lines_path: comp.OutputPath(str),
    even_lines_path: comp.OutputPath(str)):
    """Splits a text file into two files, with even lines going to one file
    and odd lines to the other."""

    with open(source_path, 'r') as reader:
        with open(odd_lines_path, 'w') as odd_writer:
            with open(even_lines_path, 'w') as even_writer:
                while True:
                    line = reader.readline()
                    if line == "":
                        break
                    odd_writer.write(line)
                    line = reader.readline()
                    if line == "":
                        break
                    even_writer.write(line)

在此示例中,输入和输出被定义为 split_text_lines 函数的参数。这使得 Kubeflow Pipelines 可以将源数据文件的路径以及输出数据文件的路径传递到函数中。

要接受文件作为输入参数,请使用以下类型标注之一:

  • kfp.components.InputBinaryFile:使用此标注指定您的函数期望参数是此函数可读取的 io.BytesIO 实例
  • kfp.components.InputPath:使用此标注指定您的函数期望参数是输入文件的路径,类型为 string
  • kfp.components.InputTextFile:使用此标注指定您的函数期望参数是此函数可读取的 io.TextIOWrapper 实例

要将文件作为输出返回,请使用以下类型标注之一:

  • kfp.components.OutputBinaryFile:使用此标注指定您的函数期望参数是此函数可写入的 io.BytesIO 实例
  • kfp.components.OutputPath:使用此标注指定您的函数期望参数是存储输出文件的路径,类型为 string
  • kfp.components.OutputTextFile:使用此标注指定您的函数期望参数是此函数可写入的 io.TextIOWrapper 实例

基于 Python 函数的组件示例

本节演示了如何构建一个基于 Python 函数的组件,该组件使用导入、辅助函数并产生多个输出。

  1. 定义您的函数。此示例函数使用 numpy 包在辅助函数中计算给定被除数和除数的商和余数。除了商和余数,该函数还返回用于可视化的元数据和两个指标。
from typing import NamedTuple

def my_divmod(
  dividend: float,
  divisor: float) -> NamedTuple(
    'MyDivmodOutput',
    [
      ('quotient', float),
      ('remainder', float),
      ('mlpipeline_ui_metadata', 'UI_metadata'),
      ('mlpipeline_metrics', 'Metrics')
    ]):
    '''Divides two numbers and calculate  the quotient and remainder'''

    # Import the numpy package inside the component function
    import numpy as np

    # Define a helper function
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    from tensorflow.python.lib.io import file_io
    import json

    # Export a sample tensorboard
    metadata = {
      'outputs' : [{
        'type': 'tensorboard',
        'source': 'gs://ml-pipeline-dataset/tensorboard-train',
      }]
    }

    # Export two metrics
    metrics = {
      'metrics': [{
          'name': 'quotient',
          'numberValue':  float(quotient),
        },{
          'name': 'remainder',
          'numberValue':  float(remainder),
        }]}

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput',
        ['quotient', 'remainder', 'mlpipeline_ui_metadata',
         'mlpipeline_metrics'])
    return divmod_output(quotient, remainder, json.dumps(metadata),
                         json.dumps(metrics))
  1. 通过直接运行或使用单元测试来测试您的函数。
my_divmod(100, 7)
  1. 这应该返回如下结果:

    MyDivmodOutput(quotient=14, remainder=2, mlpipeline_ui_metadata='{"outputs": [{"type": "tensorboard", "source": "gs://ml-pipeline-dataset/tensorboard-train"}]}', mlpipeline_metrics='{"metrics": [{"name": "quotient", "numberValue": 14.0}, {"name": "remainder", "numberValue": 2.0}]}')
    
  2. 使用 kfp.components.create_component_from_func 返回一个工厂函数,您可以使用该函数为管道创建 kfp.dsl.ContainerOp实例。此示例还指定了用于运行此函数的基础容器镜像。

divmod_op = comp.create_component_from_func(
    my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')
  1. 定义您的管道。此示例使用了 divmod_op 工厂函数和之前示例中的 add_op 工厂函数。
import kfp.dsl as dsl
@dsl.pipeline(
   name='Calculation pipeline',
   description='An example pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
   a='1',
   b='7',
   c='17',
):
    # Passes a pipeline parameter and a constant value as operation arguments.
    add_task = add_op(a, 4) # The add_op factory function returns
                            # a dsl.ContainerOp class instance. 

    # Passes the output of the add_task and a pipeline parameter as operation
    # arguments. For an operation with a single return value, the output
    # reference is accessed using `task.output` or
    # `task.outputs['output_name']`.
    divmod_task = divmod_op(add_task.output, b)

    # For an operation with multiple return values, output references are
    # accessed as `task.outputs['output_name']`.
    result_task = add_op(divmod_task.outputs['quotient'], c)
  1. 编译并运行您的管道。了解更多关于编译和运行管道的信息
# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}

# Submit a pipeline run
client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)

反馈

本页是否有帮助?


最后修改时间:July 31, 2024: 修复 Pipelines 中的损坏链接 (#3807) (17e27bf)