构建组件

如何创建组件并在流水线中使用它们的教程

流水线组件是自包含的代码集合,用于执行 ML 工作流中的一个步骤。本文档描述了构建组件所需的概念,并演示了如何开始构建组件。

开始之前

运行以下命令安装 Kubeflow Pipelines SDK。

$ pip install kfp==1.8

有关 Kubeflow Pipelines SDK 的更多信息,请参阅SDK 参考指南

理解流水线组件

流水线组件是自包含的代码集合,用于执行 ML 工作流中的一个步骤,例如数据预处理或模型训练。要创建组件,您必须构建组件的实现定义组件规范

组件的实现包括组件的可执行代码以及代码运行所在的 Docker 容器镜像。了解有关设计流水线组件的更多信息

构建组件实现后,您可以将组件接口定义为组件规范。组件规范定义了

  • 组件的输入和输出。
  • 组件代码运行所在的容器镜像、用于运行组件代码的命令以及传递给组件代码的命令行参数。
  • 组件的元数据,例如名称和描述。

了解有关创建组件规范的更多信息.

如果组件代码实现为 Python 函数,请使用 Kubeflow Pipelines SDK 将您的函数打包为组件。了解有关构建基于 Python 函数的组件的更多信息

设计流水线组件

当 Kubeflow Pipelines 执行组件时,会在 Kubernetes Pod 中启动一个容器镜像,组件的输入作为命令行参数传入。您可以按值传递少量输入,例如字符串和数字。较大的输入,例如 CSV 数据,必须作为文件路径传递。组件完成后,组件的输出将作为文件返回。

设计组件代码时,请考虑以下几点

  • 哪些输入可以按值传递给您的组件?可以按值传递的输入示例包括数字、布尔值和短字符串。任何可以合理地作为命令行参数传递的值都可以按值传递给您的组件。所有其他输入都通过对输入路径的引用传递给您的组件。
  • 要从组件返回输出,输出数据必须存储为文件。定义组件时,您需要告知 Kubeflow Pipelines 组件生成哪些输出。流水线运行时,Kubeflow Pipelines 会将您用于存储组件输出的路径作为输入传递给您的组件。
  • 输出通常写入单个文件。在某些情况下,您可能需要将文件目录作为输出返回。在这种情况下,请在输出路径创建目录并将输出文件写入该位置。在这两种情况下,如果父目录不存在,可能需要创建父目录。
  • 组件的目标可能是创建外部服务中的数据集,例如 BigQuery 表。在这种情况下,组件输出生成数据的标识符(例如表名)而不是数据本身可能是有意义的。我们建议您将此模式限制在数据必须放入外部系统而非保留在 Kubeflow Pipelines 系统内部的情况下。
  • 由于您的输入和输出路径作为命令行参数传递,您的组件代码必须能够从命令行读取输入。如果您的组件使用 Python 构建,则像 argparseabsl.flags 这样的库可以更轻松地读取组件的输入。
  • 只要可以在容器镜像中运行,组件代码就可以用任何语言实现。

以下是一个使用 Python3 编写的示例程序。该程序从输入文件中读取给定数量的行,并将这些行写入输出文件。这意味着此函数接受三个命令行参数

  • 输入文件的路径。
  • 要读取的行数。
  • 输出文件的路径。
#!/usr/bin/env python3
import argparse
from pathlib import Path

# Function doing the actual work (Outputs first N lines from a text file)
def do_work(input1_file, output1_file, param1):
  for x, line in enumerate(input1_file):
    if x >= param1:
      break
    _ = output1_file.write(line)
  
# Defining and parsing the command-line arguments
parser = argparse.ArgumentParser(description='My program description')
# Paths must be passed in, not hardcoded
parser.add_argument('--input1-path', type=str,
  help='Path of the local file containing the Input 1 data.')
parser.add_argument('--output1-path', type=str,
  help='Path of the local file where the Output 1 data should be written.')
parser.add_argument('--param1', type=int, default=100,
  help='The number of lines to read from the input and write to the output.')
args = parser.parse_args()

# Creating the directory where the output file is created (the directory
# may or may not exist).
Path(args.output1_path).parent.mkdir(parents=True, exist_ok=True)

