Connect the SDK to the API
Overview
The Kubeflow Pipelines SDK provides a Python interface for interacting with the Kubeflow Pipelines API. This guide shows how to connect the SDK to the Pipelines API in various scenarios.
Kubeflow Platform
When Kubeflow Pipelines runs as part of a multi-user Kubeflow Platform, how you authenticate the Pipelines SDK depends on whether your code runs **inside** or **outside** the cluster.
Kubeflow Platform - Inside the Cluster
A ServiceAccount token volume can be mounted to a Pod running in the same cluster as Kubeflow Pipelines. The Kubeflow Pipelines SDK can use this token to authenticate itself with the Kubeflow Pipelines API.
The following Python code creates a `kfp.Client()` that authenticates with the ServiceAccount token:
import kfp
# by default, when run from inside a Kubernetes cluster:
# - the token is read from the `KF_PIPELINES_SA_TOKEN_PATH` path
# - the host is set to `http://ml-pipeline-ui.kubeflow.svc.cluster.local`
kfp_client = kfp.Client()
# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)
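Before constructing the client, it can help to confirm that the token file is actually present in the Pod. The following is a minimal sketch, not part of the SDK: `resolve_token_path` and `token_is_mounted` are hypothetical helpers that only mirror the `KF_PIPELINES_SA_TOKEN_PATH` lookup described in the comments above, and the default path is the one used by the manifests in this guide.

```python
import os

# Default mount path used by the manifests in this guide.
DEFAULT_TOKEN_PATH = "/var/run/secrets/kubeflow/pipelines/token"


def resolve_token_path(env=None):
    """Return the token path from KF_PIPELINES_SA_TOKEN_PATH, or the default.

    Hypothetical helper for illustration; it is not part of the kfp SDK.
    """
    env = os.environ if env is None else env
    return env.get("KF_PIPELINES_SA_TOKEN_PATH", DEFAULT_TOKEN_PATH)


def token_is_mounted(env=None):
    """Return True if a non-empty token file exists at the resolved path."""
    path = resolve_token_path(env)
    return os.path.isfile(path) and os.path.getsize(path) > 0
```

If `token_is_mounted()` returns `False`, the Pod is probably missing the volume described in the next section.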
ServiceAccount Token Volume
To use the preceding code, you need to run it from a Pod that has a ServiceAccount token volume mounted. You can manually add a `volume` and `volumeMount` to your PodSpec, or use Kubeflow's `PodDefaults` to inject the required volume.
Option 1 - manually add a volume to your PodSpec
apiVersion: v1
kind: Pod
metadata:
  name: access-kfp-example
spec:
  containers:
    - image: hello-world:latest
      name: hello-world
      env:
        - ## this environment variable is automatically read by `kfp.Client()`
          ## this is the default value, but we show it here for clarity
          name: KF_PIPELINES_SA_TOKEN_PATH
          value: /var/run/secrets/kubeflow/pipelines/token
      volumeMounts:
        - mountPath: /var/run/secrets/kubeflow/pipelines
          name: volume-kf-pipeline-token
          readOnly: true
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org
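The value of `KF_PIPELINES_SA_TOKEN_PATH` has to agree with the `mountPath` of the volume mount joined with the `path` of the projected token. The sketch below checks that relationship on a manifest loaded as a plain Python dictionary; `projected_token_paths` is a hypothetical helper written for this illustration, and the dictionary simply restates the Pod manifest above.

```python
import posixpath


def projected_token_paths(pod_spec):
    """Return the in-container file paths of all projected ServiceAccount tokens."""
    # Map each volume name to the token file names it projects.
    token_files = {}
    for volume in pod_spec.get("volumes", []):
        for source in volume.get("projected", {}).get("sources", []):
            token = source.get("serviceAccountToken")
            if token:
                token_files.setdefault(volume["name"], []).append(token["path"])

    # Join each container mount path with the token file names of its volume.
    paths = []
    for container in pod_spec.get("containers", []):
        for mount in container.get("volumeMounts", []):
            for file_name in token_files.get(mount["name"], []):
                paths.append(posixpath.join(mount["mountPath"], file_name))
    return paths


# The `spec` section of the Pod manifest above, as a dictionary.
example_spec = {
    "containers": [
        {
            "name": "hello-world",
            "volumeMounts": [
                {
                    "mountPath": "/var/run/secrets/kubeflow/pipelines",
                    "name": "volume-kf-pipeline-token",
                }
            ],
        }
    ],
    "volumes": [
        {
            "name": "volume-kf-pipeline-token",
            "projected": {
                "sources": [{"serviceAccountToken": {"path": "token"}}]
            },
        }
    ],
}

print(projected_token_paths(example_spec))
```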
Option 2 - use a `PodDefault` to inject the volume
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: access-ml-pipeline
  namespace: "<YOUR_USER_PROFILE_NAMESPACE>"
spec:
  desc: Allow access to Kubeflow Pipelines
  selector:
    matchLabels:
      access-ml-pipeline: "true"
  env:
    - ## this environment variable is automatically read by `kfp.Client()`
      ## this is the default value, but we show it here for clarity
      name: KF_PIPELINES_SA_TOKEN_PATH
      value: /var/run/secrets/kubeflow/pipelines/token
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org
  volumeMounts:
    - mountPath: /var/run/secrets/kubeflow/pipelines
      name: volume-kf-pipeline-token
      readOnly: true
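Tip
- `PodDefaults` are namespaced resources, so you need to create one **inside** each of your Kubeflow `Profile` namespaces.
- The Notebook Spawner UI is aware of any `PodDefaults` in the user's namespace (they are selectable under the "configurations" section).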
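The `selector.matchLabels` field decides which Pods receive the injected volume: a Pod qualifies only if it carries every listed label with the same value. The following is a simplified model of that matching rule, not the admission webhook's actual code; `selector_matches` is a hypothetical name, and the sketch ignores `matchExpressions`.

```python
def selector_matches(match_labels, pod_labels):
    """Return True if the Pod carries every label in `match_labels` with an equal value."""
    return all(pod_labels.get(key) == value for key, value in match_labels.items())


# The selector from the PodDefault above.
pod_default_selector = {"access-ml-pipeline": "true"}

# A Pod labelled for pipeline access, and one that is not.
labelled_pod = {"app": "notebook", "access-ml-pipeline": "true"}
unlabelled_pod = {"app": "notebook"}

print(selector_matches(pod_default_selector, labelled_pod))
print(selector_matches(pod_default_selector, unlabelled_pod))
```

Note that label values are strings, which is why the manifest quotes `"true"`.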
RBAC Authorization
The Kubeflow Pipelines API respects Kubernetes RBAC, and checks the RoleBindings assigned to a ServiceAccount before allowing it to take Pipelines API actions.
For example, this RoleBinding allows Pods with the `default-editor` ServiceAccount in `namespace-2` to manage Kubeflow Pipelines in `namespace-1`:
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: allow-namespace-2-kubeflow-edit
  ## this RoleBinding is in `namespace-1`, because it grants access to `namespace-1`
  namespace: namespace-1
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kubeflow-edit
subjects:
  - kind: ServiceAccount
    name: default-editor
    ## the ServiceAccount lives in `namespace-2`
    namespace: namespace-2
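The direction of this binding is easy to get backwards: the RoleBinding lives in the namespace being accessed, while the subject names the namespace the caller runs in. The sketch below models only that subject check on a manifest loaded as a dictionary. It is a deliberately simplified illustration, `binding_covers` is a hypothetical helper, and it does not evaluate the rules of the referenced ClusterRole.

```python
def service_account_username(namespace, name):
    """Return the username Kubernetes uses for a ServiceAccount."""
    return f"system:serviceaccount:{namespace}:{name}"


def binding_covers(binding, target_namespace, sa_namespace, sa_name):
    """Return True if `binding` is in `target_namespace` and lists the ServiceAccount."""
    if binding["metadata"]["namespace"] != target_namespace:
        return False
    return any(
        subject.get("kind") == "ServiceAccount"
        and subject.get("name") == sa_name
        and subject.get("namespace") == sa_namespace
        for subject in binding.get("subjects", [])
    )


# The RoleBinding above, reduced to the fields this sketch inspects.
example_binding = {
    "metadata": {"name": "allow-namespace-2-kubeflow-edit", "namespace": "namespace-1"},
    "subjects": [
        {"kind": "ServiceAccount", "name": "default-editor", "namespace": "namespace-2"}
    ],
}

print(service_account_username("namespace-2", "default-editor"))
print(binding_covers(example_binding, "namespace-1", "namespace-2", "default-editor"))
```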
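Tip
- Review the ClusterRole called `aggregate-to-kubeflow-pipelines-edit` for a list of some important `pipelines.kubeflow.org` RBAC verbs.
- Kubeflow Notebooks Pods run as the `default-editor` ServiceAccount by default, so the RoleBindings for `default-editor` apply to them and give them permission to submit pipelines in their own namespace.
- For more information about profiles, see the Manage Profile Contributors guide.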
Kubeflow Platform - Outside the Cluster
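Kubeflow Notebooks
Because Kubeflow Notebooks run on Pods *inside* the cluster, they cannot use the following method to authenticate the Pipelines SDK; see the inside-the-cluster method instead.
The precise method for authenticating from *outside* the cluster depends on how you deployed Kubeflow Platform. Because most distributions use Dex as their identity provider, this example shows how to authenticate with Dex using a Python script.
You need to make the Kubeflow Pipelines API accessible on the remote machine. If your Kubeflow Istio gateway is already exposed, skip this step and use that URL directly.
The following command exposes the `istio-ingressgateway` service on `localhost:8080`: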
# TIP: svc/istio-ingressgateway may be called something else,
# or use different ports in your distribution
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 8080:80
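A quick way to confirm that the port-forward is up before running the authentication script is to attempt a TCP connection to the forwarded port. This is a minimal sketch using only the standard library; `port_is_open` is a hypothetical helper, and the host and port in the last line are the ones used by the command above.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


print(port_is_open("localhost", 8080))
```

A successful connection only shows that something is listening on the port; it does not confirm that the gateway behind it is healthy.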
The following Python code defines a `KFPClientManager()` class that creates an authenticated `kfp.Client()` by interacting with Dex:
import re
from urllib.parse import urlsplit, urlencode

import kfp
import requests
import urllib3


class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KFPClientManager

        :param api_url: the Kubeflow Pipelines API URL
        :param skip_tls_verify: if True, skip TLS verification
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        """
        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type
        self._client = None

        # suppress insecure-request warnings when TLS verification is skipped
        if self._skip_tls_verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # ensure `dex_auth_type` is valid
        if self._dex_auth_type not in ["ldap", "local"]:
            raise ValueError(
                f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']"
            )

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex
        :return: a string of session cookies in the form "key1=value1; key2=value2"
        """

        # use a persistent session (for cookies)
        s = requests.Session()

        # GET the api_url, which should redirect to Dex
        resp = s.get(
            self._api_url, allow_redirects=True, verify=not self._skip_tls_verify
        )
        if resp.status_code == 200:
            pass
        elif resp.status_code == 403:
            # if we get 403, we might be at the oauth2-proxy sign-in page
            # the default path to start the sign-in flow is `/oauth2/start?rd=<url>`
            url_obj = urlsplit(resp.url)
            url_obj = url_obj._replace(
                path="/oauth2/start", query=urlencode({"rd": url_obj.path})
            )
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
        else:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
            )

        # if we were NOT redirected, then the endpoint is unsecured
        if len(resp.history) == 0:
            # no cookies are needed
            return ""

        # if we are at `../auth` path, we need to select an auth type
        url_obj = urlsplit(resp.url)
        if re.search(r"/auth$", url_obj.path):
            url_obj = url_obj._replace(
                path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
            )

        # if we are at `../auth/xxxx/login` path, then we are at the login page
        if re.search(r"/auth/.*/login$", url_obj.path):
            dex_login_url = url_obj.geturl()
        else:
            # otherwise, we need to follow a redirect to the login page
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                )
            dex_login_url = resp.url

        # attempt Dex login
        resp = s.post(
            dex_login_url,
            data={"login": self._dex_username, "password": self._dex_password},
            allow_redirects=True,
            verify=not self._skip_tls_verify,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
            )

        # if we were NOT redirected, then the login credentials were probably invalid
        if len(resp.history) == 0:
            raise RuntimeError(
                "Login credentials are probably invalid - "
                f"No redirect after POST to: {dex_login_url}"
            )

        # if we are at `../approval` path, we need to approve the login
        url_obj = urlsplit(resp.url)
        if re.search(r"/approval$", url_obj.path):
            dex_approval_url = url_obj.geturl()

            # approve the login
            resp = s.post(
                dex_approval_url,
                data={"approval": "approve"},
                allow_redirects=True,
                verify=not self._skip_tls_verify,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for POST against: {url_obj.geturl()}"
                )

        return "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    def _create_kfp_client(self) -> kfp.Client:
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError("Failed to get Dex session cookies") from ex

        # monkey patch the kfp.Client to support disabling SSL verification
        # kfp only added support in v2: https://github.com/kubeflow/pipelines/pull/7174
        original_load_config = kfp.Client._load_config

        def patched_load_config(client_self, *args, **kwargs):
            config = original_load_config(client_self, *args, **kwargs)
            config.verify_ssl = not self._skip_tls_verify
            return config

        patched_kfp_client = kfp.Client
        patched_kfp_client._load_config = patched_load_config

        return patched_kfp_client(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()
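The two URL rewrites inside `_get_session_cookies` can be tried in isolation, without a cluster, to see what they produce. The sketch below extracts them into standalone functions using only the standard library; the function names are hypothetical, and the `/dex/auth` path in the example is an assumption about where Dex is mounted in a given distribution.

```python
import re
from urllib.parse import urlsplit, urlencode


def oauth2_start_url(current_url):
    """Build the oauth2-proxy sign-in URL, redirecting back to the current path."""
    url_obj = urlsplit(current_url)
    url_obj = url_obj._replace(
        path="/oauth2/start", query=urlencode({"rd": url_obj.path})
    )
    return url_obj.geturl()


def select_auth_type(current_url, auth_type):
    """Append the auth type when the URL path ends in `/auth`; otherwise return it unchanged."""
    url_obj = urlsplit(current_url)
    if re.search(r"/auth$", url_obj.path):
        url_obj = url_obj._replace(
            path=re.sub(r"/auth$", f"/auth/{auth_type}", url_obj.path)
        )
    return url_obj.geturl()


print(oauth2_start_url("http://localhost:8080/pipeline"))
# http://localhost:8080/oauth2/start?rd=%2Fpipeline

print(select_auth_type("http://localhost:8080/dex/auth?client_id=x", "local"))
# http://localhost:8080/dex/auth/local?client_id=x
```

Because only the path and query are replaced, the scheme, host, and port of the original URL carry through unchanged.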
The following Python code shows how to use the `KFPClientManager()` class to create a `kfp.Client()`:
# initialize a KFPClientManager
kfp_client_manager = KFPClientManager(
    api_url="http://localhost:8080/pipeline",
    skip_tls_verify=True,
    dex_username="user@example.com",
    dex_password="12341234",
    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="local",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)
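For long-running processes, the tip about expiring sessions can be handled by recreating the client once it reaches a chosen age. The following is one possible sketch, not part of the SDK: `RefreshingClientHolder` is a hypothetical class, and the maximum age is an assumption that should be set below the session lifetime configured in your deployment.

```python
import time


class RefreshingClientHolder:
    """Cache a client and rebuild it with `factory` once it is older than `max_age_seconds`."""

    def __init__(self, factory, max_age_seconds, clock=time.monotonic):
        self._factory = factory          # zero-argument callable that returns a new client
        self._max_age = max_age_seconds  # assumed to be shorter than the session lifetime
        self._clock = clock              # injectable so the behaviour can be tested
        self._client = None
        self._created_at = None

    def get(self):
        """Return the cached client, creating a fresh one if none exists or it is too old."""
        now = self._clock()
        if self._client is None or now - self._created_at >= self._max_age:
            self._client = self._factory()
            self._created_at = now
        return self._client
```

With the manager defined earlier, this could be used as `holder = RefreshingClientHolder(kfp_client_manager.create_kfp_client, max_age_seconds=3600)`, calling `holder.get()` wherever a client is needed. The value `3600` is only a placeholder.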
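Standalone Kubeflow Pipelines
When Kubeflow Pipelines runs in standalone mode, there is no concept of multi-user authentication or RBAC. The specific steps depend on whether your code runs **inside** or **outside** the cluster.
Standalone KFP - Inside the Cluster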
When running inside the Kubernetes cluster, you can connect the Pipelines SDK directly to the `ml-pipeline-ui` service by using cluster-internal service DNS resolution.
When running in the **same namespace** as Kubeflow:
import kfp
client = kfp.Client(host="http://ml-pipeline-ui:80")
print(client.list_experiments())
When running in a **different namespace** from Kubeflow:
import kfp
# the namespace in which you deployed Kubeflow Pipelines
namespace = "kubeflow"
client = kfp.Client(host=f"http://ml-pipeline-ui.{namespace}")
print(client.list_experiments())
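The two host strings above differ only in how much of the service DNS name they spell out. The sketch below builds the short, namespaced, and fully qualified forms from one function; `pipelines_ui_host` is a hypothetical helper, and the `svc.cluster.local` suffix assumes the default cluster domain.

```python
def pipelines_ui_host(namespace=None, fully_qualified=False):
    """Return an http:// host string for the `ml-pipeline-ui` service.

    - no namespace: resolves only from within the service's own namespace
    - with namespace: resolves from any namespace in the cluster
    - fully_qualified: appends `.svc.cluster.local` (assumes the default cluster domain)
    """
    host = "ml-pipeline-ui"
    if namespace:
        host = f"{host}.{namespace}"
        if fully_qualified:
            host = f"{host}.svc.cluster.local"
    return f"http://{host}"


print(pipelines_ui_host())
print(pipelines_ui_host("kubeflow"))
print(pipelines_ui_host("kubeflow", fully_qualified=True))
```

The returned string can be passed as the `host` argument of `kfp.Client()`.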
Standalone KFP - Outside the Cluster
When running outside the Kubernetes cluster, you can connect the Pipelines SDK to the `ml-pipeline-ui` service by using kubectl port-forwarding.
**Step 1:** Run the following command on your external system to start the port-forward:
# change `--namespace` if you deployed Kubeflow Pipelines into a different namespace
kubectl port-forward --namespace kubeflow svc/ml-pipeline-ui 3000:80
**Step 2:** The following code creates a `kfp.Client()` against your port-forwarded `ml-pipeline-ui` service:
import kfp
client = kfp.Client(host="http://localhost:3000")
print(client.list_experiments())
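A script that has to run both inside and outside the cluster can pick its host at startup. The following is a rough sketch under stated assumptions: `choose_host` is a hypothetical helper, it treats the presence of the `KUBERNETES_SERVICE_HOST` environment variable (which Kubernetes sets in containers) as the signal for running in-cluster, and it assumes the `kubeflow` namespace and the local port `3000` used elsewhere in this guide.

```python
import os


def choose_host(env=None, namespace="kubeflow", local_port=3000):
    """Return the in-cluster service host inside a Pod, else the port-forwarded address."""
    env = os.environ if env is None else env
    if env.get("KUBERNETES_SERVICE_HOST"):
        # Running inside a Pod: use cluster-internal service DNS.
        return f"http://ml-pipeline-ui.{namespace}"
    # Running outside the cluster: rely on `kubectl port-forward`.
    return f"http://localhost:{local_port}"


print(choose_host())
```

The result can be passed directly as `kfp.Client(host=choose_host())` in a standalone deployment.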