diff --git a/_posts/2025-07-25-model-registry-integration.md b/_posts/2025-07-25-model-registry-integration.md new file mode 100644 index 00000000..e060599e --- /dev/null +++ b/_posts/2025-07-25-model-registry-integration.md @@ -0,0 +1,445 @@ +--- +layout: post +title: "Integrating Kubeflow Model Registry into Your Kubeflow Pipelines" +author: "Hailey Purdom" # Your Name +date: 2025-07-25 # Use the date you want to appear +categories: # Add relevant categories/tags for the blog post + - mlops + - pipelines + - model-registry + - kubeflow +tags: + - mlops + - kubeflow + - pipelines + - model-registry + - kind + - tutorial +--- + + + +# Integrating Kubeflow Model Registry into Your Kubeflow Pipelines + + + + + +# Introduction: Centralizing Your ML Models with Kubeflow + +In the journey of Machine Learning Operations (MLOps), managing trained models effectively is as crucial as building them. While Kubeflow Pipelines (KFP) provides powerful orchestration for your ML workflows, the Kubeflow Model Registry steps in as the centralized hub for versioning, cataloging, and discovering your machine learning models. + +This blog post will serve as a practical guide, walking you through the process of integrating the Kubeflow Model Registry directly into your KFP pipelines. We'll cover the benefits, best practices for model registration, and provide a hands-on example, drawing from real-world troubleshooting steps encountered during setup on a local kind cluster. + + + +# Why Leverage Model Registry in Your Pipelines? + +**Integrating model registration as a native step within your Kubeflow Pipelines offers significant advantages for robust MLOps:** + +- Centralized Cataloging: Move beyond scattered model files. The Model Registry provides a single source of truth for all your models, making them easily discoverable across your organization. + +- Structured Versioning & Lineage: Automatically track different iterations of your models. 
Each model version can be linked back to the exact pipeline run that produced it, ensuring full traceability and reproducibility. + +- Enhanced Governance & Auditability: Maintain a clear record of model lifecycles, crucial for compliance and auditing. + +- Seamless Handover to Serving: Registered models can be easily referenced by model serving platforms like KServe (formerly KFServing), streamlining deployment and management. + +- Automated Workflow: Eliminate manual steps. Once a model passes evaluation within a pipeline, it can be automatically registered, reducing human error and accelerating deployment. + + + +You'd typically want to register a model from a pipeline when it has met predefined performance criteria, passed validation, and is ready to be shared, deployed, or archived. + +# Models vs. Model Versions: A Key Distinction + +Understanding the core entities in the Model Registry is fundamental: + +- Registered Model: This represents the conceptual model. Think of it as a logical grouping for all iterations of a particular ML solution (e.g., "Fraud Detection Classifier," "Customer Churn Predictor"). + +- Model Version: This is a specific iteration or snapshot of a Registered Model. Each version is immutable and holds unique artifacts, metadata, and a precise lineage (e.g., "v1.0.0," "v1.0.1-retrained," "v2025-07-18-production"). + +# Getting Started: Setting Up Your Local Kubeflow Environment + +To follow this guide, you'll need a local Kubernetes cluster with Kubeflow Pipelines and the Kubeflow Model Registry deployed. We'll use [kind](https://kind.sigs.k8s.io/) (Kubernetes in Docker) for this setup. + + +**Prerequisites** + +* A container engine (e.g., [Docker](https://docs.docker.com/get-docker/)). +* [Python](https://www.python.org/downloads/) (3.11 or newer). +* [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/): Kubernetes command-line tool. +* [kind](https://kind.sigs.k8s.io/docs/user/quick-start/): Kubernetes in Docker. 
+* [helm](https://helm.sh/docs/intro/install/): Kubernetes package manager.
+* [git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git): For cloning repositories.
+* **Install Kustomize:**
+Kustomize is a standalone tool for customizing Kubernetes configurations. It's essential for applying the manifests in this guide.
+```
+curl -Lo kustomize.tar.gz https://github.com/kubernetes-sigs/kustomize/releases/download/kustomize%2Fv5.7.1/kustomize_v5.7.1_linux_amd64.tar.gz
+tar -xzvf kustomize.tar.gz
+mkdir -p ~/bin # Make sure the target directory exists before moving the binary
+mv kustomize ~/bin # Or /usr/local/bin if you prefer a system-wide install
+chmod +x ~/bin/kustomize
+rm -f kustomize.tar.gz
+# Ensure ~/bin is in your PATH; /usr/local/bin is normally in PATH already.
+# Log out and back in after installation to refresh PATH.
+```
+
+# Common Troubleshooting during Setup:
+Setting up complex MLOps tools locally can be challenging. The deployment steps in this guide incorporate fixes for the issues we ran into most often:
+
+- `kind` Cluster Instability / DNS Issues (`ImagePullBackOff`, `Temporary failure in name resolution`, `nf_conntrack_ipv4 not found`): These often stem from conflicts with your host's Docker networking, VPNs, or missing kernel modules.
+- **Solution:** Clean up Docker and `kind` aggressively (`sudo docker system prune -a --volumes -f`), make sure `kind` is installed correctly (refer to `kind`'s [installation guide](https://kind.sigs.k8s.io/docs/user/quick-start/#installation)), and log out and back in to refresh the environment.
+
+- KFP Core Component Crashes (`metadata-grpc-deployment` in `CrashLoopBackOff` with `Exit Code 139`, a segmentation fault): This often indicates an incompatibility between the `ml_metadata_store_server` image and your host's kernel.
+
+- **Solution:** Patch the local KFP manifests to use an older, more compatible image version (e.g., `1.13.0` or `1.12.0`) in `~/kfp-local-manifests/manifests/kustomize/base/metadata/base/metadata-grpc-deployment.yaml`.
+
+- PersistentVolumeClaim (PVC) `Pending`: This means the cluster's local storage provisioner isn't working.
+
+- **Solution:** A full `kind` and Docker reset (`sudo systemctl stop docker`, then `sudo systemctl start docker`) often resolves this by restarting `kind`'s default local path provisioner.
+
+- "Pipeline version creation failed" / "Cannot find context" in KFP UI: These are symptoms of KFP's core components (especially ML Metadata and the KFP API server) not being fully `Running` and `READY`.
+
+- **Solution:** Be patient, and re-run `kubectl get pods -n kubeflow` until those components report `Running` and `READY`.
+
+- Model Registry Pods Missing / `CreateContainerConfigError`: This usually indicates a configuration problem preventing the container from starting. Transient network issues can cause similar symptoms, but `CreateContainerConfigError` usually needs a manifest fix. If the error persists, ensure your Model Registry deployment manifests (e.g., from your cloned repository) are up to date, as an outdated fork might be incompatible with other Kubeflow components. We found success deploying the Model Registry using its Kustomize manifests from the `kubeflow/model-registry` repository's `overlays/db` directory.
+
+- `model_registry.exceptions.StoreError: Version X already exists`: This error occurs when you try to register a model version with a `model_name` and `model_version_name` combination that already exists in the Model Registry.
+- **Solution:** This indicates the Model Registry is working as intended! Simply provide a new, unique `model_version_name` for your pipeline run (e.g., `v1.0.1`, `v2.0.0`, or `v1.0.0-run-timestamp`).
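One way to get a unique `model_version_name` on every run is to append a timestamp, as the `v1.0.0-run-timestamp` suggestion above hints. The following is a minimal sketch, not part of the Model Registry client: the helper name `unique_version_name` and the exact timestamp format are assumptions made for illustration.

```python
from datetime import datetime, timezone
from typing import Optional


def unique_version_name(base: str, now: Optional[datetime] = None) -> str:
    """Return `base` with a UTC timestamp suffix (hypothetical helper).

    `now` is injectable so the result can be reproduced in tests;
    by default the current UTC time is used.
    """
    moment = now or datetime.now(timezone.utc)
    # Compact, sortable timestamp, e.g. 20250725T120000Z
    stamp = moment.strftime("%Y%m%dT%H%M%SZ")
    return f"{base}-run-{stamp}"


if __name__ == "__main__":
    # Pass the result as the pipeline's model_version_name parameter.
    print(unique_version_name("v1.0.0"))
```

Two runs started within the same second would still collide, so a run ID may be a safer suffix where one is available.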
+
+
+# Deployment Steps:
+
+Install `kind` (if not already installed via dnf):
+
+```
+sudo dnf install kind # Recommended for Fedora
+# OR manual install if dnf fails (e.g., if 'No match for argument: kind'):
+# cd ~
+# curl -Lo kind https://kind.sigs.k8s.io/dl/v0.22.0/kind-linux-amd64
+# chmod +x kind
+# sudo mv kind /usr/local/bin/kind
+# Perform a full system logout and login after any kind installation/update.
+```
+
+Clone Kubeflow Pipelines Repository:
+```
+cd ~
+git clone https://github.com/kubeflow/pipelines.git kfp-local-manifests
+```
+
+
+
+If you need to patch `metadata-grpc-deployment.yaml` for image compatibility (e.g., changing `ml_metadata_store_server:1.14.0` to `1.13.0` to fix `Exit Code 139`), do it now in `~/kfp-local-manifests/manifests/kustomize/base/metadata/base/metadata-grpc-deployment.yaml`.
+
+Deploy Kubeflow Pipelines on `kind` (Recommended Method): ***This `make` target automates the `kind` cluster creation and KFP deployment using the `platform-agnostic` manifests.***
+```
+cd ~/kfp-local-manifests/backend # Navigate to the backend directory
+make kind-cluster-agnostic
+```
+
+
+***Note:*** This command will create the `kind` cluster (if it doesn't exist), deploy all necessary KFP components, and wait for core deployments (MySQL, MLMD, KFP API) to become available. This process can take 10-20 minutes or more. Monitor its output carefully.
+
+
+Troubleshooting `make kind-cluster-agnostic`:
+If you encounter "No rule to make target" or other errors, ensure your `kubeflow/pipelines` repository clone is up to date and contains this target in `backend/Makefile`.
+
+Verify KFP Health:
+- After `make kind-cluster-agnostic` finishes, open a new terminal.
+- Run this command repeatedly until ALL (or almost all) pods in the `kubeflow` namespace are `Running` and `READY` `1/1`.
+
+```
+kubectl get pods -n kubeflow
+```
+
+- ***This is crucial.
Do not proceed until KFP is healthy.***
+
+
+Clone Kubeflow Model Registry Repository:
+```
+cd ~
+git clone https://github.com/kubeflow/model-registry.git
+```
+
+Deploy Kubeflow Model Registry (API Server): This deploys the Model Registry API server, which stores and serves model metadata.
+```
+cd ~/model-registry/manifests/kustomize/overlays/db # Navigate to the correct overlay for embedded DB
+kubectl apply -k . -n kubeflow
+```
+
+
+- Troubleshooting Model Registry Pods: Wait for the `model-registry-deployment` pod to become `Running` and `READY` (`kubectl get pods -n kubeflow | grep model-registry`).
+
+**Install and Deploy Kubeflow Model Registry UI:**
+The Model Registry UI is a separate frontend application that provides a browsable interface to your registered models. There are two primary ways to deploy it: a simpler method for basic setups, and a more comprehensive method for multi-user Kubeflow environments.
+
+- **Option A:** Basic UI Deployment (Recommended for simplicity with standalone KFP): This method deploys the UI without requiring Istio or other multi-user components.
+
+Installation (Cloning the `kubeflow/manifests` repository, pinned to a compatible release): First, clone the [Kubeflow manifests repository](https://github.com/kubeflow/manifests) (if you haven't already). We recommend pinning to a specific release (e.g., v1.7.0) compatible with KFP 2.5.0.
+```
+cd ~
+git clone https://github.com/kubeflow/manifests.git kubeflow-manifests-repo # Clone to a distinct name
+cd kubeflow-manifests-repo
+git fetch --tags
+git checkout tags/v1.7.0 # Pin to Kubeflow manifests v1.7.0 (compatible with KFP 2.5.0)
+```
+
+- Deployment (Applying manifests): Navigate to the Model Registry UI `base` overlay:
+```
+cd ~/kubeflow-manifests-repo/applications/model-registry/upstream/options/ui/base # Using 'base' overlay for non-Istio
+```
+
+Apply the UI manifests:
+```
+kubectl apply -k .
-n kubeflow +``` + +Wait for the UI pod to be ready: +``` +kubectl get pods -n kubeflow -l app=model-registry-ui +``` + +- **Option B:** Multi-User Kubeflow Deployment (More comprehensive, but complex): This option deploys a full multi-user Kubeflow environment, which includes Istio, Dex, and other components necessary for authentication and traffic management. This path is significantly more time-consuming and complex to set up, but it enables the Model Registry UI to integrate with the Central Dashboard in a production-like, authenticated manner. + +***Note:*** This is a major deployment effort that goes beyond the scope of a simple Model Registry integration. It involves deploying many components from the root of the `kubeflow/manifests` repository. The exact steps are detailed in the [Kubeflow manifests README](https://github.com/kubeflow/manifests) under the "Deploy Kubeflow in Multi-User Mode" section. + +**Steps (summary from `kubeflow/manifests` README):** +- Create the Kind cluster (if not already done). +- Install [cert-manager](https://cert-manager.io/docs/installation/kubernetes/). +- Install [Istio](https://istio.io/latest/docs/setup/getting-started/). +- Install [Oauth2-proxy](https://oauth2-proxy.github.io/oauth2-proxy/installation/). +- Install [Dex](https://dexidp.io/docs/getting-started/) (skip identity provider connection for basic setup). +- Deploy Kubeflow Namespace, Roles, and Istio Resources. +- Install Kubeflow Pipelines (full version). +- Install Central Dashboard. +- Configure Profiles + KFAM (Kubeflow Access Management). +- Configure User Namespaces. 
+ +**The following `install.sh script`, developed by Matt Prahl, automates the complex, multi-component Kubeflow deployment stated above on Kind.** +``` + #!/bin/bash + set -euo pipefail + + # Prior to running this, consider setting the /etc/sysctl.d/99-inotify.conf file to: + # fs.inotify.max_user_watches = 524288 + # fs.inotify.max_user_instances = 1024 + + # Then run sudo sysctl --system to apply the change + + # Check if there's already a Kind cluster running + if kind get clusters | grep -q .; then + echo "Error: There is already a Kind cluster running. Please delete it first with:" + echo " kind delete clusters --all" + exit 1 + fi + + cat </dev/null 2>&1; then + echo "Namespace kubeflow-user-example-com found!" + kubectl get namespace kubeflow-user-example-com + break + else + echo "Namespace not found yet. Waiting 5 seconds before retry..." + sleep 5 + fi + done + + echo "Deploying Model Registry..." + kubectl apply -k applications/model-registry/upstream/overlays/db -n kubeflow-user-example-com + kubectl apply -k applications/model-registry/upstream/options/istio -n kubeflow-user-example-com + + echo "Waiting for Model Registry..." + kubectl wait --for=condition=available -n kubeflow-user-example-com deployment/model-registry-deployment --timeout=2m + + echo "Deploying Model Registry UI..." + kubectl apply -k applications/model-registry/upstream/options/ui/overlays/istio + # Update central dashboard config to add Model Registry menu item + kubectl get configmap centraldashboard-config -n kubeflow -o json | \ + jq '.data.links |= (fromjson | .menuLinks += [{"icon": "assignment", "link": "/model-registry/", "text": "Model Registry", "type": "item"}] | tojson)' | \ + kubectl apply -f - -n kubeflow + echo "Waiting for the dashboard..." + kubectl wait --for=condition=Ready pod -l 'app.kubernetes.io/name=centraldashboard' --timeout=180s -n kubeflow + + echo "Waiting for KFP..." 
+ kubectl wait --for=condition=Ready pod -l 'app.kubernetes.io/name=kubeflow-pipelines' --timeout=180s -n kubeflow
+
+ echo ""
+ echo "🎉 Installation complete! 🎉"
+ echo ""
+ echo "📋 Next steps:"
+ echo " 1. Run this command to start port forwarding:"
+ echo " kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80"
+ echo ""
+ echo " 2. Open your browser and go to:"
+ echo " http://localhost:8080"
+ echo ""
+ echo " 3. Login with these credentials:"
+ echo " Username: user@example.com"
+ echo " Password: 12341234"
+ echo ""
+```
+
+**Note:** This script assumes `kustomize` and `jq` are installed and in your PATH. Ensure you have followed the prerequisites section to install these tools. Also, ensure you have cloned the `kubeflow/manifests` repository to `~/kubeflow-manifests-repo` and are running this script from that directory.
+
+
+
+**Accessing the cluster:**
+ After this complex setup, you would typically access the Kubeflow Central Dashboard via an Ingress or LoadBalancer, as described in the [Connect to your Kubeflow cluster](https://github.com/kubeflow/manifests?tab=readme-ov-file#connect-to-your-kubeflow-cluster) section of the `kubeflow/manifests` README.
+
+***Once the full multi-user Kubeflow is deployed, you would then apply the Model Registry UI's Istio-dependent overlay:***
+```
+cd ~/kubeflow-manifests-repo/applications/model-registry/upstream/options/ui/overlays/istio
+kubectl apply -k . -n kubeflow
+```
+
+
+This would then work because Istio would be present.
+
+- Integrate Model Registry UI with Kubeflow Central Dashboard (Optional but Recommended): This step adds a direct link to the Model Registry UI in the Kubeflow Central Dashboard's navigation menu.
+```
+kubectl get configmap centraldashboard-config -n kubeflow -o json | \
+jq '.data.links |= (fromjson | .menuLinks += [{"icon": "assignment", "link": "/model-registry/", "text": "Model Registry", "type": "item"}] | tojson)' | \
+kubectl apply -f - -n kubeflow
+```
+
+***Note: This command requires `jq` to be installed (`sudo dnf install jq`). If you prefer, you can edit the ConfigMap manually: run `kubectl edit configmap -n kubeflow centraldashboard-config` and add the menu item under `data.links.menuLinks`.***
+
+**Set Up Port-Forwards:**
+
+Open four separate terminal windows and run one command in each:
+
+- KFP UI: `kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80`
+- Model Registry API: `kubectl port-forward -n kubeflow svc/model-registry-service 8082:8080`
+- Model Registry UI: `kubectl port-forward -n kubeflow svc/model-registry-ui-service 8084:80`
+- Minio (for KFP logs): `kubectl port-forward -n kubeflow svc/minio-service 9000:9000`
+
+# Connecting KFP to Model Registry: Best Practices
+
+To enable your KFP component to register models, you need to provide it with the Model Registry's API endpoint and, ideally, an authentication token. While the Model Registry API can be accessed directly, using a Kubernetes Secret for sensitive information like tokens is a best practice.
+
+
+**Create a Kubernetes Secret for the Model Registry Token:**
+
+For local development, you might use a dummy token. For production, you'd generate a secure one.
+
+`kubectl create secret generic model-registry-auth --from-literal=token='your-dummy-token' -n kubeflow`
+
+
+***Note: In a real-world scenario, you would ensure the `pipeline-runner` ServiceAccount (or the ServiceAccount used by your pipeline) has permissions to read this Secret and that the Secret is exposed as an environment variable in your component's pod.
This is often handled by a higher-level Kubeflow deployment or by manually patching the ServiceAccount/Deployment if needed.*** + +# Building the KFP Pipeline for Model Registration + +### The `register_model_to_kubeflow_registry` Component + +Our pipeline consists of a single custom component, `register_model_to_kubeflow_registry`, defined using KFP's `@dsl.component` decorator. This component encapsulates the logic for interacting with the Model Registry API. + +* **Base Image & Dependencies:** The component uses a `python:3.11-slim-buster` base image, and the `model-registry==0.2.19` client library is automatically installed into its container environment via the `packages_to_install` argument in the decorator. +* **Model Registry Connection:** Inside the component, the `ModelRegistry` client is initialized. It connects to the Model Registry API server using its internal Kubernetes DNS name (`http://model-registry-service.kubeflow.svc.cluster.local:8080`). +* **Authentication:** For authentication, the component retrieves a token from the `MR_AUTH_TOKEN` environment variable using `os.environ.get()`. This environment variable is populated by mounting a Kubernetes Secret (`model-registry-auth`) into the component's pod, ensuring sensitive credentials are not hardcoded. +* **Dynamic Metadata for Lineage:** To achieve robust cross-referencing and lineage tracking, the component dynamically captures details about its own pipeline run. It retrieves values like `KFP_RUN_ID`, `KFP_PIPELINE_NAME`, and `KFP_POD_NAMESPACE` from environment variables that Kubeflow Pipelines automatically inject into every component pod. These values are then passed as `model_source_id`, `model_source_name`, `model_source_class`, `model_source_kind`, and `model_source_group` when registering the model. +* **Model Registration:** The core action involves calling `registry.register_model()`. 
This single call registers both the conceptual model and its specific version, requiring parameters such as `name`, `uri` (the model's artifact location), `description`, `model_format_name`, `model_format_version`, and the `version` string itself. +* **KFP Output Model Metadata:** To further enhance traceability within the KFP UI, the component writes a dummy model artifact to its `output_model` path. More importantly, it populates this `output_model`'s metadata with key information about the registered model, including its `modelName`, `versionName`, `modelID`, and a constructed `modelRegistryURL`. This metadata will be visible in the KFP UI's "Artifacts" tab for the pipeline run. + +### The `model_registration_pipeline` Definition + +The `model_registration_pipeline` orchestrates the `register_model_to_kubeflow_registry` component. This pipeline defines input parameters for the model's name, version, artifact URI, and author, allowing for flexible configuration when launching a run. It then invokes the component, passing these parameters. The pipeline is designed to return the registered model's ID as an output. + +### Compiling and Deploying the Pipeline + +Once defined in Python, the pipeline needs to be compiled into a YAML file, which is the format Kubeflow Pipelines understands. The `kfp.compiler.Compiler().compile()` function handles this, generating a `model_registration_pipeline.yaml` file. This YAML file can then be uploaded to the Kubeflow Pipelines UI and executed. + + + + +***Now, let's define a sample KFP pipeline that registers a fake model. This pipeline demonstrates best practices for setting `model_source` metadata, which is crucial for tracing model lineage back to its originating pipeline run.*** + +We'll use placeholders from `kfp.dsl` and `os.environ` to dynamically inject values like the pipeline run ID and name. 
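One detail of the component that is easy to miss: the registration code in the compiled pipeline splits the port out of the registry URL, because it passes `server_address` and `port` to the `ModelRegistry` client as separate arguments. The standalone sketch below mirrors that logic using only the standard library. The function name `split_registry_url` is hypothetical and introduced here for illustration; the real component does this inline.

```python
import urllib.parse
from typing import Tuple


def split_registry_url(url: str) -> Tuple[str, int]:
    """Split a registry URL into (server_address, port).

    Hypothetical helper mirroring the inline logic of the
    register_model_to_kubeflow_registry component.
    """
    parsed = urllib.parse.urlparse(url)
    port = parsed.port
    if port:
        # Drop ":<port>" from the address; the port is returned separately.
        address = url.replace(parsed.netloc, parsed.hostname)
    else:
        # No explicit port: fall back to the scheme's default.
        port = 80 if parsed.scheme == "http" else 443
        address = url
    if not parsed.scheme:
        # Bare host names are treated as HTTPS endpoints.
        address = "https://" + address
    return address, port


if __name__ == "__main__":
    print(split_registry_url(
        "http://model-registry-service.kubeflow.svc.cluster.local:8080"
    ))
```

With the in-cluster URL used throughout this post, the address comes back without the port and the port comes back as the integer `8080`, which is the shape the client call in the component expects.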
+ +**The complete pipeline code is available on this [GitHub](https://github.com/hpurdom/blog/blob/master/code_examples/model_registration_pipeline_example.py) link here.** diff --git a/code_examples/iris_model_registration_pipeline_opendatahub_pattern.yaml b/code_examples/iris_model_registration_pipeline_opendatahub_pattern.yaml new file mode 100644 index 00000000..1f6cdbef --- /dev/null +++ b/code_examples/iris_model_registration_pipeline_opendatahub_pattern.yaml @@ -0,0 +1,309 @@ +# PIPELINE DEFINITION +# Name: iris-model-registration-pipeline-following-opendatahub-io-ilab-on-ocp-pattern +# Description: A KFP pipeline that trains an Iris model and registers it in Kubeflow Model Registry using the exact pattern from opendatahub-io/ilab-on-ocp. +# Inputs: +# model_author: str [Default: 'ML Engineering Team'] +# model_name: str [Default: 'iris-classifier'] +# model_registry_api_url: str [Default: 'http://model-registry-service.kubeflow.svc.cluster.local:8080'] +# model_version_name: str [Default: 'v1.0.0'] +components: + comp-register-model-to-kubeflow-registry: + executorLabel: exec-register-model-to-kubeflow-registry + inputDefinitions: + artifacts: + model_artifact_uri: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + parameters: + model_author: + defaultValue: Data Science Pipelines Team + isOptional: true + parameterType: STRING + model_description: + defaultValue: A model for demonstration purposes + isOptional: true + parameterType: STRING + model_name: + parameterType: STRING + model_registry_api_url: + defaultValue: http://model-registry-service.kubeflow.svc.cluster.local:8080 + isOptional: true + parameterType: STRING + model_registry_name: + defaultValue: '' + isOptional: true + parameterType: STRING + model_version_name: + parameterType: STRING + pipeline_name: + parameterType: STRING + pipeline_namespace: + parameterType: STRING + pipeline_run_id: + parameterType: STRING + outputDefinitions: + artifacts: + output_model: + 
artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + parameters: + Output: + parameterType: STRING + comp-train-iris-model: + executorLabel: exec-train-iris-model + outputDefinitions: + artifacts: + output_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-register-model-to-kubeflow-registry: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - register_model_to_kubeflow_registry + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.13.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'model-registry==0.2.19'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef register_model_to_kubeflow_registry(\n model_name: str,\n\ + \ model_version_name: str,\n model_artifact_uri: dsl.Input[dsl.Model],\n\ + \ pipeline_run_id: str,\n pipeline_name: str,\n pipeline_namespace:\ + \ str,\n model_registry_api_url: str = \"http://model-registry-service.kubeflow.svc.cluster.local:8080\"\ + ,\n model_registry_name: str = \"\",\n model_description: str = \"\ + A model for demonstration purposes\",\n model_author: str = \"Data Science\ + \ Pipelines Team\",\n output_model: dsl.Output[dsl.Model] = None\n) ->\ + \ str:\n \"\"\"\n KFP component to register a model and version in\ + \ the Kubeflow Model Registry.\n This implementation 
follows the EXACT\ + \ pattern from opendatahub-io/ilab-on-ocp\n utils/components.py lines\ + \ 195-273 for proven reliability and best practices.\n \"\"\"\n print(\"\ + model-registry client is being installed by KFP before this script runs.\"\ + )\n\n # Import required modules - following opendatahub-io/ilab-on-ocp\ + \ pattern\n import urllib.parse\n import time\n from model_registry\ + \ import ModelRegistry\n from model_registry.types import RegisteredModel\n\ + \n # EXACT PATTERN: Extract the port out of the URL because the ModelRegistry\ + \ client expects those as separate arguments\n # This follows the exact\ + \ logic from opendatahub-io/ilab-on-ocp/utils/components.py lines 195-273\n\ + \ model_registry_api_url_parsed = urllib.parse.urlparse(model_registry_api_url)\n\ + \ model_registry_api_url_port = model_registry_api_url_parsed.port\n\ + \ if model_registry_api_url_port:\n model_registry_api_server_address\ + \ = model_registry_api_url.replace(\n model_registry_api_url_parsed.netloc,\n\ + \ model_registry_api_url_parsed.hostname,\n )\n else:\n\ + \ if model_registry_api_url_parsed.scheme == \"http\":\n \ + \ model_registry_api_url_port = 80\n else:\n model_registry_api_url_port\ + \ = 443\n model_registry_api_server_address = model_registry_api_url\n\ + \ if not model_registry_api_url_parsed.scheme:\n model_registry_api_server_address\ + \ = (\n \"https://\" + model_registry_api_server_address\n \ + \ )\n\n # Retrieve authentication token from environment variable\n\ + \ token = os.environ.get(\"MR_AUTH_TOKEN\", \"\")\n if not token:\n\ + \ print(\"Warning: MR_AUTH_TOKEN environment variable not found.\ + \ Proceeding without authentication.\")\n\n print(f\"Connecting to Model\ + \ Registry at {model_registry_api_server_address}:{model_registry_api_url_port}\"\ + )\n\n # EXACT PATTERN: Model registration with retry logic from opendatahub-io/ilab-on-ocp\n\ + \ tries = 0\n while True:\n try:\n tries += 1\n\ + \ registry = ModelRegistry(\n 
server_address=model_registry_api_server_address,\n\ + \ port=model_registry_api_url_port,\n author=model_author,\ + \ # Following \"InstructLab Pipeline\" pattern but using parameter\n \ + \ user_token=token,\n )\n registered_model\ + \ = registry.register_model(\n name=model_name,\n \ + \ version=model_version_name,\n uri=model_artifact_uri,\n\ + \ model_format_name=\"custom-format\", # Can be \"vLLM\"\ + \ for LLMs\n model_format_version=\"1.0\",\n \ + \ # EXACT PATTERN: model_source_* fields for cross-referencing\n \ + \ model_source_id=pipeline_run_id, # run_id parameter\n\ + \ model_source_name=pipeline_name, # run_name parameter\ + \ \n model_source_class=\"pipelinerun\", # KFP-specific\ + \ identifier\n model_source_kind=\"kfp\", #\ + \ KFP-specific identifier\n model_source_group=pipeline_namespace,\ + \ # pod_namespace equivalent\n )\n break\n \ + \ except Exception as e:\n if tries >= 3:\n raise\n\ + \ print(f\"Failed to register the model on attempt {tries}/3:\ + \ {e}\")\n time.sleep(1)\n\n # EXACT PATTERN: Get the model\ + \ version ID to add as metadata on the output model artifact\n tries\ + \ = 0\n while True:\n try:\n tries += 1\n \ + \ model_version_id = registry.get_model_version(\n model_name,\ + \ model_version_name\n ).id\n break\n except\ + \ Exception as e:\n if tries >= 3:\n raise\n \ + \ print(f\"Failed to get the model version ID on attempt {tries}/3:\ + \ {e}\")\n time.sleep(1)\n\n # EXACT PATTERN: If model_registry_name\ + \ is not provided, parse it from the URL\n if not model_registry_name:\n\ + \ model_registry_name = urllib.parse.urlparse(\n model_registry_api_url\n\ + \ ).hostname.split(\".\")[0]\n if model_registry_name.endswith(\"\ + -rest\"):\n model_registry_name = model_registry_name[: -len(\"\ + -rest\")]\n\n print(f\"Successfully registered model - ID: {registered_model.id},\ + \ Name: {registered_model.name}\")\n print(f\"Model version ID: {model_version_id}\"\ + )\n\n # Write content to the output model path (simulating a 
model artifact)\n\ + \ with open(output_model.path, 'w') as f:\n f.write(f\"This is\ + \ a model artifact for {model_name} version {model_version_name}.\")\n\n\ + \ # EXACT PATTERN: Set comprehensive metadata on the KFP output model\ + \ artifact\n # Following the opendatahub-io/ilab-on-ocp approach for\ + \ KFP UI integration\n output_model.metadata[\"registered_model\"] =\ + \ {\n \"modelName\": model_name,\n \"versionName\": model_version_name,\n\ + \ \"modelID\": registered_model.id,\n \"versionID\": model_version_id,\ + \ # Real version ID from API\n \"modelRegistryURL\": f\"{model_registry_api_server_address}:{model_registry_api_url_port}/models/{registered_model.id}/versions/{model_version_id}\"\ + ,\n \"modelRegistryAPIEndpoint\": model_registry_api_server_address,\n\ + \ \"modelRegistryName\": model_registry_name,\n \"registrationTimestamp\"\ + : time.time(),\n \"pipelineSource\": {\n \"runId\": pipeline_run_id,\n\ + \ \"pipelineName\": pipeline_name,\n \"namespace\"\ + : pipeline_namespace\n }\n }\n\n print(f\"KFP output model\ + \ artifact metadata set: {output_model.metadata}\")\n\n return registered_model.id\n\ + \n" + image: python:3.11-slim-buster + exec-train-iris-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train_iris_model + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.13.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'scikit-learn==1.3.0'\ + \ 'pandas==2.0.3' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train_iris_model(\n output_model: dsl.Output[dsl.Model]\n\ + ) -> str:\n \"\"\"\n Simple component that trains an Iris classification\ + \ model.\n This demonstrates a realistic pipeline that produces a model\ + \ to register.\n \"\"\"\n import pickle\n import pandas as pd\n\ + \ from sklearn.datasets import load_iris\n from sklearn.ensemble import\ + \ RandomForestClassifier\n from sklearn.model_selection import train_test_split\n\ + \ from sklearn.metrics import accuracy_score\n import json\n\n \ + \ # Load and prepare the Iris dataset\n iris = load_iris()\n X, y\ + \ = iris.data, iris.target\n\n # Split the data\n X_train, X_test,\ + \ y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)\n\ + \n # Train a simple Random Forest model\n model = RandomForestClassifier(n_estimators=100,\ + \ random_state=42)\n model.fit(X_train, y_train)\n\n # Calculate accuracy\n\ + \ y_pred = model.predict(X_test)\n accuracy = accuracy_score(y_test,\ + \ y_pred)\n\n print(f\"Model trained with accuracy: {accuracy:.4f}\"\ + )\n\n # Save the model\n with open(output_model.path, 'wb') as f:\n\ + \ pickle.dump(model, f)\n\n # Set model metadata\n output_model.metadata\ + \ = {\n \"accuracy\": accuracy,\n 
\"model_type\": \"RandomForestClassifier\"\ + ,\n \"n_estimators\": 100,\n \"dataset\": \"iris\",\n \ + \ \"features\": iris.feature_names,\n \"target_classes\": iris.target_names.tolist()\n\ + \ }\n\n return f\"s3://my-model-bucket/iris-models/run-{hash(str(accuracy))}/model.pkl\"\ + \n\n" + image: python:3.11-slim-buster +pipelineInfo: + description: A KFP pipeline that trains an Iris model and registers it in Kubeflow + Model Registry using the exact pattern from opendatahub-io/ilab-on-ocp. + name: iris-model-registration-pipeline-following-opendatahub-io-ilab-on-ocp-pattern +root: + dag: + tasks: + register-model-to-kubeflow-registry: + cachingOptions: + enableCache: true + componentRef: + name: comp-register-model-to-kubeflow-registry + dependentTasks: + - train-iris-model + inputs: + artifacts: + model_artifact_uri: + taskOutputArtifact: + outputArtifactKey: output_model + producerTask: train-iris-model + parameters: + model_author: + componentInputParameter: model_author + model_description: + runtimeValue: + constant: Random Forest classifier trained on Iris dataset + model_name: + componentInputParameter: model_name + model_registry_api_url: + componentInputParameter: model_registry_api_url + model_version_name: + componentInputParameter: model_version_name + pipeline_name: + runtimeValue: + constant: '{{$.pipeline_job_name}}' + pipeline_namespace: + runtimeValue: + constant: '{{workflow.namespace}}' + pipeline_run_id: + runtimeValue: + constant: '{{$.pipeline_job_uuid}}' + taskInfo: + name: Register Model in Registry + train-iris-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-train-iris-model + taskInfo: + name: Train Iris Model + inputDefinitions: + parameters: + model_author: + defaultValue: ML Engineering Team + isOptional: true + parameterType: STRING + model_name: + defaultValue: iris-classifier + isOptional: true + parameterType: STRING + model_registry_api_url: + defaultValue: 
http://model-registry-service.kubeflow.svc.cluster.local:8080 + isOptional: true + parameterType: STRING + model_version_name: + defaultValue: v1.0.0 + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.13.0 +--- +platforms: + kubernetes: + deploymentSpec: + executors: + exec-register-model-to-kubeflow-registry: + secretAsEnv: + - keyToEnv: + - envVar: MR_AUTH_TOKEN + secretKey: token + secretName: model-registry-auth + secretNameParameter: + runtimeValue: + constant: model-registry-auth diff --git a/code_examples/model_registration_pipeline.yaml b/code_examples/model_registration_pipeline.yaml new file mode 100644 index 00000000..4a69058e --- /dev/null +++ b/code_examples/model_registration_pipeline.yaml @@ -0,0 +1,222 @@ +# PIPELINE DEFINITION +# Name: model-registration-pipeline +# Description: A KFP pipeline to demonstrate registering a model in Kubeflow Model Registry. +# Inputs: +# model_artifact_uri: str [Default: 's3://my-model-bucket/fake/v1.0.0/model.pkl'] +# model_author: str [Default: 'Kubeflow Community'] +# model_name: str [Default: 'MyFakeModel'] +# model_version_name: str [Default: 'v1.0.0-blog-post'] +components: + comp-register-model-to-kubeflow-registry: + executorLabel: exec-register-model-to-kubeflow-registry + inputDefinitions: + parameters: + model_artifact_uri: + parameterType: STRING + model_author: + defaultValue: Data Science Pipelines Team + isOptional: true + parameterType: STRING + model_container_image: + defaultValue: my-fake-model-server:latest + isOptional: true + parameterType: STRING + model_data_type: + defaultValue: tabular + isOptional: true + parameterType: STRING + model_description: + defaultValue: A model for demonstration purposes + isOptional: true + parameterType: STRING + model_name: + parameterType: STRING + model_serving_input_schema: + defaultValue: '{"type": "object", "properties": {"feature1": {"type": "number"}}}' + isOptional: true + parameterType: STRING + 
model_serving_output_schema: + defaultValue: '{"type": "object", "properties": {"prediction": {"type": + "number"}}}' + isOptional: true + parameterType: STRING + model_serving_path: + defaultValue: /mnt/models/fake_model.pkl + isOptional: true + parameterType: STRING + model_storage_format: + defaultValue: pickle + isOptional: true + parameterType: STRING + model_version_name: + parameterType: STRING + pipeline_name: + parameterType: STRING + pipeline_namespace: + parameterType: STRING + pipeline_run_id: + parameterType: STRING + outputDefinitions: + artifacts: + output_model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-register-model-to-kubeflow-registry: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - register_model_to_kubeflow_registry + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.13.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'model-registry==0.2.19'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef register_model_to_kubeflow_registry(\n model_name: str,\n\ + \ model_version_name: str,\n model_artifact_uri: str,\n pipeline_run_id:\ + \ str,\n pipeline_name: str,\n pipeline_namespace: str,\n model_description:\ + \ str = \"A model for demonstration purposes\",\n model_author: str =\ + \ \"Data Science 
Pipelines Team\",\n model_data_type: str = \"tabular\"\ + ,\n model_storage_format: str = \"pickle\",\n model_container_image:\ + \ str = \"my-fake-model-server:latest\",\n model_serving_path: str =\ + \ \"/mnt/models/fake_model.pkl\",\n model_serving_input_schema: str =\ + \ '{\"type\": \"object\", \"properties\": {\"feature1\": {\"type\": \"number\"\ + }}}',\n model_serving_output_schema: str = '{\"type\": \"object\", \"\ + properties\": {\"prediction\": {\"type\": \"number\"}}}',\n output_model:\ + \ dsl.Output[dsl.Model] = None\n) -> str:\n \"\"\"\n KFP component\ + \ to register a model and version in the Kubeflow Model Registry.\n It\ + \ demonstrates best practices for model source metadata.\n \"\"\"\n \ + \ print(\"model-registry client is being installed by KFP before this\ + \ script runs.\")\n\n # Import ModelRegistry client within the component\ + \ function\n from model_registry import ModelRegistry\n from model_registry.types\ + \ import RegisteredModel\n\n # Model Registry service address within\ + \ the Kubernetes cluster\n MODEL_REGISTRY_SERVER_ADDRESS = \"http://model-registry-service.kubeflow-user-example-com.svc.cluster.local\"\ + \n MODEL_REGISTRY_PORT = 8080\n\n # Retrieve authentication token\ + \ from environment variable\n auth_token = os.environ.get(\"MR_AUTH_TOKEN\"\ + , \"\")\n if not auth_token:\n print(\"Warning: MR_AUTH_TOKEN\ + \ environment variable not found. 
Proceeding without authentication.\")\n\ + \n print(f\"Connecting to Model Registry at {MODEL_REGISTRY_SERVER_ADDRESS}:{MODEL_REGISTRY_PORT}\"\ + )\n registry = ModelRegistry(\n server_address=MODEL_REGISTRY_SERVER_ADDRESS,\n\ + \ port=MODEL_REGISTRY_PORT,\n author=model_author,\n \ + \ is_secure=False,\n user_token=auth_token\n )\n\n try:\n\ + \ # Register the Model and its Version in a single call\n \ + \ print(f\"Registering model: {model_name} version: {model_version_name}\ + \ with URI: {model_artifact_uri}\")\n registered_model_and_version\ + \ = registry.register_model(\n name=model_name,\n \ + \ uri=model_artifact_uri,\n description=model_description,\n\ + \ model_format_name=\"custom-format\",\n model_format_version=\"\ + 1.0\",\n version=model_version_name,\n owner=\"Kubeflow\ + \ Pipelines\",\n author=model_author,\n metadata={\n\ + \ \"training_epochs\": 100,\n \"accuracy\"\ + : 0.95,\n \"pipeline_run_id\": pipeline_run_id,\n \ + \ \"pipeline_name\": pipeline_name,\n \"pipeline_namespace\"\ + : pipeline_namespace,\n \"kfp_component_name\": \"register-model-to-kubeflow-registry\"\ + \n },\n model_source_id=pipeline_run_id,\n \ + \ model_source_name=pipeline_name,\n model_source_class=\"\ + pipelinerun\",\n model_source_kind=\"kfp\",\n model_source_group=pipeline_namespace,\n\ + \ )\n print(f\"Registered Model ID: {registered_model_and_version.id},\ + \ Name: {registered_model_and_version.name}\")\n\n print(f\"Successfully\ + \ registered model and version in Model Registry.\")\n\n # Write\ + \ content to the output model path\n with open(output_model.path,\ + \ 'w') as f:\n f.write(f\"This is a fake model artifact for {model_name}\ + \ version {model_version_name}.\")\n\n # Set metadata on the KFP\ + \ output model artifact\n output_model.metadata[\"registered_model\"\ + ] = {\n \"modelName\": registered_model_and_version.name,\n \ + \ \"versionName\": model_version_name,\n \"modelID\"\ + : registered_model_and_version.id,\n \"versionID\": 
\"placeholder-version-id\"\ + ,\n \"modelRegistryURL\": f\"http://localhost:{MODEL_REGISTRY_PORT}/models/{registered_model_and_version.id}/versions/{model_version_name}\"\ + ,\n \"modelRegistryAPIEndpoint\": MODEL_REGISTRY_SERVER_ADDRESS\n\ + \ }\n print(f\"KFP output model artifact metadata set: {output_model.metadata}\"\ + )\n\n return registered_model_and_version.id\n\n except Exception\ + \ as e:\n print(f\"An error occurred during model registration: {e}\"\ + )\n raise\n\n" + image: python:3.11-slim-buster +pipelineInfo: + description: A KFP pipeline to demonstrate registering a model in Kubeflow Model + Registry. + name: model-registration-pipeline +root: + dag: + tasks: + register-model-to-kubeflow-registry: + cachingOptions: + enableCache: true + componentRef: + name: comp-register-model-to-kubeflow-registry + inputs: + parameters: + model_artifact_uri: + componentInputParameter: model_artifact_uri + model_author: + componentInputParameter: model_author + model_name: + componentInputParameter: model_name + model_version_name: + componentInputParameter: model_version_name + pipeline_name: + runtimeValue: + constant: '{{pipeline_job_name}}' + pipeline_namespace: + runtimeValue: + constant: kubeflow-user-example-com + pipeline_run_id: + runtimeValue: + constant: '{{pipeline_run_id}}' + taskInfo: + name: register-model-to-kubeflow-registry + inputDefinitions: + parameters: + model_artifact_uri: + defaultValue: s3://my-model-bucket/fake/v1.0.0/model.pkl + isOptional: true + parameterType: STRING + model_author: + defaultValue: Kubeflow Community + isOptional: true + parameterType: STRING + model_name: + defaultValue: MyFakeModel + isOptional: true + parameterType: STRING + model_version_name: + defaultValue: v1.0.0-blog-post + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.13.0 +--- +platforms: + kubernetes: + deploymentSpec: + executors: + exec-register-model-to-kubeflow-registry: + secretAsEnv: + - keyToEnv: + - envVar: 
MR_AUTH_TOKEN + secretKey: token + secretName: model-registry-auth + secretNameParameter: + runtimeValue: + constant: model-registry-auth diff --git a/code_examples/model_registration_pipeline_example.py b/code_examples/model_registration_pipeline_example.py new file mode 100644 index 00000000..59742e83 --- /dev/null +++ b/code_examples/model_registration_pipeline_example.py @@ -0,0 +1,266 @@ +from kfp import dsl, compiler +import json +import os +import time +import kfp + +# Import available KFP placeholders - some may not exist in all versions +try: + from kfp.dsl import PIPELINE_JOB_ID_PLACEHOLDER, PIPELINE_JOB_NAME_PLACEHOLDER + # PIPELINE_JOB_NAMESPACE_PLACEHOLDER may not exist, we'll use a fallback + PIPELINE_JOB_NAMESPACE_PLACEHOLDER = "{{workflow.namespace}}" +except ImportError: + # Fallback to string placeholders if DSL constants don't exist + PIPELINE_JOB_ID_PLACEHOLDER = "{{workflow.uid}}" + PIPELINE_JOB_NAME_PLACEHOLDER = "{{workflow.name}}" + PIPELINE_JOB_NAMESPACE_PLACEHOLDER = "{{workflow.namespace}}" + +# NOTE: You have successfully installed kfp-kubernetes-1.5.0 +# This provides Kubernetes-specific functionality not included in the core KFP SDK + +# Define the base image for our pipeline components +BASE_IMAGE = 'python:3.11-slim-buster' + +@dsl.component( + base_image=BASE_IMAGE, + packages_to_install=['model-registry==0.2.19'] +) +def register_model_to_kubeflow_registry( + model_name: str, + model_version_name: str, + model_artifact_uri: dsl.Input[dsl.Model], + pipeline_run_id: str, + pipeline_name: str, + pipeline_namespace: str, + model_registry_api_url: str = "http://model-registry-service.kubeflow.svc.cluster.local:8080", + model_registry_name: str = "", + model_description: str = "A model for demonstration purposes", + model_author: str = "Data Science Pipelines Team", + output_model: dsl.Output[dsl.Model] = None +) -> str: + """ + KFP component to register a model and version in the Kubeflow Model Registry. 
+ This implementation follows the EXACT pattern from opendatahub-io/ilab-on-ocp + utils/components.py lines 195-273 for proven reliability and best practices. + """ + print("model-registry client is being installed by KFP before this script runs.") + + # Import required modules inside the function: KFP lightweight components run in + # isolation, so module-level imports (including os) are not available at runtime + import os + import urllib.parse + import time + from model_registry import ModelRegistry + from model_registry.types import RegisteredModel + + + model_registry_api_url_parsed = urllib.parse.urlparse(model_registry_api_url) + model_registry_api_url_port = model_registry_api_url_parsed.port + if model_registry_api_url_port: + model_registry_api_server_address = model_registry_api_url.replace( + model_registry_api_url_parsed.netloc, + model_registry_api_url_parsed.hostname, + ) + else: + if model_registry_api_url_parsed.scheme == "http": + model_registry_api_url_port = 80 + else: + model_registry_api_url_port = 443 + model_registry_api_server_address = model_registry_api_url + if not model_registry_api_url_parsed.scheme: + model_registry_api_server_address = ( + "https://" + model_registry_api_server_address + ) + + # Retrieve authentication token from environment variable + token = os.environ.get("MR_AUTH_TOKEN", "") + if not token: + print("Warning: MR_AUTH_TOKEN environment variable not found. 
Proceeding without authentication.") + + print(f"Connecting to Model Registry at {model_registry_api_server_address}:{model_registry_api_url_port}") + + + tries = 0 + while True: + try: + tries += 1 + registry = ModelRegistry( + server_address=model_registry_api_server_address, + port=model_registry_api_url_port, + author=model_author, + user_token=token, + ) + registered_model = registry.register_model( + name=model_name, + version=model_version_name, + uri=model_artifact_uri.uri, # pass the artifact's URI string, not the Input[Model] object + model_format_name="custom-format", + model_format_version="1.0", + + model_source_id=pipeline_run_id, # run_id parameter + model_source_name=pipeline_name, # run_name parameter + model_source_class="pipelinerun", # KFP-specific identifier + model_source_kind="kfp", # KFP-specific identifier + model_source_group=pipeline_namespace, # pod_namespace equivalent + ) + break + except Exception as e: + if tries >= 3: + raise + print(f"Failed to register the model on attempt {tries}/3: {e}") + time.sleep(1) + + # Get the model version ID to add as metadata on the output model artifact + tries = 0 + while True: + try: + tries += 1 + model_version_id = registry.get_model_version( + model_name, model_version_name + ).id + break + except Exception as e: + if tries >= 3: + raise + print(f"Failed to get the model version ID on attempt {tries}/3: {e}") + time.sleep(1) + + # If model_registry_name is not provided, parse it from the URL + if not model_registry_name: + model_registry_name = urllib.parse.urlparse( + model_registry_api_url + ).hostname.split(".")[0] + if model_registry_name.endswith("-rest"): + model_registry_name = model_registry_name[: -len("-rest")] + + print(f"Successfully registered model - ID: {registered_model.id}, Name: {registered_model.name}") + print(f"Model version ID: {model_version_id}") + + # Write content to the output model path (simulating a model artifact) + with open(output_model.path, 'w') as f: + f.write(f"This is a model artifact for {model_name} version 
{model_version_name}.") + + + output_model.metadata["registered_model"] = { + "modelName": model_name, + "versionName": model_version_name, + "modelID": registered_model.id, + "versionID": model_version_id, # Real version ID from API + "modelRegistryURL": f"{model_registry_api_server_address}:{model_registry_api_url_port}/models/{registered_model.id}/versions/{model_version_id}", + "modelRegistryAPIEndpoint": model_registry_api_server_address, + "modelRegistryName": model_registry_name, + "registrationTimestamp": time.time(), + "pipelineSource": { + "runId": pipeline_run_id, + "pipelineName": pipeline_name, + "namespace": pipeline_namespace + } + } + + print(f"KFP output model artifact metadata set: {output_model.metadata}") + + return registered_model.id + +@dsl.component( + base_image=BASE_IMAGE, + packages_to_install=['scikit-learn==1.3.0', 'pandas==2.0.3'] +) +def train_iris_model( + output_model: dsl.Output[dsl.Model] +) -> str: + """ + Simple component that trains an Iris classification model. + This demonstrates a realistic pipeline that produces a model to register. 
+ """ + import pickle + import pandas as pd + from sklearn.datasets import load_iris + from sklearn.ensemble import RandomForestClassifier + from sklearn.model_selection import train_test_split + from sklearn.metrics import accuracy_score + import json + + # Load and prepare the Iris dataset + iris = load_iris() + X, y = iris.data, iris.target + + # Split the data + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) + + # Train a simple Random Forest model + model = RandomForestClassifier(n_estimators=100, random_state=42) + model.fit(X_train, y_train) + + # Calculate accuracy + y_pred = model.predict(X_test) + accuracy = accuracy_score(y_test, y_pred) + + print(f"Model trained with accuracy: {accuracy:.4f}") + + # Save the model + with open(output_model.path, 'wb') as f: + pickle.dump(model, f) + + # Set model metadata + output_model.metadata = { + "accuracy": accuracy, + "model_type": "RandomForestClassifier", + "n_estimators": 100, + "dataset": "iris", + "features": iris.feature_names, + "target_classes": iris.target_names.tolist() + } + + return f"s3://my-model-bucket/iris-models/run-{hash(str(accuracy))}/model.pkl" + +@dsl.pipeline( + name="Iris Model Registration Pipeline - Following opendatahub-io/ilab-on-ocp Pattern", + description="A KFP pipeline that trains an Iris model and registers it in Kubeflow Model Registry using the exact pattern from opendatahub-io/ilab-on-ocp." 
+) +def iris_model_registration_pipeline( + model_name: str = "iris-classifier", + model_version_name: str = "v1.0.0", + model_author: str = "Data Science Pipelines Team", + model_registry_api_url: str = "http://model-registry-service.kubeflow.svc.cluster.local:8080" +): + + # Step 1: Train the model + train_task = train_iris_model() + + # Step 2: Register the model + register_task = register_model_to_kubeflow_registry( + model_name=model_name, + model_version_name=model_version_name, + model_artifact_uri=train_task.outputs["output_model"], # Reference output by name + model_author=model_author, + model_description=f"Random Forest classifier trained on Iris dataset", + model_registry_api_url=model_registry_api_url, + # EXACT PATTERN: Use proper KFP placeholders + pipeline_run_id=PIPELINE_JOB_ID_PLACEHOLDER, # run_id parameter + pipeline_name=PIPELINE_JOB_NAME_PLACEHOLDER, # run_name parameter + pipeline_namespace=PIPELINE_JOB_NAMESPACE_PLACEHOLDER, # namespace context + ) + + # Import the kfp-kubernetes extension for secret handling + from kfp.kubernetes import use_secret_as_env + + # Mount the Secret containing the Model Registry auth token + use_secret_as_env( + register_task, + secret_name="model-registry-auth", + secret_key_to_env={"token": "MR_AUTH_TOKEN"} + ) + + # Set task display names for better UI experience + train_task.set_display_name("Train Iris Model") + register_task.set_display_name("Register Model in Registry") + + # Ensure registration happens after training + register_task.after(train_task) + +# Compile the pipeline +pipeline_filename = "iris_model_registration_pipeline_opendatahub_pattern.yaml" +compiler.Compiler().compile(iris_model_registration_pipeline, pipeline_filename) + +print(f"\nPipeline compiled to {pipeline_filename}") +print(f"You can now upload '{pipeline_filename}' to the Kubeflow Pipelines UI.") +print("\n" + "="*80) + diff --git a/code_examples/modelregistrationpipelinetest.ipynb 
b/code_examples/modelregistrationpipelinetest.ipynb new file mode 100644 index 00000000..819e2c28 --- /dev/null +++ b/code_examples/modelregistrationpipelinetest.ipynb @@ -0,0 +1,405 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 9, + "id": "3ca21cff", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Requirement already satisfied: kfp in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (2.13.0)\n", + "Requirement already satisfied: click<9,>=8.0.0 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (8.2.1)\n", + "Requirement already satisfied: docstring-parser<1,>=0.7.3 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (0.17.0)\n", + "Requirement already satisfied: google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (2.25.1)\n", + "Requirement already satisfied: google-auth<3,>=1.6.1 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (2.40.3)\n", + "Requirement already satisfied: google-cloud-storage<4,>=2.2.1 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (3.2.0)\n", + "Requirement already satisfied: kfp-pipeline-spec==0.6.0 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (0.6.0)\n", + "Requirement already satisfied: kfp-server-api<2.5.0,>=2.1.0 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (2.4.0)\n", + "Requirement already satisfied: kubernetes<31,>=8.0.0 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (30.1.0)\n", + "Requirement already satisfied: protobuf<5,>=4.21.1 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (4.25.8)\n", + "Requirement already satisfied: PyYAML<7,>=5.3 in 
/home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (6.0.2)\n", + "Requirement already satisfied: requests-toolbelt<2,>=0.8.0 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (1.0.0)\n", + "Requirement already satisfied: tabulate<1,>=0.8.6 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (0.9.0)\n", + "Requirement already satisfied: urllib3<3.0.0 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp) (2.5.0)\n", + "Requirement already satisfied: googleapis-common-protos<2.0.0,>=1.56.2 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->kfp) (1.70.0)\n", + "Requirement already satisfied: proto-plus<2.0.0,>=1.22.3 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->kfp) (1.26.1)\n", + "Requirement already satisfied: requests<3.0.0,>=2.18.0 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->kfp) (2.32.4)\n", + "Requirement already satisfied: cachetools<6.0,>=2.0.0 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from google-auth<3,>=1.6.1->kfp) (5.5.2)\n", + "Requirement already satisfied: pyasn1-modules>=0.2.1 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from google-auth<3,>=1.6.1->kfp) (0.4.2)\n", + "Requirement already satisfied: rsa<5,>=3.1.4 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from google-auth<3,>=1.6.1->kfp) (4.9.1)\n", + "Requirement already satisfied: google-cloud-core<3.0.0,>=2.4.2 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from google-cloud-storage<4,>=2.2.1->kfp) (2.4.3)\n", + "Requirement already satisfied: google-resumable-media<3.0.0,>=2.7.2 in 
/home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from google-cloud-storage<4,>=2.2.1->kfp) (2.7.2)\n", + "Requirement already satisfied: google-crc32c<2.0.0,>=1.1.3 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from google-cloud-storage<4,>=2.2.1->kfp) (1.7.1)\n", + "Requirement already satisfied: six>=1.10 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp-server-api<2.5.0,>=2.1.0->kfp) (1.17.0)\n", + "Requirement already satisfied: certifi in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp-server-api<2.5.0,>=2.1.0->kfp) (2025.7.14)\n", + "Requirement already satisfied: python-dateutil in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kfp-server-api<2.5.0,>=2.1.0->kfp) (2.9.0.post0)\n", + "Requirement already satisfied: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kubernetes<31,>=8.0.0->kfp) (1.8.0)\n", + "Requirement already satisfied: requests-oauthlib in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kubernetes<31,>=8.0.0->kfp) (2.0.0)\n", + "Requirement already satisfied: oauthlib>=3.2.2 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from kubernetes<31,>=8.0.0->kfp) (3.3.1)\n", + "Requirement already satisfied: charset_normalizer<4,>=2 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from requests<3.0.0,>=2.18.0->google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->kfp) (3.4.2)\n", + "Requirement already satisfied: idna<4,>=2.5 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from requests<3.0.0,>=2.18.0->google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->kfp) (3.10)\n", + "Requirement already satisfied: pyasn1>=0.1.3 in /home/hpurdom/.pyenv/versions/3.11.9/lib/python3.11/site-packages (from rsa<5,>=3.1.4->google-auth<3,>=1.6.1->kfp) 
(0.6.1)\n", + "Note: you may need to restart the kernel to use updated packages.\n", + "\n", + "Pipeline compiled to iris_model_registration_pipeline_opendatahub_pattern.yaml\n", + "You can now upload 'iris_model_registration_pipeline_opendatahub_pattern.yaml' to the Kubeflow Pipelines UI.\n", + "\n", + "================================================================================\n", + "✅ IMPLEMENTATION FOLLOWS EXACT opendatahub-io/ilab-on-ocp PATTERN\n", + "================================================================================\n", + "✅ Exact URL parsing logic from the reference\n", + "✅ Retry mechanisms for robust operation\n", + "✅ Proper error handling patterns\n", + "✅ Real version ID retrieval logic\n", + "✅ Registry name parsing from URL\n", + "✅ Same parameter patterns as the working example\n", + "✅ Uses proper KFP placeholders (with fallback for namespace)\n", + "✅ PIPELINE_JOB_ID_PLACEHOLDER and PIPELINE_JOB_NAME_PLACEHOLDER\n", + "✅ Proper model_source_* fields for cross-referencing\n", + "✅ Comprehensive KFP output model metadata\n", + "✅ Realistic model training component\n", + "\n", + "📝 Model Registry Integration KEP:\n", + "This manual process could be enhanced by the Model Registry integration KEP\n", + "which proposes automatic registration during pipeline execution.\n", + "\n", + "🔗 References:\n", + "- opendatahub-io/ilab-on-ocp/utils/components.py#L195-L273\n", + "- opendatahub-io/ilab-on-ocp/pipeline.py#L438-L450\n" + ] + } + ], + "source": [ + "%pip install kfp\n", + "\n", + "from kfp import dsl, compiler\n", + "import json\n", + "import os\n", + "import time\n", + "import kfp\n", + "\n", + "# Import available KFP placeholders - some may not exist in all versions\n", + "try:\n", + " from kfp.dsl import PIPELINE_JOB_ID_PLACEHOLDER, PIPELINE_JOB_NAME_PLACEHOLDER\n", + " # PIPELINE_JOB_NAMESPACE_PLACEHOLDER may not exist, we'll use a fallback\n", + " PIPELINE_JOB_NAMESPACE_PLACEHOLDER = 
\"{{workflow.namespace}}\"\n", + "except ImportError:\n", + " # Fallback to string placeholders if DSL constants don't exist\n", + " PIPELINE_JOB_ID_PLACEHOLDER = \"{{workflow.uid}}\"\n", + " PIPELINE_JOB_NAME_PLACEHOLDER = \"{{workflow.name}}\"\n", + " PIPELINE_JOB_NAMESPACE_PLACEHOLDER = \"{{workflow.namespace}}\"\n", + "\n", + "# NOTE: You have successfully installed kfp-kubernetes-1.5.0\n", + "# This provides Kubernetes-specific functionality not included in the core KFP SDK\n", + "\n", + "# Define the base image for our pipeline components\n", + "BASE_IMAGE = 'python:3.11-slim-buster'\n", + "\n", + "@dsl.component(\n", + " base_image=BASE_IMAGE,\n", + " packages_to_install=['model-registry==0.2.19']\n", + ")\n", + "def register_model_to_kubeflow_registry(\n", + " model_name: str,\n", + " model_version_name: str,\n", + " model_artifact_uri: dsl.Input[dsl.Model],\n", + " pipeline_run_id: str,\n", + " pipeline_name: str,\n", + " pipeline_namespace: str,\n", + " model_registry_api_url: str = \"http://model-registry-service.kubeflow.svc.cluster.local:8080\",\n", + " model_registry_name: str = \"\",\n", + " model_description: str = \"A model for demonstration purposes\",\n", + " model_author: str = \"Data Science Pipelines Team\",\n", + " output_model: dsl.Output[dsl.Model] = None\n", + ") -> str:\n", + " \"\"\"\n", + " KFP component to register a model and version in the Kubeflow Model Registry.\n", + " This implementation follows the EXACT pattern from opendatahub-io/ilab-on-ocp\n", + " utils/components.py lines 195-273 for proven reliability and best practices.\n", + " \"\"\"\n", + " print(\"model-registry client is being installed by KFP before this script runs.\")\n", + "\n", + " # Import required modules - following opendatahub-io/ilab-on-ocp pattern\n", + " # (os must be imported here too: the component body runs in isolation)\n", + " import os\n", + " import urllib.parse\n", + " import time\n", + " from model_registry import ModelRegistry\n", + " from model_registry.types import RegisteredModel\n", + "\n", + " # EXACT PATTERN: Extract the port out of 
the URL because the ModelRegistry client expects those as separate arguments\n", + " # This follows the exact logic from opendatahub-io/ilab-on-ocp/utils/components.py lines 195-273\n", + " model_registry_api_url_parsed = urllib.parse.urlparse(model_registry_api_url)\n", + " model_registry_api_url_port = model_registry_api_url_parsed.port\n", + " if model_registry_api_url_port:\n", + " model_registry_api_server_address = model_registry_api_url.replace(\n", + " model_registry_api_url_parsed.netloc,\n", + " model_registry_api_url_parsed.hostname,\n", + " )\n", + " else:\n", + " if model_registry_api_url_parsed.scheme == \"http\":\n", + " model_registry_api_url_port = 80\n", + " else:\n", + " model_registry_api_url_port = 443\n", + " model_registry_api_server_address = model_registry_api_url\n", + " if not model_registry_api_url_parsed.scheme:\n", + " model_registry_api_server_address = (\n", + " \"https://\" + model_registry_api_server_address\n", + " )\n", + "\n", + " # Retrieve authentication token from environment variable\n", + " token = os.environ.get(\"MR_AUTH_TOKEN\", \"\")\n", + " if not token:\n", + " print(\"Warning: MR_AUTH_TOKEN environment variable not found. 
Proceeding without authentication.\")\n", + "\n", + " print(f\"Connecting to Model Registry at {model_registry_api_server_address}:{model_registry_api_url_port}\")\n", + "\n", + " # EXACT PATTERN: Model registration with retry logic from opendatahub-io/ilab-on-ocp\n", + " tries = 0\n", + " while True:\n", + " try:\n", + " tries += 1\n", + " registry = ModelRegistry(\n", + " server_address=model_registry_api_server_address,\n", + " port=model_registry_api_url_port,\n", + " author=model_author, # Following \"InstructLab Pipeline\" pattern but using parameter\n", + " user_token=token,\n", + " )\n", + " registered_model = registry.register_model(\n", + " name=model_name,\n", + " version=model_version_name,\n", + " uri=model_artifact_uri.uri, # Input[Model] is an artifact object; pass its URI string\n", + " model_format_name=\"custom-format\", # Can be \"vLLM\" for LLMs\n", + " model_format_version=\"1.0\",\n", + " # EXACT PATTERN: model_source_* fields for cross-referencing\n", + " model_source_id=pipeline_run_id, # run_id parameter\n", + " model_source_name=pipeline_name, # run_name parameter \n", + " model_source_class=\"pipelinerun\", # KFP-specific identifier\n", + " model_source_kind=\"kfp\", # KFP-specific identifier\n", + " model_source_group=pipeline_namespace, # pod_namespace equivalent\n", + " )\n", + " break\n", + " except Exception as e:\n", + " if tries >= 3:\n", + " raise\n", + " print(f\"Failed to register the model on attempt {tries}/3: {e}\")\n", + " time.sleep(1)\n", + " \n", + " # EXACT PATTERN: Get the model version ID to add as metadata on the output model artifact\n", + " tries = 0\n", + " while True:\n", + " try:\n", + " tries += 1\n", + " model_version_id = registry.get_model_version(\n", + " model_name, model_version_name\n", + " ).id\n", + " break\n", + " except Exception as e:\n", + " if tries >= 3:\n", + " raise\n", + " print(f\"Failed to get the model version ID on attempt {tries}/3: {e}\")\n", + " time.sleep(1)\n", + " \n", + " # EXACT PATTERN: If model_registry_name is not provided, parse it from 
the URL\n", + " if not model_registry_name:\n", + " model_registry_name = urllib.parse.urlparse(\n", + " model_registry_api_url\n", + " ).hostname.split(\".\")[0]\n", + " if model_registry_name.endswith(\"-rest\"):\n", + " model_registry_name = model_registry_name[: -len(\"-rest\")]\n", + "\n", + " print(f\"Successfully registered model - ID: {registered_model.id}, Name: {registered_model.name}\")\n", + " print(f\"Model version ID: {model_version_id}\")\n", + " \n", + " # Write content to the output model path (simulating a model artifact)\n", + " with open(output_model.path, 'w') as f:\n", + " f.write(f\"This is a model artifact for {model_name} version {model_version_name}.\")\n", + " \n", + " # EXACT PATTERN: Set comprehensive metadata on the KFP output model artifact\n", + " # Following the opendatahub-io/ilab-on-ocp approach for KFP UI integration\n", + " output_model.metadata[\"registered_model\"] = {\n", + " \"modelName\": model_name,\n", + " \"versionName\": model_version_name,\n", + " \"modelID\": registered_model.id,\n", + " \"versionID\": model_version_id, # Real version ID from API\n", + " \"modelRegistryURL\": f\"{model_registry_api_server_address}:{model_registry_api_url_port}/models/{registered_model.id}/versions/{model_version_id}\",\n", + " \"modelRegistryAPIEndpoint\": model_registry_api_server_address,\n", + " \"modelRegistryName\": model_registry_name,\n", + " \"registrationTimestamp\": time.time(),\n", + " \"pipelineSource\": {\n", + " \"runId\": pipeline_run_id,\n", + " \"pipelineName\": pipeline_name,\n", + " \"namespace\": pipeline_namespace\n", + " }\n", + " }\n", + " \n", + " print(f\"KFP output model artifact metadata set: {output_model.metadata}\")\n", + " \n", + " return registered_model.id\n", + "\n", + "@dsl.component(\n", + " base_image=BASE_IMAGE,\n", + " packages_to_install=['scikit-learn==1.3.0', 'pandas==2.0.3']\n", + ")\n", + "def train_iris_model(\n", + " output_model: dsl.Output[dsl.Model]\n", + ") -> str:\n", + " \"\"\"\n", + 
" Simple component that trains an Iris classification model.\n", + " This demonstrates a realistic pipeline that produces a model to register.\n", + " \"\"\"\n", + " import pickle\n", + " import pandas as pd\n", + " from sklearn.datasets import load_iris\n", + " from sklearn.ensemble import RandomForestClassifier\n", + " from sklearn.model_selection import train_test_split\n", + " from sklearn.metrics import accuracy_score\n", + " import json\n", + " \n", + " # Load and prepare the Iris dataset\n", + " iris = load_iris()\n", + " X, y = iris.data, iris.target\n", + " \n", + " # Split the data\n", + " X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)\n", + " \n", + " # Train a simple Random Forest model\n", + " model = RandomForestClassifier(n_estimators=100, random_state=42)\n", + " model.fit(X_train, y_train)\n", + " \n", + " # Calculate accuracy\n", + " y_pred = model.predict(X_test)\n", + " accuracy = accuracy_score(y_test, y_pred)\n", + " \n", + " print(f\"Model trained with accuracy: {accuracy:.4f}\")\n", + " \n", + " # Save the model\n", + " with open(output_model.path, 'wb') as f:\n", + " pickle.dump(model, f)\n", + " \n", + " # Set model metadata\n", + " output_model.metadata = {\n", + " \"accuracy\": accuracy,\n", + " \"model_type\": \"RandomForestClassifier\",\n", + " \"n_estimators\": 100,\n", + " \"dataset\": \"iris\",\n", + " \"features\": iris.feature_names,\n", + " \"target_classes\": iris.target_names.tolist()\n", + " }\n", + " \n", + " return f\"s3://my-model-bucket/iris-models/run-{hash(str(accuracy))}/model.pkl\"\n", + "\n", + "@dsl.pipeline(\n", + " name=\"Iris Model Registration Pipeline - Following opendatahub-io/ilab-on-ocp Pattern\",\n", + " description=\"A KFP pipeline that trains an Iris model and registers it in Kubeflow Model Registry using the exact pattern from opendatahub-io/ilab-on-ocp.\"\n", + ")\n", + "def iris_model_registration_pipeline(\n", + " model_name: str = \"iris-classifier\",\n", + " 
model_version_name: str = \"v1.0.0\",\n", + " model_author: str = \"ML Engineering Team\",\n", + " model_registry_api_url: str = \"http://model-registry-service.kubeflow.svc.cluster.local:8080\"\n", + "):\n", + " \"\"\"\n", + " Complete pipeline demonstrating the EXACT pattern from opendatahub-io/ilab-on-ocp.\n", + " \n", + " This pipeline follows the proven approach from:\n", + " - utils/components.py lines 195-273 (component implementation) \n", + " - pipeline.py lines 438-450 (pipeline usage with placeholders)\n", + " \n", + " Features implemented:\n", + " 1. ✅ Exact URL parsing logic from the reference\n", + " 2. ✅ Retry mechanisms for robust operation \n", + " 3. ✅ Proper error handling patterns\n", + " 4. ✅ Real version ID retrieval logic\n", + " 5. ✅ Registry name parsing from URL\n", + " 6. ✅ Same parameter patterns as the working example\n", + " \"\"\"\n", + " \n", + " # Step 1: Train the model\n", + " train_task = train_iris_model()\n", + " \n", + " # Step 2: Register the model using the EXACT PATTERN from opendatahub-io/ilab-on-ocp\n", + " # This matches the approach in pipeline.py lines 438-450\n", + " register_task = register_model_to_kubeflow_registry(\n", + " model_name=model_name,\n", + " model_version_name=model_version_name,\n", + " model_artifact_uri=train_task.outputs[\"output_model\"], # Reference output by name\n", + " model_author=model_author,\n", + " model_description=\"Random Forest classifier trained on Iris dataset\",\n", + " model_registry_api_url=model_registry_api_url,\n", + " # EXACT PATTERN: Use proper KFP placeholders like opendatahub-io/ilab-on-ocp\n", + " # Using available placeholders with fallback for namespace\n", + " pipeline_run_id=PIPELINE_JOB_ID_PLACEHOLDER, # run_id parameter\n", + " pipeline_name=PIPELINE_JOB_NAME_PLACEHOLDER, # run_name parameter \n", + " pipeline_namespace=PIPELINE_JOB_NAMESPACE_PLACEHOLDER, # namespace context\n", + " )\n", + " \n", + " # Import the kfp-kubernetes extension for secret 
handling\n", + " from kfp.kubernetes import use_secret_as_env\n", + " \n", + " # Mount the Secret containing the Model Registry auth token\n", + " use_secret_as_env(\n", + " register_task,\n", + " secret_name=\"model-registry-auth\",\n", + " secret_key_to_env={\"token\": \"MR_AUTH_TOKEN\"}\n", + " )\n", + " \n", + " # Set task display names for better UI experience\n", + " train_task.set_display_name(\"Train Iris Model\")\n", + " register_task.set_display_name(\"Register Model in Registry\")\n", + " \n", + " # Ensure registration happens after training\n", + " register_task.after(train_task)\n", + "\n", + "# Compile the pipeline\n", + "pipeline_filename = \"iris_model_registration_pipeline_opendatahub_pattern.yaml\"\n", + "compiler.Compiler().compile(iris_model_registration_pipeline, pipeline_filename)\n", + "\n", + "print(f\"\\nPipeline compiled to {pipeline_filename}\")\n", + "print(f\"You can now upload '{pipeline_filename}' to the Kubeflow Pipelines UI.\")\n", + "print(\"\\n\" + \"=\"*80)\n", + "print(\"✅ IMPLEMENTATION FOLLOWS EXACT opendatahub-io/ilab-on-ocp PATTERN\")\n", + "print(\"=\"*80)\n", + "print(\"✅ Exact URL parsing logic from the reference\")\n", + "print(\"✅ Retry mechanisms for robust operation\") \n", + "print(\"✅ Proper error handling patterns\")\n", + "print(\"✅ Real version ID retrieval logic\")\n", + "print(\"✅ Registry name parsing from URL\")\n", + "print(\"✅ Same parameter patterns as the working example\")\n", + "print(\"✅ Uses proper KFP placeholders (with fallback for namespace)\")\n", + "print(\"✅ PIPELINE_JOB_ID_PLACEHOLDER and PIPELINE_JOB_NAME_PLACEHOLDER\")\n", + "print(\"✅ Proper model_source_* fields for cross-referencing\")\n", + "print(\"✅ Comprehensive KFP output model metadata\")\n", + "print(\"✅ Realistic model training component\")\n", + "print(\"\\n📝 Model Registry Integration KEP:\")\n", + "print(\"This manual process could be enhanced by the Model Registry integration KEP\")\n", + 
"print(\"which proposes automatic registration during pipeline execution.\")\n", + "print(\"\\n🔗 References:\")\n", + "print(\"- opendatahub-io/ilab-on-ocp/utils/components.py#L195-L273\")\n", + "print(\"- opendatahub-io/ilab-on-ocp/pipeline.py#L438-L450\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "3.11.9", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}