Connect the SDK to the API

Learn how to connect the Kubeflow Pipelines SDK to the API.

Overview

The Kubeflow Pipelines SDK provides a Python interface for interacting with the Kubeflow Pipelines API. This guide shows you how to connect the SDK to the Pipelines API in several different scenarios.

Kubeflow Platform

When Kubeflow Pipelines runs as part of a multi-user Kubeflow Platform, how you authenticate the Pipelines SDK depends on whether your code runs **inside** or **outside** the cluster.

Kubeflow Platform - Inside the Cluster

A ServiceAccount token volume can be mounted into Pods that run in the same cluster as Kubeflow Pipelines. The Kubeflow Pipelines SDK can use this token to authenticate with the Kubeflow Pipelines API.

The following Python code creates a kfp.Client() that authenticates using the ServiceAccount token:

import kfp

# by default, when run from inside a Kubernetes cluster:
#  - the token is read from the `KF_PIPELINES_SA_TOKEN_PATH` path
#  - the host is set to `http://ml-pipeline-ui.kubeflow.svc.cluster.local`
kfp_client = kfp.Client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)

ServiceAccount Token Volume

To use the code above, you must run it from a Pod that has a ServiceAccount token volume mounted. You can either add a volume and volumeMount to your PodSpec manually, or use a Kubeflow PodDefault to inject the required volume.

Option 1 - Manually add the volume to your PodSpec

apiVersion: v1
kind: Pod
metadata:
  name: access-kfp-example
spec:
  containers:
  - image: hello-world:latest
    name: hello-world
    env:
      - ## this environment variable is automatically read by `kfp.Client()`
        ## this is the default value, but we show it here for clarity
        name: KF_PIPELINES_SA_TOKEN_PATH
        value: /var/run/secrets/kubeflow/pipelines/token
    volumeMounts:
      - mountPath: /var/run/secrets/kubeflow/pipelines
        name: volume-kf-pipeline-token
        readOnly: true
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org      

Option 2 - Use a PodDefault to inject the volume

apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: access-ml-pipeline
  namespace: "<YOUR_USER_PROFILE_NAMESPACE>"
spec:
  desc: Allow access to Kubeflow Pipelines
  selector:
    matchLabels:
      access-ml-pipeline: "true"
  env:
    - ## this environment variable is automatically read by `kfp.Client()`
      ## this is the default value, but we show it here for clarity
      name: KF_PIPELINES_SA_TOKEN_PATH
      value: /var/run/secrets/kubeflow/pipelines/token
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org      
  volumeMounts:
    - mountPath: /var/run/secrets/kubeflow/pipelines
      name: volume-kf-pipeline-token
      readOnly: true
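After either option is applied, the token should appear inside the container at the path given by KF_PIPELINES_SA_TOKEN_PATH. The sketch below is a hypothetical debugging aid, not part of the Kubeflow Pipelines SDK. `read_pipelines_token()` fails fast when the volume is missing. `check_audience()` decodes the token payload *without verifying its signature* and confirms that its `aud` claim contains the pipelines.kubeflow.org audience configured above. It relies on the fact that Kubernetes issues projected ServiceAccount tokens as JWTs.

```python
import base64
import json
import os
from pathlib import Path

# Default token location used in the examples; kfp.Client() reads the same env var.
DEFAULT_TOKEN_PATH = "/var/run/secrets/kubeflow/pipelines/token"


def read_pipelines_token(env=None) -> str:
    """Hypothetical helper: read the projected ServiceAccount token, or fail fast."""
    env = os.environ if env is None else env
    token_path = Path(env.get("KF_PIPELINES_SA_TOKEN_PATH", DEFAULT_TOKEN_PATH))
    if not token_path.is_file():
        raise FileNotFoundError(
            f"no token at {token_path}; is `volume-kf-pipeline-token` mounted?"
        )
    # strip a possible trailing newline
    return token_path.read_text().strip()


def token_claims(token: str) -> dict:
    """Decode the JWT payload WITHOUT verifying the signature (debugging only)."""
    payload = token.split(".")[1]
    # JWTs use unpadded base64url; restore the padding before decoding
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def check_audience(token: str, expected: str = "pipelines.kubeflow.org") -> bool:
    """Return True if the token's `aud` claim includes the expected audience."""
    aud = token_claims(token).get("aud", [])
    # `aud` may be a single string or a list of strings
    audiences = [aud] if isinstance(aud, str) else aud
    return expected in audiences
```

Inside a correctly configured Pod, `check_audience(read_pipelines_token())` should return True. A False result suggests that the audience does not match the `TOKEN_REVIEW_AUDIENCE` setting on the ml-pipeline deployment.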

RBAC Authorization

The Kubeflow Pipelines API respects Kubernetes RBAC. Before it allows a ServiceAccount to perform Pipelines API actions, it checks the RoleBindings assigned to that ServiceAccount.

For example, this RoleBinding allows Pods that use the default-editor ServiceAccount in namespace-2 to manage Kubeflow Pipelines in namespace-1:

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: allow-namespace-2-kubeflow-edit
  ## this RoleBinding is in `namespace-1`, because it grants access to `namespace-1`
  namespace: namespace-1
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kubeflow-edit
subjects:
  - kind: ServiceAccount
    name: default-editor
    ## the ServiceAccount lives in `namespace-2`
    namespace: namespace-2
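It is easy to put a RoleBinding like this in the wrong namespace. The hypothetical helper below (not a Kubeflow API) builds the same manifest as a Python dict for any pair of namespaces. The binding always lives in the namespace being accessed, and the subject keeps its own namespace. The `allow-<namespace>-kubeflow-edit` name only follows the example's naming pattern; any name works.

```python
import json


def kfp_edit_rolebinding(target_namespace: str, sa_namespace: str,
                         sa_name: str = "default-editor") -> dict:
    """Hypothetical helper mirroring the example RoleBinding.

    The RoleBinding is created in `target_namespace` (the namespace being
    accessed) and grants the `kubeflow-edit` ClusterRole to a ServiceAccount
    that lives in `sa_namespace`.
    """
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {
            "name": f"allow-{sa_namespace}-kubeflow-edit",
            "namespace": target_namespace,
        },
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": "kubeflow-edit",
        },
        "subjects": [
            {"kind": "ServiceAccount", "name": sa_name, "namespace": sa_namespace},
        ],
    }


if __name__ == "__main__":
    # kubectl accepts JSON manifests, e.g.: python this_script.py | kubectl apply -f -
    print(json.dumps(kfp_edit_rolebinding("namespace-1", "namespace-2"), indent=2))
```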

Kubeflow Platform - Outside the Cluster

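The exact way to authenticate from *outside* the cluster depends on how you deployed the Kubeflow Platform. Because most distributions use Dex as their identity provider, this example shows how to authenticate with Dex from a Python script.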

You need to make the Kubeflow Pipelines API accessible on the remote machine. If your Kubeflow Istio gateway is already exposed, skip this step and use its URL directly.

The following command exposes the istio-ingressgateway service on localhost:8080:

# TIP: svc/istio-ingressgateway may be called something else, 
#      or use different ports in your distribution
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 8080:80
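`kubectl port-forward` runs in the foreground. A script that starts it in the background may therefore need to wait until the local port accepts connections before it creates a client. The standard-library sketch below (a hypothetical helper) polls a TCP port. It only confirms that something is listening, not that authentication will succeed.

```python
import socket
import time


def wait_for_port(host: str = "localhost", port: int = 8080, timeout: float = 10.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, or False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # the port-forward may still be starting; retry shortly
            time.sleep(0.2)
    return False
```

For the command above, `wait_for_port("localhost", 8080)` returns True once the forward is up.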

The following Python code defines a KFPClientManager() class that creates an authenticated kfp.Client() by interacting with Dex:

import re
from urllib.parse import urlsplit, urlencode

import kfp
import requests
import urllib3


class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KFPClientManager

        :param api_url: the Kubeflow Pipelines API URL
        :param skip_tls_verify: if True, skip TLS verification
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        """
        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type
        self._client = None

        # disable SSL verification, if requested
        if self._skip_tls_verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # ensure `dex_auth_type` is valid
        if self._dex_auth_type not in ["ldap", "local"]:
            raise ValueError(
                f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']"
            )

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex
        :return: a string of session cookies in the form "key1=value1; key2=value2"
        """

        # use a persistent session (for cookies)
        s = requests.Session()

        # GET the api_url, which should redirect to Dex
        resp = s.get(
            self._api_url, allow_redirects=True, verify=not self._skip_tls_verify
        )
        if resp.status_code == 200:
            pass
        elif resp.status_code == 403:
            # if we get 403, we might be at the oauth2-proxy sign-in page
            # the default path to start the sign-in flow is `/oauth2/start?rd=<url>`
            url_obj = urlsplit(resp.url)
            url_obj = url_obj._replace(
                path="/oauth2/start", query=urlencode({"rd": url_obj.path})
            )
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
        else:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
            )

        # if we were NOT redirected, then the endpoint is unsecured
        if len(resp.history) == 0:
            # no cookies are needed
            return ""

        # if we are at `../auth` path, we need to select an auth type
        url_obj = urlsplit(resp.url)
        if re.search(r"/auth$", url_obj.path):
            url_obj = url_obj._replace(
                path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
            )

        # if we are at `../auth/xxxx/login` path, then we are at the login page
        if re.search(r"/auth/.*/login$", url_obj.path):
            dex_login_url = url_obj.geturl()
        else:
            # otherwise, we need to follow a redirect to the login page
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                )
            dex_login_url = resp.url

        # attempt Dex login
        resp = s.post(
            dex_login_url,
            data={"login": self._dex_username, "password": self._dex_password},
            allow_redirects=True,
            verify=not self._skip_tls_verify,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
            )

        # if we were NOT redirected, then the login credentials were probably invalid
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials are probably invalid - "
                f"No redirect after POST to: {dex_login_url}"
            )

        # if we are at `../approval` path, we need to approve the login
        url_obj = urlsplit(resp.url)
        if re.search(r"/approval$", url_obj.path):
            dex_approval_url = url_obj.geturl()

            # approve the login
            resp = s.post(
                dex_approval_url,
                data={"approval": "approve"},
                allow_redirects=True,
                verify=not self._skip_tls_verify,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for POST against: {url_obj.geturl()}"
                )

        return "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    def _create_kfp_client(self) -> kfp.Client:
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError("Failed to get Dex session cookies") from ex

        # patch kfp.Client to support disabling SSL verification
        # kfp only added support in v2: https://github.com/kubeflow/pipelines/pull/7174
        # a subclass is used so that the global `kfp.Client` class is not modified
        skip_tls_verify = self._skip_tls_verify

        class PatchedKFPClient(kfp.Client):
            def _load_config(client_self, *args, **kwargs):
                config = super()._load_config(*args, **kwargs)
                config.verify_ssl = not skip_tls_verify
                return config

        return PatchedKFPClient(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()
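The redirect handling in `_get_session_cookies()` comes down to a few URL rewrites. The sketch below pulls them out as pure functions so they can be inspected or unit-tested on their own. The function names are hypothetical and not part of kfp. Example paths such as /dex/auth are assumptions, because the real paths depend on how Dex and oauth2-proxy are configured in your distribution.

```python
import re
from urllib.parse import urlencode, urlsplit


def oauth2_start_url(url: str) -> str:
    """Build the oauth2-proxy sign-in URL `/oauth2/start?rd=<path>` from a 403 page URL."""
    parts = urlsplit(url)
    return parts._replace(path="/oauth2/start", query=urlencode({"rd": parts.path})).geturl()


def select_dex_connector(url: str, auth_type: str = "local") -> str:
    """Rewrite a Dex `.../auth` URL to `.../auth/<auth_type>`, keeping the query string."""
    parts = urlsplit(url)
    if re.search(r"/auth$", parts.path):
        parts = parts._replace(path=re.sub(r"/auth$", f"/auth/{auth_type}", parts.path))
    return parts.geturl()


def is_dex_login_page(url: str) -> bool:
    """Return True for `.../auth/<connector>/login` paths, where credentials are POSTed."""
    return re.search(r"/auth/.*/login$", urlsplit(url).path) is not None
```

For example, `select_dex_connector("http://localhost:8080/dex/auth?client_id=kubeflow")` yields `http://localhost:8080/dex/auth/local?client_id=kubeflow`.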

The following Python code shows how to use the KFPClientManager() class to create a kfp.Client():

# initialize a KFPClientManager
kfp_client_manager = KFPClientManager(
    api_url="http://localhost:8080/pipeline",
    skip_tls_verify=True,

    dex_username="user@example.com",
    dex_password="12341234",

    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="local",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)
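The tip in the code above notes that long-lived sessions may need a new client after their session expires. One way to handle this assumes that a fixed maximum client age is acceptable. The sketch below is a hypothetical wrapper that calls a factory such as `kfp_client_manager.create_kfp_client` again once the cached client is older than `max_age` seconds. Set `max_age` below your deployment's session lifetime, which this guide does not specify.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class RefreshingClient(Generic[T]):
    """Hypothetical wrapper: rebuild a client once it is older than `max_age` seconds."""

    def __init__(self, factory: Callable[[], T], max_age: float,
                 clock: Callable[[], float] = time.monotonic):
        self._factory = factory
        self._max_age = max_age
        self._clock = clock
        self._client: Optional[T] = None
        self._created_at = 0.0

    def get(self) -> T:
        now = self._clock()
        if self._client is None or now - self._created_at >= self._max_age:
            # create a fresh client, which re-authenticates against Dex
            self._client = self._factory()
            self._created_at = now
        return self._client
```

Create the wrapper once, for example `refreshing = RefreshingClient(kfp_client_manager.create_kfp_client, max_age=3600)`, and call `refreshing.get()` wherever you need a client. The 3600-second value is only an example.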

Standalone Kubeflow Pipelines

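When Kubeflow Pipelines runs in standalone mode, there is no multi-user authentication or RBAC. The exact steps depend on whether your code runs **inside** or **outside** the cluster.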

Standalone KFP - Inside the Cluster

When running inside the Kubernetes cluster, you can connect the Pipelines SDK directly to the ml-pipeline-ui service through cluster-internal service DNS resolution.

When running in the **same namespace** as Kubeflow:

import kfp

client = kfp.Client(host="http://ml-pipeline-ui:80")

print(client.list_experiments())

When running in a **different namespace** from Kubeflow:

import kfp

# the namespace in which you deployed Kubeflow Pipelines
namespace = "kubeflow" 

client = kfp.Client(host=f"http://ml-pipeline-ui.{namespace}")

print(client.list_experiments())
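If the same code might run either alongside Kubeflow Pipelines or in another namespace, it can choose the host at runtime. The sketch below is a hypothetical helper. It reads the Pod's own namespace from /var/run/secrets/kubernetes.io/serviceaccount/namespace, a file Kubernetes mounts when the ServiceAccount token is automounted. If that file is absent, it falls back to the namespace-qualified name. The default kfp_namespace of kubeflow is an assumption that matches the example above. The namespace-qualified name also resolves from within the same namespace, so this helper is a convenience rather than a requirement.

```python
from pathlib import Path

# Standard location of the Pod's own namespace when the SA token is automounted.
POD_NAMESPACE_FILE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"


def standalone_kfp_host(kfp_namespace: str = "kubeflow",
                        namespace_file: str = POD_NAMESPACE_FILE) -> str:
    """Hypothetical helper: pick the in-cluster ml-pipeline-ui URL."""
    path = Path(namespace_file)
    pod_namespace = path.read_text().strip() if path.is_file() else None
    if pod_namespace == kfp_namespace:
        # same namespace: the short service name resolves directly
        return "http://ml-pipeline-ui:80"
    # different (or unknown) namespace: qualify the service name
    return f"http://ml-pipeline-ui.{kfp_namespace}"
```

You would then pass the result to the client, for example `kfp.Client(host=standalone_kfp_host())`.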

Standalone KFP - Outside the Cluster

When running outside the Kubernetes cluster, you can connect the Pipelines SDK to the ml-pipeline-ui service using kubectl port-forwarding.

**Step 1:** Run the following command on your external system to start port forwarding:

# change `--namespace` if you deployed Kubeflow Pipelines into a different namespace
kubectl port-forward --namespace kubeflow svc/ml-pipeline-ui 3000:80

**Step 2:** The following code creates a kfp.Client() that connects to your port-forwarded ml-pipeline-ui service:

import kfp

client = kfp.Client(host="http://localhost:3000")

print(client.list_experiments())