Skip to content
Open
57 changes: 57 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,63 @@ pub fn run(db: pog.Connection) {
}
```

## Query Interceptors

Interceptors allow you to observe, log, or modify query execution. They're called before each query runs and can:

- **Log queries** for debugging and auditing
- **Collect metrics** for performance monitoring
- **Return test data** for testing without a database

### Example - Query Logging

```gleam
import gleam/io
import pog

let logger = pog.Interceptor(fn(request) {
io.println("SQL: " <> request.sql)
pog.Continue // Execute normally
})

pog.default_config(pool_name)
|> pog.interceptor(Some(logger))
|> pog.start
```

### Example - Testing Without a Database

```gleam
import gleam/dynamic

let test_data = dynamic.array([
dynamic.int(1),
dynamic.string("Alice"),
dynamic.string("alice@example.com"),
])

let test_interceptor = pog.Interceptor(fn(request) {
case request.sql {
"SELECT * FROM users WHERE id = $1" ->
pog.Respond(count: 1, rows: [test_data])
_ ->
pog.Continue
}
})

pog.default_config(pool_name)
|> pog.interceptor(Some(test_interceptor))
|> pog.start
```

### Interceptor Results

Your interceptor function returns an `InterceptResult`:

- `Continue` - Execute the query normally against the database
- `Respond(count: Int, rows: List(Dynamic))` - Return this data without querying
- `Fail(error: QueryError)` - Return this error without querying

## Support of connection URI

Configuring a Postgres connection is done by using `Config` type in `pog`.
Expand Down
220 changes: 219 additions & 1 deletion src/pog.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const default_port: Int = 5432
pub opaque type Connection {
Pool(Name(Message))
SingleConnection(SingleConnection)
Disconnected(Interceptor)
}

type SingleConnection
Expand All @@ -42,6 +43,29 @@ pub fn named_connection(name: Name(Message)) -> Connection {
Pool(name)
}

/// Create a connection that uses an interceptor without a database.
///
/// No database connection is established. All queries are routed to the
/// interceptor. The interceptor must handle all queries - returning `Continue`
/// will result in a `ConnectionUnavailable` error.
///
/// ## Example
///
/// ```gleam
/// let interceptor = Interceptor(fn(request) {
/// case request.sql {
/// "SELECT * FROM users" -> Respond(1, [user_data])
/// _ -> Fail(PostgresqlError("UNSUPPORTED", "unsupported", "Query not supported"))
/// }
/// })
///
/// let connection = pog.disconnected(interceptor)
/// ```
///
pub fn disconnected(interceptor: Interceptor) -> Connection {
Disconnected(interceptor)
}

