diff --git a/.github/workflows/synchdb-ci.yml b/.github/workflows/synchdb-ci.yml index a7e15f7..5abef6f 100644 --- a/.github/workflows/synchdb-ci.yml +++ b/.github/workflows/synchdb-ci.yml @@ -90,6 +90,7 @@ jobs: - oracle - sqlserver - olr + - postgres runs-on: ubuntu-22.04 needs: - params diff --git a/Makefile b/Makefile index 8d489c0..22b89d0 100644 --- a/Makefile +++ b/Makefile @@ -167,6 +167,9 @@ oraclecheck: olrcheck: $(MAKE) dbcheck DB=olr +postgrescheck: + $(MAKE) dbcheck DB=postgres + mysqlcheck-benchmark: $(MAKE) dbcheck-tpcc DB=mysql diff --git a/src/backend/converter/debezium_event_handler.c b/src/backend/converter/debezium_event_handler.c index 0e8ce7b..3bdf40b 100644 --- a/src/backend/converter/debezium_event_handler.c +++ b/src/backend/converter/debezium_event_handler.c @@ -357,10 +357,16 @@ deriveLogicalMessage(Jsonb ** jb) elog(WARNING, "decoded DDL message is not valid JSON: %s", edata->message); FreeErrorData(edata); MemoryContextSwitchTo(oldctx); + if (tmpout) + pfree(tmpout); return -1; } PG_END_TRY(); + /* tmpout not needed anymore */ + if (tmpout) + pfree(tmpout); + /* turn this ddl_jb to tableChanges array with just one value */ out = pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL); v.type = jbvBinary; @@ -381,6 +387,8 @@ deriveLogicalMessage(Jsonb ** jb) BoolGetDatum(true) /* create_missing */ ); *jb = DatumGetJsonbP(newjb_d); + if (path) + pfree(path); return 0; } @@ -1066,10 +1074,10 @@ parseDBZDML(Jsonb * jb, char op, ConnectorType type, Jsonb * source, bool isfirs if (getPathElementString(jb, tmpPath, &strinfo, false) == 0) { /* postgres array is wrapped with curly brackets instead */ - if (strinfo.data[0] == '[') + if (strinfo.len > 0 && strinfo.data[0] == '[') strinfo.data[0] = '{'; - if (strinfo.data[strinfo.len - 1] == ']') - strinfo.data[strinfo.len -1] = '}'; + if (strinfo.len > 0 && strinfo.data[strinfo.len - 1] == ']') + strinfo.data[strinfo.len - 1] = '}'; value = pstrdup(strinfo.data); } @@ -1258,10 +1266,10 @@ parseDBZDML(Jsonb * jb, char op, ConnectorType type, Jsonb * source, bool isfirs if (getPathElementString(jb, tmpPath, &strinfo, false) == 0) { /* postgres array is wrapped with curly brackets instead */ - if (strinfo.data[0] == '[') + if (strinfo.len > 0 && strinfo.data[0] == '[') strinfo.data[0] = '{'; - if (strinfo.data[strinfo.len - 1] == ']') - strinfo.data[strinfo.len -1] = '}'; + if (strinfo.len > 0 && strinfo.data[strinfo.len - 1] == ']') + strinfo.data[strinfo.len - 1] = '}'; value = pstrdup(strinfo.data); } @@ -1485,10 +1493,10 @@ parseDBZDML(Jsonb * jb, char op, ConnectorType type, Jsonb * source, bool isfirs if (getPathElementString(jb, tmpPath, &strinfo, false) == 0) { /* postgres array is wrapped with curly brackets instead */ - if (strinfo.data[0] == '[') + if (strinfo.len > 0 && strinfo.data[0] == '[') strinfo.data[0] = '{'; - if (strinfo.data[strinfo.len - 1] == ']') - strinfo.data[strinfo.len -1] = '}'; + if (strinfo.len > 0 && strinfo.data[strinfo.len - 1] == ']') + strinfo.data[strinfo.len - 1] = '}'; value = pstrdup(strinfo.data); } diff --git a/src/backend/converter/format_converter.c b/src/backend/converter/format_converter.c index 28d3451..921bc97 100644 --- a/src/backend/converter/format_converter.c +++ b/src/backend/converter/format_converter.c @@ -343,10 +343,9 @@ static long long derive_value_from_byte(const unsigned char * bytes, int len); static char * derive_decimal_string_from_byte(const unsigned char *bytes, int len); static void reverse_byte_array(unsigned char * array, int length); static void trim_leading_zeros(char *str); -static void prepend_zeros(char *str, int num_zeros); static void byte_to_binary(unsigned char byte, char * binary_str); static void bytes_to_binary_string(const unsigned char * bytes, - size_t len, char * binary_str); + size_t len, char * binary_str, bool trim); static char * transform_data_expression(const char * remoteObjid, const char * colname); static void populate_primary_keys(StringInfoData * strinfo, const char * id, const char * jsonin, bool alter, bool isinline); @@ -367,7 +366,7 @@ static void expand_struct_value(char * in, DBZ_DML_COLUMN_VALUE * colval, ConnectorType conntype); static char * handle_base64_to_numeric_with_scale(const char * in, int scale); static char * handle_string_to_numeric(const char * in, bool addquote); -static char * handle_base64_to_bit(const char * in, bool addquote, int typemod); +static char * handle_base64_to_bit(const char * in, bool addquote, int typemod, bool padzero); static char * handle_string_to_bit(const char * in, bool addquote); static char * handle_numeric_to_bit(const char * in, bool addquote); static char * construct_datestr(long long input, bool addquote, int timerep); @@ -637,32 +636,6 @@ trim_leading_zeros(char *str) str[j] = '\0'; } -/* - * prepend_zeros - * - * prepend zeros to the given string - */ -static void -prepend_zeros(char *str, int num_zeros) -{ - int original_len = strlen(str); - int new_len = original_len + num_zeros; - char * temp = palloc0(new_len + 1); - - for (int i = 0; i < num_zeros; i++) - { - temp[i] = '0'; - } - - for (int i = 0; i < original_len; i++) - { - temp[i + num_zeros] = str[i]; - } - temp[new_len] = '\0'; - strcpy(str, temp); - pfree(temp); -} - /* * byte_to_binary * @@ -684,7 +657,7 @@ byte_to_binary(unsigned char byte, char * binary_str) * convert the given bytes to a binary string with 1s and 0s */ static void -bytes_to_binary_string(const unsigned char * bytes, size_t len, char * binary_str) +bytes_to_binary_string(const unsigned char * bytes, size_t len, char * binary_str, bool trim) { char byte_str[9]; size_t i = 0; @@ -695,6 +668,9 @@ bytes_to_binary_string(const unsigned char * bytes, size_t len, char * binary_st byte_to_binary(bytes[i], byte_str); strcat(binary_str, byte_str); } + + if (trim) + trim_leading_zeros(binary_str); } /* @@ -2042,42 +2018,50 @@ handle_string_to_numeric(const char * in, bool addquote) } static char * -handle_base64_to_bit(const char * in, bool addquote, int typemod) +handle_base64_to_bit(const char * in, bool addquote, int typemod, bool padzero) { int tmpoutlen = pg_b64_dec_len(strlen(in)); unsigned char * tmpout = (unsigned char *) palloc0(tmpoutlen); char * out = NULL; + int extrazeros = 0; #if SYNCHDB_PG_MAJOR_VERSION >= 1800 tmpoutlen = pg_b64_decode(in, strlen(in), tmpout, tmpoutlen); #else tmpoutlen = pg_b64_decode(in, strlen(in), (char*)tmpout, tmpoutlen); #endif + + if (padzero) + extrazeros = (typemod - (tmpoutlen * 8)); + if (addquote) { - /* 8 bits per byte + 2 single quotes + b + terminating null */ - char * tmp = NULL; - out = (char *) palloc0((tmpoutlen * 8) + 2 + 1 + 1); - tmp = out; + /* 8 bits per byte + 2 single quotes + b + extra zeros + terminating null */ + out = (char *) palloc0((tmpoutlen * 8) + 2 + 1 + extrazeros + 1); + + strcat(out, "'b"); + out += 2; + + /* zeros */ + memset(out, '0', extrazeros); + + /* bit value */ reverse_byte_array(tmpout, tmpoutlen); - strcat(tmp, "'b"); - tmp += 2; - bytes_to_binary_string(tmpout, tmpoutlen, tmp); - trim_leading_zeros(tmp); - if (strlen(tmp) < typemod) - prepend_zeros(tmp, typemod - strlen(tmp)); - - strcat(tmp, "'"); + bytes_to_binary_string(tmpout, tmpoutlen, out + extrazeros, !padzero); + + strcat(out, "'"); } else { - /* 8 bits per byte + terminating null */ - out = (char *) palloc0(tmpoutlen * 8 + 1); + /* 8 bits per byte + extra zeros + terminating null */ + out = (char *) palloc0((tmpoutlen * 8) + extrazeros + 1); + + /* zeros */ + memset(out, '0', extrazeros); + + /* bit value */ reverse_byte_array(tmpout, tmpoutlen); - bytes_to_binary_string(tmpout, tmpoutlen, out); - trim_leading_zeros(out); - if (strlen(out) < typemod) - prepend_zeros(out, typemod - strlen(out)); + bytes_to_binary_string(tmpout, tmpoutlen, out + extrazeros, !padzero); } pfree(tmpout); return out; @@ -2823,12 +2807,14 @@ handle_data_by_type_category(char * in, DBZ_DML_COLUMN_VALUE * colval, Connector case DBZTYPE_STRUCT: { expand_struct_value(in, colval, conntype); - out = handle_base64_to_bit(colval->value, addquote, colval->typemod); + out = handle_base64_to_bit(colval->value, addquote, colval->typemod, + conntype == TYPE_MYSQL ? true : false); break; } case DBZTYPE_BYTES: { - out = handle_base64_to_bit(in, addquote, colval->typemod); + out = handle_base64_to_bit(in, addquote, colval->typemod, + conntype == TYPE_MYSQL ? true : false); break; } case DBZTYPE_STRING: @@ -3008,12 +2994,14 @@ processDataByType(DBZ_DML_COLUMN_VALUE * colval, bool addquote, char * remoteObj case DBZTYPE_STRUCT: { expand_struct_value(in, colval, type); - out = handle_base64_to_bit(colval->value, addquote, colval->typemod); + out = handle_base64_to_bit(colval->value, addquote, colval->typemod, + type == TYPE_MYSQL ? true : false); break; } case DBZTYPE_BYTES: { - out = handle_base64_to_bit(in, addquote, colval->typemod); + out = handle_base64_to_bit(in, addquote, colval->typemod, + type == TYPE_MYSQL ? true : false); break; } case DBZTYPE_STRING: @@ -3408,15 +3396,29 @@ convert2PGDML(DBZ_DML * dbzdml, ConnectorType type) { if (synchdb_dml_use_spi) { + bool atleastone = false; + /* --- Convert to use SPI to handler DML --- */ appendStringInfo(&strinfo, "INSERT INTO %s(", dbzdml->mappedObjectId); foreach(cell, dbzdml->columnValuesAfter) { DBZ_DML_COLUMN_VALUE * colval = (DBZ_DML_COLUMN_VALUE *) lfirst(cell); appendStringInfo(&strinfo, "%s,", colval->name); + atleastone = true; + } + + if (atleastone) + { + strinfo.data[strinfo.len - 1] = '\0'; + strinfo.len = strinfo.len - 1; + } + else + { + elog(WARNING, "no column data is provided for %s. Insert skipped", dbzdml->mappedObjectId); + pfree(strinfo.data); + destroyPGDML(pgdml); + return NULL; } - strinfo.data[strinfo.len - 1] = '\0'; - strinfo.len = strinfo.len - 1; appendStringInfo(&strinfo, ") VALUES ("); foreach(cell, dbzdml->columnValuesAfter) @@ -3435,9 +3437,18 @@ convert2PGDML(DBZ_DML * dbzdml, ConnectorType type) } } /* remove extra "," */ - strinfo.data[strinfo.len - 1] = '\0'; - strinfo.len = strinfo.len - 1; - + if (atleastone) + { + strinfo.data[strinfo.len - 1] = '\0'; + strinfo.len = strinfo.len - 1; + } + else + { + elog(WARNING, "no column data is provided for %s. Insert skipped", dbzdml->mappedObjectId); + pfree(strinfo.data); + destroyPGDML(pgdml); + return NULL; + } appendStringInfo(&strinfo, ");"); } else @@ -3571,10 +3582,22 @@ convert2PGDML(DBZ_DML * dbzdml, ConnectorType type) { appendStringInfo(&strinfo, "%s,", "null"); } + atleastone = true; + } + + if (atleastone) + { + /* remove extra "," */ + strinfo.data[strinfo.len - 1] = '\0'; + strinfo.len = strinfo.len - 1; + } + else + { + elog(WARNING, "no column data is provided for %s. Update skipped", dbzdml->mappedObjectId); + pfree(strinfo.data); + destroyPGDML(pgdml); + return NULL; } - /* remove extra "," */ - strinfo.data[strinfo.len - 1] = '\0'; - strinfo.len = strinfo.len - 1; appendStringInfo(&strinfo, " WHERE "); foreach(cell, dbzdml->columnValuesBefore) diff --git a/src/backend/synchdb/synchdb.c b/src/backend/synchdb/synchdb.c index 852d6e0..bbfc6ae 100644 --- a/src/backend/synchdb/synchdb.c +++ b/src/backend/synchdb/synchdb.c @@ -2372,6 +2372,14 @@ main_loop(ConnectorType connectorType, ConnectionInfo *connInfo, char * snapshot &myBatchInfo, &myBatchStats, connInfo->flag); + /* + * postgres connector under debezium snapshot engine requires user to create + * schema manually, so there is not really a schema sync action here, so we + * just mark it as done for now. + */ + if (connectorType == TYPE_POSTGRES && (connInfo->flag & CONNFLAG_SCHEMA_SYNC_MODE)) + set_shm_connector_state(myConnectorId, STATE_SCHEMA_SYNC_DONE); + /* * if a valid batchid is set by dbz_engine_get_change(), it means we have * successfully completed a batch change request and we shall notify dbz diff --git a/src/test/pytests/synchdbtests/common.py b/src/test/pytests/synchdbtests/common.py index 15e2624..f37234b 100644 --- a/src/test/pytests/synchdbtests/common.py +++ b/src/test/pytests/synchdbtests/common.py @@ -60,6 +60,13 @@ def get_container_ip(name: str, network: str = "synchdbnet") -> str | None: OLR_PORT="7070" OLR_SERVICE="ORACLE" +POSTGRES_HOST="127.0.0.1" +POSTGRES_PORT=5432 +POSTGRES_USER="postgres" +POSTGRES_PASS="pass" +POSTGRES_DB="postgres" +POSTGRES_SCHEMA="public" + def getConnectorName(dbvendor): if dbvendor == "mysql": return "mysqlconn" @@ -67,6 +74,8 @@ def getConnectorName(dbvendor): return "sqlserverconn" elif dbvendor == "oracle": return "oracleconn" + elif dbvendor == "postgres": + return "postgresconn" else: return "olrconn" @@ -77,6 +86,8 @@ def getDbname(dbvendor): return SQLSERVER_DB elif dbvendor == "oracle": return ORACLE_DB + elif dbvendor == "postgres": + return POSTGRES_DB else: return ORA19C_DB @@ -88,6 +99,8 @@ def getSchema(dbvendor): return SQLSERVER_SCHEMA elif dbvendor == "oracle": return ORACLE_SCHEMA + elif dbvendor == "postgres": + return POSTGRES_SCHEMA else: return ORA19C_SCHEMA @@ -189,7 +202,8 @@ def run_remote_query(where, query, srcdb=None): "mysql": MYSQL_DB, "sqlserver": SQLSERVER_DB, "oracle": ORACLE_DB, - "olr": ORA19C_DB + "olr": ORA19C_DB, + "postgres": POSTGRES_DB }[where] try: @@ -210,6 +224,12 @@ def run_remote_query(where, query, srcdb=None): continue # skip empty lines and metadata cols = line.split("\t") rows.append(tuple(cols)) + elif where == "postgres": + result = subprocess.check_output(["docker", "exec", "-i", "postgres", "psql", "-U", f"{POSTGRES_USER}", "-d", f"{POSTGRES_DB}", "-tA", "-c", f"{query}"], text=True , env={"LC_ALL": "C"}).strip() + rows = [] + for line in result.splitlines(): + cols = line.split("|") + rows.append(tuple(cols)) else: sql = f""" SET HEADING OFF; @@ -259,14 +279,16 @@ def create_synchdb_connector(cursor, vendor, name, srcdb=None, srcschema=None): "mysql": MYSQL_DB, "sqlserver": SQLSERVER_DB, "oracle": ORACLE_DB, - "olr": ORA19C_DB + "olr": ORA19C_DB, + "postgres": POSTGRES_DB }[vendor] schema = srcschema or { "mysql": "null", "sqlserver": SQLSERVER_SCHEMA, "oracle": ORACLE_SCHEMA, - "olr": ORA19C_SCHEMA + "olr": ORA19C_SCHEMA, + "postgres": POSTGRES_SCHEMA }[vendor] if vendor == "mysql": @@ -287,6 +309,9 @@ def create_synchdb_connector(cursor, vendor, name, srcdb=None, srcschema=None): assert ORACLE_HOST != None result = run_pg_query_one(cursor, f"SELECT synchdb_add_conninfo('{name}','{ORACLE_HOST}', {ORACLE_PORT}, '{ORACLE_USER}', '{ORACLE_PASS}', '{db}', '{schema}', 'null', 'null', 'oracle');") + elif vendor == "postgres": + result = run_pg_query_one(cursor, f"SELECT synchdb_add_conninfo('{name}','{POSTGRES_HOST}', {POSTGRES_PORT}, '{POSTGRES_USER}', '{POSTGRES_PASS}', '{db}', '{schema}', 'null', 'null', 'postgres');") + else: global ORA19C_HOST global OLR_HOST @@ -328,6 +353,9 @@ def drop_default_pg_schema(cursor, vendor): row = run_pg_query_one(cursor, f"DROP SCHEMA IF EXISTS testdb CASCADE") row = run_pg_query_one(cursor, f"DROP SCHEMA IF EXISTS \"TESTDB\" CASCADE") row = run_pg_query_one(cursor, f"DROP SCHEMA IF EXISTS \"testDB\" CASCADE") + elif vendor == "postgres": + row = run_pg_query_one(cursor, f"DROP SCHEMA IF EXISTS postgres CASCADE") + row = run_pg_query_one(cursor, f"DROP SCHEMA IF EXISTS \"POSTGRES\" CASCADE") else: row = run_pg_query_one(cursor, f"DROP SCHEMA IF EXISTS free CASCADE") row = run_pg_query_one(cursor, f"DROP SCHEMA IF EXISTS \"FREE\" CASCADE") @@ -345,3 +373,9 @@ def update_guc_conf(cursor, key, val, reload_conf=False): if reload_conf: cursor.execute("SELECT pg_reload_conf()") +def drop_repslot_and_pub(dbvendor, name, dstdb): + if dbvendor != "postgres": + return + + run_remote_query(dbvendor, f"SELECT pg_drop_replication_slot('{name}_{dstdb}_synchdb_slot')") + run_remote_query(dbvendor, f"DROP PUBLICATION IF EXISTS {name}_{dstdb}_synchdb_pub") diff --git a/src/test/pytests/synchdbtests/conftest.py b/src/test/pytests/synchdbtests/conftest.py index 560268c..5c34065 100644 --- a/src/test/pytests/synchdbtests/conftest.py +++ b/src/test/pytests/synchdbtests/conftest.py @@ -6,7 +6,7 @@ import psycopg2 import pytest -PG_PORT = "5432" +PG_PORT = "14141" PG_HOST = "127.0.0.1" OLRVER = "1.8.5" @@ -32,6 +32,7 @@ def pg_instance(request): f.write("\nsynchdb.jvm_max_heap_size= 2048\n") f.write("\nsynchdb.olr_read_buffer_size = 128\n") f.write("\nlog_min_messages = debug1\n") + #f.write("\nsynchdb.dbz_log_level = 'info'\n") #f.write("\nsynchdb.olr_snapshot_engine = 'fdw'\n") #f.write("\nsynchdb.cdc_start_delay_ms = 15000\n") @@ -128,7 +129,7 @@ def setup_remote_instance(dbvendor, request): yield - #teardown_remote_instance(dbvendor) + teardown_remote_instance(dbvendor) @pytest.fixture(scope="session") def hammerdb(dbvendor): diff --git a/src/test/pytests/synchdbtests/t/test_001_initialsnapshot.py b/src/test/pytests/synchdbtests/t/test_001_initialsnapshot.py index a148934..347fc57 100644 --- a/src/test/pytests/synchdbtests/t/test_001_initialsnapshot.py +++ b/src/test/pytests/synchdbtests/t/test_001_initialsnapshot.py @@ -1,7 +1,7 @@ import common import time from datetime import datetime -from common import run_pg_query, run_pg_query_one, run_remote_query, create_synchdb_connector, getConnectorName, getDbname, verify_default_type_mappings, stop_and_delete_synchdb_connector, drop_default_pg_schema, create_and_start_synchdb_connector, update_guc_conf +from common import run_pg_query, run_pg_query_one, run_remote_query, create_synchdb_connector, getConnectorName, getDbname, verify_default_type_mappings, stop_and_delete_synchdb_connector, drop_default_pg_schema, create_and_start_synchdb_connector, update_guc_conf, getSchema, drop_repslot_and_pub def test_ConnectorCreate(pg_cursor, dbvendor): name = getConnectorName(dbvendor) @@ -61,7 +61,10 @@ def test_RemoveExtraConninfo(pg_cursor, dbvendor): def test_ConnectorStart(pg_cursor, dbvendor): name = getConnectorName(dbvendor) dbname = getDbname(dbvendor).lower() - + + if dbvendor == "postgres": + update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'fdw'", True) + result = create_synchdb_connector(pg_cursor, dbvendor, name) assert result[0] == 0 @@ -85,10 +88,19 @@ def test_ConnectorStart(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + if dbvendor == "postgres": + update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'debezium'", True) + def test_InitialSnapshotDBZ(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_dbzsnap" dbname = getDbname(dbvendor).lower() + schema = getSchema(dbvendor) + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -103,6 +115,8 @@ def test_InitialSnapshotDBZ(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) @@ -112,27 +126,27 @@ def test_InitialSnapshotDBZ(pg_cursor, dbvendor): extrowcount = run_remote_query(dbvendor, f"SELECT count(*) FROM orders") assert int(pgrowcount[0]) == int(extrowcount[0][0]) - # check table name mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - id = row[0].split(".") - if len(id) == 3: - assert id[0].lower() + "." + id[2].lower() == row[1] - else: + if dbvendor != "postgres": + # check table name mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + id = row[0].split(".") + if len(id) == 3: + assert id[0].lower() + "." + id[2].lower() == row[1] + else: + assert row[0].lower() == row[1] + # check attname mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: assert row[0].lower() == row[1] - - # check attname mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert row[0].lower() == row[1] - # check data type mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert verify_default_type_mappings(row[0], row[1], dbvendor) == True + # check data type mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + assert verify_default_type_mappings(row[0], row[1], dbvendor) == True # check data consistency of orders table pgrow = run_pg_query_one(pg_cursor, f"SELECT order_number, order_date, purchaser, quantity, product_id FROM {dbname}.orders WHERE order_number = 10003") @@ -140,6 +154,8 @@ def test_InitialSnapshotDBZ(pg_cursor, dbvendor): assert int(pgrow[0]) == int(extrow[0][0]) if dbvendor == "oracle" or dbvendor == "olr": assert pgrow[1] == datetime.strptime(extrow[0][1], '%d-%b-%y') + elif dbvendor == "postgres": + assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d %H:%M:%S') else: assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d').date() assert int(pgrow[2]) == int(extrow[0][2]) @@ -148,17 +164,28 @@ def test_InitialSnapshotDBZ(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_InitialSnapshotFDW(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_fdwsnap" dbname = getDbname(dbvendor).lower() + schema = getSchema(dbvendor) if dbvendor == "mysql": - assert True - return + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'mysql_fdw' ) AS mysql_fdw_available") + if isfdw[0] == False: + print ("test_InitialSnapshotFDW skipped - mysql_fdw not available for install") + assert True + return elif dbvendor == "sqlserver": assert True return + elif dbvendor == "postgres": + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'postgres_fdw' ) AS postgres_fdw_available") + if isfdw[0] == False: + print ("test_InitialSnapshotFDW skipped - postgres_fdw not available for install") + assert True + return else: isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'oracle_fdw' ) AS oracle_fdw_available") if isfdw[0] == False: @@ -182,6 +209,8 @@ def test_InitialSnapshotFDW(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) @@ -219,6 +248,8 @@ def test_InitialSnapshotFDW(pg_cursor, dbvendor): assert int(pgrow[0]) == int(extrow[0][0]) if dbvendor == "oracle" or dbvendor == "olr": assert pgrow[1] == datetime.strptime(extrow[0][1], '%d-%b-%y') + elif dbvendor == "postgres": + assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d %H:%M:%S') else: assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d').date() assert int(pgrow[2]) == int(extrow[0][2]) @@ -226,11 +257,17 @@ def test_InitialSnapshotFDW(pg_cursor, dbvendor): assert int(pgrow[4]) == int(extrow[0][4]) # test cdc now - query = """ - INSERT INTO orders(order_number, order_date, purchaser, quantity, - product_id) VALUES (10005, TO_DATE('2025-12-12', 'YYYY-MM-DD'), - 1002, 10000, 102); - """ + if dbvendor == "postgres" or dbvendor == "mysql": + query = """ + INSERT INTO orders(order_number, order_date, purchaser, quantity, + product_id) VALUES (10005, '2025-12-12', 1002, 10000, 102); + """ + else: + query = """ + INSERT INTO orders(order_number, order_date, purchaser, quantity, + product_id) VALUES (10005, TO_DATE('2025-12-12', 'YYYY-MM-DD'), + 1002, 10000, 102); + """ run_remote_query(dbvendor, query) if dbvendor == "oracle" or dbvendor == "olr": @@ -246,14 +283,21 @@ def test_InitialSnapshotFDW(pg_cursor, dbvendor): drop_default_pg_schema(pg_cursor, dbvendor) run_remote_query(dbvendor, f"DELETE FROM orders WHERE order_number > 10004") update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'debezium'", True) + drop_repslot_and_pub(dbvendor, name, "postgres") time.sleep(10) def test_InitialSnapshotDBZ_uppercase(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_dbzsnap_upper" dbname = getDbname(dbvendor).upper() + schema = getSchema(dbvendor) update_guc_conf(pg_cursor, "synchdb.letter_casing_strategy", "'uppercase'", True) + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS \"{dbname}\"") + run_pg_query_one(pg_cursor, f"CREATE TABLE \"{dbname}\".\"ORDERS\" (\"ORDER_NUMBER\" int primary key, \"ORDER_DATE\" timestamp without time zone, \"PURCHASER\" int, \"QUANTITY\" int , \"PRODUCT_ID\" int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -268,6 +312,8 @@ def test_InitialSnapshotDBZ_uppercase(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) @@ -277,27 +323,28 @@ def test_InitialSnapshotDBZ_uppercase(pg_cursor, dbvendor): extrowcount = run_remote_query(dbvendor, f"SELECT count(*) FROM orders") assert int(pgrowcount[0]) == int(extrowcount[0][0]) - # check table name mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - id = row[0].split(".") - if len(id) == 3: - assert id[0].upper() + "." + id[2].upper() == row[1] - else: + if dbvendor != "postgres": + # check table name mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + id = row[0].split(".") + if len(id) == 3: + assert id[0].upper() + "." + id[2].upper() == row[1] + else: + assert row[0].upper() == row[1] + + # check attname mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: assert row[0].upper() == row[1] - # check attname mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert row[0].upper() == row[1] - - # check data type mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert verify_default_type_mappings(row[0], row[1], dbvendor) == True + # check data type mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + assert verify_default_type_mappings(row[0], row[1], dbvendor) == True # check data consistency of orders table pgrow = run_pg_query_one(pg_cursor, f"SELECT \"ORDER_NUMBER\", \"ORDER_DATE\", \"PURCHASER\", \"QUANTITY\", \"PRODUCT_ID\" FROM \"{dbname}\".\"ORDERS\" WHERE \"ORDER_NUMBER\" = 10003") @@ -305,6 +352,8 @@ def test_InitialSnapshotDBZ_uppercase(pg_cursor, dbvendor): assert int(pgrow[0]) == int(extrow[0][0]) if dbvendor == "oracle" or dbvendor == "olr": assert pgrow[1] == datetime.strptime(extrow[0][1], '%d-%b-%y') + elif dbvendor == "postgres": + assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d %H:%M:%S') else: assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d').date() assert int(pgrow[2]) == int(extrow[0][2]) @@ -322,6 +371,11 @@ def test_InitialSnapshotDBZ_uppercase(pg_cursor, dbvendor): INSERT INTO orders(order_date, purchaser, quantity, product_id) VALUES ("2025-12-12", 1002, 10000, 102) """ + elif dbvendor == "postgres": + query = """ + INSERT INTO orders(order_number, order_date, purchaser, quantity, + product_id) VALUES (10005, '2025-12-12', 1002, 10000, 102); + """ else: query = """ INSERT INTO orders(order_number, order_date, purchaser, quantity, @@ -343,18 +397,29 @@ def test_InitialSnapshotDBZ_uppercase(pg_cursor, dbvendor): drop_default_pg_schema(pg_cursor, dbvendor) run_remote_query(dbvendor, f"DELETE FROM orders WHERE order_number > 10004") update_guc_conf(pg_cursor, "synchdb.letter_casing_strategy", "'lowercase'", True) + drop_repslot_and_pub(dbvendor, name, "postgres") time.sleep(10) def test_InitialSnapshotFDW_uppercase(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_fdwsnap_upper" dbname = getDbname(dbvendor).upper() + schema = getSchema(dbvendor) if dbvendor == "mysql": - assert True - return + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'mysql_fdw' ) AS mysql_fdw_available") + if isfdw[0] == False: + print ("test_InitialSnapshotFDW_uppercase skipped - mysql_fdw not available for install") + assert True + return elif dbvendor == "sqlserver": assert True return + elif dbvendor == "postgres": + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'postgres_fdw' ) AS postgres_fdw_available") + if isfdw[0] == False: + print ("test_InitialSnapshotFDW_uppercase skipped - postgres_fdw not available for install") + assert True + return else: isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'oracle_fdw' ) AS oracle_fdw_available") if isfdw[0] == False: @@ -379,6 +444,8 @@ def test_InitialSnapshotFDW_uppercase(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) @@ -416,6 +483,8 @@ def test_InitialSnapshotFDW_uppercase(pg_cursor, dbvendor): assert int(pgrow[0]) == int(extrow[0][0]) if dbvendor == "oracle" or dbvendor == "olr": assert pgrow[1] == datetime.strptime(extrow[0][1], '%d-%b-%y') + elif dbvendor == "postgres": + assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d %H:%M:%S') else: assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d').date() assert int(pgrow[2]) == int(extrow[0][2]) @@ -433,6 +502,11 @@ def test_InitialSnapshotFDW_uppercase(pg_cursor, dbvendor): INSERT INTO orders(order_date, purchaser, quantity, product_id) VALUES ("2025-12-12", 1002, 10000, 102) """ + elif dbvendor == "postgres": + query = """ + INSERT INTO orders(order_number, order_date, purchaser, quantity, + product_id) VALUES (10005, '2025-12-12', 1002, 10000, 102); + """ else: query = """ INSERT INTO orders(order_number, order_date, purchaser, quantity, @@ -455,11 +529,18 @@ def test_InitialSnapshotFDW_uppercase(pg_cursor, dbvendor): run_remote_query(dbvendor, f"DELETE FROM orders WHERE order_number > 10004") update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'debezium'", True) update_guc_conf(pg_cursor, "synchdb.letter_casing_strategy", "'lowercase'", True) + drop_repslot_and_pub(dbvendor, name, "postgres") time.sleep(10) def test_InitialSnapshotDBZ_asis(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_dbzsnap_asis" dbname = getDbname(dbvendor) + schema = getSchema(dbvendor) + + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") update_guc_conf(pg_cursor, "synchdb.letter_casing_strategy", "'asis'", True) @@ -477,6 +558,8 @@ def test_InitialSnapshotDBZ_asis(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) @@ -489,27 +572,28 @@ def test_InitialSnapshotDBZ_asis(pg_cursor, dbvendor): extrowcount = run_remote_query(dbvendor, f"SELECT count(*) FROM orders") assert int(pgrowcount[0]) == int(extrowcount[0][0]) - # check table name mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - id = row[0].split(".") - if len(id) == 3: - assert id[0] + "." + id[2] == row[1] - else: + if dbvendor != "postgres": + # check table name mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + id = row[0].split(".") + if len(id) == 3: + assert id[0] + "." + id[2] == row[1] + else: + assert row[0] == row[1] + + # check attname mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: assert row[0] == row[1] - # check attname mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert row[0] == row[1] - - # check data type mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert verify_default_type_mappings(row[0], row[1], dbvendor) == True + # check data type mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + assert verify_default_type_mappings(row[0], row[1], dbvendor) == True # check data consistency of orders table if dbvendor == "oracle" or dbvendor == "olr": @@ -520,6 +604,8 @@ def test_InitialSnapshotDBZ_asis(pg_cursor, dbvendor): assert int(pgrow[0]) == int(extrow[0][0]) if dbvendor == "oracle" or dbvendor == "olr": assert pgrow[1] == datetime.strptime(extrow[0][1], '%d-%b-%y') + elif dbvendor == "postgres": + assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d %H:%M:%S') else: assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d').date() assert int(pgrow[2]) == int(extrow[0][2]) @@ -537,6 +623,11 @@ def test_InitialSnapshotDBZ_asis(pg_cursor, dbvendor): INSERT INTO orders(order_date, purchaser, quantity, product_id) VALUES ("2025-12-12", 1002, 10000, 102) """ + elif dbvendor == "postgres": + query = """ + INSERT INTO orders(order_number, order_date, purchaser, quantity, + product_id) VALUES (10005, '2025-12-12', 1002, 10000, 102); + """ else: query = """ INSERT INTO orders(order_number, order_date, purchaser, quantity, @@ -561,18 +652,29 @@ def test_InitialSnapshotDBZ_asis(pg_cursor, dbvendor): drop_default_pg_schema(pg_cursor, dbvendor) run_remote_query(dbvendor, f"DELETE FROM orders WHERE order_number > 10004") update_guc_conf(pg_cursor, "synchdb.letter_casing_strategy", "'lowercase'", True) + drop_repslot_and_pub(dbvendor, name, "postgres") time.sleep(10) def test_InitialSnapshotFDW_asis(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_fdwsnap_asis" dbname = getDbname(dbvendor) + schema = getSchema(dbvendor) if dbvendor == "mysql": - assert True - return + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'mysql_fdw' ) AS mysql_fdw_available") + if isfdw[0] == False: + print ("test_InitialSnapshotFDW_asis skipped - mysql_fdw not available for install") + assert True + return elif dbvendor == "sqlserver": assert True return + elif dbvendor == "postgres": + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'postgres_fdw' ) AS postgres_fdw_available") + if isfdw[0] == False: + print ("test_InitialSnapshotFDW_asis skipped - postgres_fdw not available for install") + assert True + return else: isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'oracle_fdw' ) AS oracle_fdw_available") if isfdw[0] == False: @@ -597,6 +699,8 @@ def test_InitialSnapshotFDW_asis(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) @@ -640,6 +744,8 @@ def test_InitialSnapshotFDW_asis(pg_cursor, dbvendor): assert int(pgrow[0]) == int(extrow[0][0]) if dbvendor == "oracle" or dbvendor == "olr": assert pgrow[1] == datetime.strptime(extrow[0][1], '%d-%b-%y') + elif dbvendor == "postgres": + assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d %H:%M:%S') else: assert pgrow[1] == datetime.strptime(extrow[0][1], '%Y-%m-%d').date() assert int(pgrow[2]) == int(extrow[0][2]) @@ -658,6 +764,11 @@ def test_InitialSnapshotFDW_asis(pg_cursor, dbvendor): INSERT INTO orders(order_date, purchaser, quantity, product_id) VALUES ("2025-12-12", 1002, 10000, 102) """ + elif dbvendor == "postgres": + query = """ + INSERT INTO orders(order_number, order_date, purchaser, quantity, + product_id) VALUES (10005, '2025-12-12', 1002, 10000, 102); + """ else: query = """ INSERT INTO orders(order_number, order_date, purchaser, quantity, @@ -683,11 +794,18 @@ def test_InitialSnapshotFDW_asis(pg_cursor, dbvendor): run_remote_query(dbvendor, f"DELETE FROM orders WHERE order_number > 10004") update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'debezium'", True) update_guc_conf(pg_cursor, "synchdb.letter_casing_strategy", "'lowercase'", True) + drop_repslot_and_pub(dbvendor, name, "postgres") time.sleep(10) def test_ConnectorStartSchemaSyncModeDBZ(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_dbz_schemasync" dbname = getDbname(dbvendor).lower() + schema = getSchema(dbvendor) + + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "schemasync") assert result == 0 @@ -703,31 +821,34 @@ def test_ConnectorStartSchemaSyncModeDBZ(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) - # check table name mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - id = row[0].split(".") - if len(id) == 3: - assert id[0].lower() + "." + id[2].lower() == row[1] - else: + if dbvendor != "postgres": + # check table name mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + id = row[0].split(".") + if len(id) == 3: + assert id[0].lower() + "." + id[2].lower() == row[1] + else: + assert row[0].lower() == row[1] + + # check attname mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: assert row[0].lower() == row[1] - # check attname mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert row[0].lower() == row[1] - - # check data type mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert verify_default_type_mappings(row[0], row[1], dbvendor) == True + # check data type mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + assert verify_default_type_mappings(row[0], row[1], dbvendor) == True # check data consistency of orders table pgrow = run_pg_query_one(pg_cursor, f"SELECT count(*) FROM {dbname}.orders;") @@ -757,6 +878,11 @@ def test_ConnectorStartSchemaSyncModeDBZ(pg_cursor, dbvendor): product_id) VALUES ('12-DEC-2025', 1002, 10000, 102); """ + elif dbvendor == "postgres": + query = """ + INSERT INTO orders(order_number, order_date, purchaser, quantity, + product_id) VALUES (10005, '2025-12-12', 1002, 10000, 102); + """ else: query = """ INSERT INTO orders(order_number, order_date, purchaser, quantity, @@ -776,18 +902,29 @@ def test_ConnectorStartSchemaSyncModeDBZ(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) run_remote_query(dbvendor, f"DELETE FROM orders WHERE order_number > 10004") + drop_repslot_and_pub(dbvendor, name, "postgres") time.sleep(10) def test_ConnectorStartSchemaSyncModeFDW(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_fdw_schemasync" dbname = getDbname(dbvendor).lower() + schema = getSchema(dbvendor) if dbvendor == "mysql": - assert True - return + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'mysql_fdw' ) AS mysql_fdw_available") + if isfdw[0] == False: + print ("test_ConnectorStartSchemaSyncModeFDW skipped - mysql_fdw not available for install") + assert True + return elif dbvendor == "sqlserver": assert True return + elif dbvendor == "postgres": + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'postgres_fdw' ) AS postgres_fdw_available") + if isfdw[0] == False: + print ("test_ConnectorStartSchemaSyncModeFDW skipped - postgres_fdw not available for install") + assert True + return else: isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'oracle_fdw' ) AS oracle_fdw_available") if isfdw[0] == False: @@ -811,6 +948,8 @@ def test_ConnectorStartSchemaSyncModeFDW(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) @@ -852,12 +991,19 @@ def test_ConnectorStartSchemaSyncModeFDW(pg_cursor, dbvendor): run_pg_query_one(pg_cursor, f"SELECT synchdb_resume_engine('{name}')") + time.sleep(10) # test a bit of cdc - query = """ - INSERT INTO orders(order_number, order_date, purchaser, quantity, - product_id) VALUES (10005, TO_DATE('2025-12-12', 'YYYY-MM-DD'), - 1002, 10000, 102); - """ + if dbvendor == "postgres" or dbvendor == "mysql": + query = """ + INSERT INTO orders(order_number, order_date, purchaser, quantity, + product_id) VALUES (10005, '2025-12-12', 1002, 10000, 102); + """ + else: + query = """ + INSERT INTO orders(order_number, order_date, purchaser, quantity, + product_id) VALUES (10005, TO_DATE('2025-12-12', 'YYYY-MM-DD'), + 1002, 10000, 102); + """ run_remote_query(dbvendor, query) if dbvendor == "oracle" or dbvendor == "olr": @@ -872,11 +1018,18 @@ def test_ConnectorStartSchemaSyncModeFDW(pg_cursor, dbvendor): drop_default_pg_schema(pg_cursor, dbvendor) run_remote_query(dbvendor, f"DELETE FROM orders WHERE order_number > 10004") update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'debezium'", True) + drop_repslot_and_pub(dbvendor, name, "postgres") time.sleep(10) def test_ConnectorStartAlwaysModeDBZ(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_dbz_always" dbname = getDbname(dbvendor).lower() + schema = getSchema(dbvendor) + + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "always") assert result == 0 @@ -892,31 +1045,34 @@ def test_ConnectorStartAlwaysModeDBZ(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) - # check table name mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - id = row[0].split(".") - if len(id) == 3: - assert id[0].lower() + "." + id[2].lower() == row[1] - else: + if dbvendor != "postgres": + # check table name mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + id = row[0].split(".") + if len(id) == 3: + assert id[0].lower() + "." + id[2].lower() == row[1] + else: + assert row[0].lower() == row[1] + + # check attname mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: assert row[0].lower() == row[1] - # check attname mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert row[0].lower() == row[1] - - # check data type mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert verify_default_type_mappings(row[0], row[1], dbvendor) == True + # check data type mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + assert verify_default_type_mappings(row[0], row[1], dbvendor) == True # check data consistency of orders table pgrow = run_pg_query_one(pg_cursor, f"SELECT count(*) FROM {dbname}.orders;") @@ -932,17 +1088,28 @@ def test_ConnectorStartAlwaysModeDBZ(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_ConnectorStartAlwaysModeFDW(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_dbz_always" dbname = getDbname(dbvendor).lower() + schema = getSchema(dbvendor) if dbvendor == "mysql": - assert True - return + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'mysql_fdw' ) AS mysql_fdw_available") + if isfdw[0] == False: + print ("test_InitialSnapshotFDW_asis skipped - mysql_fdw not available for install") + assert True + return elif dbvendor == "sqlserver": assert True return + elif dbvendor == "postgres": + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'postgres_fdw' ) AS postgres_fdw_available") + if isfdw[0] == False: + print ("test_InitialSnapshotFDW_asis skipped - postgres_fdw not available for install") + assert True + return else: isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'oracle_fdw' ) AS oracle_fdw_available") if isfdw[0] == False: @@ -966,6 +1133,8 @@ def test_ConnectorStartAlwaysModeFDW(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) @@ -1007,10 +1176,17 @@ def test_ConnectorStartAlwaysModeFDW(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'debezium'", True) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_ConnectorStartNodataModeDBZ(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_dbz_nodata" dbname = getDbname(dbvendor).lower() + schema = getSchema(dbvendor) + + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") assert result == 0 @@ -1026,31 +1202,34 @@ def test_ConnectorStartNodataModeDBZ(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) - # check table name mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - id = row[0].split(".") - if len(id) == 3: - assert id[0].lower() + "." + id[2].lower() == row[1] - else: + if dbvendor != "postgres": + # check table name mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_tbname, pg_tbname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + id = row[0].split(".") + if len(id) == 3: + assert id[0].lower() + "." + id[2].lower() == row[1] + else: + assert row[0].lower() == row[1] + + # check attname mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: assert row[0].lower() == row[1] - # check attname mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_attname, pg_attname FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert row[0].lower() == row[1] - - # check data type mappings - rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") - assert len(rows) > 0 - for row in rows: - assert verify_default_type_mappings(row[0], row[1], dbvendor) == True + # check data type mappings + rows = run_pg_query(pg_cursor, f"SELECT ext_atttypename, pg_atttypename FROM synchdb_att_view WHERE name = '{name}' AND type = '{dbvendor}'") + assert len(rows) > 0 + for row in rows: + assert verify_default_type_mappings(row[0], row[1], dbvendor) == True # check data consistency of orders table pgrow = run_pg_query_one(pg_cursor, f"SELECT count(*) FROM {dbname}.orders;") @@ -1066,17 +1245,28 @@ def test_ConnectorStartNodataModeDBZ(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_ConnectorStartNodataModeFDW(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_dbz_nodata" dbname = getDbname(dbvendor).lower() + schema = getSchema(dbvendor) if dbvendor == "mysql": - assert True - return + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'mysql_fdw' ) AS mysql_fdw_available") + if isfdw[0] == False: + print ("test_ConnectorStartNodataModeFDW skipped - mysql_fdw not available for install") + assert True + return elif dbvendor == "sqlserver": assert True return + elif dbvendor == "postgres": + isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'postgres_fdw' ) AS postgres_fdw_available") + if isfdw[0] == False: + print ("test_ConnectorStartNodataModeFDW skipped - postgres_fdw not available for install") + assert True + return else: isfdw = run_pg_query_one(pg_cursor, f"SELECT EXISTS ( SELECT 1 FROM pg_available_extensions WHERE name = 'oracle_fdw' ) AS oracle_fdw_available") if isfdw[0] == False: @@ -1100,6 +1290,8 @@ def test_ConnectorStartNodataModeFDW(pg_cursor, dbvendor): exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()") elif dbvendor == "sqlserver": exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM information_schema.tables WHERE TABLE_CATALOG=DB_NAME() AND TABLE_SCHEMA=schema_name() AND TABLE_NAME NOT LIKE 'systranschemas%'") + elif dbvendor == "postgres": + exttblcount = run_remote_query(dbvendor, f"SELECT count(*) FROM information_schema.tables where table_schema='{schema}' and table_type = 'BASE TABLE'") else: exttblcount = run_remote_query(dbvendor, f"SELECT COUNT(*) FROM user_tables WHERE table_name NOT LIKE 'LOG_MINING%'") assert int(pgtblcount[0]) == int(exttblcount[0][0]) @@ -1141,11 +1333,17 @@ def test_ConnectorStartNodataModeFDW(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'debezium'", True) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_ConnectorRestart(pg_cursor, dbvendor): name = getConnectorName(dbvendor) dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") assert result == 0 @@ -1185,10 +1383,16 @@ def test_ConnectorRestart(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_ConnectorStop(pg_cursor, dbvendor): name = getConnectorName(dbvendor) dbname = getDbname(dbvendor).lower() + + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") assert result == 0 @@ -1210,6 +1414,7 @@ def test_ConnectorStop(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_ConnectorDelete(pg_cursor, dbvendor): name = getConnectorName(dbvendor) diff --git a/src/test/pytests/synchdbtests/t/test_002_ddl.py b/src/test/pytests/synchdbtests/t/test_002_ddl.py index e4d0963..72e59d2 100644 --- a/src/test/pytests/synchdbtests/t/test_002_ddl.py +++ b/src/test/pytests/synchdbtests/t/test_002_ddl.py @@ -1,11 +1,16 @@ import common import time -from common import run_pg_query, run_pg_query_one, run_remote_query, create_synchdb_connector, getConnectorName, getDbname, verify_default_type_mappings, create_and_start_synchdb_connector, stop_and_delete_synchdb_connector, drop_default_pg_schema +from common import run_pg_query, run_pg_query_one, run_remote_query, create_synchdb_connector, getConnectorName, getDbname, verify_default_type_mappings, create_and_start_synchdb_connector, stop_and_delete_synchdb_connector, drop_default_pg_schema, drop_repslot_and_pub def test_CreateTable(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_ddl" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -33,6 +38,14 @@ def test_CreateTable(pg_cursor, dbvendor): @source_name = 'create_table_test', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + query = """ + CREATE TABLE create_table_test ( + id INT PRIMARY KEY, + name VARCHAR(255), + created_at TIMESTAMP WITHOUT TIME ZONE + ); + """ else: query = """ CREATE TABLE create_table_test ( @@ -58,11 +71,17 @@ def test_CreateTable(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) run_remote_query(dbvendor, "DROP TABLE create_table_test") + drop_repslot_and_pub(dbvendor, name, "postgres") def test_CreateTableWithSpace(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_ddl" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -90,6 +109,14 @@ def test_CreateTableWithSpace(pg_cursor, dbvendor): @source_name = 'create table test', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + query = """ + CREATE TABLE \"create table test\" ( + id int PRIMARY KEY, + name VARCHAR(255), + created_at TIMESTAMP WITHOUT TIME ZONE + ); + """ else: query = """ CREATE TABLE "create table test" ( @@ -115,19 +142,26 @@ def test_CreateTableWithSpace(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") if dbvendor == "mysql": run_remote_query(dbvendor, "DROP TABLE `create table test`") elif dbvendor == "sqlserver": run_remote_query(dbvendor, "DROP TABLE [create table test]") + elif dbvendor == "postgres": + run_remote_query(dbvendor, "DROP TABLE \"create table test\"") else: run_remote_query(dbvendor, "DROP TABLE \"create table test\"") def test_CreateTableWithNoPK(pg_cursor, dbvendor): - name = getConnectorName(dbvendor) + "_ddl" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -155,6 +189,14 @@ def test_CreateTableWithNoPK(pg_cursor, dbvendor): @source_name = 'create_table_nopk', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + query = """ + CREATE TABLE create_table_nopk ( + id INT, + name VARCHAR(255), + created_at TIMESTAMP WITHOUT TIME ZONE + ); + """ else: query = """ CREATE TABLE create_table_nopk ( @@ -179,13 +221,18 @@ def test_CreateTableWithNoPK(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") run_remote_query(dbvendor, "DROP TABLE create_table_nopk") def test_CreateTableWithNotInlinePK(pg_cursor, dbvendor): - name = getConnectorName(dbvendor) + "_ddl" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -215,6 +262,15 @@ def test_CreateTableWithNotInlinePK(pg_cursor, dbvendor): @source_name = 'create_table_noinlinepk', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + query = """ + CREATE TABLE create_table_noinlinepk ( + id INT, + name VARCHAR(255), + created_at TIMESTAMP WITHOUT TIME ZONE, + CONSTRAINT pk_create_table_test PRIMARY KEY (id) + ); + """ else: query = """ CREATE TABLE create_table_noinlinepk ( @@ -249,12 +305,18 @@ def test_CreateTableWithNotInlinePK(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") run_remote_query(dbvendor, "DROP TABLE create_table_noinlinepk") def test_DropTable(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_ddl" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -282,6 +344,14 @@ def test_DropTable(pg_cursor, dbvendor): @source_name = 'drop_table_test', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + query = """ + CREATE TABLE drop_table_test ( + id INT PRIMARY KEY, + name VARCHAR(255), + created_at TIMESTAMP WITHOUT TIME ZONE + ); + """ else: query = """ CREATE TABLE drop_table_test ( @@ -324,10 +394,16 @@ def test_DropTable(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_DropTableWithSpace(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_ddl" dbname = getDbname(dbvendor).lower() + + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -356,6 +432,14 @@ def test_DropTableWithSpace(pg_cursor, dbvendor): @source_name = 'drop with space', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + query = """ + CREATE TABLE \"drop with space\" ( + id INT PRIMARY KEY, + name VARCHAR(255), + created_at TIMESTAMP WITHOUT TIME ZONE + ); + """ else: query = """ CREATE TABLE "drop with space" ( @@ -382,6 +466,8 @@ def test_DropTableWithSpace(pg_cursor, dbvendor): run_remote_query(dbvendor, "DROP TABLE `drop with space`") elif dbvendor == "sqlserver": run_remote_query(dbvendor, "DROP TABLE [drop with space]") + elif dbvendor == "postgres": + run_remote_query(dbvendor, "DROP TABLE \"drop with space\"") else: run_remote_query(dbvendor, "DROP TABLE \"drop with space\"") @@ -403,11 +489,17 @@ def test_DropTableWithSpace(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_AlterTableAlterColumn(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_ddl" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -435,6 +527,14 @@ def test_AlterTableAlterColumn(pg_cursor, dbvendor): @source_name = 'alter_table_alter_col', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + query = """ + CREATE TABLE alter_table_alter_col ( + id INT PRIMARY KEY, + age INT, + created_at TIMESTAMP WITHOUT TIME ZONE + ); + """ else: query = """ CREATE TABLE alter_table_alter_col ( @@ -461,6 +561,8 @@ def test_AlterTableAlterColumn(pg_cursor, dbvendor): run_remote_query(dbvendor, "ALTER TABLE alter_table_alter_col MODIFY COLUMN age BIGINT") elif dbvendor == "sqlserver": run_remote_query(dbvendor, "ALTER TABLE alter_table_alter_col ALTER COLUMN age BIGINT") + elif dbvendor == "postgres": + run_remote_query(dbvendor, "ALTER TABLE alter_table_alter_col ALTER COLUMN age TYPE BIGINT") else: run_remote_query(dbvendor, "ALTER TABLE alter_table_alter_col MODIFY age NUMBER(10,0)") @@ -481,12 +583,18 @@ def test_AlterTableAlterColumn(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") run_remote_query(dbvendor, "DROP TABLE alter_table_alter_col") def test_AlterTableAlterColumnAddPK(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_ddl" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -515,6 +623,14 @@ def test_AlterTableAlterColumnAddPK(pg_cursor, dbvendor): @supports_net_changes = 0, @capture_instance='alter_table_add_pk_1'; """ + elif dbvendor == "postgres": + query = """ + CREATE TABLE alter_table_addpk ( + id INT, + name VARCHAR(255), + created_at TIMESTAMP WITHOUT TIME ZONE + ); + """ else: query = """ CREATE TABLE alter_table_addpk ( @@ -556,6 +672,11 @@ def test_AlterTableAlterColumnAddPK(pg_cursor, dbvendor): @supports_net_changes = 0, @capture_instance = 'alter_table_add_pk_2'; """) + elif dbvendor == "postgres": + run_remote_query(dbvendor, """ + ALTER TABLE alter_table_addpk + ADD PRIMARY KEY (id); + """) else: run_remote_query(dbvendor, """ ALTER TABLE alter_table_addpk @@ -579,6 +700,7 @@ def test_AlterTableAlterColumnAddPK(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") run_remote_query(dbvendor, "DROP TABLE alter_table_addpk") @@ -588,6 +710,11 @@ def test_AlterTableiAddColumn(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_ddl" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -616,6 +743,14 @@ def test_AlterTableiAddColumn(pg_cursor, dbvendor): @supports_net_changes = 0, @capture_instance='alter_table_add_col_1'; """ + elif dbvendor == "postgres": + query = """ + CREATE TABLE alter_table_add_col ( + id INT PRIMARY KEY, + name VARCHAR(255), + created_at TIMESTAMP WITHOUT TIME ZONE + ); + """ else: query = """ CREATE TABLE alter_table_add_col ( @@ -657,6 +792,8 @@ def test_AlterTableiAddColumn(pg_cursor, dbvendor): """) rows = run_remote_query(dbvendor, "INSERT INTO alter_table_add_col(name, created_at, age) VALUES('s', '16-JAN-2025', 35);") + elif dbvendor == "postgres": + run_remote_query(dbvendor, "ALTER TABLE alter_table_add_col ADD COLUMN age INT") else: run_remote_query(dbvendor, "ALTER TABLE alter_table_add_col ADD age NUMBER") @@ -676,12 +813,18 @@ def test_AlterTableiAddColumn(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") run_remote_query(dbvendor, "DROP TABLE alter_table_add_col") def test_AlterTableDropColumn(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_ddl" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "initial") assert result == 0 @@ -709,6 +852,14 @@ def test_AlterTableDropColumn(pg_cursor, dbvendor): @source_name = 'alter_table_drop_col', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + query = """ + CREATE TABLE alter_table_drop_col ( + id INT PRIMARY KEY, + name VARCHAR(255), + created_at TIMESTAMP WITHOUT TIME ZONE + ); + """ else: query = """ CREATE TABLE alter_table_drop_col ( @@ -753,4 +904,5 @@ def test_AlterTableDropColumn(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") run_remote_query(dbvendor, "DROP TABLE alter_table_drop_col") diff --git a/src/test/pytests/synchdbtests/t/test_003_datatypes.py b/src/test/pytests/synchdbtests/t/test_003_datatypes.py index 0385afb..846537f 100644 --- a/src/test/pytests/synchdbtests/t/test_003_datatypes.py +++ b/src/test/pytests/synchdbtests/t/test_003_datatypes.py @@ -1,10 +1,13 @@ import common +import json import time +from psycopg2.extras import NumericRange, DateRange, DateRange, DateTimeTZRange, DateTimeRange + from decimal import Decimal -from datetime import datetime, timezone, timedelta +from datetime import datetime, timezone, timedelta, date from binascii import unhexlify -from common import run_pg_query, run_pg_query_one, run_remote_query, create_synchdb_connector, getConnectorName, getDbname, verify_default_type_mappings, create_and_start_synchdb_connector, stop_and_delete_synchdb_connector, getSchema, drop_default_pg_schema +from common import run_pg_query, run_pg_query_one, run_remote_query, create_synchdb_connector, getConnectorName, getDbname, verify_default_type_mappings, create_and_start_synchdb_connector, stop_and_delete_synchdb_connector, getSchema, drop_default_pg_schema, drop_repslot_and_pub, update_guc_conf def parse_time_with_fraction(t): if '.' in t: @@ -70,10 +73,20 @@ def parse_ora_year2month_interval(s): total_days = sign * (years * 365 + months * 30) return timedelta(days=total_days) +def parse_timedelta(s: str): + days, _, time = s.split() + h, m, sec = map(int, time.split(':')) + return timedelta(days=int(days), hours=h, minutes=m, seconds=sec) + def test_AllDefaultDataTypes(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_addt" dbname = getDbname(dbvendor).lower() - + + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") assert result == 0 @@ -170,6 +183,60 @@ def test_AllDefaultDataTypes(pg_cursor, dbvendor): @source_name = 'mytable', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + time.sleep(20) + query = """ + CREATE TABLE mytable ( + col_smallint SMALLINT, + col_integer INTEGER, + col_bigint BIGINT, + col_decimal DECIMAL(10,5), + col_numeric NUMERIC(20,10), + col_real REAL, + col_double DOUBLE PRECISION, + col_smallserial SMALLSERIAL, + col_serial SERIAL, + col_bigserial BIGSERIAL, + col_money MONEY, + col_char CHAR(10), + col_varchar VARCHAR(255), + col_text TEXT, + col_bytea BYTEA, + col_boolean BOOLEAN, + col_date DATE, + col_time TIME, + col_timetz TIME WITH TIME ZONE, + col_timestamp TIMESTAMP, + col_timestamptz TIMESTAMP WITH TIME ZONE, + col_interval INTERVAL, + col_uuid UUID, + col_json JSON, + col_jsonb JSONB, + col_xml XML, + col_inet INET, + col_cidr CIDR, + col_macaddr MACADDR, + col_macaddr8 MACADDR8, + col_tsvector TSVECTOR, + col_tsquery TSQUERY, + col_bit BIT(8), + col_varbit BIT VARYING(64), + col_int4range INT4RANGE, + col_int8range INT8RANGE, + col_numrange NUMRANGE, + col_daterange DATERANGE, + col_tsrange TSRANGE, + col_tstzrange TSTZRANGE, + col_int_array INTEGER[], + col_text_array TEXT[], + col_jsonb_array JSONB[], + col_point POINT, + col_identity BIGINT GENERATED ALWAYS AS IDENTITY, + col_not_null TEXT NOT NULL, + col_unique TEXT UNIQUE, + col_primary_key INTEGER PRIMARY KEY + ) + """ else: query = """ CREATE TABLE mytable ( @@ -309,6 +376,64 @@ def test_AllDefaultDataTypes(pg_cursor, dbvendor): '2024-08-28 14:30:00.654321', '2024-08-28 14:30:00.654321 +06:00'); """ + elif dbvendor == "postgres": + query = """ + INSERT INTO mytable ( + col_smallint, col_integer, col_bigint, col_decimal, col_numeric, col_real, + col_double, col_money, col_char, col_varchar, col_text, col_bytea, col_boolean, + col_date, col_time, col_timetz, col_timestamp, col_timestamptz, col_interval, + col_uuid, col_json, col_jsonb, col_xml, col_inet, col_cidr, col_macaddr, + col_macaddr8, col_tsvector, col_tsquery, col_bit, col_varbit, col_int4range, + col_int8range, col_numrange, col_daterange, col_tsrange, col_tstzrange, col_int_array, + col_text_array, col_jsonb_array, col_point, col_not_null, col_unique, col_primary_key + ) + VALUES ( + 123, + 1000, + 9000000000, + 12345.67890, + 98765.4321000000, + 3.14, + 2.718281828459, + '$1234.56', + 'ABC', + 'varchar value', + 'some long text here', + '\\xDEADBEEF'::bytea, + true, + DATE '2026-01-01', + TIME '12:34:56', + TIME '12:34:56+02', + TIMESTAMP '2026-01-01 12:34:56', + TIMESTAMPTZ '2026-01-01 12:34:56+02', + INTERVAL '3 days 4 hours', + gen_random_uuid(), + '{"a":1,"b":"text"}', + '{"x":[1,2,3]}'::jsonb, + 'value', + '192.168.1.10', + '192.168.0.0/24', + '08:00:2b:01:02:03', + '08:00:2b:ff:fe:12:34:56', + to_tsvector('english', 'PostgreSQL full text search'), + to_tsquery('postgresql & search'), + B'10101010', + B'101010', + '[1,10]'::int4range, + '[10000000000,20000000000]'::int8range, + '[1.5,9.9]'::numrange, + '[2026-01-01,2026-01-31]'::daterange, + '[2026-01-01 10:00,2026-01-01 12:00]'::tsrange, + '[2026-01-01 10:00+00,2026-01-01 12:00+00]'::tstzrange, + ARRAY[1,2,3,4], + ARRAY['a','b','c'], + ARRAY['{"k":1}','{"k":2}']::jsonb[], + POINT(10.5, 20.25), + 'must not be null', + 'unique-value-1', + 1 + ) + """ else: query = """ INSERT INTO mytable ( @@ -364,6 +489,19 @@ def test_AllDefaultDataTypes(pg_cursor, dbvendor): q, r, s, t, u, v, w, x, y, z, aa, bb, cc, dd, ee, ff, gg, hh, ii, jj, kk, ll, mm FROM mytable """) + elif dbvendor == "postgres": + extrows = run_remote_query(dbvendor, f""" + SELECT col_smallint, col_integer, col_bigint, col_decimal, col_numeric, col_real, + col_double, col_smallserial, col_serial, col_bigserial, col_money, col_char, + col_varchar, col_text, col_bytea, col_boolean, + col_date, col_time, col_timetz, col_timestamp, col_timestamptz, col_interval, + col_uuid, col_json, col_jsonb, col_xml, col_inet, col_cidr, col_macaddr, + col_macaddr8, col_tsvector, col_tsquery, col_bit, col_varbit, col_int4range, + col_int8range, col_numrange, col_daterange, col_tsrange, col_tstzrange, col_int_array, + col_text_array, col_jsonb_array, col_point, col_identity, col_not_null, col_unique, + col_primary_key + FROM mytable + """) else: extrows = run_remote_query(dbvendor, f""" SELECT id, binary_double_col, binary_float_col, float_col, number_col, @@ -459,6 +597,58 @@ def test_AllDefaultDataTypes(pg_cursor, dbvendor): #assert row[36] == parse_time_with_fraction(extrow[36]) assert row[37] == parse_datetime_with_fraction(extrow[37]) assert row[38] == parse_datetime_with_fraction_and_tz(extrow[38]) + elif dbvendor == "postgres": + for row, extrow in zip(rows, extrows): + assert row[0] == int(extrow[0]) + assert row[1] == int(extrow[1]) + assert row[2] == int(extrow[2]) + assert row[3] == Decimal(extrow[3]) + assert row[4] == Decimal(extrow[4]) + assert Decimal(str(row[5])) == Decimal(extrow[5]) + assert Decimal(str(row[6])) == Decimal(extrow[6]) + assert row[7] == int(extrow[7]) + assert row[8] == int(extrow[8]) + assert row[9] == int(extrow[9]) + assert row[10] == extrow[10] + assert row[11] == extrow[11] + assert row[12] == extrow[12] + assert row[13] == extrow[13] + assert row[14].tobytes() == bytes.fromhex(extrow[14][2:]) + assert row[15] == True and extrow[15] == "t" + assert row[16] == datetime.strptime(extrow[16], "%Y-%m-%d").date() + assert row[17] == datetime.strptime(extrow[17], "%H:%M:%S").time() + assert row[18] == datetime.strptime(extrow[18].replace("+00", "+0000").replace("-00", "-0000"), "%H:%M:%S%z").timetz() + assert row[19] == datetime.strptime(extrow[19], "%Y-%m-%d %H:%M:%S") + #assert row[20].astimezone(datetime.timezone.utc) == datetime.datetime.fromisoformat(extrow[20]) + assert row[21] == parse_timedelta(extrow[21]) + assert row[22] == extrow[22] + assert row[23] == json.loads(extrow[23]) + assert row[24] == json.loads(extrow[24]) + assert row[25] == extrow[25] + assert row[26] == extrow[26] + assert row[27] == extrow[27] + assert row[28] == extrow[28] + assert row[29] == extrow[29] + assert row[30] == None + assert row[31] == None + assert row[32] == extrow[32] + assert row[33] == extrow[33] + assert row[34] == NumericRange(*map(int, extrow[34][1:-1].split(',')), bounds=extrow[34][0] + extrow[34][-1]) + assert row[35] == NumericRange(*map(int, extrow[35][1:-1].split(',')), bounds=extrow[35][0] + extrow[35][-1]) + assert row[36] == NumericRange(*map(Decimal, extrow[36][1:-1].split(',')), bounds=extrow[36][0] + extrow[36][-1]) + assert row[37] == DateRange(date.fromisoformat(extrow[37][1:-1].split(',',1)[0]), date.fromisoformat(extrow[37][1:-1].split(',',1)[1]),bounds=extrow[37][0] + extrow[37][-1]) + assert row[38] == DateTimeRange(datetime.fromisoformat(extrow[38][2:-2].split('","',1)[0]), datetime.fromisoformat(extrow[38][2:-2].split('","',1)[1]), bounds=extrow[38][0] + extrow[38][-1]) + #assert row[39] == DateTimeTZRange(*(datetime.fromisoformat(x).astimezone(timezone.utc) for x in extrow[39][2:-2].split('","')), extrow[39][0] + extrow[39][-1]) + assert row[39] == DateTimeTZRange(*(datetime.fromisoformat((x + ':00') if (len(x) >= 3 and x[-3] in '+-' and x[-2:].isdigit()) else (x[:-2] + ':' + x[-2:] if (len(x) >= 5 and x[-5] in '+-' and x[-4:].isdigit()) else x)).astimezone(timezone.utc) for x in extrow[39][2:-2].split('","')), extrow[39][0] + extrow[39][-1]) + + assert row[40] == list(map(int, extrow[40].strip("{}").split(","))) + assert row[41] == list(map(str, extrow[41].strip("{}").split(","))) + assert row[42] == [json.loads(json.loads(x)) for x in extrow[42][1:-1].split(',')] + assert row[43] == extrow[43] + assert row[44] == int(extrow[44]) + assert row[45] == extrow[45] + assert row[46] == extrow[46] + assert row[47] == int(extrow[47]) else: for row, extrow in zip(rows, extrows): assert row[0] == int(extrow[0]) @@ -490,7 +680,7 @@ def test_AllDefaultDataTypes(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) - + drop_repslot_and_pub(dbvendor, name, "postgres") run_remote_query(dbvendor, "DROP TABLE mytable") time.sleep(5) @@ -521,7 +711,17 @@ def test_TableNameMapping(pg_cursor, dbvendor): rows = run_pg_query_one(pg_cursor, f"SELECT synchdb_add_objmap('{name}', 'table', '{exttable_prefix}.objmap_srctable3', 'someschema.objmap_dsttable3')") assert rows[0] == 0 - # create the tables remotely + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + + # we need to start connector now so tables are copied via CDC, debezium does not snapshot table schemas + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") + assert result == 0 + time.sleep(20) + + # create the tables remotely run_remote_query(dbvendor, "CREATE TABLE objmap_srctable1 (a INT, b varchar(50))") run_remote_query(dbvendor, "CREATE TABLE objmap_srctable2 (a INT, b varchar(50))") run_remote_query(dbvendor, "CREATE TABLE objmap_srctable3 (a INT, b varchar(50))") @@ -542,8 +742,9 @@ def test_TableNameMapping(pg_cursor, dbvendor): """) # create the connector in pg and copy the tables - result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") - assert result == 0 + if dbvendor != "postgres": + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") + assert result == 0 if dbvendor == "oracle" or dbvendor == "olr": time.sleep(60) @@ -576,7 +777,8 @@ def test_TableNameMapping(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) - + drop_repslot_and_pub(dbvendor, name, "postgres") + run_remote_query(dbvendor, "DROP TABLE objmap_srctable1") run_remote_query(dbvendor, "DROP TABLE objmap_srctable2") run_remote_query(dbvendor, "DROP TABLE objmap_srctable3") @@ -604,6 +806,16 @@ def test_ColumnNameMapping(pg_cursor, dbvendor): rows = run_pg_query_one(pg_cursor, f"SELECT synchdb_add_objmap('{name}', 'column', '{exttable_prefix}.objmapcol_srctable1.b', 'pgtextcol')") assert rows[0] == 0 + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + + # we need to start connector now so tables are copied via CDC, debezium does not snapshot table schemas + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") + assert result == 0 + time.sleep(20) + # create the tables remotely run_remote_query(dbvendor, "CREATE TABLE objmapcol_srctable1 (a INT, b varchar(50))") @@ -614,9 +826,10 @@ def test_ColumnNameMapping(pg_cursor, dbvendor): @source_name = 'objmapcol_srctable1', @role_name = NULL, @supports_net_changes = 0; """) - # create the connector in pg and copy the tables - result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") - assert result == 0 + if dbvendor != "postgres": + # create the connector inpg and copy the tables + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") + assert result == 0 if dbvendor == "oracle" or dbvendor == "olr": time.sleep(60) @@ -635,7 +848,8 @@ def test_ColumnNameMapping(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) - + drop_repslot_and_pub(dbvendor, name, "postgres") + run_remote_query(dbvendor, "DROP TABLE objmapcol_srctable1") time.sleep(5) @@ -643,6 +857,10 @@ def test_DataTypeMapping(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_objmap_dtm" dbname = getDbname(dbvendor) + if dbvendor == "postgres": + # for postgres connector, we need to use fdw snapshot to observe + update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'fdw'", True) + if dbvendor == "mysql": exttable_prefix=dbname else: @@ -675,11 +893,18 @@ def test_DataTypeMapping(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") + if dbvendor == "postgres": + update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'debezium'", True) def test_TransformExpression(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_objmap_te" dbname = getDbname(dbvendor) + if dbvendor == "postgres": + # for postgres connector, we need to use fdw snapshot to observe + update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'fdw'", True) + if dbvendor == "mysql": exttable_prefix=dbname else: @@ -714,11 +939,18 @@ def test_TransformExpression(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") + if dbvendor == "postgres": + update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'debezium'", True) def test_ReloadObjmapEntries(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_objmap_roe" dbname = getDbname(dbvendor) + if dbvendor == "postgres": + # for postgres connector, we need to use fdw snapshot to observe + update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'fdw'", True) + if dbvendor == "mysql": exttable_prefix=dbname else: @@ -867,6 +1099,11 @@ def test_ReloadObjmapEntries(pg_cursor, dbvendor): INSERT INTO orders(order_number, order_date, purchaser, quantity, product_id) VALUES (10005, TO_DATE('2025-12-12', 'YYYY-MM-DD'), 1002, 10000, 102) """) + elif dbvendor == "postgres": + extrows = run_remote_query(dbvendor, f""" + INSERT INTO orders(order_number, order_date, purchaser, quantity, product_id) VALUES + (10005, '2025-12-12', 1002, 10000, 102) + """) else: extrows = run_remote_query(dbvendor, f""" INSERT INTO orders(order_date, purchaser, quantity, product_id) VALUES @@ -887,6 +1124,9 @@ def test_ReloadObjmapEntries(pg_cursor, dbvendor): stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") + if dbvendor == "postgres": + update_guc_conf(pg_cursor, "synchdb.snapshot_engine", "'debezium'", True) run_remote_query(dbvendor, f"DELETE FROM orders WHERE order_number>=10005") time.sleep(5) diff --git a/src/test/pytests/synchdbtests/t/test_004_dml.py b/src/test/pytests/synchdbtests/t/test_004_dml.py index 2d2b8c9..3a1ed6e 100644 --- a/src/test/pytests/synchdbtests/t/test_004_dml.py +++ b/src/test/pytests/synchdbtests/t/test_004_dml.py @@ -1,11 +1,16 @@ import common import time -from common import run_pg_query, run_pg_query_one, run_remote_query, create_synchdb_connector, getConnectorName, getDbname, create_and_start_synchdb_connector, stop_and_delete_synchdb_connector, drop_default_pg_schema +from common import run_pg_query, run_pg_query_one, run_remote_query, create_synchdb_connector, getConnectorName, getDbname, create_and_start_synchdb_connector, stop_and_delete_synchdb_connector, drop_default_pg_schema, drop_repslot_and_pub def test_Insert(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_insert" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") assert result == 0 @@ -24,6 +29,13 @@ def test_Insert(pg_cursor, dbvendor): @source_name = 'inserttable', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + time.sleep(10) + query = """ + CREATE TABLE inserttable( + a INT PRIMARY KEY, + b VARCHAR(255)); + """ else: query = """ CREATE TABLE inserttable( @@ -57,6 +69,7 @@ def test_Insert(pg_cursor, dbvendor): extrows = run_remote_query(dbvendor, f"DROP TABLE inserttable") stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_InsertWithError(pg_cursor, dbvendor): assert True @@ -65,6 +78,11 @@ def test_Update(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_update" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") assert result == 0 @@ -83,6 +101,13 @@ def test_Update(pg_cursor, dbvendor): @source_name = 'updatetable', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + time.sleep(10) + query = """ + CREATE TABLE updatetable( + a INT PRIMARY KEY, + b VARCHAR(255)); + """ else: query = """ CREATE TABLE updatetable( @@ -120,6 +145,7 @@ def test_Update(pg_cursor, dbvendor): extrows = run_remote_query(dbvendor, f"DROP TABLE updatetable") stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_UpdateWithError(pg_cursor, dbvendor): assert True @@ -128,6 +154,11 @@ def test_Delete(pg_cursor, dbvendor): name = getConnectorName(dbvendor) + "_delete" dbname = getDbname(dbvendor).lower() + if dbvendor == "postgres": + # postgres in debezium snapshot needs to create tables manually + run_pg_query_one(pg_cursor, f"CREATE SCHEMA IF NOT EXISTS {dbname}") + run_pg_query_one(pg_cursor, f"CREATE TABLE {dbname}.orders (order_number int primary key, order_date timestamp without time zone, purchaser int, quantity int , product_id int)") + result = create_and_start_synchdb_connector(pg_cursor, dbvendor, name, "no_data") assert result == 0 @@ -146,6 +177,13 @@ def test_Delete(pg_cursor, dbvendor): @source_name = 'deletetable', @role_name = NULL, @supports_net_changes = 0; """ + elif dbvendor == "postgres": + time.sleep(10) + query = """ + CREATE TABLE deletetable( + a INT PRIMARY KEY, + b VARCHAR(255)); + """ else: query = """ CREATE TABLE deletetable( @@ -199,6 +237,7 @@ def test_Delete(pg_cursor, dbvendor): extrows = run_remote_query(dbvendor, f"DROP TABLE deletetable") stop_and_delete_synchdb_connector(pg_cursor, name) drop_default_pg_schema(pg_cursor, dbvendor) + drop_repslot_and_pub(dbvendor, name, "postgres") def test_DeleteWithError(pg_cursor, dbvendor): assert True diff --git a/synchdb--1.0.sql b/synchdb--1.0.sql index ede4b37..c83e522 100644 --- a/synchdb--1.0.sql +++ b/synchdb--1.0.sql @@ -513,7 +513,10 @@ BEGIN precision integer OPTIONS (column_name 'numeric_precision'), scale integer OPTIONS (column_name 'numeric_scale'), nullable text OPTIONS (column_name 'is_nullable'), - default_value text OPTIONS (column_name 'column_default') + default_value text OPTIONS (column_name 'column_default'), + + udt_schema text OPTIONS (column_name 'udt_schema'), + udt_name text OPTIONS (column_name 'udt_name') ) SERVER %2$I OPTIONS ( @@ -675,7 +678,76 @@ BEGIN AND n.nspname NOT IN ('pg_catalog', 'information_schema') $SQL$, v_schema); - -- options currently unused; kept for API symmetry with mysql_create_catalog + EXECUTE format($SQL$ + CREATE FOREIGN TABLE IF NOT EXISTS %1$I.types ( + oid oid, + typname name, + typnamespace oid, + typlen smallint, + typbyval boolean, + typtype "char", + typcategory "char", + typispreferred boolean, + typdelim "char", + typrelid oid, + typelem oid, + typarray oid, + typinput regproc, + typoutput regproc, + typreceive regproc, + typsend regproc, + typmodin regproc, + typmodout regproc, + typanalyze regproc, + typalign "char", + typstorage "char", + typnotnull boolean, + typbasetype oid, + typtypmod integer, + typndims integer, + typcollation oid + ) + SERVER %2$I + OPTIONS ( + schema_name 'pg_catalog', + table_name 'pg_type' + ) + $SQL$, v_schema, v_server); + + EXECUTE format($SQL$ + CREATE OR REPLACE VIEW %1$I.columns_resolved AS + SELECT + c.schema, + c.table_name, + c.column_name, + c.position, + c.type_name, + c.length, + c.precision, + c.scale, + c.nullable, + c.default_value, + + -- carry-through so downstream SQL can use them + c.udt_schema, + c.udt_name, + + CASE WHEN c.type_name = 'ARRAY' THEN ns.nspname ELSE NULL END AS array_type_schema, + CASE WHEN c.type_name = 'ARRAY' THEN t.typname ELSE NULL END AS array_type_name, + + CASE WHEN c.type_name = 'ARRAY' THEN elemns.nspname ELSE NULL END AS element_type_schema, + CASE WHEN c.type_name = 'ARRAY' THEN elem.typname ELSE NULL END AS element_type_name + FROM %1$I.columns c + LEFT JOIN %1$I.namespaces ns + ON ns.nspname = c.udt_schema + LEFT JOIN %1$I.types t + ON t.typname = c.udt_name + AND t.typnamespace = ns.oid + LEFT JOIN %1$I.types elem + ON elem.oid = t.typelem + LEFT JOIN %1$I.namespaces elemns + ON elemns.oid = elem.typnamespace + $SQL$, v_schema); RETURN; END; $postgres_create_catalog$; @@ -2125,7 +2197,7 @@ LANGUAGE plpgsql AS $$ DECLARE rname text; - rels text[] := ARRAY['tables','columns','keys']; -- materialize just these + rels text[] := ARRAY['tables','columns','keys','columns_resolved']; -- materialize just these src_ok boolean; dst_ok boolean; rel_exists boolean; @@ -2241,6 +2313,21 @@ BEGIN END IF; END IF; + IF rname = 'columns_resolved' THEN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = p_dest_schema::text AND table_name='columns_resolved' + AND column_name IN ('schema','table_name','position') + GROUP BY table_schema, table_name + HAVING count(*) = 3 + ) THEN + EXECUTE format( + 'CREATE INDEX ON %I.%I(("schema"), table_name, position)', + p_dest_schema, rname + ); + END IF; + END IF; + -- For keys(schema, table_name[, constraint_name]) IF rname = 'keys' THEN IF EXISTS ( @@ -2268,6 +2355,18 @@ BEGIN END IF; END IF; + IF rname = 'columns_resolved' THEN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = p_dest_schema::text AND table_name='columns_resolved' + AND column_name IN ('schema','table_name','position') + GROUP BY table_schema, table_name + HAVING count(*) = 3 + ) THEN + EXECUTE format('CREATE INDEX ON %I.columns_resolved(("schema"), table_name, position)', p_dest_schema); + END IF; + END IF; + END LOOP; -- 4) Analyze for better local planning @@ -2275,6 +2374,14 @@ BEGIN EXECUTE format('ANALYZE %I.columns', p_dest_schema); EXECUTE format('ANALYZE %I.keys', p_dest_schema); + IF EXISTS ( + SELECT 1 FROM pg_class c + JOIN pg_namespace n ON n.oid=c.relnamespace + WHERE n.nspname = p_dest_schema::text AND c.relname='columns_resolved' AND c.relkind='r' + ) THEN + EXECUTE format('ANALYZE %I.columns_resolved', p_dest_schema); + END IF; + RAISE NOTICE 'Materialization complete into schema %', p_dest_schema; END; $$; @@ -2624,6 +2731,38 @@ BEGIN END IF; END IF; + IF v_conn_type = 'postgres' THEN + EXECUTE format( + 'SELECT string_agg( + quote_ident(%s) || '' '' || + synchdb_translate_datatype( + %L::name, + CASE + WHEN c.type_name = ''ARRAY'' AND c.element_type_name IS NOT NULL + THEN (lower(c.element_type_name) || ''[]'')::name + ELSE lower(c.type_name)::name + END, + COALESCE(c.length, -1)::bigint, + COALESCE(c.scale, -1)::bigint, + COALESCE(c.precision, -1)::bigint + ) || + CASE + WHEN lower(coalesce(c.nullable::text, '''')) IN (''no'', ''n'', ''0'', ''false'', ''f'') + THEN '' NOT NULL'' + ELSE '''' + END, + '', '' ORDER BY c.position + ) + FROM %I.columns_resolved c + WHERE c."schema" = %L + AND c.table_name = %L', + v_colname_expr, + v_conn_type, + p_source_schema, r.ora_owner, r.table_name + ) + INTO v_cols_sql; + + ELSE -- Build PG column list using translator, honoring case strategy EXECUTE format( 'SELECT string_agg( @@ -2643,6 +2782,8 @@ BEGIN ) INTO v_cols_sql; + END IF; + IF v_cols_sql IS NULL THEN RAISE NOTICE 'No columns found for %.% — skipping', r.ora_owner, r.table_name; CONTINUE; @@ -2710,17 +2851,73 @@ BEGIN RAISE NOTICE 'Created MySQL FT %.% -> %.% on %', p_stage_schema, v_tbl_pg, p_desired_db, r.table_name, p_server_name; ELSIF v_conn_type = 'postgres' THEN - -- Ignore p_scn; use mysql_fdw options: dbname + table_name - EXECUTE format( - 'CREATE FOREIGN TABLE %I.%I (%s) SERVER %I OPTIONS (schema_name %L, table_name %L)', - p_stage_schema, v_tbl_pg, v_cols_sql, p_server_name, - p_desired_schema::text, r.table_name - ); + ------------------------------------------------------------------ + -- IMPORTANT for postgres_fdw: + -- If we normalize local FT column identifiers (upper/lower), + -- we MUST map them back to the real remote column names using + -- column OPTIONS (column_name 'remote_col'). + ------------------------------------------------------------------ + EXECUTE format($SQL$ + SELECT string_agg( + ( + quote_ident( + CASE + WHEN %L = 'lower' THEN lower(c.column_name) + WHEN %L = 'upper' THEN upper(c.column_name) + ELSE c.column_name + END + ) + || ' ' || + synchdb_translate_datatype( + %L::name, + CASE + WHEN lower(c.type_name) = 'array' THEN + CASE + WHEN c.element_type_name IS NOT NULL + THEN (lower(c.element_type_name) || '[]')::name + ELSE + lower(c.array_type_name)::name + END + ELSE + lower(c.type_name)::name + END, + COALESCE(c.length, -1)::int, + COALESCE(c.scale, -1)::int, + COALESCE(c.precision, -1)::int + ) + || ' OPTIONS (column_name ' || quote_literal(c.column_name) || ')' + || CASE + WHEN lower(coalesce(c.nullable::text,'')) IN ('no','n','0','false','f') + THEN ' NOT NULL' + ELSE '' + END + ), + ', ' ORDER BY c.position + ) + FROM %I.columns_resolved c + WHERE c."schema" = %L + AND c.table_name = %L + $SQL$, + p_case_strategy, + p_case_strategy, + v_conn_type, + p_source_schema, + r.ora_owner, + r.table_name + ) + INTO v_cols_sql; - RAISE NOTICE 'Created Postgres FT %.% -> %.% on %', - p_stage_schema, v_tbl_pg, p_desired_schema, r.table_name, p_server_name; - ELSE + EXECUTE format( + 'CREATE FOREIGN TABLE %I.%I (%s) SERVER %I OPTIONS (schema_name %L, table_name %L)', + p_stage_schema, v_tbl_pg, v_cols_sql, p_server_name, + p_desired_schema::text, r.table_name + ); + + RAISE NOTICE 'Created Postgres FT %.% -> %.% on %', + p_stage_schema, v_tbl_pg, p_desired_schema, r.table_name, p_server_name; + + ELSE RAISE EXCEPTION 'Unsupported connector type: %', v_conn_type; END IF;