Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cdc_settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,7 @@ data_to_replicate:
load_frequency: "@weekly"
- base_table: mkol
load_frequency: "@weekly"
## CORTEX-CUSTOMER: Uncomment if you need optional address notes/remarks in AddressMD view.
# - base_table: adrt
# load_frequency: "@daily"

4 changes: 2 additions & 2 deletions cloudbuild.cdc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ steps:
generated_files=$(shopt -s nullglob dotglob; echo ./generated_dag/*.py)
if (( ${#generated_files} ))
then
gsutil -m cp -r './generated_dag/*.py' gs://${_TGT_BUCKET_}/dags/
gcloud storage cp --recursive './generated_dag/*.py' gs://${_TGT_BUCKET_}/dags/
else
echo "🔪🔪🔪No Python files found under generated_dag folder or the folder does not exist. Skipping copy.🔪🔪🔪"
fi
Expand All @@ -78,7 +78,7 @@ steps:
generated_files=$(shopt -s nullglob dotglob; echo ./generated_sql/*.sql)
if (( ${#generated_files} ))
then
gsutil -m cp -r './generated_sql/*.sql' gs://${_TGT_BUCKET_}/data/bq_data_replication/
gcloud storage cp --recursive './generated_sql/*.sql' gs://${_TGT_BUCKET_}/data/bq_data_replication/
else
echo "🔪No SQL files found under generated_sql folder or the folder does not exist. Skipping copy.🔪"
fi
Expand Down
4 changes: 2 additions & 2 deletions common/materializer/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ echo "generate_dependent_dags.py completed successfully."
if [[ $(find generated_materializer_dag_files/*/*/task_dep_dags -type f 2> /dev/null | wc -l) -gt 0 ]]
then
echo "Copying DAG files to GCS bucket..."
echo "gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${GCS_TGT_BUCKET}/dags/"
gsutil -m cp -r 'generated_materializer_dag_files/*' "gs://${GCS_TGT_BUCKET}/dags/"
echo "gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${GCS_TGT_BUCKET}/dags/"
gcloud storage cp --recursive 'generated_materializer_dag_files/*' "gs://${GCS_TGT_BUCKET}/dags/"
else
echo "No task dependent DAG files to copy to GCS bucket!"
fi
Expand Down
13 changes: 10 additions & 3 deletions common/materializer/templates/airflow_dag_template_reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from datetime import timedelta

import airflow
from airflow import __version__ as airflow_version
from packaging.version import Version
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import \
BigQueryInsertJobOperator
Expand All @@ -34,18 +36,23 @@

