From a1e0ed5b725bb75e6566220d5997e1e48ed1afbd Mon Sep 17 00:00:00 2001 From: dotinfinity Date: Fri, 27 Feb 2026 17:17:42 +0000 Subject: [PATCH] Release 6.3.4 - Airflow v3 compatibility --- cdc_settings.yaml | 4 ++ cloudbuild.cdc.yaml | 4 +- common/materializer/deploy.sh | 4 +- .../airflow_dag_template_reporting.py | 13 +++- ...airflow_task_dep_dag_template_reporting.py | 13 +++- .../cloudbuild_materializer.yaml.jinja | 4 +- common/py_libs/k9_deployer.py | 6 +- common/py_libs/resource_validation_helper.py | 2 +- src/copy.sh | 2 +- src/generate_query.py | 65 +++++++++++++------ src/template_dag/dag_sql.py | 11 +++- 11 files changed, 89 insertions(+), 39 deletions(-) diff --git a/cdc_settings.yaml b/cdc_settings.yaml index 38c78ff..4720776 100755 --- a/cdc_settings.yaml +++ b/cdc_settings.yaml @@ -312,3 +312,7 @@ data_to_replicate: load_frequency: "@weekly" - base_table: mkol load_frequency: "@weekly" +## CORTEX-CUSTOMER: Uncomment if you need optional address notes/remarks in AddressMD view. +# - base_table: adrt +# load_frequency: "@daily" + diff --git a/cloudbuild.cdc.yaml b/cloudbuild.cdc.yaml index 529f9f2..271b065 100644 --- a/cloudbuild.cdc.yaml +++ b/cloudbuild.cdc.yaml @@ -57,7 +57,7 @@ steps: generated_files=$(shopt -s nullglob dotglob; echo ./generated_dag/*.py) if (( ${#generated_files} )) then - gsutil -m cp -r './generated_dag/*.py' gs://${_TGT_BUCKET_}/dags/ + gcloud storage cp --recursive './generated_dag/*.py' gs://${_TGT_BUCKET_}/dags/ else echo "🔪🔪🔪No Python files found under generated_dag folder or the folder does not exist. Skipping copy.🔪🔪🔪" fi @@ -78,7 +78,7 @@ steps: generated_files=$(shopt -s nullglob dotglob; echo ./generated_sql/*.sql) if (( ${#generated_files} )) then - gsutil -m cp -r './generated_sql/*.sql' gs://${_TGT_BUCKET_}/data/bq_data_replication/ + gcloud storage cp --recursive './generated_sql/*.sql' gs://${_TGT_BUCKET_}/data/bq_data_replication/ else echo "🔪No SQL files found under generated_sql folder or the folder does not exist. Skipping copy.🔪" fi diff --git a/common/materializer/deploy.sh b/common/materializer/deploy.sh index 54fab27..bc9ad53 100755 --- a/common/materializer/deploy.sh +++ b/common/materializer/deploy.sh @@ -220,8 +220,8 @@ echo "generate_dependent_dags.py completed successfully." if [[ $(find generated_materializer_dag_files/*/*/task_dep_dags -type f 2> /dev/null | wc -l) -gt 0 ]] then echo "Copying DAG files to GCS bucket..." - echo "gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${GCS_TGT_BUCKET}/dags/" - gsutil -m cp -r 'generated_materializer_dag_files/*' "gs://${GCS_TGT_BUCKET}/dags/" + echo "gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${GCS_TGT_BUCKET}/dags/" + gcloud storage cp --recursive 'generated_materializer_dag_files/*' "gs://${GCS_TGT_BUCKET}/dags/" else echo "No task dependent DAG files to copy to GCS bucket!" fi diff --git a/common/materializer/templates/airflow_dag_template_reporting.py b/common/materializer/templates/airflow_dag_template_reporting.py index ddadaf1..7704ada 100644 --- a/common/materializer/templates/airflow_dag_template_reporting.py +++ b/common/materializer/templates/airflow_dag_template_reporting.py @@ -24,6 +24,8 @@ from datetime import timedelta import airflow +from airflow import __version__ as airflow_version +from packaging.version import Version from airflow.operators.empty import EmptyOperator from airflow.providers.google.cloud.operators.bigquery import \ BigQueryInsertJobOperator @@ -34,18 +36,23 @@ default_dag_args = { "depends_on_past": False, - "start_date": datetime(${year}, ${month}, ${day}), + "start_date": datetime(int("${year}"), int("${month}"), int("${day}")), "catchup": False, "retries": 1, "retry_delay": timedelta(minutes=30), } +if Version(airflow_version) >= Version("2.4.0"): + schedule_kwarg = {"schedule": "${load_frequency}"} +else: + schedule_kwarg = {"schedule_interval": "${load_frequency}"} + with airflow.DAG("${dag_full_name}", default_args=default_dag_args, catchup=False, max_active_runs=1, - schedule_interval="${load_frequency}", - tags=${tags}) as dag: + tags=ast.literal_eval("${tags}"), + **schedule_kwarg) as dag: start_task = EmptyOperator(task_id="start") refresh_table = BigQueryInsertJobOperator( task_id="refresh_table", diff --git a/common/materializer/templates/airflow_task_dep_dag_template_reporting.py b/common/materializer/templates/airflow_task_dep_dag_template_reporting.py index bb01d39..21c5637 100644 --- a/common/materializer/templates/airflow_task_dep_dag_template_reporting.py +++ b/common/materializer/templates/airflow_task_dep_dag_template_reporting.py @@ -24,6 +24,8 @@ from datetime import timedelta import airflow +from airflow import __version__ as airflow_version +from packaging.version import Version from airflow.operators.empty import EmptyOperator from airflow.providers.google.cloud.operators.bigquery import \ BigQueryInsertJobOperator @@ -36,18 +38,23 @@ default_dag_args = { "depends_on_past": False, - "start_date": datetime(${year}, ${month}, ${day}), + "start_date": datetime(int("${year}"), int("${month}"), int("${day}")), "catchup": False, "retries": 1, "retry_delay": timedelta(minutes=30), } +if Version(airflow_version) >= Version("2.4.0"): + schedule_kwarg = {"schedule": "${load_frequency}"} +else: + schedule_kwarg = {"schedule_interval": "${load_frequency}"} + with airflow.DAG("${dag_full_name}", default_args=default_dag_args, catchup=False, max_active_runs=1, - schedule_interval="${load_frequency}", - tags=${tags}) as dag: + tags=ast.literal_eval("${tags}"), + **schedule_kwarg) as dag: start_task = EmptyOperator(task_id="start") diff --git a/common/materializer/templates/cloudbuild_materializer.yaml.jinja b/common/materializer/templates/cloudbuild_materializer.yaml.jinja index d7cf88c..6459ab3 100644 --- a/common/materializer/templates/cloudbuild_materializer.yaml.jinja +++ b/common/materializer/templates/cloudbuild_materializer.yaml.jinja @@ -88,8 +88,8 @@ steps: if [[ $(find generated_materializer_dag_files -type f 2> /dev/null | wc -l) -gt 0 ]] then echo "Copying DAG files to GCS bucket..." - echo "gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/" - gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/ + echo "gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/" + gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/ else echo "No files to copy to GCS bucket!" fi diff --git a/common/py_libs/k9_deployer.py b/common/py_libs/k9_deployer.py index 719c76c..165e1ac 100644 --- a/common/py_libs/k9_deployer.py +++ b/common/py_libs/k9_deployer.py @@ -82,10 +82,10 @@ def _simple_process_and_upload(k9_id: str, k9_dir: str, jinja_dict: dict, if "__init__.py" not in [str(p.relative_to(k9_dir)) for p in k9_files]: with open(f"{tmp_dir}/__init__.py", "w", encoding="utf-8") as f: f.writelines([ - "import os", - "import sys", + "import os\n", + "import sys\n", ("sys.path.append(" - "os.path.dirname(os.path.realpath(__file__)))") + "os.path.dirname(os.path.realpath(__file__)))\n") ]) if data_source == "k9": diff --git a/common/py_libs/resource_validation_helper.py b/common/py_libs/resource_validation_helper.py index 0072e3d..adbfd6a 100644 --- a/common/py_libs/resource_validation_helper.py +++ b/common/py_libs/resource_validation_helper.py @@ -128,7 +128,7 @@ def validate_resources( if isinstance(ex, NotFound): logging.error("🛑 Storage bucket `%s` doesn't exist. 🛑", bucket.name) - elif isinstance(ex, Unauthorized, Forbidden): + elif isinstance(ex, (Unauthorized, Forbidden)): if checking_on_writing: logging.error("🛑 Storage bucket `%s` " "is not writable. 🛑", bucket.name) diff --git a/src/copy.sh b/src/copy.sh index 57820e9..2edf9a4 100755 --- a/src/copy.sh +++ b/src/copy.sh @@ -14,4 +14,4 @@ # limitations under the License. bucket=$1 -gsutil cp -r ../generated_dag/ $bucket +gcloud storage cp --recursive ../generated_dag/ $bucket diff --git a/src/generate_query.py b/src/generate_query.py index f00a407..ca7b704 100755 --- a/src/generate_query.py +++ b/src/generate_query.py @@ -150,11 +150,11 @@ def generate_cdc_dag_files(raw_table_name, cdc_table_name, load_frequency, 'day': today.day, 'query_file': dag_sql_file_name, 'load_frequency': load_frequency, - 'runtime_labels_dict': '', # A place holder for label key + 'runtime_labels_dict': '', # A place holder for label key 'bq_location': bq_location } - # Add bq_labels to substitutes dict if Telemetry allowed - # Converts dict to string for substitution purposes + # Add bq_labels to substitutes dict if Telemetry allowed + # Converts dict to string for substitution purposes if allow_telemetry: substitutes['runtime_labels_dict'] = str(constants.CORTEX_JOB_LABEL) @@ -274,10 +274,11 @@ def validate_partition_details(partition_details, load_frequency): bucket_end = integer_range_bucket.get('end') bucket_interval = integer_range_bucket.get('interval') - if (bucket_start is None or bucket_end is None or - bucket_interval is None): - e_msg = ('Error: `start`, `end` or `interval` property missing for ' - 'the `integer_range_bucket` property.') + if (bucket_start is None or bucket_end is None + or bucket_interval is None): + e_msg = ( + 'Error: `start`, `end` or `interval` property missing for ' + 'the `integer_range_bucket` property.') return e_msg return None @@ -301,8 +302,9 @@ def validate_cluster_details(cluster_details, load_frequency): return e_msg if len(cluster_columns) > 4: - e_msg = ('More than 4 columns specified in `cluster_details` property. ' - 'BigQuery supports maximum of 4 columns for table cluster.') + e_msg = ( + 'More than 4 columns specified in `cluster_details` property. ' + 'BigQuery supports maximum of 4 columns for table cluster.') return e_msg return None @@ -399,16 +401,17 @@ def validate_partition_columns(partition_details, target_schema): partition_type = partition_details['partition_type'] - if (partition_type == 'time' and - partition_column_type not in _TIME_PARTITION_DATA_TYPES): - e_msg = ('For `partition_type` = "time", partitioning column has to be ' - 'one of the following data types:' - f'{_TIME_PARTITION_DATA_TYPES}.\n' - f'But column "{column}" is of "{partition_column_type}" type.') + if (partition_type == 'time' + and partition_column_type not in _TIME_PARTITION_DATA_TYPES): + e_msg = ( + 'For `partition_type` = "time", partitioning column has to be ' + 'one of the following data types:' + f'{_TIME_PARTITION_DATA_TYPES}.\n' + f'But column "{column}" is of "{partition_column_type}" type.') raise cortex_exc.TypeCError(e_msg) from None - if (partition_type == 'integer_range' and - partition_column_type != 'INTEGER'): + if (partition_type == 'integer_range' + and partition_column_type != 'INTEGER'): e_msg = ('Error: For `partition_type` = "integer_range", ' 'partitioning column has to be of INTEGER data type.\n' f'But column "{column}" is of {partition_column_type}.') @@ -457,7 +460,8 @@ def create_cdc_table(raw_table_name, cdc_table_name, partition_details, field=partition_details['column'], type_=_TIME_PARTITION_GRAIN_DICT[time_partition_grain]) else: - integer_range_bucket = partition_details['integer_range_bucket'] + integer_range_bucket = partition_details[ + 'integer_range_bucket'] bucket_start = integer_range_bucket['start'] bucket_end = integer_range_bucket['end'] bucket_interval = integer_range_bucket['interval'] @@ -484,10 +488,31 @@ def get_primary_keys(full_table_name): """ _, dataset, table_name = full_table_name.split('.') - query = (f'SELECT fieldname ' + + # Custom tables and fields in SAP have the prefix "/NAMESPACE/", + # which are renamed to "NAMESPACE_" (or other character, per SLT settings) + # when replicated to BigQuery, but retain their original naming in DD03L + # table. So we need to mirror the replacement logic in the query to + # get records for such tables. + # + # For example, "/OPT/Z_TABLE" is replicated to "OPT_Z_TABLE" in BigQuery. + + ## CORTEX-CUSTOMER: Replace per BQ SLT settings. + replace_char = '_' + + # Remove starting "/" and then replace the second "/" with replace_char. + sap_naming_replace_logic = ( + 'REPLACE(' + ' IF(SUBSTR({FIELD}, 1, 1) = "/", SUBSTR({FIELD}, 2), {FIELD}),' + ' "/",' + f' "{replace_char}"' + ')') + bq_field_name = sap_naming_replace_logic.format(FIELD='fieldname') + bq_table_name = sap_naming_replace_logic.format(FIELD='tabname') + query = (f'SELECT {bq_field_name} as fieldname ' f'FROM `{dataset}.dd03l` ' f'WHERE KEYFLAG = "X" AND fieldname != ".INCLUDE" ' - f'AND tabname = "{table_name.upper()}"') + f'AND {bq_table_name} = "{table_name.upper()}"') query_job = client.query(query) fields = [] diff --git a/src/template_dag/dag_sql.py b/src/template_dag/dag_sql.py index e3523d3..83e5486 100755 --- a/src/template_dag/dag_sql.py +++ b/src/template_dag/dag_sql.py @@ -19,8 +19,10 @@ import ast from datetime import timedelta, datetime import airflow +from airflow import __version__ as airflow_version from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator from airflow.operators.empty import EmptyOperator +from packaging.version import Version # BigQuery Job Labels - converts generated string to dict # If string is empty, assigns empty dict @@ -28,18 +30,23 @@ default_dag_args = { "depends_on_past": False, - "start_date": datetime(${year}, ${month}, ${day}), + "start_date": datetime(int("${year}"), int("${month}"), int("${day}")), "catchup": False, "retries": 1, "retry_delay": timedelta(minutes=30), } +if Version(airflow_version) >= Version("2.4.0"): + schedule_kwarg = {"schedule": "${load_frequency}"} +else: + schedule_kwarg = {"schedule_interval": "${load_frequency}"} + with airflow.DAG(dag_id="CDC_BigQuery_${base_table}", template_searchpath=["/home/airflow/gcs/data/bq_data_replication/"], default_args=default_dag_args, catchup=False, max_active_runs=1, - schedule_interval="${load_frequency}") as dag: + **schedule_kwarg) as dag: start_task = EmptyOperator(task_id="start") copy_records = BigQueryInsertJobOperator( task_id="merge_query_records",