with open(args.input1_path, 'r') as input1_file:
    with open(args.output1_path, 'w') as output1_file:
        do_work(input1_file, output1_file, args.param1)

如果该程序保存为 program.py,则此程序的命令行调用为

python3 program.py --input1-path <path-to-the-input-file> \
  --param1 <number-of-lines-to-read> \
  --output1-path <path-to-write-the-output-to> 

将组件代码容器化

为了让 Kubeflow Pipelines 运行您的组件,您的组件必须打包为 Docker 容器镜像,并发布到您的 Kubernetes 集群可以访问的容器注册表。创建容器镜像的步骤并非 Kubeflow Pipelines 所独有。为了方便您,本节提供了一些关于标准容器创建的指南。

  1. 为您的容器创建 Dockerfile。Dockerfile 指定了

    • 基础容器镜像。例如,代码运行所在的操作系统。
    • 代码运行所需的任何依赖项。
    • 要复制到容器中的文件,例如此组件的可运行代码。

    以下是一个 Dockerfile 示例。

    FROM python:3.7
    RUN python3 -m pip install keras
    COPY ./src /pipelines/component/src
    

    在此示例中

    • 基础容器镜像是 python:3.7
    • keras Python 包已安装在容器镜像中。
    • ./src 目录中的文件被复制到容器镜像中的 /pipelines/component/src
  2. 创建一个名为 build_image.sh 的脚本,使用 Docker 构建容器镜像并将其推送到容器注册表。您的 Kubernetes 集群必须能够访问您的容器注册表才能运行您的组件。容器注册表的示例包括 Google Container RegistryDocker Hub

    以下示例构建一个容器镜像,将其推送到容器注册表,并输出严格的镜像名称。在组件规范中使用严格的镜像名称是最佳实践,以确保在每次组件执行中都使用预期版本的容器镜像。

    #!/bin/bash -e
    image_name=gcr.io/my-org/my-image
    image_tag=latest
    full_image_name=${image_name}:${image_tag}
    
    cd "$(dirname "$0")" 
    docker build -t "${full_image_name}" .
    docker push "$full_image_name"
    
    # Output the strict image name, which contains the sha256 image digest
    docker inspect --format="{{index .RepoDigests 0}}" "${full_image_name}"
    

    在前面的示例中

    • image_name 指定了您的容器镜像在容器注册表中的完整名称。
    • image_tag 指定此镜像应标记为 latest

    保存此文件并运行以下命令使其可执行。

    chmod +x build_image.sh
    
  3. 运行 build_image.sh 脚本构建容器镜像并将其推送到容器注册表。

  4. 使用 docker run 在本地测试您的容器镜像。如果需要,修改您的应用程序和 Dockerfile,直到您的应用程序在容器中正常运行。

创建组件规范

要从容器化的程序创建组件,您必须创建一个组件规范,定义组件的接口和实现。以下部分概述了如何通过演示如何定义组件的实现、接口和元数据来创建组件规范。

要了解有关定义组件规范的更多信息,请参阅组件规范参考指南

定义组件实现

以下示例创建一个组件规范 YAML 文件并定义组件的实现。

  1. 创建一个名为 component.yaml 的文件并在文本编辑器中打开。

  2. 创建组件的实现部分并指定容器镜像的严格名称。运行 build_image.sh 脚本时提供了严格的镜像名称。

    implementation:
      container:
        # The strict name of a container image that you've pushed to a container registry.
        image: gcr.io/my-org/my-image@sha256:a172..752f
    
  3. 为组件的实现定义一个 command。此字段指定用于在容器中运行程序的命令行参数。

    implementation:
      container:
        image: gcr.io/my-org/my-image@sha256:a172..752f
        # command is a list of strings (command-line arguments). 
        # The YAML language has two syntaxes for lists and you can use either of them. 
        # Here we use the "flow syntax" - comma-separated strings inside square brackets.
        command: [
          python3, 
          # Path of the program inside the container
          /pipelines/component/src/program.py,
          --input1-path,
          {inputPath: Input 1},
          --param1, 
          {inputValue: Parameter 1},
          --output1-path, 
          {outputPath: Output 1},
        ]
    

    command 格式为一个字符串列表。command 中的每个字符串都是一个命令行参数或一个占位符。运行时,占位符会被输入或输出替换。在前面的示例中,两个输入和一个输出路径被传递到 /pipelines/component/src/program.py 处的 Python 脚本中。

    有三种类型的输入/输出占位符

    • {inputValue: <input-name>}:此占位符被指定输入的值替换。这对于少量输入数据很有用,例如数字或短字符串。

    • {inputPath: <input-name>}:此占位符被此输入作为文件的路径替换。您的组件可以在流水线运行期间在该路径读取该输入的内容。

    • {outputPath: <output-name>}:此占位符被您的程序写入此输出数据的路径替换。这使得 Kubeflow Pipelines 系统可以读取文件内容并将其存储为指定输出的值。

    <input-name> 名称必须与组件规范的 inputs 部分中的输入名称匹配。<output-name> 名称必须与组件规范的 outputs 部分中的输出名称匹配。

