Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ uuid = "1.1.2"

# Common utility crates
dotenvy = { version = "0.15.0", default-features = false }
dashmap = "6.1"

# Runtimes
[workspace.dependencies.async-std]
Expand Down
1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ hashlink = "0.10.0"
indexmap = "2.0"
event-listener = "5.2.0"
hashbrown = "0.15.0"
dashmap.workspace = true

[dev-dependencies]
sqlx = { workspace = true, features = ["postgres", "sqlite", "mysql", "migrate", "macros", "time", "uuid"] }
Expand Down
3 changes: 1 addition & 2 deletions sqlx-core/src/any/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use crate::connection::{ConnectOptions, Connection};
use crate::error::Error;

use crate::database::Database;
pub use backend::AnyConnectionBackend;

use crate::transaction::Transaction;
pub use backend::AnyConnectionBackend;

mod backend;
mod executor;
Expand Down
10 changes: 9 additions & 1 deletion sqlx-core/src/any/options.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::any::AnyConnection;
use crate::connection::{ConnectOptions, LogSettings};
use crate::connection::{ConnectOptions, ConnectionStateReporter, LogSettings};
use crate::error::Error;
use futures_core::future::BoxFuture;
use log::LevelFilter;
Expand All @@ -19,6 +19,7 @@ use url::Url;
pub struct AnyConnectOptions {
pub database_url: Url,
pub log_settings: LogSettings,
pub state_reporter: Option<ConnectionStateReporter>,
}
impl FromStr for AnyConnectOptions {
type Err = Error;
Expand All @@ -29,6 +30,7 @@ impl FromStr for AnyConnectOptions {
.parse::<Url>()
.map_err(|e| Error::Configuration(e.into()))?,
log_settings: LogSettings::default(),
state_reporter: None,
})
}
}
Expand All @@ -40,6 +42,7 @@ impl ConnectOptions for AnyConnectOptions {
Ok(AnyConnectOptions {
database_url: url.clone(),
log_settings: LogSettings::default(),
state_reporter: None,
})
}

Expand All @@ -62,4 +65,9 @@ impl ConnectOptions for AnyConnectOptions {
self.log_settings.slow_statements_duration = duration;
self
}

fn report_connection_state(mut self, reporter: ConnectionStateReporter) -> Self {
self.state_reporter = Some(reporter);
self
}
}
57 changes: 55 additions & 2 deletions sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::database::{Database, HasStatementCache};
use crate::error::Error;

use crate::pool::{PoolConnectionState, RawConnectionState, TimestampedStateEntry};
use crate::transaction::Transaction;
use dashmap::DashMap;
use futures_core::future::BoxFuture;
use log::LevelFilter;
use std::fmt::Debug;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use std::time::Duration;
use std::sync::Arc;
use std::time::{Duration, Instant};
use url::Url;

/// Represents a single database connection.
Expand Down Expand Up @@ -183,6 +186,54 @@ impl LogSettings {
}
}

pub struct ConnectionStateEntry {
pub id: usize,
pub raw: TimestampedStateEntry<RawConnectionState>,
pub pool: Option<TimestampedStateEntry<PoolConnectionState>>,
}

#[derive(Clone)]
pub struct ConnectionStateReporter {
pub id: usize,
pub pool: Arc<DashMap<usize, TimestampedStateEntry<PoolConnectionState>>>,
pub raw: Arc<DashMap<usize, TimestampedStateEntry<RawConnectionState>>>,
}

impl Debug for ConnectionStateReporter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// skipping middleware_stack field for now
f.debug_struct("ConnectionStateReporter")
.field("id", &self.id)
.finish_non_exhaustive()
}
}

impl ConnectionStateReporter {
pub fn report_pool_state(&self, state: PoolConnectionState) {
self.pool.insert(
self.id,
TimestampedStateEntry {
since: Instant::now(),
state,
},
);
}
pub fn report_raw_state(&self, state: RawConnectionState) {
if matches!(state, RawConnectionState::Closed) {
self.pool.remove(&self.id);
self.raw.remove(&self.id);
} else {
self.raw.insert(
self.id,
TimestampedStateEntry {
since: Instant::now(),
state,
},
);
}
}
}

pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug + Clone {
type Connection: Connection<Options = Self> + ?Sized;

Expand Down Expand Up @@ -235,4 +286,6 @@ pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug +
self.log_statements(LevelFilter::Off)
.log_slow_statements(LevelFilter::Off, Duration::default())
}

