diff --git a/README.md b/README.md
index efdfdaa..fa0ebfc 100644
--- a/README.md
+++ b/README.md
@@ -79,6 +79,67 @@ pub fn run(db: pog.Connection) {
 }
 ```
 
+## Query Interceptors
+
+Interceptors allow you to observe, log, or modify query execution. They're called before each query runs and can:
+
+- **Log queries** for debugging and auditing
+- **Collect metrics** for performance monitoring
+- **Return test data** for testing without a database
+
+### Example - Query Logging
+
+```gleam
+import gleam/io
+import gleam/option.{Some}
+import pog
+
+let logger = pog.Interceptor(fn(request) {
+  io.println("SQL: " <> request.sql)
+  pog.Continue // Execute normally
+})
+
+pog.default_config(pool_name)
+|> pog.interceptor(Some(logger))
+|> pog.start
+```
+
+### Example - Testing Without a Database
+
+```gleam
+import gleam/dynamic
+import gleam/option.{Some}
+import pog
+
+let test_data = dynamic.array([
+  dynamic.int(1),
+  dynamic.string("Alice"),
+  dynamic.string("alice@example.com"),
+])
+
+let test_interceptor = pog.Interceptor(fn(request) {
+  case request.sql {
+    "SELECT * FROM users WHERE id = $1" ->
+      pog.Respond(count: 1, rows: [test_data])
+    _ ->
+      pog.Continue
+  }
+})
+
+pog.default_config(pool_name)
+|> pog.interceptor(Some(test_interceptor))
+|> pog.start
+```
+
+### Interceptor Results
+
+Your interceptor function returns an `InterceptResult`:
+
+- `Continue` - Execute the query normally against the database
+- `Respond(count: Int, rows: List(Dynamic))` - Return this data without querying
+- `Fail(error: QueryError)` - Return this error without querying
+- `Capture(on_result: fn(Result(#(Int, List(Dynamic)), QueryError)) -> Nil)` - Execute the query normally and pass the raw result to `on_result` before it is decoded
+
 ## Support of connection URI
 
 Configuring a Postgres connection is done by using `Config` type in `pog`.
diff --git a/src/pog.gleam b/src/pog.gleam
index 2cb6c61..39d629c 100644
--- a/src/pog.gleam
+++ b/src/pog.gleam
@@ -27,6 +27,7 @@ const default_port: Int = 5432
 pub opaque type Connection {
   Pool(Name(Message))
   SingleConnection(SingleConnection)
+  Disconnected(Interceptor)
 }
 
 type SingleConnection
@@ -42,6 +43,29 @@ pub fn named_connection(name: Name(Message)) -> Connection {
   Pool(name)
 }
 
+/// Create a connection that uses an interceptor without a database.
+///
+/// No database connection is established. All queries are routed to the
+/// interceptor, which must answer every query itself: returning `Continue` or
+/// `Capture` results in a `ConnectionUnavailable` error.
+///
+/// ## Example
+///
+/// ```gleam
+/// let interceptor = Interceptor(fn(request) {
+///   case request.sql {
+///     "SELECT * FROM users" -> Respond(1, [user_data])
+///     _ -> Fail(PostgresqlError("UNSUPPORTED", "unsupported", "Query not supported"))
+///   }
+/// })
+///
+/// let connection = pog.disconnected(interceptor)
+/// ```
+///
+pub fn disconnected(interceptor: Interceptor) -> Connection {
+  Disconnected(interceptor)
+}
+
 /// The configuration for a pool of connections.
 pub type Config {
   Config(
@@ -85,6 +109,8 @@ pub type Config {
     /// (default: False) By default, pgo will return a n-tuple, in the order of the query.
     /// By setting `rows_as_map` to `True`, the result will be `Dict`.
     rows_as_map: Bool,
+    /// (default: None): Optional query interceptor for logging, observability, or testing.
+    interceptor: Option(Interceptor),
   )
 }
 
@@ -210,6 +236,44 @@ pub fn rows_as_map(config: Config, rows_as_map: Bool) -> Config {
   Config(..config, rows_as_map:)
 }
 
+/// Set a query interceptor for logging, observability, or testing.
+///
+/// The interceptor is called before each query execution. Common use cases:
+///
+/// **Logging**: Log all SQL queries for debugging and auditing
+/// ```gleam
+/// import gleam/io
+///
+/// let logger = Interceptor(fn(req) {
+///   io.println("SQL: " <> req.sql)
+///   Continue
+/// })
+///
+/// config |> pog.interceptor(Some(logger))
+/// ```
+///
+/// **Metrics**: Track query performance
+/// ```gleam
+/// let metrics = Interceptor(fn(req) {
+///   metrics.record_query(req.sql, req.timeout)
+///   Continue
+/// })
+/// ```
+///
+/// **Testing**: Return test data without a database
+/// ```gleam
+/// let test_interceptor = Interceptor(fn(req) {
+///   case req.sql {
+///     "SELECT * FROM users" -> Respond(1, [test_user_data])
+///     _ -> Continue
+///   }
+/// })
+/// ```
+///
+pub fn interceptor(config: Config, interceptor: Option(Interceptor)) -> Config {
+  Config(..config, interceptor:)
+}
+
 /// The internet protocol version to use.
 pub type IpVersion {
   /// Internet Protocol version 4 (IPv4)
@@ -238,6 +302,7 @@ pub fn default_config(pool_name pool_name: Name(Message)) -> Config {
     trace: False,
     ip_version: Ipv4,
     rows_as_map: False,
+    interceptor: None,
   )
 }
 
@@ -428,6 +493,14 @@ pub fn transaction(
   callback: fn(Connection) -> Result(t, error),
 ) -> Result(t, TransactionError(error)) {
   case pool {
+    Disconnected(interceptor) -> {
+      // For disconnected connections (playback mode), call the callback
+      // with the disconnected connection so interceptor can serve recorded queries
+      case callback(Disconnected(interceptor)) {
+        Ok(value) -> Ok(value)
+        Error(err) -> Error(TransactionRolledBack(err))
+      }
+    }
     SingleConnection(conn) -> {
       transaction_layer(conn, callback)
     }
@@ -483,8 +556,16 @@ fn checkout(
   pool: Name(Message),
 ) -> Result(#(Reference, SingleConnection), QueryError)
 
+@external(erlang, "pog_ffi", "cleanup_checkout_interceptor")
+fn cleanup_checkout_interceptor(conn: SingleConnection) -> Nil
+
 @external(erlang, "pgo", "checkin")
-fn checkin(ref: Reference, conn: SingleConnection) -> Dynamic
+fn do_checkin(ref: Reference, conn: SingleConnection) -> Dynamic
+
+fn checkin(ref: Reference, conn: SingleConnection) -> Dynamic {
+  cleanup_checkout_interceptor(conn)
+  do_checkin(ref, conn)
+}
 
 pub fn nullable(inner_type: fn(a) -> Value, value: Option(a)) -> Value {
   case value {
@@ -498,6 +579,59 @@ pub type Returned(t) {
   Returned(count: Int, rows: List(t))
 }
 
+/// Query interceptor for logging, observability, and testing.
+///
+/// Interceptors are called before query execution and can:
+/// - Log queries for debugging and auditing
+/// - Collect metrics and performance data
+/// - Return mock data for testing (without a database)
+///
+/// The interceptor receives information about the query and can either
+/// continue with normal execution or provide an alternative result.
+///
+/// ## Example - Query logging
+///
+/// ```gleam
+/// import gleam/io
+///
+/// let logger = Interceptor(fn(request) {
+///   io.println("Executing: " <> request.sql)
+///   Continue
+/// })
+///
+/// pog.default_config(name)
+/// |> pog.interceptor(Some(logger))
+/// |> pog.start
+/// ```
+///
+pub type Interceptor {
+  Interceptor(intercept: fn(InterceptRequest) -> InterceptResult)
+}
+
+/// Information about a query being executed.
+///
+/// This is passed to the interceptor function before the query runs.
+pub type InterceptRequest {
+  InterceptRequest(sql: String, parameters: List(Value), timeout: Int)
+}
+
+/// The result returned by an interceptor.
+///
+/// - `Continue`: Execute the query normally against the database
+/// - `Respond`: Return this data instead of querying the database
+/// - `Fail`: Return this error instead of querying the database
+/// - `Capture`: Execute the query and capture the raw result for recording
+pub type InterceptResult {
+  /// Execute the query normally
+  Continue
+  /// Return data without querying the database
+  Respond(count: Int, rows: List(Dynamic))
+  /// Return an error without querying the database
+  Fail(error: QueryError)
+  /// Execute query and capture result for recording
+  Capture(on_result: fn(Result(#(Int, List(Dynamic)), QueryError)) -> Nil)
+}
+
 @external(erlang, "pog_ffi", "query")
 fn run_query(
   a: Connection,
@@ -584,6 +718,54 @@ pub fn execute(
   on pool: Connection,
 ) -> Result(Returned(t), QueryError) {
   let parameters = list.reverse(query.parameters)
+
+  let request =
+    InterceptRequest(
+      sql: query.sql,
+      parameters: parameters,
+      timeout: query.timeout,
+    )
+
+  case pool {
+    // No database behind this connection: the interceptor must answer.
+    Disconnected(interceptor) ->
+      case interceptor.intercept(request) {
+        Respond(count:, rows:) ->
+          decode_intercepted_rows(count, rows, query.row_decoder)
+        Fail(error:) -> Error(error)
+        // Both of these need a real database to run the query against.
+        Continue | Capture(_) -> Error(ConnectionUnavailable)
+      }
+
+    Pool(_) | SingleConnection(_) ->
+      case get_pool_interceptor(pool) {
+        None -> execute_query(pool, query, parameters)
+        Some(interceptor) ->
+          case interceptor.intercept(request) {
+            Continue -> execute_query(pool, query, parameters)
+            Respond(count:, rows:) ->
+              decode_intercepted_rows(count, rows, query.row_decoder)
+            Fail(error:) -> Error(error)
+            Capture(on_result:) -> {
+              // Run the real query and let the interceptor record the raw
+              // outcome before it is decoded. The local is named `raw`, not
+              // `result`, so it cannot shadow the `result` module used below.
+              let raw = run_query(pool, query.sql, parameters, query.timeout)
+              on_result(raw)
+              use #(count, rows) <- result.try(raw)
+              decode_intercepted_rows(count, rows, query.row_decoder)
+            }
+          }
+      }
+  }
+}
+
+// Run the query against the database; this is the former body of `execute`.
+fn execute_query(
+  pool: Connection,
+  query: Query(t),
+  parameters: List(Value),
+) -> Result(Returned(t), QueryError) {
   use #(count, rows) <- result.try(run_query(
     pool,
     query.sql,
@@ -597,6 +779,25 @@ pub fn execute(
   Ok(Returned(count, rows))
 }
 
+// Decode rows supplied by an interceptor (or captured from a real query)
+// with the query's row decoder, mapping decode failures to
+// `UnexpectedResultType`.
+fn decode_intercepted_rows(
+  count: Int,
+  rows: List(Dynamic),
+  decoder: Decoder(t),
+) -> Result(Returned(t), QueryError) {
+  use decoded <- result.try(
+    list.try_map(over: rows, with: decode.run(_, decoder))
+    |> result.map_error(UnexpectedResultType),
+  )
+  Ok(Returned(count, decoded))
+}
+
+// Look up the interceptor registered for this pool or checked-out connection.
+@external(erlang, "pog_ffi", "get_pool_interceptor")
+fn get_pool_interceptor(pool: Connection) -> Option(Interceptor)
+
 /// Get the name for a PostgreSQL error code.
 ///
 /// ```gleam
diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl
index 9c486a2..2f6bcfa 100644
--- a/src/pog_ffi.erl
+++ b/src/pog_ffi.erl
@@ -1,10 +1,24 @@
 -module(pog_ffi).
 
--export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1]).
+-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1, get_pool_interceptor/1, get_pool_interceptor_safe/1, set_pool_interceptor/2, cleanup_checkout_interceptor/1, convert_error/1]).
 
 -include_lib("pog/include/pog_Config.hrl").
 -include_lib("pg_types/include/pg_types.hrl").
 
+-define(INTERCEPTOR_TABLE, pog_interceptor_table).
+
+%% Create the shared, named interceptor table on first use.
+ensure_interceptor_table() ->
+    case ets:info(?INTERCEPTOR_TABLE) of
+        undefined ->
+            Options = [named_table, public, set, {heir, whereis(init), undefined}],
+            %% A concurrent caller can create the table first; ets:new/2 then
+            %% raises badarg, which only means the table already exists.
+            try ets:new(?INTERCEPTOR_TABLE, Options), ok catch error:badarg -> ok end;
+        _ ->
+            ok
+    end.
+
 null() ->
     null.
 
@@ -54,8 +68,14 @@ start(Config) ->
         idle_interval = IdleInterval,
         trace = Trace,
         ip_version = IpVersion,
-        rows_as_map = RowsAsMap
+        rows_as_map = RowsAsMap,
+        interceptor = Interceptor
     } = Config,
+    % Register the interceptor, or drop one left behind by an earlier pool of the same name
+    case Interceptor of
+        {some, I} -> set_pool_interceptor(PoolName, I);
+        none -> ensure_interceptor_table(), ets:delete(?INTERCEPTOR_TABLE, PoolName)
+    end,
     {SslActivated, SslOptions} = default_ssl_options(Host, Ssl),
     Options1 = #{
         host => Host,
@@ -112,7 +132,14 @@ query_extended(Conn, Sql) ->
 
 checkout(Name) when is_atom(Name) ->
     case pgo:checkout(Name) of
-        {ok, Ref, Conn} -> {ok, {Ref, Conn}};
+        {ok, Ref, Conn} ->
+            % Copy pool's interceptor to connection for transaction support
+            ensure_interceptor_table(),
+            case ets:lookup(?INTERCEPTOR_TABLE, Name) of
+                [{Name, Interceptor}] -> put({pog_conn_interceptor, Conn}, Interceptor);
+                _ -> ok
+            end,
+            {ok, {Ref, Conn}};
         {error, Error} -> {error, convert_error(Error)}
     end.
 
@@ -140,4 +167,66 @@ convert_error(#{
     Got = list_to_binary(io_lib:format("~p", [Value])),
     {unexpected_argument_type, Expected, Got};
 convert_error(closed) ->
-    query_timeout.
+    query_timeout;
+%% Catch-all: pgo can emit shapes outside the documented set
+%% (e.g. {pgo_error, _}, {unexpected_message, _}, client_disconnected,
+%% client_timeout, ssl_refused, {unimplemented, _}, etc.). Without this
+%% clause those shapes raise function_clause and crash the calling
+%% process — which previously took down audit_service and other
+%% long-lived service actors. We map them all to connection_unavailable
+%% so callers see a known, recoverable variant.
+%%
+%% Emits a structured `logger:warning` so consumers with a structured
+%% formatter (e.g. JSON) get queryable fields, while consumers with a
+%% string formatter still get something readable via `~p`. We
+%% intentionally do NOT set `domain` metadata: many default handlers
+%% install a `no_domain` filter that drops events whose domain is set
+%% to anything other than the handler's expected list, which would
+%% silence this warning instead of routing it. The error term is
+%% captured as a binary so it can't tunnel through a JSON formatter as
+%% a raw Erlang term.
+convert_error(Other) ->
+    logger:warning(#{
+        component => pog_convert_error,
+        event => unhandled_driver_error,
+        error_term => list_to_binary(io_lib:format("~p", [Other]))
+    }),
+    connection_unavailable.
+
+%% Interceptor support
+%% Store interceptor in ETS keyed by pool name
+set_pool_interceptor(PoolName, Interceptor) when is_atom(PoolName) ->
+    ensure_interceptor_table(),
+    ets:insert(?INTERCEPTOR_TABLE, {PoolName, Interceptor}),
+    nil.
+
+%% Get interceptor for a connection
+get_pool_interceptor(Connection) ->
+    case Connection of
+        {pool, Name} ->
+            ensure_interceptor_table(),
+            case ets:lookup(?INTERCEPTOR_TABLE, Name) of
+                [{Name, Interceptor}] -> {some, Interceptor};
+                _ -> none
+            end;
+        {single_connection, Conn} ->
+            % For single connections (from transactions), look up by connection handle
+            case get({pog_conn_interceptor, Conn}) of
+                undefined -> none;
+                Interceptor -> {some, Interceptor}
+            end
+    end.
+
+%% Total variant of get_pool_interceptor/1: any other connection shape yields none
+get_pool_interceptor_safe(Connection) ->
+    case Connection of
+        {pool, _} -> get_pool_interceptor(Connection);
+        {single_connection, _} -> get_pool_interceptor(Connection);
+        {disconnected, _} -> none;
+        _ -> none
+    end.
+
+%% Cleanup interceptor reference when connection is checked back in
+cleanup_checkout_interceptor(Conn) ->
+    erase({pog_conn_interceptor, Conn}),
+    nil.
diff --git a/test/pog_ffi_test.erl b/test/pog_ffi_test.erl
new file mode 100644
index 0000000..62d32e6
--- /dev/null
+++ b/test/pog_ffi_test.erl
@@ -0,0 +1,117 @@
+%%% Regression tests for `pog_ffi:convert_error/1`.
+%%%
+%%% Background: a previous version had no catch-all clause.
+%%% Any pgo error term outside the documented set raised `function_clause`,
+%%% which propagated up through `pog:query` and crashed the calling Gleam
+%%% actor. In the proxy this manifested as long-lived service actors
+%%% (audit_service, token_usage_writer, pricing_cache) all dying when
+%%% the driver hit an undocumented error path.
+%%%
+%%% These tests pin two invariants:
+%%%   1. Documented shapes still map to their specific QueryError variants
+%%%      (regression guard: don't accidentally re-route them through the
+%%%      catch-all).
+%%%   2. Previously-fatal shapes now map to `connection_unavailable`
+%%%      and do NOT raise.
+%%%
+%%% The last test (actor_survives_unmatched_shape_test) is the high-signal
+%%% one: it proves the catch-all does what it's supposed to do at the layer
+%%% that matters (process survival), not just at the FFI return-value layer.
+
+-module(pog_ffi_test).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%% --------------------------------------------------------------------
+%% Documented shapes still map correctly.
+%% --------------------------------------------------------------------
+
+known_none_available_test() ->
+    ?assertEqual(connection_unavailable,
+                 pog_ffi:convert_error(none_available)).
+
+known_closed_test() ->
+    ?assertEqual(query_timeout, pog_ffi:convert_error(closed)).
+
+known_unexpected_argument_count_test() ->
+    ?assertEqual({unexpected_argument_count, 2, 3},
+                 pog_ffi:convert_error({pgo_protocol, {parameters, 2, 3}})).
+
+known_pgsql_error_with_constraint_test() ->
+    ErrorTerm = {pgsql_error, #{
+        message => <<"duplicate key">>,
+        constraint => <<"users_email_key">>,
+        detail => <<"Key (email)=(x) already exists.">>
+    }},
+    ?assertEqual({constraint_violated,
+                  <<"duplicate key">>,
+                  <<"users_email_key">>,
+                  <<"Key (email)=(x) already exists.">>},
+                 pog_ffi:convert_error(ErrorTerm)).
+
+%% --------------------------------------------------------------------
+%% Catch-all: previously-fatal shapes now return connection_unavailable.
+%%
+%% NOTE: these tests are non-exhaustive by definition — pgo can introduce
+%% new shapes any time. The point isn't to enumerate every possible
+%% term; it's to assert that *some specific shapes that used to crash*
+%% no longer do, and that the catch-all is wired up.
+%% --------------------------------------------------------------------
+
+catchall_pgo_error_test() ->
+    ?assertEqual(connection_unavailable,
+                 pog_ffi:convert_error({pgo_error, some_inner})).
+
+catchall_client_disconnected_test() ->
+    ?assertEqual(connection_unavailable,
+                 pog_ffi:convert_error(client_disconnected)).
+
+catchall_unexpected_message_test() ->
+    ?assertEqual(connection_unavailable,
+                 pog_ffi:convert_error({unexpected_message, foo})).
+
+catchall_ssl_refused_test() ->
+    ?assertEqual(connection_unavailable,
+                 pog_ffi:convert_error(ssl_refused)).
+
+catchall_unimplemented_test() ->
+    ?assertEqual(connection_unavailable,
+                 pog_ffi:convert_error({unimplemented, sasl_server_final})).
+
+catchall_arbitrary_atom_test() ->
+    ?assertEqual(connection_unavailable,
+                 pog_ffi:convert_error(some_atom_we_have_never_seen)).
+
+catchall_arbitrary_tuple_test() ->
+    ?assertEqual(connection_unavailable,
+                 pog_ffi:convert_error({a, b, c, d, e})).
+
+%% --------------------------------------------------------------------
+%% The high-signal test: a process calling convert_error/1 with a
+%% previously-fatal shape exits NORMALLY rather than crashing with
+%% function_clause. Without the catch-all this monitor would receive
+%% {'DOWN', _, process, _, {function_clause, _}}.
+%% --------------------------------------------------------------------
+
+actor_survives_unmatched_shape_test() ->
+    Self = self(),
+    {Pid, MonRef} = spawn_monitor(fun() ->
+        connection_unavailable = pog_ffi:convert_error({pgo_error, x}),
+        connection_unavailable = pog_ffi:convert_error(client_disconnected),
+        connection_unavailable = pog_ffi:convert_error(ssl_refused),
+        Self ! {self(), all_calls_returned}
+    end),
+    receive
+        {Pid, all_calls_returned} -> ok
+    after
+        2000 ->
+            ?assert(false)  %% process never reported back
+    end,
+    receive
+        {'DOWN', MonRef, process, Pid, normal} -> ok;
+        {'DOWN', MonRef, process, Pid, OtherReason} ->
+            ?assertEqual(normal, OtherReason)
+    after
+        2000 ->
+            ?assert(false)  %% no DOWN message — monitor missed exit
+    end.
diff --git a/test/pog_test.gleam b/test/pog_test.gleam
index 3a6f7ad..ed6f350 100644
--- a/test/pog_test.gleam
+++ b/test/pog_test.gleam
@@ -1,4 +1,5 @@
 import exception
+import gleam/dynamic
 import gleam/dynamic/decode.{type Decoder}
 import gleam/erlang/process
 import gleam/option.{None, Some}
@@ -615,3 +616,96 @@ pub fn transaction_commit_test() {
 
   disconnect(db)
 }
+
+// Interceptor tests: every query issued below is answered by the interceptor itself
+
+pub fn interceptor_logging_test() {
+  let interceptor =
+    pog.Interceptor(fn(request) {
+      let _ = request.sql
+      pog.Continue
+    })
+
+  let name = process.new_name("pog_test")
+  let config = pog.Config(..default_config(name), interceptor: Some(interceptor))
+
+  assert config.interceptor == Some(interceptor)
+}
+
+pub fn interceptor_respond_with_data_test() {
+  let test_data = dynamic.array([dynamic.int(42), dynamic.string("test")])
+  let interceptor = pog.Interceptor(fn(_request) { pog.Respond(1, [test_data]) })
+
+  let name = process.new_name("pog_test")
+  let config = pog.Config(..default_config(name), interceptor: Some(interceptor))
+  let assert Ok(started) = pog.start(config)
+
+  let decoder = {
+    use id <- decode.field(0, decode.int)
+    use name <- decode.field(1, decode.string)
+    decode.success(#(id, name))
+  }
+
+  let result =
+    pog.query("SELECT id, name FROM users")
+    |> pog.returning(decoder)
+    |> pog.execute(started.data)
+
+  assert result == Ok(pog.Returned(1, [#(42, "test")]))
+
+  disconnect(started)
+}
+
+pub fn interceptor_fail_with_error_test() {
+  let interceptor = pog.Interceptor(fn(_request) { pog.Fail(pog.ConnectionUnavailable) })
+  let name = process.new_name("pog_test")
+  let config = pog.Config(..default_config(name), interceptor: Some(interceptor))
+  let assert Ok(started) = pog.start(config)
+
+  let result =
+    pog.query("SELECT * FROM users")
+    |> pog.returning(decode.at([0], decode.int))
+    |> pog.execute(started.data)
+
+  assert result == Error(pog.ConnectionUnavailable)
+
+  disconnect(started)
+}
+
+pub fn no_interceptor_is_default_test() {
+  let name = process.new_name("pog_test")
+  let config = default_config(name)
+
+  assert config.interceptor == None
+}
+
+pub fn interceptor_can_inspect_sql_test() {
+  let interceptor =
+    pog.Interceptor(fn(request) {
+      case request.sql {
+        "SELECT 1" -> pog.Respond(1, [dynamic.array([dynamic.int(1)])])
+        "SELECT 2" -> pog.Respond(1, [dynamic.array([dynamic.int(2)])])
+        _ -> pog.Continue
+      }
+    })
+
+  let name = process.new_name("pog_test")
+  let config = pog.Config(..default_config(name), interceptor: Some(interceptor))
+  let assert Ok(started) = pog.start(config)
+
+  let result1 =
+    pog.query("SELECT 1")
+    |> pog.returning(decode.at([0], decode.int))
+    |> pog.execute(started.data)
+
+  assert result1 == Ok(pog.Returned(1, [1]))
+
+  let result2 =
+    pog.query("SELECT 2")
+    |> pog.returning(decode.at([0], decode.int))
+    |> pog.execute(started.data)
+
+  assert result2 == Ok(pog.Returned(1, [2]))
+
+  disconnect(started)
+}