forked from Hornetlabs/synchdb
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsynchdb.h
More file actions
238 lines (215 loc) · 7.12 KB
/
synchdb.h
File metadata and controls
238 lines (215 loc) · 7.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
/*
* synchdb.h
*
* Header file for the SynchDB synchronization system
*
* This file defines the core structures and functions used by SynchDB
* to manage database synchronization across different connector types.
*
* Key components:
* - Connector types and states
* - Shared state structures for different database connectors
* - Function prototypes for shared memory operations
*
* Copyright (c) 2024 Hornetlabs Technology, Inc.
*
*/
#ifndef SYNCHDB_SYNCHDB_H_
#define SYNCHDB_SYNCHDB_H_
#include "storage/lwlock.h"
/* Constants */
#define SYNCHDB_CONNINFO_NAME_SIZE 64
#define SYNCHDB_CONNINFO_HOSTNAME_SIZE 256
#define SYNCHDB_CONNINFO_USERNAME_SIZE 64
#define SYNCHDB_CONNINFO_PASSWORD_SIZE 128
#define SYNCHDB_CONNINFO_TABLELIST_SIZE 256
#define SYNCHDB_CONNINFO_RULEFILENAME_SIZE 64
#define SYNCHDB_CONNINFO_DB_NAME_SIZE 64
//#define SYNCHDB_MAX_ACTIVE_CONNECTORS 30
#define DEBEZIUM_SHUTDOWN_TIMEOUT_MSEC 100000
#define SYNCHDB_OFFSET_SIZE 256
#define SYNCHDB_ERRMSG_SIZE 256
#define SYNCHDB_SNAPSHOT_MODE_SIZE 32
#define SYNCHDB_DATATYPE_NAME_SIZE 64
#define SYNCHDB_OBJ_NAME_SIZE 128
#define SYNCHDB_OBJ_TYPE_SIZE 32
#define SYNCHDB_TRANSFORM_EXPRESSION_SIZE 256
#define SYNCHDB_JSON_PATH_SIZE 128
#define SYNCHDB_INVALID_BATCH_ID -1
/*
* ex: pg_synchdb/[connector]_[name]_offsets.dat
* pg_synchdb/mysql_mysqlconn_offsets.dat
*/
#define SYNCHDB_OFFSET_FILE_PATTERN "pg_synchdb/%s_%s_offsets.dat"
#define SYNCHDB_SECRET "930e62fb8c40086c23f543357a023c0c"
#define SYNCHDB_CONNINFO_TABLE "synchdb_conninfo"
/* Enumerations */
/**
* ConnectorType - Enum representing different types of database connectors
*/
typedef enum _connectorType
{
TYPE_UNDEF = 0,
TYPE_MYSQL,
TYPE_ORACLE,
TYPE_SQLSERVER,
} ConnectorType;
/**
* ConnectorState - Enum representing different states of a connector
*/
typedef enum _connectorState
{
STATE_UNDEF = 0,
STATE_STOPPED, /* connector is stopped */
STATE_INITIALIZING, /* connector is initializing java dbz runner engine */
STATE_PAUSED, /* connector paused until commanded to resume */
STATE_SYNCING, /* connector is polling changes from dbz engine */
STATE_PARSING, /* got a change event, try to parse it */
STATE_CONVERTING, /* parsing done, try to convert it to pg */
STATE_EXECUTING, /* conversion done, try to execute it on pg */
STATE_OFFSET_UPDATE,/* in this state when user requests offset update */
STATE_RESTARTING, /* connector is restarting with new snapshot mode */
STATE_MEMDUMP, /* connector is dumping jvm heap memory info */
} ConnectorState;
/**
* ConnectorStage - Enum representing different stages of connector
*/
typedef enum _connectorStage
{
STAGE_UNDEF = 0,
STAGE_INITIAL_SNAPSHOT,
STAGE_CHANGE_DATA_CAPTURE,
} ConnectorStage;
/**
* ConnectorStatistics - Enum representing different statistics of a connector
*/
typedef enum _connectorStatistics
{
STATS_UNDEF = 0,
STATS_DDL,
STATS_DML,
STATS_READ,
STATS_CREATE,
STATS_UPDATE,
STATS_DELETE,
STATS_BAD_CHANGE_EVENT,
STATS_TOTAL_CHANGE_EVENT,
STATS_BATCH_COMPLETION,
STATS_AVERAGE_BATCH_SIZE
} ConnectorStatistics;
/**
* BatchInfo - Structure containing the metadata of a batch change request
*/
typedef struct _BatchInfo
{
int batchId;
int batchSize;
} BatchInfo;
/**
* ConnectionInfo - DBZ Connection info. These are put in shared memory so
* connector background workers can access when they are spawned.
*/
typedef struct _ConnectionInfo
{
char name[SYNCHDB_CONNINFO_NAME_SIZE];
char hostname[SYNCHDB_CONNINFO_HOSTNAME_SIZE];
unsigned int port;
char user[SYNCHDB_CONNINFO_USERNAME_SIZE];
char pwd[SYNCHDB_CONNINFO_PASSWORD_SIZE];
char srcdb[SYNCHDB_CONNINFO_DB_NAME_SIZE];
char dstdb[SYNCHDB_CONNINFO_DB_NAME_SIZE];
char table[SYNCHDB_CONNINFO_TABLELIST_SIZE];
bool active;
char rulefile[SYNCHDB_CONNINFO_RULEFILENAME_SIZE];
} ConnectionInfo;
/**
* ConnectorName - Used to store as a List* of names for automatic connector
* resume feature
*/
typedef struct _ConnectorName
{
char name[SYNCHDB_CONNINFO_NAME_SIZE];
} ConnectorName;
/**
* ExtraConnectionInfo - Extra DBZ Connection info parameters read from the
* rule file (if specified). These won't be put in shared memory so they
* are declared as pointers.
*/
typedef struct _ExtraConnectionInfo
{
char * ssl_mode;
char * ssl_keystore;
char * ssl_keystore_pass;
char * ssl_truststore;
char * ssl_truststore_pass;
} ExtraConnectionInfo;
/**
* SynchdbRequest - Structure representing a request to change connector state
*/
typedef struct _SynchdbRequest
{
ConnectorState reqstate;
char reqdata[SYNCHDB_ERRMSG_SIZE];
ConnectionInfo reqconninfo;
} SynchdbRequest;
/**
* SynchdbRequest - Structure representing a statistic info per connector.
* If you add new stats values here, make sure to add the same to ConnectorStatistics
* enum above
*
* todo: to be persisted in future
*/
typedef struct _SynchdbStatistics
{
unsigned long long stats_ddl; /* number of DDL operations performed */
unsigned long long stats_dml; /* number of DML operations performed */
unsigned long long stats_read; /* READ events generated during initial snapshot */
unsigned long long stats_create; /* INSERT events generated during CDC */
unsigned long long stats_update; /* UPDATE events generated during CDC */
unsigned long long stats_delete; /* DELETE events generated during CDC */
unsigned long long stats_bad_change_event; /* number of bad change events */
unsigned long long stats_total_change_event;/* number of total change events */
unsigned long long stats_batch_completion; /* number of batches completed */
unsigned long long stats_average_batch_size;/* calculated average batch size: */
/* todo: more stats to be added */
} SynchdbStatistics;
/**
* Structure holding state information for connectors
*/
typedef struct _ActiveConnectors
{
pid_t pid;
ConnectorState state;
ConnectorStage stage;
ConnectorType type;
SynchdbRequest req;
char errmsg[SYNCHDB_ERRMSG_SIZE];
char dbzoffset[SYNCHDB_OFFSET_SIZE];
char snapshotMode[SYNCHDB_SNAPSHOT_MODE_SIZE];
ConnectionInfo conninfo;
SynchdbStatistics stats;
} ActiveConnectors;
/**
* SynchdbSharedState - Shared state information for synchdb background worker
*/
typedef struct _SynchdbSharedState
{
LWLock lock; /* mutual exclusion */
ActiveConnectors * connectors;
} SynchdbSharedState;
/* Function prototypes */
const char * get_shm_connector_name(ConnectorType type);
pid_t get_shm_connector_pid(int connectorId);
void set_shm_connector_pid(int connectorId, pid_t pid);
void set_shm_connector_errmsg(int connectorId, const char * err);
const char * get_shm_connector_errmsg(int connectorId);
void set_shm_connector_state(int connectorId, ConnectorState state);
const char * get_shm_connector_state(int connectorId);
void set_shm_dbz_offset(int connectorId);
const char * get_shm_dbz_offset(int connectorId);
ConnectorState get_shm_connector_state_enum(int connectorId);
const char* connectorTypeToString(ConnectorType type);
void set_shm_connector_stage(int connectorId, ConnectorStage stage);
ConnectorStage get_shm_connector_stage_enum(int connectorId);
void increment_connector_statistics(SynchdbStatistics * myStats, ConnectorStatistics which, int incby);
#endif /* SYNCHDB_SYNCHDB_H_ */