fn report_connection_state(self, reporter: ConnectionStateReporter) -> Self;
}
75 changes: 73 additions & 2 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use std::time::{Duration, Instant};

use crate::sync::AsyncSemaphoreReleaser;

use crate::connection::Connection;
use crate::connection::{Connection, ConnectionStateEntry, ConnectionStateReporter};
use crate::database::Database;
use crate::error::Error;

use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
use crate::pool::options::PoolConnectionMetadata;
use dashmap::DashMap;
use std::future::Future;

const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5);
Expand All @@ -26,6 +27,7 @@ pub struct PoolConnection<DB: Database> {

pub(super) struct Live<DB: Database> {
pub(super) raw: DB::Connection,
pub(super) state_reporter: Option<ConnectionStateReporter>,
pub(super) created_at: Instant,
}

Expand All @@ -40,6 +42,54 @@ pub(super) struct Floating<DB: Database, C> {
pub(super) guard: DecrementSizeGuard<DB>,
}

#[derive(Clone, Default, Debug)]
pub struct ConnectionStateTracker {
pub pool: Arc<DashMap<usize, TimestampedStateEntry<PoolConnectionState>>>,
pub raw: Arc<DashMap<usize, TimestampedStateEntry<RawConnectionState>>>,
}

impl ConnectionStateTracker {
pub fn all_entries(&self) -> Vec<ConnectionStateEntry> {
let mut result = Vec::with_capacity(self.raw.len());
for e in self.raw.iter() {
result.push(ConnectionStateEntry {
id: *e.key(),
raw: e.value().clone(),
pool: self.pool.get(e.key()).map(|x| x.value().clone()),
});
}
result
}
}

#[derive(Debug, Clone)]
pub struct TimestampedStateEntry<S> {
pub since: Instant,
pub state: S,
}

#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum PoolConnectionState {
FloatingLive,
FloatingIdle,
AttachedLive,
AttachedIdle,
}

#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum RawConnectionState {
New,
Connected,
ReadyIdle,
ReadyInTx,
Error,
Running(String),
Closing,
Closed,
}

const EXPECT_MSG: &str = "BUG: inner connection already taken!";

impl<DB: Database> Debug for PoolConnection<DB> {
Expand Down Expand Up @@ -212,6 +262,7 @@ impl<DB: Database> Drop for PoolConnection<DB> {

impl<DB: Database> Live<DB> {
pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
self.report_pool_state(PoolConnectionState::FloatingLive);
Floating {
inner: self,
// create a new guard from a previously leaked permit
Expand All @@ -225,6 +276,12 @@ impl<DB: Database> Live<DB> {
idle_since: Instant::now(),
}
}

pub fn report_pool_state(&self, state: PoolConnectionState) {
if let Some(reporter) = &self.state_reporter {
reporter.report_pool_state(state);
}
}
}

impl<DB: Database> Deref for Idle<DB> {
Expand All @@ -242,17 +299,26 @@ impl<DB: Database> DerefMut for Idle<DB> {
}

impl<DB: Database> Floating<DB, Live<DB>> {
pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
pub fn new_live(
conn: DB::Connection,
guard: DecrementSizeGuard<DB>,
state_reporter: Option<ConnectionStateReporter>,
) -> Self {
if let Some(reporter) = &state_reporter {
reporter.report_pool_state(PoolConnectionState::FloatingLive);
}
Self {
inner: Live {
raw: conn,
created_at: Instant::now(),
state_reporter,
},
guard,
}
}

pub fn reattach(self) -> PoolConnection<DB> {
self.report_pool_state(PoolConnectionState::AttachedLive);
let Floating { inner, guard } = self;

let pool = Arc::clone(&guard.pool);
Expand Down Expand Up @@ -363,6 +429,8 @@ impl<DB: Database> Floating<DB, Idle<DB>> {
pool: Arc<PoolInner<DB>>,
permit: AsyncSemaphoreReleaser<'_>,
) -> Self {
idle.live
.report_pool_state(PoolConnectionState::FloatingIdle);
Self {
inner: idle,
guard: DecrementSizeGuard::from_permit(pool, permit),
Expand All @@ -374,6 +442,9 @@ impl<DB: Database> Floating<DB, Idle<DB>> {
}

pub fn into_live(self) -> Floating<DB, Live<DB>> {
self.inner
.live
.report_pool_state(PoolConnectionState::FloatingLive);
Floating {
inner: self.inner.live,
guard: self.guard,
Expand Down
Loading