default_dag_args = {
"depends_on_past": False,
"start_date": datetime(${year}, ${month}, ${day}),
"start_date": datetime(int("${year}"), int("${month}"), int("${day}")),
"catchup": False,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": "${load_frequency}"}
else:
schedule_kwarg = {"schedule_interval": "${load_frequency}"}

with airflow.DAG("${dag_full_name}",
default_args=default_dag_args,
catchup=False,
max_active_runs=1,
schedule_interval="${load_frequency}",
tags=${tags}) as dag:
tags=ast.literal_eval("${tags}"),
**schedule_kwarg) as dag:
start_task = EmptyOperator(task_id="start")
refresh_table = BigQueryInsertJobOperator(
task_id="refresh_table",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from datetime import timedelta

import airflow
from airflow import __version__ as airflow_version
from packaging.version import Version
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import \
BigQueryInsertJobOperator
Expand All @@ -36,18 +38,23 @@

default_dag_args = {
"depends_on_past": False,
"start_date": datetime(${year}, ${month}, ${day}),
"start_date": datetime(int("${year}"), int("${month}"), int("${day}")),
"catchup": False,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": "${load_frequency}"}
else:
schedule_kwarg = {"schedule_interval": "${load_frequency}"}

with airflow.DAG("${dag_full_name}",
default_args=default_dag_args,
catchup=False,
max_active_runs=1,
schedule_interval="${load_frequency}",
tags=${tags}) as dag:
tags=ast.literal_eval("${tags}"),
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ steps:
if [[ $(find generated_materializer_dag_files -type f 2> /dev/null | wc -l) -gt 0 ]]
then
echo "Copying DAG files to GCS bucket..."
echo "gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/"
gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/
echo "gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/"
gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/
else
echo "No files to copy to GCS bucket!"
fi
Expand Down
6 changes: 3 additions & 3 deletions common/py_libs/k9_deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ def _simple_process_and_upload(k9_id: str, k9_dir: str, jinja_dict: dict,
if "__init__.py" not in [str(p.relative_to(k9_dir)) for p in k9_files]:
with open(f"{tmp_dir}/__init__.py", "w", encoding="utf-8") as f:
f.writelines([
"import os",
"import sys",
"import os\n",
"import sys\n",
("sys.path.append("
"os.path.dirname(os.path.realpath(__file__)))")
"os.path.dirname(os.path.realpath(__file__)))\n")
])

if data_source == "k9":
Expand Down
2 changes: 1 addition & 1 deletion common/py_libs/resource_validation_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def validate_resources(
if isinstance(ex, NotFound):
logging.error("🛑 Storage bucket `%s` doesn't exist. 🛑",
bucket.name)
elif isinstance(ex, Unauthorized, Forbidden):
elif isinstance(ex, (Unauthorized, Forbidden)):
if checking_on_writing:
logging.error("🛑 Storage bucket `%s` "
"is not writable. 🛑", bucket.name)
Expand Down
2 changes: 1 addition & 1 deletion src/copy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
# limitations under the License.

bucket=$1
gsutil cp -r ../generated_dag/ $bucket
gcloud storage cp --recursive ../generated_dag/ $bucket
65 changes: 45 additions & 20 deletions src/generate_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ def generate_cdc_dag_files(raw_table_name, cdc_table_name, load_frequency,
'day': today.day,
'query_file': dag_sql_file_name,
'load_frequency': load_frequency,
'runtime_labels_dict': '', # A place holder for label key
'runtime_labels_dict': '', # A place holder for label key
'bq_location': bq_location
}
# Add bq_labels to substitutes dict if Telemetry allowed
# Converts dict to string for substitution purposes
# Add bq_labels to substitutes dict if Telemetry allowed
# Converts dict to string for substitution purposes
if allow_telemetry:
substitutes['runtime_labels_dict'] = str(constants.CORTEX_JOB_LABEL)

Expand Down Expand Up @@ -274,10 +274,11 @@ def validate_partition_details(partition_details, load_frequency):
bucket_end = integer_range_bucket.get('end')
bucket_interval = integer_range_bucket.get('interval')

if (bucket_start is None or bucket_end is None or
bucket_interval is None):
e_msg = ('Error: `start`, `end` or `interval` property missing for '
'the `integer_range_bucket` property.')
if (bucket_start is None or bucket_end is None
or bucket_interval is None):
e_msg = (
'Error: `start`, `end` or `interval` property missing for '
'the `integer_range_bucket` property.')
return e_msg

return None
Expand All @@ -301,8 +302,9 @@ def validate_cluster_details(cluster_details, load_frequency):
return e_msg

if len(cluster_columns) > 4:
e_msg = ('More than 4 columns specified in `cluster_details` property. '
'BigQuery supports maximum of 4 columns for table cluster.')
e_msg = (
'More than 4 columns specified in `cluster_details` property. '
'BigQuery supports maximum of 4 columns for table cluster.')
return e_msg

return None
Expand Down Expand Up @@ -399,16 +401,17 @@ def validate_partition_columns(partition_details, target_schema):

partition_type = partition_details['partition_type']

if (partition_type == 'time' and
partition_column_type not in _TIME_PARTITION_DATA_TYPES):
e_msg = ('For `partition_type` = "time", partitioning column has to be '
'one of the following data types:'
f'{_TIME_PARTITION_DATA_TYPES}.\n'
f'But column "{column}" is of "{partition_column_type}" type.')
if (partition_type == 'time'
and partition_column_type not in _TIME_PARTITION_DATA_TYPES):
e_msg = (
'For `partition_type` = "time", partitioning column has to be '
'one of the following data types:'
f'{_TIME_PARTITION_DATA_TYPES}.\n'
f'But column "{column}" is of "{partition_column_type}" type.')
raise cortex_exc.TypeCError(e_msg) from None

if (partition_type == 'integer_range' and
partition_column_type != 'INTEGER'):
if (partition_type == 'integer_range'
and partition_column_type != 'INTEGER'):
e_msg = ('Error: For `partition_type` = "integer_range", '
'partitioning column has to be of INTEGER data type.\n'
f'But column "{column}" is of {partition_column_type}.')
Expand Down Expand Up @@ -457,7 +460,8 @@ def create_cdc_table(raw_table_name, cdc_table_name, partition_details,
field=partition_details['column'],
type_=_TIME_PARTITION_GRAIN_DICT[time_partition_grain])
else:
integer_range_bucket = partition_details['integer_range_bucket']
integer_range_bucket = partition_details[
'integer_range_bucket']
bucket_start = integer_range_bucket['start']
bucket_end = integer_range_bucket['end']
bucket_interval = integer_range_bucket['interval']
Expand All @@ -484,10 +488,31 @@ def get_primary_keys(full_table_name):
"""

_, dataset, table_name = full_table_name.split('.')
query = (f'SELECT fieldname '

# Custom tables and fields in SAP have the prefix "/NAMESPACE/",
# which are renamed to "NAMESPACE_" (or other character, per SLT settings)
# when replicated to BigQuery, but retain their original naming in DD03L
# table. So we need to mirror the replacement logic in the query to
# get records for such tables.
#
# For example, "/OPT/Z_TABLE" is replicated to "OPT_Z_TABLE" in BigQuery.

## CORTEX-CUSTOMER: Replace per BQ SLT settings.
replace_char = '_'

# Remove starting "/" and then replace the second "/" with replace_char.
sap_naming_replace_logic = (
'REPLACE('
' IF(SUBSTR({FIELD}, 1, 1) = "/", SUBSTR({FIELD}, 2), {FIELD}),'
' "/",'
f' "{replace_char}"'
')')
bq_field_name = sap_naming_replace_logic.format(FIELD='fieldname')
bq_table_name = sap_naming_replace_logic.format(FIELD='tabname')
query = (f'SELECT {bq_field_name} as fieldname '
f'FROM `{dataset}.dd03l` '
f'WHERE KEYFLAG = "X" AND fieldname != ".INCLUDE" '
f'AND tabname = "{table_name.upper()}"')
f'AND {bq_table_name} = "{table_name.upper()}"')
query_job = client.query(query)

fields = []
Expand Down
11 changes: 9 additions & 2 deletions src/template_dag/dag_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,34 @@
import ast
from datetime import timedelta, datetime
import airflow
from airflow import __version__ as airflow_version
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from packaging.version import Version

# BigQuery Job Labels - converts generated string to dict
# If string is empty, assigns empty dict
_BQ_LABELS = ast.literal_eval("${runtime_labels_dict}" or "{}")

default_dag_args = {
"depends_on_past": False,
"start_date": datetime(${year}, ${month}, ${day}),
"start_date": datetime(int("${year}"), int("${month}"), int("${day}")),
"catchup": False,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": "${load_frequency}"}
else:
schedule_kwarg = {"schedule_interval": "${load_frequency}"}

with airflow.DAG(dag_id="CDC_BigQuery_${base_table}",
template_searchpath=["/home/airflow/gcs/data/bq_data_replication/"],
default_args=default_dag_args,
catchup=False,
max_active_runs=1,
schedule_interval="${load_frequency}") as dag:
**schedule_kwarg) as dag:
start_task = EmptyOperator(task_id="start")
copy_records = BigQueryInsertJobOperator(
task_id="merge_query_records",
Expand Down