diff --git a/README.md b/README.md index efca925..72f582a 100755 --- a/README.md +++ b/README.md @@ -41,3 +41,13 @@ by configuring a Cloud Builds trigger that runs automatically upon push to a Clo `targetBucket` - is a GCS bucket created for holding the DAG python scripts and SQL scripts (`targetBucket` in `config/config.json`). + +# Performance Optimization for Partitioned Tables +Follow the steps below to generate the optimized CDC script for raw and CDC tables partitioned by recordstamp. +We recommend using this script for tables that are very large or that have low-latency requirements. +The optimized script can be found at /src/template_sql/cdc_sql_partiton_template.sql +To enable it: +- Make sure that the table for which you want to use this script is partitioned by recordstamp at DAY level in the RAW dataset. +- Add the option partition_flg: "Y" to the table's entry in cdc_settings.yaml. +- Add partition_details settings to enable DAY partitioning on the recordstamp field. +- Refer to the example given in the cdc_settings.yaml file for the table acdoca. \ No newline at end of file diff --git a/cdc_settings.yaml b/cdc_settings.yaml index 38c78ff..dfbd0a1 100755 --- a/cdc_settings.yaml +++ b/cdc_settings.yaml @@ -17,6 +17,12 @@ data_to_replicate: - base_table: acdoca load_frequency: "@hourly" cluster_details: {columns: ["rclnt", "rbukrs"]} + #CORTEX-CUSTOMER below is an example to add partition flag. 
If set to Y, the partition-enabled CDC merge template will be applied + #The prerequisite is that both the raw and cdc tables are partitioned by the recordstamp field at DAY level + partition_flg: "Y" + partition_details: { + column: "recordstamp", partition_type: "time", time_grain: "day" + } - base_table: finsc_ld_cmp load_frequency: "@daily" - base_table: finsc_ledger_rep diff --git a/src/config_reader.py b/src/config_reader.py index 6335ea7..780c617 100755 --- a/src/config_reader.py +++ b/src/config_reader.py @@ -57,6 +57,12 @@ def process_table(table_config: dict, source_dataset: str, target_dataset: str, partition_details = table_config.get("partition_details") cluster_details = table_config.get("cluster_details") + # Check for partition_flg. If it does not exist, default the value to "N". + if "partition_flg" in table_config: + partition_flg = table_config.get("partition_flg") + else: + partition_flg = "N" + load_frequency = table_config.get("load_frequency") if load_frequency == "RUNTIME": generate_runtime_view(raw_table, cdc_table) @@ -68,7 +74,7 @@ def process_table(table_config: dict, source_dataset: str, target_dataset: str, # tables. 
logging.info("Generating required files for DAG with %s ", cdc_table) - generate_cdc_dag_files(raw_table, cdc_table, load_frequency, + generate_cdc_dag_files(raw_table, cdc_table, load_frequency, partition_flg, gen_test) logging.info("✅ == Processed %s ==", raw_table) diff --git a/src/generate_query.py b/src/generate_query.py index 2a22d8e..f9b17c0 100755 --- a/src/generate_query.py +++ b/src/generate_query.py @@ -26,6 +26,8 @@ _SQL_DAG_PYTHON_TEMPLATE = 'template_dag/dag_sql.py' _SQL_DAG_SQL_TEMPLATE = 'template_sql/cdc_sql_template.sql' +# Path to the partition-enabled CDC SQL template. +_SQL_DAG_PARTITION_SQL_TEMPLATE = 'template_sql/cdc_sql_partiton_template.sql' _VIEW_SQL_TEMPLATE = 'template_sql/runtime_query_view.sql' _GENERATED_DAG_DIR = '../generated_dag' @@ -111,7 +113,7 @@ def generate_runtime_view(raw_table_name, cdc_table_name): print(f'Created view {cdc_table_name}') -def generate_cdc_dag_files(raw_table_name, cdc_table_name, load_frequency, +def generate_cdc_dag_files(raw_table_name, cdc_table_name, load_frequency, partition_flg, gen_test): """Generates file contaiing DAG code to refresh CDC table from RAW table. @@ -161,7 +163,14 @@ def generate_cdc_dag_files(raw_table_name, cdc_table_name, load_frequency, p_key = ' AND '.join(p_key_list) p_key_sub_query = ' AND '.join(p_key_list_for_sub_query) - with open(_SQL_DAG_SQL_TEMPLATE, mode='r', + # Check the value of the partition flag. 
If it is "Y", use the optimized CDC template for partitioned tables; otherwise use the default template. + if partition_flg == "Y": + template_sql_file = _SQL_DAG_PARTITION_SQL_TEMPLATE + else: + template_sql_file = _SQL_DAG_SQL_TEMPLATE + + + with open(template_sql_file, mode='r', encoding='utf-8') as sql_template_file: sql_template = Template(sql_template_file.read()) diff --git a/src/template_sql/cdc_sql_partiton_template.sql b/src/template_sql/cdc_sql_partiton_template.sql new file mode 100644 index 0000000..c42b3b9 --- /dev/null +++ b/src/template_sql/cdc_sql_partiton_template.sql @@ -0,0 +1,82 @@ +-- Copyright 2021 Google Inc. + -- Licensed under the Apache License, Version 2.0 (the "License"); + -- you may not use this file except in compliance with the License. + -- You may obtain a copy of the License at + -- http://www.apache.org/licenses/LICENSE-2.0 + -- Unless required by applicable law or agreed to in writing, software + -- distributed under the License is distributed on an "AS IS" BASIS, + -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + -- See the License for the specific language governing permissions and + -- limitations under the License. 
+
+-- Optimized CDC merge template for raw and CDC tables partitioned by recordstamp at DAY level.
+-- max_rstamp: latest recordstamp already present in the target table.
+DECLARE max_rstamp TIMESTAMP;
+-- max_raw_stamp: upper bound of the raw-table window read by this run.
+DECLARE max_raw_stamp TIMESTAMP;
+-- processed_date: dates of the target rows that the incoming raw rows match on their keys;
+-- the MERGE below only looks for matches among target rows on these dates.
+DECLARE processed_date ARRAY<DATE>;
+-- NOTE(review): the 2-second lag presumably skips rows that are still being written - confirm.
+SET max_raw_stamp = TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 SECOND);
+
+-- An empty target table yields the fixed 1940 timestamp below, so the lower bound
+-- does not exclude raw rows (assuming no recordstamp predates it).
+SET max_rstamp = (SELECT IFNULL(MAX(recordstamp), TIMESTAMP('1940-12-25 05:30:00+00')) FROM `${target_table}`);
+
+SET processed_date = (
+  WITH
+    S01 AS (
+      SELECT * FROM `${base_table}`
+      WHERE recordstamp >= max_rstamp AND recordstamp <= max_raw_stamp
+    ),
+
+    -- To handle occasional dups from SLT connector
+
+    S11 AS (
+      SELECT * EXCEPT(row_num)
+      FROM (
+        SELECT *, ROW_NUMBER() OVER (PARTITION BY ${keys} ORDER BY recordstamp DESC) AS row_num
+        FROM S01
+      )
+      WHERE row_num = 1
+    )
+
+  SELECT ARRAY_AGG(DISTINCT DATE(T.recordstamp)) FROM `${target_table}` T INNER JOIN S11 S ON ${p_key}
+);
+
+-- Apply the de-duplicated raw rows to the target, matching only within the processed dates.
+MERGE `${target_table}` AS T
+USING (
+  WITH
+    S0 AS (
+      SELECT * FROM `${base_table}`
+      WHERE recordstamp >= max_rstamp AND recordstamp <= max_raw_stamp
+    ),
+
+    -- To handle occasional dups from SLT connector
+
+    S1 AS (
+      SELECT * EXCEPT(row_num)
+      FROM (
+        SELECT *, ROW_NUMBER() OVER (PARTITION BY ${keys} ORDER BY recordstamp DESC) AS row_num
+        FROM S0
+      )
+      WHERE row_num = 1
+    )
+  SELECT DISTINCT S1.*
+  FROM S1 ) AS S
+ON
+DATE(T.`recordstamp`) IN UNNEST(processed_date) AND
+${p_key}
+
+-- ## CORTEX-CUSTOMER You can use "`is_deleted` = true" condition along with "operation_flag = 'D'",
+-- if that is applicable to your CDC set up.
+
+WHEN NOT MATCHED AND IFNULL(S.operation_flag, 'I') != 'D' THEN
+  INSERT (${fields})
+  VALUES (${fields})
+WHEN MATCHED AND S.operation_flag = 'D' THEN
+  DELETE
+WHEN MATCHED AND S.operation_flag IN ('I','U') THEN
+  UPDATE SET ${update_fields};
\ No newline at end of file