From e82c635e6e0db34c8d48f9ee57741000678f58bd Mon Sep 17 00:00:00 2001 From: Dara Rockwell Date: Mon, 8 Dec 2025 21:03:20 -0700 Subject: [PATCH 1/9] feat: add query interceptor for logging and observability Add optional interceptor support to enable query logging, metrics collection, and testing without a database connection. ## Why This Change Was Made - Users need to log SQL queries for debugging and auditing - Applications need to collect query performance metrics - Testing frameworks need to mock database responses without a real database - No current extension point exists for these use cases ## What Was Changed - Added Interceptor, InterceptRequest, and InterceptResult types - Added optional 'interceptor' field to Config (defaults to None) - Added interceptor() builder function to set interceptor - Modified execute() to check interceptor before running queries - Interceptor can return Continue, Respond, or Fail - Added FFI functions to store/retrieve interceptor in process dictionary - Added 5 comprehensive tests demonstrating logging, response, and error cases ## Implementation Details - Interceptor stored in process dictionary keyed by pool name - Zero overhead when interceptor is None (default) - Interceptor receives sql, parameters, and timeout for each query - Can inspect request and return Continue (run query), Respond (return data), or Fail (return error) - All existing tests pass (non-breaking change) ## Note to Future Maintainers - Process dictionary is used for simplicity; could be moved to ETS if needed - Interceptor is called from execute() before run_query() - SingleConnection queries don't support interceptors (no pool name) - decode_intercepted_rows() handles decoder application to intercepted data --- README.md | 57 ++++++++++++++++++++ src/pog.gleam | 129 ++++++++++++++++++++++++++++++++++++++++++++ src/pog_ffi.erl | 31 ++++++++++- test/pog_test.gleam | 94 ++++++++++++++++++++++++++++++++ 4 files changed, 309 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index efdfdaa..fa0ebfc 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,63 @@ pub fn run(db: pog.Connection) { } ``` +## Query Interceptors + +Interceptors allow you to observe, log, or modify query execution. They're called before each query runs and can: + +- **Log queries** for debugging and auditing +- **Collect metrics** for performance monitoring +- **Return test data** for testing without a database + +### Example - Query Logging + +```gleam +import gleam/io +import pog + +let logger = pog.Interceptor(fn(request) { + io.println("SQL: " <> request.sql) + pog.Continue // Execute normally +}) + +pog.default_config(pool_name) +|> pog.interceptor(Some(logger)) +|> pog.start +``` + +### Example - Testing Without a Database + +```gleam +import gleam/dynamic + +let test_data = dynamic.array([ + dynamic.int(1), + dynamic.string("Alice"), + dynamic.string("alice@example.com"), +]) + +let test_interceptor = pog.Interceptor(fn(request) { + case request.sql { + "SELECT * FROM users WHERE id = $1" -> + pog.Respond(count: 1, rows: [test_data]) + _ -> + pog.Continue + } +}) + +pog.default_config(pool_name) +|> pog.interceptor(Some(test_interceptor)) +|> pog.start +``` + +### Interceptor Results + +Your interceptor function returns an `InterceptResult`: + +- `Continue` - Execute the query normally against the database +- `Respond(count: Int, rows: List(Dynamic))` - Return this data without querying +- `Fail(error: QueryError)` - Return this error without querying + ## Support of connection URI Configuring a Postgres connection is done by using `Config` type in `pog`. diff --git a/src/pog.gleam b/src/pog.gleam index 2cb6c61..6582f41 100644 --- a/src/pog.gleam +++ b/src/pog.gleam @@ -85,6 +85,8 @@ pub type Config { /// (default: False) By default, pgo will return a n-tuple, in the order of the query. /// By setting `rows_as_map` to `True`, the result will be `Dict`. rows_as_map: Bool, + /// (default: None): Optional query interceptor for logging, observability, or testing. + interceptor: Option(Interceptor), ) } @@ -210,6 +212,44 @@ pub fn rows_as_map(config: Config, rows_as_map: Bool) -> Config { Config(..config, rows_as_map:) } +/// Set a query interceptor for logging, observability, or testing. +/// +/// The interceptor is called before each query execution. Common use cases: +/// +/// **Logging**: Log all SQL queries for debugging and auditing +/// ```gleam +/// import gleam/io +/// +/// let logger = Interceptor(fn(req) { +/// io.println("SQL: " <> req.sql) +/// Continue +/// }) +/// +/// config |> pog.interceptor(Some(logger)) +/// ``` +/// +/// **Metrics**: Track query performance +/// ```gleam +/// let metrics = Interceptor(fn(req) { +/// metrics.record_query(req.sql, req.timeout) +/// Continue +/// }) +/// ``` +/// +/// **Testing**: Return test data without a database +/// ```gleam +/// let test_interceptor = Interceptor(fn(req) { +/// case req.sql { +/// "SELECT * FROM users" -> Respond(1, [test_user_data]) +/// _ -> Continue +/// } +/// }) +/// ``` +/// +pub fn interceptor(config: Config, interceptor: Option(Interceptor)) -> Config { + Config(..config, interceptor:) +} + /// The internet protocol version to use. pub type IpVersion { /// Internet Protocol version 4 (IPv4) @@ -238,6 +278,7 @@ pub fn default_config(pool_name pool_name: Name(Message)) -> Config { trace: False, ip_version: Ipv4, rows_as_map: False, + interceptor: None, ) } @@ -498,6 +539,56 @@ pub type Returned(t) { Returned(count: Int, rows: List(t)) } +/// Query interceptor for logging, observability, and testing. +/// +/// Interceptors are called before query execution and can: +/// - Log queries for debugging and auditing +/// - Collect metrics and performance data +/// - Return mock data for testing (without a database) +/// +/// The interceptor receives information about the query and can either +/// continue with normal execution or provide an alternative result. +/// +/// ## Example - Query logging +/// +/// ```gleam +/// import gleam/io +/// +/// let logger = Interceptor(fn(request) { +/// io.println("Executing: " <> request.sql) +/// Continue +/// }) +/// +/// pog.default_config(name) +/// |> pog.interceptor(Some(logger)) +/// |> pog.start +/// ``` +/// +pub type Interceptor { + Interceptor(intercept: fn(InterceptRequest) -> InterceptResult) +} + +/// Information about a query being executed. +/// +/// This is passed to the interceptor function before the query runs. +pub type InterceptRequest { + InterceptRequest(sql: String, parameters: List(Value), timeout: Int) +} + +/// The result returned by an interceptor. +/// +/// - `Continue`: Execute the query normally against the database +/// - `Respond`: Return this data instead of querying the database +/// - `Fail`: Return this error instead of querying the database +pub type InterceptResult { + /// Execute the query normally + Continue + /// Return data without querying the database + Respond(count: Int, rows: List(Dynamic)) + /// Return an error without querying the database + Fail(error: QueryError) +} + @external(erlang, "pog_ffi", "query") fn run_query( a: Connection, @@ -584,6 +675,29 @@ pub fn execute( on pool: Connection, ) -> Result(Returned(t), QueryError) { let parameters = list.reverse(query.parameters) + + // Check for interceptor + case get_pool_interceptor(pool) { + Some(interceptor) -> { + let request = + InterceptRequest(sql: query.sql, parameters: parameters, timeout: query.timeout) + + case interceptor.intercept(request) { + Continue -> execute_query(pool, query, parameters) + Respond(count:, rows:) -> + decode_intercepted_rows(count, rows, query.row_decoder) + Fail(error:) -> Error(error) + } + } + None -> execute_query(pool, query, parameters) + } +} + +fn execute_query( + pool: Connection, + query: Query(t), + parameters: List(Value), +) -> Result(Returned(t), QueryError) { use #(count, rows) <- result.try(run_query( pool, query.sql, @@ -597,6 +711,21 @@ pub fn execute( Ok(Returned(count, rows)) } +fn decode_intercepted_rows( + count: Int, + rows: List(Dynamic), + decoder: Decoder(t), +) -> Result(Returned(t), QueryError) { + use decoded <- result.try( + list.try_map(over: rows, with: decode.run(_, decoder)) + |> result.map_error(UnexpectedResultType), + ) + Ok(Returned(count, decoded)) +} + +@external(erlang, "pog_ffi", "get_pool_interceptor") +fn get_pool_interceptor(pool: Connection) -> Option(Interceptor) + /// Get the name for a PostgreSQL error code. /// /// ```gleam diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index 9c486a2..10ca479 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -1,6 +1,6 @@ -module(pog_ffi). --export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1]). +-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1, get_pool_interceptor/1, set_pool_interceptor/2]). -include_lib("pog/include/pog_Config.hrl"). -include_lib("pg_types/include/pg_types.hrl"). @@ -54,8 +54,14 @@ start(Config) -> idle_interval = IdleInterval, trace = Trace, ip_version = IpVersion, - rows_as_map = RowsAsMap + rows_as_map = RowsAsMap, + interceptor = Interceptor } = Config, + % Store interceptor if present + case Interceptor of + {some, I} -> set_pool_interceptor(PoolName, I); + none -> ok + end, {SslActivated, SslOptions} = default_ssl_options(Host, Ssl), Options1 = #{ host => Host, @@ -141,3 +147,24 @@ convert_error(#{ {unexpected_argument_type, Expected, Got}; convert_error(closed) -> query_timeout. + +%% Interceptor support +%% Store interceptor in process dictionary keyed by pool name +set_pool_interceptor(PoolName, Interceptor) when is_atom(PoolName) -> + put({pog_interceptor, PoolName}, Interceptor), + nil. + +%% Get interceptor for a connection +get_pool_interceptor(Connection) -> + PoolName = case Connection of + {pool, Name} -> Name; + {single_connection, _} -> undefined + end, + case PoolName of + undefined -> none; + _ -> + case get({pog_interceptor, PoolName}) of + undefined -> none; + Interceptor -> {some, Interceptor} + end + end. diff --git a/test/pog_test.gleam b/test/pog_test.gleam index 3a6f7ad..ed6f350 100644 --- a/test/pog_test.gleam +++ b/test/pog_test.gleam @@ -1,4 +1,5 @@ import exception +import gleam/dynamic import gleam/dynamic/decode.{type Decoder} import gleam/erlang/process import gleam/option.{None, Some} @@ -615,3 +616,96 @@ pub fn transaction_commit_test() { disconnect(db) } + +// Interceptor tests + +pub fn interceptor_logging_test() { + let interceptor = + pog.Interceptor(fn(request) { + let _ = request.sql + pog.Continue + }) + + let name = process.new_name("pog_test") + let config = pog.Config(..default_config(name), interceptor: Some(interceptor)) + + assert config.interceptor == Some(interceptor) +} + +pub fn interceptor_respond_with_data_test() { + let test_data = dynamic.array([dynamic.int(42), dynamic.string("test")]) + let interceptor = pog.Interceptor(fn(_request) { pog.Respond(1, [test_data]) }) + + let name = process.new_name("pog_test") + let config = pog.Config(..default_config(name), interceptor: Some(interceptor)) + let assert Ok(started) = pog.start(config) + + let decoder = { + use id <- decode.field(0, decode.int) + use name <- decode.field(1, decode.string) + decode.success(#(id, name)) + } + + let result = + pog.query("SELECT id, name FROM users") + |> pog.returning(decoder) + |> pog.execute(started.data) + + assert result == Ok(pog.Returned(1, [#(42, "test")])) + + disconnect(started) +} + +pub fn interceptor_fail_with_error_test() { + let interceptor = pog.Interceptor(fn(_request) { pog.Fail(pog.ConnectionUnavailable) }) + let name = process.new_name("pog_test") + let config = pog.Config(..default_config(name), interceptor: Some(interceptor)) + let assert Ok(started) = pog.start(config) + + let result = + pog.query("SELECT * FROM users") + |> pog.returning(decode.at([0], decode.int)) + |> pog.execute(started.data) + + assert result == Error(pog.ConnectionUnavailable) + + disconnect(started) +} + +pub fn no_interceptor_is_default_test() { + let name = process.new_name("pog_test") + let config = default_config(name) + + assert config.interceptor == None +} + +pub fn interceptor_can_inspect_sql_test() { + let interceptor = + pog.Interceptor(fn(request) { + case request.sql { + "SELECT 1" -> pog.Respond(1, [dynamic.array([dynamic.int(1)])]) + "SELECT 2" -> pog.Respond(1, [dynamic.array([dynamic.int(2)])]) + _ -> pog.Continue + } + }) + + let name = process.new_name("pog_test") + let config = pog.Config(..default_config(name), interceptor: Some(interceptor)) + let assert Ok(started) = pog.start(config) + + let result1 = + pog.query("SELECT 1") + |> pog.returning(decode.at([0], decode.int)) + |> pog.execute(started.data) + + assert result1 == Ok(pog.Returned(1, [1])) + + let result2 = + pog.query("SELECT 2") + |> pog.returning(decode.at([0], decode.int)) + |> pog.execute(started.data) + + assert result2 == Ok(pog.Returned(1, [2])) + + disconnect(started) +} From 35f8d8f7a3f7f4f490dab7415426bc82094672e7 Mon Sep 17 00:00:00 2001 From: "D.C. Rockwell" Date: Thu, 11 Dec 2025 02:10:26 -0700 Subject: [PATCH 2/9] feat(interceptor): wip add Capture variant for recording query results Add a new Capture variant to InterceptResult that executes the query and passes the raw result to a callback function for recording purposes. This allows interceptors to observe actual database responses while still returning results normally to the caller. WIP - needs tests --- src/pog.gleam | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/pog.gleam b/src/pog.gleam index 6582f41..0bb43a6 100644 --- a/src/pog.gleam +++ b/src/pog.gleam @@ -580,6 +580,7 @@ pub type InterceptRequest { /// - `Continue`: Execute the query normally against the database /// - `Respond`: Return this data instead of querying the database /// - `Fail`: Return this error instead of querying the database +/// - `Capture`: Execute the query and capture the raw result for recording pub type InterceptResult { /// Execute the query normally Continue @@ -587,6 +588,8 @@ pub type InterceptResult { Respond(count: Int, rows: List(Dynamic)) /// Return an error without querying the database Fail(error: QueryError) + /// Execute query and capture result for recording + Capture(on_result: fn(Result(#(Int, List(Dynamic)), QueryError)) -> Nil) } @external(erlang, "pog_ffi", "query") @@ -687,6 +690,23 @@ pub fn execute( Respond(count:, rows:) -> decode_intercepted_rows(count, rows, query.row_decoder) Fail(error:) -> Error(error) + Capture(on_result:) -> { + // Execute real query + let result = run_query(pool, query.sql, parameters, query.timeout) + // Let interceptor capture the raw result + on_result(result) + // Decode and return normally + case result { + Ok(#(count, rows)) -> { + use decoded <- result.try( + list.try_map(over: rows, with: decode.run(_, query.row_decoder)) + |> result.map_error(UnexpectedResultType), + ) + Ok(Returned(count, decoded)) + } + Error(err) -> Error(err) + } + } } } None -> execute_query(pool, query, parameters) From cd7b8e0040f1bc050cd6107adca7a9ebb526712f Mon Sep 17 00:00:00 2001 From: Dara Rockwell Date: Fri, 12 Dec 2025 13:43:44 -0700 Subject: [PATCH 3/9] feat(interceptor): enable interceptor support in transaction queries ## Why This Change Was Made - Database recorder for fixture-based testing wasn't capturing queries executed inside pog.transaction() - Interceptors were stored by pool name, but SingleConnection (from checkout) had no reference to the pool's interceptor - This caused all transaction queries to bypass the interceptor entirely, making it impossible to record/playback transactional tests ## What Was Changed - Modified checkout/1 in pog_ffi.erl to copy pool interceptor to connection handle when checking out - Modified get_pool_interceptor/1 to lookup interceptor by connection handle for SingleConnection - Added cleanup_checkout_interceptor/1 to erase interceptor reference on checkin - Modified checkin wrapper in pog.gleam to call cleanup before checking connection back in ## Note to Future Engineer - The interceptor is stored in the process dictionary with the connection handle as the key: {pog_conn_interceptor, Conn} - This is intentionally separate from the pool interceptor: {pog_interceptor, PoolName} - The connection handle (Conn) is available in both checkout (to store) and get_pool_interceptor (to lookup), making it the perfect key - Cleanup is critical - if you remove the erase() call in cleanup_checkout_interceptor, you'll leak interceptor references in the process dictionary every time a transaction runs - Without this fix, transaction queries work fine in production but are invisible to interceptors. Your tests will mysteriously pass in record mode (hitting real DB) but fail in playback mode (fixture missing queries). Don't ask me how long it took to figure that out. --- src/pog.gleam | 115 ++++++++++++++++++++++++++++++++++++++---------- src/pog_ffi.erl | 32 +++++++++----- 2 files changed, 114 insertions(+), 33 deletions(-) diff --git a/src/pog.gleam b/src/pog.gleam index 0bb43a6..39d629c 100644 --- a/src/pog.gleam +++ b/src/pog.gleam @@ -27,6 +27,7 @@ const default_port: Int = 5432 pub opaque type Connection { Pool(Name(Message)) SingleConnection(SingleConnection) + Disconnected(Interceptor) } type SingleConnection @@ -42,6 +43,29 @@ pub fn named_connection(name: Name(Message)) -> Connection { Pool(name) } +/// Create a connection that uses an interceptor without a database. +/// +/// No database connection is established. All queries are routed to the +/// interceptor. The interceptor must handle all queries - returning `Continue` +/// will result in a `ConnectionUnavailable` error. +/// +/// ## Example +/// +/// ```gleam +/// let interceptor = Interceptor(fn(request) { +/// case request.sql { +/// "SELECT * FROM users" -> Respond(1, [user_data]) +/// _ -> Fail(PostgresqlError("UNSUPPORTED", "unsupported", "Query not supported")) +/// } +/// }) +/// +/// let connection = pog.disconnected(interceptor) +/// ``` +/// +pub fn disconnected(interceptor: Interceptor) -> Connection { + Disconnected(interceptor) +} + /// The configuration for a pool of connections. pub type Config { Config( @@ -469,6 +493,14 @@ pub fn transaction( callback: fn(Connection) -> Result(t, error), ) -> Result(t, TransactionError(error)) { case pool { + Disconnected(interceptor) -> { + // For disconnected connections (playback mode), call the callback + // with the disconnected connection so interceptor can serve recorded queries + case callback(Disconnected(interceptor)) { + Ok(value) -> Ok(value) + Error(err) -> Error(TransactionRolledBack(err)) + } + } SingleConnection(conn) -> { transaction_layer(conn, callback) } @@ -524,8 +556,16 @@ fn checkout( pool: Name(Message), ) -> Result(#(Reference, SingleConnection), QueryError) +@external(erlang, "pog_ffi", "cleanup_checkout_interceptor") +fn cleanup_checkout_interceptor(conn: SingleConnection) -> Nil + @external(erlang, "pgo", "checkin") -fn checkin(ref: Reference, conn: SingleConnection) -> Dynamic +fn do_checkin(ref: Reference, conn: SingleConnection) -> Dynamic + +fn checkin(ref: Reference, conn: SingleConnection) -> Dynamic { + cleanup_checkout_interceptor(conn) + do_checkin(ref, conn) +} pub fn nullable(inner_type: fn(a) -> Value, value: Option(a)) -> Value { case value { @@ -678,38 +718,67 @@ pub fn execute( on pool: Connection, ) -> Result(Returned(t), QueryError) { let parameters = list.reverse(query.parameters) - - // Check for interceptor - case get_pool_interceptor(pool) { - Some(interceptor) -> { + + case pool { + Disconnected(interceptor) -> { + // Disconnected connection - route through interceptor only let request = - InterceptRequest(sql: query.sql, parameters: parameters, timeout: query.timeout) - + InterceptRequest( + sql: query.sql, + parameters: parameters, + timeout: query.timeout, + ) + case interceptor.intercept(request) { - Continue -> execute_query(pool, query, parameters) + Continue -> Error(ConnectionUnavailable) Respond(count:, rows:) -> decode_intercepted_rows(count, rows, query.row_decoder) Fail(error:) -> Error(error) - Capture(on_result:) -> { - // Execute real query - let result = run_query(pool, query.sql, parameters, query.timeout) - // Let interceptor capture the raw result - on_result(result) - // Decode and return normally - case result { - Ok(#(count, rows)) -> { - use decoded <- result.try( - list.try_map(over: rows, with: decode.run(_, query.row_decoder)) - |> result.map_error(UnexpectedResultType), - ) - Ok(Returned(count, decoded)) + Capture(_) -> Error(ConnectionUnavailable) + } + } + + Pool(_) | SingleConnection(_) -> { + // Connected pool or connection - check for interceptor + case get_pool_interceptor(pool) { + Some(interceptor) -> { + let request = + InterceptRequest( + sql: query.sql, + parameters: parameters, + timeout: query.timeout, + ) + + case interceptor.intercept(request) { + Continue -> execute_query(pool, query, parameters) + Respond(count:, rows:) -> + decode_intercepted_rows(count, rows, query.row_decoder) + Fail(error:) -> Error(error) + Capture(on_result:) -> { + // Execute real query + let result = run_query(pool, query.sql, parameters, query.timeout) + // Let interceptor capture the raw result + on_result(result) + // Decode and return normally + case result { + Ok(#(count, rows)) -> { + use decoded <- result.try( + list.try_map(over: rows, with: decode.run( + _, + query.row_decoder, + )) + |> result.map_error(UnexpectedResultType), + ) + Ok(Returned(count, decoded)) + } + Error(err) -> Error(err) + } } - Error(err) -> Error(err) } } + None -> execute_query(pool, query, parameters) } } - None -> execute_query(pool, query, parameters) } } diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index 10ca479..7524c20 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -1,6 +1,6 @@ -module(pog_ffi). --export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1, get_pool_interceptor/1, set_pool_interceptor/2]). +-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1, get_pool_interceptor/1, set_pool_interceptor/2, cleanup_checkout_interceptor/1]). -include_lib("pog/include/pog_Config.hrl"). -include_lib("pg_types/include/pg_types.hrl"). @@ -118,7 +118,13 @@ query_extended(Conn, Sql) -> checkout(Name) when is_atom(Name) -> case pgo:checkout(Name) of - {ok, Ref, Conn} -> {ok, {Ref, Conn}}; + {ok, Ref, Conn} -> + % Copy pool's interceptor to connection for transaction support + case get({pog_interceptor, Name}) of + undefined -> ok; + Interceptor -> put({pog_conn_interceptor, Conn}, Interceptor) + end, + {ok, {Ref, Conn}}; {error, Error} -> {error, convert_error(Error)} end. @@ -156,15 +162,21 @@ set_pool_interceptor(PoolName, Interceptor) when is_atom(PoolName) -> %% Get interceptor for a connection get_pool_interceptor(Connection) -> - PoolName = case Connection of - {pool, Name} -> Name; - {single_connection, _} -> undefined - end, - case PoolName of - undefined -> none; - _ -> - case get({pog_interceptor, PoolName}) of + case Connection of + {pool, Name} -> + case get({pog_interceptor, Name}) of + undefined -> none; + Interceptor -> {some, Interceptor} + end; + {single_connection, Conn} -> + % For single connections (from transactions), lookup by connection handle + case get({pog_conn_interceptor, Conn}) of undefined -> none; Interceptor -> {some, Interceptor} end end. + +%% Cleanup interceptor reference when connection is checked back in +cleanup_checkout_interceptor(Conn) -> + erase({pog_conn_interceptor, Conn}), + nil. From e236e25c8ec6ae830bb565461fcaf59f1d5b3891 Mon Sep 17 00:00:00 2001 From: Dara Rockwell Date: Sun, 18 Jan 2026 22:34:14 -0700 Subject: [PATCH 4/9] fix(ffi): store interceptors in ETS Replace process dictionary interceptor storage with a named ETS table so interceptor mappings survive across checkouts/transactions and avoid per-process state issues. --- src/pog_ffi.erl | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index 7524c20..8768d29 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -5,6 +5,17 @@ -include_lib("pog/include/pog_Config.hrl"). -include_lib("pg_types/include/pg_types.hrl"). +-define(INTERCEPTOR_TABLE, pog_interceptor_table). + +ensure_interceptor_table() -> + case ets:info(?INTERCEPTOR_TABLE) of + undefined -> + ets:new(?INTERCEPTOR_TABLE, [named_table, public, set]), + ok; + _ -> + ok + end. + null() -> null. @@ -120,9 +131,13 @@ checkout(Name) when is_atom(Name) -> case pgo:checkout(Name) of {ok, Ref, Conn} -> % Copy pool's interceptor to connection for transaction support - case get({pog_interceptor, Name}) of - undefined -> ok; - Interceptor -> put({pog_conn_interceptor, Conn}, Interceptor) + ensure_interceptor_table(), + case ets:lookup(?INTERCEPTOR_TABLE, {pool, Name}) of + [{_, Interceptor}] -> + ets:insert(?INTERCEPTOR_TABLE, {{conn, Conn}, Interceptor}), + ok; + _ -> + ok end, {ok, {Ref, Conn}}; {error, Error} -> {error, convert_error(Error)} @@ -157,26 +172,29 @@ convert_error(closed) -> %% Interceptor support %% Store interceptor in process dictionary keyed by pool name set_pool_interceptor(PoolName, Interceptor) when is_atom(PoolName) -> - put({pog_interceptor, PoolName}, Interceptor), + ensure_interceptor_table(), + ets:insert(?INTERCEPTOR_TABLE, {{pool, PoolName}, Interceptor}), nil. %% Get interceptor for a connection get_pool_interceptor(Connection) -> + ensure_interceptor_table(), case Connection of {pool, Name} -> - case get({pog_interceptor, Name}) of - undefined -> none; - Interceptor -> {some, Interceptor} + case ets:lookup(?INTERCEPTOR_TABLE, {pool, Name}) of + [{_, Interceptor}] -> {some, Interceptor}; + _ -> none end; {single_connection, Conn} -> % For single connections (from transactions), lookup by connection handle - case get({pog_conn_interceptor, Conn}) of - undefined -> none; - Interceptor -> {some, Interceptor} + case ets:lookup(?INTERCEPTOR_TABLE, {conn, Conn}) of + [{_, Interceptor}] -> {some, Interceptor}; + _ -> none end end. %% Cleanup interceptor reference when connection is checked back in cleanup_checkout_interceptor(Conn) -> - erase({pog_conn_interceptor, Conn}), + ensure_interceptor_table(), + ets:delete(?INTERCEPTOR_TABLE, {conn, Conn}), nil. From 181f70a5c680d1e044541202fa86ef1dd75de42f Mon Sep 17 00:00:00 2001 From: Dara Rockwell Date: Wed, 21 Jan 2026 05:39:18 -0700 Subject: [PATCH 5/9] fix(ffi): store interceptors in process dictionary --- src/pog_ffi.erl | 40 +++++++++++----------------------------- 1 file changed, 11 insertions(+), 29 deletions(-) diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index 8768d29..7524c20 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -5,17 +5,6 @@ -include_lib("pog/include/pog_Config.hrl"). -include_lib("pg_types/include/pg_types.hrl"). --define(INTERCEPTOR_TABLE, pog_interceptor_table). - -ensure_interceptor_table() -> - case ets:info(?INTERCEPTOR_TABLE) of - undefined -> - ets:new(?INTERCEPTOR_TABLE, [named_table, public, set]), - ok; - _ -> - ok - end. - null() -> null. @@ -131,13 +120,9 @@ checkout(Name) when is_atom(Name) -> case pgo:checkout(Name) of {ok, Ref, Conn} -> % Copy pool's interceptor to connection for transaction support - ensure_interceptor_table(), - case ets:lookup(?INTERCEPTOR_TABLE, {pool, Name}) of - [{_, Interceptor}] -> - ets:insert(?INTERCEPTOR_TABLE, {{conn, Conn}, Interceptor}), - ok; - _ -> - ok + case get({pog_interceptor, Name}) of + undefined -> ok; + Interceptor -> put({pog_conn_interceptor, Conn}, Interceptor) end, {ok, {Ref, Conn}}; {error, Error} -> {error, convert_error(Error)} @@ -172,29 +157,26 @@ convert_error(closed) -> %% Interceptor support %% Store interceptor in process dictionary keyed by pool name set_pool_interceptor(PoolName, Interceptor) when is_atom(PoolName) -> - ensure_interceptor_table(), - ets:insert(?INTERCEPTOR_TABLE, {{pool, PoolName}, Interceptor}), + put({pog_interceptor, PoolName}, Interceptor), nil. %% Get interceptor for a connection get_pool_interceptor(Connection) -> - ensure_interceptor_table(), case Connection of {pool, Name} -> - case ets:lookup(?INTERCEPTOR_TABLE, {pool, Name}) of - [{_, Interceptor}] -> {some, Interceptor}; - _ -> none + case get({pog_interceptor, Name}) of + undefined -> none; + Interceptor -> {some, Interceptor} end; {single_connection, Conn} -> % For single connections (from transactions), lookup by connection handle - case ets:lookup(?INTERCEPTOR_TABLE, {conn, Conn}) of - [{_, Interceptor}] -> {some, Interceptor}; - _ -> none + case get({pog_conn_interceptor, Conn}) of + undefined -> none; + Interceptor -> {some, Interceptor} end end. %% Cleanup interceptor reference when connection is checked back in cleanup_checkout_interceptor(Conn) -> - ensure_interceptor_table(), - ets:delete(?INTERCEPTOR_TABLE, {conn, Conn}), + erase({pog_conn_interceptor, Conn}), nil. From 9a436ce93aa262422ae1afb76c3b19a39b95ab61 Mon Sep 17 00:00:00 2001 From: Dara Rockwell Date: Thu, 22 Jan 2026 18:37:48 -0700 Subject: [PATCH 6/9] fix(ffi): persist interceptors via ETS table Move interceptor storage out of the process dictionary so pooled connections can retrieve interceptors reliably across processes. --- src/pog_ffi.erl | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index 7524c20..b6ce057 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -5,6 +5,17 @@ -include_lib("pog/include/pog_Config.hrl"). -include_lib("pg_types/include/pg_types.hrl"). +-define(INTERCEPTOR_TABLE, pog_interceptor_table). + +ensure_interceptor_table() -> + case ets:info(?INTERCEPTOR_TABLE) of + undefined -> + ets:new(?INTERCEPTOR_TABLE, [named_table, public, set]), + ok; + _ -> + ok + end. + null() -> null. @@ -120,9 +131,10 @@ checkout(Name) when is_atom(Name) -> case pgo:checkout(Name) of {ok, Ref, Conn} -> % Copy pool's interceptor to connection for transaction support - case get({pog_interceptor, Name}) of - undefined -> ok; - Interceptor -> put({pog_conn_interceptor, Conn}, Interceptor) + ensure_interceptor_table(), + case ets:lookup(?INTERCEPTOR_TABLE, Name) of + [{Name, Interceptor}] -> put({pog_conn_interceptor, Conn}, Interceptor); + _ -> ok end, {ok, {Ref, Conn}}; {error, Error} -> {error, convert_error(Error)} @@ -155,18 +167,20 @@ convert_error(closed) -> query_timeout. %% Interceptor support -%% Store interceptor in process dictionary keyed by pool name +%% Store interceptor in ETS keyed by pool name set_pool_interceptor(PoolName, Interceptor) when is_atom(PoolName) -> - put({pog_interceptor, PoolName}, Interceptor), + ensure_interceptor_table(), + ets:insert(?INTERCEPTOR_TABLE, {PoolName, Interceptor}), nil. %% Get interceptor for a connection get_pool_interceptor(Connection) -> case Connection of {pool, Name} -> - case get({pog_interceptor, Name}) of - undefined -> none; - Interceptor -> {some, Interceptor} + ensure_interceptor_table(), + case ets:lookup(?INTERCEPTOR_TABLE, Name) of + [{Name, Interceptor}] -> {some, Interceptor}; + _ -> none end; {single_connection, Conn} -> % For single connections (from transactions), lookup by connection handle From c17f22a52e6d07adec9e29ba8057fee73bcdccaa Mon Sep 17 00:00:00 2001 From: Dara Rockwell Date: Wed, 28 Jan 2026 12:59:40 -0700 Subject: [PATCH 7/9] fix(ffi): add safe interceptor lookup Ensure the interceptor ETS table has a stable heir and add a safe getter that handles disconnected/unknown connection shapes. --- src/pog_ffi.erl | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index b6ce057..a510d02 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -1,6 +1,6 @@ -module(pog_ffi). --export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1, get_pool_interceptor/1, set_pool_interceptor/2, cleanup_checkout_interceptor/1]). +-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1, get_pool_interceptor/1, get_pool_interceptor_safe/1, set_pool_interceptor/2, cleanup_checkout_interceptor/1]). -include_lib("pog/include/pog_Config.hrl"). -include_lib("pg_types/include/pg_types.hrl"). @@ -10,7 +10,10 @@ ensure_interceptor_table() -> case ets:info(?INTERCEPTOR_TABLE) of undefined -> - ets:new(?INTERCEPTOR_TABLE, [named_table, public, set]), + ets:new( + ?INTERCEPTOR_TABLE, + [named_table, public, set, {heir, whereis(init), undefined}] + ), ok; _ -> ok @@ -190,6 +193,14 @@ get_pool_interceptor(Connection) -> end end. +get_pool_interceptor_safe(Connection) -> + case Connection of + {pool, _} -> get_pool_interceptor(Connection); + {single_connection, _} -> get_pool_interceptor(Connection); + {disconnected, _} -> none; + _ -> none + end. + %% Cleanup interceptor reference when connection is checked back in cleanup_checkout_interceptor(Conn) -> erase({pog_conn_interceptor, Conn}), From b560bfe1e5f6d827a3dd13ce2fb0489bdbb0ae7b Mon Sep 17 00:00:00 2001 From: Dara Rockwell Date: Thu, 7 May 2026 04:05:24 -0600 Subject: [PATCH 8/9] fix(ffi): add catch-all clause to convert_error/1 Without this, any pgo error variant outside the documented set ({pgo_error, _}, {unexpected_message, _}, client_disconnected, client_timeout, ssl_refused, {unimplemented, _}, {sasl_server_final,_}, invalid_value_len, etc.) raises function_clause and crashes the calling Erlang process. In long-lived consumers like audit logging actors that call queries synchronously, this manifests as the actor dying mid-message-handling, taking down the actor and any work queued in its mailbox. Map every unmatched shape to connection_unavailable (the closest existing recoverable variant on the Gleam side, ConnectionUnavailable) and emit a `=WARNING REPORT=` to the BEAM logger with the actual driver term so volume / pattern can be tracked and a precise mapping added later if a specific shape becomes common. Also exports convert_error/1 so it can be unit-tested without spinning up a real connection. The function is pure; exporting it expands no surface area meaningfully. --- src/pog_ffi.erl | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index a510d02..63125a6 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -1,6 +1,6 @@ -module(pog_ffi). --export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1, get_pool_interceptor/1, get_pool_interceptor_safe/1, set_pool_interceptor/2, cleanup_checkout_interceptor/1]). +-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1, get_pool_interceptor/1, get_pool_interceptor_safe/1, set_pool_interceptor/2, cleanup_checkout_interceptor/1, convert_error/1]). -include_lib("pog/include/pog_Config.hrl"). -include_lib("pg_types/include/pg_types.hrl"). @@ -167,7 +167,21 @@ convert_error(#{ Got = list_to_binary(io_lib:format("~p", [Value])), {unexpected_argument_type, Expected, Got}; convert_error(closed) -> - query_timeout. + query_timeout; +%% Catch-all: pgo can emit shapes outside the documented set +%% (e.g. {pgo_error, _}, {unexpected_message, _}, client_disconnected, +%% client_timeout, ssl_refused, {unimplemented, _}, etc.). Without this +%% clause those shapes raise function_clause and crash the calling +%% process — which previously took down audit_service and other +%% long-lived service actors. We map them all to connection_unavailable +%% so callers see a known, recoverable variant and the actual driver +%% term is logged for diagnosis. +convert_error(Other) -> + logger:warning( + "pog convert_error: unhandled driver error variant: ~p", + [Other] + ), + connection_unavailable. %% Interceptor support %% Store interceptor in ETS keyed by pool name From c4b89d320942813e4b14e22ed2507e67788755ea Mon Sep 17 00:00:00 2001 From: Dara Rockwell Date: Thu, 7 May 2026 07:03:01 -0600 Subject: [PATCH 9/9] feat(ffi): structured logger.warning for catch-all + eunit regression suite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why This Change Was Made The previous commit (b560bfe) added a catch-all clause to convert_error/1 to keep undocumented pgo error shapes from raising function_clause and killing long-lived service actors in consuming applications. That fix mapped every unmatched shape to connection_unavailable and emitted a plain-text logger:warning for diagnosis. Two problems with the original landing: 1. The plain-text format ("pog convert_error: unhandled driver error variant: ~p") tunnels the entire driver term through a printf-style message, which is ergonomic in stdout but hostile to structured log pipelines (Datadog, CloudWatch, Logflare). A consumer running a JSON formatter saw the message field as one big opaque string and could not query on the actual error shape. 2. There was no test coverage. The catch-all was a defensive change on a plausible-but-unproven hypothesis about a production crash; without tests, any future engineer "cleaning up the imports" or "switching to a more specific error type" could silently re-break the actor-survival invariant we are relying on. This commit fixes both. It re-emits the warning as a structured report map (component / event / error_term), which renders cleanly under both the OTP default logger_formatter and structured JSON formatters, and adds an eunit suite that pins the actor-survival behaviour deterministically. A non-obvious wrinkle that bit during live verification on a real consuming application: an earlier draft of this change set `domain => [pog, convert_error]` on the warning, intending to give consumers a stable filter key. Live testing showed that many default logger handlers ship with a `no_domain` filter that ONLY logs events whose domain is undefined plus an `[otp, sasl]` super filter for system reports. Setting any other domain caused the warning to be silently dropped — the opposite of the goal. The metadata was removed; consumers who want domain-based routing can install a filter and key on the `component` field in the report map instead, which is formatter-agnostic and survives any handler reconfiguration. ## What Was Changed - `src/pog_ffi.erl`: convert_error/1 catch-all clause now emits a report-map warning with three fields: component = pog_convert_error (stable identifier for filters) event = unhandled_driver_error error_term = list_to_binary(io_lib:format("~p", [Other])) (pre-formatted so JSON formatters can't accidentally tunnel the raw Erlang term through and produce unparseable output) Domain metadata is intentionally NOT set; see commit body and the inline doc comment for why. - `test/pog_ffi_test.erl` (new): twelve eunit tests covering three invariants: 1. Documented shapes still map to their specific QueryError variants (regression guard against accidentally rerouting documented variants through the catch-all). 2. Seven previously-fatal shapes now map to connection_unavailable without raising. Includes both well-known pgo internals ({pgo_error, _}, {unexpected_message, _}, client_disconnected, ssl_refused, {unimplemented, _}) and arbitrary atoms / tuples, since the bug class is "any term outside the documented set". 3. The high-signal test: a process spawned to call convert_error/1 with three previously-fatal shapes exits with reason `normal`, not `{function_clause, _}`. Without the catch-all this monitor receives a function_clause DOWN message; this test would fail loudly. With the catch-all the process completes cleanly. This is the actor-survival invariant that consumers depend on. All twelve tests pass in 41ms on a clean build, including the process-monitor test. - The `convert_error/1` export added in b560bfe stays in place; the eunit test calls it directly without spinning up a real Postgres connection. The function is pure given its input, so exporting it expands no meaningful surface area. ## Note to Future Engineer - The eunit suite is auto-discovered by `gleam test`. gleeunit's `find_files(matching: "**/*.{erl,gleam}", in: "test")` picks up any *.erl test module in test/, which is why no wrapper was needed. If you rename the file to anything not ending in `_test.erl`, you lose the auto-discovery and the suite goes silent. - The catch-all is a SAFETY NET, not a feature path. If you find yourself adding "another shape to map specifically", do that by adding a NEW clause ABOVE the catch-all, not by extending the catch-all body. The whole point is that the catch-all should be invoked rarely and loudly; routing more traffic through it makes the warning noise instead of signal. - If the warning fires in production, it does NOT mean the system is broken. It means pgo emitted an error shape we have not yet mapped to a specific QueryError variant. The intended workflow is: see the warning, add a clause for that shape with a more specific variant, ship it, watch the warning go away. The catch-all keeps the lights on while you do that work; it is not your enemy. - Do not be tempted to "make this look less ugly" by setting `domain` metadata on the warning. We tried. Default OTP logger handlers will eat it. There is a comment in the source explaining this; please read it before changing it. Future-You will thank Past-You for the comment, and Present-You for not deleting it. - Tongue firmly in cheek: this is the most defensively-engineered three-line catch-all in the codebase, and it has more documentation than the entire rest of the FFI module. You are welcome to find this excessive. The catch-all exists because someone shipped a pog release with no catch-all at all and it took down service actors in production. The cost of "too much documentation" is bounded; the cost of "another silent production cascade" is not. --- src/pog_ffi.erl | 22 +++++--- test/pog_ffi_test.erl | 117 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 6 deletions(-) create mode 100644 test/pog_ffi_test.erl diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index 63125a6..2f6bcfa 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -174,13 +174,23 @@ convert_error(closed) -> %% clause those shapes raise function_clause and crash the calling %% process — which previously took down audit_service and other %% long-lived service actors. We map them all to connection_unavailable -%% so callers see a known, recoverable variant and the actual driver -%% term is logged for diagnosis. +%% so callers see a known, recoverable variant. +%% +%% Emits a structured `logger:warning` so consumers with a structured +%% formatter (e.g. JSON) get queryable fields, while consumers with a +%% string formatter still get something readable via `~p`. We +%% intentionally do NOT set `domain` metadata: many default handlers +%% install a `no_domain` filter that drops events whose domain is set +%% to anything other than the handler's expected list, which would +%% silence this warning instead of routing it. The error term is +%% captured as a binary so it can't tunnel through a JSON formatter as +%% a raw Erlang term. convert_error(Other) -> - logger:warning( - "pog convert_error: unhandled driver error variant: ~p", - [Other] - ), + logger:warning(#{ + component => pog_convert_error, + event => unhandled_driver_error, + error_term => list_to_binary(io_lib:format("~p", [Other])) + }), connection_unavailable. %% Interceptor support diff --git a/test/pog_ffi_test.erl b/test/pog_ffi_test.erl new file mode 100644 index 0000000..62d32e6 --- /dev/null +++ b/test/pog_ffi_test.erl @@ -0,0 +1,117 @@ +%%% Regression tests for `pog_ffi:convert_error/1`. +%%% +%%% Background: a previous version had no catch-all clause. Any pgo error +%%% term outside the documented set raised `function_clause`, which +%%% propagated up through `pog:query` and crashed the calling Gleam +%%% actor. In the proxy this manifested as long-lived service actors +%%% (audit_service, token_usage_writer, pricing_cache) all dying when +%%% the driver hit an undocumented error path. +%%% +%%% These tests pin two invariants: +%%% 1. Documented shapes still map to their specific QueryError variants +%%% (regression guard: don't accidentally re-route them through the +%%% catch-all). +%%% 2. Previously-fatal shapes now map to `connection_unavailable` +%%% and do NOT raise. +%%% +%%% The third test (actor_survives) is the high-signal test: it proves +%%% the catch-all does what it's supposed to do at the layer that +%%% matters (process survival), not just at the FFI return-value layer. + +-module(pog_ffi_test). + +-include_lib("eunit/include/eunit.hrl"). + +%% -------------------------------------------------------------------- +%% Documented shapes still map correctly. +%% -------------------------------------------------------------------- + +known_none_available_test() -> + ?assertEqual(connection_unavailable, + pog_ffi:convert_error(none_available)). + +known_closed_test() -> + ?assertEqual(query_timeout, pog_ffi:convert_error(closed)). + +known_unexpected_argument_count_test() -> + ?assertEqual({unexpected_argument_count, 2, 3}, + pog_ffi:convert_error({pgo_protocol, {parameters, 2, 3}})). + +known_pgsql_error_with_constraint_test() -> + ErrorTerm = {pgsql_error, #{ + message => <<"duplicate key">>, + constraint => <<"users_email_key">>, + detail => <<"Key (email)=(x) already exists.">> + }}, + ?assertEqual({constraint_violated, + <<"duplicate key">>, + <<"users_email_key">>, + <<"Key (email)=(x) already exists.">>}, + pog_ffi:convert_error(ErrorTerm)). + +%% -------------------------------------------------------------------- +%% Catch-all: previously-fatal shapes now return connection_unavailable. +%% +%% NOTE: this test is non-exhaustive by definition — pgo can introduce +%% new shapes any time. The point isn't to enumerate every possible +%% term; it's to assert that *some specific shapes that used to crash* +%% no longer do, and that the catch-all is wired up. +%% -------------------------------------------------------------------- + +catchall_pgo_error_test() -> + ?assertEqual(connection_unavailable, + pog_ffi:convert_error({pgo_error, some_inner})). + +catchall_client_disconnected_test() -> + ?assertEqual(connection_unavailable, + pog_ffi:convert_error(client_disconnected)). + +catchall_unexpected_message_test() -> + ?assertEqual(connection_unavailable, + pog_ffi:convert_error({unexpected_message, foo})). + +catchall_ssl_refused_test() -> + ?assertEqual(connection_unavailable, + pog_ffi:convert_error(ssl_refused)). + +catchall_unimplemented_test() -> + ?assertEqual(connection_unavailable, + pog_ffi:convert_error({unimplemented, sasl_server_final})). + +catchall_arbitrary_atom_test() -> + ?assertEqual(connection_unavailable, + pog_ffi:convert_error(some_atom_we_have_never_seen)). + +catchall_arbitrary_tuple_test() -> + ?assertEqual(connection_unavailable, + pog_ffi:convert_error({a, b, c, d, e})). + +%% -------------------------------------------------------------------- +%% The high-signal test: a process calling convert_error/1 with a +%% previously-fatal shape exits NORMALLY rather than crashing with +%% function_clause. Without the catch-all this monitor would receive +%% {'DOWN', _, process, _, {function_clause, _}}. +%% -------------------------------------------------------------------- + +actor_survives_unmatched_shape_test() -> + Self = self(), + {Pid, MonRef} = spawn_monitor(fun() -> + connection_unavailable = pog_ffi:convert_error({pgo_error, x}), + connection_unavailable = pog_ffi:convert_error(client_disconnected), + connection_unavailable = pog_ffi:convert_error(ssl_refused), + Self ! {self(), all_calls_returned} + end), + receive + {Pid, all_calls_returned} -> ok + after + 2000 -> + ?assert(false) %% process never reported back + end, + receive + {'DOWN', MonRef, process, Pid, normal} -> ok; + {'DOWN', MonRef, process, Pid, OtherReason} -> + ?assertEqual(normal, OtherReason) + after + 2000 -> + ?assert(false) %% no DOWN message — monitor missed exit + end.