diff --git a/Cargo.lock b/Cargo.lock index 68f3c84cd8..98375ee5bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1109,6 +1109,20 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.7.8" @@ -3551,6 +3565,7 @@ dependencies = [ "chrono", "crc", "crossbeam-queue", + "dashmap", "either", "event-listener 5.2.0", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 316dc471e1..7b9e3578b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,6 +149,7 @@ uuid = "1.1.2" # Common utility crates dotenvy = { version = "0.15.0", default-features = false } +dashmap = "6.1" # Runtimes [workspace.dependencies.async-std] diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index d662861470..45fa8fe0d9 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -82,6 +82,7 @@ hashlink = "0.10.0" indexmap = "2.0" event-listener = "5.2.0" hashbrown = "0.15.0" +dashmap.workspace = true [dev-dependencies] sqlx = { workspace = true, features = ["postgres", "sqlite", "mysql", "migrate", "macros", "time", "uuid"] } diff --git a/sqlx-core/src/any/connection/mod.rs b/sqlx-core/src/any/connection/mod.rs index b6f795848a..dff9b3cc36 100644 --- a/sqlx-core/src/any/connection/mod.rs +++ b/sqlx-core/src/any/connection/mod.rs @@ -5,9 +5,8 @@ use crate::connection::{ConnectOptions, Connection}; use crate::error::Error; use crate::database::Database; -pub use backend::AnyConnectionBackend; - use crate::transaction::Transaction; +pub use backend::AnyConnectionBackend; mod backend; mod executor; diff --git a/sqlx-core/src/any/options.rs b/sqlx-core/src/any/options.rs index bb29d817c9..2526dbdc39 100644 --- a/sqlx-core/src/any/options.rs +++ b/sqlx-core/src/any/options.rs @@ -1,5 +1,5 @@ use crate::any::AnyConnection; -use crate::connection::{ConnectOptions, LogSettings}; +use crate::connection::{ConnectOptions, ConnectionStateReporter, LogSettings}; use crate::error::Error; use futures_core::future::BoxFuture; use log::LevelFilter; @@ -19,6 +19,7 @@ use url::Url; pub struct AnyConnectOptions { pub database_url: Url, pub log_settings: LogSettings, + pub state_reporter: Option, } impl FromStr for AnyConnectOptions { type Err = Error; @@ -29,6 +30,7 @@ impl FromStr for AnyConnectOptions { .parse::() .map_err(|e| Error::Configuration(e.into()))?, log_settings: LogSettings::default(), + state_reporter: None, }) } } @@ -40,6 +42,7 @@ impl ConnectOptions for AnyConnectOptions { Ok(AnyConnectOptions { database_url: url.clone(), log_settings: LogSettings::default(), + state_reporter: None, }) } @@ -62,4 +65,9 @@ impl ConnectOptions for AnyConnectOptions { self.log_settings.slow_statements_duration = duration; self } + + fn report_connection_state(mut self, reporter: ConnectionStateReporter) -> Self { + self.state_reporter = Some(reporter); + self + } } diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index ce2aa6c629..6052716fd3 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -1,12 +1,15 @@ use crate::database::{Database, HasStatementCache}; use crate::error::Error; +use crate::pool::{PoolConnectionState, RawConnectionState, TimestampedStateEntry}; use crate::transaction::Transaction; +use dashmap::DashMap; use futures_core::future::BoxFuture; use log::LevelFilter; -use std::fmt::Debug; +use std::fmt::{Debug, Formatter}; use std::str::FromStr; -use std::time::Duration; +use std::sync::Arc; +use std::time::{Duration, Instant}; use url::Url; /// Represents a single database connection. @@ -183,6 +186,54 @@ impl LogSettings { } } +pub struct ConnectionStateEntry { + pub id: usize, + pub raw: TimestampedStateEntry, + pub pool: Option>, +} + +#[derive(Clone)] +pub struct ConnectionStateReporter { + pub id: usize, + pub pool: Arc>>, + pub raw: Arc>>, +} + +impl Debug for ConnectionStateReporter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + // skipping middleware_stack field for now + f.debug_struct("ConnectionStateReporter") + .field("id", &self.id) + .finish_non_exhaustive() + } +} + +impl ConnectionStateReporter { + pub fn report_pool_state(&self, state: PoolConnectionState) { + self.pool.insert( + self.id, + TimestampedStateEntry { + since: Instant::now(), + state, + }, + ); + } + pub fn report_raw_state(&self, state: RawConnectionState) { + if matches!(state, RawConnectionState::Closed) { + self.pool.remove(&self.id); + self.raw.remove(&self.id); + } else { + self.raw.insert( + self.id, + TimestampedStateEntry { + since: Instant::now(), + state, + }, + ); + } + } +} + pub trait ConnectOptions: 'static + Send + Sync + FromStr + Debug + Clone { type Connection: Connection + ?Sized; @@ -235,4 +286,6 @@ pub trait ConnectOptions: 'static + Send + Sync + FromStr + Debug + self.log_statements(LevelFilter::Off) .log_slow_statements(LevelFilter::Off, Duration::default()) } + + fn report_connection_state(self, reporter: ConnectionStateReporter) -> Self; } diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index bf3a6d4b1c..6a1c6073d0 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -5,12 +5,13 @@ use std::time::{Duration, Instant}; use crate::sync::AsyncSemaphoreReleaser; -use crate::connection::Connection; +use crate::connection::{Connection, ConnectionStateEntry, ConnectionStateReporter}; use crate::database::Database; use crate::error::Error; use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner}; use crate::pool::options::PoolConnectionMetadata; +use dashmap::DashMap; use std::future::Future; const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5); @@ -26,6 +27,7 @@ pub struct PoolConnection { pub(super) struct Live { pub(super) raw: DB::Connection, + pub(super) state_reporter: Option, pub(super) created_at: Instant, } @@ -40,6 +42,54 @@ pub(super) struct Floating { pub(super) guard: DecrementSizeGuard, } +#[derive(Clone, Default, Debug)] +pub struct ConnectionStateTracker { + pub pool: Arc>>, + pub raw: Arc>>, +} + +impl ConnectionStateTracker { + pub fn all_entries(&self) -> Vec { + let mut result = Vec::with_capacity(self.raw.len()); + for e in self.raw.iter() { + result.push(ConnectionStateEntry { + id: *e.key(), + raw: e.value().clone(), + pool: self.pool.get(e.key()).map(|x| x.value().clone()), + }); + } + result + } +} + +#[derive(Debug, Clone)] +pub struct TimestampedStateEntry { + pub since: Instant, + pub state: S, +} + +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +pub enum PoolConnectionState { + FloatingLive, + FloatingIdle, + AttachedLive, + AttachedIdle, +} + +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +pub enum RawConnectionState { + New, + Connected, + ReadyIdle, + ReadyInTx, + Error, + Running(String), + Closing, + Closed, +} + const EXPECT_MSG: &str = "BUG: inner connection already taken!"; impl Debug for PoolConnection { @@ -212,6 +262,7 @@ impl Drop for PoolConnection { impl Live { pub fn float(self, pool: Arc>) -> Floating { + self.report_pool_state(PoolConnectionState::FloatingLive); Floating { inner: self, // create a new guard from a previously leaked permit @@ -225,6 +276,12 @@ impl Live { idle_since: Instant::now(), } } + + pub fn report_pool_state(&self, state: PoolConnectionState) { + if let Some(reporter) = &self.state_reporter { + reporter.report_pool_state(state); + } + } } impl Deref for Idle { @@ -242,17 +299,26 @@ impl DerefMut for Idle { } impl Floating> { - pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard) -> Self { + pub fn new_live( + conn: DB::Connection, + guard: DecrementSizeGuard, + state_reporter: Option, + ) -> Self { + if let Some(reporter) = &state_reporter { + reporter.report_pool_state(PoolConnectionState::FloatingLive); + } Self { inner: Live { raw: conn, created_at: Instant::now(), + state_reporter, }, guard, } } pub fn reattach(self) -> PoolConnection { + self.report_pool_state(PoolConnectionState::AttachedLive); let Floating { inner, guard } = self; let pool = Arc::clone(&guard.pool); @@ -363,6 +429,8 @@ impl Floating> { pool: Arc>, permit: AsyncSemaphoreReleaser<'_>, ) -> Self { + idle.live + .report_pool_state(PoolConnectionState::FloatingIdle); Self { inner: idle, guard: DecrementSizeGuard::from_permit(pool, permit), @@ -374,6 +442,9 @@ impl Floating> { } pub fn into_live(self) -> Floating> { + self.inner + .live + .report_pool_state(PoolConnectionState::FloatingLive); Floating { inner: self.inner.live, guard: self.guard, diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index bbcc43134e..3edee6cde3 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -1,9 +1,9 @@ use super::connection::{Floating, Idle, Live}; -use crate::connection::ConnectOptions; use crate::connection::Connection; +use crate::connection::{ConnectOptions, ConnectionStateReporter}; use crate::database::Database; use crate::error::Error; -use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions}; +use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolConnectionState, PoolOptions}; use crossbeam_queue::ArrayQueue; use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser}; @@ -33,6 +33,7 @@ pub(crate) struct PoolInner { pub(super) options: PoolOptions, pub(crate) acquire_time_level: Option, pub(crate) acquire_slow_level: Option, + pub(super) conn_counter: AtomicUsize, } impl PoolInner { @@ -62,6 +63,7 @@ impl PoolInner { acquire_time_level: private_level_filter_to_trace_level(options.acquire_time_level), acquire_slow_level: private_level_filter_to_trace_level(options.acquire_slow_level), options, + conn_counter: AtomicUsize::new(1), }; let pool = Arc::new(pool); @@ -208,6 +210,8 @@ impl PoolInner { let Floating { inner: idle, guard } = floating.into_idle(); + idle.live + .report_pool_state(PoolConnectionState::AttachedIdle); if self.idle_conns.push(idle).is_err() { panic!("BUG: connection queue overflow in release()"); } @@ -336,6 +340,16 @@ impl PoolInner { let mut backoff = Duration::from_millis(10); let max_backoff = deadline_as_timeout(deadline)? / 5; + let state_reporter = self + .options + .state_tracker + .as_ref() + .map(|t| ConnectionStateReporter { + id: self.conn_counter.fetch_add(1, Ordering::Acquire), + pool: t.pool.clone(), + raw: t.raw.clone(), + }); + loop { let timeout = deadline_as_timeout(deadline)?; @@ -347,6 +361,17 @@ impl PoolInner { .expect("write-lock holder panicked") .clone(); + let connect_options = if let Some(reporter) = &state_reporter { + Arc::new( + connect_options + .as_ref() + .clone() + .report_connection_state(reporter.clone()), + ) + } else { + connect_options + }; + // result here is `Result, TimeoutError>` // if this block does not return, sleep for the backoff timeout and try again match crate::rt::timeout(timeout, connect_options.connect()).await { @@ -365,7 +390,7 @@ impl PoolInner { }; match res { - Ok(()) => return Ok(Floating::new_live(raw, guard)), + Ok(()) => return Ok(Floating::new_live(raw, guard, state_reporter)), Err(error) => { tracing::error!(%error, "error returned from after_connect"); // The connection is broken, don't try to close nicely. diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index e998618413..a207652813 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -71,6 +71,9 @@ use crate::error::Error; use crate::transaction::Transaction; pub use self::connection::PoolConnection; +pub use self::connection::{ + ConnectionStateTracker, PoolConnectionState, RawConnectionState, TimestampedStateEntry, +}; use self::inner::PoolInner; #[doc(hidden)] pub use self::maybe::MaybePoolConnection; diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 96dbf8ee3d..452da7f9d0 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -1,6 +1,7 @@ use crate::connection::Connection; use crate::database::Database; use crate::error::Error; +use crate::pool::connection::ConnectionStateTracker; use crate::pool::inner::PoolInner; use crate::pool::Pool; use futures_core::future::BoxFuture; @@ -83,6 +84,7 @@ pub struct PoolOptions { pub(crate) max_lifetime: Option, pub(crate) idle_timeout: Option, pub(crate) fair: bool, + pub(crate) state_tracker: Option>, pub(crate) parent_pool: Option>, } @@ -106,6 +108,7 @@ impl Clone for PoolOptions { max_lifetime: self.max_lifetime, idle_timeout: self.idle_timeout, fair: self.fair, + state_tracker: self.state_tracker.clone(), parent_pool: self.parent_pool.clone(), } } @@ -161,6 +164,7 @@ impl PoolOptions { idle_timeout: Some(Duration::from_secs(10 * 60)), max_lifetime: Some(Duration::from_secs(30 * 60)), fair: true, + state_tracker: None, parent_pool: None, } } @@ -175,6 +179,11 @@ impl PoolOptions { self } + pub fn track_connections(mut self, state_tracker: Arc) -> Self { + self.state_tracker = Some(state_tracker); + self + } + /// Get the maximum number of connections that this pool should maintain pub fn get_max_connections(&self) -> u32 { self.max_connections diff --git a/sqlx-mysql/src/options/connect.rs b/sqlx-mysql/src/options/connect.rs index 116a49ccad..1819bef5ab 100644 --- a/sqlx-mysql/src/options/connect.rs +++ b/sqlx-mysql/src/options/connect.rs @@ -4,6 +4,7 @@ use crate::executor::Executor; use crate::{MySqlConnectOptions, MySqlConnection}; use futures_core::future::BoxFuture; use log::LevelFilter; +use sqlx_core::connection::ConnectionStateReporter; use sqlx_core::Url; use std::time::Duration; @@ -94,4 +95,9 @@ impl ConnectOptions for MySqlConnectOptions { self.log_settings.log_slow_statements(level, duration); self } + + fn report_connection_state(mut self, reporter: ConnectionStateReporter) -> Self { + self.state_reporter = Some(reporter); + self + } } diff --git a/sqlx-mysql/src/options/mod.rs b/sqlx-mysql/src/options/mod.rs index db2b20c19d..7d6916493d 100644 --- a/sqlx-mysql/src/options/mod.rs +++ b/sqlx-mysql/src/options/mod.rs @@ -1,3 +1,4 @@ +use sqlx_core::connection::ConnectionStateReporter; use std::path::{Path, PathBuf}; mod connect; @@ -80,6 +81,7 @@ pub struct MySqlConnectOptions { pub(crate) no_engine_substitution: bool, pub(crate) timezone: Option, pub(crate) set_names: bool, + pub(crate) state_reporter: Option, } impl Default for MySqlConnectOptions { @@ -111,6 +113,7 @@ impl MySqlConnectOptions { no_engine_substitution: true, timezone: Some(String::from("+00:00")), set_names: true, + state_reporter: None, } } diff --git a/sqlx-postgres/src/connection/establish.rs b/sqlx-postgres/src/connection/establish.rs index 1bc4172fbd..3746d4d087 100644 --- a/sqlx-postgres/src/connection/establish.rs +++ b/sqlx-postgres/src/connection/establish.rs @@ -1,4 +1,5 @@ use crate::HashMap; +use sqlx_core::pool::RawConnectionState; use crate::common::StatementCache; use crate::connection::{sasl, stream::PgStream}; @@ -16,9 +17,19 @@ use super::PgConnectionInner; impl PgConnection { pub(crate) async fn establish(options: &PgConnectOptions) -> Result { + options + .state_reporter + .as_ref() + .map(|r| r.report_raw_state(RawConnectionState::New)); + // Upgrade to TLS if we were asked to and the server supports it let mut stream = PgStream::connect(options).await?; + options + .state_reporter + .as_ref() + .map(|r| r.report_raw_state(RawConnectionState::Connected)); + // To begin a session, a frontend opens a connection to the server // and sends a startup message. @@ -149,6 +160,7 @@ impl PgConnection { cache_type_info: HashMap::new(), cache_elem_type_to_array: HashMap::new(), log_settings: options.log_settings.clone(), + state_reporter: options.state_reporter.clone(), }), }) } diff --git a/sqlx-postgres/src/connection/executor.rs b/sqlx-postgres/src/connection/executor.rs index 97503a5004..ec8bc335c1 100644 --- a/sqlx-postgres/src/connection/executor.rs +++ b/sqlx-postgres/src/connection/executor.rs @@ -17,6 +17,7 @@ use futures_core::stream::BoxStream; use futures_core::Stream; use futures_util::{pin_mut, TryStreamExt}; use sqlx_core::arguments::Arguments; +use sqlx_core::pool::RawConnectionState; use sqlx_core::Either; use std::{borrow::Cow, sync::Arc}; @@ -205,6 +206,11 @@ impl PgConnection { let mut metadata: Arc; + self.inner + .state_reporter + .as_ref() + .map(|r| r.report_raw_state(RawConnectionState::Running(query.to_string()))); + let format = if let Some(mut arguments) = arguments { // Check this before we write anything to the stream. // diff --git a/sqlx-postgres/src/connection/mod.rs b/sqlx-postgres/src/connection/mod.rs index c139f8e53d..b263862ba1 100644 --- a/sqlx-postgres/src/connection/mod.rs +++ b/sqlx-postgres/src/connection/mod.rs @@ -18,9 +18,9 @@ use crate::transaction::Transaction; use crate::types::Oid; use crate::{PgConnectOptions, PgTypeInfo, Postgres}; -pub(crate) use sqlx_core::connection::*; - pub use self::stream::PgStream; +pub(crate) use sqlx_core::connection::*; +use sqlx_core::pool::RawConnectionState; pub(crate) mod describe; mod establish; @@ -70,6 +70,8 @@ pub struct PgConnectionInner { pub(crate) transaction_depth: usize, log_settings: LogSettings, + + pub(crate) state_reporter: Option, } impl PgConnection { @@ -113,6 +115,13 @@ impl PgConnection { .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?; self.inner.transaction_status = message.decode::()?.transaction_status; + self.inner.state_reporter.as_ref().map(|r| { + r.report_raw_state(match self.inner.transaction_status { + TransactionStatus::Idle => RawConnectionState::ReadyIdle, + TransactionStatus::Transaction => RawConnectionState::ReadyInTx, + TransactionStatus::Error => RawConnectionState::Error, + }) + }); Ok(()) } @@ -147,19 +156,45 @@ impl Connection for PgConnection { // On receipt of this message, the backend closes the // connection and terminates. + self.inner + .state_reporter + .as_ref() + .map(|r| r.report_raw_state(RawConnectionState::Closing)); Box::pin(async move { - self.inner.stream.send(Terminate).await?; - self.inner.stream.shutdown().await?; - + if let Err(err) = self.inner.stream.send(Terminate).await { + self.inner + .state_reporter + .as_ref() + .map(|r| r.report_raw_state(RawConnectionState::Closed)); + return Err(err); + } + if let Err(err) = self.inner.stream.shutdown().await { + self.inner + .state_reporter + .as_ref() + .map(|r| r.report_raw_state(RawConnectionState::Closed)); + return Err(err.into()); + } + self.inner + .state_reporter + .as_ref() + .map(|r| r.report_raw_state(RawConnectionState::Closed)); Ok(()) }) } fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> { + self.inner + .state_reporter + .as_ref() + .map(|r| r.report_raw_state(RawConnectionState::Closing)); Box::pin(async move { - self.inner.stream.shutdown().await?; - - Ok(()) + let res = self.inner.stream.shutdown().await; + self.inner + .state_reporter + .as_ref() + .map(|r| r.report_raw_state(RawConnectionState::Closed)); + res.map_err(Into::into) }) } diff --git a/sqlx-postgres/src/options/connect.rs b/sqlx-postgres/src/options/connect.rs index bc6e4adce9..dc168d22cc 100644 --- a/sqlx-postgres/src/options/connect.rs +++ b/sqlx-postgres/src/options/connect.rs @@ -3,6 +3,7 @@ use crate::error::Error; use crate::{PgConnectOptions, PgConnection}; use futures_core::future::BoxFuture; use log::LevelFilter; +use sqlx_core::connection::ConnectionStateReporter; use sqlx_core::Url; use std::time::Duration; @@ -33,4 +34,9 @@ impl ConnectOptions for PgConnectOptions { self.log_settings.log_slow_statements(level, duration); self } + + fn report_connection_state(mut self, reporter: ConnectionStateReporter) -> Self { + self.state_reporter = Some(reporter); + self + } } diff --git a/sqlx-postgres/src/options/mod.rs b/sqlx-postgres/src/options/mod.rs index a0b222606a..9f51db7ca9 100644 --- a/sqlx-postgres/src/options/mod.rs +++ b/sqlx-postgres/src/options/mod.rs @@ -1,10 +1,10 @@ +use sqlx_core::connection::ConnectionStateReporter; +pub use ssl_mode::PgSslMode; use std::borrow::Cow; use std::env::var; use std::fmt::{Display, Write}; use std::path::{Path, PathBuf}; -pub use ssl_mode::PgSslMode; - use crate::{connection::LogSettings, net::tls::CertificateInput}; mod connect; @@ -103,6 +103,7 @@ pub struct PgConnectOptions { pub(crate) log_settings: LogSettings, pub(crate) extra_float_digits: Option>, pub(crate) options: Option, + pub(crate) state_reporter: Option, } impl Default for PgConnectOptions { @@ -169,6 +170,7 @@ impl PgConnectOptions { extra_float_digits: Some("2".into()), log_settings: Default::default(), options: var("PGOPTIONS").ok(), + state_reporter: None, } } diff --git a/sqlx-sqlite/src/options/connect.rs b/sqlx-sqlite/src/options/connect.rs index 309f2430e0..9586debb5d 100644 --- a/sqlx-sqlite/src/options/connect.rs +++ b/sqlx-sqlite/src/options/connect.rs @@ -1,7 +1,7 @@ use crate::{SqliteConnectOptions, SqliteConnection}; use futures_core::future::BoxFuture; use log::LevelFilter; -use sqlx_core::connection::ConnectOptions; +use sqlx_core::connection::{ConnectOptions, ConnectionStateReporter}; use sqlx_core::error::Error; use sqlx_core::executor::Executor; use std::fmt::Write; @@ -59,6 +59,11 @@ impl ConnectOptions for SqliteConnectOptions { self.log_settings.log_slow_statements(level, duration); self } + + fn report_connection_state(mut self, reporter: ConnectionStateReporter) -> Self { + self.state_reporter = Some(reporter); + self + } } impl SqliteConnectOptions { diff --git a/sqlx-sqlite/src/options/mod.rs b/sqlx-sqlite/src/options/mod.rs index 5f22edd913..8ded09499a 100644 --- a/sqlx-sqlite/src/options/mod.rs +++ b/sqlx-sqlite/src/options/mod.rs @@ -11,6 +11,7 @@ use crate::connection::LogSettings; pub use auto_vacuum::SqliteAutoVacuum; pub use journal_mode::SqliteJournalMode; pub use locking_mode::SqliteLockingMode; +use sqlx_core::connection::ConnectionStateReporter; use std::cmp::Ordering; use std::sync::Arc; use std::{borrow::Cow, time::Duration}; @@ -90,6 +91,8 @@ pub struct SqliteConnectOptions { #[cfg(feature = "regexp")] pub(crate) register_regexp_function: bool, + + pub(crate) state_reporter: Option, } #[derive(Clone, Debug)] @@ -212,6 +215,7 @@ impl SqliteConnectOptions { optimize_on_close: OptimizeOnClose::Disabled, #[cfg(feature = "regexp")] register_regexp_function: false, + state_reporter: None, } }