/// The configuration for a pool of connections.
pub type Config {
Config(
Expand Down Expand Up @@ -85,6 +109,8 @@ pub type Config {
/// (default: False) By default, pgo will return a n-tuple, in the order of the query.
/// By setting `rows_as_map` to `True`, the result will be `Dict`.
rows_as_map: Bool,
/// (default: None): Optional query interceptor for logging, observability, or testing.
interceptor: Option(Interceptor),
)
}

Expand Down Expand Up @@ -210,6 +236,44 @@ pub fn rows_as_map(config: Config, rows_as_map: Bool) -> Config {
Config(..config, rows_as_map:)
}

/// Set a query interceptor for logging, observability, or testing.
///
/// The interceptor is called before each query execution. Common use cases:
///
/// **Logging**: Log all SQL queries for debugging and auditing
/// ```gleam
/// import gleam/io
///
/// let logger = Interceptor(fn(req) {
/// io.println("SQL: " <> req.sql)
/// Continue
/// })
///
/// config |> pog.interceptor(Some(logger))
/// ```
///
/// **Metrics**: Track query performance
/// ```gleam
/// let metrics = Interceptor(fn(req) {
/// metrics.record_query(req.sql, req.timeout)
/// Continue
/// })
/// ```
///
/// **Testing**: Return test data without a database
/// ```gleam
/// let test_interceptor = Interceptor(fn(req) {
/// case req.sql {
/// "SELECT * FROM users" -> Respond(1, [test_user_data])
/// _ -> Continue
/// }
/// })
/// ```
///
pub fn interceptor(config: Config, interceptor: Option(Interceptor)) -> Config {
Config(..config, interceptor:)
}

/// The internet protocol version to use.
pub type IpVersion {
/// Internet Protocol version 4 (IPv4)
Expand Down Expand Up @@ -238,6 +302,7 @@ pub fn default_config(pool_name pool_name: Name(Message)) -> Config {
trace: False,
ip_version: Ipv4,
rows_as_map: False,
interceptor: None,
)
}

Expand Down Expand Up @@ -428,6 +493,14 @@ pub fn transaction(
callback: fn(Connection) -> Result(t, error),
) -> Result(t, TransactionError(error)) {
case pool {
Disconnected(interceptor) -> {
// For disconnected connections (playback mode), call the callback
// with the disconnected connection so interceptor can serve recorded queries
case callback(Disconnected(interceptor)) {
Ok(value) -> Ok(value)
Error(err) -> Error(TransactionRolledBack(err))
}
}
SingleConnection(conn) -> {
transaction_layer(conn, callback)
}
Expand Down Expand Up @@ -483,8 +556,16 @@ fn checkout(
pool: Name(Message),
) -> Result(#(Reference, SingleConnection), QueryError)

@external(erlang, "pog_ffi", "cleanup_checkout_interceptor")
fn cleanup_checkout_interceptor(conn: SingleConnection) -> Nil

@external(erlang, "pgo", "checkin")
fn checkin(ref: Reference, conn: SingleConnection) -> Dynamic
fn do_checkin(ref: Reference, conn: SingleConnection) -> Dynamic

fn checkin(ref: Reference, conn: SingleConnection) -> Dynamic {
cleanup_checkout_interceptor(conn)
do_checkin(ref, conn)
}

pub fn nullable(inner_type: fn(a) -> Value, value: Option(a)) -> Value {
case value {
Expand All @@ -498,6 +579,59 @@ pub type Returned(t) {
Returned(count: Int, rows: List(t))
}

/// Query interceptor for logging, observability, and testing.
///
/// Interceptors are called before query execution and can:
/// - Log queries for debugging and auditing
/// - Collect metrics and performance data
/// - Return mock data for testing (without a database)
///
/// The interceptor receives information about the query and can either
/// continue with normal execution or provide an alternative result.
///
/// ## Example - Query logging
///
/// ```gleam
/// import gleam/io
///
/// let logger = Interceptor(fn(request) {
/// io.println("Executing: " <> request.sql)
/// Continue
/// })
///
/// pog.default_config(name)
/// |> pog.interceptor(Some(logger))
/// |> pog.start
/// ```
///
pub type Interceptor {
Interceptor(intercept: fn(InterceptRequest) -> InterceptResult)
}

/// Information about a query being executed.
///
/// This is passed to the interceptor function before the query runs.
pub type InterceptRequest {
InterceptRequest(sql: String, parameters: List(Value), timeout: Int)
}

/// The result returned by an interceptor.
///
/// - `Continue`: Execute the query normally against the database
/// - `Respond`: Return this data instead of querying the database
/// - `Fail`: Return this error instead of querying the database
/// - `Capture`: Execute the query and capture the raw result for recording
pub type InterceptResult {
/// Execute the query normally
Continue
/// Return data without querying the database
Respond(count: Int, rows: List(Dynamic))
/// Return an error without querying the database
Fail(error: QueryError)
/// Execute query and capture result for recording
Capture(on_result: fn(Result(#(Int, List(Dynamic)), QueryError)) -> Nil)
}

@external(erlang, "pog_ffi", "query")
fn run_query(
a: Connection,
Expand Down Expand Up @@ -584,6 +718,75 @@ pub fn execute(
on pool: Connection,
) -> Result(Returned(t), QueryError) {
let parameters = list.reverse(query.parameters)

case pool {
Disconnected(interceptor) -> {
// Disconnected connection - route through interceptor only
let request =
InterceptRequest(
sql: query.sql,
parameters: parameters,
timeout: query.timeout,
)

case interceptor.intercept(request) {
Continue -> Error(ConnectionUnavailable)
Respond(count:, rows:) ->
decode_intercepted_rows(count, rows, query.row_decoder)
Fail(error:) -> Error(error)
Capture(_) -> Error(ConnectionUnavailable)
}
}

Pool(_) | SingleConnection(_) -> {
// Connected pool or connection - check for interceptor
case get_pool_interceptor(pool) {
Some(interceptor) -> {
let request =
InterceptRequest(
sql: query.sql,
parameters: parameters,
timeout: query.timeout,
)

case interceptor.intercept(request) {
Continue -> execute_query(pool, query, parameters)
Respond(count:, rows:) ->
decode_intercepted_rows(count, rows, query.row_decoder)
Fail(error:) -> Error(error)
Capture(on_result:) -> {
// Execute real query
let result = run_query(pool, query.sql, parameters, query.timeout)
// Let interceptor capture the raw result
on_result(result)
// Decode and return normally
case result {
Ok(#(count, rows)) -> {
use decoded <- result.try(
list.try_map(over: rows, with: decode.run(
_,
query.row_decoder,
))
|> result.map_error(UnexpectedResultType),
)
Ok(Returned(count, decoded))
}
Error(err) -> Error(err)
}
}
}
}
None -> execute_query(pool, query, parameters)
}
}
}
}

fn execute_query(
pool: Connection,
query: Query(t),
parameters: List(Value),
) -> Result(Returned(t), QueryError) {
use #(count, rows) <- result.try(run_query(
pool,
query.sql,
Expand All @@ -597,6 +800,21 @@ pub fn execute(
Ok(Returned(count, rows))
}

fn decode_intercepted_rows(
count: Int,
rows: List(Dynamic),
decoder: Decoder(t),
) -> Result(Returned(t), QueryError) {
use decoded <- result.try(
list.try_map(over: rows, with: decode.run(_, decoder))
|> result.map_error(UnexpectedResultType),
)
Ok(Returned(count, decoded))
}

@external(erlang, "pog_ffi", "get_pool_interceptor")
fn get_pool_interceptor(pool: Connection) -> Option(Interceptor)

/// Get the name for a PostgreSQL error code.
///
/// ```gleam
Expand Down
Loading