Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/synchdb-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ jobs:
- oracle
- sqlserver
- olr
- postgres
runs-on: ubuntu-22.04
needs:
- params
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ oraclecheck:
olrcheck:
$(MAKE) dbcheck DB=olr

postgrescheck:
$(MAKE) dbcheck DB=postgres

mysqlcheck-benchmark:
$(MAKE) dbcheck-tpcc DB=mysql

Expand Down
26 changes: 17 additions & 9 deletions src/backend/converter/debezium_event_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,16 @@ deriveLogicalMessage(Jsonb ** jb)
elog(WARNING, "decoded DDL message is not valid JSON: %s", edata->message);
FreeErrorData(edata);
MemoryContextSwitchTo(oldctx);
if (tmpout)
pfree(tmpout);
return -1;
}
PG_END_TRY();

/* tmpout not needed anymore */
if (tmpout)
pfree(tmpout);

/* turn this ddl_jb to tableChanges array with just one value */
out = pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);
v.type = jbvBinary;
Expand All @@ -381,6 +387,8 @@ deriveLogicalMessage(Jsonb ** jb)
BoolGetDatum(true) /* create_missing */
);
*jb = DatumGetJsonbP(newjb_d);
if (path)
pfree(path);
return 0;
}

Expand Down Expand Up @@ -1066,10 +1074,10 @@ parseDBZDML(Jsonb * jb, char op, ConnectorType type, Jsonb * source, bool isfirs
if (getPathElementString(jb, tmpPath, &strinfo, false) == 0)
{
/* postgres array is wrapped with curly brackets instead */
if (strinfo.data[0] == '[')
if (strinfo.len > 0 && strinfo.data[0] == '[')
strinfo.data[0] = '{';
if (strinfo.data[strinfo.len - 1] == ']')
strinfo.data[strinfo.len -1] = '}';
if (strinfo.len > 0 && strinfo.data[strinfo.len - 1] == ']')
strinfo.data[strinfo.len - 1] = '}';

value = pstrdup(strinfo.data);
}
Expand Down Expand Up @@ -1258,10 +1266,10 @@ parseDBZDML(Jsonb * jb, char op, ConnectorType type, Jsonb * source, bool isfirs
if (getPathElementString(jb, tmpPath, &strinfo, false) == 0)
{
/* postgres array is wrapped with curly brackets instead */
if (strinfo.data[0] == '[')
if (strinfo.len > 0 && strinfo.data[0] == '[')
strinfo.data[0] = '{';
if (strinfo.data[strinfo.len - 1] == ']')
strinfo.data[strinfo.len -1] = '}';
if (strinfo.len > 0 && strinfo.data[strinfo.len - 1] == ']')
strinfo.data[strinfo.len - 1] = '}';

value = pstrdup(strinfo.data);
}
Expand Down Expand Up @@ -1485,10 +1493,10 @@ parseDBZDML(Jsonb * jb, char op, ConnectorType type, Jsonb * source, bool isfirs
if (getPathElementString(jb, tmpPath, &strinfo, false) == 0)
{
/* postgres array is wrapped with curly brackets instead */
if (strinfo.data[0] == '[')
if (strinfo.len > 0 && strinfo.data[0] == '[')
strinfo.data[0] = '{';
if (strinfo.data[strinfo.len - 1] == ']')
strinfo.data[strinfo.len -1] = '}';
if (strinfo.len > 0 && strinfo.data[strinfo.len - 1] == ']')
strinfo.data[strinfo.len - 1] = '}';

value = pstrdup(strinfo.data);
}
Expand Down
145 changes: 84 additions & 61 deletions src/backend/converter/format_converter.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,9 @@ static long long derive_value_from_byte(const unsigned char * bytes, int len);
static char * derive_decimal_string_from_byte(const unsigned char *bytes, int len);
static void reverse_byte_array(unsigned char * array, int length);
static void trim_leading_zeros(char *str);
static void prepend_zeros(char *str, int num_zeros);
static void byte_to_binary(unsigned char byte, char * binary_str);
static void bytes_to_binary_string(const unsigned char * bytes,
size_t len, char * binary_str);
size_t len, char * binary_str, bool trim);
static char * transform_data_expression(const char * remoteObjid, const char * colname);
static void populate_primary_keys(StringInfoData * strinfo, const char * id,
const char * jsonin, bool alter, bool isinline);
Expand All @@ -367,7 +366,7 @@ static void expand_struct_value(char * in, DBZ_DML_COLUMN_VALUE * colval,
ConnectorType conntype);
static char * handle_base64_to_numeric_with_scale(const char * in, int scale);
static char * handle_string_to_numeric(const char * in, bool addquote);
static char * handle_base64_to_bit(const char * in, bool addquote, int typemod);
static char * handle_base64_to_bit(const char * in, bool addquote, int typemod, bool padzero);
static char * handle_string_to_bit(const char * in, bool addquote);
static char * handle_numeric_to_bit(const char * in, bool addquote);
static char * construct_datestr(long long input, bool addquote, int timerep);
Expand Down Expand Up @@ -637,32 +636,6 @@ trim_leading_zeros(char *str)
str[j] = '\0';
}

/*
* prepend_zeros
*
* prepend zeros to the given string
*/
static void
prepend_zeros(char *str, int num_zeros)
{
int original_len = strlen(str);
int new_len = original_len + num_zeros;
char * temp = palloc0(new_len + 1);

for (int i = 0; i < num_zeros; i++)
{
temp[i] = '0';
}

for (int i = 0; i < original_len; i++)
{
temp[i + num_zeros] = str[i];
}
temp[new_len] = '\0';
strcpy(str, temp);
pfree(temp);
}

/*
* byte_to_binary
*
Expand All @@ -684,7 +657,7 @@ byte_to_binary(unsigned char byte, char * binary_str)
* convert the given bytes to a binary string with 1s and 0s
*/
static void
bytes_to_binary_string(const unsigned char * bytes, size_t len, char * binary_str)
bytes_to_binary_string(const unsigned char * bytes, size_t len, char * binary_str, bool trim)
{
char byte_str[9];
size_t i = 0;
Expand All @@ -695,6 +668,9 @@ bytes_to_binary_string(const unsigned char * bytes, size_t len, char * binary_st
byte_to_binary(bytes[i], byte_str);
strcat(binary_str, byte_str);
}

if (trim)
trim_leading_zeros(binary_str);
}

/*
Expand Down Expand Up @@ -2042,42 +2018,50 @@ handle_string_to_numeric(const char * in, bool addquote)
}

static char *
handle_base64_to_bit(const char * in, bool addquote, int typemod)
handle_base64_to_bit(const char * in, bool addquote, int typemod, bool padzero)
{
int tmpoutlen = pg_b64_dec_len(strlen(in));
unsigned char * tmpout = (unsigned char *) palloc0(tmpoutlen);
char * out = NULL;
int extrazeros = 0;

#if SYNCHDB_PG_MAJOR_VERSION >= 1800
tmpoutlen = pg_b64_decode(in, strlen(in), tmpout, tmpoutlen);
#else
tmpoutlen = pg_b64_decode(in, strlen(in), (char*)tmpout, tmpoutlen);
#endif

if (padzero)
extrazeros = (typemod - (tmpoutlen * 8));

if (addquote)
{
/* 8 bits per byte + 2 single quotes + b + terminating null */
char * tmp = NULL;
out = (char *) palloc0((tmpoutlen * 8) + 2 + 1 + 1);
tmp = out;
/* 8 bits per byte + 2 single quotes + b + extra zeros + terminating null */
out = (char *) palloc0((tmpoutlen * 8) + 2 + 1 + extrazeros + 1);

strcat(out, "'b");
out += 2;

/* zeros */
memset(out, '0', extrazeros);

/* bit value */
reverse_byte_array(tmpout, tmpoutlen);
strcat(tmp, "'b");
tmp += 2;
bytes_to_binary_string(tmpout, tmpoutlen, tmp);
trim_leading_zeros(tmp);
if (strlen(tmp) < typemod)
prepend_zeros(tmp, typemod - strlen(tmp));

strcat(tmp, "'");
bytes_to_binary_string(tmpout, tmpoutlen, out + extrazeros, !padzero);

strcat(out, "'");
}
else
{
/* 8 bits per byte + terminating null */
out = (char *) palloc0(tmpoutlen * 8 + 1);
/* 8 bits per byte + extra zeros + terminating null */
out = (char *) palloc0((tmpoutlen * 8) + extrazeros + 1);

/* zeros */
memset(out, '0', extrazeros);

/* bit value */
reverse_byte_array(tmpout, tmpoutlen);
bytes_to_binary_string(tmpout, tmpoutlen, out);
trim_leading_zeros(out);
if (strlen(out) < typemod)
prepend_zeros(out, typemod - strlen(out));
bytes_to_binary_string(tmpout, tmpoutlen, out + extrazeros, !padzero);
}
pfree(tmpout);
return out;
Expand Down Expand Up @@ -2823,12 +2807,14 @@ handle_data_by_type_category(char * in, DBZ_DML_COLUMN_VALUE * colval, Connector
case DBZTYPE_STRUCT:
{
expand_struct_value(in, colval, conntype);
out = handle_base64_to_bit(colval->value, addquote, colval->typemod);
out = handle_base64_to_bit(colval->value, addquote, colval->typemod,
conntype == TYPE_MYSQL ? true : false);
break;
}
case DBZTYPE_BYTES:
{
out = handle_base64_to_bit(in, addquote, colval->typemod);
out = handle_base64_to_bit(in, addquote, colval->typemod,
conntype == TYPE_MYSQL ? true : false);
break;
}
case DBZTYPE_STRING:
Expand Down Expand Up @@ -3008,12 +2994,14 @@ processDataByType(DBZ_DML_COLUMN_VALUE * colval, bool addquote, char * remoteObj
case DBZTYPE_STRUCT:
{
expand_struct_value(in, colval, type);
out = handle_base64_to_bit(colval->value, addquote, colval->typemod);
out = handle_base64_to_bit(colval->value, addquote, colval->typemod,
type == TYPE_MYSQL ? true : false);
break;
}
case DBZTYPE_BYTES:
{
out = handle_base64_to_bit(in, addquote, colval->typemod);
out = handle_base64_to_bit(in, addquote, colval->typemod,
type == TYPE_MYSQL ? true : false);
break;
}
case DBZTYPE_STRING:
Expand Down Expand Up @@ -3408,15 +3396,29 @@ convert2PGDML(DBZ_DML * dbzdml, ConnectorType type)
{
if (synchdb_dml_use_spi)
{
bool atleastone = false;

/* --- Convert to use SPI to handler DML --- */
appendStringInfo(&strinfo, "INSERT INTO %s(", dbzdml->mappedObjectId);
foreach(cell, dbzdml->columnValuesAfter)
{
DBZ_DML_COLUMN_VALUE * colval = (DBZ_DML_COLUMN_VALUE *) lfirst(cell);
appendStringInfo(&strinfo, "%s,", colval->name);
atleastone = true;
}

if (atleastone)
{
strinfo.data[strinfo.len - 1] = '\0';
strinfo.len = strinfo.len - 1;
}
else
{
elog(WARNING, "no column data is provided for %s. Insert skipped", dbzdml->mappedObjectId);
pfree(strinfo.data);
destroyPGDML(pgdml);
return NULL;
}
strinfo.data[strinfo.len - 1] = '\0';
strinfo.len = strinfo.len - 1;
appendStringInfo(&strinfo, ") VALUES (");

foreach(cell, dbzdml->columnValuesAfter)
Expand All @@ -3435,9 +3437,18 @@ convert2PGDML(DBZ_DML * dbzdml, ConnectorType type)
}
}
/* remove extra "," */
strinfo.data[strinfo.len - 1] = '\0';
strinfo.len = strinfo.len - 1;

if (atleastone)
{
strinfo.data[strinfo.len - 1] = '\0';
strinfo.len = strinfo.len - 1;
}
else
{
elog(WARNING, "no column data is provided for %s. Insert skipped", dbzdml->mappedObjectId);
pfree(strinfo.data);
destroyPGDML(pgdml);
return NULL;
}
appendStringInfo(&strinfo, ");");
}
else
Expand Down Expand Up @@ -3571,10 +3582,22 @@ convert2PGDML(DBZ_DML * dbzdml, ConnectorType type)
{
appendStringInfo(&strinfo, "%s,", "null");
}
atleastone = true;
}

