diff --git a/DEPLOY.md b/DEPLOY.md index 9d2b672f..9cb70a82 100644 --- a/DEPLOY.md +++ b/DEPLOY.md @@ -239,6 +239,21 @@ All standard Spring Boot properties can be passed as command-line flags or envir | `DB_USERNAME` | Database user | | `DB_PASSWORD` | Database password | +#### RabbitMQ configuration (CQRS event bus) + +| Environment Variable | Description | +|----------------------|-------------| +| `RABBITMQ_HOST` | RabbitMQ hostname (default: `localhost`) | +| `RABBITMQ_PORT` | RabbitMQ AMQP port (default: `5672`) | +| `RABBITMQ_USERNAME` | RabbitMQ user (default: `guest`) | +| `RABBITMQ_PASSWORD` | RabbitMQ password (default: `guest`) | + +#### Event publisher + +| Environment Variable | Description | +|----------------------|-------------| +| `FACTSTORE_EVENTS_PUBLISHER` | `logging` (default), `rabbitmq` (production CQRS), `inmemory` (tests), `none` | + #### HashiCorp Vault integration | Environment Variable | Description | @@ -260,6 +275,94 @@ All standard Spring Boot properties can be passed as command-line flags or envir | `GITHUB_CLIENT_ID` | GitHub OAuth app client ID | | `GITHUB_CLIENT_SECRET` | GitHub OAuth app client secret | +--- + +## CQRS Deployment (Dual-Service Architecture) + +For production, Factstore runs as two separate services sharing a RabbitMQ event bus: + +### Network Topology + +``` + ┌──────────────┐ + │ Clients │ + └──────┬───────┘ + POST/PUT/DELETE │ GET + ┌──────────────────┴────────────────┐ + ▼ ▼ + ┌───────────────┐ ┌───────────────┐ + │ Command :8080 │──── RabbitMQ ────►│ Query :8081 │ + └───────┬───────┘ └───────┬───────┘ + │ │ + ┌───────▼───────┐ ┌───────▼───────┐ + │ PostgreSQL │ │ PostgreSQL │ + │ (Write DB) │ │ (Read DB) │ + │ :5432 │ │ :5433 │ + └───────────────┘ └───────────────┘ +``` + +### Docker Compose (recommended) + +```bash +docker compose up --build +``` + +This starts: +- **postgres-command** — Write database (port 5432) +- **postgres-query** — Read database (port 5433) +- **rabbitmq** — Event bus (AMQP 5672, Management UI 15672) +- **backend-command** — Command service (port 8080) +- **backend-query** — Query service (port 8081) + +### Event-Driven Synchronization + +1. A command (POST/PUT/DELETE) arrives at the **Command service** (:8080) +2. The command handler persists the entity + appends a domain event to the event store +3. The `EventAppender` publishes the event to the `IDomainEventBus` (RabbitMQ) +4. The **Query service** (:8081) `RabbitMqEventConsumer` receives the event +5. The `ReadModelProjector` applies the event to the read database + +### CLI Configuration + +The CLI supports separate hosts for read and write operations: + +```bash +# Configure with separate command and query hosts +factstore configure +# Or use flags: +factstore --host https://command.example.com --query-host https://query.example.com flows list +# Or environment variables: +export FACTSTORE_HOST=https://command.example.com +export FACTSTORE_QUERY_HOST=https://query.example.com +``` + +When `--query-host` is set, GET requests are routed to the query service and all other requests to the command service. + +### Post-Deployment Verification + +```bash +# 1. Verify command service health +curl -fs http://localhost:8080/actuator/health + +# 2. Verify query service health +curl -fs http://localhost:8081/actuator/health + +# 3. Create a flow via command service and verify it appears on query service +curl -X POST http://localhost:8080/api/v2/flows \ + -H 'Content-Type: application/json' \ + -d '{"name":"verify-cqrs","description":"Post-deployment verification"}' + +# Wait for event propagation (typically < 1 second) +sleep 2 + +# 4. Read the flow from query service +curl -s http://localhost:8081/api/v2/flows | grep verify-cqrs + +# 5. Verify RabbitMQ is healthy +curl -fs http://localhost:15672/api/healthchecks/node \ + -u guest:guest +``` + ### Docker environment variable example ```bash diff --git a/README.md b/README.md index b4fd798a..1a88ff56 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,10 @@ cd OpenFactstore docker compose up --build ``` -- **API** → http://localhost:8080 +- **Command API** → http://localhost:8080 +- **Query API** → http://localhost:8081 - **Swagger UI** → http://localhost:8080/swagger-ui.html +- **RabbitMQ Management** → http://localhost:15672 (guest / guest) - **Grafana** → http://localhost:3000 (admin / changeme) --- @@ -91,54 +93,59 @@ When a software artifact is built, a **trail** captures provenance metadata (Git ## Architecture -Factstore is built on **Hexagonal Architecture** (Ports and Adapters) with a **CQRS + Event Sourcing** split. The core business logic is fully isolated from external systems. Dependencies always point **inward**: adapters depend on ports, ports depend on the domain — never the other way around. +Factstore is built on **Hexagonal Architecture** (Ports and Adapters) with a **fully decoupled CQRS + Event Sourcing** design. The core business logic is fully isolated from external systems. Dependencies always point **inward**: adapters depend on ports, ports depend on the domain — never the other way around. -The **Write** path accepts commands via v2 REST controllers, validates business rules, persists state, and appends immutable domain events to an append-only **Event Log**. The **Read** path serves queries from optimised read models. An **Event Projector** can replay the event log to rebuild read-model state from scratch or catch up incrementally. +The **Command (Write) service** accepts mutations via v2 REST controllers, validates business rules, persists state, and appends immutable domain events to an append-only **Event Log**. Every state-changing event is published to a **Domain Event Bus** (RabbitMQ in production, in-memory in tests) for consumption by the Read side. The **Query (Read) service** consumes events from the bus, projects them into its own database, and serves queries from optimised read models. ``` ┌─────────────────────────────────────────────────────────────────┐ │ Frontend (Vue 3 SPA) │ │ Browser ─► Vite Dev Server :5173 │ -└──────────────────────────────┬──────────────────────────────────┘ - │ HTTP / REST (Axios) -┌──────────────────────────────▼──────────────────────────────────┐ -│ Backend (Spring Boot :8080) │ -│ │ -│ ┌─────────────────────────────────────────────────────────┐ │ -│ │ DRIVING ADAPTERS (Inbound) │ │ -│ │ adapter/inbound/web/command/ (v2 Command Controllers) │ │ -│ │ adapter/inbound/web/query/ (v2 Query Controllers) │ │ -│ │ adapter/inbound/web/ (v1 REST Controllers) │ │ -│ └────────────────┬───────────────────┬────────────────────┘ │ -│ Commands │ │ Queries │ -│ ┌────────────────▼───────────┐ ┌────▼────────────────────┐ │ -│ │ COMMAND HANDLERS (Write) │ │ QUERY HANDLERS (Read) │ │ -│ │ application/command/ │ │ application/query/ │ │ -│ │ (FlowCommandHandler, …) │ │ (FlowQueryHandler, …) │ │ -│ └──────┬──────────┬──────────┘ └──────────┬──────────────┘ │ -│ │ save │ append event │ read │ -│ ┌──────▼──────┐ ┌─▼────────────────┐ ┌─────▼──────────────┐ │ -│ │ JPA Entity │ │ Event Store │ │ Read Repositories │ │ -│ │ Repositories│ │ (IEventStore) │ │ (Read ports) │ │ -│ └──────┬──────┘ └─┬────────────────┘ └─────┬──────────────┘ │ -│ │ │ │ │ -│ ┌──────▼──────────▼─────────────────────────▼──────────────┐ │ -│ │ DRIVEN ADAPTERS (Outbound) │ │ -│ │ adapter/outbound/persistence/ (JPA + EventStoreAdapter) │ │ -│ └──────────┬──────────────────────────────────────────────┘ │ -│ │ │ -│ ┌──────────▼──────────────────────────────────────────────┐ │ -│ │ EVENT PROJECTOR (application/EventProjector) │ │ -│ │ Replays event log → rebuilds read-model state │ │ -│ └─────────────────────────────────────────────────────────┘ │ -└─────────────┬────────────────────────────────────────────────────┘ - │ JDBC -┌─────────────▼────────────────────────────────────────────────────┐ -│ PostgreSQL Database │ -│ Entity tables (flows, trails, …) + domain_events (event log) │ -└──────────────────────────────────────────────────────────────────┘ +└──────────────────────┬──────────────────────┬───────────────────┘ + POST/PUT/DELETE │ │ GET (reads) +┌──────────────────────▼───────────┐ ┌───────▼──────────────────────┐ +│ COMMAND SERVICE (:8080) │ │ QUERY SERVICE (:8081) │ +│ spring.profiles.active=prod │ │ spring.profiles.active=prod │ +│ FACTSTORE_CQRS_ROLE=command │ │ FACTSTORE_CQRS_ROLE=query │ +│ │ │ (read-only — rejects POST/ │ +│ ┌─── DRIVING ADAPTERS ───────┐ │ │ PUT/PATCH/DELETE via filter)│ +│ │ v2 Command Controllers │ │ │ ┌─── DRIVING ADAPTERS ───┐ │ +│ │ v1 REST Controllers │ │ │ │ v2 Query Controllers │ │ +│ └───────────┬────────────────┘ │ │ │ v1 REST Controllers │ │ +│ └───────────┬────────────────┘ │ │ │ RabbitMQ Consumer │ │ +│ │ Commands │ │ └───────┬────────────────┘ │ +│ ┌───────────▼───────────────┐ │ │ │ Queries │ +│ │ COMMAND HANDLERS (Write) │ │ │ ┌───────▼──────────────┐ │ +│ │ FlowCommandHandler, … │ │ │ │ QUERY HANDLERS │ │ +│ │ EventAppender │ │ │ │ FlowQueryHandler, … │ │ +│ └──────┬──────────┬─────────┘ │ │ │ ReadModelProjector │ │ +│ save │ append │ publish │ │ └──────────┬───────────┘ │ +│ ┌──────▼──┐ ┌─────▼────────┐ │ │ read │ │ +│ │ JPA │ │ Event Store │ │ │ ┌──────────▼───────────┐ │ +│ │ Repos │ │ (IEventStore)│ │ │ │ Read Repositories │ │ +│ └────┬────┘ └──────────────┘ │ │ └──────────┬───────────┘ │ +│ │ │ │ │ │ +│ ┌────▼─────────────────────┐ │ │ ┌──────────▼───────────┐ │ +│ │ JPA / EventStoreAdapter │ │ │ │ JPA Persistence │ │ +│ └────┬─────────────────────┘ │ │ └──────────┬───────────┘ │ +└───────┼──────────────────────────┘ └─────────────┼───────────────┘ + │ JDBC ▲ AMQP │ JDBC +┌───────▼──────┐ ┌───────────┴───────┐ ┌───────▼──────┐ +│ PostgreSQL │ │ RabbitMQ │ │ PostgreSQL │ +│ (Write DB) │ │ (Event Bus) │ │ (Read DB) │ +│ :5432 │ │ :5672 / :15672 │ │ :5433 │ +└──────────────┘ └───────────────────┘ └──────────────┘ ``` +### Deployment Profiles + +| Profile | Database | Event Bus | Use Case | +|---------|----------|-----------|----------| +| `prod` | Dual PostgreSQL | RabbitMQ | Production / staging | +| `test` | Dual H2 (in-memory) | In-memory (Spring events) | Integration tests | +| `local` | Single PostgreSQL | Logging (no-op) | Local development | +| *(default)* | Single PostgreSQL | Logging | Backward-compatible single-instance | + ### Why Hexagonal Architecture + Event Sourcing? - **Swap storage backends without touching logic.** Replace the H2 JPA adapter with a PostgreSQL or Vector DB adapter by writing a new `IFlowRepository` implementation — zero changes to `FlowService`. @@ -160,18 +167,21 @@ com.factstore/ │ │ └── query/ ← Query handler interfaces (IFlowQueryHandler, …) │ └── outbound/ │ ├── read/ ← Read-model repository interfaces -│ └── … ← Write-model repository + IEventStore port +│ └── … ← Write-model repository + IEventStore + IDomainEventBus ports ├── application/ │ ├── command/ ← Command handlers + EventAppender │ └── query/ ← Query handlers -│ └── EventProjector ← Replays event log to rebuild read models +│ ├── EventProjector ← Replays event log to rebuild read models +│ └── ReadModelProjector ← Applies domain events to read DB entities ├── adapter/ │ ├── inbound/ -│ │ └── web/ -│ │ ├── command/ ← v2 Command REST controllers -│ │ └── query/ ← v2 Query REST controllers +│ │ ├── web/ +│ │ │ ├── command/ ← v2 Command REST controllers +│ │ │ └── query/ ← v2 Query REST controllers +│ │ └── messaging/ ← RabbitMqEventConsumer + InMemoryEventListener │ └── outbound/ -│ └── persistence/ ← JPA adapters: entity repos + EventStoreAdapter +│ ├── persistence/ ← JPA adapters: entity repos + EventStoreAdapter +│ └── events/ ← Event bus adapters: RabbitMQ, InMemory, Noop ├── dto/ │ └── command/ ← Command DTOs and request objects ├── exception/ ← Domain exceptions and global error handler @@ -187,11 +197,14 @@ com.factstore/ | Command Ports | `core/port/inbound/command/` | Command handler interfaces (`IFlowCommandHandler`, …) | | Query Ports | `core/port/inbound/query/` | Query handler interfaces (`IFlowQueryHandler`, …) | | Outbound Ports | `core/port/outbound/` | Repository interfaces + `IEventStore` (append-only event log) | -| Command Handlers | `application/command/` | Write-side use cases + `EventAppender` (dual-write: JPA entity + event log) | +| Command Handlers | `application/command/` | Write-side use cases + `EventAppender` (dual-write: JPA entity + event log + domain event bus) | | Query Handlers | `application/query/` | Read-side use cases (query read-model repositories) | | Event Projector | `application/` | `EventProjector` — replays event log to rebuild read-model state | +| Read Model Projector | `application/` | `ReadModelProjector` — applies domain events to read DB entities | | Web Adapters | `adapter/inbound/web/` | REST controllers (v1 compat + v2 command/query split) | +| Messaging Adapters | `adapter/inbound/messaging/` | `RabbitMqEventConsumer` + `InMemoryEventListener` | | Persistence Adapters | `adapter/outbound/persistence/` | JPA implementations of outbound ports + `EventStoreAdapter` | +| Event Bus Adapters | `adapter/outbound/events/` | `RabbitMqDomainEventPublisher`, `InMemoryDomainEventPublisher`, `NoopDomainEventBus` | | DTO | `dto/` | Request/response objects and command DTOs | | Exception | `exception/` | Custom exceptions and global error handler | | Config | `config/` | CORS policy and OpenAPI/Swagger setup | diff --git a/backend/build.gradle.kts b/backend/build.gradle.kts index 5818b68f..d933d7cc 100644 --- a/backend/build.gradle.kts +++ b/backend/build.gradle.kts @@ -45,7 +45,9 @@ dependencies { // Vault: Spring Vault core (used when vault.enabled=true) implementation("org.springframework.vault:spring-vault-core:3.1.2") implementation("org.springframework.boot:spring-boot-starter-graphql") + implementation("org.springframework.boot:spring-boot-starter-amqp") testRuntimeOnly("com.h2database:h2") + testImplementation("org.springframework.amqp:spring-rabbit-test") testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("org.springframework.security:spring-security-test") testImplementation("org.mockito.kotlin:mockito-kotlin:5.4.0") diff --git a/backend/src/main/kotlin/com/factstore/adapter/inbound/messaging/InMemoryEventListener.kt b/backend/src/main/kotlin/com/factstore/adapter/inbound/messaging/InMemoryEventListener.kt new file mode 100644 index 00000000..20b7e46d --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/adapter/inbound/messaging/InMemoryEventListener.kt @@ -0,0 +1,32 @@ +package com.factstore.adapter.inbound.messaging + +import com.factstore.application.ReadModelProjector +import com.factstore.core.domain.EventLogEntry +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Component + +/** + * Listens for [EventLogEntry] instances published by the + * [com.factstore.adapter.outbound.events.InMemoryDomainEventPublisher] and + * feeds them to the [ReadModelProjector]. + * + * Active when `factstore.events.publisher=inmemory`. This allows + * integration tests to exercise the full CQRS event-driven projection + * pipeline without an external message broker. + */ +@Component +@ConditionalOnProperty(name = ["factstore.events.publisher"], havingValue = "inmemory") +class InMemoryEventListener( + private val projector: ReadModelProjector +) { + + private val log = LoggerFactory.getLogger(InMemoryEventListener::class.java) + + @EventListener + fun onDomainEvent(entry: EventLogEntry) { + log.debug("In-memory event received: type={} seq={}", entry.eventType, entry.sequenceNumber) + projector.project(entry.eventType, entry.payload) + } +} diff --git a/backend/src/main/kotlin/com/factstore/adapter/inbound/messaging/RabbitMqEventConsumer.kt b/backend/src/main/kotlin/com/factstore/adapter/inbound/messaging/RabbitMqEventConsumer.kt new file mode 100644 index 00000000..b1f5f663 --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/adapter/inbound/messaging/RabbitMqEventConsumer.kt @@ -0,0 +1,43 @@ +package com.factstore.adapter.inbound.messaging + +import com.factstore.application.ReadModelProjector +import com.factstore.config.RabbitMqConfig +import com.factstore.core.domain.EventLogEntry +import org.slf4j.LoggerFactory +import org.springframework.amqp.rabbit.annotation.RabbitListener +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Component + +/** + * Consumes domain events from the RabbitMQ projection queue and delegates + * them to the [ReadModelProjector] to update the read database. + * + * Gated on `factstore.cqrs.role=query` so that only the query service + * instance starts consuming from the projection queue. Without this + * guard both the command and query containers (which share the same JAR + * and `prod` profile) would compete for messages, causing projections to + * execute against the wrong database. + */ +@Component +@ConditionalOnProperty(name = ["factstore.cqrs.role"], havingValue = "query") +class RabbitMqEventConsumer( + private val projector: ReadModelProjector +) { + + private val log = LoggerFactory.getLogger(RabbitMqEventConsumer::class.java) + + @RabbitListener(queues = [RabbitMqConfig.PROJECTION_QUEUE_NAME]) + fun onEvent(entry: EventLogEntry) { + try { + log.info("Received domain event from RabbitMQ: type={} seq={}", entry.eventType, entry.sequenceNumber) + val projected = projector.project(entry.eventType, entry.payload) + if (!projected) { + log.error("Failed to project event type={} seq={} — message acknowledged but data may be stale. " + + "Replay from the event store to recover.", entry.eventType, entry.sequenceNumber) + } + } catch (e: Exception) { + log.error("Unexpected error processing domain event: {}", e.message, e) + throw e + } + } +} diff --git a/backend/src/main/kotlin/com/factstore/adapter/outbound/events/InMemoryDomainEventPublisher.kt b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/InMemoryDomainEventPublisher.kt new file mode 100644 index 00000000..efda8ef0 --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/InMemoryDomainEventPublisher.kt @@ -0,0 +1,31 @@ +package com.factstore.adapter.outbound.events + +import com.factstore.core.domain.EventLogEntry +import com.factstore.core.port.outbound.IDomainEventBus +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.context.ApplicationEventPublisher +import org.springframework.stereotype.Component + +/** + * In-memory domain event bus backed by Spring's [ApplicationEventPublisher]. + * + * Active when `factstore.events.publisher=inmemory`. The query-side + * [com.factstore.application.ReadModelProjector] listens for these events + * and applies projections within the same JVM — perfect for integration + * tests that do not require an external message broker. + */ +@Component +@ConditionalOnProperty(name = ["factstore.events.publisher"], havingValue = "inmemory") +class InMemoryDomainEventPublisher( + private val applicationEventPublisher: ApplicationEventPublisher +) : IDomainEventBus { + + private val log = LoggerFactory.getLogger(InMemoryDomainEventPublisher::class.java) + + override fun publish(entry: EventLogEntry) { + log.debug("Publishing domain event seq={} type={} via in-memory bus", + entry.sequenceNumber, entry.eventType) + applicationEventPublisher.publishEvent(entry) + } +} diff --git a/backend/src/main/kotlin/com/factstore/adapter/outbound/events/InMemoryEventBus.kt b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/InMemoryEventBus.kt new file mode 100644 index 00000000..592ad184 --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/InMemoryEventBus.kt @@ -0,0 +1,29 @@ +package com.factstore.adapter.outbound.events + +import com.factstore.core.port.outbound.IEventPublisher +import com.factstore.core.port.outbound.SupplyChainEvent +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.context.ApplicationEventPublisher +import org.springframework.stereotype.Component + +/** + * In-memory event bus backed by Spring's [ApplicationEventPublisher]. + * + * Active when `factstore.events.publisher=inmemory`. Designed for + * integration tests so they can verify event-driven projections without + * requiring an external message broker. + */ +@Component +@ConditionalOnProperty(name = ["factstore.events.publisher"], havingValue = "inmemory") +class InMemoryEventBus( + private val applicationEventPublisher: ApplicationEventPublisher +) : IEventPublisher { + + private val log = LoggerFactory.getLogger(InMemoryEventBus::class.java) + + override fun publish(event: SupplyChainEvent) { + log.debug("Publishing event {} via in-memory bus", event.id) + applicationEventPublisher.publishEvent(event) + } +} diff --git a/backend/src/main/kotlin/com/factstore/adapter/outbound/events/NoopDomainEventBus.kt b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/NoopDomainEventBus.kt new file mode 100644 index 00000000..595badcc --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/NoopDomainEventBus.kt @@ -0,0 +1,22 @@ +package com.factstore.adapter.outbound.events + +import com.factstore.core.domain.EventLogEntry +import com.factstore.core.port.outbound.IDomainEventBus +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Component + +/** + * Default no-op domain event bus. Active when the event publisher is set + * to `logging` (the default). Command-side events are still persisted to + * the event store but are not forwarded to any query-side consumer. + * See [NoopDomainEventBusNone] for the `none` publisher profile. + */ +@Component +@ConditionalOnProperty( + name = ["factstore.events.publisher"], + havingValue = "logging", + matchIfMissing = true +) +class NoopDomainEventBus : IDomainEventBus { + override fun publish(entry: EventLogEntry) { /* no-op */ } +} diff --git a/backend/src/main/kotlin/com/factstore/adapter/outbound/events/NoopDomainEventBusNone.kt b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/NoopDomainEventBusNone.kt new file mode 100644 index 00000000..903a5c2b --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/NoopDomainEventBusNone.kt @@ -0,0 +1,16 @@ +package com.factstore.adapter.outbound.events + +import com.factstore.core.domain.EventLogEntry +import com.factstore.core.port.outbound.IDomainEventBus +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Component + +/** + * No-op domain event bus for the `none` publisher profile. + * See [NoopDomainEventBus] for the `logging` (default) profile. + */ +@Component +@ConditionalOnProperty(name = ["factstore.events.publisher"], havingValue = "none") +class NoopDomainEventBusNone : IDomainEventBus { + override fun publish(entry: EventLogEntry) { /* no-op */ } +} diff --git a/backend/src/main/kotlin/com/factstore/adapter/outbound/events/RabbitMqDomainEventPublisher.kt b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/RabbitMqDomainEventPublisher.kt new file mode 100644 index 00000000..79dba549 --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/RabbitMqDomainEventPublisher.kt @@ -0,0 +1,37 @@ +package com.factstore.adapter.outbound.events + +import com.factstore.config.RabbitMqConfig +import com.factstore.core.domain.EventLogEntry +import com.factstore.core.port.outbound.IDomainEventBus +import org.slf4j.LoggerFactory +import org.springframework.amqp.rabbit.core.RabbitTemplate +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Component + +/** + * Publishes [EventLogEntry] records to the dedicated CQRS domain-event + * exchange so that the query service can project them into its read + * database. + * + * The full [EventLogEntry] (including `eventType` and `payload`) is sent + * as a structured JSON message, avoiding double-encoding issues that would + * occur if only the raw payload string were sent through the Jackson + * message converter. + * + * Active when `factstore.events.publisher=rabbitmq`. + */ +@Component +@ConditionalOnProperty(name = ["factstore.events.publisher"], havingValue = "rabbitmq") +class RabbitMqDomainEventPublisher( + private val rabbitTemplate: RabbitTemplate +) : IDomainEventBus { + + private val log = LoggerFactory.getLogger(RabbitMqDomainEventPublisher::class.java) + + override fun publish(entry: EventLogEntry) { + val routingKey = "cqrs.domain.event.${entry.eventType}" + rabbitTemplate.convertAndSend(RabbitMqConfig.DOMAIN_EXCHANGE_NAME, routingKey, entry) + log.info("Published domain event seq={} type={} to RabbitMQ", + entry.sequenceNumber, entry.eventType) + } +} diff --git a/backend/src/main/kotlin/com/factstore/adapter/outbound/events/RabbitMqEventPublisher.kt b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/RabbitMqEventPublisher.kt new file mode 100644 index 00000000..b12bc95d --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/adapter/outbound/events/RabbitMqEventPublisher.kt @@ -0,0 +1,35 @@ +package com.factstore.adapter.outbound.events + +import com.factstore.config.RabbitMqConfig +import com.factstore.core.port.outbound.IEventPublisher +import com.factstore.core.port.outbound.SupplyChainEvent +import org.slf4j.LoggerFactory +import org.springframework.amqp.rabbit.core.RabbitTemplate +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Component + +/** + * Publishes [SupplyChainEvent]s to a dedicated RabbitMQ exchange for + * external consumers (e.g. webhooks, notification pipelines). + * + * Uses a **separate exchange** from the CQRS domain-event feed + * ([RabbitMqDomainEventPublisher]) so that supply-chain messages never + * reach the projection queue and cause deserialization failures. + * + * Active when `factstore.events.publisher=rabbitmq`. + */ +@Component +@ConditionalOnProperty(name = ["factstore.events.publisher"], havingValue = "rabbitmq") +class RabbitMqEventPublisher( + private val rabbitTemplate: RabbitTemplate +) : IEventPublisher { + + private val log = LoggerFactory.getLogger(RabbitMqEventPublisher::class.java) + + override fun publish(event: SupplyChainEvent) { + val routingKey = "${RabbitMqConfig.SUPPLY_CHAIN_ROUTING_KEY_PREFIX}${event::class.simpleName}" + rabbitTemplate.convertAndSend(RabbitMqConfig.SUPPLY_CHAIN_EXCHANGE_NAME, routingKey, event) + log.info("Published supply-chain event {} to RabbitMQ exchange={} routingKey={}", + event.id, RabbitMqConfig.SUPPLY_CHAIN_EXCHANGE_NAME, routingKey) + } +} diff --git a/backend/src/main/kotlin/com/factstore/application/EventProjector.kt b/backend/src/main/kotlin/com/factstore/application/EventProjector.kt index 709d4a47..d3b6e731 100644 --- a/backend/src/main/kotlin/com/factstore/application/EventProjector.kt +++ b/backend/src/main/kotlin/com/factstore/application/EventProjector.kt @@ -2,6 +2,7 @@ package com.factstore.application import com.factstore.core.domain.EventLogEntry import com.factstore.core.domain.event.DomainEvent +import com.factstore.core.domain.event.DomainEventRegistry import com.factstore.core.port.outbound.IEventStore import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.readValue @@ -24,15 +25,7 @@ class EventProjector( private val log = LoggerFactory.getLogger(EventProjector::class.java) - private val eventTypeMap: Map> = mapOf( - "FlowCreated" to DomainEvent.FlowCreated::class.java, - "FlowUpdated" to DomainEvent.FlowUpdated::class.java, - "FlowDeleted" to DomainEvent.FlowDeleted::class.java, - "TrailCreated" to DomainEvent.TrailCreated::class.java, - "ArtifactReported" to DomainEvent.ArtifactReported::class.java, - "AttestationRecorded" to DomainEvent.AttestationRecorded::class.java, - "EvidenceUploaded" to DomainEvent.EvidenceUploaded::class.java - ) + private val eventTypeMap = DomainEventRegistry.eventTypeMap /** * Replay every event in the store, invoking [handler] for each diff --git a/backend/src/main/kotlin/com/factstore/application/ReadModelProjector.kt b/backend/src/main/kotlin/com/factstore/application/ReadModelProjector.kt new file mode 100644 index 00000000..bf21b935 --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/application/ReadModelProjector.kt @@ -0,0 +1,193 @@ +package com.factstore.application + +import com.factstore.core.domain.* +import com.factstore.core.domain.event.DomainEvent +import com.factstore.core.domain.event.DomainEventRegistry +import com.factstore.core.port.outbound.IArtifactRepository +import com.factstore.core.port.outbound.IAttestationRepository +import com.factstore.core.port.outbound.IFlowRepository +import com.factstore.core.port.outbound.ITrailRepository +import com.fasterxml.jackson.databind.ObjectMapper +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Transactional + +/** + * Applies [DomainEvent]s to JPA entities in the (read) database. + * + * In a fully decoupled CQRS deployment the query service runs this + * projector against events received from the domain event bus (RabbitMQ or + * in-memory). Each event type maps to a create / update / delete + * operation on the relevant aggregate table. + * + * The projector is idempotent with respect to create operations — if an + * entity with the given aggregate ID already exists the event is skipped. + */ +@Component +class ReadModelProjector( + private val flowRepository: IFlowRepository, + private val trailRepository: ITrailRepository, + private val artifactRepository: IArtifactRepository, + private val attestationRepository: IAttestationRepository, + private val objectMapper: ObjectMapper +) { + + private val log = LoggerFactory.getLogger(ReadModelProjector::class.java) + + private val eventTypeMap = DomainEventRegistry.eventTypeMap + + /** + * Deserialise a JSON payload into a [DomainEvent] and apply it. + * Returns `true` when the event was successfully projected. + */ + @Transactional + fun project(eventType: String, payload: String): Boolean { + val clazz = eventTypeMap[eventType] + if (clazz == null) { + log.warn("Unknown event type '{}' — skipping projection", eventType) + return false + } + return try { + val event = objectMapper.readValue(payload, clazz) + apply(event) + true + } catch (e: Exception) { + log.error("Failed to project event type={}", eventType, e) + false + } + } + + @Transactional + fun apply(event: DomainEvent) { + when (event) { + is DomainEvent.FlowCreated -> applyFlowCreated(event) + is DomainEvent.FlowUpdated -> applyFlowUpdated(event) + is DomainEvent.FlowDeleted -> applyFlowDeleted(event) + is DomainEvent.TrailCreated -> applyTrailCreated(event) + is DomainEvent.ArtifactReported -> applyArtifactReported(event) + is DomainEvent.AttestationRecorded -> applyAttestationRecorded(event) + is DomainEvent.EvidenceUploaded -> applyEvidenceUploaded(event) + } + } + + private fun applyFlowCreated(event: DomainEvent.FlowCreated) { + if (flowRepository.existsById(event.aggregateId)) { + log.debug("Flow {} already exists — skipping projection", event.aggregateId) + return + } + val flow = Flow( + id = event.aggregateId, + name = event.name, + description = event.description, + orgSlug = event.orgSlug, + createdAt = event.occurredAt, + updatedAt = event.occurredAt + ).also { + it.requiredAttestationTypes = event.requiredAttestationTypes + it.tags = event.tags.toMutableMap() + it.templateYaml = event.templateYaml + it.requiresApproval = event.requiresApproval + it.requiredApproverRoles = event.requiredApproverRoles + } + flowRepository.save(flow) + log.debug("Projected FlowCreated: {}", event.aggregateId) + } + + private fun applyFlowUpdated(event: DomainEvent.FlowUpdated) { + val flow = flowRepository.findById(event.aggregateId) + if (flow == null) { + log.warn("FlowUpdated for unknown flow {} — skipping", event.aggregateId) + return + } + event.name?.let { flow.name = it } + event.description?.let { flow.description = it } + event.requiredAttestationTypes?.let { flow.requiredAttestationTypes = it } + event.tags?.let { flow.tags = it.toMutableMap() } + event.templateYaml?.let { flow.templateYaml = it } + event.requiresApproval?.let { flow.requiresApproval = it } + event.requiredApproverRoles?.let { flow.requiredApproverRoles = it } + flow.updatedAt = event.occurredAt + flowRepository.save(flow) + log.debug("Projected FlowUpdated: {}", event.aggregateId) + } + + private fun applyFlowDeleted(event: DomainEvent.FlowDeleted) { + if (!flowRepository.existsById(event.aggregateId)) { + log.debug("Flow {} already deleted — skipping", event.aggregateId) + return + } + flowRepository.deleteById(event.aggregateId) + log.debug("Projected FlowDeleted: {}", event.aggregateId) + } + + private fun applyTrailCreated(event: DomainEvent.TrailCreated) { + if (trailRepository.existsById(event.aggregateId)) { + log.debug("Trail {} already exists — skipping projection", event.aggregateId) + return + } + val trail = Trail( + id = event.aggregateId, + flowId = event.flowId, + gitCommitSha = event.gitCommitSha, + gitBranch = event.gitBranch, + gitAuthor = event.gitAuthor, + gitAuthorEmail = event.gitAuthorEmail, + pullRequestId = event.pullRequestId, + pullRequestReviewer = event.pullRequestReviewer, + deploymentActor = event.deploymentActor, + orgSlug = event.orgSlug, + templateYaml = event.templateYaml, + buildUrl = event.buildUrl, + createdAt = event.occurredAt, + updatedAt = event.occurredAt + ) + trailRepository.save(trail) + log.debug("Projected TrailCreated: {}", event.aggregateId) + } + + private fun applyArtifactReported(event: DomainEvent.ArtifactReported) { + val artifact = Artifact( + id = event.aggregateId, + trailId = event.trailId, + imageName = event.imageName, + imageTag = event.imageTag, + sha256Digest = event.sha256Digest, + registry = event.registry, + reportedBy = event.reportedBy, + orgSlug = event.orgSlug, + reportedAt = event.occurredAt + ) + artifactRepository.save(artifact) + log.debug("Projected ArtifactReported: {}", event.aggregateId) + } + + private fun applyAttestationRecorded(event: DomainEvent.AttestationRecorded) { + val attestation = Attestation( + id = event.aggregateId, + trailId = event.trailId, + type = event.type, + status = AttestationStatus.valueOf(event.status), + details = event.details, + name = event.name, + evidenceUrl = event.evidenceUrl, + orgSlug = event.orgSlug, + artifactFingerprint = event.artifactFingerprint, + createdAt = event.occurredAt + ) + attestationRepository.save(attestation) + log.debug("Projected AttestationRecorded: {}", event.aggregateId) + } + + private fun applyEvidenceUploaded(event: DomainEvent.EvidenceUploaded) { + val attestation = attestationRepository.findById(event.aggregateId) + if (attestation == null) { + log.warn("EvidenceUploaded for unknown attestation {} — skipping", event.aggregateId) + return + } + attestation.evidenceFileHash = event.sha256Hash + attestation.evidenceFileName = event.fileName + attestation.evidenceFileSizeBytes = event.fileSizeBytes + attestationRepository.save(attestation) + log.debug("Projected EvidenceUploaded: {}", event.aggregateId) + } +} diff --git a/backend/src/main/kotlin/com/factstore/application/command/EventAppender.kt b/backend/src/main/kotlin/com/factstore/application/command/EventAppender.kt index 5144dab1..b68cd64e 100644 --- a/backend/src/main/kotlin/com/factstore/application/command/EventAppender.kt +++ b/backend/src/main/kotlin/com/factstore/application/command/EventAppender.kt @@ -2,19 +2,32 @@ package com.factstore.application.command import com.factstore.core.domain.EventLogEntry import com.factstore.core.domain.event.DomainEvent +import com.factstore.core.port.outbound.IDomainEventBus import com.factstore.core.port.outbound.IEventStore import com.fasterxml.jackson.databind.ObjectMapper +import org.slf4j.LoggerFactory import org.springframework.stereotype.Component +import org.springframework.transaction.support.TransactionSynchronization +import org.springframework.transaction.support.TransactionSynchronizationManager /** * Converts a [DomainEvent] into an [EventLogEntry] and appends it to the - * event store. Keeps the command-handler code focused on business logic. + * event store. After the surrounding transaction **commits**, the entry + * is published to the [IDomainEventBus] so that the query service can + * project it into the read database. + * + * Publishing is deferred via [TransactionSynchronizationManager] to + * prevent phantom events: if the transaction rolls back, the RabbitMQ + * message is never sent, keeping the write and read sides consistent. */ @Component class EventAppender( private val eventStore: IEventStore, - private val objectMapper: ObjectMapper + private val objectMapper: ObjectMapper, + private val domainEventBus: IDomainEventBus ) { + private val log = LoggerFactory.getLogger(EventAppender::class.java) + fun append(event: DomainEvent) { val entry = EventLogEntry( eventId = event.eventId, @@ -24,6 +37,29 @@ class EventAppender( payload = objectMapper.writeValueAsString(event), occurredAt = event.occurredAt ) - eventStore.append(entry) + val saved = eventStore.append(entry) + + if (TransactionSynchronizationManager.isSynchronizationActive()) { + TransactionSynchronizationManager.registerSynchronization(object : TransactionSynchronization { + override fun afterCommit() { + try { + domainEventBus.publish(saved) + } catch (e: Exception) { + log.error( + "Failed to publish domain event seq={} type={} eventId={} aggregateId={} after commit — " + + "replay from the event store to recover", + saved.sequenceNumber, + saved.eventType, + saved.eventId, + saved.aggregateId, + e + ) + } + } + }) + } else { + // No active transaction (e.g. in unit tests) — publish immediately. + domainEventBus.publish(saved) + } } } diff --git a/backend/src/main/kotlin/com/factstore/config/QueryServiceReadOnlyFilter.kt b/backend/src/main/kotlin/com/factstore/config/QueryServiceReadOnlyFilter.kt new file mode 100644 index 00000000..b21837ba --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/config/QueryServiceReadOnlyFilter.kt @@ -0,0 +1,42 @@ +package com.factstore.config + +import jakarta.servlet.FilterChain +import jakarta.servlet.http.HttpServletRequest +import jakarta.servlet.http.HttpServletResponse +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.core.Ordered +import org.springframework.core.annotation.Order +import org.springframework.stereotype.Component +import org.springframework.web.filter.OncePerRequestFilter + +/** + * Rejects mutating HTTP methods (POST, PUT, PATCH, DELETE) when the + * service is running in query (read-only) mode. + * + * Activated by `factstore.cqrs.role=query`. Without this guard the query + * container — which runs the same JAR as the command service — would + * still expose v1/v2 command endpoints and could accept writes against + * the read database. + */ +@Component +@ConditionalOnProperty(name = ["factstore.cqrs.role"], havingValue = "query") +@Order(Ordered.HIGHEST_PRECEDENCE + 10) +class QueryServiceReadOnlyFilter : OncePerRequestFilter() { + + private val readOnlyMethods = setOf("GET", "HEAD", "OPTIONS", "TRACE") + + override fun doFilterInternal( + request: HttpServletRequest, + response: HttpServletResponse, + filterChain: FilterChain + ) { + if (request.method !in readOnlyMethods) { + response.sendError( + HttpServletResponse.SC_METHOD_NOT_ALLOWED, + "Query service is read-only — mutations must be sent to the command service" + ) + return + } + filterChain.doFilter(request, response) + } +} diff --git a/backend/src/main/kotlin/com/factstore/config/RabbitMqConfig.kt b/backend/src/main/kotlin/com/factstore/config/RabbitMqConfig.kt new file mode 100644 index 00000000..cfddea45 --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/config/RabbitMqConfig.kt @@ -0,0 +1,60 @@ +package com.factstore.config + +import org.springframework.amqp.core.Binding +import org.springframework.amqp.core.BindingBuilder +import org.springframework.amqp.core.Queue +import org.springframework.amqp.core.TopicExchange +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter +import org.springframework.amqp.support.converter.MessageConverter +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration + +/** + * RabbitMQ infrastructure for the CQRS event feed. + * + * Active only when `factstore.events.publisher=rabbitmq`. The command + * service publishes domain events to a dedicated topic exchange; the query + * service consumes them from the bound queue and projects into its read + * database. + * + * Supply-chain events ([com.factstore.core.port.outbound.SupplyChainEvent]) + * are published to a separate exchange to avoid polluting the projection + * queue with messages the [com.factstore.application.ReadModelProjector] + * cannot deserialize. + */ +@Configuration +@ConditionalOnProperty(name = ["factstore.events.publisher"], havingValue = "rabbitmq") +class RabbitMqConfig { + + companion object { + /** Dedicated exchange for CQRS domain events (EventLogEntry payloads). */ + const val DOMAIN_EXCHANGE_NAME = "factstore.domain-events" + const val PROJECTION_QUEUE_NAME = "factstore.events.projection" + const val DOMAIN_ROUTING_KEY = "cqrs.domain.event.#" + + /** Separate exchange for supply-chain events (webhooks, notifications). */ + const val SUPPLY_CHAIN_EXCHANGE_NAME = "factstore.supply-chain-events" + const val SUPPLY_CHAIN_ROUTING_KEY_PREFIX = "supply-chain.event." + } + + // ── Domain event infrastructure ──────────────────────────────────────── + + @Bean + fun domainEventExchange(): TopicExchange = TopicExchange(DOMAIN_EXCHANGE_NAME) + + @Bean + fun projectionQueue(): Queue = Queue(PROJECTION_QUEUE_NAME, true) + + @Bean + fun projectionBinding(projectionQueue: Queue, domainEventExchange: TopicExchange): Binding = + BindingBuilder.bind(projectionQueue).to(domainEventExchange).with(DOMAIN_ROUTING_KEY) + + // ── Supply-chain event infrastructure ────────────────────────────────── + + @Bean + fun supplyChainExchange(): TopicExchange = TopicExchange(SUPPLY_CHAIN_EXCHANGE_NAME) + + @Bean + fun jacksonMessageConverter(): MessageConverter = Jackson2JsonMessageConverter() +} diff --git a/backend/src/main/kotlin/com/factstore/core/domain/event/DomainEventRegistry.kt b/backend/src/main/kotlin/com/factstore/core/domain/event/DomainEventRegistry.kt new file mode 100644 index 00000000..42851743 --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/core/domain/event/DomainEventRegistry.kt @@ -0,0 +1,24 @@ +package com.factstore.core.domain.event + +/** + * Single source of truth for the mapping between domain event type names + * (as stored in [com.factstore.core.domain.EventLogEntry.eventType]) and + * their concrete [DomainEvent] classes. + * + * Both [com.factstore.application.EventProjector] (replay) and + * [com.factstore.application.ReadModelProjector] (live projection) + * reference this registry so that new event types only need to be + * registered in one place. + */ +object DomainEventRegistry { + + val eventTypeMap: Map> = mapOf( + "FlowCreated" to DomainEvent.FlowCreated::class.java, + "FlowUpdated" to DomainEvent.FlowUpdated::class.java, + "FlowDeleted" to DomainEvent.FlowDeleted::class.java, + "TrailCreated" to DomainEvent.TrailCreated::class.java, + "ArtifactReported" to DomainEvent.ArtifactReported::class.java, + "AttestationRecorded" to DomainEvent.AttestationRecorded::class.java, + "EvidenceUploaded" to DomainEvent.EvidenceUploaded::class.java + ) +} diff --git a/backend/src/main/kotlin/com/factstore/core/port/outbound/IDomainEventBus.kt b/backend/src/main/kotlin/com/factstore/core/port/outbound/IDomainEventBus.kt new file mode 100644 index 00000000..b6dfe4c2 --- /dev/null +++ b/backend/src/main/kotlin/com/factstore/core/port/outbound/IDomainEventBus.kt @@ -0,0 +1,15 @@ +package com.factstore.core.port.outbound + +import com.factstore.core.domain.EventLogEntry + +/** + * Outbound port for the CQRS event feed. + * + * After a domain event is persisted to the write-side event store, it is + * published through this bus so the query service can project it into the + * read database. Implementations may use RabbitMQ (production) or an + * in-memory Spring event (tests). + */ +interface IDomainEventBus { + fun publish(entry: EventLogEntry) +} diff --git a/backend/src/main/resources/application-prod.yml b/backend/src/main/resources/application-prod.yml new file mode 100644 index 00000000..7ad919ed --- /dev/null +++ b/backend/src/main/resources/application-prod.yml @@ -0,0 +1,46 @@ +# Profile: prod — Decoupled CQRS deployment with dual PostgreSQL databases and RabbitMQ. +# +# The same application JAR is deployed twice: once as the command (write) +# service and once as the query (read) service. Each instance connects to +# its own PostgreSQL database. Spring's @ConditionalOnProperty annotations +# ensure that only the relevant components are active in each instance +# (e.g. RabbitMQ publisher on the command side, consumer on the query side). +# +# Activate with: --spring.profiles.active=prod +# +# Required environment variables: +# DB_HOST / DB_PORT / DB_NAME / DB_USERNAME / DB_PASSWORD +# → The PostgreSQL instance this service connects to. +# For the command service this is the write database. +# For the query service this is the read database. +# RABBITMQ_HOST / RABBITMQ_PORT / RABBITMQ_USERNAME / RABBITMQ_PASSWORD +# → RabbitMQ broker used for the CQRS event feed. + +spring: + datasource: + url: jdbc:postgresql://${DB_HOST:localhost}:${DB_PORT:5432}/${DB_NAME:factstore} + driver-class-name: org.postgresql.Driver + username: ${DB_USERNAME:factstore} + password: ${DB_PASSWORD:factstore} + hikari: + maximum-pool-size: 20 + minimum-idle: 5 + connection-timeout: 30000 + idle-timeout: 600000 + flyway: + enabled: true + jpa: + hibernate: + ddl-auto: none + properties: + hibernate: + dialect: org.hibernate.dialect.PostgreSQLDialect + rabbitmq: + host: ${RABBITMQ_HOST:localhost} + port: ${RABBITMQ_PORT:5672} + username: ${RABBITMQ_USERNAME:guest} + password: ${RABBITMQ_PASSWORD:guest} + +factstore: + events: + publisher: rabbitmq diff --git a/backend/src/main/resources/application-test.yml b/backend/src/main/resources/application-test.yml new file mode 100644 index 00000000..cef38d39 --- /dev/null +++ b/backend/src/main/resources/application-test.yml @@ -0,0 +1,41 @@ +# Profile: test — Isolated H2 databases with in-memory event bus. +# +# Each test context gets its own H2 in-memory database. The in-memory +# event bus (backed by Spring ApplicationEvents) replaces RabbitMQ, +# allowing full CQRS event-driven projection tests to run without +# external dependencies. +# +# Activate with: --spring.profiles.active=test + +spring: + datasource: + url: jdbc:h2:mem:factstore-test;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE + driver-class-name: org.h2.Driver + username: sa + password: + flyway: + enabled: false + jpa: + hibernate: + ddl-auto: create-drop + show-sql: false + properties: + hibernate: + dialect: org.hibernate.dialect.H2Dialect + autoconfigure: + exclude: + - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration + +factstore: + events: + publisher: inmemory + +ledger: + enabled: true + type: local + +audit: + hmac-secret: test-secret-for-unit-tests + +security: + enforce-auth: false diff --git a/backend/src/test/kotlin/com/factstore/application/ReadModelProjectorTest.kt b/backend/src/test/kotlin/com/factstore/application/ReadModelProjectorTest.kt new file mode 100644 index 00000000..1eda8db0 --- /dev/null +++ b/backend/src/test/kotlin/com/factstore/application/ReadModelProjectorTest.kt @@ -0,0 +1,237 @@ +package com.factstore.application + +import com.factstore.adapter.mock.InMemoryArtifactRepository +import com.factstore.adapter.mock.InMemoryAttestationRepository +import com.factstore.adapter.mock.InMemoryFlowRepository +import com.factstore.adapter.mock.InMemoryTrailRepository +import com.factstore.core.domain.AttestationStatus +import com.factstore.core.domain.Flow +import com.factstore.core.domain.event.DomainEvent +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.util.UUID + +class ReadModelProjectorTest { + + private lateinit var objectMapper: ObjectMapper + private lateinit var projector: ReadModelProjector + private lateinit var flowRepo: InMemoryFlowRepository + private lateinit var trailRepo: InMemoryTrailRepository + private lateinit var artifactRepo: InMemoryArtifactRepository + private lateinit var attestationRepo: InMemoryAttestationRepository + + @BeforeEach + fun setUp() { + objectMapper = jacksonObjectMapper().registerModule(JavaTimeModule()) + flowRepo = InMemoryFlowRepository() + trailRepo = InMemoryTrailRepository() + artifactRepo = InMemoryArtifactRepository() + attestationRepo = InMemoryAttestationRepository() + projector = ReadModelProjector(flowRepo, trailRepo, artifactRepo, attestationRepo, objectMapper) + } + + @Test + fun `project FlowCreated creates flow in read model`() { + val flowId = UUID.randomUUID() + val event = DomainEvent.FlowCreated( + aggregateId = flowId, + name = "my-flow", + description = "test flow", + orgSlug = "acme", + requiredAttestationTypes = listOf("snyk"), + tags = mapOf("env" to "prod"), + requiresApproval = true, + requiredApproverRoles = listOf("admin") + ) + val payload = objectMapper.writeValueAsString(event) + + val result = projector.project("FlowCreated", payload) + + assertTrue(result) + val flow = flowRepo.findById(flowId) + assertNotNull(flow) + assertEquals("my-flow", flow!!.name) + assertEquals("test flow", flow.description) + assertEquals("acme", flow.orgSlug) + assertEquals(listOf("snyk"), flow.requiredAttestationTypes) + assertEquals(mapOf("env" to "prod"), flow.tags) + assertTrue(flow.requiresApproval) + assertEquals(listOf("admin"), flow.requiredApproverRoles) + } + + @Test + fun `project FlowUpdated updates existing flow`() { + val flowId = UUID.randomUUID() + flowRepo.save(Flow(id = flowId, name = "original", description = "old")) + + val event = DomainEvent.FlowUpdated( + aggregateId = flowId, + name = "updated-name", + description = "new desc" + ) + val payload = objectMapper.writeValueAsString(event) + + assertTrue(projector.project("FlowUpdated", payload)) + + val flow = flowRepo.findById(flowId)!! + assertEquals("updated-name", flow.name) + assertEquals("new desc", flow.description) + } + + @Test + fun `project FlowDeleted removes flow from read model`() { + val flowId = UUID.randomUUID() + flowRepo.save(Flow(id = flowId, name = "to-delete", description = "bye")) + + val event = DomainEvent.FlowDeleted(aggregateId = flowId) + val payload = objectMapper.writeValueAsString(event) + + assertTrue(projector.project("FlowDeleted", payload)) + assertNull(flowRepo.findById(flowId)) + } + + @Test + fun `project FlowCreated is idempotent`() { + val flowId = UUID.randomUUID() + flowRepo.save(Flow(id = flowId, name = "existing", description = "already here")) + + val event = DomainEvent.FlowCreated( + aggregateId = flowId, + name = "duplicate", + description = "should be skipped" + ) + val payload = objectMapper.writeValueAsString(event) + + assertTrue(projector.project("FlowCreated", payload)) + assertEquals("existing", flowRepo.findById(flowId)!!.name) + } + + @Test + fun `project TrailCreated creates trail in read model`() { + val trailId = UUID.randomUUID() + val flowId = UUID.randomUUID() + val event = DomainEvent.TrailCreated( + aggregateId = trailId, + flowId = flowId, + gitCommitSha = "abc123", + gitBranch = "main", + gitAuthor = "dev", + gitAuthorEmail = "dev@test.com", + buildUrl = "https://ci.example.com/123" + ) + val payload = objectMapper.writeValueAsString(event) + + assertTrue(projector.project("TrailCreated", payload)) + + val trail = trailRepo.findById(trailId) + assertNotNull(trail) + assertEquals(flowId, trail!!.flowId) + assertEquals("abc123", trail.gitCommitSha) + assertEquals("main", trail.gitBranch) + } + + @Test + fun `project ArtifactReported creates artifact in read model`() { + val artifactId = UUID.randomUUID() + val trailId = UUID.randomUUID() + val event = DomainEvent.ArtifactReported( + aggregateId = artifactId, + trailId = trailId, + imageName = "myapp", + imageTag = "v1.0", + sha256Digest = "sha256:abc", + reportedBy = "ci-bot" + ) + val payload = objectMapper.writeValueAsString(event) + + assertTrue(projector.project("ArtifactReported", payload)) + + val artifact = artifactRepo.findById(artifactId) + assertNotNull(artifact) + assertEquals("myapp", artifact!!.imageName) + } + + @Test + fun `project AttestationRecorded creates attestation in read model`() { + val attestationId = UUID.randomUUID() + val trailId = UUID.randomUUID() + val event = DomainEvent.AttestationRecorded( + aggregateId = attestationId, + trailId = trailId, + type = "snyk-scan", + status = "PASSED", + details = "No vulnerabilities" + ) + val payload = objectMapper.writeValueAsString(event) + + assertTrue(projector.project("AttestationRecorded", payload)) + + val attestation = attestationRepo.findById(attestationId) + assertNotNull(attestation) + assertEquals("snyk-scan", attestation!!.type) + assertEquals(AttestationStatus.PASSED, attestation.status) + } + + @Test + fun `project unknown event type returns false`() { + assertFalse(projector.project("UnknownEvent", "{}")) + } + + @Test + fun `project EvidenceUploaded updates attestation in read model`() { + val attestationId = UUID.randomUUID() + val trailId = UUID.randomUUID() + // Pre-create attestation + val attestation = com.factstore.core.domain.Attestation( + id = attestationId, + trailId = trailId, + type = "snyk-scan", + status = AttestationStatus.PASSED + ) + attestationRepo.save(attestation) + + val event = DomainEvent.EvidenceUploaded( + aggregateId = attestationId, + trailId = trailId, + fileName = "report.pdf", + contentType = "application/pdf", + sha256Hash = "sha256:abc123", + fileSizeBytes = 42_000L + ) + val payload = objectMapper.writeValueAsString(event) + + assertTrue(projector.project("EvidenceUploaded", payload)) + + val updated = attestationRepo.findById(attestationId) + assertNotNull(updated) + assertEquals("report.pdf", updated!!.evidenceFileName) + assertEquals("sha256:abc123", updated.evidenceFileHash) + assertEquals(42_000L, updated.evidenceFileSizeBytes) + } + + @Test + fun `project EvidenceUploaded for unknown attestation is skipped`() { + val unknownId = UUID.randomUUID() + val event = DomainEvent.EvidenceUploaded( + aggregateId = unknownId, + trailId = UUID.randomUUID(), + fileName = "report.pdf", + contentType = "application/pdf", + sha256Hash = "sha256:abc123", + fileSizeBytes = 42_000L + ) + val payload = objectMapper.writeValueAsString(event) + + // Should succeed (not crash) but skip because attestation doesn't exist + assertTrue(projector.project("EvidenceUploaded", payload)) + } + + @Test + fun `project malformed payload returns false`() { + assertFalse(projector.project("FlowCreated", "not-json")) + } +} diff --git a/backend/src/test/kotlin/com/factstore/application/command/EventAppenderTest.kt b/backend/src/test/kotlin/com/factstore/application/command/EventAppenderTest.kt index 6f1ecbc9..6c8bf85f 100644 --- a/backend/src/test/kotlin/com/factstore/application/command/EventAppenderTest.kt +++ b/backend/src/test/kotlin/com/factstore/application/command/EventAppenderTest.kt @@ -1,7 +1,9 @@ package com.factstore.application.command import com.factstore.adapter.mock.InMemoryEventStore +import com.factstore.core.domain.EventLogEntry import com.factstore.core.domain.event.DomainEvent +import com.factstore.core.port.outbound.IDomainEventBus import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper @@ -15,12 +17,15 @@ class EventAppenderTest { private lateinit var eventStore: InMemoryEventStore private lateinit var objectMapper: ObjectMapper private lateinit var eventAppender: EventAppender + private val noopBus = object : IDomainEventBus { + override fun publish(entry: EventLogEntry) { /* no-op for test */ } + } @BeforeEach fun setUp() { eventStore = InMemoryEventStore() objectMapper = jacksonObjectMapper().registerModule(JavaTimeModule()) - eventAppender = EventAppender(eventStore, objectMapper) + eventAppender = EventAppender(eventStore, objectMapper, noopBus) } @Test diff --git a/backend/src/test/resources/application.yml b/backend/src/test/resources/application.yml index ac7bc7da..2001abcf 100644 --- a/backend/src/test/resources/application.yml +++ b/backend/src/test/resources/application.yml @@ -13,6 +13,9 @@ spring: properties: hibernate: dialect: org.hibernate.dialect.H2Dialect + autoconfigure: + exclude: + - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration ledger: enabled: true diff --git a/cli/commands/configure.go b/cli/commands/configure.go index 9114f4ce..276ae822 100644 --- a/cli/commands/configure.go +++ b/cli/commands/configure.go @@ -14,8 +14,8 @@ import ( var configureCmd = &cobra.Command{ Use: "configure", - Short: "Set the API host and authentication token", - Long: "Interactively set the Factstore API host and bearer token, saved to ~/.factstore.yaml.", + Short: "Set the API host, query host, and authentication token", + Long: "Interactively set the Factstore API host, query host, and bearer token, saved to ~/.factstore.yaml.", RunE: func(cmd *cobra.Command, args []string) error { reader := bufio.NewReader(os.Stdin) @@ -26,6 +26,13 @@ var configureCmd = &cobra.Command{ } host = strings.TrimSpace(host) + fmt.Print("Query host (leave blank to use API host for reads): ") + queryHost, err := reader.ReadString('\n') + if err != nil { + return fmt.Errorf("read query host: %w", err) + } + queryHost = strings.TrimSpace(queryHost) + fmt.Print("Bearer token: ") tokenBytes, err := term.ReadPassword(int(os.Stdin.Fd())) fmt.Println() // newline after the hidden input @@ -34,7 +41,7 @@ var configureCmd = &cobra.Command{ } token := strings.TrimSpace(string(tokenBytes)) - if err := config.Save(host, token); err != nil { + if err := config.Save(host, token, queryHost); err != nil { return fmt.Errorf("save config: %w", err) } output.PrintSuccess("Configuration saved to ~/.factstore.yaml") diff --git a/cli/commands/root.go b/cli/commands/root.go index 002f2e96..741f0ac7 100644 --- a/cli/commands/root.go +++ b/cli/commands/root.go @@ -10,9 +10,10 @@ import ( ) var ( - cfgHost string - cfgToken string - jsonOutput bool + cfgHost string + cfgQueryHost string + cfgToken string + jsonOutput bool ) // RootCmd is the top-level command. @@ -25,6 +26,7 @@ attestations and compliance assertions in the Factstore API.`, func init() { RootCmd.PersistentFlags().StringVar(&cfgHost, "host", "", "API host (overrides config/FACTSTORE_HOST)") + RootCmd.PersistentFlags().StringVar(&cfgQueryHost, "query-host", "", "Query API host for read operations (overrides config/FACTSTORE_QUERY_HOST)") RootCmd.PersistentFlags().StringVar(&cfgToken, "token", "", "Bearer token (overrides config/FACTSTORE_TOKEN)") RootCmd.PersistentFlags().BoolVar(&jsonOutput, "json", false, "Output as JSON") @@ -41,6 +43,8 @@ func init() { } // newClient builds an HTTP client from merged config + flag overrides. +// When a query-host is configured, GET requests are routed to the query +// (read) service while mutating requests go to the command (write) service. func newClient() (*client.Client, error) { cfg, err := config.Load() if err != nil { @@ -51,6 +55,10 @@ func newClient() (*client.Client, error) { if cfgHost != "" { host = cfgHost } + queryHost := cfg.QueryHost + if cfgQueryHost != "" { + queryHost = cfgQueryHost + } token := cfg.Token if cfgToken != "" { token = cfgToken @@ -60,5 +68,5 @@ func newClient() (*client.Client, error) { fmt.Fprintln(os.Stderr, "hint: run 'factstore configure' or set FACTSTORE_HOST to set the API host") return nil, fmt.Errorf("no host configured") } - return client.New(host, token) + return client.NewWithQueryHost(host, queryHost, token) } diff --git a/cli/internal/client/client.go b/cli/internal/client/client.go index a6434a43..873f6c93 100644 --- a/cli/internal/client/client.go +++ b/cli/internal/client/client.go @@ -21,20 +21,22 @@ const ( ) // Client is an HTTP client for the Factstore API. +// When QueryBaseURL is set, GET requests are routed to the query (read) +// service while mutating requests (POST/PUT/DELETE) go to BaseURL (the +// command/write service). When QueryBaseURL is empty both read and write +// requests go to BaseURL, preserving backward compatibility. type Client struct { - BaseURL string - Token string - httpClient *http.Client + BaseURL string + QueryBaseURL string + Token string + httpClient *http.Client } // New creates a new Client. Returns an error if baseURL uses http:// with a // non-localhost host, to prevent sending tokens over plaintext connections. func New(baseURL, token string) (*Client, error) { - if strings.HasPrefix(baseURL, "http://") { - u, err := url.Parse(baseURL) - if err != nil || (u.Hostname() != "localhost" && u.Hostname() != "127.0.0.1") { - return nil, fmt.Errorf("insecure connection refused: use https:// (http:// is only allowed for localhost)") - } + if err := validateURL(baseURL); err != nil { + return nil, err } return &Client{ BaseURL: strings.TrimRight(baseURL, "/"), @@ -45,6 +47,41 @@ func New(baseURL, token string) (*Client, error) { }, nil } +// NewWithQueryHost creates a Client that routes GET requests to queryBaseURL. +// If queryBaseURL is empty it falls back to baseURL for all operations. +func NewWithQueryHost(baseURL, queryBaseURL, token string) (*Client, error) { + if err := validateURL(baseURL); err != nil { + return nil, err + } + if queryBaseURL != "" { + if err := validateURL(queryBaseURL); err != nil { + return nil, err + } + } + c := &Client{ + BaseURL: strings.TrimRight(baseURL, "/"), + Token: token, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + } + if queryBaseURL != "" { + c.QueryBaseURL = strings.TrimRight(queryBaseURL, "/") + } + return c, nil +} + +// validateURL rejects insecure (http://) connections to non-localhost hosts. +func validateURL(rawURL string) error { + if strings.HasPrefix(rawURL, "http://") { + u, err := url.Parse(rawURL) + if err != nil || (u.Hostname() != "localhost" && u.Hostname() != "127.0.0.1") { + return fmt.Errorf("insecure connection refused: use https:// (http:// is only allowed for localhost)") + } + } + return nil +} + // reqFactory is a function that produces a fresh *http.Request for each attempt. type reqFactory func() (*http.Request, error) @@ -82,6 +119,10 @@ func (c *Client) do(build reqFactory) ([]byte, int, error) { } func (c *Client) doRequest(method, path string, body interface{}) ([]byte, int, error) { + return c.doRequestWithBase(method, c.BaseURL, path, body) +} + +func (c *Client) doRequestWithBase(method, base, path string, body interface{}) ([]byte, int, error) { // Pre-marshal the body once so each retry reuses the same bytes. var bodyBytes []byte if body != nil { @@ -97,7 +138,7 @@ func (c *Client) doRequest(method, path string, body interface{}) ([]byte, int, if bodyBytes != nil { bodyReader = bytes.NewReader(bodyBytes) } - reqURL := c.BaseURL + path + reqURL := base + path req, err := http.NewRequest(method, reqURL, bodyReader) if err != nil { return nil, err @@ -113,9 +154,14 @@ func (c *Client) doRequest(method, path string, body interface{}) ([]byte, int, }) } -// Get performs a GET request. +// Get performs a GET request. When a QueryBaseURL is configured the request +// is routed to the query (read) service; otherwise it goes to BaseURL. func (c *Client) Get(path string) ([]byte, int, error) { - return c.doRequest(http.MethodGet, path, nil) + base := c.BaseURL + if c.QueryBaseURL != "" { + base = c.QueryBaseURL + } + return c.doRequestWithBase(http.MethodGet, base, path, nil) } // Post performs a POST request with a JSON body. diff --git a/cli/internal/config/config.go b/cli/internal/config/config.go index fcb0b7f1..daf4a627 100644 --- a/cli/internal/config/config.go +++ b/cli/internal/config/config.go @@ -13,12 +13,14 @@ const ( defaultConfigFile = ".factstore.yaml" KeyHost = "host" KeyToken = "token" + KeyQueryHost = "query_host" ) // Config holds CLI configuration values. type Config struct { - Host string - Token string + Host string + Token string + QueryHost string } // Load initializes Viper and returns the current configuration. @@ -46,14 +48,15 @@ func Load() (*Config, error) { } return &Config{ - Host: viper.GetString(KeyHost), - Token: viper.GetString(KeyToken), + Host: viper.GetString(KeyHost), + Token: viper.GetString(KeyToken), + QueryHost: viper.GetString(KeyQueryHost), }, nil } -// Save writes host and token to ~/.factstore.yaml with owner-only permissions -// (0600) to prevent token disclosure. -func Save(host, token string) error { +// Save writes host, token, and queryHost to ~/.factstore.yaml with owner-only +// permissions (0600) to prevent token disclosure. +func Save(host, token, queryHost string) error { home, err := os.UserHomeDir() if err != nil { return fmt.Errorf("cannot determine home directory: %w", err) @@ -76,6 +79,7 @@ func Save(host, token string) error { viper.Set(KeyHost, host) viper.Set(KeyToken, token) + viper.Set(KeyQueryHost, queryHost) viper.SetConfigFile(path) if err := viper.WriteConfig(); err != nil { diff --git a/cli/tests/client_test.go b/cli/tests/client_test.go index e5c05196..634211b8 100644 --- a/cli/tests/client_test.go +++ b/cli/tests/client_test.go @@ -149,3 +149,130 @@ func TestNewClientAllows127(t *testing.T) { t.Fatalf("expected no error for http://127.0.0.1, got: %v", err) } } + +func TestNewWithQueryHostGetRoutesToQueryHost(t *testing.T) { + commandServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("GET should not reach command server when query host is set") + w.WriteHeader(http.StatusOK) + })) + defer commandServer.Close() + + queryServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + t.Errorf("expected GET on query server, got %s", r.Method) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"source": "query"}) + })) + defer queryServer.Close() + + c, err := client.NewWithQueryHost(commandServer.URL, queryServer.URL, "tok") + if err != nil { + t.Fatalf("NewWithQueryHost: %v", err) + } + body, status, err := c.Get("/api/v1/flows") + if err != nil { + t.Fatalf("Get: %v", err) + } + if status != http.StatusOK { + t.Errorf("expected 200, got %d", status) + } + if len(body) == 0 { + t.Error("expected non-empty body from query server") + } +} + +func TestNewWithQueryHostPostRoutesToCommandHost(t *testing.T) { + commandServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST on command server, got %s", r.Method) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(map[string]string{"source": "command"}) + })) + defer commandServer.Close() + + queryServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("POST should not reach query server") + w.WriteHeader(http.StatusOK) + })) + defer queryServer.Close() + + c, err := client.NewWithQueryHost(commandServer.URL, queryServer.URL, "tok") + if err != nil { + t.Fatalf("NewWithQueryHost: %v", err) + } + _, status, err := c.Post("/api/v1/flows", map[string]string{"name": "test"}) + if err != nil { + t.Fatalf("Post: %v", err) + } + if status != http.StatusCreated { + t.Errorf("expected 201, got %d", status) + } +} + +func TestNewWithQueryHostPutAndDeleteRouteToCommandHost(t *testing.T) { + commandServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"source": "command"}) + })) + defer commandServer.Close() + + queryServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("PUT/DELETE should not reach query server") + w.WriteHeader(http.StatusOK) + })) + defer queryServer.Close() + + c, err := client.NewWithQueryHost(commandServer.URL, queryServer.URL, "tok") + if err != nil { + t.Fatalf("NewWithQueryHost: %v", err) + } + + _, status, err := c.Put("/api/v1/flows/1", map[string]string{"name": "upd"}) + if err != nil { + t.Fatalf("Put: %v", err) + } + if status != http.StatusOK { + t.Errorf("expected 200 for PUT, got %d", status) + } + + _, status, err = c.Delete("/api/v1/flows/1") + if err != nil { + t.Fatalf("Delete: %v", err) + } + if status != http.StatusOK { + t.Errorf("expected 200 for DELETE, got %d", status) + } +} + +func TestNewWithQueryHostEmptyFallsBackToBaseURL(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"source": "base"}) + })) + defer server.Close() + + c, err := client.NewWithQueryHost(server.URL, "", "tok") + if err != nil { + t.Fatalf("NewWithQueryHost: %v", err) + } + _, status, err := c.Get("/api/v1/flows") + if err != nil { + t.Fatalf("Get: %v", err) + } + if status != http.StatusOK { + t.Errorf("expected 200, got %d", status) + } +} + +func TestNewWithQueryHostValidatesQueryURL(t *testing.T) { + _, err := client.NewWithQueryHost("http://localhost:8080", "http://remote.example.com:8081", "tok") + if err == nil { + t.Fatal("expected error for insecure non-localhost query URL") + } +} diff --git a/docker-compose.yml b/docker-compose.yml index 0a87e561..77a91d5c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,16 +1,51 @@ services: - postgres: + # ── Write (command) database ────────────────────────────────────────────── + postgres-command: image: postgres:16-alpine environment: - POSTGRES_DB: factstore + POSTGRES_DB: factstore_command POSTGRES_USER: factstore POSTGRES_PASSWORD: factstore ports: - "5432:5432" volumes: - - postgres_data:/var/lib/postgresql/data + - postgres_command_data:/var/lib/postgresql/data healthcheck: - test: ["CMD-SHELL", "pg_isready -U factstore -d factstore"] + test: ["CMD-SHELL", "pg_isready -U factstore -d factstore_command"] + interval: 10s + timeout: 5s + retries: 5 + + # ── Read (query) database ───────────────────────────────────────────────── + postgres-query: + image: postgres:16-alpine + environment: + POSTGRES_DB: factstore_query + POSTGRES_USER: factstore + POSTGRES_PASSWORD: factstore + ports: + - "5433:5432" + volumes: + - postgres_query_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U factstore -d factstore_query"] + interval: 10s + timeout: 5s + retries: 5 + + # ── RabbitMQ (CQRS event bus) ───────────────────────────────────────────── + # WARNING: Change default credentials for production deployments. + # Use RABBITMQ_DEFAULT_USER / RABBITMQ_DEFAULT_PASS environment variables. + rabbitmq: + image: rabbitmq:3.13-management-alpine + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + ports: + - "5672:5672" + - "15672:15672" + healthcheck: + test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"] interval: 10s timeout: 5s retries: 5 @@ -31,25 +66,62 @@ services: retries: 5 start_period: 5s - backend: + # ── Command service (write side) ────────────────────────────────────────── + backend-command: build: . depends_on: - postgres: + postgres-command: + condition: service_healthy + rabbitmq: condition: service_healthy vault: condition: service_healthy environment: - DB_HOST: postgres + SPRING_PROFILES_ACTIVE: prod + FACTSTORE_CQRS_ROLE: command + DB_HOST: postgres-command DB_PORT: 5432 - DB_NAME: factstore + DB_NAME: factstore_command DB_USERNAME: factstore DB_PASSWORD: factstore + RABBITMQ_HOST: rabbitmq + RABBITMQ_PORT: 5672 + RABBITMQ_USERNAME: guest + RABBITMQ_PASSWORD: guest VAULT_ENABLED: "true" VAULT_ADDR: http://vault:8200 VAULT_TOKEN: dev-root-token ports: - "8080:8080" + # ── Query service (read side) ───────────────────────────────────────────── + backend-query: + build: . + depends_on: + postgres-query: + condition: service_healthy + rabbitmq: + condition: service_healthy + vault: + condition: service_healthy + environment: + SPRING_PROFILES_ACTIVE: prod + FACTSTORE_CQRS_ROLE: query + DB_HOST: postgres-query + DB_PORT: 5432 + DB_NAME: factstore_query + DB_USERNAME: factstore + DB_PASSWORD: factstore + RABBITMQ_HOST: rabbitmq + RABBITMQ_PORT: 5672 + RABBITMQ_USERNAME: guest + RABBITMQ_PASSWORD: guest + VAULT_ENABLED: "true" + VAULT_ADDR: http://vault:8200 + VAULT_TOKEN: dev-root-token + ports: + - "8081:8080" + prometheus: image: prom/prometheus:latest volumes: @@ -57,7 +129,7 @@ services: ports: - "9090:9090" depends_on: - - backend + - backend-command grafana: image: grafana/grafana:latest @@ -72,4 +144,5 @@ services: - prometheus volumes: - postgres_data: + postgres_command_data: + postgres_query_data: diff --git a/monitoring/prometheus.yml b/monitoring/prometheus.yml index e9f2d4cf..321996b7 100644 --- a/monitoring/prometheus.yml +++ b/monitoring/prometheus.yml @@ -2,7 +2,12 @@ global: scrape_interval: 15s scrape_configs: - - job_name: 'factstore' + - job_name: 'factstore-command' static_configs: - - targets: ['backend:8080'] + - targets: ['backend-command:8080'] + metrics_path: '/actuator/prometheus' + + - job_name: 'factstore-query' + static_configs: + - targets: ['backend-query:8080'] metrics_path: '/actuator/prometheus'