Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
dbz-engine/target/
# Generated binaries
*.o
*.so
*.bc

dbz-engine/target/
testenv/olr/oradata/
testenv/olr/checkpoint/
testenv/olr/olrswap/
src/test/pytests/synchdbtests/__pycache__/
src/test/pytests/synchdbtests/t/__pycache__/
src/backend/debezium/target/
synchdb_testdir/
synchdb_testdir/
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,11 @@ oraclecheck-benchmark:
olrcheck-benchmark:
$(MAKE) dbcheck-tpcc DB=olr

.PHONY: clean_bc
clean_bc:
rm -rf $(patsubst %.o,%.bc, $(OBJS))

clean: clean_bc clean_dbz
ifeq ($(WITH_OLR), 1)
clean: clean_oracle_parser
endif
2 changes: 1 addition & 1 deletion src/backend/debezium/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>com.example</groupId>
<artifactId>dbz-engine</artifactId>
<packaging>jar</packaging>
<version>1.0.0</version>
<version>1.1.0</version>
<name>dbz-engine</name>
<url>http://maven.apache.org</url>
<dependencies>
Expand Down
19 changes: 19 additions & 0 deletions src/backend/debezium/src/main/java/com/example/DebeziumRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,25 @@ public void jvmMemDump()
{
checkMemoryStatus();
}

public void changeLogLevel(int level)
{
switch (level)
{
case LOG_LEVEL_ALL: logger.setLevel(Level.ALL); break;
case LOG_LEVEL_DEBUG: logger.setLevel(Level.DEBUG); break;
case LOG_LEVEL_INFO: logger.setLevel(Level.INFO); break;
case LOG_LEVEL_ERROR: logger.setLevel(Level.ERROR); break;
case LOG_LEVEL_FATAL: logger.setLevel(Level.FATAL); break;
case LOG_LEVEL_OFF: logger.setLevel(Level.OFF); break;
case LOG_LEVEL_TRACE: logger.setLevel(Level.TRACE); break;
default:
case LOG_LEVEL_UNDEF:
case LOG_LEVEL_WARN: logger.setLevel(Level.WARN); break;
}
logger.warn("DBZ log level changed to " + logger.getLevel());
}

public static void main(String[] args)
{
/* testing code can be put here */
Expand Down
123 changes: 123 additions & 0 deletions src/backend/synchdb/synchdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ PG_FUNCTION_INFO_V1(synchdb_add_infinispan);
PG_FUNCTION_INFO_V1(synchdb_del_infinispan);
PG_FUNCTION_INFO_V1(synchdb_translate_datatype);
PG_FUNCTION_INFO_V1(synchdb_set_snapstats);
PG_FUNCTION_INFO_V1(synchdb_set_dbz_loglevel);

/* Global variables */
SynchdbSharedState *sdb_state = NULL; /* Pointer to shared-memory state. */
Expand Down Expand Up @@ -1244,6 +1245,38 @@ dbz_engine_memory_dump(void)
(*env)->CallVoidMethod(env, obj, jvmMemDump);
}

/**
* dbz_engine_set_loglevel - Set Debezium log level
*
* This function set Debezium runner's log level during run time
*/
static int
dbz_engine_set_loglevel(int level)
{
jmethodID changeLogLevel;

if (!jvm)
{
elog(WARNING, "jvm not initialized");
return -1;
}
if (!env)
{
elog(WARNING, "jvm env not initialized");
return -1;
}

changeLogLevel = (*env)->GetMethodID(env, cls, "changeLogLevel", "(I)V");
if (changeLogLevel == NULL)
{
elog(WARNING, "Failed to find changeLogLevel method");
return -1;
}

(*env)->CallVoidMethod(env, obj, changeLogLevel, level);
return 0;
}

/*
* synchdb_state_tupdesc - Create a TupleDesc for SynchDB state information
*
Expand Down Expand Up @@ -1476,6 +1509,8 @@ connectorStateAsString(ConnectorState state)
return "schema sync";
case STATE_RELOAD_OBJMAP:
return "reloading objmap";
case STATE_DBZ_LOGLEVEL_UPDATE:
return "updating dbz log level";
}
return "UNKNOWN";
}
Expand Down Expand Up @@ -1843,6 +1878,16 @@ processRequestInterrupt(ConnectionInfo *connInfo, ConnectorType type, int connec
fc_load_objmap(connInfo->name, type);
set_shm_connector_state(connectorId, oldstate);
}
else if (reqcopy->reqstate == STATE_DBZ_LOGLEVEL_UPDATE)
{
ConnectorState oldstate = get_shm_connector_state_enum(connectorId);
int level = atoi(reqcopy->reqdata);

elog(LOG, "Updating log level for %s connector to %d", connInfo->name, level);
set_shm_connector_state(connectorId, STATE_DBZ_LOGLEVEL_UPDATE);
dbz_engine_set_loglevel(level);
set_shm_connector_state(connectorId, oldstate);
}
else
{
/* unsupported request state combinations */
Expand Down Expand Up @@ -6558,3 +6603,81 @@ synchdb_set_snapstats(PG_FUNCTION_ARGS)

