Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/data_processing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
"""

# Auto-generated imports will be added here by scripts/update_init_imports.py
# Components will be imported dynamically based on subdirectories
# Components will be imported dynamically based on subdirectories
88 changes: 88 additions & 0 deletions components/data_processing/yoda_data_preparation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from google.auth.exceptions import InvalidValue
from typing import Any

from kfp import dsl
import kfp.compiler


@dsl.component(
    packages_to_install=["datasets"],
)
def prepare_yoda_dataset(
    yoda_input_dataset: str,
    yoda_train_dataset: dsl.Output[dsl.Dataset],
    yoda_eval_dataset: dsl.Output[dsl.Dataset],
    # NOTE: the dict default is only read, never mutated, inside the function.
    operation_map: dict[str, Any] = {"rename_column": {"sentence": "prompt"}},
    train_split_ratio: float = 0.8,
):
    """Prepare the training and evaluation datasets by downloading and preprocessing.

    Loads the "train" split of ``yoda_input_dataset`` from HuggingFace, applies
    the column operations listed in ``operation_map``, prefixes every prompt
    with a translation instruction, splits the rows into train/eval sets, and
    saves both sets to the paths of the output artifacts.

    Args:
        yoda_input_dataset (str): Dataset to download from HuggingFace.
        yoda_train_dataset (dsl.Output[dsl.Dataset]): Output dataset for training.
        yoda_eval_dataset (dsl.Output[dsl.Dataset]): Output dataset for evaluation.
        operation_map (dict): Operations to perform on the dataset before
            splitting it, applied in insertion order. Supported operations:

            * ``"rename_column"``: dict mapping old column names to new ones.
            * ``"remove_columns"``: a column name (str) or a list of column
              names to drop.

            e.g. {"rename_column": {"sentence":"prompt"}, "remove_columns": "translation"}
        train_split_ratio (float): Ratio of data to use for training; must be
            strictly between 0.0 and 1.0. Defaults to 0.8 (80% train, 20% eval).

    Raises:
        ValueError: If ``train_split_ratio`` is outside (0.0, 1.0) or
            ``operation_map`` contains an unsupported operation name.
        RuntimeError: If the value given for an operation has the wrong type.
    """
    from datasets import load_dataset

    # Fail before the download when the ratio cannot produce two non-empty
    # splits.
    if not 0.0 < train_split_ratio < 1.0:
        raise ValueError(
            f"train_split_ratio must be between 0.0 and 1.0 (exclusive), got {train_split_ratio}"
        )

    print(f"Downloading and loading the dataset from {yoda_input_dataset}")
    dataset = load_dataset(yoda_input_dataset, split="train")

    if operation_map:
        for operation_name, operation_value in operation_map.items():
            print(f'Performing operation: "{operation_name}"')
            if operation_name == "rename_column":
                if not isinstance(operation_value, dict):
                    raise RuntimeError(f'Dict value is required to perform operation "{operation_name}"')
                for old_name, new_name in operation_value.items():
                    dataset = dataset.rename_column(old_name, new_name)
            elif operation_name == "remove_columns":
                # Remove the columns the caller asked for; a single name is
                # normalised to a one-element list.
                if isinstance(operation_value, str):
                    columns_to_remove = [operation_value]
                elif isinstance(operation_value, list):
                    columns_to_remove = operation_value
                else:
                    raise RuntimeError(f'Only list and str type are allowed to perform "{operation_name}" operation')
                dataset = dataset.remove_columns(columns_to_remove)
            else:
                # Only this function's source is shipped to the executor, so
                # module-level imports of the defining file are not available
                # here; a built-in exception is used for that reason.
                raise ValueError(
                    f'Unrecognized operation "{operation_name}"; '
                    'supported operations are "rename_column" and "remove_columns"'
                )

    # Add prefix to prompts. A "prompt" column must exist at this point; the
    # default operation_map creates it by renaming "sentence".
    print("Adding Yoda speak prefix to prompts")

    def add_yoda_prefix(example):
        example["prompt"] = (
            "Translate the following to Yoda speak: " + example["prompt"]
        )
        return example

    dataset = dataset.map(add_yoda_prefix)

    # Split the dataset into train and eval sets (fixed seed for a
    # reproducible split).
    print(
        f"Splitting dataset with {len(dataset)} rows into train ({train_split_ratio:.1%}) and eval ({(1-train_split_ratio):.1%}) sets"
    )
    split_dataset = dataset.train_test_split(test_size=1 - train_split_ratio, seed=42)

    train_dataset = split_dataset["train"]
    eval_dataset = split_dataset["test"]

    print(f"Train set: {len(train_dataset)} rows")
    print(f"Eval set: {len(eval_dataset)} rows")

    # Save both datasets
    print(f"Saving train dataset to {yoda_train_dataset.path}")
    train_dataset.save_to_disk(yoda_train_dataset.path)

    print(f"Saving eval dataset to {yoda_eval_dataset.path}")
    eval_dataset.save_to_disk(yoda_eval_dataset.path)

if __name__ == "__main__":
    # Write the compiled component next to this file as
    # "<module>_component.yaml". Only the trailing ".py" is stripped:
    # str.replace would also rewrite ".py" occurring elsewhere in the path
    # (e.g. a ".pyenv" directory) and, for a path without ".py", would leave
    # the output path equal to this source file.
    kfp.compiler.Compiler().compile(
        prepare_yoda_dataset,
        package_path=__file__.removesuffix(".py") + "_component.yaml",
    )
187 changes: 187 additions & 0 deletions components/data_processing/yoda_data_preparation_component.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# PIPELINE DEFINITION
# Name: prepare-yoda-dataset
# Description: Prepare the training and evaluation datasets by downloading and preprocessing.
# Downloads the yoda_sentences dataset from HuggingFace, renames columns to match
# the expected format for training (prompt/completion), splits into train/eval sets,
# and saves them as output artifacts.
# Inputs:
# operation_map: dict [Default: {'rename_column': {'sentence': 'prompt'}}]
# train_split_ratio: float [Default: 0.8]
# yoda_input_dataset: str
# Outputs:
# yoda_eval_dataset: system.Dataset
# yoda_train_dataset: system.Dataset
components:
comp-prepare-yoda-dataset:
executorLabel: exec-prepare-yoda-dataset
inputDefinitions:
parameters:
operation_map:
defaultValue:
rename_column:
sentence: prompt
description: 'Specify list of operations you want to perform on the data
set before splitting it e.g. {"rename_column": {"sentence":"prompt"},
"remove_columns": "translation"}'
isOptional: true
parameterType: STRUCT
train_split_ratio:
defaultValue: 0.8
description: 'Ratio of data to use for training (0.0-1.0).

Defaults to 0.8 (80% train, 20% eval).'
isOptional: true
parameterType: NUMBER_DOUBLE
yoda_input_dataset:
description: Dataset to download from HuggingFace
parameterType: STRING
outputDefinitions:
artifacts:
yoda_eval_dataset:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
yoda_train_dataset:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
deploymentSpec:
executors:
exec-prepare-yoda-dataset:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- prepare_yoda_dataset
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'datasets' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)


printf "%s" "$0" > "$program_path/ephemeral_component.py"

_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef prepare_yoda_dataset(\n yoda_input_dataset: str,\n \
\ yoda_train_dataset: dsl.Output[dsl.Dataset],\n yoda_eval_dataset:\
\ dsl.Output[dsl.Dataset],\n operation_map: dict[str, Any] = {\"\
rename_column\": {\"sentence\":\"prompt\"}},\n train_split_ratio:\
\ float = 0.8,\n):\n \"\"\"Prepare the training and evaluation datasets\
\ by downloading and preprocessing.\n\n Downloads the yoda_sentences\
\ dataset from HuggingFace, renames columns to match\n the expected format\
\ for training (prompt/completion), splits into train/eval sets,\n and\
\ saves them as output artifacts.\n\n Args:\n yoda_input_dataset\
\ (str): Dataset to download from HuggingFace\n yoda_train_dataset\
\ (dsl.Output[dsl.Dataset]): Output dataset for training.\n yoda_eval_dataset\
\ (dsl.Output[dsl.Dataset]): Output dataset for evaluation.\n operation_map\
\ (dict): Specify list of operations you want to perform on the data set\
\ before splitting it e.g. {\"rename_column\": {\"sentence\":\"prompt\"\
}, \"remove_columns\": \"translation\"}\n train_split_ratio (float):\
\ Ratio of data to use for training (0.0-1.0).\n \
\ Defaults to 0.8 (80% train, 20% eval).\n \"\"\"\n from\
\ datasets import load_dataset\n\n print(f\"Downloading and loading the\
\ dataset from {yoda_input_dataset}\")\n dataset = load_dataset(yoda_input_dataset,\
\ split=\"train\")\n if operation_map:\n for operation_name, operation_value\
\ in operation_map.items():\n print(f'Performing operation: \"\
{operation_name}\"')\n if operation_name == 'rename_column':\n\
\ if type(operation_value) != dict:\n \
\ raise RuntimeError(f'Dict value is required to perform operation \"{operation_name}\"\
')\n for key, value in operation_value.items():\n \
\ dataset = dataset.rename_column(key, value)\n elif\
\ operation_name == \"remove_columns\":\n if type(operation_value)\
\ == str:\n dataset = dataset.remove_columns([\"translation\"\
])\n elif type(operation_value) == list:\n \
\ dataset = dataset.remove_columns(\"translation\")\n \
\ else:\n raise RuntimeError(f'Only list and str type\
\ are allowed to perform \"{operation_name}\" operation')\n else:\n\
\ raise InvalidValue(f'Unrecogonized operation value \"{operation_name}\"\
')\n\n # Add prefix to prompts\n print(\"Adding Yoda speak prefix\
\ to prompts\")\n def add_yoda_prefix(example):\n example[\"prompt\"\
] = (\n \"Translate the following to Yoda speak: \" + example[\"\
prompt\"]\n )\n return example\n\n dataset = dataset.map(add_yoda_prefix)\n\
\n # Split the dataset into train and eval sets\n print(\n \
\ f\"Splitting dataset with {len(dataset)} rows into train ({train_split_ratio:.1%})\
\ and eval ({(1-train_split_ratio):.1%}) sets\"\n )\n split_dataset\
\ = dataset.train_test_split(test_size=1 - train_split_ratio, seed=42)\n\
\n train_dataset = split_dataset[\"train\"]\n eval_dataset = split_dataset[\"\
test\"]\n\n print(f\"Train set: {len(train_dataset)} rows\")\n print(f\"\
Eval set: {len(eval_dataset)} rows\")\n\n # Save both datasets\n print(f\"\
Saving train dataset to {yoda_train_dataset.path}\")\n train_dataset.save_to_disk(yoda_train_dataset.path)\n\
\n print(f\"Saving eval dataset to {yoda_eval_dataset.path}\")\n eval_dataset.save_to_disk(yoda_eval_dataset.path)\n\
\n"
image: python:3.11
pipelineInfo:
name: prepare-yoda-dataset
root:
dag:
outputs:
artifacts:
yoda_eval_dataset:
artifactSelectors:
- outputArtifactKey: yoda_eval_dataset
producerSubtask: prepare-yoda-dataset
yoda_train_dataset:
artifactSelectors:
- outputArtifactKey: yoda_train_dataset
producerSubtask: prepare-yoda-dataset
tasks:
prepare-yoda-dataset:
cachingOptions:
enableCache: true
componentRef:
name: comp-prepare-yoda-dataset
inputs:
parameters:
operation_map:
componentInputParameter: operation_map
train_split_ratio:
componentInputParameter: train_split_ratio
yoda_input_dataset:
componentInputParameter: yoda_input_dataset
taskInfo:
name: prepare-yoda-dataset
inputDefinitions:
parameters:
operation_map:
defaultValue:
rename_column:
sentence: prompt
description: 'Specify list of operations you want to perform on the data set
before splitting it e.g. {"rename_column": {"sentence":"prompt"}, "remove_columns":
"translation"}'
isOptional: true
parameterType: STRUCT
train_split_ratio:
defaultValue: 0.8
description: 'Ratio of data to use for training (0.0-1.0).

Defaults to 0.8 (80% train, 20% eval).'
isOptional: true
parameterType: NUMBER_DOUBLE
yoda_input_dataset:
description: Dataset to download from HuggingFace
parameterType: STRING
outputDefinitions:
artifacts:
yoda_eval_dataset:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
yoda_train_dataset:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
schemaVersion: 2.1.0
sdkVersion: kfp-2.15.2
Loading