Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions sdk/python/kfp/local/placeholder_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for working with placeholders."""
import datetime
import functools
import json
import random
Expand Down Expand Up @@ -44,13 +45,16 @@ def replace_placeholders(
task.
"""
unique_task_id = make_random_id()
utc_timestamp = datetime.datetime.now(
datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
Comment on lines 47 to +49
executor_input_dict = resolve_self_references_in_executor_input(
executor_input_dict=executor_input_dict,
pipeline_resource_name=pipeline_resource_name,
task_resource_name=task_resource_name,
pipeline_root=pipeline_root,
pipeline_job_id=unique_pipeline_id,
pipeline_task_id=unique_task_id,
utc_timestamp=utc_timestamp,
)
provided_inputs = get_provided_inputs(executor_input_dict)
full_command = [
Expand All @@ -70,6 +74,7 @@ def replace_placeholders(
pipeline_root=pipeline_root,
pipeline_job_id=unique_pipeline_id,
pipeline_task_id=unique_task_id,
utc_timestamp=utc_timestamp,
)
if resolved_el is None:
continue
Expand All @@ -91,6 +96,7 @@ def resolve_self_references_in_executor_input(
pipeline_root: str,
pipeline_job_id: str,
pipeline_task_id: str,
utc_timestamp: str = '',
) -> Dict[str, Any]:
"""Resolve parameter placeholders that point to other parameter
placeholders in the same ExecutorInput message.
Expand Down Expand Up @@ -119,6 +125,7 @@ def resolve_self_references_in_executor_input(
pipeline_root=pipeline_root,
pipeline_job_id=pipeline_job_id,
pipeline_task_id=pipeline_task_id,
utc_timestamp=utc_timestamp,
)
return executor_input_dict

Expand All @@ -131,6 +138,7 @@ def recursively_resolve_json_dict_placeholders(
pipeline_root: str,
pipeline_job_id: str,
pipeline_task_id: str,
utc_timestamp: str = '',
) -> Any:
"""Recursively resolves any placeholders in a dictionary representation of
a JSON object.
Expand All @@ -147,6 +155,7 @@ def recursively_resolve_json_dict_placeholders(
pipeline_root=pipeline_root,
pipeline_job_id=pipeline_job_id,
pipeline_task_id=pipeline_task_id,
utc_timestamp=utc_timestamp,
)
if isinstance(obj, list):
return [inner_fn(item) for item in obj]
Expand All @@ -161,6 +170,7 @@ def recursively_resolve_json_dict_placeholders(
pipeline_root=pipeline_root,
pipeline_job_id=pipeline_job_id,
pipeline_task_id=pipeline_task_id,
utc_timestamp=utc_timestamp,
)
else:
return obj
Expand Down Expand Up @@ -337,6 +347,7 @@ def resolve_individual_placeholder(
pipeline_root: str,
pipeline_job_id: str,
pipeline_task_id: str,
utc_timestamp: str = '',
) -> str:
"""Replaces placeholders for a single element."""

Expand Down Expand Up @@ -369,8 +380,14 @@ def resolve_individual_placeholder(
json.dumps(executor_input_dict),
dsl.PIPELINE_JOB_NAME_PLACEHOLDER:
pipeline_resource_name,
dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER:
pipeline_resource_name,
Comment on lines 381 to +384
dsl.PIPELINE_JOB_ID_PLACEHOLDER:
pipeline_job_id,
dsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER:
utc_timestamp,
dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER:
utc_timestamp,
Comment on lines +387 to +390
dsl.PIPELINE_TASK_NAME_PLACEHOLDER:
task_resource_name,
dsl.PIPELINE_TASK_ID_PLACEHOLDER:
Expand Down
44 changes: 44 additions & 0 deletions sdk/python/kfp/local/placeholder_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# limitations under the License.
"""Tests for placeholder_utils.py."""

import datetime
import json
import os
import re
import tempfile
from typing import List, Optional
import unittest
Expand Down Expand Up @@ -144,6 +146,10 @@ class TestResolveIndividualPlaceholder(parameterized.TestCase):
'{{$.pipeline_job_name}}',
'my-pipeline-2023-10-10-13-32-59-420710',
),
(
'{{$.pipeline_job_resource_name}}',
'my-pipeline-2023-10-10-13-32-59-420710',
),
(
'{{$.pipeline_job_uuid}}',
'123456789',
Expand Down Expand Up @@ -293,6 +299,44 @@ def test_io_placeholder_with_string_concat(self, element: str,
)
self.assertEqual(actual, expected)

def test_pipeline_job_create_time_utc_placeholder(self):
"""Test that PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER is replaced with a valid UTC timestamp."""
utc_timestamp = datetime.datetime.now(
datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
iso8601_pattern = r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z$'
kwargs = dict(
executor_input_dict=EXECUTOR_INPUT_DICT,
pipeline_resource_name='my-pipeline-2023-10-10-13-32-59-420710',
task_resource_name='comp',
pipeline_root='/foo/bar/my-pipeline-2023-10-10-13-32-59-420710',
pipeline_job_id='123456789',
pipeline_task_id='987654321',
utc_timestamp=utc_timestamp,
)
actual_create = placeholder_utils.resolve_individual_placeholder(
element='{{$.pipeline_job_create_time_utc}}', **kwargs)
actual_schedule = placeholder_utils.resolve_individual_placeholder(
element='{{$.pipeline_job_schedule_time_utc}}', **kwargs)
self.assertRegex(actual_create, iso8601_pattern)
self.assertEqual(actual_create, actual_schedule)

def test_pipeline_job_schedule_time_utc_placeholder(self):
"""Test that PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER is replaced with a valid UTC timestamp."""
utc_timestamp = datetime.datetime.now(
datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
actual = placeholder_utils.resolve_individual_placeholder(
element='{{$.pipeline_job_schedule_time_utc}}',
executor_input_dict=EXECUTOR_INPUT_DICT,
pipeline_resource_name='my-pipeline-2023-10-10-13-32-59-420710',
task_resource_name='comp',
pipeline_root='/foo/bar/my-pipeline-2023-10-10-13-32-59-420710',
pipeline_job_id='123456789',
pipeline_task_id='987654321',
utc_timestamp=utc_timestamp,
)
iso8601_pattern = r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z$'
self.assertRegex(actual, iso8601_pattern)
Comment on lines +323 to +338


class TestGetValueUsingPath(unittest.TestCase):

Expand Down
Loading