PG_RETURN_VOID();
}

/*
* synchdb_set_dbz_loglevel
*
* This function dynamically changes the Debezium log4j log level at runtime
* for the specified connector without requiring a restart.
*/
Datum
synchdb_set_dbz_loglevel(PG_FUNCTION_ARGS)
{
int connectorId = -1;
pid_t pid;
SynchdbRequest *req;
int level;
text *level_text;
char *level_str;

Name name = PG_GETARG_NAME(0);
level_text = PG_GETARG_TEXT_PP(1);
level_str = text_to_cstring(level_text);

/* Convert level string to integer */
if (pg_strcasecmp(level_str, "all") == 0)
level = LOG_LEVEL_ALL;
else if (pg_strcasecmp(level_str, "debug") == 0)
level = LOG_LEVEL_DEBUG;
else if (pg_strcasecmp(level_str, "info") == 0)
level = LOG_LEVEL_INFO;
else if (pg_strcasecmp(level_str, "warn") == 0)
level = LOG_LEVEL_WARN;
else if (pg_strcasecmp(level_str, "error") == 0)
level = LOG_LEVEL_ERROR;
else if (pg_strcasecmp(level_str, "fatal") == 0)
level = LOG_LEVEL_FATAL;
else if (pg_strcasecmp(level_str, "off") == 0)
level = LOG_LEVEL_OFF;
else if (pg_strcasecmp(level_str, "trace") == 0)
level = LOG_LEVEL_TRACE;
else
ereport(ERROR,
(errmsg("invalid log level \"%s\"", level_str),
errhint("Valid levels: all, trace, debug, info, warn, error, fatal, off")));

synchdb_init_shmem();
if (!sdb_state)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("failed to init or attach to synchdb shared memory")));

connectorId = get_shm_connector_id_by_name(NameStr(*name), get_database_name(MyDatabaseId));
if (connectorId < 0)
ereport(ERROR,
(errmsg("dbz connector (%s) does not have connector ID assigned",
NameStr(*name)),
errhint("use synchdb_start_engine_bgw() to assign one first")));

pid = get_shm_connector_pid(connectorId);
if (pid == InvalidPid)
ereport(ERROR,
(errmsg("dbz connector (%s) is not running", NameStr(*name)),
errhint("use synchdb_start_engine_bgw() to start a worker first")));

req = &(sdb_state->connectors[connectorId].req);
if (req->reqstate != STATE_UNDEF)
ereport(ERROR,
(errmsg("an active request is currently active for connector %s",
NameStr(*name)),
errhint("wait for it to finish and try again later")));

LWLockAcquire(&sdb_state->lock, LW_EXCLUSIVE);
req->reqstate = STATE_DBZ_LOGLEVEL_UPDATE;
snprintf(req->reqdata, SYNCHDB_ERRMSG_SIZE, "%d", level);
LWLockRelease(&sdb_state->lock);

elog(WARNING, "sent loglevel update request to dbz connector (%s): %s",
NameStr(*name), level_str);
PG_RETURN_INT32(0);
}
7 changes: 4 additions & 3 deletions src/include/synchdb/synchdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
*/

#define SYNCHDB_METADATA_DIR "pg_synchdb"
#define DBZ_ENGINE_JAR_FILE "dbz-engine-1.0.0.jar"
#define DBZ_ENGINE_JAR_FILE "dbz-engine-1.1.0.jar"
#define ORACLE_RAW_PARSER_LIB "libsynchdb_oracle_parser.so"
#define MAX_PATH_LENGTH 1024
#define MAX_JAVA_OPTION_LENGTH 256
Expand Down Expand Up @@ -119,8 +119,9 @@ typedef enum _connectorState
STATE_OFFSET_UPDATE,/* in this state when user requests offset update */
STATE_RESTARTING, /* connector is restarting with new snapshot mode */
STATE_MEMDUMP, /* connector is dumping jvm heap memory info */
STATE_SCHEMA_SYNC_DONE, /* connect has completed schema sync as requested */
STATE_RELOAD_OBJMAP, /* connect is reloading object mapping */
STATE_SCHEMA_SYNC_DONE, /* connector has completed schema sync as requested */
STATE_RELOAD_OBJMAP, /* connector is reloading object mapping */
STATE_DBZ_LOGLEVEL_UPDATE, /* connector is updating debezium log4j log level */
} ConnectorState;

/**
Expand Down
4 changes: 4 additions & 0 deletions synchdb--1.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ CREATE OR REPLACE FUNCTION synchdb_log_jvm_meminfo(name) RETURNS int
AS '$libdir/synchdb'
LANGUAGE C IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION synchdb_set_dbz_loglevel(name, text) RETURNS int
AS '$libdir/synchdb'
LANGUAGE C IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION synchdb_get_stats() RETURNS SETOF record
AS '$libdir/synchdb'
LANGUAGE C IMMUTABLE STRICT;
Expand Down
Loading