
Migrate report events to Avro with Schema Registry (producer + consumer)#113

Open
JohnnyAstrom wants to merge 97 commits into main from issue/68+105

Conversation

Contributor

@JohnnyAstrom JohnnyAstrom commented Dec 16, 2025

Important PR merge order:
Issue/7 event listener → This PR

Summary

This PR migrates the “report created” Kafka event from raw JSON to Avro with Schema Registry support, covering both producer and consumer.

The change is implemented end-to-end: events are published as Avro, deserialized using Schema Registry, and forwarded to clients via Server-Sent Events (SSE).


What was done

Kafka Producer

  • Added Report.avsc schema
  • Introduced ReportAvro and ReportAvroMapper
  • Updated producer to publish Avro messages
  • Added Schema Registry configuration (the pieces fit together as sketched below)
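A minimal sketch of how these producer pieces fit together (assembled from the publisher code quoted in the review comments further down; the constructor shape and import list are assumptions, and the real method also logs the async send result):

import org.fungover.zipp.dto.ReportResponse;
import org.fungover.zipp.kafka.ReportAvro;
import org.fungover.zipp.mapper.ReportDtoToAvroMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Sketch only: Avro serialization itself happens in KafkaAvroSerializer,
// configured in the application properties shown later on this page.
@Service
public class ReportEventPublisher {

    private final KafkaTemplate<String, ReportAvro> template;
    private final ReportDtoToAvroMapper mapper;
    private final String topic;

    public ReportEventPublisher(KafkaTemplate<String, ReportAvro> template, ReportDtoToAvroMapper mapper,
            @Value("${app.kafka.topic.report}") String topic) {
        this.template = template;
        this.mapper = mapper;
        this.topic = topic;
    }

    public void publishReportCreated(ReportResponse report) {
        ReportAvro avro = mapper.toAvro(report);
        // Keyed by user id so one user's events keep their partition ordering.
        template.send(topic, report.submittedByUserId(), avro);
    }
}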

Kafka Consumer

  • Updated consumer to use KafkaAvroDeserializer
  • Connected consumer to Schema Registry
  • Removed JSON-based deserialization
  • Consumes ReportAvro directly (sketched below)
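The listener side, roughly (matching the listener code and constructor quoted in the review comments and unit test below; the random-UUID groupId is what the PR currently uses and is questioned later in the review):

import org.fungover.zipp.dto.ReportResponse;
import org.fungover.zipp.kafka.ReportAvro;
import org.fungover.zipp.mapper.ReportAvroToDtoMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class EventListenerService {

    private final SseService sseService;
    private final ReportAvroToDtoMapper mapper;

    public EventListenerService(SseService sseService, ReportAvroToDtoMapper mapper) {
        this.sseService = sseService;
        this.mapper = mapper;
    }

    // KafkaAvroDeserializer (with specific.avro.reader=true) hands us a typed ReportAvro.
    @KafkaListener(topics = "${app.kafka.topic.report}", groupId = "#{T(java.util.UUID).randomUUID().toString()}")
    public void listen(ReportAvro event) {
        ReportResponse dto = mapper.toDto(event);
        sseService.send(dto.submittedByUserId(), dto);
    }
}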

Event Delivery

  • Consumed Kafka events are forwarded to connected clients via SSE
  • Enables real-time updates without polling (see the subscription sketch below)
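A minimal sketch of the subscription side (the CopyOnWriteArrayList registry, lifecycle callbacks, and computeIfPresent-based removal are confirmed in the review comments below; the timeout value and exact map layout are assumptions):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

public class SseService {

    private final Map<String, List<SseEmitter>> emitters = new ConcurrentHashMap<>();

    public SseEmitter subscribe(String id) {
        SseEmitter emitter = new SseEmitter(0L); // 0L = never time out; the real timeout is not shown on this page
        emitters.computeIfAbsent(id, k -> new CopyOnWriteArrayList<>()).add(emitter);
        emitter.onCompletion(() -> remove(id, emitter));
        emitter.onTimeout(() -> remove(id, emitter));
        return emitter;
    }

    // Atomic cleanup, mirroring the remove() described in the review.
    private void remove(String id, SseEmitter emitter) {
        emitters.computeIfPresent(id, (k, list) -> {
            list.remove(emitter);
            return list.isEmpty() ? null : list;
        });
    }
}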

Infrastructure & Config

  • Added Schema Registry to docker-compose
  • Cleaned up legacy Kafka/JSON configuration
  • Updated tests to reflect Avro-based flow

Result

  • Kafka events are schema-safe and versioned
  • Producer and consumer are decoupled via Avro
  • No JSON parsing remains in the Kafka pipeline
  • “Report created” events flow correctly from API → Kafka → Consumer → SSE

How to Test

  1. Start the backend application
    (Docker Compose automatically starts Kafka, Schema Registry and MySQL)

  2. (Optional) Open an SSE connection for a specific user to observe real-time events:

    GET /events/{userId}

    Example:

    http://localhost:8080/events/map123
    
  3. Send a POST request to /api/reports (example requests follow this list)

  4. Verify that:

    • The event is published to Kafka as Avro using the Schema Registry
    • The Kafka consumer successfully deserializes the Avro message
    • If an SSE client is connected, it receives the event in real time
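
    Example requests (the POST body is hypothetical; its fields are inferred from the required Avro schema fields, and the actual request DTO field names may differ):

    curl -N http://localhost:8080/events/map123

    curl -X POST http://localhost:8080/api/reports \
      -H "Content-Type: application/json" \
      -d '{"submittedByUserId":"map123","description":"test","eventType":"ACCIDENT","latitude":59.0,"longitude":18.0}'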

Issues resolved

  • #68
  • #105

Summary by CodeRabbit

  • New Features

    • Real-time event streaming for submitted reports with Avro-backed payloads
    • Reports now include optional timestamps, status, and image URLs
  • Chores

    • Added schema registry and Kafka/Avro configuration for reliable messaging
    • Introduced publish/subscribe plumbing (publisher + listener) and test updates
  • Improvements

    • Enhanced message reliability and topic configurability (higher replication)


Kirill9m and others added 30 commits November 26, 2025 16:04
New dependency
Flyway migration
New Repository
Bonus: GlobalExceptionhandler
# Conflicts:
#	backend/pom.xml
#	backend/src/main/resources/application.properties
* Add ADR for database schema management strategy detailing transition from Hibernate `ddl-auto=update` to Flyway

* Update ADR for database schema management strategy and adjust editorconfig indentation settings
* Add Spring Security and OAuth2 dependencies - Done with Henrik and emily

* got OAuth working, but need to configure login and logout

* Changes by Alfred

* Oauth functioning, CustomOAuth2UserService for handling of user(work in progress). flyway migration V2 up. working on logic for user check, oversee entity with role?

* added another field for user - role,updated id to use uuid, removed flyway, added support for .env file in root.

* pulled in main before push.

* added test, and worked out some issues with the oAuth

* Add enum for role

* Add Role to User entity and add in Service to new user in database

* Add Role to User entity and add in Service to new user in database

* Add Role to Service to new user in database

---------

Co-authored-by: Henrik <Henrikmattsson89@gmail.com>
Co-authored-by: Emilyempa <emilypettersson@hotmail.com>
* First setup

* Adjusted margin.

* Logo

* Removed HTML suggested by code rabbit to make fragments pure and adjusted css styling

* Additional css adjustments

* Merged main. Commented out ("/") in AuthenticationController because RestController uses same endpoint as Controller. Adjusted styling for focus.

* ...and so it goes on... css

* Changes L to l
Tobias-hubs and others added 14 commits December 11, 2025 14:09
…pplication-test.yml`, `application-dev.yml`) for better separation of concerns.
…tructured logging, and enhance unit tests accordingly.
…lity into `EventListenerService` with improved logging and maintainability.
…ironment variable fallback; remove unused Kafka producer properties.
Co-authored-by: Taru Keskinen <tarukeskinen@hotmail.com>
Co-authored-by: Viktor Eriksson <vikkerinho@gmail.com>
Co-authored-by: Alexander Andersson <alle7000.andersson@gmail.com>
… service ReportEventPublisher

Co-authored-by: Taru Keskinen <tarukeskinen@hotmail.com>
    Co-authored-by: Viktor Eriksson <vikkerinho@gmail.com>
    Co-authored-by: Johnny Åström <johnny.astrom@hotmail.com>
…zipp into issue/68+105

Co-authored-by: Taru Keskinen <tarukeskinen@hotmail.com>
Co-authored-by: Viktor Eriksson <vikkerinho@gmail.com>
Co-authored-by: Alexander Andersson <alle7000.andersson@gmail.com>
…e/68+105

Co-authored-by: Taru Keskinen <tarukeskinen@hotmail.com>
Co-authored-by: Viktor Eriksson <vikkerinho@gmail.com>
Co-authored-by: Alexander Andersson <alle7000.andersson@gmail.com>
- Replace JSON-based Kafka consumer with Avro (Schema Registry)
- Remove legacy KafkaConfig and ReportEvent
- Add Avro producer and consumer using KafkaTemplate and @KafkaListener
- Forward consumed Kafka events to clients via Server-Sent Events (SSE)
- Clean up Kafka and application configuration
- Update tests to reflect Avro-based event flow

Co-authored-by: Taru Keskinen <tarukeskinen@hotmail.com>
Co-authored-by: Viktor Eriksson <vikkerinho@gmail.com>
Co-authored-by: Alexander Andersson <alle7000.andersson@gmail.com>
- Rename ReportAvroMapper to ReportDtoToAvroMapper for explicit mapping direction
- Add ReportAvroToDtoMapper for Avro → DTO conversion
- Adjust EventListenerService to forward deserialized Avro events to SSE

Co-authored-by: Taru Keskinen <tarukeskinen@hotmail.com>
Co-authored-by: Viktor Eriksson <vikkerinho@gmail.com>
Co-authored-by: Alexander Andersson <alle7000.andersson@gmail.com>
with spotless and checkstyle

Co-authored-by: Taru Keskinen <tarukeskinen@hotmail.com>
Co-authored-by: Viktor Eriksson <vikkerinho@gmail.com>
Co-authored-by: Alexander Andersson <alle7000.andersson@gmail.com>
@JohnnyAstrom JohnnyAstrom added the enhancement New feature or request label Dec 16, 2025
@JohnnyAstrom JohnnyAstrom self-assigned this Dec 16, 2025

coderabbitai Bot commented Dec 16, 2025

Walkthrough

Adds Avro-based Kafka integration: schema, codegen plugin, serializers/deserializers, schema registry service, publisher/listener services, mappers, config changes, and test updates; refactors controller to use a ReportEventPublisher and externalizes topic name and replication factor.

Changes

  • Build & Dependencies — backend/pom.xml
    Added Confluent Maven repo, Spring Kafka deps, Avro libs, kafka-avro-serializer, and avro-maven-plugin (generate-sources from src/main/avro).
  • Avro Schema — backend/src/main/avro/Report.avsc
    New Avro record ReportAvro (namespace org.fungover.zipp.kafka) with fields: submittedByUserId, description, eventType (enum), latitude, longitude, submittedAt (nullable timestamp-millis), status (nullable enum), imageUrls (nullable array).
  • Publishing Abstraction — backend/src/main/java/.../service/ReportEventPublisher.java, backend/src/main/java/.../controller/ReportController.java
    Introduced ReportEventPublisher service; refactored ReportController to inject and call the publisher instead of using a KafkaTemplate directly; topic injected via @Value.
  • Mappers (DTO ⇄ Avro) — backend/src/main/java/.../mapper/ReportDtoToAvroMapper.java, backend/src/main/java/.../mapper/ReportAvroToDtoMapper.java
    Added bidirectional mappers to convert between the ReportResponse DTO and generated ReportAvro, handling enums and nullable fields.
  • Consumer / SSE Integration — backend/src/main/java/.../service/EventListenerService.java
    New Kafka listener consuming ReportAvro, mapping to DTO and forwarding via SseService.
  • Application / Kafka Config — backend/src/main/resources/application.yml, backend/src/main/resources/application.properties, backend/src/test/resources/application.properties
    Switched producer/consumer serializers to Confluent Avro serializers/deserializers, added schema registry URL, enabled specific Avro reader, added app.kafka.topic.report=report-avro, listener settings and active profile defaults. Test properties contain unresolved merge markers.
  • Application Boot & Security — backend/src/main/java/.../BackendApplication.java, backend/src/main/java/.../security/SecurityConfig.java
    NewTopic bean now accepts @Value("${app.kafka.topic.report}") and uses replication factor 3; apiKeySecurityChain bean annotated with @Profile("dev").
  • Docker Compose — docker-compose.yml
    Added schema-registry service (confluentinc/cp-schema-registry:8.1.0) exposing port 8081 and depending on Kafka.
  • Tests — backend/src/test/java/.../service/EventListenerServiceTest.java, backend/src/test/java/.../controller/ReportControllerTest.java, backend/src/test/java/.../kafka/KafkaEventsTest.java
    Updated tests to use Avro mappers and ReportEventPublisher with KafkaTemplate<String, ReportAvro>; adjusted verifications and logging assertions. Some test property files include merge conflict markers.
  • Minor formatting — backend/src/main/resources/application-dev.yml
    Removed a trailing blank line; no behavioral change.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    actor User
    participant ReportController
    participant ReportEventPublisher
    participant ReportDtoToAvroMapper
    participant KafkaTemplate as KafkaProducer
    participant SchemaRegistry
    participant Kafka
    participant EventListenerService
    participant ReportAvroToDtoMapper
    participant SseService

    User->>ReportController: POST /report (ReportResponse)
    ReportController->>ReportEventPublisher: publishReportCreated(ReportResponse)
    ReportEventPublisher->>ReportDtoToAvroMapper: toAvro(ReportResponse)
    ReportDtoToAvroMapper-->>ReportEventPublisher: ReportAvro
    ReportEventPublisher->>KafkaProducer: send(topic, key, ReportAvro)
    KafkaProducer->>SchemaRegistry: register/lookup schema
    SchemaRegistry-->>KafkaProducer: schema id
    KafkaProducer->>Kafka: publish Avro message
    Kafka-->>EventListenerService: deliver Avro message (ReportAvro)
    EventListenerService->>ReportAvroToDtoMapper: toDto(ReportAvro)
    ReportAvroToDtoMapper-->>EventListenerService: ReportResponse
    EventListenerService->>SseService: send(userId, ReportResponse)
    SseService-->>User: SSE event

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Pay attention to unresolved merge conflict markers in backend/src/test/resources/application.properties.
  • Verify Avro schema namespace, generated class package, and produced/consumed types align with mapper imports and Spring Kafka generics.
  • Confirm Schema Registry URL and serializers/deserializers are consistent across application.yml and application.properties.
  • Review nullable/optional field handling in ReportDtoToAvroMapper and ReportAvroToDtoMapper.
  • Check NewTopic replication factor (3) and topic name injection for deployment compatibility.

Possibly related PRs

  • Issue/7 event listener #77 — also adds EventListenerService and SSE/Kafka consumer plumbing; this PR migrates that flow to Avro and adds mappers/schema-registry changes.
  • Test/Kafka test #99 — modifies how reports are published to Kafka from controller; this PR refactors publishing into ReportEventPublisher and switches payload to Avro.
  • Send Kafka event on new REST API report #20 #75 — touches BackendApplication Kafka setup and topic creation; this PR externalizes topic name and increases replication factor.

Suggested labels

ready for review

Suggested reviewers

  • Tobias-hubs
  • kappsegla

Pre-merge checks and finishing touches

✅ Passed checks (2 passed)

  • Linked Issues check — ✅ Passed: The PR successfully implements all requirements from issue #68 (Avro schema definition, Schema Registry integration, automatic registration) and #105 (Avro deserialization, Schema Registry connection, JSON removal). All mandatory coding objectives are met.
  • Out of Scope Changes check — ✅ Passed: All changes align with PR objectives: Avro infrastructure (dependencies, schema file, mappers), producer/consumer updates, configuration, Docker setup, and test refactoring. No unrelated or extraneous modifications detected.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 9

🧹 Nitpick comments (5)
docker-compose.yml (1)

39-53: Schema Registry service configuration looks good.

The Schema Registry service is properly configured with appropriate environment variables, dependency chain, and network settings. The image version (8.1.0) correctly matches the Kafka version for compatibility.

Consider adding a health check to ensure the service is fully ready before any dependent services start:

     networks:
       - app-net
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:8081/"]
+      interval: 10s
+      timeout: 5s
+      retries: 5

Optionally, you may want to add a volume for schema persistence to prevent data loss during container restarts:

     volumes:
       - kafka-data:/var/lib/kafka/data
+      - schema-registry-data:/var/lib/schema-registry

And add the volume declaration:

 volumes:
   kafka-data:
   mysql-data:
+  schema-registry-data:
backend/src/test/java/org/fungover/zipp/service/EventListenerServiceTest.java (1)

16-29: Consider expanding test coverage for edge cases.

The test validates the happy path but could benefit from additional scenarios:

  • Non-null imageUrls to ensure the list mapping works correctly
  • Error handling when the mapper or SSE service encounters issues

Example additional test:

@Test
void listenerShouldHandleNonNullImageUrls() {
    SseService sseService = mock(SseService.class);
    ReportAvroToDtoMapper mapper = new ReportAvroToDtoMapper();
    EventListenerService listener = new EventListenerService(sseService, mapper);

    ReportAvro event = ReportAvro.newBuilder()
        .setSubmittedByUserId("map123")
        .setDescription("test")
        .setEventType(ReportType.ACCIDENT)
        .setLatitude(1.0)
        .setLongitude(2.0)
        .setSubmittedAt(Instant.now())
        .setStatus(ReportStatus.ACTIVE)
        .setImageUrls(List.of("http://example.com/image1.jpg", "http://example.com/image2.jpg"))
        .build();

    listener.listen(event);

    ArgumentCaptor<ReportResponse> captor = ArgumentCaptor.forClass(ReportResponse.class);
    verify(sseService).send(eq("map123"), captor.capture());
    assertEquals(2, captor.getValue().imageUrls().size());
}
backend/src/main/java/org/fungover/zipp/mapper/ReportDtoToAvroMapper.java (1)

26-26: Document the ACTIVE status default behavior.

Line 26 defaults the status to ACTIVE when null. This business logic assumption should be documented to clarify why ACTIVE is the appropriate default for new reports.

Add a comment:

+    // Default to ACTIVE for new reports when status is not explicitly set
     b.setStatus(r.status() != null ? ReportStatus.valueOf(r.status().name()) : ReportStatus.ACTIVE);
backend/src/main/java/org/fungover/zipp/mapper/ReportAvroToDtoMapper.java (1)

10-16: LGTM! Consider extracting field mappings for readability.

The mapper correctly handles Avro CharSequence to String conversions and nullable fields. The logic aligns with the Avro schema requirements.

Optionally, you can improve readability by extracting intermediate variables for the long constructor call:

 public ReportResponse toDto(ReportAvro avro) {
-    return new ReportResponse(avro.getSubmittedByUserId().toString(), avro.getDescription().toString(),
-            org.fungover.zipp.dto.ReportType.valueOf(avro.getEventType().name()), avro.getLatitude(),
-            avro.getLongitude(), avro.getSubmittedAt(),
-            avro.getStatus() != null ? org.fungover.zipp.dto.ReportStatus.valueOf(avro.getStatus().name()) : null,
-            avro.getImageUrls() == null ? null : avro.getImageUrls().stream().map(CharSequence::toString).toList());
+    String userId = avro.getSubmittedByUserId().toString();
+    String description = avro.getDescription().toString();
+    org.fungover.zipp.dto.ReportType eventType = org.fungover.zipp.dto.ReportType.valueOf(avro.getEventType().name());
+    org.fungover.zipp.dto.ReportStatus status = avro.getStatus() != null 
+        ? org.fungover.zipp.dto.ReportStatus.valueOf(avro.getStatus().name()) 
+        : null;
+    List<String> imageUrls = avro.getImageUrls() == null 
+        ? null 
+        : avro.getImageUrls().stream().map(CharSequence::toString).toList();
+    
+    return new ReportResponse(userId, description, eventType, 
+        avro.getLatitude(), avro.getLongitude(), avro.getSubmittedAt(), 
+        status, imageUrls);
 }
backend/src/main/resources/application.properties (1)

35-37: Consider the implications of missing-topics-fatal=false.

Setting missing-topics-fatal=false allows the application to start even when the configured topic doesn't exist. While this can be convenient during development, it may hide configuration errors in production.

Consider:

  • Using profile-specific configuration: false for dev, true for production
  • Or relying on the NewTopic bean in BackendApplication.java to auto-create the topic, making this setting less critical

Additionally, note that BackendApplication creates the topic with replication factor 3 (line 26), which requires at least 3 Kafka brokers. Verify your deployment has sufficient brokers, or this will fail.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 853813d and 19e15ab.

📒 Files selected for processing (18)
  • backend/pom.xml (5 hunks)
  • backend/src/main/avro/Report.avsc (1 hunks)
  • backend/src/main/java/org/fungover/zipp/BackendApplication.java (2 hunks)
  • backend/src/main/java/org/fungover/zipp/controller/EventStreamController.java (1 hunks)
  • backend/src/main/java/org/fungover/zipp/controller/ReportController.java (2 hunks)
  • backend/src/main/java/org/fungover/zipp/mapper/ReportAvroToDtoMapper.java (1 hunks)
  • backend/src/main/java/org/fungover/zipp/mapper/ReportDtoToAvroMapper.java (1 hunks)
  • backend/src/main/java/org/fungover/zipp/security/SecurityConfig.java (0 hunks)
  • backend/src/main/java/org/fungover/zipp/service/EventListenerService.java (1 hunks)
  • backend/src/main/java/org/fungover/zipp/service/ReportEventPublisher.java (1 hunks)
  • backend/src/main/java/org/fungover/zipp/service/SseService.java (1 hunks)
  • backend/src/main/resources/application-dev.yml (0 hunks)
  • backend/src/main/resources/application-test.yml (1 hunks)
  • backend/src/main/resources/application.properties (2 hunks)
  • backend/src/main/resources/application.yml (3 hunks)
  • backend/src/test/java/org/fungover/zipp/service/EventListenerServiceTest.java (1 hunks)
  • backend/src/test/resources/application.properties (1 hunks)
  • docker-compose.yml (1 hunks)
💤 Files with no reviewable changes (2)
  • backend/src/main/resources/application-dev.yml
  • backend/src/main/java/org/fungover/zipp/security/SecurityConfig.java
🧰 Additional context used
🧬 Code graph analysis (5)
backend/src/main/java/org/fungover/zipp/service/ReportEventPublisher.java (1)
backend/src/main/java/org/fungover/zipp/service/EventListenerService.java (1)
  • Service (12-31)
backend/src/main/java/org/fungover/zipp/service/EventListenerService.java (2)
backend/src/main/java/org/fungover/zipp/service/ReportEventPublisher.java (1)
  • Service (12-39)
backend/src/main/java/org/fungover/zipp/service/SseService.java (1)
  • Service (15-69)
backend/src/main/java/org/fungover/zipp/mapper/ReportAvroToDtoMapper.java (1)
backend/src/main/java/org/fungover/zipp/mapper/ReportDtoToAvroMapper.java (1)
  • Component (9-30)
backend/src/main/java/org/fungover/zipp/service/SseService.java (1)
backend/src/main/java/org/fungover/zipp/service/EventListenerService.java (1)
  • Service (12-31)
backend/src/main/java/org/fungover/zipp/mapper/ReportDtoToAvroMapper.java (1)
backend/src/main/java/org/fungover/zipp/mapper/ReportAvroToDtoMapper.java (1)
  • Component (7-17)
🔇 Additional comments (20)
backend/pom.xml (4)

34-39: LGTM!

The Confluent Maven repository is required for the kafka-avro-serializer dependency and is correctly configured.


118-131: LGTM!

Kafka dependencies are properly configured with appropriate scopes. Spring Boot manages the versions, ensuring compatibility.


212-228: LGTM!

The Avro Maven plugin is correctly configured to generate Java classes from Avro schemas during the generate-sources phase. The output directory is properly excluded from PMD analysis (line 280).


178-189: No action needed—the specified Avro and kafka-avro-serializer versions are current, compatible, and free from known vulnerabilities.

backend/src/test/resources/application.properties (1)

23-24: LGTM!

The Kafka topic property is clearly defined and aligns with the Avro migration. The topic name "report-avro" is descriptive.

backend/src/main/java/org/fungover/zipp/mapper/ReportDtoToAvroMapper.java (1)

22-24: The original concern about submittedAt is incorrect.

The Avro schema defines submittedAt as optional with type ["null", {"type": "long", "logicalType": "timestamp-millis"}] and a default of null, so the conditional setting on lines 22-24 is correct and will not cause runtime errors. When r.submittedAt() is null, the field simply uses the schema default, which is the intended behavior.

Likely an incorrect or invalid review comment.

backend/src/main/resources/application-test.yml (1)

1-6: This is a unit test that doesn't use the test configuration or exercise Kafka serialization.

The EventListenerServiceTest is a pure unit test that mocks dependencies and tests EventListenerService business logic in isolation. It does not bootstrap Spring, does not use @EmbeddedKafka, and therefore does not use the application-test.yml configuration at all. The test correctly focuses on unit testing and doesn't aim to validate Avro serialization. Any serialization/Schema Registry integration testing would be in separate integration tests that properly configure embedded Kafka with Avro serializers.

Likely an incorrect or invalid review comment.

backend/src/main/java/org/fungover/zipp/service/ReportEventPublisher.java (1)

21-26: LGTM!

Constructor injection is correctly implemented with proper dependency management and configuration-driven topic naming.

backend/src/main/resources/application.properties (4)

13-16: LGTM!

Kafka producer correctly configured with KafkaAvroSerializer and Schema Registry integration. The environment variable override pattern is appropriate for different deployment environments.


18-23: Verify the impact of auto-offset-reset with random consumer groups.

The consumer configuration looks correct for Avro deserialization. However, note that auto-offset-reset=earliest combined with the random UUID group ID in EventListenerService (line 24) means every application restart will reprocess all messages from the beginning of the topic.

Confirm this behavior is intentional for your use case (e.g., development/testing). For production, consider:

  • Using a stable consumer group ID to maintain offset tracking across restarts
  • Or using auto-offset-reset=latest if reprocessing historical events is not desired

32-33: LGTM!

Custom topic property enables configuration-driven topic naming across producer and consumer components.


39-39: LGTM!

Profile activation with environment variable override and sensible default is correctly configured.

backend/src/main/java/org/fungover/zipp/service/SseService.java (3)

21-30: LGTM!

The subscription method correctly creates an SSE emitter with appropriate timeout and lifecycle callbacks. Thread-safe collection usage (CopyOnWriteArrayList) prevents concurrent modification issues during event broadcasting.


49-54: LGTM!

The remove method correctly uses computeIfPresent for atomic cleanup and prevents memory leaks by removing empty lists from the map.


56-68: LGTM!

The scheduled keep-alive mechanism correctly maintains SSE connections by sending periodic comments. The 20-second interval prevents most proxy/load balancer timeouts while remaining lightweight.
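
Extending the SseService sketch earlier on this page, the keep-alive loop might look like this (the sendKeepAlive() name and 20-second interval come from this review; the body is an assumption consistent with the send() method quoted below):

// Inside SseService: a periodic SSE comment keeps proxies from closing idle streams.
@Scheduled(fixedRate = 20_000)
public void sendKeepAlive() {
    emitters.forEach((id, list) -> {
        for (SseEmitter emitter : list) {
            try {
                emitter.send(SseEmitter.event().comment("keep-alive"));
            } catch (Exception e) {
                remove(id, emitter); // drop dead connections, as send() does
            }
        }
    });
}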

backend/src/main/java/org/fungover/zipp/controller/ReportController.java (1)

29-33: LGTM! Excellent refactoring for separation of concerns.

The controller is now decoupled from Kafka infrastructure details by using the ReportEventPublisher abstraction. This improves testability and maintainability.

Also applies to: 43-43

backend/src/main/avro/Report.avsc (4)

6-15: LGTM!

Required fields are correctly defined with appropriate types. The ReportType enum provides type safety for event classification, and double precision is suitable for GPS coordinates.


17-20: LGTM!

The submittedAt field correctly uses the Avro timestamp-millis logical type for millisecond-precision timestamps with proper nullable union syntax.


22-26: LGTM!

The nullable status enum correctly defines report lifecycle states with appropriate default handling.


28-31: LGTM!

The imageUrls field correctly defines an optional array of strings for image URLs with appropriate nullable union syntax.
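
Taken together, the reviewed lines imply a schema along these lines (a reconstruction for reference, not the committed file; enum symbols beyond ACCIDENT and ACTIVE, which appear elsewhere on this page, are not shown and are omitted here):

{
  "type": "record",
  "name": "ReportAvro",
  "namespace": "org.fungover.zipp.kafka",
  "fields": [
    { "name": "submittedByUserId", "type": "string" },
    { "name": "description", "type": "string" },
    { "name": "eventType", "type": { "type": "enum", "name": "ReportType", "symbols": ["ACCIDENT"] } },
    { "name": "latitude", "type": "double" },
    { "name": "longitude", "type": "double" },
    { "name": "submittedAt", "type": ["null", { "type": "long", "logicalType": "timestamp-millis" }], "default": null },
    { "name": "status", "type": ["null", { "type": "enum", "name": "ReportStatus", "symbols": ["ACTIVE"] }], "default": null },
    { "name": "imageUrls", "type": ["null", { "type": "array", "items": "string" }], "default": null }
  ]
}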

Comment on lines 24 to 27

 @Bean
-public NewTopic newTopic() {
-    return new NewTopic("report", 1, (short) 1);
+public NewTopic newTopic(@Value("${app.kafka.topic.report}") String topic) {
+    return new NewTopic(topic, 1, (short) 3);
 }

⚠️ Potential issue | 🔴 Critical



Make replication factor configurable instead of hardcoding to 3.

A replication factor of 3 will fail in single-broker setups with "replication factor: 3 larger than available brokers: 1". The docker-compose configuration explicitly sets single-broker defaults to RF=1, but this code hardcodes RF=3, causing topic creation to fail at startup.

 @Bean
-public NewTopic newTopic(@Value("${app.kafka.topic.report}") String topic) {
-    return new NewTopic(topic, 1, (short) 3);
+public NewTopic newTopic(
+        @Value("${app.kafka.topic.report}") String topic,
+        @Value("${app.kafka.topic.replication-factor:1}") short replicationFactor) {
+    return new NewTopic(topic, 1, replicationFactor);
 }

Add to application-dev.properties:

app.kafka.topic.replication-factor=1

And to application.properties (or production config):

app.kafka.topic.replication-factor=3
🤖 Prompt for AI Agents
In backend/src/main/java/org/fungover/zipp/BackendApplication.java around lines
24 to 27, the NewTopic bean hardcodes a replication factor of 3 which fails on
single-broker setups; change the bean to accept a replication-factor property
(e.g. @Value("${app.kafka.topic.replication-factor}") int replicationFactor),
cast it to short when constructing NewTopic, and use that value instead of the
literal 3; also add app.kafka.topic.replication-factor=1 to
application-dev.properties and app.kafka.topic.replication-factor=3 to
production application.properties as suggested.

Comment on lines +18 to +21
@GetMapping("/events/{id}")
public SseEmitter streamEvents(@PathVariable String id) {
    return sseService.subscribe(id);
}

⚠️ Potential issue | 🔴 Critical

Add authentication and authorization to prevent unauthorized access.

The /events/{id} endpoint lacks authentication and authorization, allowing anyone to subscribe to any user's event stream by providing their user ID. This is a privacy and security violation, as users can receive events (including report details and locations) intended for other users.

Consider these security measures:

  1. Add authentication to verify the caller's identity
  2. Add authorization to ensure the authenticated user can only subscribe to their own events
  3. Validate the id parameter to prevent injection or malformed input

Example secured endpoint:

@GetMapping("/events/{id}")
public SseEmitter streamEvents(
    @PathVariable String id, 
    @AuthenticationPrincipal OAuth2User principal) {
    
    String authenticatedUserId = principal.getAttribute("sub"); // or appropriate ID field
    
    if (!id.equals(authenticatedUserId)) {
        throw new AccessDeniedException("Cannot subscribe to another user's events");
    }
    
    return sseService.subscribe(id);
}
🤖 Prompt for AI Agents
In backend/src/main/java/org/fungover/zipp/controller/EventStreamController.java
around lines 18-21, the /events/{id} endpoint is missing
authentication/authorization and input validation; update the controller method
to accept the authenticated principal (e.g., @AuthenticationPrincipal OAuth2User
or Principal), extract the authenticated user's ID, validate the path variable
format (e.g., UUID regex or expected schema) and compare it to the authenticated
ID, and if they don't match throw an AccessDeniedException or return 403; also
add null checks and input sanitation and update imports/annotations so only
authenticated users can call this endpoint (or use @PreAuthorize to enforce
same-user access).

Comment on lines +14 to +16
var b = ReportAvro.newBuilder().setSubmittedByUserId(r.submittedByUserId()).setDescription(r.description())
        .setEventType(ReportType.valueOf(r.eventType().name())).setLatitude(r.latitude())
        .setLongitude(r.longitude());

⚠️ Potential issue | 🟠 Major

Add error handling for enum conversions.

The enum conversions using valueOf() (line 15 for eventType, line 26 for status) will throw IllegalArgumentException if the enum names don't match between the DTO and Avro schemas. This can cause message processing failures.

Consider adding validation or error handling:

 public ReportAvro toAvro(ReportResponse r) {

+    ReportType avroEventType;
+    try {
+        avroEventType = ReportType.valueOf(r.eventType().name());
+    } catch (IllegalArgumentException e) {
+        throw new IllegalStateException("Unsupported event type: " + r.eventType(), e);
+    }
+
     var b = ReportAvro.newBuilder().setSubmittedByUserId(r.submittedByUserId()).setDescription(r.description())
-            .setEventType(ReportType.valueOf(r.eventType().name())).setLatitude(r.latitude())
+            .setEventType(avroEventType).setLatitude(r.latitude())
             .setLongitude(r.longitude());

Alternatively, ensure the DTO and Avro enum definitions are kept in sync via build-time validation or shared definitions.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

 public ReportAvro toAvro(ReportResponse r) {
+    ReportType avroEventType;
+    try {
+        avroEventType = ReportType.valueOf(r.eventType().name());
+    } catch (IllegalArgumentException e) {
+        throw new IllegalStateException("Unsupported event type: " + r.eventType(), e);
+    }
     var b = ReportAvro.newBuilder().setSubmittedByUserId(r.submittedByUserId()).setDescription(r.description())
-            .setEventType(ReportType.valueOf(r.eventType().name())).setLatitude(r.latitude())
+            .setEventType(avroEventType).setLatitude(r.latitude())
             .setLongitude(r.longitude());
🤖 Prompt for AI Agents
In backend/src/main/java/org/fungover/zipp/mapper/ReportDtoToAvroMapper.java
around lines 14-16 (and also the status conversion near line 26), the direct use
of Enum.valueOf(...) can throw IllegalArgumentException if names differ; wrap
the enum conversions in a safe mapping: try to map by name inside a try/catch
(catch IllegalArgumentException), log the mismatch with the offending value and
source report id, and either set a sensible default Avro enum value or rethrow a
clear, documented mapping exception; alternatively implement a small mapping
helper that looks up by name and returns Optional or fallback value so
conversions never throw unchecked exceptions at runtime.


}

@KafkaListener(topics = "${app.kafka.topic.report}", groupId = "#{T(java.util.UUID).randomUUID().toString()}")

⚠️ Potential issue | 🟠 Major

Reconsider dynamic UUID groupId for Kafka consumer.

The groupId is set to a dynamically generated UUID (#{T(java.util.UUID).randomUUID().toString()}), which means:

  1. Every instance gets a unique consumer group, causing all instances to receive all messages (broadcast behavior)
  2. No load balancing across multiple backend instances
  3. Potential duplicate SSE events if multiple instances are running

Unless broadcast behavior is intentional, consider using a static group ID to enable proper Kafka consumer group load balancing:

Apply this diff:

-@KafkaListener(topics = "${app.kafka.topic.report}", groupId = "#{T(java.util.UUID).randomUUID().toString()}")
+@KafkaListener(topics = "${app.kafka.topic.report}", groupId = "report-event-listeners")

If broadcast is required, add a comment explaining why each instance needs to process every message.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

-@KafkaListener(topics = "${app.kafka.topic.report}", groupId = "#{T(java.util.UUID).randomUUID().toString()}")
+@KafkaListener(topics = "${app.kafka.topic.report}", groupId = "report-event-listeners")
🤖 Prompt for AI Agents
In backend/src/main/java/org/fungover/zipp/service/EventListenerService.java
around line 25, the @KafkaListener currently uses a dynamic UUID for groupId
which causes each instance to form its own consumer group (broadcasting messages
to all instances); replace the dynamic expression with a static, configurable
group id (for example reference a property like ${app.kafka.consumer.group} or a
fixed string) so multiple instances share a consumer group and messages are
load-balanced; update application.properties (or docs) to provide the default
group id and, if broadcast behavior is actually required, add a concise code
comment above the @KafkaListener explaining why each instance must process every
message.

Comment on lines +26 to +30

public void listen(ReportAvro event) {
    ReportResponse dto = mapper.toDto(event);

    sseService.send(dto.submittedByUserId(), dto);
}

⚠️ Potential issue | 🟠 Major

Add error handling and logging to the Kafka listener.

The listen method lacks error handling and logging, which can lead to:

  1. Silent failures if mapping or SSE sending fails
  2. Difficult troubleshooting without visibility into message processing
  3. Uncontrolled retries if exceptions occur

Apply this diff to add error handling and observability:

 @KafkaListener(topics = "${app.kafka.topic.report}", groupId = "#{T(java.util.UUID).randomUUID().toString()}")
 public void listen(ReportAvro event) {
-    ReportResponse dto = mapper.toDto(event);
+    try {
+        LOGGER.info("Received report event for user: {}", event.getSubmittedByUserId());
+        ReportResponse dto = mapper.toDto(event);
+        sseService.send(dto.submittedByUserId(), dto);
+        LOGGER.debug("Successfully forwarded event to SSE for user: {}", dto.submittedByUserId());
+    } catch (Exception e) {
+        LOGGER.error("Failed to process report event: {}", event, e);
+        // Consider: dead letter queue, alerting, or other error handling strategy
+    }
-    sseService.send(dto.submittedByUserId(), dto);
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In backend/src/main/java/org/fungover/zipp/service/EventListenerService.java
around lines 26 to 30, wrap the body of listen(ReportAvro event) in a try/catch
and add observability: log receipt of the event (include identifying fields like
reportId or submittedByUserId), attempt the mapping and sseService.send inside
the try, on success log a debug/info success; on any exception catch it, log an
error with the exception and relevant event payload/ids, increment or emit a
metric/monitoring counter for failed processing, and route the failed message to
a dead-letter handler or publish to a DLQ/poison topic (or mark as handled)
instead of leaving the exception unhandled to cause uncontrolled retries.

Comment on lines +28 to +38

public void publishReportCreated(ReportResponse report) {
    ReportAvro avro = mapper.toAvro(report);

    template.send(topic, report.submittedByUserId(), avro).whenComplete((result, ex) -> {
        if (ex != null) {
            LOG.error("Failed to publish report {}", report, ex);
        } else {
            LOG.debug("Published report {}", report);
        }
    });
}

⚠️ Potential issue | 🟠 Major

Reduce PII exposure in error logs.

Line 33 logs the entire report object on error, which may contain sensitive information (description, location coordinates, image URLs). Consider logging only the user ID or a non-sensitive identifier.

Apply this diff to reduce PII exposure:

     template.send(topic, report.submittedByUserId(), avro).whenComplete((result, ex) -> {
         if (ex != null) {
-            LOG.error("Failed to publish report {}", report, ex);
+            LOG.error("Failed to publish report for user {}", report.submittedByUserId(), ex);
         } else {
             LOG.debug("Published report {}", report);
         }
     });

Note: You may also want to apply similar logging practices to the debug statement at line 35, though debug logs are typically not enabled in production.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

 public void publishReportCreated(ReportResponse report) {
     ReportAvro avro = mapper.toAvro(report);

     template.send(topic, report.submittedByUserId(), avro).whenComplete((result, ex) -> {
         if (ex != null) {
-            LOG.error("Failed to publish report {}", report, ex);
+            LOG.error("Failed to publish report for user {}", report.submittedByUserId(), ex);
         } else {
             LOG.debug("Published report {}", report);
         }
     });
 }
🤖 Prompt for AI Agents
In backend/src/main/java/org/fungover/zipp/service/ReportEventPublisher.java
around lines 28 to 38, the error log currently prints the entire report object
which may contain PII; change the logging to only include a non-sensitive
identifier (e.g., report.submittedByUserId() or report.getId()) and the
exception, and update the debug log similarly to avoid printing the full report;
maintain the existing exception parameter in the error log so stacktrace is
preserved and include a clear, minimal message like "Failed to publish report
for user {}" and "Published report for user {}".

Comment on lines +32 to +47

public void send(String id, Object event) {
    var list = emitters.get(id);
    if (list == null) {
        return;
    }
    for (var emitter : list) {
        try {
            emitter.send(SseEmitter.event().data(event));
        } catch (Exception e) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Failed to send keep-alive to emitter for id {}: {}", id, e.getMessage());
            }
            remove(id, emitter);
        }
    }
}

⚠️ Potential issue | 🟡 Minor

Fix misleading log message.

Line 42 logs "Failed to send keep-alive" but this method sends actual events, not keep-alive messages. The keep-alive functionality is in the separate sendKeepAlive() method.

Apply this diff to fix the log message:

         try {
             emitter.send(SseEmitter.event().data(event));
         } catch (Exception e) {
             if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Failed to send keep-alive to emitter for id {}: {}", id, e.getMessage());
+                LOGGER.debug("Failed to send event to emitter for id {}: {}", id, e.getMessage());
             }
             remove(id, emitter);
         }
🤖 Prompt for AI Agents
In backend/src/main/java/org/fungover/zipp/service/SseService.java around lines
32 to 47, the catch block logs "Failed to send keep-alive" which is misleading
because this method sends regular events; change the log message to reflect that
an event failed to send (e.g., "Failed to send event to emitter for id {}: {}")
and keep the existing exception detail and subsequent remove(id, emitter)
behavior.

Comment on lines +2 to +8

  kafka:
    bootstrap-servers: localhost:9092
    properties:
      schema.registry.url: http://localhost:8081
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

⚠️ Potential issue | 🟠 Major

Externalize hardcoded Kafka endpoints.

The bootstrap-servers and schema.registry.url are hardcoded to localhost, which prevents deployment to other environments (staging, production, containers, etc.) without modifying the configuration file.

Apply this diff to use environment variables with sensible defaults:

 spring:
   kafka:
-    bootstrap-servers: localhost:9092
+    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
     properties:
-      schema.registry.url: http://localhost:8081
+      schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

   kafka:
-    bootstrap-servers: localhost:9092
+    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
     properties:
-      schema.registry.url: http://localhost:8081
+      schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
🤖 Prompt for AI Agents
In backend/src/main/resources/application.yml around lines 2 to 8, the
kafka.bootstrap-servers and kafka.properties.schema.registry.url are hardcoded
to localhost; replace them with references to environment properties (e.g.,
${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} and
${SCHEMA_REGISTRY_URL:http://localhost:8081}) so the application can be
configured per-environment; update the YAML keys to use the property
placeholders and ensure any Spring or app config reads those environment
variables at runtime.

Comment on lines +21 to +27

  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect

⚠️ Potential issue | 🟠 Major

Review JPA configuration for production safety.

Two concerns with the current JPA configuration:

  1. ddl-auto: update can cause unintended schema modifications in production (data loss, schema drift). Since Flyway is already enabled (per application.properties), consider using validate or none to prevent Hibernate from altering the schema.

  2. show-sql: true logs all SQL statements, which can expose sensitive data in logs and degrade performance under load.

Apply this diff for a safer production configuration:

   jpa:
     hibernate:
-      ddl-auto: update
-    show-sql: true
+      ddl-auto: validate
+    show-sql: false
     properties:
       hibernate:
         dialect: org.hibernate.dialect.MySQLDialect
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

   jpa:
     hibernate:
-      ddl-auto: update
-    show-sql: true
+      ddl-auto: validate
+    show-sql: false
     properties:
       hibernate:
         dialect: org.hibernate.dialect.MySQLDialect
🤖 Prompt for AI Agents
In backend/src/main/resources/application.yml around lines 21 to 27, the JPA
settings are unsafe for production: change ddl-auto from update to validate (or
none) so Hibernate won’t modify the schema at runtime (Flyway handles
migrations), set show-sql to false to avoid verbose SQL/sensitive data in logs
and performance impact, and keep the existing Hibernate dialect entry unchanged;
update the file accordingly and verify application startup with Flyway
migrations in your deployment environment.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
backend/src/main/resources/application.properties (1)

18-23: Clean up the consumer config comment label.

The Kafka Consumer Avro configuration is correct and properly wired to Schema Registry. However, line 18's comment includes "(Avro debug)" which appears to be a debug label—consider updating to simply "# Kafka Consumer (Avro)" for consistency and clarity.

-# Kafka Consumer (Avro debug)
+# Kafka Consumer (Avro)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 19e15ab and f6f67b1.

📒 Files selected for processing (1)
  • backend/src/main/resources/application.properties (2 hunks)
🔇 Additional comments (3)
backend/src/main/resources/application.properties (3)

13-17: Kafka Producer Avro configuration looks good.

The producer is correctly configured to publish Avro messages via KafkaAvroSerializer with Schema Registry URL properly externalized. Default values enable local development while supporting prod overrides.


18-23: Verify consumer group ID configuration.

Consumer group ID is not explicitly set in this configuration. Confirm that either:

  1. A consumer group is specified elsewhere (e.g., via @KafkaListener(groupId = "...") in the consumer code), or
  2. Spring's auto-generated group ID is acceptable for this use case.

If the consumer should use a stable, explicit group for proper offset tracking and restart resilience, consider adding spring.kafka.consumer.group-id here.


32-39: Topic and listener configuration is sound.

Topic name "report-avro" is appropriately externalized and aligns with the PR context. Listener configuration with type=single and missing-topics-fatal=false provides sensible defaults for the event forwarding use case.

…+105

Co-authored-by: Taru Keskinen <tarukeskinen@hotmail.com>
Co-authored-by: Johnny Åström <johnny.astrom@hotmail.com>
Co-authored-by: Alexander Andersson <alle7000.andersson@gmail.com>
@jenkins-cd-for-zipp

Jenkins Build #1 Summary (for PR #113)

  • Status: FAILURE
  • Duration: 1 min 2 sec
  • Branch: PR-113
  • Commit: 9d024c6
  • Docker Image: 192.168.0.82:5000/zipp:9d024c6 (pushed to registry)

Details:

  • Checkout: Successful
  • Build & Scan: Failed (check logs below)
  • Push: Skipped (due to earlier failure)

Error Logs (truncated):
For full logs, contact the Jenkins admin.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/src/test/java/org/fungover/zipp/service/EventListenerServiceTest.java (1)

3-56: Resolve merge conflict immediately.

The test file contains unresolved merge conflict markers (<<<<<<<, =======, >>>>>>>), which blocks compilation and causes the pipeline Spotless check to fail.

The AVRO-based test (lines 4-42) aligns with the PR objectives to migrate to Avro, while the non-AVRO version (lines 14-55) appears to be legacy code. Remove the conflict markers and retain only the AVRO-based implementation.

🔎 Proposed resolution

Remove the merge conflict markers and retain the Avro-based test:

-<<<<<<< HEAD
 import org.fungover.zipp.kafka.ReportAvro;
 import org.fungover.zipp.kafka.ReportStatus;
 import org.fungover.zipp.kafka.ReportType;
 import org.fungover.zipp.mapper.ReportAvroToDtoMapper;
 import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
 
 import static org.mockito.Mockito.mock;
-=======
-import org.fungover.zipp.dto.ReportResponse;
-import org.fungover.zipp.entity.ReportStatus;
-import org.fungover.zipp.entity.ReportType;
-import org.junit.jupiter.api.Test;
-
-import java.time.Instant;
-import java.util.List;
-
-import static org.mockito.Mockito.spy;
->>>>>>> 1c0b1f60b0edbfe7e17e6d076fe2a86ad0586ad2
 import static org.mockito.Mockito.verify;
 
 class EventListenerServiceTest {
 
     @Test
     void listenerShouldForwardEventToSseService() {
-<<<<<<< HEAD
         SseService sseService = mock(SseService.class);
         ReportAvroToDtoMapper mapper = new ReportAvroToDtoMapper();
         EventListenerService listener = new EventListenerService(sseService, mapper);
 
         ReportAvro event = ReportAvro.newBuilder().setSubmittedByUserId("map123").setDescription("test")
                 .setEventType(ReportType.ACCIDENT).setLatitude(1.0).setLongitude(2.0).setSubmittedAt(Instant.now())
                 .setStatus(ReportStatus.ACTIVE).setImageUrls(null).build();
 
         listener.listen(event);
 
         verify(sseService).send("map123", mapper.toDto(event));
     }
 
-=======
-
-        SseService sseService = spy(new SseService());
-        EventListenerService listener = new EventListenerService(sseService);
-
-        ReportResponse event = new ReportResponse("map123", "Hello World", ReportType.ACCIDENT, 59.0, 18.0,
-                Instant.now(), ReportStatus.ACTIVE, List.of());
-
-        listener.listen(event);
-
-        verify(sseService).send("map123", event);
-    }
->>>>>>> 1c0b1f60b0edbfe7e17e6d076fe2a86ad0586ad2
 }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f6f67b1 and 9d024c6.

📒 Files selected for processing (9)
  • backend/pom.xml (4 hunks)
  • backend/src/main/java/org/fungover/zipp/controller/ReportController.java (2 hunks)
  • backend/src/main/java/org/fungover/zipp/mapper/ReportAvroToDtoMapper.java (1 hunks)
  • backend/src/main/java/org/fungover/zipp/security/SecurityConfig.java (1 hunks)
  • backend/src/main/resources/application-dev.yml (0 hunks)
  • backend/src/main/resources/application.properties (2 hunks)
  • backend/src/main/resources/application.yml (2 hunks)
  • backend/src/test/java/org/fungover/zipp/service/EventListenerServiceTest.java (3 hunks)
  • backend/src/test/resources/application.properties (1 hunks)
💤 Files with no reviewable changes (1)
  • backend/src/main/resources/application-dev.yml
🚧 Files skipped from review as they are similar to previous changes (4)
  • backend/src/test/resources/application.properties
  • backend/src/main/java/org/fungover/zipp/controller/ReportController.java
  • backend/src/main/resources/application.yml
  • backend/src/main/java/org/fungover/zipp/mapper/ReportAvroToDtoMapper.java
🧰 Additional context used
🪛 GitHub Actions: CI Pipeline
backend/src/test/java/org/fungover/zipp/service/EventListenerServiceTest.java

[error] 3-3: Spotless check failed. com.google.googlejavaformat.java.FormatterException: 3:1: error: class, interface, enum, or record expected

🔇 Additional comments (5)
backend/pom.xml (2)

34-39: Confluent repository added correctly.

The Confluent Maven repository is required for the kafka-avro-serializer dependency and is correctly configured.


237-253: Avro Maven plugin configuration looks correct.

The avro-maven-plugin is properly configured to:

  • Generate sources from src/main/avro directory
  • Output to target/generated-sources/avro
  • Run during the generate-sources phase

The plugin version (1.12.0) aligns well with the Avro library version (1.12.1).

backend/src/main/resources/application.properties (2)

10-23: Avro serialization configuration looks correct.

The Kafka producer and consumer are properly configured to use:

  • KafkaAvroSerializer for the producer
  • KafkaAvroDeserializer for the consumer
  • Schema Registry URL with sensible defaults
  • specific.avro.reader=true to enable strongly-typed Avro objects

This aligns with the PR objective to migrate from JSON to Avro with Schema Registry support.


33-33: Application Kafka topic property added correctly.

The app.kafka.topic.report=report-avro property enables topic name injection for producer and consumer components, supporting the Avro-based event flow.

backend/src/main/java/org/fungover/zipp/security/SecurityConfig.java (1)

51-54: Clarify the intentional removal of API key authentication in production.

The @Profile("dev") annotation on apiKeySecurityChain restricts it to the dev profile only. In production (non-dev profiles), /graphql/** and /api/m2m/** requests fall through to oauthSecurityChain, which requires OAuth2 authentication instead of the API key authentication configured for dev.

This represents an intentional security posture change—from API key-based M2M authentication to OAuth2-based user authentication—but it appears unrelated to the Avro migration objective. Please confirm this change is intentional and explain the rationale in the PR description.

Comment thread backend/pom.xml
Comment on lines +185 to 196
+<!-- Avro core -->
 <dependency>
-    <groupId>org.springframework.kafka</groupId>
-    <artifactId>spring-kafka-test</artifactId>
-    <scope>test</scope>
+    <groupId>org.apache.avro</groupId>
+    <artifactId>avro</artifactId>
+    <version>1.12.1</version>
 </dependency>
+<!-- Confluent Avro Serializer -->
+<dependency>
+    <groupId>io.confluent</groupId>
+    <artifactId>kafka-avro-serializer</artifactId>
+    <version>8.1.1</version>
+</dependency>

⚠️ Potential issue | 🟡 Minor

Versions are compatible and reasonably current.

Avro 1.12.1 is the latest release, which addresses 4 security fixes. Confluent kafka-avro-serializer 8.1.1 is documented in current Confluent Platform examples. Both versions are actively maintained and compatible with each other. Consider monitoring for future updates periodically, as Avro and Confluent release new versions regularly.

🤖 Prompt for AI Agents
In backend/pom.xml around lines 185 to 196 the Avro and Confluent dependency
versions are hard-coded; extract the versions into Maven properties (e.g.,
avro.version and confluent.version) at the top of the pom (or in the parent) and
reference those properties in the dependency blocks, and optionally move these
dependency declarations to a dependencyManagement section if this is a
multi-module build; also add a short inline comment noting to periodically
review/upgrade these versions for security updates.
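Sketch of the suggested extraction (property names as proposed in the prompt):

<properties>
  <!-- Centralized versions; review periodically for security updates -->
  <avro.version>1.12.1</avro.version>
  <confluent.version>8.1.1</confluent.version>
</properties>

<!-- ...elsewhere in the pom, the dependencies reference the properties... -->
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>${avro.version}</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>${confluent.version}</version>
</dependency>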

Comment on lines 41 to 44
# Kafka Consumer
spring.kafka.consumer.bootstrap-servers=${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=org.fungover.zipp.dto.ReportResponse
spring.kafka.consumer.properties.spring.json.trusted.packages=org.fungover.zipp.service,org.fungover.zipp.dto

⚠️ Potential issue | 🔴 Critical

Remove conflicting legacy JSON-based Kafka consumer configuration.

Lines 41-44 contain legacy JSON-based Kafka consumer configuration that conflicts with the Avro configuration (lines 19-23):

  • Line 42 duplicates bootstrap-servers already set on line 11
  • Lines 43-44 configure JSON deserialization (spring.json.value.default.type, spring.json.trusted.packages), which contradicts the Avro migration objective

Per the PR objectives: "Remove JSON-specific parsing for report events" and "No JSON parsing remains in the Kafka pipeline."

🔎 Proposed fix

Remove the legacy JSON configuration block:

 spring.profiles.active=${SPRING_PROFILES_ACTIVE:dev}
 
-# Kafka Consumer
-spring.kafka.consumer.bootstrap-servers=${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
-spring.kafka.consumer.properties.spring.json.value.default.type=org.fungover.zipp.dto.ReportResponse
-spring.kafka.consumer.properties.spring.json.trusted.packages=org.fungover.zipp.service,org.fungover.zipp.dto
-
 # Security
 server.servlet.session.cookie.same-site=lax
 server.servlet.session.cookie.secure=true

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
backend/src/main/resources/application.properties lines 41-44: remove the legacy
JSON-based Kafka consumer block that conflicts with the Avro configuration —
specifically delete the duplicate bootstrap-servers entry and the spring.json.*
properties (spring.kafka.consumer.properties.spring.json.value.default.type and
spring.kafka.consumer.properties.spring.json.trusted.packages) so only the
Avro-based Kafka consumer settings (lines 19-23) remain; ensure no other
JSON-specific consumer properties are left elsewhere in the file.

Co-authored-by: Taru Keskinen <tarukeskinen@hotmail.com>
Co-authored-by: Johnny Åström <johnny.astrom@hotmail.com>
Co-authored-by: Alexander Andersson <alle7000.andersson@gmail.com>
@jenkins-cd-for-zipp

Jenkins Build #2 Summary (for PR #113)

  • Status: SUCCESS
  • Duration: 1 min 43 sec
  • Branch: PR-113
  • Commit: 3b920aa
  • Docker Image: 192.168.0.82:5000/zipp:3b920aa (pushed to registry)

Details:

  • Checkout: Successful
  • Build & Scan: Passed
  • Push: Successful

All stages passed—no issues detected.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/src/test/java/org/fungover/zipp/controller/ReportControllerTest.java (1)

59-61: Remove unused CompletableFuture setup.

This CompletableFuture is created and completed but never used. It appears to be leftover from the previous KafkaTemplate implementation.

         when(userIdentityService.getCurrentUser(authentication)).thenReturn(currentUser);
-
-        CompletableFuture<SendResult<String, ReportResponse>> future = new CompletableFuture<>();
-        future.complete(null);
     }
🧹 Nitpick comments (5)
backend/src/main/java/org/fungover/zipp/mapper/ReportAvroToDtoMapper.java (2)

7-8: Consider marking the class final for consistency.

The companion mapper ReportDtoToAvroMapper is declared as final class. For consistency across mappers, consider applying the same pattern here.

 @Component
-public class ReportAvroToDtoMapper {
+public final class ReportAvroToDtoMapper {

10-17: Use imports instead of fully qualified class names.

Fully qualified names for ReportType and ReportStatus reduce readability. Import them at the top of the file.

Suggested refactor

Add imports:

import org.fungover.zipp.entity.ReportStatus;
import org.fungover.zipp.entity.ReportType;

Then simplify the method:

     public ReportResponse toDto(ReportAvro avro) {
         return new ReportResponse(avro.getSubmittedByUserId().toString(), avro.getDescription().toString(),
-                org.fungover.zipp.entity.ReportType.valueOf(avro.getEventType().name()), avro.getLatitude(),
+                ReportType.valueOf(avro.getEventType().name()), avro.getLatitude(),
                 avro.getLongitude(), avro.getSubmittedAt(),
                 avro.getStatus() != null
-                        ? org.fungover.zipp.entity.ReportStatus.valueOf(avro.getStatus().name())
+                        ? ReportStatus.valueOf(avro.getStatus().name())
                         : null,
                 avro.getImageUrls() == null ? null : avro.getImageUrls().stream().map(CharSequence::toString).toList());
     }
backend/src/test/java/org/fungover/zipp/controller/ReportControllerTest.java (2)

65-68: Remove unused attacker variable.

The attacker User is created but never used in this test. The same issue exists in createReportWithImagesShouldReturnCreatedReport (lines 89-92).

     @Test
     void createReportShouldReturnCreatedReportWithStatus201() {
-        User attacker = new User();
-        attacker.setId(UUID.randomUUID());
-        attacker.setEmail("attacker@example.com");
-        attacker.setProviderId("attacker-999");
-
         Report inputReport = new Report("Test description", ReportType.ACCIDENT, 59.3293, 18.0686, null);

15-21: Unused imports after cleanup.

Once the dead CompletableFuture code is removed (lines 59-60), the following imports will be unused and should also be removed:

  • org.springframework.kafka.support.SendResult (line 15)
  • java.util.concurrent.CompletableFuture (line 21)
backend/src/test/java/org/fungover/zipp/kafka/KafkaEventsTest.java (1)

60-62: Use English comments and prefer any() matchers for consistency.

The Swedish comment should be in English. Additionally, nullable() matchers are typically used when explicitly testing null-handling behavior. For standard stubbing, any() (as used in line 45) is more conventional.

-        // Stubbing med matchers för alla parametrar
-        when(kafkaTemplate.send(nullable(String.class), nullable(String.class), nullable(ReportAvro.class)))
+        // Stubbing with matchers for all parameters
+        when(kafkaTemplate.send(any(), any(), any()))
                 .thenReturn(failedFuture);
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9d024c6 and 3b920aa.

📒 Files selected for processing (4)
  • backend/src/main/java/org/fungover/zipp/mapper/ReportAvroToDtoMapper.java (1 hunks)
  • backend/src/test/java/org/fungover/zipp/controller/ReportControllerTest.java (7 hunks)
  • backend/src/test/java/org/fungover/zipp/kafka/KafkaEventsTest.java (1 hunks)
  • backend/src/test/java/org/fungover/zipp/service/EventListenerServiceTest.java (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/src/main/java/org/fungover/zipp/mapper/ReportAvroToDtoMapper.java (1)
backend/src/main/java/org/fungover/zipp/mapper/ReportDtoToAvroMapper.java (1)
  • Component (9-30)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: PR Build
  • GitHub Check: Jenkins
  • GitHub Check: Build, Test & Analyze
🔇 Additional comments (2)
backend/src/test/java/org/fungover/zipp/service/EventListenerServiceTest.java (1)

16-29: LGTM!

The test correctly validates the Avro-to-DTO conversion flow. Using a real ReportAvroToDtoMapper instance ensures the integration between the mapper and the service is properly tested.
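A hypothetical shape for that flow, sketched with assumed constructor and method names (the repository's actual test will differ in detail):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.junit.jupiter.api.Test;

class EventListenerServiceSketch {

    @Test
    void forwardsConsumedAvroEventAsDtoOverSse() {
        SseService sseService = mock(SseService.class);
        // Real mapper, mocked SSE sink: exercises the Avro-to-DTO integration
        EventListenerService service =
                new EventListenerService(new ReportAvroToDtoMapper(), sseService); // constructor shape assumed

        ReportAvro avro = ReportAvro.newBuilder() // Avro-generated builder
                .setSubmittedByUserId("map123")
                .setDescription("pothole")
                .build(); // a real test must set every required schema field before build()

        service.onReportCreated(avro); // listener method name assumed

        verify(sseService).send(eq("map123"), any(ReportResponse.class));
    }
}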

backend/src/test/java/org/fungover/zipp/kafka/KafkaEventsTest.java (1)

37-50: LGTM!

The success test correctly stubs the mapper, sends via Kafka, and verifies the interaction with the expected key and Avro payload.
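The pattern it verifies, as a fragment-level sketch that assumes the test class's existing mapper and kafkaTemplate mocks (topic name from app.kafka.topic.report; the key value is an assumption):

// Sketch of the success-path stubbing and verification described above
ReportAvro avro = new ReportAvro();
when(mapper.toAvro(incomingReport)).thenReturn(avro);
when(kafkaTemplate.send(any(), any(), any()))
        .thenReturn(CompletableFuture.completedFuture(null));

reportEventPublisher.publishReportCreated(incomingReport);

// Expect the configured topic, a key (assumed to be the user id), and the mapped Avro payload
verify(kafkaTemplate).send(eq("report-avro"), eq("user1"), eq(avro));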

Comment on lines 53 to +71
     void kafkaReportFailedToSendGivesError() {
         ReportResponse incomingReport = new ReportResponse("user1", "test report", OTHER, 0.0, 0.0, Instant.now(),
                 ACTIVE, null);

-        CompletableFuture<SendResult<String, ReportResponse>> failedFuture = new CompletableFuture<>();
-        failedFuture.completeExceptionally(new RuntimeException());
-        when(kafkaTemplate.send(anyString(), any())).thenReturn(failedFuture);
+        CompletableFuture<SendResult<String, ReportAvro>> failedFuture = new CompletableFuture<>();
+        failedFuture.completeExceptionally(new RuntimeException("Kafka is down"));
+
+        // Stubbing med matchers för alla parametrar
+        when(kafkaTemplate.send(nullable(String.class), nullable(String.class), nullable(ReportAvro.class)))
+                .thenReturn(failedFuture);

         ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) org.slf4j.LoggerFactory
-                .getLogger(ReportController.class);
+                .getLogger(ReportEventPublisher.class);

         TestLogAppender appender = new TestLogAppender();
         logger.addAppender(appender);
         appender.start();

-        reportController.sendReport("report", incomingReport);
+        reportEventPublisher.publishReportCreated(incomingReport);

⚠️ Potential issue | 🟡 Minor

Add missing stub for mapper.toAvro() in failure test.

The mapper.toAvro(incomingReport) call is not stubbed, so it returns null by default. This causes kafkaTemplate.send() to be called with a null Avro payload, which may not reflect realistic failure scenarios. Stub the mapper to return a proper ReportAvro instance.

         CompletableFuture<SendResult<String, ReportAvro>> failedFuture = new CompletableFuture<>();
         failedFuture.completeExceptionally(new RuntimeException("Kafka is down"));

-        // Stubbing med matchers för alla parametrar
-        when(kafkaTemplate.send(nullable(String.class), nullable(String.class), nullable(ReportAvro.class)))
+        ReportAvro avro = new ReportAvro();
+        when(mapper.toAvro(incomingReport)).thenReturn(avro);
+        when(kafkaTemplate.send(any(), any(), any()))
                 .thenReturn(failedFuture);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
    void kafkaReportFailedToSendGivesError() {
        ReportResponse incomingReport = new ReportResponse("user1", "test report", OTHER, 0.0, 0.0, Instant.now(),
                ACTIVE, null);

        CompletableFuture<SendResult<String, ReportAvro>> failedFuture = new CompletableFuture<>();
        failedFuture.completeExceptionally(new RuntimeException("Kafka is down"));

        ReportAvro avro = new ReportAvro();
        when(mapper.toAvro(incomingReport)).thenReturn(avro);
        when(kafkaTemplate.send(any(), any(), any()))
                .thenReturn(failedFuture);

        ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) org.slf4j.LoggerFactory
                .getLogger(ReportEventPublisher.class);

        TestLogAppender appender = new TestLogAppender();
        logger.addAppender(appender);
        appender.start();

        reportEventPublisher.publishReportCreated(incomingReport);
🤖 Prompt for AI Agents
In backend/src/test/java/org/fungover/zipp/kafka/KafkaEventsTest.java around
lines 53 to 71, the test does not stub mapper.toAvro(incomingReport) so it
returns null and causes kafkaTemplate.send to be invoked with a null payload;
update the test to stub the mapper mock so that when
mapper.toAvro(incomingReport) is called it returns a valid ReportAvro instance
(constructed from incomingReport fields or a minimal valid object), and use that
stubbed ReportAvro in the existing kafkaTemplate.send(...) setup so the failure
path reflects a realistic non-null Avro payload.

@sonarqubecloud

Quality Gate failed

Failed conditions
64.0% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube Cloud


Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

  • Update Kafka Consumer to Support Avro Messages
  • Kafka message schema optimization