if (atleastone)
{
/* remove extra "," */
strinfo.data[strinfo.len - 1] = '\0';
strinfo.len = strinfo.len - 1;
}
else
{
elog(WARNING, "no column data is provided for %s. Update skipped", dbzdml->mappedObjectId);
pfree(strinfo.data);
destroyPGDML(pgdml);
return NULL;
}
/* remove extra "," */
strinfo.data[strinfo.len - 1] = '\0';
strinfo.len = strinfo.len - 1;

appendStringInfo(&strinfo, " WHERE ");
foreach(cell, dbzdml->columnValuesBefore)
Expand Down
8 changes: 8 additions & 0 deletions src/backend/synchdb/synchdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -2372,6 +2372,14 @@ main_loop(ConnectorType connectorType, ConnectionInfo *connInfo, char * snapshot
&myBatchInfo, &myBatchStats,
connInfo->flag);

/*
* postgres connector under debezium snapshot engine requires user to create
* schema manually, so there is not really a schema sync action here, so we
* just mark it as done for now.
*/
if (connectorType == TYPE_POSTGRES && (connInfo->flag & CONNFLAG_SCHEMA_SYNC_MODE))
set_shm_connector_state(myConnectorId, STATE_SCHEMA_SYNC_DONE);

/*
* if a valid batchid is set by dbz_engine_get_change(), it means we have
* successfully completed a batch change request and we shall notify dbz
Expand Down
Loading
Loading