定义组件接口

以下示例演示如何指定组件的接口。

  1. 要在 component.yaml 中定义输入,请在 inputs 列表中添加一个项,并包含以下属性

    在此示例中,Python 程序有两个输入

    • Input 1 包含 String 数据。
    • Parameter 1 包含一个 Integer
    inputs:
    - {name: Input 1, type: String, description: 'Data for input 1'}
    - {name: Parameter 1, type: Integer, default: '100', description: 'Number of lines to copy'}
    

    注意:Input 1Parameter 1 没有指定有关它们如何存储或包含多少数据的详细信息。考虑使用命名约定来指示输入是否预期小到足以按值传递。

  2. 组件完成任务后,组件的输出将作为路径传递给您的流水线。运行时,Kubeflow Pipelines 为您的每个组件输出创建路径。这些路径作为输入传递给组件的实现。

    要在组件规范 YAML 中定义输出,请在 outputs 列表中添加一个项,并包含以下属性

    在此示例中,Python 程序返回一个输出。该输出命名为 Output 1,包含 String 数据。

    outputs:
    - {name: Output 1, type: String, description: 'Output 1 data.'}
    

    注意:考虑使用命名约定来指示此输出是否预期小到足以按值传递。您应将按值传递的数据量限制在每个流水线运行 200 KB。

  3. 定义组件接口后,component.yaml 应如下所示

    inputs:
    - {name: Input 1, type: String, description: 'Data for input 1'}
    - {name: Parameter 1, type: Integer, default: '100', description: 'Number of lines to copy'}
    
    outputs:
    - {name: Output 1, type: String, description: 'Output 1 data.'}
    
    implementation:
      container:
        image: gcr.io/my-org/my-image@sha256:a172..752f
        # command is a list of strings (command-line arguments). 
        # The YAML language has two syntaxes for lists and you can use either of them. 
        # Here we use the "flow syntax" - comma-separated strings inside square brackets.
        command: [
          python3, 
          # Path of the program inside the container
          /pipelines/component/src/program.py,
          --input1-path,
          {inputPath: Input 1},
          --param1, 
          {inputValue: Parameter 1},
          --output1-path, 
          {outputPath: Output 1},
        ]
    

指定组件元数据

要定义组件的元数据,请将 namedescription 字段添加到 component.yaml

name: Get Lines
description: Gets the specified number of lines from the input file.

inputs:
- {name: Input 1, type: String, description: 'Data for input 1'}
- {name: Parameter 1, type: Integer, default: '100', description: 'Number of lines to copy'}

outputs:
- {name: Output 1, type: String, description: 'Output 1 data.'}

implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f
    # command is a list of strings (command-line arguments). 
    # The YAML language has two syntaxes for lists and you can use either of them. 
    # Here we use the "flow syntax" - comma-separated strings inside square brackets.
    command: [
      python3, 
      # Path of the program inside the container
      /pipelines/component/src/program.py,
      --input1-path,
      {inputPath: Input 1},
      --param1, 
      {inputValue: Parameter 1},
      --output1-path, 
      {outputPath: Output 1},
    ]

在流水线中使用组件

您可以使用 Kubeflow Pipelines SDK 通过以下方法加载组件

