Refactor to decoupled CQRS architecture with RabbitMQ event bus and dual-database deployment #107
Conversation
… ReadModelProjector, and dual-profile configuration Agent-Logs-Url: https://github.com/MaximumTrainer/OpenFactstore/sessions/bc3f6411-1821-4b43-9a5b-ee2723cd79dc Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
…L + RabbitMQ, update README and DEPLOY docs Agent-Logs-Url: https://github.com/MaximumTrainer/OpenFactstore/sessions/bc3f6411-1821-4b43-9a5b-ee2723cd79dc Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
…fy docs Agent-Logs-Url: https://github.com/MaximumTrainer/OpenFactstore/sessions/bc3f6411-1821-4b43-9a5b-ee2723cd79dc Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
Pull request overview
Refactors the CQRS architecture towards a fully decoupled command/query deployment by introducing a domain-event bus (RabbitMQ in prod, in-memory in tests) and a query-side read model projector, plus CLI/docker/docs updates to support separate read/write hosts and services.
Changes:
- Add `IDomainEventBus` + RabbitMQ/InMemory/Noop implementations and publish persisted `EventLogEntry` records from `EventAppender`.
- Introduce `ReadModelProjector` + consumer/listener to apply domain events into the read database.
- Update CLI to support `--query-host`, and update docker-compose/docs for dual-service topology.
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| docker-compose.yml | Splits command/query DBs and services; adds RabbitMQ; updates ports/deps. |
| cli/internal/config/config.go | Adds query_host to CLI config model/load. |
| cli/internal/client/client.go | Adds query-base routing for GETs and shared URL validation. |
| cli/commands/root.go | Adds --query-host and uses NewWithQueryHost. |
| backend/src/test/resources/application.yml | Excludes Rabbit auto-config in tests. |
| backend/src/test/kotlin/com/factstore/application/command/EventAppenderTest.kt | Updates test to provide an IDomainEventBus. |
| backend/src/test/kotlin/com/factstore/application/ReadModelProjectorTest.kt | Adds unit coverage for read-model projections. |
| backend/src/main/resources/application-test.yml | Adds test profile config (H2 + in-memory bus). |
| backend/src/main/resources/application-prod.yml | Adds prod profile config (Postgres + RabbitMQ + publisher switch). |
| backend/src/main/kotlin/com/factstore/core/port/outbound/IDomainEventBus.kt | Adds outbound port for CQRS domain-event feed. |
| backend/src/main/kotlin/com/factstore/config/RabbitMqConfig.kt | Declares exchange/queue/binding + Jackson message converter. |
| backend/src/main/kotlin/com/factstore/application/command/EventAppender.kt | Publishes appended events to the domain-event bus. |
| backend/src/main/kotlin/com/factstore/application/ReadModelProjector.kt | Projects DomainEvent payloads into read DB entities. |
| backend/src/main/kotlin/com/factstore/adapter/outbound/events/RabbitMqEventPublisher.kt | RabbitMQ publisher for SupplyChainEvent. |
| backend/src/main/kotlin/com/factstore/adapter/outbound/events/RabbitMqDomainEventPublisher.kt | RabbitMQ publisher for persisted domain-event payloads. |
| backend/src/main/kotlin/com/factstore/adapter/outbound/events/NoopDomainEventBusNone.kt | No-op domain bus for none mode. |
| backend/src/main/kotlin/com/factstore/adapter/outbound/events/NoopDomainEventBus.kt | Default no-op domain bus for logging/missing mode. |
| backend/src/main/kotlin/com/factstore/adapter/outbound/events/InMemoryEventBus.kt | In-memory publisher for SupplyChainEvent in tests. |
| backend/src/main/kotlin/com/factstore/adapter/outbound/events/InMemoryDomainEventPublisher.kt | In-memory domain bus for tests. |
| backend/src/main/kotlin/com/factstore/adapter/inbound/messaging/RabbitMqEventConsumer.kt | RabbitMQ consumer feeding the read-model projector. |
| backend/src/main/kotlin/com/factstore/adapter/inbound/messaging/InMemoryEventListener.kt | In-memory listener feeding the read-model projector. |
| backend/build.gradle.kts | Adds Spring AMQP + rabbit test dependency. |
| README.md | Documents dual-service CQRS topology + endpoints. |
| DEPLOY.md | Documents RabbitMQ/env vars + CQRS deployment steps + CLI usage. |
```go
const (
	defaultConfigFile = ".factstore.yaml"
	KeyHost           = "host"
	KeyToken          = "token"
	KeyQueryHost      = "query_host"
)

// Config holds CLI configuration values.
type Config struct {
	Host      string
	Token     string
	QueryHost string
}

// Load initializes Viper and returns the current configuration.
// A missing config file is not an error; any other read failure is.
func Load() (*Config, error) {
	viper.SetConfigName(".factstore")
	viper.SetConfigType("yaml")
	viper.AddConfigPath("$HOME")

	home, err := os.UserHomeDir()
	if err != nil {
		return nil, fmt.Errorf("cannot determine home directory: %w", err)
	}
	viper.AddConfigPath(home)

	viper.SetEnvPrefix("FACTSTORE")
	viper.AutomaticEnv()

	if err := viper.ReadInConfig(); err != nil {
		var notFound viper.ConfigFileNotFoundError
		if !errors.As(err, &notFound) {
			return nil, fmt.Errorf("read config file: %w", err)
		}
		// Config file doesn't exist yet — that's fine.
	}

	return &Config{
		Host:      viper.GetString(KeyHost),
		Token:     viper.GetString(KeyToken),
		QueryHost: viper.GetString(KeyQueryHost),
	}, nil
```
QueryHost is now part of the loaded config, but it can't be set via the interactive configure flow because Save() (and the configure command) still only persist host and token. As a result, --query-host works only as a flag/env override and not as a durable config setting. Consider extending Save() (and the configure command) to write query_host too, or explicitly document that it is not persisted.
```kotlin
const val EXCHANGE_NAME = "factstore.events"
const val QUEUE_NAME = "factstore.events.projection"
const val ROUTING_KEY = "domain.event.#"
```
EXCHANGE_NAME/ROUTING_KEY are shared by both the CQRS domain-event feed (RabbitMqDomainEventPublisher) and the existing IEventPublisher (RabbitMqEventPublisher). Because both publish with domain.event.*, the projection queue will also receive SupplyChainEvent messages, which ReadModelProjector cannot deserialize as DomainEvents. Split these concerns by using a dedicated exchange and/or routing-key prefix for domain events vs supply-chain events, and bind the projection queue only to the domain-event feed.
Suggested change:

```diff
-const val EXCHANGE_NAME = "factstore.events"
-const val QUEUE_NAME = "factstore.events.projection"
-const val ROUTING_KEY = "domain.event.#"
+// Dedicated exchange for CQRS domain events to keep them separate from supply-chain events
+const val EXCHANGE_NAME = "factstore.domain-events"
+const val QUEUE_NAME = "factstore.events.projection"
+// CQRS-specific routing-key prefix to avoid receiving non-domain events
+const val ROUTING_KEY = "cqrs.domain.event.#"
```
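The cross-delivery problem can be reproduced with a toy topic matcher. This is a minimal sketch: `TopicMatch.matches` is a simplified stand-in for broker binding semantics (`*` = exactly one word, `#` = zero or more words), not RabbitMQ code, and only the pattern/key strings come from the snippets above.

```java
import java.util.regex.Pattern;

public class TopicMatch {
    // Translate an AMQP-style binding pattern into a regex and test a routing key.
    static boolean matches(String bindingPattern, String routingKey) {
        StringBuilder re = new StringBuilder();
        String[] parts = bindingPattern.split("\\.");
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            if (p.equals("#")) {
                // trailing '#' optionally matches ".word.word..." (or nothing at all)
                re.append(i == 0 ? ".*" : "(\\..+)?");
            } else {
                if (i > 0) re.append("\\.");
                re.append(p.equals("*") ? "[^.]+" : Pattern.quote(p));
            }
        }
        return Pattern.matches(re.toString(), routingKey);
    }

    public static void main(String[] args) {
        // Both publishers emit under domain.event.*, so the projection queue
        // (bound with "domain.event.#") receives both kinds of message:
        System.out.println(matches("domain.event.#", "domain.event.FlowCreated"));      // true
        System.out.println(matches("domain.event.#", "domain.event.SupplyChainEvent")); // true
        // A dedicated CQRS prefix keeps supply-chain events out of the queue:
        System.out.println(matches("cqrs.domain.event.#", "domain.event.FlowCreated")); // false
    }
}
```

The middle line is the bug: any routing key starting with `domain.event.` reaches the projection queue, regardless of which publisher produced it.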
```kotlin
// ── Simple in-memory repository implementations for testing ───────────────

private class InMemoryFlowRepository : IFlowRepository {
    private val store = mutableMapOf<UUID, Flow>()
    override fun save(flow: Flow): Flow { store[flow.id] = flow; return flow }
    override fun findById(id: UUID): Flow? = store[id]
    override fun findAll(): List<Flow> = store.values.toList()
    override fun findAllByIds(ids: Collection<UUID>) = ids.mapNotNull { store[it] }
    override fun existsById(id: UUID): Boolean = store.containsKey(id)
    override fun existsByName(name: String): Boolean = store.values.any { it.name == name }
    override fun deleteById(id: UUID) { store.remove(id) }
    override fun countAll(): Long = store.size.toLong()
    override fun findAllByOrgSlug(orgSlug: String) = store.values.filter { it.orgSlug == orgSlug }
}
```
This test redefines several InMemory*Repository implementations inline, but the repo already has reusable mock adapters under backend/src/test/kotlin/com/factstore/adapter/mock/ (e.g., InMemoryFlowRepository, InMemoryTrailRepository, etc.). Duplicating these implementations increases maintenance cost and risks behavioral drift between tests. Prefer reusing the existing mock adapters (or extending them if they’re missing methods needed for this test).
```yaml
      - "8081:8080"

  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
    depends_on:
```
The compose file renames the backend service to backend-command/backend-query, but Prometheus is still mounted with ./monitoring/prometheus.yml, which currently targets backend:8080. With this change, Prometheus will fail to scrape metrics unless the config is updated to target the new service name(s) (and potentially scrape both command and query services).
```kotlin
/**
 * Publishes [SupplyChainEvent]s to RabbitMQ for external consumers
 * (e.g. webhooks, notification pipelines).
 *
 * This is separate from [RabbitMqDomainEventPublisher], which handles
 * the internal CQRS event feed (domain events projected to the read DB).
 *
 * Active when `factstore.events.publisher=rabbitmq`.
 */
@Component
@ConditionalOnProperty(name = ["factstore.events.publisher"], havingValue = "rabbitmq")
class RabbitMqEventPublisher(
    private val rabbitTemplate: RabbitTemplate
) : IEventPublisher {

    private val log = LoggerFactory.getLogger(RabbitMqEventPublisher::class.java)

    override fun publish(event: SupplyChainEvent) {
```
This publisher uses the same domain.event.* routing-key namespace and exchange as the CQRS domain-event feed, which will cause these SupplyChainEvent messages to be delivered to the projection queue and consumed by RabbitMqEventConsumer (leading to failed projections / message loss). Use a different exchange and/or routing-key prefix for SupplyChainEvent publishing (or remove this publisher if CQRS replication is intended to be based solely on the event log). Also, the KDoc currently claims the query service projects these events, but the query-side projector expects DomainEvent payloads.
```kotlin
override fun publish(entry: EventLogEntry) {
    val routingKey = "domain.event.${entry.eventType}"
    rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, routingKey, entry.payload)
```
convertAndSend(..., entry.payload) sends a String while RabbitMqConfig configures Jackson2JsonMessageConverter on the shared RabbitTemplate. With the Jackson converter, a String body is typically JSON-encoded as a JSON string (quoted/escaped), so the consumer may receive an extra-encoded payload that ReadModelProjector cannot deserialize into a DomainEvent. Prefer publishing a structured message (e.g., the EventLogEntry itself or a DTO containing eventType + payload) and have the consumer deserialize that, or send raw bytes with an appropriate content-type using a non-Jackson converter for this path.
Suggested change:

```diff
-rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, routingKey, entry.payload)
+rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, routingKey, entry)
```
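The double-encoding effect is easy to demonstrate without a broker. In this sketch, `jsonEncodeString` is a hand-rolled stand-in for what a JSON message converter does to a `String` body (it treats the string as a JSON *value* and quotes/escapes it); it is not Jackson itself, and the payload is a made-up example.

```java
public class DoubleEncoding {
    // Stand-in for a JSON converter serializing a String body: wrap in quotes,
    // escape backslashes and embedded quotes.
    static String jsonEncodeString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        String payload = "{\"eventType\":\"FlowCreated\"}"; // entry.payload is already JSON
        String wire = jsonEncodeString(payload);            // what actually goes on the wire
        System.out.println(wire);
        // The body is now a JSON *string*, not a JSON object, so a consumer
        // expecting a DomainEvent object cannot bind it.
        System.out.println(wire.startsWith("\"{")); // true: an extra layer of quoting
    }
}
```

Sending the structured `EventLogEntry` instead lets the converter serialize one object once, and the consumer deserializes the same shape back.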
```kotlin
 * Active when `factstore.events.publisher=rabbitmq`. In production the
 * query service runs this consumer while the command service runs the
 * corresponding publisher.
 */
@Component
@ConditionalOnProperty(name = ["factstore.events.publisher"], havingValue = "rabbitmq")
class RabbitMqEventConsumer(
    private val projector: ReadModelProjector
) {

    private val log = LoggerFactory.getLogger(RabbitMqEventConsumer::class.java)

    @RabbitListener(queues = [RabbitMqConfig.QUEUE_NAME])
```
This consumer is enabled whenever factstore.events.publisher=rabbitmq. In the provided docker-compose both the command and query services run with the same prod profile, so both instances will start consuming from the same projection queue (competing consumers). That can cause projections to run against the write database and/or split messages between services. Consider gating the consumer on a dedicated property/profile (e.g., factstore.cqrs.role=query) so only the query service runs the @RabbitListener.
```diff
@@ -24,6 +28,7 @@ class EventAppender(
         payload = objectMapper.writeValueAsString(event),
         occurredAt = event.occurredAt
     )
-    eventStore.append(entry)
+    val saved = eventStore.append(entry)
+    domainEventBus.publish(saved)
 }
```
domainEventBus.publish(saved) is invoked immediately after eventStore.append(entry), which will typically be inside a larger @Transactional command-handler transaction. If that transaction later rolls back, the RabbitMQ message may already have been sent, causing the query side to project an event that never committed (and potentially breaking consistency). Consider publishing only after transaction commit (e.g., via TransactionSynchronizationManager/@TransactionalEventListener) or using an outbox pattern.
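The publish-after-commit idea can be sketched with a tiny simulation. `PendingTx` below is an illustrative stand-in for Spring's `TransactionSynchronizationManager` registration (all names here are hypothetical, not the project's classes): callbacks registered during the transaction run only on commit, so a rollback produces no message.

```java
import java.util.ArrayList;
import java.util.List;

public class AfterCommitPublish {
    // Stand-in for transaction synchronization: afterCommit callbacks are
    // deferred until the transaction outcome is known.
    static class PendingTx {
        private final List<Runnable> afterCommit = new ArrayList<>();
        void registerAfterCommit(Runnable r) { afterCommit.add(r); }
        void commit()   { afterCommit.forEach(Runnable::run); }
        void rollback() { afterCommit.clear(); } // nothing is ever published
    }

    public static void main(String[] args) {
        List<String> published = new ArrayList<>();

        PendingTx committed = new PendingTx();
        committed.registerAfterCommit(() -> published.add("FlowCreated"));
        committed.commit();

        PendingTx rolledBack = new PendingTx();
        rolledBack.registerAfterCommit(() -> published.add("PhantomEvent"));
        rolledBack.rollback(); // append rolled back: query side must not see this

        System.out.println(published); // [FlowCreated]
    }
}
```

An outbox table gives the stronger guarantee (publish failures after commit are also retried), at the cost of a relay process.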
```kotlin
private val eventTypeMap: Map<String, Class<out DomainEvent>> = mapOf(
    "FlowCreated" to DomainEvent.FlowCreated::class.java,
    "FlowUpdated" to DomainEvent.FlowUpdated::class.java,
    "FlowDeleted" to DomainEvent.FlowDeleted::class.java,
    "TrailCreated" to DomainEvent.TrailCreated::class.java,
    "ArtifactReported" to DomainEvent.ArtifactReported::class.java,
    "AttestationRecorded" to DomainEvent.AttestationRecorded::class.java,
    "EvidenceUploaded" to DomainEvent.EvidenceUploaded::class.java
)
```
eventTypeMap duplicates the same domain-event type registry already present in EventProjector. Keeping two separate maps in different classes increases the risk that new DomainEvent types are registered in one place but not the other (breaking either replay or live projection). Consider extracting a shared registry (single source of truth) and reusing it in both projectors.
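A single-source-of-truth registry can look like the sketch below. The event names come from the map above; the nested `DomainEvent` marker types are simplified stand-ins for the real sealed hierarchy, so this is a shape illustration, not the project's code.

```java
import java.util.Map;
import java.util.Optional;

public class DomainEventRegistry {
    interface DomainEvent {}
    static class FlowCreated implements DomainEvent {}
    static class TrailCreated implements DomainEvent {}

    // One static map that both EventProjector (replay) and ReadModelProjector
    // (live projection) would consult; new event types get registered here once.
    private static final Map<String, Class<? extends DomainEvent>> TYPES = Map.of(
        "FlowCreated", FlowCreated.class,
        "TrailCreated", TrailCreated.class
    );

    static Optional<Class<? extends DomainEvent>> resolve(String eventType) {
        return Optional.ofNullable(TYPES.get(eventType));
    }

    public static void main(String[] args) {
        System.out.println(resolve("FlowCreated").isPresent()); // true
        System.out.println(resolve("Unknown").isPresent());     // false: log + skip
    }
}
```

Returning `Optional` forces both call sites to handle unknown types explicitly instead of throwing on a missing map entry.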
|
@copilot apply changes based on the comments in this thread |
…ges, transaction-safe publishing, role-based consumer gating, CLI query-host persistence, prometheus targets, reuse mock repos Agent-Logs-Url: https://github.com/MaximumTrainer/OpenFactstore/sessions/a1ca8783-e078-42ee-b791-07f71a1377ce Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
All 9 review comments addressed in commit f20d2cf:
…EventConsumer Agent-Logs-Url: https://github.com/MaximumTrainer/OpenFactstore/sessions/a1ca8783-e078-42ee-b791-07f71a1377ce Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
```kotlin
    deploymentActor = event.deploymentActor,
    orgSlug = event.orgSlug,
    templateYaml = event.templateYaml,
    buildUrl = event.buildUrl
```
Trail projection has the same timestamp inconsistency as Flow: Trail.createdAt/updatedAt will be set to Instant.now() at projection time (defaults), not the domain event’s occurredAt. This breaks determinism/idempotency on replay and can produce misleading audit timestamps. Populate createdAt/updatedAt from event.occurredAt when constructing the Trail.
Suggested change:

```diff
-    buildUrl = event.buildUrl
+    buildUrl = event.buildUrl,
+    createdAt = event.occurredAt,
+    updatedAt = event.occurredAt
```
```kotlin
val artifact = Artifact(
    id = event.aggregateId,
    trailId = event.trailId,
    imageName = event.imageName,
    imageTag = event.imageTag,
    sha256Digest = event.sha256Digest,
    registry = event.registry,
    reportedBy = event.reportedBy,
    orgSlug = event.orgSlug
)
artifactRepository.save(artifact)
log.debug("Projected ArtifactReported: {}", event.aggregateId)
```
Artifact projection ignores the event timestamp: Artifact.reportedAt defaults to Instant.now() in the entity constructor, so projections will record the projector runtime rather than the ArtifactReported.occurredAt (and repeated projections can overwrite reported_at). Pass reportedAt = event.occurredAt when constructing the Artifact (and consider skipping/ignoring if the artifact already exists to preserve the original report time).
```kotlin
log.error("Failed to publish domain event seq={} type={} after commit — " +
    "replay from the event store to recover: {}",
    saved.sequenceNumber, saved.eventType, e.message)
```
The publish-after-commit failure path logs only e.message and does not include the exception, so stack traces are lost (and e.message can be null). Include the exception object in the log call so the full cause is recorded; also consider including eventId/aggregateId to make recovery via replay easier.
Suggested change:

```diff
-log.error("Failed to publish domain event seq={} type={} after commit — " +
-    "replay from the event store to recover: {}",
-    saved.sequenceNumber, saved.eventType, e.message)
+log.error(
+    "Failed to publish domain event seq={} type={} eventId={} aggregateId={} after commit — " +
+    "replay from the event store to recover",
+    saved.sequenceNumber,
+    saved.eventType,
+    saved.eventId,
+    saved.aggregateId,
+    e
+)
```
```kotlin
            "Replay from the event store to recover.", entry.eventType, entry.sequenceNumber)
    }
} catch (e: Exception) {
    log.error("Unexpected error processing domain event: {}", e.message, e)
```
RabbitMQ consumer swallows projection failures/exceptions, which will acknowledge the message and permanently drop it from the queue. That can leave the read DB stale until a manual replay, even for transient issues (e.g., DB outage). Prefer letting exceptions propagate (or explicitly nacking/requeueing / dead-lettering) so messages can be retried and not lost silently.
Suggested change:

```diff
 log.error("Unexpected error processing domain event: {}", e.message, e)
+throw e
```
```go
// NewWithQueryHost creates a Client that routes GET requests to queryBaseURL.
// If queryBaseURL is empty it falls back to baseURL for all operations.
func NewWithQueryHost(baseURL, queryBaseURL, token string) (*Client, error) {
	if err := validateURL(baseURL); err != nil {
		return nil, err
```
NewWithQueryHost/QueryBaseURL routing isn’t covered by tests. There are existing client tests for the single-host behavior, but none that assert GET goes to QueryBaseURL when set (and POST/PUT/DELETE still go to BaseURL), nor that validateURL is applied to queryBaseURL. Add targeted httptest-based unit tests for these scenarios.
```kotlin
private fun applyEvidenceUploaded(event: DomainEvent.EvidenceUploaded) {
    val attestation = attestationRepository.findById(event.aggregateId)
    if (attestation == null) {
        log.warn("EvidenceUploaded for unknown attestation {} — skipping", event.aggregateId)
        return
    }
    attestation.evidenceFileHash = event.sha256Hash
    attestation.evidenceFileName = event.fileName
    attestation.evidenceFileSizeBytes = event.fileSizeBytes
    attestationRepository.save(attestation)
    log.debug("Projected EvidenceUploaded: {}", event.aggregateId)
}
```
ReadModelProjector has a dedicated EvidenceUploaded projection path, but the new ReadModelProjectorTest suite doesn’t include a test for it (only Flow/Trail/Artifact/Attestation + malformed/unknown). Add coverage for EvidenceUploaded (happy path + unknown-attestation skip) to prevent regressions.
```kotlin
val flow = Flow(
    id = event.aggregateId,
    name = event.name,
    description = event.description,
    orgSlug = event.orgSlug
).also {
    it.requiredAttestationTypes = event.requiredAttestationTypes
    it.tags = event.tags.toMutableMap()
    it.templateYaml = event.templateYaml
    it.requiresApproval = event.requiresApproval
    it.requiredApproverRoles = event.requiredApproverRoles
}
```
Read-side projection is currently setting Flow.createdAt/updatedAt using the entity defaults (Instant.now()) rather than the event’s occurredAt. This makes the projected read model inconsistent with the event log (and replaying later will produce different timestamps). Pass event.occurredAt into the Flow constructor for createdAt/updatedAt (and keep updatedAt aligned with occurredAt for create events).
```kotlin
    name = event.name,
    evidenceUrl = event.evidenceUrl,
    orgSlug = event.orgSlug,
    artifactFingerprint = event.artifactFingerprint
```
Attestation projection sets Attestation.createdAt via the entity default (Instant.now()) instead of the AttestationRecorded.occurredAt time. This makes the read model’s attestation timestamps dependent on projection time/replay time. Pass createdAt = event.occurredAt into the Attestation constructor (and consider whether evidence updates should preserve createdAt).
Suggested change:

```diff
-    artifactFingerprint = event.artifactFingerprint
+    artifactFingerprint = event.artifactFingerprint,
+    createdAt = event.occurredAt
```
```kotlin
    apply(event)
    true
} catch (e: Exception) {
    log.error("Failed to project event type={}: {}", eventType, e.message)
```
The error log in the projection deserialization path drops the exception/stack trace (only e.message is logged). That makes diagnosing production projection failures significantly harder. Log the exception itself (e) so stack traces are captured; consider also logging eventId/sequence if available at the call site.
Suggested change:

```diff
-log.error("Failed to project event type={}: {}", eventType, e.message)
+log.error("Failed to project event type={}", eventType, e)
```
```
       POST/PUT/DELETE │                     │ GET (reads)
┌──────────────────────▼──────────┐  ┌───────▼──────────────────────┐
│ COMMAND SERVICE (:8080)         │  │ QUERY SERVICE (:8081)        │
│ spring.profiles.active=prod     │  │ spring.profiles.active=prod  │
│                                 │  │                              │
│ ┌─── DRIVING ADAPTERS ───────┐  │  │ ┌─── DRIVING ADAPTERS ───┐   │
│ │ v2 Command Controllers     │  │  │ │ v2 Query Controllers   │   │
│ │ v1 REST Controllers        │  │  │ │ v1 REST Controllers    │   │
│ └───────────┬────────────────┘  │  │ │ RabbitMQ Consumer      │   │
```
README presents the query service as GET-only, but in the current codebase the v2 command controllers (e.g. FlowCommandController) are not gated on FACTSTORE_CQRS_ROLE/factstore.cqrs.role and will still be registered in the query container (same JAR + prod profile). That means the query host can accept POST/PUT/DELETE and mutate the read database if called directly. Consider documenting this caveat or (preferably) enforcing it by conditionally enabling command/query web adapters based on factstore.cqrs.role (or by blocking non-GET methods on the query role).
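The "block non-GET methods on the query role" option amounts to a small method gate at the front of the filter chain. A minimal sketch (all names here are illustrative, not the project's actual filter; the safe-method set follows HTTP convention for read-only verbs):

```java
import java.util.Set;

public class ReadOnlyGate {
    // HTTP methods a read-only query service can safely serve.
    private static final Set<String> SAFE = Set.of("GET", "HEAD", "OPTIONS");

    // A servlet filter gated on the query role would call this and respond
    // with 405 Method Not Allowed when it returns false.
    static boolean allowedOnQueryService(String method) {
        return SAFE.contains(method.toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(allowedOnQueryService("GET"));    // true
        System.out.println(allowedOnQueryService("POST"));   // false -> 405
        System.out.println(allowedOnQueryService("DELETE")); // false -> 405
    }
}
```

Gating at the filter level keeps a single JAR deployable in both roles while making the read-only contract enforceable rather than merely documented.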
|
@copilot apply changes based on the comments in this thread |
…rror logging, EvidenceUploaded tests, CLI query-host routing tests, query-service read-only filter Agent-Logs-Url: https://github.com/MaximumTrainer/OpenFactstore/sessions/431aff67-80ca-435a-b0c6-364c83e009c2 Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
…se() Agent-Logs-Url: https://github.com/MaximumTrainer/OpenFactstore/sessions/431aff67-80ca-435a-b0c6-364c83e009c2 Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
All 10 review comments addressed in commits 35bc9f8 and f5b5110:
Separates write and read concerns at the code, database, and deployment levels. Command and query services run as independent containers, each with its own PostgreSQL instance, connected via a RabbitMQ event feed.
Event Bus Infrastructure
- `IDomainEventBus` outbound port with four implementations: `RabbitMqDomainEventPublisher` (prod), `InMemoryDomainEventPublisher` (tests), `NoopDomainEventBus` (logging default), `NoopDomainEventBusNone`
- `EventAppender` publishes to the domain event bus after transaction commit via `TransactionSynchronizationManager` to prevent phantom events on rollback; error logging includes `eventId`, `aggregateId`, and full exception stack traces for replay recovery
- `RabbitMqEventConsumer` / `InMemoryEventListener` — inbound messaging adapters that feed events to the projector
- `RabbitMqConfig` — dedicated `factstore.domain-events` exchange with `cqrs.domain.event.#` routing key for the CQRS feed, separate `factstore.supply-chain-events` exchange for external consumers (webhooks, notifications)
- `RabbitMqDomainEventPublisher` sends the full structured `EventLogEntry` object (not the raw payload string) to avoid Jackson double-encoding
- `RabbitMqEventConsumer` gated on `factstore.cqrs.role=query` so only the query service consumes from the projection queue; re-throws exceptions to enable message requeue/dead-lettering instead of silent acknowledgement

Read-Model Projection

- `ReadModelProjector` applies `FlowCreated`, `FlowUpdated`, `FlowDeleted`, `TrailCreated`, `ArtifactReported`, `AttestationRecorded`, and `EvidenceUploaded` to JPA entities in the read database
- Projected timestamps (`createdAt`, `updatedAt`, `reportedAt`) are sourced from `event.occurredAt` for deterministic replay and consistent audit trails
- `DomainEventRegistry` — single source of truth for the event type→class mapping, shared by both `EventProjector` (replay) and `ReadModelProjector` (live projection)

Query Service Read-Only Enforcement

- `QueryServiceReadOnlyFilter` — servlet filter gated on `factstore.cqrs.role=query` that rejects POST/PUT/PATCH/DELETE with HTTP 405, preventing mutations against the read database even though the same JAR is deployed for both services

Profile-Based Configuration

- `application-prod.yml` — dual PostgreSQL + RabbitMQ (`factstore.events.publisher=rabbitmq`)
- `application-test.yml` — H2 + in-memory Spring events (`factstore.events.publisher=inmemory`)
- Test `application.yml` excludes `RabbitAutoConfiguration` to avoid a broker dependency in CI

CLI

- `--query-host` / `FACTSTORE_QUERY_HOST` routes GET requests to the query service; POST/PUT/DELETE go to `--host`. Backward-compatible when unset.
- `Save()` and the interactive `configure` command now persist `query_host` alongside `host` and `token`

Docker Compose

- `postgres-command` (:5432), `postgres-query` (:5433), `rabbitmq` (:5672/:15672)
- `backend-command` (:8080) with `FACTSTORE_CQRS_ROLE=command`, `backend-query` (:8081) with `FACTSTORE_CQRS_ROLE=query` — same JAR, different `DB_*` and role env vars
- Prometheus scrape config updated to target both the `backend-command` and `backend-query` services

Tests

- `ReadModelProjectorTest` — 11 cases covering all event types (including the `EvidenceUploaded` happy path + unknown-attestation skip), idempotency, and unknown/malformed events; reuses shared mock adapters from `adapter/mock/`
- `EventAppenderTest` updated for the new `IDomainEventBus` dependency
- `client_test.go` — 5 new tests for `NewWithQueryHost` routing: GET→query server, POST→command server, PUT/DELETE→command server, empty-query-host fallback, insecure-URL validation