From 6a8a156eb2dc977cf22dc60cc10e02280d5e6a6a Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Thu, 15 Sep 2022 17:27:55 -0600 Subject: [PATCH 01/19] draft version --- dbtc/cli.py | 26 +++++++++ dbtc/client/cloud/base.py | 107 +++++++++++++++++++++++++++++++++++++- 2 files changed, 131 insertions(+), 2 deletions(-) diff --git a/dbtc/cli.py b/dbtc/cli.py index 465cb5e..d4ee3e7 100644 --- a/dbtc/cli.py +++ b/dbtc/cli.py @@ -967,6 +967,32 @@ def test_connection( json.loads(payload), ) +@app.command() +def trigger_job_for_ci( + ctx: typer.Context, + account_id: int = ACCOUNT_ID, + job_id: int = JOB_ID, + payload: str = PAYLOAD, + should_poll: bool = typer.Option( + True, + help='Poll until job completion (status is one of success, failure, or ' + 'cancelled)', + ), + poll_interval: int = typer.Option( + 10, '--poll-interval', help='Number of seconds to wait in between polling.' + ) +): + """Trigger job to run.""" + _dbt_cloud_request( + ctx, + 'trigger_job_for_ci', + account_id, + job_id, + json.loads(payload), + should_poll=should_poll, + poll_interval=poll_interval, + ) + @app.command() def trigger_job( diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index 4475569..8e72676 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1,10 +1,12 @@ # stdlib import argparse +from datetime import datetime import enum import shlex import time from functools import partial, wraps from typing import Dict, Iterable, List +import uuid # third party import requests @@ -786,7 +788,7 @@ def list_invited_users(self, account_id: int) -> Dict: @v2 def list_jobs( - self, account_id: int, *, order_by: str = None, project_id: int = None + self, account_id: int, *, order_by: str = None, project_id: int = None, ) -> Dict: """List jobs in an account or specific project. @@ -795,7 +797,7 @@ def list_jobs( order_by (str, optional): Field to order the result by. Use - to indicate reverse order. 
project_id (int, optional): Numeric ID of the project containing jobs - """ + """ return self._simple_request( f'accounts/{account_id}/jobs/', params={'order_by': order_by, 'project_id': project_id}, @@ -992,6 +994,107 @@ def test_connection(self, account_id: int, payload: Dict) -> Dict: f'accounts/{account_id}/connections/test/', method='post', json=payload ) + @v2 + def trigger_job_for_ci( + self, + account_id: int, + job_id: int, + payload: Dict, + *, + should_poll: bool = True, + poll_interval: int = 10, + ): + + """Trigger a job by its ID - designed to enable running CI jobs in parallel + + Args: + account_id (int): Numeric ID of the account to retrieve + job_id (int): Numeric ID of the job to trigger + payload (dict): Payload required for post request + should_poll (bool, optional): Poll until completion if `True`, completion + is one of success, failure, or cancelled + poll_interval (int, optional): Number of seconds to wait in between + polling + name_like (str, optional): Job prefix to identify the CI job "pool" + """ + #TODO: this should be abstracted somewhere + def run_status_formatted(run: Dict, time: float) -> str: + """Format a string indicating status of job. 
+ Args: + run (dict): Dictionary representation of a Run + time (float): Elapsed time since job triggered + """ + status = JobRunStatus(run['data']['status']).name + url = run['data']['href'] + return ( + f'Status: "{status.capitalize()}", Elapsed time: {round(time, 0)}s' + f', View here: {url}' + ) + + most_recent_job_run = self.list_runs( + account_id=account_id, + job_definition_id=job_id, + limit=1, + order_by='-id' + )['data'][0] + + job_run_status = most_recent_job_run['status_humanized'] + self.console.log(f'Status for most recent run of job {job_id} is {job_run_status}.') + + if job_run_status not in ['Queued', 'Starting', 'Running']: + self.console.log(f'Triggering base CI job {job_id}.') + + else: + self.console.log(f'Replicating base CI job.') + job_definition = self.get_job( + account_id=account_id, + job_id=job_id + )['data'] + + job_definition['name'] = job_definition['name'] + '-' + datetime.now().strftime('%Y-%m-%d--%H:%M:%S') + job_definition['id'] = None + job_definition['dbt_version'] = None + keys = ['id', 'name', 'execution', 'account_id', 'project_id', 'environment_id', + 'dbt_version', 'execute_steps', 'state', 'deferring_job_definition_id', + 'triggers', 'settings', 'schedule'] + job_definition = {k:v for k,v in job_definition.items() if k in keys} + print(job_definition) + new_job = self.create_job( + account_id=account_id, + project_id=job_definition['project_id'], + payload=job_definition + ) + + print(new_job) + + run = self._simple_request( + f'accounts/{account_id}/jobs/{job_id}/run/', + method='post', + json=payload, + ) + if not run['status']['is_success']: + self.console.log(f'Run NOT triggered for job {job_id}. 
See run response.') + return run + + self.console.log(run_status_formatted(run, 0)) + if should_poll: + start = time.time() + run_id = run['data']['id'] + while True: + time.sleep(poll_interval) + run = self.get_run(account_id, run_id) + status = run['data']['status'] + self.console.log(run_status_formatted(run, time.time() - start)) + if status in [ + JobRunStatus.SUCCESS, + JobRunStatus.CANCELLED, + JobRunStatus.ERROR, + ]: + break + + return run + + @v2 def trigger_job( self, From 680ffc0701509f08964e456e5b0e9895ccbc33ae Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Thu, 22 Sep 2022 16:08:30 -0600 Subject: [PATCH 02/19] demo-able version --- dbtc/client/cloud/base.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index aef5c01..b5602f0 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1013,6 +1013,7 @@ def trigger_job_for_ci( *, should_poll: bool = True, poll_interval: int = 10, + delete_replicated_job = True, ): """Trigger a job by its ID - designed to enable running CI jobs in parallel @@ -1025,7 +1026,9 @@ def trigger_job_for_ci( is one of success, failure, or cancelled poll_interval (int, optional): Number of seconds to wait in between polling - name_like (str, optional): Job prefix to identify the CI job "pool" + delete_replicated_job (bool, optional): True is the default, which will + automatically clean up any jobs replicated due to the base CI job + being busy at PR time. 
""" #TODO: this should be abstracted somewhere def run_status_formatted(run: Dict, time: float) -> str: @@ -1053,6 +1056,7 @@ def run_status_formatted(run: Dict, time: float) -> str: if job_run_status not in ['Queued', 'Starting', 'Running']: self.console.log(f'Triggering base CI job {job_id}.') + delete_replicated_job = False # IMPORTANT: this prevents removing the base CI job else: self.console.log(f'Replicating base CI job.') @@ -1061,21 +1065,22 @@ def run_status_formatted(run: Dict, time: float) -> str: job_id=job_id )['data'] + # TODO: this is gross - need a more ergnomic way to express these job_definition['name'] = job_definition['name'] + '-' + datetime.now().strftime('%Y-%m-%d--%H:%M:%S') job_definition['id'] = None job_definition['dbt_version'] = None + + # the dbt Cloud API will fail job create requests that contain extra keys. This is the minimal set required to + # create a new job keys = ['id', 'name', 'execution', 'account_id', 'project_id', 'environment_id', 'dbt_version', 'execute_steps', 'state', 'deferring_job_definition_id', 'triggers', 'settings', 'schedule'] job_definition = {k:v for k,v in job_definition.items() if k in keys} - print(job_definition) new_job = self.create_job( account_id=account_id, - project_id=job_definition['project_id'], payload=job_definition - ) - - print(new_job) + )['data'] + job_id = new_job['id'] run = self._simple_request( f'accounts/{account_id}/jobs/{job_id}/run/', @@ -1102,6 +1107,9 @@ def run_status_formatted(run: Dict, time: float) -> str: ]: break + if delete_replicated_job: + self.delete_job(account_id=account_id, job_id=job_id) + return run From 15bf9e18d8530c177c2a5252ff3e60ca2858d71e Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Fri, 23 Sep 2022 09:50:47 -0600 Subject: [PATCH 03/19] create configs directory --- dbtc/client/cloud/configs/__init__.py | 0 .../cloud/configs/cloud_api_request_templates.py | 0 dbtc/client/cloud/configs/dbt_cli_configs.py | 15 +++++++++++++++ 3 files changed, 15 
insertions(+) create mode 100644 dbtc/client/cloud/configs/__init__.py create mode 100644 dbtc/client/cloud/configs/cloud_api_request_templates.py create mode 100644 dbtc/client/cloud/configs/dbt_cli_configs.py diff --git a/dbtc/client/cloud/configs/__init__.py b/dbtc/client/cloud/configs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dbtc/client/cloud/configs/cloud_api_request_templates.py b/dbtc/client/cloud/configs/cloud_api_request_templates.py new file mode 100644 index 0000000..e69de29 diff --git a/dbtc/client/cloud/configs/dbt_cli_configs.py b/dbtc/client/cloud/configs/dbt_cli_configs.py new file mode 100644 index 0000000..370432f --- /dev/null +++ b/dbtc/client/cloud/configs/dbt_cli_configs.py @@ -0,0 +1,15 @@ +RUN_COMMANDS = ['build', 'run', 'test', 'seed', 'snapshot'] +GLOBAL_CLI_ARGS = { + 'warn_error': {'flags': ('--warn-error',), 'action': 'store_true'}, + 'use_experimental_parser': { + 'flags': ('--use-experimental-parser',), + 'action': 'store_true', + }, +} +SUB_COMMAND_CLI_ARGS = { + 'vars': {'flags': ('--vars',)}, + 'args': {'flags': ('--args',)}, + 'fail_fast': {'flags': ('-x', '--fail-fast'), 'action': 'store_true'}, + 'full_refresh': {'flags': ('--full-refresh',), 'action': 'store_true'}, + 'store_failures': {'flags': ('--store-failures',), 'action': 'store_true'}, +} From 917b4b2ff7f1a1f1d9425bcf65fd8393d98305b6 Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Fri, 23 Sep 2022 12:06:19 -0600 Subject: [PATCH 04/19] sketch configs idea --- dbtc/client/cloud/base.py | 43 +++++++------------ .../configs/cloud_api_request_templates.py | 0 dbtc/client/cloud/configs/dbt_cloud_api.py | 5 +++ .../{dbt_cli_configs.py => dbt_core_cli.py} | 9 ++-- 4 files changed, 26 insertions(+), 31 deletions(-) delete mode 100644 dbtc/client/cloud/configs/cloud_api_request_templates.py create mode 100644 dbtc/client/cloud/configs/dbt_cloud_api.py rename dbtc/client/cloud/configs/{dbt_cli_configs.py => dbt_core_cli.py} (82%) diff --git 
a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index b5602f0..31a1422 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -13,7 +13,12 @@ # first party from dbtc.client.base import _Client - +from dbtc.client.cloud.configs.dbt_cloud_api import create_job_request +from dbtc.client.cloud.configs.dbt_core_cli import ( + run_commands, + global_cli_args, + sub_command_cli_args +) class JobRunStatus(enum.IntEnum): QUEUED = 1 @@ -24,23 +29,6 @@ class JobRunStatus(enum.IntEnum): CANCELLED = 30 -RUN_COMMANDS = ['build', 'run', 'test', 'seed', 'snapshot'] -GLOBAL_CLI_ARGS = { - 'warn_error': {'flags': ('--warn-error',), 'action': 'store_true'}, - 'use_experimental_parser': { - 'flags': ('--use-experimental-parser',), - 'action': 'store_true', - }, -} -SUB_COMMAND_CLI_ARGS = { - 'vars': {'flags': ('--vars',)}, - 'args': {'flags': ('--args',)}, - 'fail_fast': {'flags': ('-x', '--fail-fast'), 'action': 'store_true'}, - 'full_refresh': {'flags': ('--full-refresh',), 'action': 'store_true'}, - 'store_failures': {'flags': ('--store-failures',), 'action': 'store_true'}, -} - - def _version_decorator(func, version): @wraps(func) def wrapper(self, *args, **kwargs): @@ -62,7 +50,7 @@ def __init__(self, **kwargs): self.session = requests.Session() self.session.headers = self.headers self.parser = argparse.ArgumentParser() - all_cli_args = {**GLOBAL_CLI_ARGS, **SUB_COMMAND_CLI_ARGS} + all_cli_args = {**global_cli_args, **sub_command_cli_args} for arg_specs in all_cli_args.values(): flags = arg_specs['flags'] self.parser.add_argument( @@ -1070,17 +1058,16 @@ def run_status_formatted(run: Dict, time: float) -> str: job_definition['id'] = None job_definition['dbt_version'] = None - # the dbt Cloud API will fail job create requests that contain extra keys. This is the minimal set required to - # create a new job + # the dbt Cloud API will fail job create requests that contain extra keys. 
+ # This is the minimal set required to create a new job keys = ['id', 'name', 'execution', 'account_id', 'project_id', 'environment_id', 'dbt_version', 'execute_steps', 'state', 'deferring_job_definition_id', 'triggers', 'settings', 'schedule'] job_definition = {k:v for k,v in job_definition.items() if k in keys} - new_job = self.create_job( + job_id = self.create_job( account_id=account_id, payload=job_definition - )['data'] - job_id = new_job['id'] + )['data']['id'] run = self._simple_request( f'accounts/{account_id}/jobs/{job_id}/run/', @@ -1206,9 +1193,9 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): sub_command = remaining[1] if ( - sub_command not in RUN_COMMANDS + sub_command not in run_commands and status in ['error', 'cancelled', 'skipped'] - ) or (sub_command in RUN_COMMANDS and status == 'skipped'): + ) or (sub_command in run_commands and status == 'skipped'): rerun_steps.append(command) # errors and failures are when we need to inspect to figure @@ -1243,10 +1230,10 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): ] ) global_args = parse_args( - GLOBAL_CLI_ARGS.keys(), namespace + global_cli_args.keys(), namespace ) sub_command_args = parse_args( - SUB_COMMAND_CLI_ARGS.keys(), namespace + sub_command_cli_args.keys(), namespace ) modified_command = f'dbt{global_args} {sub_command} -s {rerun_nodes}{sub_command_args}' # noqa: E501 rerun_steps.append(modified_command) diff --git a/dbtc/client/cloud/configs/cloud_api_request_templates.py b/dbtc/client/cloud/configs/cloud_api_request_templates.py deleted file mode 100644 index e69de29..0000000 diff --git a/dbtc/client/cloud/configs/dbt_cloud_api.py b/dbtc/client/cloud/configs/dbt_cloud_api.py new file mode 100644 index 0000000..e859e79 --- /dev/null +++ b/dbtc/client/cloud/configs/dbt_cloud_api.py @@ -0,0 +1,5 @@ + +create_job_request = { + 'name': '', + 'id': None +} \ No newline at end of file diff --git a/dbtc/client/cloud/configs/dbt_cli_configs.py 
b/dbtc/client/cloud/configs/dbt_core_cli.py similarity index 82% rename from dbtc/client/cloud/configs/dbt_cli_configs.py rename to dbtc/client/cloud/configs/dbt_core_cli.py index 370432f..19a43fc 100644 --- a/dbtc/client/cloud/configs/dbt_cli_configs.py +++ b/dbtc/client/cloud/configs/dbt_core_cli.py @@ -1,12 +1,15 @@ -RUN_COMMANDS = ['build', 'run', 'test', 'seed', 'snapshot'] -GLOBAL_CLI_ARGS = { + +run_commands = ['build', 'run', 'test', 'seed', 'snapshot'] + +global_cli_args = { 'warn_error': {'flags': ('--warn-error',), 'action': 'store_true'}, 'use_experimental_parser': { 'flags': ('--use-experimental-parser',), 'action': 'store_true', }, } -SUB_COMMAND_CLI_ARGS = { + +sub_command_cli_args = { 'vars': {'flags': ('--vars',)}, 'args': {'flags': ('--args',)}, 'fail_fast': {'flags': ('-x', '--fail-fast'), 'action': 'store_true'}, From db381535f94100d77fc5f0bc64111dc03bdc8250 Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Mon, 26 Sep 2022 19:02:56 -0600 Subject: [PATCH 05/19] adds autoscaling mode and moves some configs out of base _CloudClient --- dbtc/cli.py | 45 +-- dbtc/client/cloud/base.py | 392 +++++++++++---------- dbtc/client/cloud/configs/dbt_cloud_api.py | 44 ++- 3 files changed, 257 insertions(+), 224 deletions(-) diff --git a/dbtc/cli.py b/dbtc/cli.py index c97e8b9..9cad0e4 100644 --- a/dbtc/cli.py +++ b/dbtc/cli.py @@ -1,4 +1,5 @@ # stdlib +from enum import auto import json from typing import List, Optional @@ -968,32 +969,6 @@ def test_connection( json.loads(payload), ) -@app.command() -def trigger_job_for_ci( - ctx: typer.Context, - account_id: int = ACCOUNT_ID, - job_id: int = JOB_ID, - payload: str = PAYLOAD, - should_poll: bool = typer.Option( - True, - help='Poll until job completion (status is one of success, failure, or ' - 'cancelled)', - ), - poll_interval: int = typer.Option( - 10, '--poll-interval', help='Number of seconds to wait in between polling.' 
- ) -): - """Trigger job to run.""" - _dbt_cloud_request( - ctx, - 'trigger_job_for_ci', - account_id, - job_id, - json.loads(payload), - should_poll=should_poll, - poll_interval=poll_interval, - ) - @app.command() def trigger_job( @@ -1021,6 +996,22 @@ def trigger_job( 'job.' ), ), + mode: str = typer.Option( + 'standard', + help=( + 'Possible values are ["standard", "restart_from_failure", "autoscale"] ' + 'standard: runs existing job as-is ' + 'restart_from_failure: determine whether the last run of the target job ' + ' exited with an error. If yes, restart from the point of failure ' + 'autoscale: determine with the target job is currently running ' + ' If yes, create and then run the clone.' + ) + ), + autoscale_delete_post_run: bool = typer.Option( + True, help=( + 'Delete job created via autoscaling after it finishes running' + ) + ), ): """Trigger job to run.""" _dbt_cloud_request( @@ -1033,6 +1024,8 @@ def trigger_job( poll_interval=poll_interval, restart_from_failure=restart_from_failure, trigger_on_failure_only=trigger_on_failure_only, + mode=mode, + autoscale_delete_post_run=autoscale_delete_post_run, ) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index 31a1422..c16e216 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -6,6 +6,7 @@ import time from functools import partial, wraps from typing import Dict, Iterable, List +from urllib import request import uuid # third party @@ -13,7 +14,7 @@ # first party from dbtc.client.base import _Client -from dbtc.client.cloud.configs.dbt_cloud_api import create_job_request +from dbtc.client.cloud.configs.dbt_cloud_api import dbtCloudAPIRequestFactory from dbtc.client.cloud.configs.dbt_core_cli import ( run_commands, global_cli_args, @@ -50,6 +51,7 @@ def __init__(self, **kwargs): self.session = requests.Session() self.session.headers = self.headers self.parser = argparse.ArgumentParser() + self.request_factory = dbtCloudAPIRequestFactory() all_cli_args = 
{**global_cli_args, **sub_command_cli_args} for arg_specs in all_cli_args.values(): flags = arg_specs['flags'] @@ -119,6 +121,12 @@ def _get_by_name(self, items: List, item_name: str, value: str = 'name'): except IndexError: obj = None return obj + + def _validate_job_run_mode(self, mode): + if mode not in ['standard', 'restart_from_failure', 'autoscale']: + return False + + return True @v3 def assign_group_permissions( @@ -991,114 +999,148 @@ def test_connection(self, account_id: int, payload: Dict) -> Dict: return self._simple_request( f'accounts/{account_id}/connections/test/', method='post', json=payload ) + + @v2 + def clone_job( + self, + account_id: int, + job_id: int, + ): - @v2 - def trigger_job_for_ci( - self, + """If a job is currently running, replicate the job definition to a new job + + Args: + account_id (int): Numeric ID of the account to retrieve + job_id (int): Numeric ID of the job to trigger + payload (dict): Payload required for post request + """ + + existing_job_definition = self.get_job( + account_id=account_id, + job_id=job_id + )['data'] + + return self.request_factory.create_job_request(data=existing_job_definition) + + @v2 + def _get_restart_job_definition( + self, account_id: int, job_id: int, payload: Dict, - *, - should_poll: bool = True, - poll_interval: int = 10, - delete_replicated_job = True, ): - """Trigger a job by its ID - designed to enable running CI jobs in parallel + """Identifies whether there was a failure on the previous run of the job. + When failures are identified, returns an updated job definition to + restart from the point of failure. 
Args: account_id (int): Numeric ID of the account to retrieve job_id (int): Numeric ID of the job to trigger payload (dict): Payload required for post request - should_poll (bool, optional): Poll until completion if `True`, completion - is one of success, failure, or cancelled - poll_interval (int, optional): Number of seconds to wait in between - polling - delete_replicated_job (bool, optional): True is the default, which will - automatically clean up any jobs replicated due to the base CI job - being busy at PR time. """ - #TODO: this should be abstracted somewhere - def run_status_formatted(run: Dict, time: float) -> str: - """Format a string indicating status of job. - Args: - run (dict): Dictionary representation of a Run - time (float): Elapsed time since job triggered - """ - status = JobRunStatus(run['data']['status']).name - url = run['data']['href'] - return ( - f'Status: "{status.capitalize()}", Elapsed time: {round(time, 0)}s' - f', View here: {url}' - ) + + def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): + string = '' + for arg in cli_args: + value = getattr(namespace, arg, None) + if value: + arg = arg.replace('_', '-') + if isinstance(value, bool): + string += f' --{arg}' + else: + string += f" --{arg} '{value}'" + return string - most_recent_job_run = self.list_runs( - account_id=account_id, - job_definition_id=job_id, - limit=1, - order_by='-id' + has_failures = False + + last_run_data = self.list_runs( + account_id=account_id, + include_related=['run_steps'], + job_definition_id=job_id, + order_by='-id', + limit=1, )['data'][0] - job_run_status = most_recent_job_run['status_humanized'] - self.console.log(f'Status for most recent run of job {job_id} is {job_run_status}.') + last_run_status = last_run_data['status_humanized'].lower() + last_run_id = last_run_data['id'] - if job_run_status not in ['Queued', 'Starting', 'Running']: - self.console.log(f'Triggering base CI job {job_id}.') - delete_replicated_job = False # 
IMPORTANT: this prevents removing the base CI job - - else: - self.console.log(f'Replicating base CI job.') - job_definition = self.get_job( - account_id=account_id, - job_id=job_id - )['data'] - - # TODO: this is gross - need a more ergnomic way to express these - job_definition['name'] = job_definition['name'] + '-' + datetime.now().strftime('%Y-%m-%d--%H:%M:%S') - job_definition['id'] = None - job_definition['dbt_version'] = None - - # the dbt Cloud API will fail job create requests that contain extra keys. - # This is the minimal set required to create a new job - keys = ['id', 'name', 'execution', 'account_id', 'project_id', 'environment_id', - 'dbt_version', 'execute_steps', 'state', 'deferring_job_definition_id', - 'triggers', 'settings', 'schedule'] - job_definition = {k:v for k,v in job_definition.items() if k in keys} - job_id = self.create_job( - account_id=account_id, - payload=job_definition - )['data']['id'] + if last_run_status == 'error': + rerun_steps = [] - run = self._simple_request( - f'accounts/{account_id}/jobs/{job_id}/run/', - method='post', - json=payload, - ) - if not run['status']['is_success']: - self.console.log(f'Run NOT triggered for job {job_id}. See run response.') - return run + for run_step in last_run_data['run_steps']: + status = run_step['status_humanized'].lower() + # Skipping cloning, profile setup, and dbt deps - always + # the first three steps in any run + if run_step['index'] <= 3 or status == 'success': + self.console.log( + f'Skipping rerun for command "{run_step["name"]}" ' + 'as it does not need to be repeated.' 
+ ) - self.console.log(run_status_formatted(run, 0)) - if should_poll: - start = time.time() - run_id = run['data']['id'] - while True: - time.sleep(poll_interval) - run = self.get_run(account_id, run_id) - status = run['data']['status'] - self.console.log(run_status_formatted(run, time.time() - start)) - if status in [ - JobRunStatus.SUCCESS, - JobRunStatus.CANCELLED, - JobRunStatus.ERROR, - ]: - break + else: - if delete_replicated_job: - self.delete_job(account_id=account_id, job_id=job_id) - - return run + # get the dbt command used within this step + command = run_step['name'].partition('`')[2].partition('`')[0] + namespace, remaining = self.parser.parse_known_args( + shlex.split(command) + ) + sub_command = remaining[1] + + if ( + sub_command not in run_commands + and status in ['error', 'cancelled', 'skipped'] + ) or (sub_command in run_commands and status == 'skipped'): + rerun_steps.append(command) + # errors and failures are when we need to inspect to figure + # out the point of failure + else: + + # get the run results scoped to the step which had an error + # an error here indicates that either: + # 1) the fail-fast flag was set, in which case + # the run_results.json file was never created; or + # 2) there was a problem on dbt Cloud's side saving + # this artifact + try: + step_results = self.get_run_artifact( + account_id=account_id, + run_id=last_run_id, + path='run_results.json', + step=run_step['index'], + )['results'] + + # If the artifact isn't found, the API returns a 404 with + # no json. 
The ValueError will catch the JSONDecodeError + except ValueError: + rerun_steps.append(command) + else: + rerun_nodes = ' '.join( + [ + record['unique_id'].split('.')[2] + for record in step_results + if record['status'] + in ['error', 'skipped', 'fail'] + ] + ) + global_args = parse_args( + global_cli_args.keys(), namespace + ) + sub_command_args = parse_args( + sub_command_cli_args.keys(), namespace + ) + modified_command = f'dbt{global_args} {sub_command} -s {rerun_nodes}{sub_command_args}' # noqa: E501 + rerun_steps.append(modified_command) + self.console.log( + f'Modifying command "{command}" as an error ' + 'or failure was encountered.' + ) + if len(rerun_steps) > 0: + has_failures = True + payload.update({"steps_override": rerun_steps}) + + return payload, has_failures @v2 def trigger_job( @@ -1111,6 +1153,8 @@ def trigger_job( poll_interval: int = 10, restart_from_failure: bool = False, trigger_on_failure_only: bool = False, + mode: str = 'standard', + autoscale_delete_post_run: bool = True, ): """Trigger a job by its ID @@ -1128,7 +1172,15 @@ def trigger_job( restart_from_failure to True. This has the effect of only triggering the job when the prior invocation was not successful. Otherwise, the function will exit prior to triggering the job. - + mode (str, optional): Must be one of ['standard', 'restart_from_failure', + 'autoscaling']. + - standard mode triggers the job to run as-is. + - restart_from_failure checks for errors on the prior invocation and, + if found, restarts failed models only. + - autoscale checks whether the job_id is actively running. If so, + creates a copy of the running job + autoscale_delete_post_run (bool, optional): Only relevant when mode = 'autoscale' + Remove a job replicated via autoscaling after it finishes running. 
""" def run_status_formatted(run: Dict, time: float) -> str: @@ -1143,126 +1195,72 @@ def run_status_formatted(run: Dict, time: float) -> str: f'Status: "{status.capitalize()}", Elapsed time: {round(time, 0)}s' f', View here: {url}' ) + + # this is here to not break existing stuff 09.26.2022 + if restart_from_failure: + mode = 'restart_from_failure' - def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): - string = '' - for arg in cli_args: - value = getattr(namespace, arg, None) - if value: - arg = arg.replace('_', '-') - if isinstance(value, bool): - string += f' --{arg}' - else: - string += f" --{arg} '{value}'" - return string + mode_is_valid = self._validate_job_run_mode(mode) + if not mode_is_valid: + raise Exception(f'mode: {mode} is not one of ["standard", "restart_from_failure", "autoscale"]') - if restart_from_failure: + if mode == 'restart_from_failure': self.console.log(f'Restarting job {job_id} from last failed state.') - last_run_data = self.list_runs( + payload, has_failures = self._get_restart_job_definition( account_id=account_id, - include_related=['run_steps'], - job_definition_id=job_id, - order_by='-id', - limit=1, - )['data'][0] - - last_run_status = last_run_data['status_humanized'].lower() - last_run_id = last_run_data['id'] - - if last_run_status == 'error': - rerun_steps = [] - - for run_step in last_run_data['run_steps']: - - status = run_step['status_humanized'].lower() - # Skipping cloning, profile setup, and dbt deps - always - # the first three steps in any run - if run_step['index'] <= 3 or status == 'success': - self.console.log( - f'Skipping rerun for command "{run_step["name"]}" ' - 'as it does not need to be repeated.' 
- ) - - else: - - # get the dbt command used within this step - command = run_step['name'].partition('`')[2].partition('`')[0] - namespace, remaining = self.parser.parse_known_args( - shlex.split(command) - ) - sub_command = remaining[1] - - if ( - sub_command not in run_commands - and status in ['error', 'cancelled', 'skipped'] - ) or (sub_command in run_commands and status == 'skipped'): - rerun_steps.append(command) - - # errors and failures are when we need to inspect to figure - # out the point of failure - else: + job_id=job_id, + payload=payload + ) - # get the run results scoped to the step which had an error - # an error here indicates that either: - # 1) the fail-fast flag was set, in which case - # the run_results.json file was never created; or - # 2) there was a problem on dbt Cloud's side saving - # this artifact - try: - step_results = self.get_run_artifact( - account_id=account_id, - run_id=last_run_id, - path='run_results.json', - step=run_step['index'], - )['results'] - - # If the artifact isn't found, the API returns a 404 with - # no json. The ValueError will catch the JSONDecodeError - except ValueError: - rerun_steps.append(command) - else: - rerun_nodes = ' '.join( - [ - record['unique_id'].split('.')[2] - for record in step_results - if record['status'] - in ['error', 'skipped', 'fail'] - ] - ) - global_args = parse_args( - global_cli_args.keys(), namespace - ) - sub_command_args = parse_args( - sub_command_cli_args.keys(), namespace - ) - modified_command = f'dbt{global_args} {sub_command} -s {rerun_nodes}{sub_command_args}' # noqa: E501 - rerun_steps.append(modified_command) - self.console.log( - f'Modifying command "{command}" as an error ' - 'or failure was encountered.' 
- ) - - payload.update({"steps_override": rerun_steps}) + if trigger_on_failure_only and not has_failures: self.console.log( - f'Triggering modified job to re-run failed steps: {rerun_steps}' + f'Process triggered with trigger_on_failure_only set to True but no ' + 'failed run steps found. Terminating.' ) + return None + elif mode == 'autoscale': + self.console.log(f'Triggered with autoscaling set to True. Detecting any running instances') + most_recent_job_run = self.list_runs( + account_id=account_id, + job_definition_id=job_id, + limit=1, + order_by='-id' + )['data'][0] + most_recent_job_run_status = most_recent_job_run['status_humanized'] + + self.console.log(f'Status for most recent run of job {job_id} is {most_recent_job_run_status}.') + + if most_recent_job_run_status not in ['Queued', 'Starting', 'Running']: + self.console.log(f'autoscale set to true but base job with id {job_id} is free ' + 'triggering base job and ignoring autoscale configuration.') + autoscale_delete_post_run = False + else: - self.console.log( - 'Process triggered with restart_from_failure set to True but no ' - 'failed run steps found.' + self.console.log(f'job_id {job_id} has an active run. Cloning job.') + + new_job_definition = self.clone_job( + account_id=account_id, + job_id=job_id ) - if trigger_on_failure_only: - self.console.log( - 'Not triggering job because prior run was successful.' - ) - return + + #TODO: need to figure out the best way to disambiguate replicated jobs. 
+ creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + new_job_name = '-'.join([new_job_definition['name'], creation_time]) + new_job_definition['name'] = new_job_name + job_id = self.create_job( + account_id=account_id, + payload=new_job_definition + )['data']['id'] + + self.console.log(f'Created new job with job_id: {job_id}') run = self._simple_request( f'accounts/{account_id}/jobs/{job_id}/run/', method='post', json=payload, ) + if not run['status']['is_success']: self.console.log(f'Run NOT triggered for job {job_id}. See run response.') return run @@ -1282,6 +1280,12 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): JobRunStatus.ERROR, ]: break + + if mode == 'autoscale' and autoscale_delete_post_run: + self.delete_job( + account_id=account_id, + job_id=job_id + ) return run diff --git a/dbtc/client/cloud/configs/dbt_cloud_api.py b/dbtc/client/cloud/configs/dbt_cloud_api.py index e859e79..da3f0b9 100644 --- a/dbtc/client/cloud/configs/dbt_cloud_api.py +++ b/dbtc/client/cloud/configs/dbt_cloud_api.py @@ -1,5 +1,41 @@ +from typing import Dict -create_job_request = { - 'name': '', - 'id': None -} \ No newline at end of file +class dbtCloudAPIRequestFactory(object): + + def __init__(self, **kwargs): + for key,value in kwargs.items(): + setattr(self, key, value) + + def _create_job_request(self) -> Dict: + """Minimal set of required fields needed to create a new dbt Cloud job, including default values""" + return { + 'name': None, + 'id': None, + 'execution': None, + 'account_id': None, + 'project_id': None, + 'environment_id': None, + 'dbt_version': None, + 'execute_steps': None, + 'state': None, + 'deferring_job_definition_id': None, + 'triggers': None, + 'settings': None, + 'schedule': None + } + + def create_job_request(self, data={}) -> Dict: + """Completes the _create_job_request template with values from data and overrides + + Args: + data (dict): payload to create the initial request. 
Typically, this will be the result of a GET on the + job definition from an existing job to be used for dbt Cloud migrations + """ + # copy everything EXCEPT for the existing dbt Cloud job ID + result = self._create_job_request() + if data != {}: + for key in result.keys(): + if key != 'id': + result[key] = data[key] + + return result \ No newline at end of file From 1eb16834ffb154a2faef2b8c7628404ea8fe6ab9 Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Tue, 27 Sep 2022 10:52:08 -0600 Subject: [PATCH 06/19] create enums file --- dbtc/client/cloud/base.py | 11 +---------- dbtc/client/cloud/configs/enums.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 10 deletions(-) create mode 100644 dbtc/client/cloud/configs/enums.py diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index c16e216..da15218 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1,19 +1,18 @@ # stdlib import argparse from datetime import datetime -import enum import shlex import time from functools import partial, wraps from typing import Dict, Iterable, List from urllib import request -import uuid # third party import requests # first party from dbtc.client.base import _Client +from dbtc.client.cloud.configs.enums import JobRunStatus, JobRunModes from dbtc.client.cloud.configs.dbt_cloud_api import dbtCloudAPIRequestFactory from dbtc.client.cloud.configs.dbt_core_cli import ( run_commands, @@ -21,14 +20,6 @@ sub_command_cli_args ) -class JobRunStatus(enum.IntEnum): - QUEUED = 1 - STARTING = 2 - RUNNING = 3 - SUCCESS = 10 - ERROR = 20 - CANCELLED = 30 - def _version_decorator(func, version): @wraps(func) diff --git a/dbtc/client/cloud/configs/enums.py b/dbtc/client/cloud/configs/enums.py new file mode 100644 index 0000000..06bd16c --- /dev/null +++ b/dbtc/client/cloud/configs/enums.py @@ -0,0 +1,11 @@ +import + +class JobRunStatus(enum.IntEnum): + QUEUED = 1 + STARTING = 2 + RUNNING = 3 + SUCCESS = 10 + ERROR = 20 + CANCELLED = 30 + +class 
JobRunModes(enum.StrEnum): From d083cc218c537140ed8cb60216ea5892570a953a Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Tue, 27 Sep 2022 12:41:26 -0600 Subject: [PATCH 07/19] update enums --- dbtc/client/cloud/base.py | 2 +- dbtc/client/cloud/configs/enums.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index da15218..ec5e9ae 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -114,7 +114,7 @@ def _get_by_name(self, items: List, item_name: str, value: str = 'name'): return obj def _validate_job_run_mode(self, mode): - if mode not in ['standard', 'restart_from_failure', 'autoscale']: + if mode not in JobRunModes: return False return True diff --git a/dbtc/client/cloud/configs/enums.py b/dbtc/client/cloud/configs/enums.py index 06bd16c..245b7e7 100644 --- a/dbtc/client/cloud/configs/enums.py +++ b/dbtc/client/cloud/configs/enums.py @@ -1,4 +1,4 @@ -import +import enum class JobRunStatus(enum.IntEnum): QUEUED = 1 @@ -8,4 +8,7 @@ class JobRunStatus(enum.IntEnum): ERROR = 20 CANCELLED = 30 -class JobRunModes(enum.StrEnum): +class JobRunModes(str, enum.Enum): + STANDARD = 'standard' + RESTART = 'restart_from_failure' + AUTOSCALE = 'autoscale' From 593e000320c88b21604428cad0b19b0de67ffbac Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Tue, 27 Sep 2022 12:54:00 -0600 Subject: [PATCH 08/19] add autoscale name slug --- dbtc/cli.py | 6 ++++++ dbtc/client/cloud/base.py | 16 ++++++++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/dbtc/cli.py b/dbtc/cli.py index 9cad0e4..4a454ef 100644 --- a/dbtc/cli.py +++ b/dbtc/cli.py @@ -1012,6 +1012,11 @@ def trigger_job( 'Delete job created via autoscaling after it finishes running' ) ), + autoscale_job_name_slug: str = typer.Option( + True, help=( + 'Append this string to the end of the replicated job name to disambiguate' + ) + ), ): """Trigger job to run.""" _dbt_cloud_request( @@ -1026,6 +1031,7 @@ def 
trigger_job( trigger_on_failure_only=trigger_on_failure_only, mode=mode, autoscale_delete_post_run=autoscale_delete_post_run, + autoscale_job_name_slug=autoscale_job_name_slug, ) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index ec5e9ae..c304884 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1146,6 +1146,7 @@ def trigger_job( trigger_on_failure_only: bool = False, mode: str = 'standard', autoscale_delete_post_run: bool = True, + autoscale_job_name_slug: str = None, ): """Trigger a job by its ID @@ -1172,6 +1173,9 @@ def trigger_job( creates a copy of the running job autoscale_delete_post_run (bool, optional): Only relevant when mode = 'autoscale' Remove a job replicated via autoscaling after it finishes running. + autoscale_job_name_slug (str, optional): Only relevant when mode = 'autoscale' + append value to the existing job name when replicating the job definition. + If None defaults to the current timestamp on job creation """ def run_status_formatted(run: Dict, time: float) -> str: @@ -1235,10 +1239,14 @@ def run_status_formatted(run: Dict, time: float) -> str: job_id=job_id ) - #TODO: need to figure out the best way to disambiguate replicated jobs. 
- creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') - new_job_name = '-'.join([new_job_definition['name'], creation_time]) - new_job_definition['name'] = new_job_name + if not autoscale_job_name_slug: + creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + new_job_name = '-'.join([new_job_definition['name'], creation_time]) + new_job_definition['name'] = new_job_name + else: + new_job_name = '-'.join([new_job_definition['name'], autoscale_job_name_slug]) + new_job_definition['name'] = new_job_name + job_id = self.create_job( account_id=account_id, payload=new_job_definition From 191f1997780a4f72099a08dfa9ec79b23ea4e6cc Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Tue, 27 Sep 2022 12:57:09 -0600 Subject: [PATCH 09/19] rename mode --> job_run_strategy --- dbtc/cli.py | 4 ++-- dbtc/client/cloud/base.py | 32 +++++++++++++++--------------- dbtc/client/cloud/configs/enums.py | 2 +- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/dbtc/cli.py b/dbtc/cli.py index 4a454ef..475ab57 100644 --- a/dbtc/cli.py +++ b/dbtc/cli.py @@ -996,7 +996,7 @@ def trigger_job( 'job.' 
), ), - mode: str = typer.Option( + job_run_strategy: str = typer.Option( 'standard', help=( 'Possible values are ["standard", "restart_from_failure", "autoscale"] ' @@ -1029,7 +1029,7 @@ def trigger_job( poll_interval=poll_interval, restart_from_failure=restart_from_failure, trigger_on_failure_only=trigger_on_failure_only, - mode=mode, + job_run_strategy=job_run_strategy, autoscale_delete_post_run=autoscale_delete_post_run, autoscale_job_name_slug=autoscale_job_name_slug, ) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index c304884..12dab51 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -12,7 +12,7 @@ # first party from dbtc.client.base import _Client -from dbtc.client.cloud.configs.enums import JobRunStatus, JobRunModes +from dbtc.client.cloud.configs.enums import JobRunStatus, JobRunStrategies from dbtc.client.cloud.configs.dbt_cloud_api import dbtCloudAPIRequestFactory from dbtc.client.cloud.configs.dbt_core_cli import ( run_commands, @@ -113,8 +113,8 @@ def _get_by_name(self, items: List, item_name: str, value: str = 'name'): obj = None return obj - def _validate_job_run_mode(self, mode): - if mode not in JobRunModes: + def _validate_job_run_strategy(self, job_run_strategy): + if job_run_strategy not in JobRunStrategies: return False return True @@ -1144,7 +1144,7 @@ def trigger_job( poll_interval: int = 10, restart_from_failure: bool = False, trigger_on_failure_only: bool = False, - mode: str = 'standard', + job_run_strategy: str = 'standard', autoscale_delete_post_run: bool = True, autoscale_job_name_slug: str = None, ): @@ -1164,16 +1164,16 @@ def trigger_job( restart_from_failure to True. This has the effect of only triggering the job when the prior invocation was not successful. Otherwise, the function will exit prior to triggering the job. 
-            mode (str, optional): Must be one of ['standard', 'restart_from_failure',
+            job_run_strategy (str, optional): Must be one of ['standard', 'restart_from_failure',
                 'autoscaling'].
-                - standard mode triggers the job to run as-is.
+                - standard strategy triggers the job to run as-is.
-                - restart_from_failure checks for errors on the prior invocation and, 
+                - restart_from_failure checks for errors on the prior invocation and,
                   if found, restarts failed models only.
                 - autoscale checks whether the job_id is actively running.  If so,
                   creates a copy of the running job
-            autoscale_delete_post_run (bool, optional): Only relevant when mode = 'autoscale'
+            autoscale_delete_post_run (bool, optional): Only relevant when job_run_strategy = 'autoscale'
                 Remove a job replicated via autoscaling after it finishes running.
-            autoscale_job_name_slug (str, optional): Only relevant when mode = 'autoscale'
+            autoscale_job_name_slug (str, optional): Only relevant when job_run_strategy = 'autoscale'
                 append value to the existing job name when replicating the job definition.
                 If None defaults to the current timestamp on job creation
         """
@@ -1193,13 +1193,13 @@ def run_status_formatted(run: Dict, time: float) -> str:
 
         # this is here to not break existing stuff 09.26.2022
         if restart_from_failure:
-            mode = 'restart_from_failure'
+            job_run_strategy = 'restart_from_failure'
 
-        mode_is_valid = self._validate_job_run_mode(mode)
-        if not mode_is_valid:
-            raise Exception(f'mode: {mode} is not one of ["standard", "restart_from_failure", "autoscale"]')
+        is_valid_strategy = self._validate_job_run_strategy(job_run_strategy)
+        if not is_valid_strategy:
+            raise Exception(f'strategy: {job_run_strategy} is not one of ["standard", "restart_from_failure", "autoscale"]')
 
-        if mode == 'restart_from_failure':
+        if job_run_strategy == 'restart_from_failure':
             self.console.log(f'Restarting job {job_id} from last failed state.')
             payload, has_failures = self._get_restart_job_definition(
                 account_id=account_id,
                 job_id=job_id,
                 payload=payload
             )
 
             if trigger_on_failure_only and not has_failures:
                 self.console.log(
                     f'Process triggered with trigger_on_failure_only set to True but no '
                     'failed run steps found. Terminating.'
                 )
                 return None
- elif mode == 'autoscale': + elif job_run_strategy == 'autoscale': self.console.log(f'Triggered with autoscaling set to True. Detecting any running instances') most_recent_job_run = self.list_runs( account_id=account_id, @@ -1246,7 +1246,7 @@ def run_status_formatted(run: Dict, time: float) -> str: else: new_job_name = '-'.join([new_job_definition['name'], autoscale_job_name_slug]) new_job_definition['name'] = new_job_name - + job_id = self.create_job( account_id=account_id, payload=new_job_definition @@ -1280,7 +1280,7 @@ def run_status_formatted(run: Dict, time: float) -> str: ]: break - if mode == 'autoscale' and autoscale_delete_post_run: + if job_run_strategy == 'autoscale' and autoscale_delete_post_run: self.delete_job( account_id=account_id, job_id=job_id diff --git a/dbtc/client/cloud/configs/enums.py b/dbtc/client/cloud/configs/enums.py index 245b7e7..ef22a94 100644 --- a/dbtc/client/cloud/configs/enums.py +++ b/dbtc/client/cloud/configs/enums.py @@ -8,7 +8,7 @@ class JobRunStatus(enum.IntEnum): ERROR = 20 CANCELLED = 30 -class JobRunModes(str, enum.Enum): +class JobRunStrategies(str, enum.Enum): STANDARD = 'standard' RESTART = 'restart_from_failure' AUTOSCALE = 'autoscale' From 59bbc9fb724c3d35bc40cd3772556504ddd1d763 Mon Sep 17 00:00:00 2001 From: Doug Guthrie Date: Tue, 27 Sep 2022 15:50:23 -0600 Subject: [PATCH 10/19] Add pydantic --- .pre-commit-config.yaml | 2 +- poetry.lock | 18 +++++++++++++++++- pyproject.toml | 1 + 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1fc7838..3011d45 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,7 +7,7 @@ repos: - id: isort stages: [commit,push] name: isort - entry: poetry run isort -rc + entry: poetry run isort language: system types: [python] - id: black diff --git a/poetry.lock b/poetry.lock index 261a20a..47a2e4f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -600,6 +600,21 @@ category = "dev" optional = false 
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +[[package]] +name = "pydantic" +version = "1.10.2" +description = "Data validation and settings management using python type hints" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +typing-extensions = ">=4.1.0" + +[package.extras] +dotenv = ["python-dotenv (>=0.10.4)"] +email = ["email-validator (>=1.0.3)"] + [[package]] name = "pyflakes" version = "2.4.0" @@ -938,7 +953,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest- [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "02325e1d8e94719b09b921bed4f10cec541135cb29e02514367fa34594346b7b" +content-hash = "ecbd26f1cd1c50c8dd6a3199923a7c4bc888401255ea46a61097f8c3578ddb90" [metadata.files] appnope = [ @@ -1178,6 +1193,7 @@ pycodestyle = [ {file = "pycodestyle-2.8.0-py2.py3-none-any.whl", hash = "sha256:720f8b39dde8b293825e7ff02c475f3077124006db4f440dcbc9a20b76548a20"}, {file = "pycodestyle-2.8.0.tar.gz", hash = "sha256:eddd5847ef438ea1c7870ca7eb78a9d47ce0cdb4851a5523949f2601d0cbbe7f"}, ] +pydantic = [] pyflakes = [ {file = "pyflakes-2.4.0-py2.py3-none-any.whl", hash = "sha256:3bb3a3f256f4b7968c9c788781e4ff07dce46bdf12339dcda61053375426ee2e"}, {file = "pyflakes-2.4.0.tar.gz", hash = "sha256:05a85c2872edf37a4ed30b0cce2f6093e1d0581f8c19d7393122da7e25b2b24c"}, diff --git a/pyproject.toml b/pyproject.toml index c23c73d..c0fc77e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ python = "^3.8" sgqlc = "^15.0" requests = "^2.27.1" typer = {extras = ["all"], version = "^0.6.1"} +pydantic = "^1.10.2" [tool.poetry.dev-dependencies] black = "^22.1.0" From ccea05c462706133260a7fbd7e2b9d45aa2b16a6 Mon Sep 17 00:00:00 2001 From: Doug Guthrie Date: Tue, 27 Sep 2022 16:08:19 -0600 Subject: [PATCH 11/19] Add support for cloning, resource validation --- dbtc/cli.py | 7 +- dbtc/client/cloud/base.py | 199 ++++++++++++--------- 
dbtc/client/cloud/configs/dbt_cloud_api.py | 43 +++-- dbtc/client/cloud/configs/dbt_core_cli.py | 1 - dbtc/client/cloud/configs/enums.py | 3 + dbtc/client/cloud/models/__init__.py | 2 + dbtc/client/cloud/models/constants.py | 7 + dbtc/client/cloud/models/job.py | 63 +++++++ dbtc/client/cloud/models/project.py | 23 +++ 9 files changed, 235 insertions(+), 113 deletions(-) create mode 100644 dbtc/client/cloud/models/__init__.py create mode 100644 dbtc/client/cloud/models/constants.py create mode 100644 dbtc/client/cloud/models/job.py create mode 100644 dbtc/client/cloud/models/project.py diff --git a/dbtc/cli.py b/dbtc/cli.py index 9cad0e4..75dcaae 100644 --- a/dbtc/cli.py +++ b/dbtc/cli.py @@ -1,5 +1,4 @@ # stdlib -from enum import auto import json from typing import List, Optional @@ -1005,12 +1004,10 @@ def trigger_job( ' exited with an error. If yes, restart from the point of failure ' 'autoscale: determine with the target job is currently running ' ' If yes, create and then run the clone.' 
- ) + ), ), autoscale_delete_post_run: bool = typer.Option( - True, help=( - 'Delete job created via autoscaling after it finishes running' - ) + True, help=('Delete job created via autoscaling after it finishes running') ), ): """Trigger job to run.""" diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index ec5e9ae..d9b9783 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1,24 +1,23 @@ # stdlib import argparse -from datetime import datetime import shlex import time +from datetime import datetime from functools import partial, wraps from typing import Dict, Iterable, List -from urllib import request # third party import requests # first party from dbtc.client.base import _Client -from dbtc.client.cloud.configs.enums import JobRunStatus, JobRunModes -from dbtc.client.cloud.configs.dbt_cloud_api import dbtCloudAPIRequestFactory +from dbtc.client.cloud import models from dbtc.client.cloud.configs.dbt_core_cli import ( - run_commands, global_cli_args, - sub_command_cli_args + run_commands, + sub_command_cli_args, ) +from dbtc.client.cloud.configs.enums import JobRunModes, JobRunStatus def _version_decorator(func, version): @@ -31,6 +30,7 @@ def wrapper(self, *args, **kwargs): return wrapper +# Version Decorators v2 = partial(_version_decorator, version='v2') v3 = partial(_version_decorator, version='v3') v4 = partial(_version_decorator, version='v4') @@ -42,7 +42,6 @@ def __init__(self, **kwargs): self.session = requests.Session() self.session.headers = self.headers self.parser = argparse.ArgumentParser() - self.request_factory = dbtCloudAPIRequestFactory() all_cli_args = {**global_cli_args, **sub_command_cli_args} for arg_specs in all_cli_args.values(): flags = arg_specs['flags'] @@ -60,10 +59,28 @@ def _header_property(self): return 'api_key' + def _clone_resource(self, resource: str, **kwargs): + create_args = kwargs.pop('create_args', None) + payload = getattr(self, f'get_{resource}')(**kwargs)['data'] + + # Can't recreate 
a resource with an ID + payload.pop('id', None) + if create_args is not None: + kwargs = {k: v for k, v in kwargs.items() if k in create_args} + kwargs['payload'] = payload + return getattr(self, f'create_{resource}')(**kwargs) + def _make_request( self, path: str, *, method: str = 'get', **kwargs ) -> requests.Response: """Make request to API.""" + + # Model is not an argument that the request method accepts, needs to be removed + model = kwargs.pop('model', None) + if model is not None: + + # This will validate the payload as well as add any optional fields + kwargs['json'] = model(**kwargs['json']).dict() full_url = self.full_url(path) response = self.session.request(method=method, url=full_url, **kwargs) return response @@ -112,11 +129,11 @@ def _get_by_name(self, items: List, item_name: str, value: str = 'name'): except IndexError: obj = None return obj - + def _validate_job_run_mode(self, mode): if mode not in JobRunModes: return False - + return True @v3 @@ -183,6 +200,27 @@ def cancel_run(self, account_id: int, run_id: int) -> Dict: method='post', ) + @v2 + def clone_job( + self, + account_id: int, + job_id: int, + ): + + """Create a job using the configuration of another + + !!! 
tip + If a job is currently running, replicate the job definition to a new job, + and trigger + + Args: + account_id (int): Numeric ID of the account to retrieve + job_id (int): Numeric ID of the job to trigger + """ + return self._clone_resource( + 'job', account_id=account_id, job_id=job_id, create_args=['account_id'] + ) + @v3 def create_adapter(self, account_id: int, project_id: int, payload: Dict) -> Dict: """Create an adapter @@ -280,6 +318,7 @@ def create_job(self, account_id: int, payload: Dict) -> Dict: f'accounts/{account_id}/jobs/', method='post', json=payload, + model=models.Job, ) @v3 @@ -291,7 +330,10 @@ def create_project(self, account_id: int, payload: Dict) -> Dict: payload (dict): Dictionary representing the project to create """ return self._simple_request( - f'accounts/{account_id}/projects/', method='post', json=payload + f'accounts/{account_id}/projects/', + method='post', + json=payload, + model=models.Project, ) @v3 @@ -785,7 +827,11 @@ def list_invited_users(self, account_id: int) -> Dict: @v2 def list_jobs( - self, account_id: int, *, order_by: str = None, project_id: int = None, + self, + account_id: int, + *, + order_by: str = None, + project_id: int = None, ) -> Dict: """List jobs in an account or specific project. @@ -794,7 +840,7 @@ def list_jobs( order_by (str, optional): Field to order the result by. Use - to indicate reverse order. 
project_id (int, optional): Numeric ID of the project containing jobs - """ + """ return self._simple_request( f'accounts/{account_id}/jobs/', params={'order_by': order_by, 'project_id': project_id}, @@ -990,30 +1036,8 @@ def test_connection(self, account_id: int, payload: Dict) -> Dict: return self._simple_request( f'accounts/{account_id}/connections/test/', method='post', json=payload ) - - @v2 - def clone_job( - self, - account_id: int, - job_id: int, - ): - """If a job is currently running, replicate the job definition to a new job - - Args: - account_id (int): Numeric ID of the account to retrieve - job_id (int): Numeric ID of the job to trigger - payload (dict): Payload required for post request - """ - - existing_job_definition = self.get_job( - account_id=account_id, - job_id=job_id - )['data'] - - return self.request_factory.create_job_request(data=existing_job_definition) - - @v2 + @v2 def _get_restart_job_definition( self, account_id: int, @@ -1022,9 +1046,9 @@ def _get_restart_job_definition( ): """Identifies whether there was a failure on the previous run of the job. - When failures are identified, returns an updated job definition to + When failures are identified, returns an updated job definition to restart from the point of failure. 
- + Args: account_id (int): Numeric ID of the account to retrieve job_id (int): Numeric ID of the job to trigger @@ -1042,16 +1066,16 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): else: string += f" --{arg} '{value}'" return string - + has_failures = False last_run_data = self.list_runs( - account_id=account_id, - include_related=['run_steps'], - job_definition_id=job_id, - order_by='-id', - limit=1, - )['data'][0] + account_id=account_id, + include_related=['run_steps'], + job_definition_id=job_id, + order_by='-id', + limit=1, + )['data'][0] last_run_status = last_run_data['status_humanized'].lower() last_run_id = last_run_data['id'] @@ -1111,13 +1135,10 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): [ record['unique_id'].split('.')[2] for record in step_results - if record['status'] - in ['error', 'skipped', 'fail'] + if record['status'] in ['error', 'skipped', 'fail'] ] ) - global_args = parse_args( - global_cli_args.keys(), namespace - ) + global_args = parse_args(global_cli_args.keys(), namespace) sub_command_args = parse_args( sub_command_cli_args.keys(), namespace ) @@ -1129,10 +1150,10 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): ) if len(rerun_steps) > 0: has_failures = True - payload.update({"steps_override": rerun_steps}) - + payload.update({"steps_override": rerun_steps}) + return payload, has_failures - + @v2 def trigger_job( self, @@ -1163,14 +1184,15 @@ def trigger_job( restart_from_failure to True. This has the effect of only triggering the job when the prior invocation was not successful. Otherwise, the function will exit prior to triggering the job. - mode (str, optional): Must be one of ['standard', 'restart_from_failure', - 'autoscaling']. + mode (str, optional): Must be one of ['standard', 'restart_from_failure', + 'autoscaling']. - standard mode triggers the job to run as-is. 
- - restart_from_failure checks for errors on the prior invocation and, + - restart_from_failure checks for errors on the prior invocation and, if found, restarts failed models only. - autoscale checks whether the job_id is actively running. If so, creates a copy of the running job - autoscale_delete_post_run (bool, optional): Only relevant when mode = 'autoscale' + autoscale_delete_post_run (bool, optional): Only relevant when + mode = 'autoscale' Remove a job replicated via autoscaling after it finishes running. """ @@ -1186,62 +1208,66 @@ def run_status_formatted(run: Dict, time: float) -> str: f'Status: "{status.capitalize()}", Elapsed time: {round(time, 0)}s' f', View here: {url}' ) - + # this is here to not break existing stuff 09.26.2022 if restart_from_failure: mode = 'restart_from_failure' mode_is_valid = self._validate_job_run_mode(mode) if not mode_is_valid: - raise Exception(f'mode: {mode} is not one of ["standard", "restart_from_failure", "autoscale"]') + raise Exception( + f'mode: {mode} is not one of ' + '["standard", "restart_from_failure", "autoscale"]' + ) if mode == 'restart_from_failure': self.console.log(f'Restarting job {job_id} from last failed state.') payload, has_failures = self._get_restart_job_definition( - account_id=account_id, - job_id=job_id, - payload=payload + account_id=account_id, job_id=job_id, payload=payload ) if trigger_on_failure_only and not has_failures: self.console.log( - f'Process triggered with trigger_on_failure_only set to True but no ' - 'failed run steps found. Terminating.' + 'Process triggered with trigger_on_failure_only set to True but ' + 'no failed run steps found. Terminating.' ) return None elif mode == 'autoscale': - self.console.log(f'Triggered with autoscaling set to True. Detecting any running instances') + self.console.log( + 'Triggered with autoscaling set to True. 
' + 'Detecting any running instances' + ) most_recent_job_run = self.list_runs( - account_id=account_id, - job_definition_id=job_id, - limit=1, - order_by='-id' + account_id=account_id, job_definition_id=job_id, limit=1, order_by='-id' )['data'][0] most_recent_job_run_status = most_recent_job_run['status_humanized'] - - self.console.log(f'Status for most recent run of job {job_id} is {most_recent_job_run_status}.') + + self.console.log( + f'Status for most recent run of job {job_id} ' + f'is {most_recent_job_run_status}.' + ) if most_recent_job_run_status not in ['Queued', 'Starting', 'Running']: - self.console.log(f'autoscale set to true but base job with id {job_id} is free ' - 'triggering base job and ignoring autoscale configuration.') + self.console.log( + f'autoscale set to true but base job with id {job_id} is free ' + 'triggering base job and ignoring autoscale configuration.' + ) autoscale_delete_post_run = False - + else: self.console.log(f'job_id {job_id} has an active run. Cloning job.') - + new_job_definition = self.clone_job( - account_id=account_id, - job_id=job_id + account_id=account_id, job_id=job_id ) - - #TODO: need to figure out the best way to disambiguate replicated jobs. + + # TODO: need to figure out the best way to disambiguate replicated jobs. creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') new_job_name = '-'.join([new_job_definition['name'], creation_time]) new_job_definition['name'] = new_job_name job_id = self.create_job( - account_id=account_id, - payload=new_job_definition + account_id=account_id, payload=new_job_definition )['data']['id'] self.console.log(f'Created new job with job_id: {job_id}') @@ -1251,7 +1277,7 @@ def run_status_formatted(run: Dict, time: float) -> str: method='post', json=payload, ) - + if not run['status']['is_success']: self.console.log(f'Run NOT triggered for job {job_id}. 
See run response.') return run @@ -1271,12 +1297,9 @@ def run_status_formatted(run: Dict, time: float) -> str: JobRunStatus.ERROR, ]: break - + if mode == 'autoscale' and autoscale_delete_post_run: - self.delete_job( - account_id=account_id, - job_id=job_id - ) + self.delete_job(account_id=account_id, job_id=job_id) return run @@ -1293,7 +1316,7 @@ def update_connection( payload (dict): Dictionary representing the connection to update """ return self._simple_request( - f'accounts/{account_id}/projects/{project_id}/connections/{connection_id}/', + f'accounts/{account_id}/projects/{project_id}/connections/{connection_id}/', # noqa: E501 method='post', json=payload, ) @@ -1311,7 +1334,7 @@ def update_credentials( payload (dict): Dictionary representing the credentials to update """ return self._simple_request( - f'accounts/{account_id}/projects/{project_id}/credentials/{credentials_id}/', # noqa: E50 + f'accounts/{account_id}/projects/{project_id}/credentials/{credentials_id}/', # noqa: E501 method='post', json=payload, ) diff --git a/dbtc/client/cloud/configs/dbt_cloud_api.py b/dbtc/client/cloud/configs/dbt_cloud_api.py index da3f0b9..c980c06 100644 --- a/dbtc/client/cloud/configs/dbt_cloud_api.py +++ b/dbtc/client/cloud/configs/dbt_cloud_api.py @@ -1,35 +1,40 @@ +# stdlib from typing import Dict + class dbtCloudAPIRequestFactory(object): - def __init__(self, **kwargs): - for key,value in kwargs.items(): + for key, value in kwargs.items(): setattr(self, key, value) def _create_job_request(self) -> Dict: - """Minimal set of required fields needed to create a new dbt Cloud job, including default values""" + """Minimal set of required fields needed to create a new dbt Cloud job, + including default values + """ return { 'name': None, 'id': None, 'execution': None, - 'account_id': None, - 'project_id': None, - 'environment_id': None, - 'dbt_version': None, - 'execute_steps': None, - 'state': None, - 'deferring_job_definition_id': None, - 'triggers': None, - 'settings': 
None, - 'schedule': None + 'account_id': None, + 'project_id': None, + 'environment_id': None, + 'dbt_version': None, + 'execute_steps': None, + 'state': None, + 'deferring_job_definition_id': None, + 'triggers': None, + 'settings': None, + 'schedule': None, } - + def create_job_request(self, data={}) -> Dict: - """Completes the _create_job_request template with values from data and overrides - + """Completes the _create_job_request template with values from data and + overrides + Args: - data (dict): payload to create the initial request. Typically, this will be the result of a GET on the - job definition from an existing job to be used for dbt Cloud migrations + data (dict): payload to create the initial request. Typically, this will be + the result of a GET on the job definition from an existing job to be used + for dbt Cloud migrations """ # copy everything EXCEPT for the existing dbt Cloud job ID result = self._create_job_request() @@ -38,4 +43,4 @@ def create_job_request(self, data={}) -> Dict: if key != 'id': result[key] = data[key] - return result \ No newline at end of file + return result diff --git a/dbtc/client/cloud/configs/dbt_core_cli.py b/dbtc/client/cloud/configs/dbt_core_cli.py index 19a43fc..bc9df6a 100644 --- a/dbtc/client/cloud/configs/dbt_core_cli.py +++ b/dbtc/client/cloud/configs/dbt_core_cli.py @@ -1,4 +1,3 @@ - run_commands = ['build', 'run', 'test', 'seed', 'snapshot'] global_cli_args = { diff --git a/dbtc/client/cloud/configs/enums.py b/dbtc/client/cloud/configs/enums.py index 245b7e7..e909e3f 100644 --- a/dbtc/client/cloud/configs/enums.py +++ b/dbtc/client/cloud/configs/enums.py @@ -1,5 +1,7 @@ +# stdlib import enum + class JobRunStatus(enum.IntEnum): QUEUED = 1 STARTING = 2 @@ -8,6 +10,7 @@ class JobRunStatus(enum.IntEnum): ERROR = 20 CANCELLED = 30 + class JobRunModes(str, enum.Enum): STANDARD = 'standard' RESTART = 'restart_from_failure' diff --git a/dbtc/client/cloud/models/__init__.py b/dbtc/client/cloud/models/__init__.py new 
file mode 100644 index 0000000..93852cb --- /dev/null +++ b/dbtc/client/cloud/models/__init__.py @@ -0,0 +1,2 @@ +from .job import Job # noqa: F401 +from .project import Project # noqa: F401 diff --git a/dbtc/client/cloud/models/constants.py b/dbtc/client/cloud/models/constants.py new file mode 100644 index 0000000..e861841 --- /dev/null +++ b/dbtc/client/cloud/models/constants.py @@ -0,0 +1,7 @@ +# stdlib +import enum + + +class State(enum.IntEnum): + active = 1 + deleted = 2 diff --git a/dbtc/client/cloud/models/job.py b/dbtc/client/cloud/models/job.py new file mode 100644 index 0000000..2f19d7e --- /dev/null +++ b/dbtc/client/cloud/models/job.py @@ -0,0 +1,63 @@ +# stdlib +from typing import List, Literal, Optional + +# third party +from pydantic import BaseModel + +from .constants import State + + +class _JobExecution(BaseModel): + timeout_seconds: int + + +class _JobSchedule(BaseModel): + cron: str + date: Literal['custom_cron', 'days_of_week', 'every_day'] + time: Literal['every_hour', 'at_exact_hours'] + + +class _JobSettings(BaseModel): + threads: int + target_name: str + + +class _JobTrigger(BaseModel): + github_webhook: bool + schedule: bool + git_provider_webhook: Optional[bool] = None + + +class Job(BaseModel): + + # Required + account_id: int + environment_id: int + generate_docs: bool + name: str + project_id: int + run_generate_sources: bool + state: Literal[State.active, State.deleted] + + # Optional + dbt_version: Optional[str] = None + deactivated: bool = False + deferring_job_definiton_id: Optional[int] = None + execute_steps: Optional[List[str]] = None + execution: Optional[_JobExecution] = None + id: Optional[int] = None + is_deferrable: Optional[bool] = False + run_failure_count: int = 0 + schedule: Optional[_JobSchedule] = None + settings: Optional[_JobSettings] = None + triggers: Optional[_JobTrigger] = None + + def __init__(self, **data): + schedule = data.get('schedule', {}) + date = schedule.get('date', {}).get('type', None) + time = 
schedule.get('time', {}).get('type', None) + if date is not None: + data['schedule']['date'] = date + if time is not None: + data['schedule']['time'] = time + super().__init__(**data) diff --git a/dbtc/client/cloud/models/project.py b/dbtc/client/cloud/models/project.py new file mode 100644 index 0000000..0136288 --- /dev/null +++ b/dbtc/client/cloud/models/project.py @@ -0,0 +1,23 @@ +# stdlib +from typing import Optional + +# third party +from pydantic import BaseModel + +from .constants import State + + +class Project(BaseModel): + + # Required + account_id: int + name: str + + # Optional + id: Optional[int] = None + connection_id: Optional[int] = None + dbt_project_subdirectory: Optional[str] = None + docs_job_id: Optional[int] = None + freshness_job_id: Optional[int] = None + repository_id: Optional[int] = None + state: int = State.active From c718176f290f0953d519d04d15d2c34985d94cc9 Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Wed, 28 Sep 2022 14:36:52 -0600 Subject: [PATCH 12/19] autoscaling works again --- dbtc/cli.py | 94 ++++-- dbtc/client/cloud/base.py | 362 ++++++++++----------- dbtc/client/cloud/configs/dbt_cloud_api.py | 46 --- dbtc/client/cloud/configs/enums.py | 14 - dbtc/client/cloud/models/job.py | 17 +- 5 files changed, 255 insertions(+), 278 deletions(-) delete mode 100644 dbtc/client/cloud/configs/dbt_cloud_api.py diff --git a/dbtc/cli.py b/dbtc/cli.py index 7bea3d0..4d10aaf 100644 --- a/dbtc/cli.py +++ b/dbtc/cli.py @@ -1,4 +1,5 @@ # stdlib +from enum import auto import json from typing import List, Optional @@ -255,11 +256,10 @@ def create_environment_variables( def create_job( ctx: typer.Context, account_id: int = ACCOUNT_ID, - project_id: int = PROJECT_ID, payload: str = PAYLOAD, ): """Create a job in a project.""" - _dbt_cloud_request(ctx, 'create_job', account_id, project_id, json.loads(payload)) + _dbt_cloud_request(ctx, 'create_job', account_id, json.loads(payload)) @app.command() @@ -969,6 +969,74 @@ def test_connection( ) 
+@app.command() +def trigger_job_with_autoscaling( + ctx: typer.Context, + account_id: int = ACCOUNT_ID, + job_id: int = JOB_ID, + payload: str = PAYLOAD, + should_poll: bool = typer.Option( + True, + help='Poll until job completion (status is one of success, failure, or ' + 'cancelled)', + ), + poll_interval: int = typer.Option( + 10, '--poll-interval', help='Number of seconds to wait in between polling.' + ), + autoscale_delete_post_run: bool = typer.Option( + False, help='Delete the cloned job immediately after it runs' + ), + autoscale_job_identifier: str = typer.Option( + False, + help=( + 'A string to append to the name of the job with id specified in job_id ' + 'E.g. if the job is named "my_dbt_cloud_job" and the autoscale_job_identifier ' + 'is "my_new_job", the cloned job will be named "my_dbt_cloud_job-my_new_job" ' + ), + ), +): + """Trigger job to run.""" + _dbt_cloud_request( + ctx, + 'trigger_job_with_autoscaling', + account_id, + job_id, + json.loads(payload), + should_poll=should_poll, + poll_interval=poll_interval, + autoscale_delete_post_run=autoscale_delete_post_run, + autoscale_job_identifier=autoscale_job_identifier, + ) + + +@app.command() +def trigger_job_restart_from_failure( + ctx: typer.Context, + account_id: int = ACCOUNT_ID, + job_id: int = JOB_ID, + payload: str = PAYLOAD, + should_poll: bool = typer.Option( + True, + help='Poll until job completion (status is one of success, failure, or ' + 'cancelled)', + ), + poll_interval: int = typer.Option( + 10, '--poll-interval', help='Number of seconds to wait in between polling.' 
+ ) +): + """Trigger job to rerun from the point of failure on its most recent run + Does nothing if no failures are identified on the most recent run.""" + _dbt_cloud_request( + ctx, + 'trigger_job_restart_from_failure', + account_id, + job_id, + json.loads(payload), + should_poll=should_poll, + poll_interval=poll_interval, + ) + + @app.command() def trigger_job( ctx: typer.Context, @@ -995,25 +1063,6 @@ def trigger_job( 'job.' ), ), - job_run_strategy: str = typer.Option( - 'standard', - help=( - 'Possible values are ["standard", "restart_from_failure", "autoscale"] ' - 'standard: runs existing job as-is ' - 'restart_from_failure: determine whether the last run of the target job ' - ' exited with an error. If yes, restart from the point of failure ' - 'autoscale: determine with the target job is currently running ' - ' If yes, create and then run the clone.' - ), - ), - autoscale_delete_post_run: bool = typer.Option( - True, help=('Delete job created via autoscaling after it finishes running') - ), - autoscale_job_name_slug: str = typer.Option( - True, help=( - 'Append this string to the end of the replicated job name to disambiguate' - ) - ), ): """Trigger job to run.""" _dbt_cloud_request( @@ -1026,9 +1075,6 @@ def trigger_job( poll_interval=poll_interval, restart_from_failure=restart_from_failure, trigger_on_failure_only=trigger_on_failure_only, - job_run_strategy=job_run_strategy, - autoscale_delete_post_run=autoscale_delete_post_run, - autoscale_job_name_slug=autoscale_job_name_slug, ) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index 9e2e31c..132f5af 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1,32 +1,25 @@ # stdlib import argparse +from asyncore import poll import shlex import time from datetime import datetime from functools import partial, wraps from typing import Dict, Iterable, List +from xxlimited import new # third party import requests # first party from dbtc.client.base import _Client -<<<<<<< 
HEAD -<<<<<<< HEAD -from dbtc.client.cloud.configs.enums import JobRunStatus, JobRunStrategies -from dbtc.client.cloud.configs.dbt_cloud_api import dbtCloudAPIRequestFactory -======= from dbtc.client.cloud import models ->>>>>>> ccea05c462706133260a7fbd7e2b9d45aa2b16a6 -======= -from dbtc.client.cloud import models ->>>>>>> feat/model-validation from dbtc.client.cloud.configs.dbt_core_cli import ( global_cli_args, run_commands, sub_command_cli_args, ) -from dbtc.client.cloud.configs.enums import JobRunStrategies, JobRunStatus +from dbtc.client.cloud.configs.enums import JobRunStatus def _version_decorator(func, version): @@ -68,15 +61,17 @@ def _header_property(self): return 'api_key' - def _clone_resource(self, resource: str, **kwargs): + def _clone_resource(self, resource: str, account_id: int, **kwargs): create_args = kwargs.pop('create_args', None) payload = getattr(self, f'get_{resource}')(**kwargs)['data'] # Can't recreate a resource with an ID payload.pop('id', None) if create_args is not None: - kwargs = {k: v for k, v in kwargs.items() if k in create_args} + payload = {k: v for k, v in kwargs.items() if k in create_args} kwargs['payload'] = payload + kwargs['account_id'] = account_id + return getattr(self, f'create_{resource}')(**kwargs) def _make_request( @@ -138,24 +133,6 @@ def _get_by_name(self, items: List, item_name: str, value: str = 'name'): except IndexError: obj = None return obj -<<<<<<< HEAD -<<<<<<< HEAD - - def _validate_job_run_strategy(self, job_run_strategy): - if job_run_strategy not in JobRunStrategies: -======= - - def _validate_job_run_mode(self, mode): - if mode not in JobRunModes: ->>>>>>> ccea05c462706133260a7fbd7e2b9d45aa2b16a6 -======= - - def _validate_job_run_strategy(self, job_run_strategy): - if job_run_strategy not in JobRunStrategies: ->>>>>>> feat/model-validation - return False - - return True @v3 def assign_group_permissions( @@ -339,7 +316,7 @@ def create_job(self, account_id: int, payload: Dict) -> Dict: 
f'accounts/{account_id}/jobs/', method='post', json=payload, - model=models.Job, + #model=models.Job, ) @v3 @@ -1174,7 +1151,163 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): payload.update({"steps_override": rerun_steps}) return payload, has_failures + + @v2 + def trigger_job_with_autoscaling( + self, + account_id: int, + job_id: int, + payload: Dict, + *, + should_poll: bool = True, + poll_interval: int = 10, + autoscale_delete_post_run: bool = True, + autoscale_job_identifier: str = None, + ): + """Check if job with id = job_id is actively running. If it is, create a + clone of the target job and then trigger the clone to run using self.trigger_job + + Args: + account_id (int): Numeric ID of the account to retrieve + job_id (int): Numeric ID of the job to trigger + payload (dict): Payload required for post request + should_poll (bool, optional): Poll until completion if `True`, completion + is one of success, failure, or cancelled + poll_interval (int, optional): Number of seconds to wait in between + polling + autoscale_delete_post_run (bool, optional): Only relevant when job_run_strategy = 'autoscale' + Remove a job replicated via autoscaling after it finishes running. + autoscale_job_identifier (str, optional): Only relevant when job_run_strategy = 'autoscale' + append value to the existing job name when replicating the job definition. + If None defaults to the current timestamp on job creation + """ + + is_new_job_created = False + self.console.log( + 'Triggered with autoscaling set to True. ' + 'Detecting any running instances' + ) + most_recent_job_run = self.list_runs( + account_id=account_id, job_definition_id=job_id, limit=1, order_by='-id' + )['data'][0] + most_recent_job_run_status = most_recent_job_run['status_humanized'] + + self.console.log( + f'Status for most recent run of job {job_id} ' + f'is {most_recent_job_run_status}.' 
+ ) + + if most_recent_job_run_status not in ['Queued', 'Starting', 'Running']: + self.console.log( + f'autoscale set to true but base job with id {job_id} is free. ' + 'triggering base job and ignoring autoscale configuration.' + ) + + else: + self.console.log(f'job_id {job_id} has an active run. Cloning job.') + optional_fields = [k for k,v in models.Job.__fields__.items() if not v.required] + required_fields = [k for k,v in models.Job.__fields__.items() if v.required] + existing_job_defintion = self.get_job( + account_id=account_id, job_id=job_id + )['data'] + + new_job_definition = {k:v for k,v in existing_job_defintion.items() if k in required_fields} + + if not autoscale_job_identifier: + creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + new_job_name = '-'.join([new_job_definition['name'], creation_time]) + new_job_definition['name'] = new_job_name + else: + new_job_name = '-'.join([new_job_definition['name'], autoscale_job_identifier]) + new_job_definition['name'] = new_job_name + + # make sure the id is none on the new job + new_job_definition['id'] = None + job_id = self.create_job( + account_id=account_id, payload=new_job_definition + )['data']['id'] + + # fill in optional fields on the job if present in the source + for field in optional_fields: + new_job_definition[field] = existing_job_defintion.get(field) + + # set the ID in the update request + new_job_definition['id'] = job_id + self.console.log(new_job_definition) + job_id = self.update_job( + account_id=account_id, job_id=job_id, payload=new_job_definition + )['data']['id'] + + is_new_job_created = True + + self.console.log(f'Created new job with job_id: {job_id}') + + + self.console.log(f'Triggering new job with job_id: {job_id}') + run = self.trigger_job( + account_id=account_id, + job_id=job_id, + payload=payload, + should_poll=should_poll, + poll_interval=poll_interval + ) + + if is_new_job_created and autoscale_delete_post_run: + self.console.log(f'Deleting autoscaled job with 
job_id: {job_id} post run') + self.delete_job( + account_id=account_id, + job_id=job_id + ) + + return run + + @v2 + def trigger_job_restart_from_failure( + self, + account_id: int, + job_id: int, + payload: Dict, + *, + should_poll: bool = True, + poll_interval: int = 10, + ): + """Check if job with job_id had failed steps on its prior run. + If failed steps are found, modify the job run commands to restart + from the point of failure. + If no failed steps are found, return None + + Args: + account_id (int): Numeric ID of the account to retrieve + job_id (int): Numeric ID of the job to trigger + payload (dict): Payload required for post request + should_poll (bool, optional): Poll until completion if `True`, completion + is one of success, failure, or cancelled + poll_interval (int, optional): Number of seconds to wait in between + polling + """ + + self.console.log(f'Detecting failures on prior run of job {job_id}') + payload, has_failures = self._get_restart_job_definition( + account_id=account_id, + job_id=job_id, + payload=payload + ) + + if has_failures: + run = self.trigger_job( + account_id=account_id, + job_id=job_id, + payload=payload, + should_poll=should_poll, + poll_interval=poll_interval + ) + return run + + self.console.log(f'No failed steps identified for job {job_id}. Exiting') + + return None + @v2 def trigger_job( self, @@ -1185,10 +1318,7 @@ def trigger_job( should_poll: bool = True, poll_interval: int = 10, restart_from_failure: bool = False, - trigger_on_failure_only: bool = False, - job_run_strategy: str = 'standard', - autoscale_delete_post_run: bool = True, - autoscale_job_identifier: str = None, + trigger_on_failure_only: bool = True, ): """Trigger a job by its ID @@ -1206,36 +1336,6 @@ def trigger_job( restart_from_failure to True. This has the effect of only triggering the job when the prior invocation was not successful. Otherwise, the function will exit prior to triggering the job. 
-<<<<<<< HEAD -<<<<<<< HEAD -======= ->>>>>>> feat/model-validation - job_run_strategy (str, optional): Must be one of ['standard', 'restart_from_failure', - 'autoscaling']. - - standard strategy triggers the job to run as-is. - - restart_from_failure checks for errors on the prior invocation and, - if found, restarts failed models only. - - autoscale checks whether the job_id is actively running. If so, - creates a copy of the running job - autoscale_delete_post_run (bool, optional): Only relevant when job_run_strategy = 'autoscale' -<<<<<<< HEAD -======= - mode (str, optional): Must be one of ['standard', 'restart_from_failure', - 'autoscaling']. - - standard mode triggers the job to run as-is. - - restart_from_failure checks for errors on the prior invocation and, - if found, restarts failed models only. - - autoscale checks whether the job_id is actively running. If so, - creates a copy of the running job - autoscale_delete_post_run (bool, optional): Only relevant when - mode = 'autoscale' ->>>>>>> ccea05c462706133260a7fbd7e2b9d45aa2b16a6 -======= ->>>>>>> feat/model-validation - Remove a job replicated via autoscaling after it finishes running. - autoscale_job_identifier (str, optional): Only relevant when job_run_strategy = 'autoscale' - append value to the existing job name when replicating the job definition. 
- If None defaults to the current timestamp on job creation """ def run_status_formatted(run: Dict, time: float) -> str: @@ -1253,112 +1353,18 @@ def run_status_formatted(run: Dict, time: float) -> str: # this is here to not break existing stuff 09.26.2022 if restart_from_failure: - job_run_strategy = 'restart_from_failure' - -<<<<<<< HEAD -<<<<<<< HEAD - is_valid_strategy = self._validate_job_run_mode(job_run_strategy) - if not is_valid_strategy: - raise Exception(f'strategy: {job_run_strategy} is not one of ["standard", "restart_from_failure", "autoscale"]') -======= - mode_is_valid = self._validate_job_run_mode(mode) - if not mode_is_valid: -======= - is_valid_strategy = self._validate_job_run_mode(job_run_strategy) - if not is_valid_strategy: ->>>>>>> feat/model-validation - raise Exception( - f'strategy: {job_run_strategy} is not one of ' - '["standard", "restart_from_failure", "autoscale"]' -<<<<<<< HEAD - ) ->>>>>>> ccea05c462706133260a7fbd7e2b9d45aa2b16a6 -======= - ) ->>>>>>> feat/model-validation - - if job_run_strategy == 'restart_from_failure': - self.console.log(f'Restarting job {job_id} from last failed state.') - payload, has_failures = self._get_restart_job_definition( - account_id=account_id, job_id=job_id, payload=payload - ) - - if trigger_on_failure_only and not has_failures: - self.console.log( - 'Process triggered with trigger_on_failure_only set to True but ' - 'no failed run steps found. Terminating.' - ) - return None - -<<<<<<< HEAD -<<<<<<< HEAD - elif job_run_strategy == 'autoscale': - self.console.log(f'Triggered with autoscaling set to True. Detecting any running instances') -======= - elif mode == 'autoscale': -======= - elif job_run_strategy == 'autoscale': ->>>>>>> feat/model-validation - self.console.log( - 'Triggered with autoscaling set to True. 
' - 'Detecting any running instances' - ) -<<<<<<< HEAD ->>>>>>> ccea05c462706133260a7fbd7e2b9d45aa2b16a6 -======= ->>>>>>> feat/model-validation - most_recent_job_run = self.list_runs( - account_id=account_id, job_definition_id=job_id, limit=1, order_by='-id' - )['data'][0] - most_recent_job_run_status = most_recent_job_run['status_humanized'] - - self.console.log( - f'Status for most recent run of job {job_id} ' - f'is {most_recent_job_run_status}.' + run = self.restart_job_from_failure( + account_id=account_id, + job_id=job_id, + payload=payload ) - - if most_recent_job_run_status not in ['Queued', 'Starting', 'Running']: + + if not run and trigger_on_failure_only: self.console.log( - f'autoscale set to true but base job with id {job_id} is free ' - 'triggering base job and ignoring autoscale configuration.' - ) - autoscale_delete_post_run = False - - else: - self.console.log(f'job_id {job_id} has an active run. Cloning job.') - - new_job_definition = self.clone_job( - account_id=account_id, job_id=job_id - ) -<<<<<<< HEAD -<<<<<<< HEAD -======= ->>>>>>> feat/model-validation - - if not autoscale_job_identifier: - creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') - new_job_name = '-'.join([new_job_definition['name'], creation_time]) - new_job_definition['name'] = new_job_name - else: - new_job_name = '-'.join([new_job_definition['name'], autoscale_job_name_slug]) - new_job_definition['name'] = new_job_name - -<<<<<<< HEAD -======= - - # TODO: need to figure out the best way to disambiguate replicated jobs. 
- creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') - new_job_name = '-'.join([new_job_definition['name'], creation_time]) - new_job_definition['name'] = new_job_name ->>>>>>> ccea05c462706133260a7fbd7e2b9d45aa2b16a6 -======= ->>>>>>> feat/model-validation - job_id = self.create_job( - account_id=account_id, payload=new_job_definition - )['data']['id'] - - self.console.log(f'Created new job with job_id: {job_id}') - + 'Not triggering job because prior run was successful.' + ) + return + run = self._simple_request( f'accounts/{account_id}/jobs/{job_id}/run/', method='post', @@ -1384,24 +1390,6 @@ def run_status_formatted(run: Dict, time: float) -> str: JobRunStatus.ERROR, ]: break -<<<<<<< HEAD -<<<<<<< HEAD -======= ->>>>>>> feat/model-validation - - if job_run_strategy == 'autoscale' and autoscale_delete_post_run: - self.delete_job( - account_id=account_id, - job_id=job_id - ) -<<<<<<< HEAD -======= - - if mode == 'autoscale' and autoscale_delete_post_run: - self.delete_job(account_id=account_id, job_id=job_id) ->>>>>>> ccea05c462706133260a7fbd7e2b9d45aa2b16a6 -======= ->>>>>>> feat/model-validation return run diff --git a/dbtc/client/cloud/configs/dbt_cloud_api.py b/dbtc/client/cloud/configs/dbt_cloud_api.py deleted file mode 100644 index c980c06..0000000 --- a/dbtc/client/cloud/configs/dbt_cloud_api.py +++ /dev/null @@ -1,46 +0,0 @@ -# stdlib -from typing import Dict - - -class dbtCloudAPIRequestFactory(object): - def __init__(self, **kwargs): - for key, value in kwargs.items(): - setattr(self, key, value) - - def _create_job_request(self) -> Dict: - """Minimal set of required fields needed to create a new dbt Cloud job, - including default values - """ - return { - 'name': None, - 'id': None, - 'execution': None, - 'account_id': None, - 'project_id': None, - 'environment_id': None, - 'dbt_version': None, - 'execute_steps': None, - 'state': None, - 'deferring_job_definition_id': None, - 'triggers': None, - 'settings': None, - 'schedule': None, - } - - 
def create_job_request(self, data={}) -> Dict: - """Completes the _create_job_request template with values from data and - overrides - - Args: - data (dict): payload to create the initial request. Typically, this will be - the result of a GET on the job definition from an existing job to be used - for dbt Cloud migrations - """ - # copy everything EXCEPT for the existing dbt Cloud job ID - result = self._create_job_request() - if data != {}: - for key in result.keys(): - if key != 'id': - result[key] = data[key] - - return result diff --git a/dbtc/client/cloud/configs/enums.py b/dbtc/client/cloud/configs/enums.py index eef1991..7691ff0 100644 --- a/dbtc/client/cloud/configs/enums.py +++ b/dbtc/client/cloud/configs/enums.py @@ -9,17 +9,3 @@ class JobRunStatus(enum.IntEnum): SUCCESS = 10 ERROR = 20 CANCELLED = 30 - -<<<<<<< HEAD -<<<<<<< HEAD -class JobRunStrategies(str, enum.Enum): -======= - -class JobRunModes(str, enum.Enum): ->>>>>>> ccea05c462706133260a7fbd7e2b9d45aa2b16a6 -======= -class JobRunStrategies(str, enum.Enum): ->>>>>>> feat/model-validation - STANDARD = 'standard' - RESTART = 'restart_from_failure' - AUTOSCALE = 'autoscale' diff --git a/dbtc/client/cloud/models/job.py b/dbtc/client/cloud/models/job.py index 2f19d7e..b0c77d8 100644 --- a/dbtc/client/cloud/models/job.py +++ b/dbtc/client/cloud/models/job.py @@ -28,6 +28,10 @@ class _JobTrigger(BaseModel): git_provider_webhook: Optional[bool] = None +class Test(BaseModel): + account_id: int + id: Optional[int] = None + class Job(BaseModel): # Required @@ -35,22 +39,21 @@ class Job(BaseModel): environment_id: int generate_docs: bool name: str + dbt_version: str project_id: int run_generate_sources: bool + schedule: _JobSchedule + settings: _JobSettings + triggers: _JobTrigger state: Literal[State.active, State.deleted] # Optional - dbt_version: Optional[str] = None - deactivated: bool = False - deferring_job_definiton_id: Optional[int] = None + deactivated: Optional[bool] = False + 
deferring_job_definition_id: Optional[int] = None execute_steps: Optional[List[str]] = None execution: Optional[_JobExecution] = None - id: Optional[int] = None is_deferrable: Optional[bool] = False run_failure_count: int = 0 - schedule: Optional[_JobSchedule] = None - settings: Optional[_JobSettings] = None - triggers: Optional[_JobTrigger] = None def __init__(self, **data): schedule = data.get('schedule', {}) From 0ba0df117151b66fdc5d248f4cf6bbf976a73ed4 Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Wed, 28 Sep 2022 15:11:14 -0600 Subject: [PATCH 13/19] add test for autoscaling --- tests/test_trigger_job_with_autoscaling.py | 53 ++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 tests/test_trigger_job_with_autoscaling.py diff --git a/tests/test_trigger_job_with_autoscaling.py b/tests/test_trigger_job_with_autoscaling.py new file mode 100644 index 0000000..d4abb09 --- /dev/null +++ b/tests/test_trigger_job_with_autoscaling.py @@ -0,0 +1,53 @@ +import time + +# This dictionary contains job_ids and what the associated +# override steps should be when restarting from failure. 
+JOB_ASSERTIONS = { + 133168: {'execute_steps': ['dbt build -s state:modified+']}, +} + +ACCOUNT_ID = 28885 + +def _test_job(dbtc_client, job_id: int): + + first_run = dbtc_client.cloud.trigger_job( + ACCOUNT_ID, + job_id, + payload={'cause': 'Testing dbtc'}, + should_poll=False, + )['data'] + + # wait a few seconds to make sure the first job's status has been updated + time.sleep(3) + second_run = dbtc_client.cloud.trigger_job_with_autoscaling( + ACCOUNT_ID, + job_id, + payload={'cause': 'Testing dbtc'}, + autoscale_delete_post_run=True, + )['data'] + + # check that we triggered distinct jobs + assert first_run['job_definition_id'] != second_run['job_definition_id'] + + # get the first and second runs with run steps included + first_run_data = dbtc_client.cloud.get_run( + account_id=ACCOUNT_ID, + run_id=first_run['id'], + include_related=['run_steps'] + )['data'] + + second_run_data = dbtc_client.cloud.get_run( + account_id=ACCOUNT_ID, + run_id=second_run['id'], + include_related=['run_steps'] + )['data'] + # check that the run steps are the same for the original and replicated jobs + first_run_step_names = [step['name'] for step in first_run_data['run_steps']] + second_run_step_names = [step['name'] for step in second_run_data['run_steps']] + + assert first_run_step_names == second_run_step_names + + +def test_trigger_job_with_autoscaling(dbtc_client): + for job_id in JOB_ASSERTIONS.keys(): + _test_job(dbtc_client, job_id) From e22551dd0443c46b09c3adfaf35f0c758944fe23 Mon Sep 17 00:00:00 2001 From: Doug Guthrie Date: Thu, 29 Sep 2022 15:35:48 -0600 Subject: [PATCH 14/19] Update job --- dbtc/client/cloud/models/job.py | 41 +++++++++++---------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/dbtc/client/cloud/models/job.py b/dbtc/client/cloud/models/job.py index b0c77d8..0b356a9 100644 --- a/dbtc/client/cloud/models/job.py +++ b/dbtc/client/cloud/models/job.py @@ -1,11 +1,9 @@ # stdlib -from typing import List, Literal, Optional +from 
typing import Any, Dict, List, Literal, Optional # third party from pydantic import BaseModel -from .constants import State - class _JobExecution(BaseModel): timeout_seconds: int @@ -13,8 +11,8 @@ class _JobExecution(BaseModel): class _JobSchedule(BaseModel): cron: str - date: Literal['custom_cron', 'days_of_week', 'every_day'] - time: Literal['every_hour', 'at_exact_hours'] + date: Dict[str, Any] + time: Dict[str, Any] class _JobSettings(BaseModel): @@ -23,44 +21,33 @@ class _JobSettings(BaseModel): class _JobTrigger(BaseModel): + custom_branch_only: Optional[bool] + git_provider_webhook: Optional[bool] github_webhook: bool schedule: bool - git_provider_webhook: Optional[bool] = None - -class Test(BaseModel): - account_id: int - id: Optional[int] = None class Job(BaseModel): # Required account_id: int environment_id: int + execution: _JobExecution generate_docs: bool name: str - dbt_version: str project_id: int run_generate_sources: bool schedule: _JobSchedule settings: _JobSettings triggers: _JobTrigger - state: Literal[State.active, State.deleted] + state: Literal[1, 2] # Optional + dbt_version: Optional[str] deactivated: Optional[bool] = False - deferring_job_definition_id: Optional[int] = None - execute_steps: Optional[List[str]] = None - execution: Optional[_JobExecution] = None - is_deferrable: Optional[bool] = False - run_failure_count: int = 0 - - def __init__(self, **data): - schedule = data.get('schedule', {}) - date = schedule.get('date', {}).get('type', None) - time = schedule.get('time', {}).get('type', None) - if date is not None: - data['schedule']['date'] = date - if time is not None: - data['schedule']['time'] = time - super().__init__(**data) + deferring_job_definition_id: Optional[int] + execute_steps: Optional[List[str]] + id: Optional[int] + lifecycle_webhooks: Optional[bool] + lifecycle_webhooks_url: Optional[str] + run_failure_count: Optional[int] = 0 From 767582d701121df22d2139961350965000252d68 Mon Sep 17 00:00:00 2001 From: Doug Guthrie 
Date: Fri, 30 Sep 2022 15:25:03 -0600 Subject: [PATCH 15/19] Add option for multiple status in list_runs method --- dbtc/client/cloud/base.py | 65 +++++++++++++-------------------------- 1 file changed, 22 insertions(+), 43 deletions(-) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index 132f5af..ef0f22b 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1,12 +1,11 @@ # stdlib import argparse -from asyncore import poll +import json import shlex import time from datetime import datetime from functools import partial, wraps -from typing import Dict, Iterable, List -from xxlimited import new +from typing import Dict, Iterable, List, Union # third party import requests @@ -898,7 +897,7 @@ def list_runs( order_by: str = None, offset: int = None, limit: int = None, - status: str = None, + status: Union[str, List[str]] = None, ) -> Dict: """List runs in an account. @@ -916,15 +915,21 @@ def list_runs( Use with limit to paginate results. limit (int, optional): The limit to apply when listing runs. Use with offset to paginate results. - status (str, optional): The status to apply when listing runs. + status (str or list, optional): The status to apply when listing runs. Options include queued, starting, running, success, error, and cancelled """ if status is not None: try: - status = getattr(JobRunStatus, status.upper()).value + if isinstance(status, list): + status = [getattr(JobRunStatus, s.upper()).value for s in status] + else: + status = [getattr(JobRunStatus, status.upper()).value] except AttributeError: - pass + raise + else: + status = json.dumps(status) + return self._simple_request( f'accounts/{account_id}/runs', params={ @@ -933,7 +938,7 @@ def list_runs( 'order_by': order_by, 'offset': offset, 'limit': limit, - 'status': status, + 'status__in': status, }, ) @@ -1181,9 +1186,6 @@ def trigger_job_with_autoscaling( append value to the existing job name when replicating the job definition. 
If None defaults to the current timestamp on job creation """ - - is_new_job_created = False - self.console.log( 'Triggered with autoscaling set to True. ' 'Detecting any running instances' @@ -1206,54 +1208,31 @@ def trigger_job_with_autoscaling( else: self.console.log(f'job_id {job_id} has an active run. Cloning job.') - optional_fields = [k for k,v in models.Job.__fields__.items() if not v.required] - required_fields = [k for k,v in models.Job.__fields__.items() if v.required] - existing_job_defintion = self.get_job( - account_id=account_id, job_id=job_id - )['data'] + job_definition = self.get_job(account_id=account_id, job_id=job_id)['data'] - new_job_definition = {k:v for k,v in existing_job_defintion.items() if k in required_fields} - if not autoscale_job_identifier: creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') - new_job_name = '-'.join([new_job_definition['name'], creation_time]) - new_job_definition['name'] = new_job_name + new_job_name = '-'.join([job_definition['name'], creation_time]) else: - new_job_name = '-'.join([new_job_definition['name'], autoscale_job_identifier]) - new_job_definition['name'] = new_job_name - - # make sure the id is none on the new job - new_job_definition['id'] = None - job_id = self.create_job( - account_id=account_id, payload=new_job_definition - )['data']['id'] - - # fill in optional fields on the job if present in the source - for field in optional_fields: - new_job_definition[field] = existing_job_defintion.get(field) - - # set the ID in the update request - new_job_definition['id'] = job_id - self.console.log(new_job_definition) - job_id = self.update_job( - account_id=account_id, job_id=job_id, payload=new_job_definition - )['data']['id'] - - is_new_job_created = True + new_job_name = '-'.join([job_definition['name'], autoscale_job_identifier]) + # make sure the id is none on the new job + job_definition['id'] = None + job_definition['name'] = new_job_name + job = self.create_job(account_id=account_id, 
payload=job_definition) self.console.log(f'Created new job with job_id: {job_id}') self.console.log(f'Triggering new job with job_id: {job_id}') run = self.trigger_job( account_id=account_id, - job_id=job_id, + job_id=job['data']['id'], payload=payload, should_poll=should_poll, poll_interval=poll_interval ) - if is_new_job_created and autoscale_delete_post_run: + if job['status'] == 201 and autoscale_delete_post_run: self.console.log(f'Deleting autoscaled job with job_id: {job_id} post run') self.delete_job( account_id=account_id, From 49229903aac328a1953cdd95ab6e5dc46869d073 Mon Sep 17 00:00:00 2001 From: Doug Guthrie Date: Fri, 30 Sep 2022 15:25:14 -0600 Subject: [PATCH 16/19] Add tests for new status functionality --- tests/test_cloud.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/test_cloud.py b/tests/test_cloud.py index 15085cf..35f3983 100644 --- a/tests/test_cloud.py +++ b/tests/test_cloud.py @@ -144,6 +144,37 @@ def test_list_runs(dbtc_client): account_id=pytest.account_id, job_definition_id=pytest.job_id, ) + + +@pytest.mark.dependency(depends=['test_list_jobs']) +def test_list_runs_list_status(dbtc_client): + _test_cloud_method( + dbtc_client, + 'list_runs', + job_definition_id=pytest.job_id, + status=['error', 'success'] + ) + + +@pytest.mark.dependency(depends=['test_list_jobs']) +def test_list_runs_str_status(dbtc_client): + _test_cloud_method( + dbtc_client, + 'list_runs', + job_definition_id=pytest.job_id, + status='success' + ) + + +@pytest.mark.dependency(depends=['test_list_jobs']) +def test_list_runs_bad_status(dbtc_client): + with pytest.raises(AttributeError): + _test_cloud_method( + dbtc_client, + 'list_runs', + job_definition_id=pytest.job_id, + status='successs' + ) @pytest.mark.dependency(depends=['test_list_jobs']) From 0ce71188f79d285fb9d808145e11911c662a770b Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Fri, 7 Oct 2022 14:43:40 -0600 Subject: [PATCH 17/19] update restart_job_from_failure 
to trigger_job_restart_from_failure in trigger_job method --- dbtc/client/cloud/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index 132f5af..e980c47 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1353,7 +1353,7 @@ def run_status_formatted(run: Dict, time: float) -> str: # this is here to not break existing stuff 09.26.2022 if restart_from_failure: - run = self.restart_job_from_failure( + run = self.trigger_job_restart_from_failure( account_id=account_id, job_id=job_id, payload=payload From ce98cde89b83755948dc29f067c71a72ae3dae4c Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Sat, 8 Oct 2022 08:21:58 -0600 Subject: [PATCH 18/19] update required fields for Job to include dbt_version --- dbtc/client/cloud/models/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbtc/client/cloud/models/job.py b/dbtc/client/cloud/models/job.py index 0b356a9..86291ac 100644 --- a/dbtc/client/cloud/models/job.py +++ b/dbtc/client/cloud/models/job.py @@ -31,6 +31,7 @@ class Job(BaseModel): # Required account_id: int + dbt_version: str environment_id: int execution: _JobExecution generate_docs: bool @@ -43,7 +44,6 @@ class Job(BaseModel): state: Literal[1, 2] # Optional - dbt_version: Optional[str] deactivated: Optional[bool] = False deferring_job_definition_id: Optional[int] execute_steps: Optional[List[str]] From 2a137a797722cfd72566523092c1364219685464 Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Sat, 8 Oct 2022 08:22:49 -0600 Subject: [PATCH 19/19] _clone_resource gives a base payload to be modified by use case --- dbtc/client/cloud/base.py | 80 ++++++++++++++++++++++++++------------- 1 file changed, 54 insertions(+), 26 deletions(-) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index 00f3835..47d3524 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -61,17 +61,21 @@ def _header_property(self): return 
'api_key' def _clone_resource(self, resource: str, account_id: int, **kwargs): - create_args = kwargs.pop('create_args', None) - payload = getattr(self, f'get_{resource}')(**kwargs)['data'] + payload = getattr(self, f'get_{resource}')(account_id=account_id, **kwargs)['data'] + resource_fields = getattr(models, resource.capitalize()).__fields__ - # Can't recreate a resource with an ID - payload.pop('id', None) - if create_args is not None: - payload = {k: v for k, v in kwargs.items() if k in create_args} - kwargs['payload'] = payload - kwargs['account_id'] = account_id + for k in list(payload): + # only map the fields we're aware of + if k not in resource_fields: + payload.pop(k, None) + + # for optional fields, don't copy them if none in the source payload + elif not resource_fields[k].required and payload[k] is None: + payload.pop(k, None) - return getattr(self, f'create_{resource}')(**kwargs) + # Can't recreate a resource with an ID + payload['id'] = None + return payload def _make_request( self, path: str, *, method: str = 'get', **kwargs @@ -1186,19 +1190,37 @@ def trigger_job_with_autoscaling( append value to the existing job name when replicating the job definition. If None defaults to the current timestamp on job creation """ + autoscale_job_created = False self.console.log( 'Triggered with autoscaling set to True. ' 'Detecting any running instances' ) - most_recent_job_run = self.list_runs( - account_id=account_id, job_definition_id=job_id, limit=1, order_by='-id' - )['data'][0] - most_recent_job_run_status = most_recent_job_run['status_humanized'] - self.console.log( - f'Status for most recent run of job {job_id} ' - f'is {most_recent_job_run_status}.' - ) + # dbt Cloud API will remove jobs that are queued mid-run if a DELETE is issued. + # we don't want this behavior so do some config validation + if autoscale_delete_post_run and not should_poll: + self.console.log( + 'autoscale_delete_post_run set to True and should_poll set to False. 
' + 'This has the effect that, after a new dbt Cloud job replica is created, ' + 'it will be removed before the run completes, so this configuration is disallowed. ' + ) + raise Exception('Invalid configuration') + + try: + most_recent_job_run = self.list_runs( + account_id=account_id, job_definition_id=job_id, limit=1, order_by='-id' + )['data'][0] + most_recent_job_run_status = most_recent_job_run['status_humanized'] + self.console.log( + f'Status for most recent run of job {job_id} ' + f'is {most_recent_job_run_status}.' + ) + except IndexError: + self.console.log( + f'Failed to get status for most recent run of job: {job_id} ' + f'This happens for jobs that have not previously run. ' + f'Triggering a new run for this job' + ) if most_recent_job_run_status not in ['Queued', 'Starting', 'Running']: self.console.log( @@ -1208,31 +1230,37 @@ def trigger_job_with_autoscaling( else: self.console.log(f'job_id {job_id} has an active run. Cloning job.') - job_definition = self.get_job(account_id=account_id, job_id=job_id)['data'] + job_definition = self._clone_resource( + 'job', + account_id=account_id, + job_id=job_id + ) if not autoscale_job_identifier: creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') new_job_name = '-'.join([job_definition['name'], creation_time]) else: new_job_name = '-'.join([job_definition['name'], autoscale_job_identifier]) - - # make sure the id is none on the new job - job_definition['id'] = None + job_definition['name'] = new_job_name job = self.create_job(account_id=account_id, payload=job_definition) - self.console.log(f'Created new job with job_id: {job_id}') - + job_id = job['data']['id'] + self.console.log(f'Created new job: {job_id}') + autoscale_job_created = True - self.console.log(f'Triggering new job with job_id: {job_id}') + self.console.log(f'Triggering job: {job_id}') run = self.trigger_job( account_id=account_id, - job_id=job['data']['id'], + job_id=job_id, payload=payload, should_poll=should_poll, 
poll_interval=poll_interval ) - if job['status'] == 201 and autoscale_delete_post_run: + if run['status']['code'] in [200, 201] \ + and autoscale_delete_post_run \ + and autoscale_job_created: + self.console.log(f'Deleting autoscaled job with job_id: {job_id} post run') self.delete_job( account_id=account_id,