这些函数创建一个工厂函数,您可以使用该函数创建 ContainerOp 实例,用作流水线中的步骤。此工厂函数的输入参数包括您的组件输入和组件输出的路径。函数签名可能以以下方式修改,以确保其有效且符合 Python 习惯。

  • 具有默认值的输入将排在没有默认值的输入和输出之后。
  • 输入和输出名称会转换为 Python 风格的名称(空格和符号会被下划线替换,字母会转换为小写)。例如,名为 Input 1 的输入会转换为 input_1

以下示例演示了如何加载组件规范的文本并在两步流水线中运行它。在运行此示例之前,请更新组件规范以使用您在前面部分定义的组件规范。

为了演示组件之间的数据传递,我们创建了另一个组件,该组件仅使用 bash 命令将一些文本值写入输出文件。输出文件可以作为输入传递给我们的前一个组件。

import kfp
import kfp.components as comp

create_step_get_lines = comp.load_component_from_text("""
name: Get Lines
description: Gets the specified number of lines from the input file.

inputs:
- {name: Input 1, type: Data, description: 'Data for input 1'}
- {name: Parameter 1, type: Integer, default: '100', description: 'Number of lines to copy'}

outputs:
- {name: Output 1, type: Data, description: 'Output 1 data.'}

implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f
    # command is a list of strings (command-line arguments). 
    # The YAML language has two syntaxes for lists and you can use either of them. 
    # Here we use the "flow syntax" - comma-separated strings inside square brackets.
    command: [
      python3, 
      # Path of the program inside the container
      /pipelines/component/src/program.py,
      --input1-path,
      {inputPath: Input 1},
      --param1, 
      {inputValue: Parameter 1},
      --output1-path, 
      {outputPath: Output 1},
    ]""")

# create_step_get_lines is a "factory function" that accepts the arguments
# for the component's inputs and output paths and returns a pipeline step
# (ContainerOp instance).
#
# To inspect the get_lines_op function in Jupyter Notebook, enter 
# "get_lines_op(" in a cell and press Shift+Tab.
# You can also get help by entering `help(get_lines_op)`, `get_lines_op?`,
# or `get_lines_op??`.

# Create a simple component using only bash commands. The output of this component
# can be passed to a downstream component that accepts an input with the same type.
create_step_write_lines = comp.load_component_from_text("""
name: Write Lines
description: Writes text to a file.

inputs:
- {name: text, type: String}

outputs:
- {name: data, type: Data}

implementation:
  container:
    image: busybox
    command:
    - sh
    - -c
    - |
      mkdir -p "$(dirname "$1")"
      echo "$0" > "$1"
    args:
    - {inputValue: text}
    - {outputPath: data}
""")

# Define your pipeline 
def my_pipeline():
    write_lines_step = create_step_write_lines(
        text='one\ntwo\nthree\nfour\nfive\nsix\nseven\neight\nnine\nten')

    get_lines_step = create_step_get_lines(
        # Input name "Input 1" is converted to pythonic parameter name "input_1"
        input_1=write_lines_step.outputs['data'],
        parameter_1='5',
    )

# If you run this command on a Jupyter notebook running on Kubeflow,
# you can exclude the host parameter.
# client = kfp.Client()
client = kfp.Client(host='<your-kubeflow-pipelines-host-name>')

# Compile, upload, and submit this pipeline for execution.
client.create_run_from_pipeline_func(my_pipeline, arguments={})

组织组件文件

本节提供了一种推荐的组织组件文件的方式。没有要求您必须以这种方式组织文件。但是,使用标准组织方式可以重用相同的脚本进行测试、镜像构建和组件版本控制。

components/<component group>/<component name>/

    src/*            # Component source code files
    tests/*          # Unit tests
    run_tests.sh     # Small script that runs the tests
    README.md        # Documentation. If multiple files are needed, move to docs/.

    Dockerfile       # Dockerfile to build the component container image
    build_image.sh   # Small script that runs docker build and docker push

    component.yaml   # Component definition in YAML format

请参阅此示例组件以获取实际组件示例。

下一步

反馈

此页面有帮助吗?


最后修改于 2024年7月31日: 修复 Pipelines 中的断链 (#3807) (17e27bf)