feat(replication): Add support for TEMPORARY replication slots#515
feat(replication): Add support for TEMPORARY replication slots#515akselbor wants to merge 17 commits intosupabase:mainfrom
TEMPORARY replication slots#515Conversation
iambriccardo
left a comment
There was a problem hiding this comment.
Thanks for the contribution! Are there other types of slots?
Left some comments around backward compatibility.
etl-api/src/configs/pipeline.rs
Outdated
| #[schema(example = "my_publication")] | ||
| #[serde(deserialize_with = "crate::utils::trim_string")] | ||
| pub publication_name: String, | ||
| pub temporary_replication_slot: bool, |
There was a problem hiding this comment.
Are there other kind of replication slots? Asking since having a boolean might give us less flexibility in the future to support other replication slots types.
There was a problem hiding this comment.
Hmmm as per the Postgres docs (here) there are some other knobs that can be turned when creating the replication slots:
CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL | LOGICAL output_plugin } [ ( option [, ...] ) ]
Create a physical or logical replication slot. See [Section 26.2.6](https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS) for more about replication slots.
slot_name
The name of the slot to create. Must be a valid replication slot name (see [Section 26.2.6.1](https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION)).
output_plugin
The name of the output plugin used for logical decoding (see [Section 47.6](https://www.postgresql.org/docs/current/logicaldecoding-output-plugin.html)).
TEMPORARY
Specify that this replication slot is a temporary one. Temporary slots are not saved to disk and are automatically dropped on error or when the session has finished.
The following options are supported:
TWO_PHASE [ boolean ]
If true, this logical replication slot supports decoding of two-phase commit. With this option, commands related to two-phase commit such as PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED are decoded and transmitted. The transaction will be decoded and transmitted at PREPARE TRANSACTION time. The default is false.
RESERVE_WAL [ boolean ]
If true, this physical replication slot reserves WAL immediately. Otherwise, WAL is only reserved upon connection from a streaming replication client. The default is false.
SNAPSHOT { 'export' | 'use' | 'nothing' }
Decides what to do with the snapshot created during logical slot initialization. 'export', which is the default, will export the snapshot for use in other sessions. This option can't be used inside a transaction. 'use' will use the snapshot for the current transaction executing the command. This option must be used in a transaction, and CREATE_REPLICATION_SLOT must be the first command run in that transaction. Finally, 'nothing' will just use the snapshot for logical decoding as normal but won't do anything else with it.
FAILOVER [ boolean ]
If true, the slot is enabled to be synced to the standbys so that logical replication can be resumed after failover. The default is false.
I would guess that etl will limit itself to logical replication, so the physical replication is irrelevant. Some of the others might be relevant, such as FAILOVER (to enable clean failover if someone is running a high-availability postgres cluster using e.g., cloudnative-pg). But iirc the etl currently couldn't support this without adding some kind of reconnect logic anyhow.
But I agree with your point! It could make sense to make it more future-proof wrt. other existing (or future) replication slot options later, by changing the ReplicationSlotConfig to something like:
pub struct ReplicationSlotConfig {
// Making this private has the benefit of forcing creation through the Default trait. If we combine it with some
// builder methods we prevent future additions from being a breaking change. But could also just be pub
temporary: bool,
// extend with other attributes as needed in the future, e.g.,
// failover: bool
}
impl Default for ReplicationSlotConfig { /* ... */ }
impl ReplicationSlotConfig { /* ... */ }
etl-config/src/shared/pipeline.rs
Outdated
| #[derive(Clone, Copy, Debug, Deserialize, Serialize)] | ||
| pub enum ReplicationSlotConfig { |
There was a problem hiding this comment.
| #[derive(Clone, Copy, Debug, Deserialize, Serialize)] | |
| pub enum ReplicationSlotConfig { | |
| #[derive(Clone, Copy, Debug, Deserialize, Serialize)] | |
| #[serde(rename_all = "snake_case")] | |
| pub enum ReplicationSlotConfig { |
| #[derive(Clone, Copy, Debug, Deserialize, Serialize)] | ||
| pub enum ReplicationSlotConfig { | ||
| Temporary, | ||
| Permanent, |
There was a problem hiding this comment.
To main backward compatibility we should set the default to be Permanent so that if we read a config from the db, we default to Permanent.
| #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| pub struct StoredPipelineConfig { | ||
| pub publication_name: String, | ||
| pub replication_slot: ReplicationSlotConfig, |
There was a problem hiding this comment.
This will crash for existing stored configs.
| pub replication_slot: ReplicationSlotConfig, | |
| #[serde(default)] | |
| pub replication_slot: ReplicationSlotConfig, |
This has to be paired with the other comment that I left.
|
Based on your comments I think it might actually be worthwhile to change the It will be more future-proof in the event that we find it necessary/useful to expose more of the replication slot settings later. If you agree I'll update the struct and the related code (plus the comments regarding backwards compatibility above) |
|
Changed from an enum to a structure, i.e., with a default of non-temporary ( This should be better long-term in the event that we want to add additional options. Also implemented the changes mentioned above. |
TEMPORARY replication slotsTEMPORARY replication slots
|
Thanks for the updates, I will get to this PR once I finalize some other pieces of work! |
|
Great, thanks! |
iambriccardo
left a comment
There was a problem hiding this comment.
Left some other comments, just spent more time reviewing it closely.
Unfortunately given our current setup, we need to be very cautious about backward and forward compatibility.
etl-config/src/shared/pipeline.rs
Outdated
| #[derive(Clone, Copy, Debug, Deserialize, Serialize, Default)] | ||
| #[serde(rename_all = "snake_case")] | ||
| pub struct ReplicationSlotConfig { | ||
| pub temporary: bool, |
There was a problem hiding this comment.
I have tried searching online for variants, to keep our format open for the future we could do something like:
| pub temporary: bool, | |
| pub persistence: ReplicationSlotPersistence, |
And then the ReplicationSlotPersistence could be something like:
enum ReplicationSlotType {
Permanent,
Temporary
}
And that could be serialized as an internally tagged enum as snake case and with the identifier named something like type.
The reason why I am suggesting this is that if there is ever the case where persistence changes or we want a slight variation, having a boolean is painful to migrate from. Unfortunately me being overly strict is because we store this config in the middleware db and the migration paths are very tricky.
etl-api/src/configs/pipeline.rs
Outdated
| #[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Default)] | ||
| pub struct ApiReplicationSlotConfig { | ||
| #[schema(example = false)] | ||
| pub temporary: bool, |
There was a problem hiding this comment.
For the API, it's fine to use this interface as of now.
etl-api/src/configs/pipeline.rs
Outdated
| #[schema(example = "my_publication")] | ||
| #[serde(deserialize_with = "crate::utils::trim_string")] | ||
| pub publication_name: String, | ||
| pub replication_slot: ApiReplicationSlotConfig, |
There was a problem hiding this comment.
Same here:
| pub replication_slot: ApiReplicationSlotConfig, | |
| #[serde(default) | |
| pub replication_slot: ApiReplicationSlotConfig, |
Otherwise if the value is missing, it will crash existing API consumers.
|
Yeah I get that, no worries! I'll get to these when I have time |
| # Pick docker compose (plugin) if available, else docker-compose (standalone) | ||
| if command -v docker >/dev/null 2>&1 && docker compose version >/dev/null 2>&1; then | ||
| DOCKER_COMPOSE=(docker compose) | ||
| elif command -v docker-compose >/dev/null 2>&1; then | ||
| DOCKER_COMPOSE=(docker-compose) | ||
| else | ||
| echo >&2 "❌ Error: Docker Compose is not installed." | ||
| echo >&2 "Please install it using your system's package manager." | ||
| echo >&2 "Install either Docker Compose v2 (docker compose) or legacy docker-compose." |
There was a problem hiding this comment.
Unrelated, but added this since I was tired of having unstaged local updates to the init file.
|
Note: failing BigQuery tests appear to be caused by #520 |
📝 WalkthroughWalkthroughThis PR adds replication slot configuration support by introducing Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
etl/src/lib.rs (1)
48-97: Remove the Rust code example from crate docs.Our guidelines prohibit embedded code samples; replace this block with a short prose pointer to the
etl-examplescrate.As per coding guidelines, avoid code examples in Rust documentation.♻️ Suggested edit
-//! # Basic Usage Example -//! -//! ```rust,no_run -//! use etl::{ -//! config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig, TableSyncCopyConfig, ReplicationSlotConfig}, -//! destination::memory::MemoryDestination, -//! pipeline::Pipeline, -//! store::both::memory::MemoryStore, -//! }; -//! -//! #[tokio::main] -//! async fn main() -> Result<(), Box<dyn std::error::Error>> { -//! // Configure Postgres connection -//! -//! let pg_config = PgConnectionConfig { -//! host: "localhost".to_string(), -//! port: 5432, -//! name: "mydb".to_string(), -//! username: "postgres".to_string(), -//! password: Some("password".to_string().into()), -//! tls: TlsConfig { enabled: false, trusted_root_certs: String::new() }, -//! keepalive: None -//! }; -//! -//! // Create memory-based store and destination for testing -//! let store = MemoryStore::new(); -//! let destination = MemoryDestination::new(); -//! -//! // Configure the pipeline -//! let config = PipelineConfig { -//! id: 1, -//! publication_name: "my_publication".to_string(), -//! replication_slot: ReplicationSlotConfig::default(), -//! pg_connection: pg_config, -//! batch: BatchConfig { max_size: 1000, max_fill_ms: 5000 }, -//! table_error_retry_delay_ms: 10000, -//! table_error_retry_max_attempts: 5, -//! max_table_sync_workers: 4, -//! table_sync_copy: TableSyncCopyConfig::IncludeAllTables, -//! }; -//! -//! // Create and start the pipeline -//! let mut pipeline = Pipeline::new(config, store, destination); -//! pipeline.start().await?; -//! -//! // Pipeline will run until stopped -//! pipeline.wait().await?; -//! -//! Ok(()) -//! } -//! ``` +//! # Basic Usage +//! +//! See the `etl-examples` crate for a complete, runnable pipeline setup.
🤖 Fix all issues with AI agents
In `@etl-api/src/configs/pipeline.rs`:
- Around line 1-4: The import of ReplicationSlotPersistence is coming from
etl::config but should be imported directly from etl_config::shared for
consistency with ReplicationSlotConfig; update the use statements so
ReplicationSlotPersistence is added to the etl_config::shared import list
(alongside BatchConfig, PgConnectionConfig, PipelineConfig,
ReplicationSlotConfig, TableSyncCopyConfig) and remove the separate
etl::config::ReplicationSlotPersistence import (and drop the etl::config import
entirely if it becomes unused).
- Around line 34-64: Add missing documentation comments to the API types and
derive Copy where appropriate: add doc comments for the
ApiReplicationSlotPersistence enum (and its variants Permanent and Temporary)
and for the ApiReplicationSlotConfig struct and its persistence field to explain
intent/usage, and update the derives on ApiReplicationSlotConfig (and optionally
ApiReplicationSlotPersistence) to include Copy so the API types match the
internal ReplicationSlotConfig/ReplicationSlotPersistence semantics; ensure the
referenced symbols are ApiReplicationSlotPersistence, ApiReplicationSlotConfig,
and the persistence field.
In `@etl-config/src/shared/pipeline.rs`:
- Around line 63-64: The documentation comment for the struct field
replication_slot should end with a period; update the doc comment above pub
replication_slot: ReplicationSlotConfig to read "Configuration for the
replication slot used." ensuring the trailing period is added to satisfy
punctuation guidelines and reference the ReplicationSlotConfig type in the same
comment.
- Around line 63-64: Add serde default for backward compatibility: annotate the
PipelineConfig struct's replication_slot field with #[serde(default)] so
deserializing older configs that omit replication_slot succeeds; update the
replication_slot declaration (type ReplicationSlotConfig) in PipelineConfig to
mirror StoredPipelineConfig's handling and ensure the default implementation for
ReplicationSlotConfig exists or derive/implement Default if needed.
- Around line 143-144: The doc comment for the field replication_slot is missing
a trailing period and the field lacks #[serde(default)] for backward
compatibility; update the PipelineConfig struct so the replication_slot field
has the #[serde(default)] attribute and terminate its documentation sentence
with a period, referencing the ReplicationSlotConfig type and the
replication_slot field to locate the change.
- Around line 181-193: Add Rustdoc comments to the new public types: document
the ReplicationSlotConfig struct to explain its purpose (holds replication slot
settings for a pipeline, e.g., control persistence behavior) and document the
persistence field if needed; document the ReplicationSlotPersistence enum to
describe the variants and their meaning (Permanent = durable replication slot
retained across restarts, Temporary = slot tied to session/lifetime and removed
afterwards). Use triple-slash /// comments placed immediately above the
ReplicationSlotConfig declaration and above the ReplicationSlotPersistence enum
and each variant to provide concise descriptions.
In `@etl/src/test_utils/pipeline.rs`:
- Around line 57-58: Update the field doc for replication_slot:
Option<ReplicationSlotConfig> to end with a period and clarify wording (e.g.,
"Replication slot configuration. Uses the default if not specified."). Locate
the doc comment immediately above the replication_slot field in the struct and
adjust the text to be concise, punctuated, and grammatically correct.
- Around line 125-131: Update the doc comment for the
with_replication_slot_config method: fix the grammar of the argument line and
link the type by replacing the current "* `replication_slot` - Configuration for
how replication slots behaviour" with a concise, corrected sentence that
references the type, e.g. "* `replication_slot` - Configuration for replication
slot behavior (see [`ReplicationSlotConfig`])." Ensure the backtick-bracket link
uses the exact type name `ReplicationSlotConfig` and keep punctuation
consistent.
In `@etl/tests/replication.rs`:
- Around line 178-223: Add a call to init_test_tracing() at the start of the
test_replication_client_temporary_slot_dropped_on_disconnect async test so its
logging/tracing setup matches the other tests; locate the test function
(test_replication_client_temporary_slot_dropped_on_disconnect) and insert a
single init_test_tracing() invocation as the first statement inside the function
before spawning the source database to ensure consistent tracing initialization.
- Around line 225-261: The test function
test_replication_client_permanent_slot_persisted_on_disconnect is missing the
standard test tracing initialization; add a call to init_test_tracing() at the
start of that async test (before spawn_source_database()) so it follows the same
tracing setup as other tests in this file.
| use etl::config::ReplicationSlotPersistence; | ||
| use etl_config::shared::{ | ||
| BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig, TableSyncCopyConfig, | ||
| }; |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Consider importing ReplicationSlotPersistence from etl_config::shared for consistency.
ReplicationSlotPersistence is defined in etl_config::shared but imported here via etl::config. While this works due to re-exports, importing directly from the source crate would be more consistent with how ReplicationSlotConfig is imported.
📝 Suggested fix
-use etl::config::ReplicationSlotPersistence;
use etl_config::shared::{
- BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig, TableSyncCopyConfig,
+ BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig,
+ ReplicationSlotPersistence, TableSyncCopyConfig,
};📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| use etl::config::ReplicationSlotPersistence; | |
| use etl_config::shared::{ | |
| BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig, TableSyncCopyConfig, | |
| }; | |
| use etl_config::shared::{ | |
| BatchConfig, PgConnectionConfig, PipelineConfig, ReplicationSlotConfig, | |
| ReplicationSlotPersistence, TableSyncCopyConfig, | |
| }; |
🤖 Prompt for AI Agents
In `@etl-api/src/configs/pipeline.rs` around lines 1 - 4, The import of
ReplicationSlotPersistence is coming from etl::config but should be imported
directly from etl_config::shared for consistency with ReplicationSlotConfig;
update the use statements so ReplicationSlotPersistence is added to the
etl_config::shared import list (alongside BatchConfig, PgConnectionConfig,
PipelineConfig, ReplicationSlotConfig, TableSyncCopyConfig) and remove the
separate etl::config::ReplicationSlotPersistence import (and drop the
etl::config import entirely if it becomes unused).
| #[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)] | ||
| #[serde(rename_all = "snake_case")] | ||
| pub enum ApiReplicationSlotPersistence { | ||
| #[default] | ||
| Permanent, | ||
| Temporary, | ||
| } | ||
|
|
||
| impl From<ReplicationSlotPersistence> for ApiReplicationSlotPersistence { | ||
| fn from(value: ReplicationSlotPersistence) -> Self { | ||
| match value { | ||
| ReplicationSlotPersistence::Permanent => ApiReplicationSlotPersistence::Permanent, | ||
| ReplicationSlotPersistence::Temporary => ApiReplicationSlotPersistence::Temporary, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl From<ApiReplicationSlotPersistence> for ReplicationSlotPersistence { | ||
| fn from(value: ApiReplicationSlotPersistence) -> Self { | ||
| match value { | ||
| ApiReplicationSlotPersistence::Permanent => ReplicationSlotPersistence::Permanent, | ||
| ApiReplicationSlotPersistence::Temporary => ReplicationSlotPersistence::Temporary, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Default)] | ||
| #[serde(rename_all = "snake_case")] | ||
| pub struct ApiReplicationSlotConfig { | ||
| pub persistence: ApiReplicationSlotPersistence, | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Add documentation to API types and consider adding Copy derive.
The new API types are missing documentation. Also, ApiReplicationSlotConfig could derive Copy since it only contains a Copy field, matching the internal ReplicationSlotConfig.
📝 Suggested improvements
+/// Determines the persistence behavior of a replication slot in API requests.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)]
#[serde(rename_all = "snake_case")]
pub enum ApiReplicationSlotPersistence {
+ /// Slot persists until explicitly dropped.
#[default]
Permanent,
+ /// Slot is automatically dropped when the connection ends.
Temporary,
}
// ... conversions ...
+/// Configuration for the replication slot used by the pipeline in API requests.
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, Default)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, Default)]
#[serde(rename_all = "snake_case")]
pub struct ApiReplicationSlotConfig {
+ /// Controls whether the replication slot persists across connection sessions.
pub persistence: ApiReplicationSlotPersistence,
}🤖 Prompt for AI Agents
In `@etl-api/src/configs/pipeline.rs` around lines 34 - 64, Add missing
documentation comments to the API types and derive Copy where appropriate: add
doc comments for the ApiReplicationSlotPersistence enum (and its variants
Permanent and Temporary) and for the ApiReplicationSlotConfig struct and its
persistence field to explain intent/usage, and update the derives on
ApiReplicationSlotConfig (and optionally ApiReplicationSlotPersistence) to
include Copy so the API types match the internal
ReplicationSlotConfig/ReplicationSlotPersistence semantics; ensure the
referenced symbols are ApiReplicationSlotPersistence, ApiReplicationSlotConfig,
and the persistence field.
| /// Configuration for the replication slot used | ||
| pub replication_slot: ReplicationSlotConfig, |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Add trailing period to documentation comment.
Per coding guidelines, documentation comments should be properly punctuated.
📝 Suggested fix
- /// Configuration for the replication slot used
+ /// Configuration for the replication slot used.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Configuration for the replication slot used | |
| pub replication_slot: ReplicationSlotConfig, | |
| /// Configuration for the replication slot used. | |
| pub replication_slot: ReplicationSlotConfig, |
🤖 Prompt for AI Agents
In `@etl-config/src/shared/pipeline.rs` around lines 63 - 64, The documentation
comment for the struct field replication_slot should end with a period; update
the doc comment above pub replication_slot: ReplicationSlotConfig to read
"Configuration for the replication slot used." ensuring the trailing period is
added to satisfy punctuation guidelines and reference the ReplicationSlotConfig
type in the same comment.
Add #[serde(default)] for backward compatibility.
The replication_slot field in PipelineConfig is missing #[serde(default)]. When deserializing existing configs that don't have this field, deserialization will fail. This was addressed in StoredPipelineConfig but not here.
🔧 Proposed fix
/// Configuration for the replication slot used.
+ #[serde(default)]
pub replication_slot: ReplicationSlotConfig,📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Configuration for the replication slot used | |
| pub replication_slot: ReplicationSlotConfig, | |
| /// Configuration for the replication slot used. | |
| #[serde(default)] | |
| pub replication_slot: ReplicationSlotConfig, |
🤖 Prompt for AI Agents
In `@etl-config/src/shared/pipeline.rs` around lines 63 - 64, Add serde default
for backward compatibility: annotate the PipelineConfig struct's
replication_slot field with #[serde(default)] so deserializing older configs
that omit replication_slot succeeds; update the replication_slot declaration
(type ReplicationSlotConfig) in PipelineConfig to mirror StoredPipelineConfig's
handling and ensure the default implementation for ReplicationSlotConfig exists
or derive/implement Default if needed.
| /// Configuration for the replication slot used | ||
| pub replication_slot: ReplicationSlotConfig, |
There was a problem hiding this comment.
Add #[serde(default)] and trailing period for consistency.
Same issues as PipelineConfig: missing #[serde(default)] for backward compatibility and missing period in documentation.
🔧 Proposed fix
- /// Configuration for the replication slot used
+ /// Configuration for the replication slot used.
+ #[serde(default)]
pub replication_slot: ReplicationSlotConfig,🤖 Prompt for AI Agents
In `@etl-config/src/shared/pipeline.rs` around lines 143 - 144, The doc comment
for the field replication_slot is missing a trailing period and the field lacks
#[serde(default)] for backward compatibility; update the PipelineConfig struct
so the replication_slot field has the #[serde(default)] attribute and terminate
its documentation sentence with a period, referencing the ReplicationSlotConfig
type and the replication_slot field to locate the change.
| #[derive(Clone, Copy, Debug, Deserialize, Serialize, Default)] | ||
| #[serde(rename_all = "snake_case")] | ||
| pub struct ReplicationSlotConfig { | ||
| pub persistence: ReplicationSlotPersistence, | ||
| } | ||
|
|
||
| #[derive(Clone, Copy, Debug, Deserialize, Serialize, Default)] | ||
| #[serde(rename_all = "snake_case")] | ||
| pub enum ReplicationSlotPersistence { | ||
| #[default] | ||
| Permanent, | ||
| Temporary, | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add documentation to the new public types.
Per coding guidelines, all items (public and private) should be documented. The ReplicationSlotConfig struct and ReplicationSlotPersistence enum are missing documentation.
📝 Suggested documentation
+/// Configuration for the replication slot used by the pipeline.
#[derive(Clone, Copy, Debug, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub struct ReplicationSlotConfig {
+ /// Controls whether the replication slot persists across connection sessions.
pub persistence: ReplicationSlotPersistence,
}
+/// Determines the persistence behavior of a replication slot.
#[derive(Clone, Copy, Debug, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum ReplicationSlotPersistence {
+ /// Slot persists until explicitly dropped.
#[default]
Permanent,
+ /// Slot is automatically dropped when the connection ends.
Temporary,
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #[derive(Clone, Copy, Debug, Deserialize, Serialize, Default)] | |
| #[serde(rename_all = "snake_case")] | |
| pub struct ReplicationSlotConfig { | |
| pub persistence: ReplicationSlotPersistence, | |
| } | |
| #[derive(Clone, Copy, Debug, Deserialize, Serialize, Default)] | |
| #[serde(rename_all = "snake_case")] | |
| pub enum ReplicationSlotPersistence { | |
| #[default] | |
| Permanent, | |
| Temporary, | |
| } | |
| /// Configuration for the replication slot used by the pipeline. | |
| #[derive(Clone, Copy, Debug, Deserialize, Serialize, Default)] | |
| #[serde(rename_all = "snake_case")] | |
| pub struct ReplicationSlotConfig { | |
| /// Controls whether the replication slot persists across connection sessions. | |
| pub persistence: ReplicationSlotPersistence, | |
| } | |
| /// Determines the persistence behavior of a replication slot. | |
| #[derive(Clone, Copy, Debug, Deserialize, Serialize, Default)] | |
| #[serde(rename_all = "snake_case")] | |
| pub enum ReplicationSlotPersistence { | |
| /// Slot persists until explicitly dropped. | |
| #[default] | |
| Permanent, | |
| /// Slot is automatically dropped when the connection ends. | |
| Temporary, | |
| } |
🤖 Prompt for AI Agents
In `@etl-config/src/shared/pipeline.rs` around lines 181 - 193, Add Rustdoc
comments to the new public types: document the ReplicationSlotConfig struct to
explain its purpose (holds replication slot settings for a pipeline, e.g.,
control persistence behavior) and document the persistence field if needed;
document the ReplicationSlotPersistence enum to describe the variants and their
meaning (Permanent = durable replication slot retained across restarts,
Temporary = slot tied to session/lifetime and removed afterwards). Use
triple-slash /// comments placed immediately above the ReplicationSlotConfig
declaration and above the ReplicationSlotPersistence enum and each variant to
provide concise descriptions.
| /// Replication slot config. Uses default if not specified | ||
| replication_slot: Option<ReplicationSlotConfig>, |
There was a problem hiding this comment.
Add trailing period and tighten wording for the new field doc.
The field doc comment is missing a terminating period and could be slightly clearer.
💡 Proposed tweak
- /// Replication slot config. Uses default if not specified
+ /// Replication slot configuration. Uses default if not specified.As per coding guidelines, keep Rust comments punctuated.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Replication slot config. Uses default if not specified | |
| replication_slot: Option<ReplicationSlotConfig>, | |
| /// Replication slot configuration. Uses default if not specified. | |
| replication_slot: Option<ReplicationSlotConfig>, |
🤖 Prompt for AI Agents
In `@etl/src/test_utils/pipeline.rs` around lines 57 - 58, Update the field doc
for replication_slot: Option<ReplicationSlotConfig> to end with a period and
clarify wording (e.g., "Replication slot configuration. Uses the default if not
specified."). Locate the doc comment immediately above the replication_slot
field in the struct and adjust the text to be concise, punctuated, and
grammatically correct.
| /// Sets custom replication slot configuration. | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
| /// * `replication_slot` - Configuration for how replication slots behaviour | ||
| pub fn with_replication_slot_config(mut self, replication_slot: ReplicationSlotConfig) -> Self { | ||
| self.replication_slot = Some(replication_slot); |
There was a problem hiding this comment.
Fix argument doc grammar and link the type.
The argument description is grammatically incorrect and should link [ReplicationSlotConfig].
💡 Proposed tweak
- /// * `replication_slot` - Configuration for how replication slots behaviour
+ /// * `replication_slot` - [`ReplicationSlotConfig`] controlling how replication slots behave.As per coding guidelines, keep Rust doc comments concise, correct, punctuated, and link types.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Sets custom replication slot configuration. | |
| /// | |
| /// # Arguments | |
| /// | |
| /// * `replication_slot` - Configuration for how replication slots behaviour | |
| pub fn with_replication_slot_config(mut self, replication_slot: ReplicationSlotConfig) -> Self { | |
| self.replication_slot = Some(replication_slot); | |
| /// Sets custom replication slot configuration. | |
| /// | |
| /// # Arguments | |
| /// | |
| /// * `replication_slot` - [`ReplicationSlotConfig`] controlling how replication slots behave. | |
| pub fn with_replication_slot_config(mut self, replication_slot: ReplicationSlotConfig) -> Self { | |
| self.replication_slot = Some(replication_slot); |
🤖 Prompt for AI Agents
In `@etl/src/test_utils/pipeline.rs` around lines 125 - 131, Update the doc
comment for the with_replication_slot_config method: fix the grammar of the
argument line and link the type by replacing the current "* `replication_slot` -
Configuration for how replication slots behaviour" with a concise, corrected
sentence that references the type, e.g. "* `replication_slot` - Configuration
for replication slot behavior (see [`ReplicationSlotConfig`])." Ensure the
backtick-bracket link uses the exact type name `ReplicationSlotConfig` and keep
punctuation consistent.
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn test_replication_client_temporary_slot_dropped_on_disconnect() { | ||
| let database = spawn_source_database().await; | ||
|
|
||
| let client = PgReplicationClient::connect(database.config.clone()) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| let slot_name = test_slot_name("my_slot"); | ||
| assert!(matches!( | ||
| client | ||
| .create_slot( | ||
| &slot_name, | ||
| ReplicationSlotConfig { | ||
| persistence: ReplicationSlotPersistence::Temporary | ||
| } | ||
| ) | ||
| .await, | ||
| Ok(CreateSlotResult { | ||
| consistent_point: _ | ||
| }) | ||
| )); | ||
|
|
||
| // Dropping the client should close the connection, causing our replication | ||
| // slot to be cleaned up | ||
| drop(client); | ||
|
|
||
| // This means that we should be able to execute the exact same sequence again, and create the same replication slot again | ||
| let client = PgReplicationClient::connect(database.config.clone()) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| assert!(matches!( | ||
| client | ||
| .create_slot( | ||
| &slot_name, | ||
| ReplicationSlotConfig { | ||
| persistence: ReplicationSlotPersistence::Temporary | ||
| } | ||
| ) | ||
| .await, | ||
| Ok(CreateSlotResult { | ||
| consistent_point: _ | ||
| }) | ||
| )); | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Add init_test_tracing() for consistency with other tests.
Other tests in this file call init_test_tracing() at the start. This test is missing that call.
📝 Suggested fix
#[tokio::test(flavor = "multi_thread")]
async fn test_replication_client_temporary_slot_dropped_on_disconnect() {
+ init_test_tracing();
let database = spawn_source_database().await;🤖 Prompt for AI Agents
In `@etl/tests/replication.rs` around lines 178 - 223, Add a call to
init_test_tracing() at the start of the
test_replication_client_temporary_slot_dropped_on_disconnect async test so its
logging/tracing setup matches the other tests; locate the test function
(test_replication_client_temporary_slot_dropped_on_disconnect) and insert a
single init_test_tracing() invocation as the first statement inside the function
before spawning the source database to ensure consistent tracing initialization.
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn test_replication_client_permanent_slot_persisted_on_disconnect() { | ||
| let database = spawn_source_database().await; | ||
|
|
||
| let client = PgReplicationClient::connect(database.config.clone()) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| let slot_name = test_slot_name("my_slot"); | ||
| assert!(matches!( | ||
| client | ||
| .create_slot( | ||
| &slot_name, | ||
| ReplicationSlotConfig { | ||
| persistence: ReplicationSlotPersistence::Permanent | ||
| } | ||
| ) | ||
| .await, | ||
| Ok(CreateSlotResult { | ||
| consistent_point: _ | ||
| }) | ||
| )); | ||
|
|
||
| // Dropping the client should close the connection. Since we use a persisted replication slot | ||
| // it should still be present when we start a fresh connection | ||
| drop(client); | ||
|
|
||
| // This means that we should be able to execute the exact same sequence again, and the slot should still exist (i.e., we fail to create a new one) | ||
| let client = PgReplicationClient::connect(database.config.clone()) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| assert!(matches!( | ||
| client.create_slot(&slot_name, ReplicationSlotConfig { persistence: ReplicationSlotPersistence::Permanent }).await, | ||
| Err(ref err) if err.kind() == ErrorKind::ReplicationSlotAlreadyExists, | ||
| )); | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Add init_test_tracing() for consistency with other tests.
Other tests in this file call init_test_tracing() at the start. This test is missing that call.
📝 Suggested fix
#[tokio::test(flavor = "multi_thread")]
async fn test_replication_client_permanent_slot_persisted_on_disconnect() {
+ init_test_tracing();
let database = spawn_source_database().await;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #[tokio::test(flavor = "multi_thread")] | |
| async fn test_replication_client_permanent_slot_persisted_on_disconnect() { | |
| let database = spawn_source_database().await; | |
| let client = PgReplicationClient::connect(database.config.clone()) | |
| .await | |
| .unwrap(); | |
| let slot_name = test_slot_name("my_slot"); | |
| assert!(matches!( | |
| client | |
| .create_slot( | |
| &slot_name, | |
| ReplicationSlotConfig { | |
| persistence: ReplicationSlotPersistence::Permanent | |
| } | |
| ) | |
| .await, | |
| Ok(CreateSlotResult { | |
| consistent_point: _ | |
| }) | |
| )); | |
| // Dropping the client should close the connection. Since we use a persisted replication slot | |
| // it should still be present when we start a fresh connection | |
| drop(client); | |
| // This means that we should be able to execute the exact same sequence again, and the slot should still exist (i.e., we fail to create a new one) | |
| let client = PgReplicationClient::connect(database.config.clone()) | |
| .await | |
| .unwrap(); | |
| assert!(matches!( | |
| client.create_slot(&slot_name, ReplicationSlotConfig { persistence: ReplicationSlotPersistence::Permanent }).await, | |
| Err(ref err) if err.kind() == ErrorKind::ReplicationSlotAlreadyExists, | |
| )); | |
| } | |
| #[tokio::test(flavor = "multi_thread")] | |
| async fn test_replication_client_permanent_slot_persisted_on_disconnect() { | |
| init_test_tracing(); | |
| let database = spawn_source_database().await; | |
| let client = PgReplicationClient::connect(database.config.clone()) | |
| .await | |
| .unwrap(); | |
| let slot_name = test_slot_name("my_slot"); | |
| assert!(matches!( | |
| client | |
| .create_slot( | |
| &slot_name, | |
| ReplicationSlotConfig { | |
| persistence: ReplicationSlotPersistence::Permanent | |
| } | |
| ) | |
| .await, | |
| Ok(CreateSlotResult { | |
| consistent_point: _ | |
| }) | |
| )); | |
| // Dropping the client should close the connection. Since we use a persisted replication slot | |
| // it should still be present when we start a fresh connection | |
| drop(client); | |
| // This means that we should be able to execute the exact same sequence again, and the slot should still exist (i.e., we fail to create a new one) | |
| let client = PgReplicationClient::connect(database.config.clone()) | |
| .await | |
| .unwrap(); | |
| assert!(matches!( | |
| client.create_slot(&slot_name, ReplicationSlotConfig { persistence: ReplicationSlotPersistence::Permanent }).await, | |
| Err(ref err) if err.kind() == ErrorKind::ReplicationSlotAlreadyExists, | |
| )); | |
| } |
🤖 Prompt for AI Agents
In `@etl/tests/replication.rs` around lines 225 - 261, The test function
test_replication_client_permanent_slot_persisted_on_disconnect is missing the
standard test tracing initialization; add a call to init_test_tracing() at the
start of that async test (before spawn_source_database()) so it follows the same
tracing setup as other tests in this file.
What kind of change does this PR introduce?
This adds a new attributereplication_slot: ReplicationSlotConfigtoPipelineConfigallowing users to choose whether to use regular (ReplicationSlotConfig::Permanent) or temporary (ReplicationSlotConfig::Temporary) replication slots. This is passed down to calls toPgReplicationClientwhere relevant (when creating/fetching slots).This adds a new attribute
replication_slot: ReplicationSlotConfigtoPipelineConfigallowing users to specify additional configuration that will be for the replication slot. Currently, we only haveReplicationSlotConfig { temporary: bool }. However, this pattern allows us to more easily extend with additional options in the future. Postgres provides several knobs to turn for replication slots (docs):LOGICAL/PHYSICAL(probably irrelevant)TEMPORARY(implemented here)FAILOVERBy making this a
struct, we can easily extend it with additional options down the line, with no/minimal breakage.What is the current behavior?
Only regular (non-temporary) replication slots are supported.
What is the new behavior?
Users can choose whether to use persisted/permanent or temporary replication slots. Having access to temporary replication slots can be useful for ephemeral use cases (e.g., using CDC to maintain an in-memory cache of select tables for an otherwise stateless application).
Additional context
PipelineConfigcargo test --workspace --all-features --no-fail-fast --exclude etl-destinationworks fine locally, but haven't tested theetl-destinationstests* I opted to use a simple boolean rather than theReplicationSlotConfigfor theFullApiPipelineConfigto avoid having to pull inutoipaas a dependency for theetl-configcrate.Summary by CodeRabbit
New Features
Improvements
✏️ Tip: You can customize this high-level summary in your review settings.