构建更高级的 ML 流水线

创建一个利用更多 KFP 功能的更高级流水线。

此步骤演示如何构建一个更高级的机器学习 (ML) 流水线,该流水线利用 KFP 流水线组合的附加功能。

以下 ML 流水线创建了一个数据集,作为预处理步骤对数据集的特征进行了归一化处理,并使用不同的超参数在该数据上训练了一个简单的 ML 模型

from typing import List

from kfp import client
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Model
from kfp.dsl import Output


@dsl.component(packages_to_install=['pandas==1.3.5'])
def create_dataset(iris_dataset: Output[Dataset]):
    import pandas as pd

    csv_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'
    col_names = [
        'Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Labels'
    ]
    df = pd.read_csv(csv_url, names=col_names)

    with open(iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def normalize_dataset(
    input_iris_dataset: Input[Dataset],
    normalized_iris_dataset: Output[Dataset],
    standard_scaler: bool,
    min_max_scaler: bool,
):
    if standard_scaler is min_max_scaler:
        raise ValueError(
            'Exactly one of standard_scaler or min_max_scaler must be True.')

    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import StandardScaler

    with open(input_iris_dataset.path) as f:
        df = pd.read_csv(f)
    labels = df.pop('Labels')

    if standard_scaler:
        scaler = StandardScaler()
    if min_max_scaler:
        scaler = MinMaxScaler()

    df = pd.DataFrame(scaler.fit_transform(df))
    df['Labels'] = labels
    with open(normalized_iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def train_model(
    normalized_iris_dataset: Input[Dataset],
    model: Output[Model],
    n_neighbors: int,
):
    import pickle

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.neighbors import KNeighborsClassifier

    with open(normalized_iris_dataset.path) as f:
        df = pd.read_csv(f)

    y = df.pop('Labels')
    X = df

    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    clf = KNeighborsClassifier(n_neighbors=n_neighbors)
    clf.fit(X_train, y_train)
    with open(model.path, 'wb') as f:
        pickle.dump(clf, f)


@dsl.pipeline(name='iris-training-pipeline')
def my_pipeline(
    standard_scaler: bool,
    min_max_scaler: bool,
    neighbors: List[int],
):
    create_dataset_task = create_dataset()

    normalize_dataset_task = normalize_dataset(
        input_iris_dataset=create_dataset_task.outputs['iris_dataset'],
        standard_scaler=True,
        min_max_scaler=False)

    with dsl.ParallelFor(neighbors) as n_neighbors:
        train_model(
            normalized_iris_dataset=normalize_dataset_task
            .outputs['normalized_iris_dataset'],
            n_neighbors=n_neighbors)


endpoint = '<KFP_UI_URL>'
kfp_client = client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={
        'min_max_scaler': True,
        'standard_scaler': False,
        'neighbors': [3, 6, 9]
    },
)
url = f'{endpoint}/#/runs/details/{run.run_id}'
print(url)

此示例在流水线中引入了以下新功能

  • 通过在 @dsl.component 装饰器上使用 packages_to_install 参数,可以在组件运行时添加一些需要安装的 Python 包,如下所示

    @dsl.component(packages_to_install=['pandas==1.3.5'])

    要在安装库后使用它,必须在其导入语句包含在组件函数的作用域内,以便在组件运行时导入该库。

  • 组件签名中引入了 DatasetModel 类型的输入和输出 artifacts 来描述组件的输入和输出 artifacts。这是通过分别对输入和输出 artifacts 使用类型注解泛型 Input[]Output[] 来完成的。

    在组件的作用域内,可以通过 .path 属性读取 (对于输入) 和写入 (对于输出) artifacts。KFP 后端确保在运行时将输入 artifact 文件从远程存储复制正在执行的 Pod 的本地文件系统,以便组件函数可以从本地文件系统读取输入 artifacts。相比之下,当组件运行完成时,输出 artifact 文件从 Pod 的本地文件系统复制远程存储。这样,输出 artifacts 会持久存储在 Pod 外部。在这两种情况下,组件作者只需要与本地文件系统交互即可创建持久 artifacts。

    带有 Output[] 注解的参数的实参不会由流水线作者传递给组件。KFP 后端在组件运行时传递此 artifact,因此组件作者无需关心输出 artifacts 的写入路径。写入输出 artifact 后,执行组件的后端会识别 KFP artifact 类型 (DatasetModel),并在仪表盘上组织它们。

    可以使用源任务的 .outputs 属性和输出 artifact 参数名称将输出 artifact 作为输入传递给下游组件,如下所示

    create_dataset_task.outputs['iris_dataset']

  • 使用了其中一个 DSL 控制流功能 dsl.ParallelFor。它是一个上下文管理器,允许流水线作者创建任务。这些任务在循环中并行执行。使用 dsl.ParallelFor 遍历 neighbors 流水线参数,可以在一次流水线运行中以不同的参数执行 train_model 组件并测试多个超参数。其他控制流功能包括 dsl.Conditiondsl.ExitHandler

恭喜您!现在您已经拥有了一个 KFP 部署、一个端到端 ML 流水线以及 UI 的简介。这仅仅是 KFP 流水线和仪表盘功能的开始。

反馈

此页面有帮助吗?