
Conversation

@lian-jung
Contributor

@lian-jung lian-jung commented Jan 6, 2026

Summary by Sourcery

Implement automated conversation data retention and cleanup tied to listing and reservation lifecycle, with supporting domain, persistence, application services, Timer trigger, and documentation updates.

New Features:

  • Introduce Azure Functions timer infrastructure support in Cellix and register a scheduled conversationCleanup function in the API that runs daily at 2 AM UTC (see the registration sketch after this list).
  • Add domain- and persistence-level support for conversation expiration (expiresAt) and reservationRequest linkage, including new query methods for conversations, listings, and reservation requests.
  • Provide application-service commands to process conversations for archived listings and reservation requests, scheduling them for deletion according to retention rules, with shared telemetry-instrumented cleanup orchestration.
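
A minimal sketch of what the scheduled registration in the first bullet could look like if wired directly against @azure/functions; in this PR the registration actually flows through Cellix's new timer-handler support, so treat everything here other than the conversationCleanup name and the daily 2 AM UTC schedule as an assumption, and the handler body as a placeholder.

```ts
import { app, type InvocationContext, type Timer } from '@azure/functions';

app.timer('conversationCleanup', {
	// NCRONTAB (second minute hour day month weekday): fires daily at 02:00 UTC.
	schedule: '0 0 2 * * *',
	handler: async (timer: Timer, context: InvocationContext): Promise<void> => {
		if (timer.isPastDue) {
			context.log('[ConversationCleanup] Timer is past due, running catch-up execution');
		}
		// Placeholder: resolve application services and run the cleanup here.
	},
});
```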

Enhancements:

  • Extend the Conversation aggregate with expiration and deletion scheduling capabilities, protected by permission checks and exposed via new getters/setters and reservationRequest accessors.
  • Update MongoDB conversation schema and domain adapters to handle reservationRequest references and conversation expiration, leveraging a TTL index for automatic deletion (see the schema sketch after this list).
  • Expand read and domain repositories with convenience queries (by listing ID, reservation request ID, states, and expired conversations) to support cleanup workflows.
  • Refine Cellix bootstrap to support timer handlers alongside HTTP handlers, enabling reusable scheduling for background jobs.
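
The TTL mechanism mentioned above, as a minimal Mongoose sketch; the real Conversation schema carries more fields (messages, schemaVersion, version options), and the ref names used here are assumptions.

```ts
import { Schema, model } from 'mongoose';

const conversationSchema = new Schema(
	{
		sharer: { type: Schema.Types.ObjectId, ref: 'User', required: true },
		reserver: { type: Schema.Types.ObjectId, ref: 'User', required: true },
		listing: { type: Schema.Types.ObjectId, ref: 'ItemListing', required: true },
		reservationRequest: { type: Schema.Types.ObjectId, ref: 'ReservationRequest' },
		messagingConversationId: { type: String, required: true },
		expiresAt: { type: Date }, // unset until the conversation is scheduled for deletion
	},
	{ timestamps: true },
);

// TTL index: MongoDB's background monitor removes a document once its expiresAt
// value is in the past; documents without expiresAt are never auto-deleted.
conversationSchema.index({ expiresAt: 1 }, { expireAfterSeconds: 0 });

export const ConversationModel = model('Conversation', conversationSchema);
```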

Documentation:

  • Add a security requirements document describing conversation data retention controls, including 6‑month retention, TTL-based deletion, and compensating controls.

Tests:

  • Add extensive BDD/Vitest coverage for new conversation cleanup flows (for archived listings and reservation requests), conversation expiration behavior, and new repository/query methods for conversations, item listings, and reservation requests, including success, edge case, and failure scenarios.

@lian-jung lian-jung self-assigned this Jan 6, 2026
@lian-jung lian-jung linked an issue Jan 6, 2026 that may be closed by this pull request
@sourcery-ai
Contributor

sourcery-ai bot commented Jan 6, 2026

Reviewer's Guide

Implements automated conversation data retention by adding Azure Functions timer support and a scheduled cleanup function, extending the conversation domain/persistence model with reservationRequest linkage and expiresAt/TTL-based deletion, and adding application service workflows plus repository queries and tests to schedule and process deletions for archived listings and reservation requests.

Sequence diagram for timer-triggered conversation cleanup workflow

```mermaid
sequenceDiagram
  participant Timer as AzureTimer
  participant AF as conversationCleanupFunction
  participant Cellix as CellixTimerHandler
  participant AppFactory as ApplicationServicesFactory
  participant App as ConversationApplicationService
  participant ReadRepos as ReadRepositories
  participant UoW as ConversationUnitOfWork
  participant ConvRepo as ConversationRepository
  participant Mongo as MongoDB

  Timer->>AF: Trigger at 02:00 UTC
  AF->>Cellix: Invoke registered TimerHandler
  Cellix->>AppFactory: forRequest()
  AppFactory-->>Cellix: ApplicationServicesHost
  Cellix->>AF: TimerHandler(timer, context)

  AF->>App: processConversationsForArchivedListings()
  App->>ReadRepos: ItemListingReadRepository.getByStates(Expired, Cancelled)
  ReadRepos->>Mongo: Query listings by state
  Mongo-->>ReadRepos: Archived listings
  ReadRepos-->>App: Archived listings

  loop For each listing
    App->>UoW: withScopedTransaction(callback)
    activate UoW
    UoW->>ConvRepo: getByListingId(listing.id)
    ConvRepo->>Mongo: find conversations by listing id
    Mongo-->>ConvRepo: Conversation docs
    ConvRepo-->>UoW: Conversation aggregates

    loop For each conversation without expiresAt
      UoW->>ConvRepo: save(conversation.scheduleForDeletion(anchorDate))
      ConvRepo->>Mongo: update expiresAt
      Mongo-->>ConvRepo: ok
    end
    deactivate UoW
  end

  AF->>App: processConversationsForArchivedReservationRequests()
  App->>ReadRepos: ReservationRequestReadRepository.getByStates(Closed, Rejected, Cancelled)
  ReadRepos->>Mongo: Query reservationRequests by state
  Mongo-->>ReadRepos: Archived reservationRequests
  ReadRepos-->>App: Archived reservationRequests

  loop For each reservationRequest
    App->>UoW: withScopedTransaction(callback)
    activate UoW
    UoW->>ConvRepo: getByReservationRequestId(reservationRequest.id)
    ConvRepo->>Mongo: find conversations by reservationRequest id
    Mongo-->>ConvRepo: Conversation docs
    ConvRepo-->>UoW: Conversation aggregates

    loop For each conversation without expiresAt
      UoW->>ConvRepo: save(conversation.scheduleForDeletion(anchorDate))
      ConvRepo->>Mongo: update expiresAt
      Mongo-->>ConvRepo: ok
    end
    deactivate UoW
  end

  Mongo-->>Mongo: TTL deletes conversations when expiresAt <= now()
  AF-->>Timer: Log totals and finish
```

Entity relationship diagram for conversations, listings, reservation requests, and TTL expiration

```mermaid
erDiagram
  Conversation {
    string id
    string sharerId
    string reserverId
    string listingId
    string reservationRequestId
    string messagingConversationId
    string schemaVersion
    datetime createdAt
    datetime updatedAt
    datetime expiresAt
  }

  ItemListing {
    string id
    string sharerId
    string state
    datetime sharingPeriodEnd
    datetime updatedAt
  }

  ReservationRequest {
    string id
    string listingId
    string reserverId
    string state
    datetime reservationPeriodEnd
    datetime updatedAt
  }

  User {
    string id
  }

  User ||--o{ Conversation : sharer
  User ||--o{ Conversation : reserver

  ItemListing ||--o{ Conversation : listing
  ReservationRequest ||--o{ Conversation : reservationRequest

  ItemListing ||--o{ ReservationRequest : listing

  Conversation }o--|| MongoTTLErrorBucket : expiresAtTTL

  MongoTTLErrorBucket {
    string collectionName
    datetime expiresAt
  }
```

Class diagram for updated conversation domain, persistence, and cleanup services

```mermaid
classDiagram
  class ConversationProps {
    +UserEntityReference sharer
    +UserEntityReference reserver
    +ItemListingEntityReference listing
    +ReservationRequestEntityReference reservationRequest
    +function loadSharer() Promise~UserEntityReference~
    +function loadReserver() Promise~UserEntityReference~
    +function loadListing() Promise~ItemListingEntityReference~
    +function loadReservationRequest() Promise~ReservationRequestEntityReference~
    +string messagingConversationId
    +MessageEntityReference[] messages
    +function loadMessages() Promise~MessageEntityReference[]~
    +Date expiresAt
    +get createdAt() Date
    +get updatedAt() Date
    +get schemaVersion() string
  }

  class ConversationEntityReference {
    +string id
    +UserEntityReference sharer
    +UserEntityReference reserver
    +ItemListingEntityReference listing
    +ReservationRequestEntityReference reservationRequest
    +string messagingConversationId
    +MessageEntityReference[] messages
    +Date expiresAt
    +Date createdAt
    +Date updatedAt
    +string schemaVersion
  }

  class Conversation {
    <<aggregate>>
    -boolean isNew
    -ConversationVisa visa
    +static RETENTION_PERIOD_MS number
    +get sharer() UserEntityReference
    +function loadSharer() Promise~UserEntityReference~
    +get reserver() UserEntityReference
    +function loadReserver() Promise~UserEntityReference~
    +get listing() ItemListingEntityReference
    +function loadListing() Promise~ItemListingEntityReference~
    +get reservationRequest() ReservationRequestEntityReference
    +function loadReservationRequest() Promise~ReservationRequestEntityReference~
    +get messagingConversationId() string
    +get messages() MessageEntityReference[]
    +function loadMessages() Promise~MessageEntityReference[]~
    +get expiresAt() Date
    +set expiresAt(value Date)
    +get createdAt() Date
    +get updatedAt() Date
    +get schemaVersion() string
    +function scheduleForDeletion(archivalDate Date) void
  }

  class ConversationModel {
    <<mongoose model>>
    +ObjectId _id
    +ObjectId sharer
    +ObjectId reserver
    +ObjectId listing
    +ObjectId reservationRequest
    +string messagingConversationId
    +string schemaVersion
    +Date createdAt
    +Date updatedAt
    +Date expiresAt
  }

  class ConversationDomainAdapter {
    -ConversationModel doc
    -MessageEntityReference[] _messages
    +get sharer() UserEntityReference
    +set sharer(user UserEntityReference)
    +function loadSharer() Promise~UserEntityReference~
    +get reserver() UserEntityReference
    +set reserver(user UserEntityReference)
    +function loadReserver() Promise~UserEntityReference~
    +get listing() ItemListingEntityReference
    +set listing(listing ItemListingEntityReference)
    +function loadListing() Promise~ItemListingEntityReference~
    +get reservationRequest() ReservationRequestEntityReference
    +function loadReservationRequest() Promise~ReservationRequestEntityReference~
    +set reservationRequest(res ReservationRequestEntityReference)
    +get messagingConversationId() string
    +set messagingConversationId(id string)
    +function loadMessages() Promise~MessageEntityReference[]~
    +get expiresAt() Date
    +set expiresAt(value Date)
  }

  class ConversationRepository~props~ {
    +function getByIdWithReferences(id string) Promise~Conversation~
    +function getByMessagingId(id string) Promise~Conversation~
    +function getByIdWithSharerReserver(id string) Promise~Conversation~
    +function getBySharerReserverListing(sharer string, reserver string, listing string) Promise~Conversation~
    +function getByListingId(listingId string) Promise~Conversation[]~
    +function getByReservationRequestId(reservationRequestId string) Promise~Conversation[]~
    +function getExpired(limit number) Promise~Conversation[]~
    +function getNewInstance(sharer UserEntityReference, reserver UserEntityReference, listing ItemListingEntityReference) Promise~Conversation~
    +function save(conv Conversation) Promise~void~
  }

  class ConversationReadRepository {
    +function getById(id string) Promise~ConversationEntityReference~
    +function getByUser(userId string) Promise~ConversationEntityReference[]~
    +function getBySharerReserverListing(sharerId string, reserverId string, listingId string) Promise~ConversationEntityReference~
    +function getByListingId(listingId string, options FindOptions) Promise~ConversationEntityReference[]~
  }

  class ItemListingReadRepository {
    +function getById(id string) Promise~ItemListingEntityReference~
    +function getBySharer(userId string) Promise~ItemListingEntityReference[]~
    +function getByStates(states string[], options FindOptions) Promise~ItemListingEntityReference[]~
  }

  class ReservationRequestReadRepository {
    +function getByListingId(listingId string) Promise~ReservationRequestEntityReference[]~
    +function getActiveByListingId(listingId string) Promise~ReservationRequestEntityReference[]~
    +function getByStates(states string[], options FindOptions) Promise~ReservationRequestEntityReference[]~
  }

  class CleanupResult {
    +number processedCount
    +number scheduledCount
    +Date timestamp
    +string[] errors
  }

  class ConversationApplicationService {
    +function create(command ConversationCreateCommand) Promise~ConversationEntityReference~
    +function queryById(query ConversationGetByIdQuery) Promise~ConversationEntityReference~
    +function queryByUser(query ConversationGetByUserQuery) Promise~ConversationEntityReference[]~
    +function processConversationsForArchivedListings() Promise~CleanupResult~
    +function processConversationsForArchivedReservationRequests() Promise~CleanupResult~
    +function sendMessage(command ConversationSendMessageCommand) Promise~MessageEntityReference~
  }

  class CleanupShared {
    +function processArchivedEntities(spanName string, fetchEntities function, processEntity function, entityLabel string) Promise~CleanupResult~
  }

  class CleanupArchivedConversations {
    +function processConversationsForArchivedListings(dataSources DataSources) Promise~CleanupResult~
  }

  class CleanupArchivedReservationConversations {
    +function processConversationsForArchivedReservationRequests(dataSources DataSources) Promise~CleanupResult~
  }

  ConversationProps <|.. Conversation
  ConversationEntityReference <|.. Conversation
  ConversationModel <.. ConversationDomainAdapter
  ConversationDomainAdapter ..> Conversation : converts
  ConversationRepository ..> Conversation : returns
  ConversationReadRepository ..> ConversationEntityReference : returns
  ItemListingReadRepository ..> ItemListingEntityReference : returns
  ReservationRequestReadRepository ..> ReservationRequestEntityReference : returns
  ConversationApplicationService ..> CleanupArchivedConversations
  ConversationApplicationService ..> CleanupArchivedReservationConversations
  CleanupArchivedConversations ..> CleanupShared
  CleanupArchivedReservationConversations ..> CleanupShared
  CleanupShared ..> CleanupResult
  ConversationRepository ..> ConversationModel : uses
  ConversationReadRepository ..> ConversationModel : reads
```

File-Level Changes

Change Details Files
Extend Cellix bootstrap and API wiring to support and register timer-triggered Azure Functions for conversation cleanup.
  • Add TimerHandler support, pending timer handler registry, and phase-checked registration method to Cellix bootstrap.
  • Wire pending timer handlers into Azure Functions app.timer during startup with proper lifecycle and tracing.
  • Register a new conversationCleanup timer function on the API entrypoint that invokes a conversation cleanup handler at 2:00 AM UTC.
apps/api/src/cellix.ts
apps/api/src/index.ts
apps/api/src/handlers/conversation-cleanup-handler.ts
Introduce conversation expiration and reservationRequest linkage in the domain model and MongoDB schema with TTL-based deletion support.
  • Extend Conversation domain props and entity to include reservationRequest references, expiresAt field, and a static retention period.
  • Add domain behaviors to get/load reservationRequest and to set/schedule expiresAt with permission checks using the visa/passport model (sketched after this change's file list).
  • Update Conversation mongoose model schema to add reservationRequest relation and expiresAt field with a TTL index.
  • Adjust ConversationEntityReference typing to expose reservationRequest and expiresAt where appropriate.
packages/sthrift/domain/src/domain/contexts/conversation/conversation/conversation.entity.ts
packages/sthrift/domain/src/domain/contexts/conversation/conversation/conversation.ts
packages/sthrift/data-sources-mongoose-models/src/models/conversations/conversation.model.ts
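
A minimal sketch of the retention behavior described above, using the 180-day constant adopted later in this PR (the exact value is an assumption here); the real aggregate extends the shared aggregate-root base class and guards the expiresAt setter with visa/permission checks, both omitted for brevity.

```ts
export class Conversation /* extends AggregateRoot<ConversationProps> in the real code */ {
	// Retention window in milliseconds; 180 days per the retention policy (assumed value).
	public static readonly RETENTION_PERIOD_MS = 180 * 24 * 60 * 60 * 1000;

	private _expiresAt: Date | undefined;

	get expiresAt(): Date | undefined {
		return this._expiresAt;
	}

	// Schedule TTL-based deletion relative to the date the listing or
	// reservation request was archived.
	scheduleForDeletion(archivalDate: Date): void {
		this._expiresAt = new Date(archivalDate.getTime() + Conversation.RETENTION_PERIOD_MS);
	}
}
```
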
Extend domain adapters and repositories to handle expiration, listing/reservation-based queries, and expired-conversation lookup.
  • Update ConversationDomainAdapter to map reservationRequest and expiresAt between domain and persistence, including lazy-loading behavior and validation.
  • Add ConversationRepository methods to query by listingId and reservationRequestId and to fetch expired conversations with sorting/limit and population, and update the ConversationRepository interface accordingly (see the getExpired sketch after this change's file list).
  • Enhance conversation domain adapter/repository tests and feature specs to cover new fields and repository methods.
packages/sthrift/persistence/src/datasources/domain/conversation/conversation/conversation.domain-adapter.ts
packages/sthrift/persistence/src/datasources/domain/conversation/conversation/conversation.domain-adapter.test.ts
packages/sthrift/persistence/src/datasources/domain/conversation/conversation/conversation.repository.ts
packages/sthrift/domain/src/domain/contexts/conversation/conversation/conversation.repository.ts
packages/sthrift/persistence/src/datasources/domain/conversation/conversation/conversation.repository.test.ts
packages/sthrift/persistence/src/datasources/domain/conversation/conversation/features/conversation.domain-adapter.feature
packages/sthrift/persistence/src/datasources/domain/conversation/conversation/features/conversation.repository.feature
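
A hedged sketch of the expired-conversation lookup mentioned in this change; the model import path, field names, and population targets are assumptions.

```ts
import { ConversationModel } from './conversation.model'; // assumed path

// Oldest expirations first, capped by `limit`, with references populated so the
// cleanup workflow can inspect the related users, listing, and reservation request.
export async function getExpired(limit: number) {
	return ConversationModel.find({ expiresAt: { $lte: new Date() } })
		.sort({ expiresAt: 1 })
		.limit(limit)
		.populate(['sharer', 'reserver', 'listing', 'reservationRequest'])
		.exec();
}
```
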
Add read-model repository APIs for conversations, listings, and reservation requests to support state-based and listing-based retention workflows.
  • Extend Conversation read repository with getByListingId including ObjectId handling, error tolerance, and population, plus tests and BDD scenarios for edge cases.
  • Extend ItemListing read repository with getByStates(filter by state $in), including tests and feature scenarios for normal, empty, and error paths.
  • Extend ReservationRequest read repository with getByStates and filter logic in the mock tests, including feature scenarios for multiple-state and no-match queries.
packages/sthrift/persistence/src/datasources/readonly/conversation/conversation/conversation.read-repository.ts
packages/sthrift/persistence/src/datasources/readonly/conversation/conversation/conversation.read-repository.test.ts
packages/sthrift/persistence/src/datasources/readonly/conversation/conversation/features/conversation.read-repository.feature
packages/sthrift/persistence/src/datasources/readonly/listing/item/item-listing.read-repository.ts
packages/sthrift/persistence/src/datasources/readonly/listing/item/item-listing.read-repository.test.ts
packages/sthrift/persistence/src/datasources/readonly/listing/item/features/item-listing.read-repository.feature
packages/sthrift/persistence/src/datasources/readonly/reservation-request/reservation-request/reservation-request.read-repository.ts
packages/sthrift/persistence/src/datasources/readonly/reservation-request/reservation-request/reservation-request.read-repository.test.ts
packages/sthrift/persistence/src/datasources/readonly/reservation-request/reservation-request/features/reservation-request.read-repository.feature
Implement shared OpenTelemetry-instrumented cleanup workflows in application services for archived listings and reservation requests.
  • Introduce a CleanupResult type and a shared processArchivedEntities helper that runs traced batch processing with per-entity error capture (sketched after this change's file list).
  • Add processConversationsForArchivedListings and processConversationsForArchivedReservationRequests application-service functions that select archived entities, open domain UoW transactions, locate related conversations, and schedule deletions based on sharing/reservation end or updatedAt timestamps.
  • Wire these cleanup operations into ConversationApplicationService and expose them via the API timer handler; add vitest-cucumber tests and feature files for success, partial failure, full failure, and no-op scenarios.
  • Add OpenTelemetry api dependency to application-services and expose ReservationRequestStates from the domain for use in filters.
packages/sthrift/application-services/src/contexts/conversation/conversation/index.ts
packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup.types.ts
packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup-shared.ts
packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup-archived-conversations.ts
packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup-archived-reservation-conversations.ts
packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup-archived-conversations.test.ts
packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup-archived-reservation-conversations.test.ts
packages/sthrift/application-services/src/contexts/conversation/conversation/features/cleanup-archived-conversations.feature
packages/sthrift/application-services/src/contexts/conversation/conversation/features/cleanup-archived-reservation-conversations.feature
packages/sthrift/application-services/package.json
packages/sthrift/domain/src/domain/contexts/reservation-request/reservation-request/index.ts
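
A hedged sketch of the shared helper, following the signature shown in the class diagram above (spanName, fetchEntities, processEntity, entityLabel); the tracer name and error-message format are assumptions.

```ts
import { SpanStatusCode, trace } from '@opentelemetry/api';

export interface CleanupResult {
	processedCount: number;
	scheduledCount: number;
	timestamp: Date;
	errors: string[];
}

const tracer = trace.getTracer('conversation-cleanup'); // tracer name is an assumption

// Traced batch processing with per-entity error capture: one failing entity is
// recorded in result.errors instead of aborting the whole run.
export async function processArchivedEntities<T extends { id: string }>(
	spanName: string,
	fetchEntities: () => Promise<T[]>,
	processEntity: (entity: T, result: CleanupResult) => Promise<void>,
	entityLabel: string,
): Promise<CleanupResult> {
	return tracer.startActiveSpan(spanName, async (span) => {
		const result: CleanupResult = {
			processedCount: 0,
			scheduledCount: 0,
			timestamp: new Date(),
			errors: [],
		};
		try {
			for (const entity of await fetchEntities()) {
				try {
					await processEntity(entity, result);
				} catch (error) {
					result.errors.push(
						`Failed to process ${entityLabel} ${entity.id}: ${
							error instanceof Error ? error.message : String(error)
						}`,
					);
				}
			}
			span.setAttribute('errorsCount', result.errors.length);
			span.setStatus({ code: SpanStatusCode.OK });
			return result;
		} finally {
			span.end();
		}
	});
}
```
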
Document and test conversation expiration and cleanup behaviors across domain and persistence layers, including security requirements.
  • Add BDD feature scenarios and tests for Conversation.expiresAt default behavior, permission-guarded setters, and scheduleForDeletion logic.
  • Enhance conversation, listing, and reservation-request repository tests to cover new query methods, invalid IDs, empty inputs, database errors, and mock reset behavior.
  • Add a security requirements doc specifying 6‑month retention and TTL-based deletion for conversations tied to archived listings, plus multi-layer controls and observability expectations.
packages/sthrift/domain/src/domain/contexts/conversation/conversation/conversation.test.ts
packages/sthrift/domain/src/domain/contexts/conversation/conversation/features/conversation.feature
apps/docs/docs/security-requirements/0012-conversation-data-retention.md

Assessment against linked issues

  • #201 — Implement automated deletion/expiration of conversation records (and their stored messages) in the operational MongoDB/Cosmos database approximately 6 months after the associated listing or reservation request is archived/closed, driven by a scheduled or event-based process.
  • #201 — Ensure related notifications or other message-related artifacts associated with a conversation are also cleaned up so no residual messaging-related data remains after conversation deletion. Explanation: the PR adds TTL-based deletion for conversations via an expiresAt field, plus scheduling logic based on archived listings and reservation requests, but it does not include any code or tests that identify or remove notification records or other external message-related artifacts. Cleanup is scoped to Conversation documents only.
  • #201 — Provide observability for the deletion process (logging/tracing/metrics) and update system documentation to describe the conversation/messaging lifecycle and retention rules.

Possibly linked issues

  • #Conversation - Conversation Deletion: PR introduces TTL, domain support, and scheduled Azure Function cleanup for conversations after listing/reservation archival with tracing/logging.


@lian-jung
Contributor Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 3 issues, and left some high level feedback:

  • In processConversationsForArchivedListings you hard-code listing states as ['Expired', 'Cancelled'] while the requirement mentions completed/closed as well; consider using a shared enum/constant for terminal listing states so you don’t miss new states or drift from the domain model.
  • The Conversation.scheduleForDeletion logic uses a fixed 6 * 30 * 24 * 60 * 60 * 1000 approximation for six months and the tests assert this exact millisecond offset; if the retention policy is truly “6 calendar months” it would be safer to encapsulate this in a single date-utility (e.g., addMonths) to avoid edge cases around month length and make future policy changes easier.
  • The archived-conversation cleanup (processConversationsForArchivedListings) runs over all archived listings without pagination or an upper bound, which could make the timer function run very long in larger deployments; consider batching (e.g., by date window or limit/skip) or enforcing a hard cap per run to keep the scheduled job within predictable execution time.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `processConversationsForArchivedListings` you hard-code listing states as `['Expired', 'Cancelled']` while the requirement mentions completed/closed as well; consider using a shared enum/constant for terminal listing states so you don’t miss new states or drift from the domain model.
- The `Conversation.scheduleForDeletion` logic uses a fixed `6 * 30 * 24 * 60 * 60 * 1000` approximation for six months and the tests assert this exact millisecond offset; if the retention policy is truly “6 calendar months” it would be safer to encapsulate this in a single date-utility (e.g., addMonths) to avoid edge cases around month length and make future policy changes easier.
- The archived-conversation cleanup (`processConversationsForArchivedListings`) runs over all archived listings without pagination or an upper bound, which could make the timer function run very long in larger deployments; consider batching (e.g., by date window or limit/skip) or enforcing a hard cap per run to keep the scheduled job within predictable execution time.

## Individual Comments

### Comment 1
<location> `packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup-archived-conversations.ts:32` </location>
<code_context>
+ * @param dataSources - The data sources for accessing domain data
+ * @returns A function that processes expired conversations
+ */
+export const processConversationsForArchivedListings = (
+	dataSources: DataSources,
+) => {
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting span handling and the main cleanup loop into dedicated helpers so the exported function focuses on orchestration and domain logic remains isolated and easier to follow.

You can simplify this without changing behavior by pulling span management and the core cleanup loop into small helpers. That removes duplicate `span.end`/status handling and separates tracing/logging from the domain logic.

### 1. Centralize span lifecycle

Right now `span.end()` and status/exception handling are spread across the `try`/`catch`. Wrap that in a small helper so the main function only expresses business logic:

```ts
const withSpan = async <T>(
  name: string,
  fn: (span: Span) => Promise<T>,
): Promise<T> => {
  return tracer.startActiveSpan(name, async (span) => {
    try {
      const result = await fn(span);
      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.setStatus({ code: SpanStatusCode.ERROR });
      if (error instanceof Error) {
        span.recordException(error);
      }
      throw error;
    } finally {
      span.end();
    }
  });
};
```

Usage:

```ts
export const processConversationsForArchivedListings = (dataSources: DataSources) => {
  return async (): Promise<CleanupResult> => {
    return withSpan(
      'conversation.processConversationsForArchivedListings',
      async (span) => {
        const result = await cleanupArchivedListings(dataSources);

        span.setAttribute('processedCount', result.processedCount);
        span.setAttribute('scheduledCount', result.scheduledCount);
        span.setAttribute('errorsCount', result.errors.length);

        console.log(
          `[ConversationCleanup] Cleanup complete. Processed: ${result.processedCount}, Scheduled: ${result.scheduledCount}, Errors: ${result.errors.length}`,
        );

        return result;
      },
    );
  };
};
```

### 2. Extract core loop into a pure-ish helper

Move the main iteration logic into a helper that only knows about listings/conversations and the `CleanupResult`. The public function then just orchestrates tracing and summary logging.

```ts
const cleanupArchivedListings = async (
  dataSources: DataSources,
): Promise<CleanupResult> => {
  const result: CleanupResult = {
    processedCount: 0,
    scheduledCount: 0,
    timestamp: new Date(),
    errors: [],
  };

  const archivedListings =
    await dataSources.readonlyDataSource.Listing.ItemListing.ItemListingReadRepo.getByStates(
      ['Expired', 'Cancelled'],
    );

  for (const listing of archivedListings) {
    try {
      const conversations =
        await dataSources.readonlyDataSource.Conversation.Conversation.ConversationReadRepo.getByListingId(
          listing.id,
        );

      for (const conversationRef of conversations) {
        result.processedCount++;

        if (conversationRef.expiresAt) continue;

        await dataSources.domainDataSource.Conversation.Conversation.ConversationUnitOfWork.withScopedTransaction(
          async (repo) => {
            const conversation = await repo.get(conversationRef.id);
            if (conversation && !conversation.expiresAt) {
              conversation.scheduleForDeletion(listing.updatedAt);
              await repo.save(conversation);
              result.scheduledCount++;
            }
          },
        );
      }
    } catch (error) {
      const errorMsg = `Failed to process conversations for listing ${listing.id}: ${
        error instanceof Error ? error.message : String(error)
      }`;
      result.errors.push(errorMsg);
      console.error(`[ConversationCleanup] ${errorMsg}`);
    }
  }

  return result;
};
```

This keeps all existing behavior (per‑listing `try/catch`, error accumulation, logging, counts) but:

- Span start/end and status handling are in one place.
- The core cleanup logic is in a focused function that’s easier to read and test.
- The main exported function is reduced to orchestration (tracing + final log + calling the helper).
</issue_to_address>

### Comment 2
<location> `packages/sthrift/application-services/src/contexts/conversation/conversation/schedule-deletion-by-listing.ts:38` </location>
<code_context>
+ * @param dataSources - The data sources for accessing domain and readonly data
+ * @returns A function that takes the command and returns the result
+ */
+export const scheduleDeletionByListing = (dataSources: DataSources) => {
+	return async (
+		command: ScheduleDeletionByListingCommand,
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the tracing boilerplate into a reusable helper so this function focuses only on the domain logic of scheduling deletions.

You can significantly reduce complexity by extracting the repeated tracing boilerplate into a small helper and letting it handle `span.end()` / status / error recording. That keeps this function focused on the domain logic and removes duplicated patterns (including the early‑return path).

For example, introduce a generic traced-command helper (in a shared tracing/utils module):

```ts
// tracing/runTracedCommand.ts
import { SpanStatusCode, trace, type Span } from '@opentelemetry/api';

type TracedFn<T> = (span: Span) => Promise<T>;

export async function runTracedCommand<T>(
  tracerName: string,
  spanName: string,
  attributes: Record<string, string>,
  fn: TracedFn<T>,
): Promise<T> {
  const tracer = trace.getTracer(tracerName);

  return tracer.startActiveSpan(spanName, async (span) => {
    try {
      for (const [key, value] of Object.entries(attributes)) {
        span.setAttribute(key, value);
      }

      const result = await fn(span);

      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.setStatus({ code: SpanStatusCode.ERROR });
      if (error instanceof Error) {
        span.recordException(error);
      }
      throw error;
    } finally {
      span.end();
    }
  });
}
```

Then your command becomes much simpler and avoids manual `span.end()` and duplicated status/attribute handling, including the early return:

```ts
// keep the same interfaces

const TRACER_NAME = 'conversation:schedule-deletion';

export const scheduleDeletionByListing = (dataSources: DataSources) => {
  return async (command: ScheduleDeletionByListingCommand): Promise<ScheduleDeletionResult> => {
    return runTracedCommand(
      TRACER_NAME,
      'conversation.scheduleDeletionByListing',
      {
        listingId: command.listingId,
        archivalDate: command.archivalDate.toISOString(),
      },
      async (span) => {
        const conversations =
          await dataSources.readonlyDataSource.Conversation.Conversation.ConversationReadRepo
            .getByListingId(command.listingId);

        if (conversations.length === 0) {
          span.setAttribute('scheduledCount', 0);
          console.log(
            `[ConversationDeletion] No conversations to schedule. ListingId: ${command.listingId}`,
          );
          return { scheduledCount: 0, conversationIds: [] };
        }

        const scheduledIds: string[] = [];

        await dataSources.domainDataSource.Conversation.Conversation.ConversationUnitOfWork
          .withScopedTransaction(async (repo) => {
            for (const conversationRef of conversations) {
              const conversation = await repo.get(conversationRef.id);
              if (conversation) {
                conversation.scheduleForDeletion(command.archivalDate);
                await repo.save(conversation);
                scheduledIds.push(conversation.id);
              }
            }
          });

        span.setAttribute('scheduledCount', scheduledIds.length);
        span.setAttribute('conversationIds', scheduledIds.join(','));

        console.log(
          `[ConversationDeletion] Scheduled ${scheduledIds.length} conversation(s) for deletion. ` +
          `ListingId: ${command.listingId}, ArchivalDate: ${command.archivalDate.toISOString()}`,
        );

        return {
          scheduledCount: scheduledIds.length,
          conversationIds: scheduledIds,
        };
      },
    );
  };
};
```

Benefits:

- Removes duplicated `startActiveSpan` / `setStatus` / `end` patterns here and in `cleanup-archived-conversations.ts`.
- Early returns no longer need manual span cleanup.
- The function body now clearly reads as: load conversations → unit of work → schedule + save → return result.
</issue_to_address>

### Comment 3
<location> `apps/api/src/handlers/conversation-cleanup-handler.ts:19` </location>
<code_context>
+ * - Conversations are deleted 6 months after the associated listing reaches a terminal state
+ * - This handler acts as a fallback mechanism in case event-driven scheduling fails
+ */
+export const conversationCleanupHandlerCreator = (
+	applicationServicesFactory: ApplicationServicesFactory,
+): TimerHandler => {
</code_context>

<issue_to_address>
**issue (complexity):** Consider simplifying this timer handler by either dropping the explicit tracing span in favor of existing app-service instrumentation or extracting the span lifecycle into a helper so the handler code reflects business intent rather than tracing details.

You can slim this handler down by either (a) dropping the extra span and relying on the app-service instrumentation, or (b) abstracting the span boilerplate into a small helper so the handler reads in terms of business intent rather than tracing mechanics.

### Option A: Let the app-service own tracing

If `processConversationsForArchivedListings` already starts its own span and logs enough detail, the timer handler can focus on Azure-specific concerns + orchestration:

```ts
export const conversationCleanupHandlerCreator = (
  applicationServicesFactory: ApplicationServicesFactory,
): TimerHandler => {
  return async (timer: Timer, context: InvocationContext): Promise<void> => {
    context.log(
      `[ConversationCleanup] Timer trigger fired at ${new Date().toISOString()}`,
    );

    if (timer.isPastDue) {
      context.log(
        '[ConversationCleanup] Timer is past due, running catch-up execution',
      );
    }

    const appServices = await applicationServicesFactory.forRequest();
    const result =
      await appServices.Conversation.Conversation
        .processConversationsForArchivedListings();

    context.log(
      `[ConversationCleanup] Completed. Processed: ${result.processedCount}, ` +
      `Scheduled: ${result.scheduledCount}, Errors: ${result.errors.length}`,
    );

    if (result.errors.length > 0) {
      context.warn(
        `[ConversationCleanup] Errors: ${result.errors.join('; ')}`,
      );
    }
  };
};
```

This keeps all functionality but removes nested spans and duplicated tracing logic.

### Option B: Factor out span boilerplate

If you really need a span at this layer (e.g., to tag timer-specific attributes), extract the try/catch/finally into a helper so the handler body doesn’t manage span lifecycle directly:

```ts
const withSpan = async <T>(
  name: string,
  fn: (span: Span) => Promise<T>,
): Promise<T> => {
  return tracer.startActiveSpan(name, async (span) => {
    try {
      const result = await fn(span);
      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.setStatus({ code: SpanStatusCode.ERROR });
      if (error instanceof Error) {
        span.recordException(error);
      }
      throw error;
    } finally {
      span.end();
    }
  });
};
```

Then the handler becomes:

```ts
export const conversationCleanupHandlerCreator = (
  applicationServicesFactory: ApplicationServicesFactory,
): TimerHandler => {
  return async (timer: Timer, context: InvocationContext): Promise<void> => {
    await withSpan('conversationCleanup.timerHandler', async (span) => {
      span.setAttribute('timer.isPastDue', timer.isPastDue);
      span.setAttribute(
        'timer.scheduledTime',
        timer.scheduleStatus?.next ?? 'unknown',
      );

      context.log(
        `[ConversationCleanup] Timer trigger fired at ${new Date().toISOString()}`,
      );
      if (timer.isPastDue) {
        context.log(
          '[ConversationCleanup] Timer is past due, running catch-up execution',
        );
      }

      const appServices = await applicationServicesFactory.forRequest();
      const result =
        await appServices.Conversation.Conversation
          .processConversationsForArchivedListings();

      span.setAttribute('result.processedCount', result.processedCount);
      span.setAttribute('result.scheduledCount', result.scheduledCount);
      span.setAttribute('result.errorsCount', result.errors.length);

      context.log(
        `[ConversationCleanup] Completed. Processed: ${result.processedCount}, ` +
        `Scheduled: ${result.scheduledCount}, Errors: ${result.errors.length}`,
      );
      if (result.errors.length > 0) {
        context.warn(
          `[ConversationCleanup] Errors: ${result.errors.join('; ')}`,
        );
      }
    });
  };
};
```

Both approaches preserve behavior; Option A reduces complexity the most, while Option B keeps your span but removes the noisy tracing boilerplate from the handler.
</issue_to_address>


Follow-up commits:

  • The scheduleDeletionByListing service was designed for event-driven deletion scheduling when listings are archived, but the event handler is not implemented. The system currently uses the processConversationsForArchivedListings cleanup job instead.
  • Remove the redundant tracing span from the timer handler, relying on existing app-service instrumentation. This reduces complexity and keeps the handler focused on Azure-specific orchestration.
@lian-jung
Contributor Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 1 issue, and left some high level feedback:

  • In processConversationsForArchivedListings, each conversation is processed in its own withScopedTransaction call; consider batching conversations per listing into a single unit-of-work/transaction to reduce round-trips and improve performance for large archived-listing sets.
  • The listing states 'Expired' and 'Cancelled' are hard-coded strings in the cleanup logic and tests; it would be more robust to reuse a shared enum/constant from the listing domain/persistence layer to avoid drift if the allowed states ever change.
  • The 6‑month retention window is implemented as a fixed 6 * 30 * 24 * 60 * 60 * 1000 offset in Conversation.scheduleForDeletion; consider centralizing this retention period in a single constant (and possibly basing it on calendar months or a clearly documented day count) so that changes to the policy only need to be made in one place and stay in sync with the security requirements document.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `processConversationsForArchivedListings`, each conversation is processed in its own `withScopedTransaction` call; consider batching conversations per listing into a single unit-of-work/transaction to reduce round-trips and improve performance for large archived-listing sets.
- The listing states `'Expired'` and `'Cancelled'` are hard-coded strings in the cleanup logic and tests; it would be more robust to reuse a shared enum/constant from the listing domain/persistence layer to avoid drift if the allowed states ever change.
- The 6‑month retention window is implemented as a fixed `6 * 30 * 24 * 60 * 60 * 1000` offset in `Conversation.scheduleForDeletion`; consider centralizing this retention period in a single constant (and possibly basing it on calendar months or a clearly documented day count) so that changes to the policy only need to be made in one place and stay in sync with the security requirements document.

## Individual Comments

### Comment 1
<location> `packages/sthrift/persistence/src/datasources/readonly/listing/item/item-listing.read-repository.ts:198-216` </location>
<code_context>
 		}
 	}
+
+	async getByStates(
+		states: string[],
+		options?: FindOptions,
+	): Promise<Domain.Contexts.Listing.ItemListing.ItemListingEntityReference[]> {
+		if (!states || states.length === 0) return [];
+		try {
+			const result = await this.mongoDataSource.find(
+				{ state: { $in: states } },
+				{
+					...options,
+				},
+			);
+			if (!result || result.length === 0) return [];
+			return result.map((doc) => this.converter.toDomain(doc, this.passport));
+		} catch (error) {
+			console.error('Error fetching listings by states:', error);
+			return [];
+		}
+	}
 }

</code_context>

<issue_to_address>
**suggestion:** Error handling in `getByStates` differs from other repository methods and may be noisier than intended.

Other methods here catch errors, log a warning, and return `[]`/`null`. This one uses `console.error` and returns `[]`. If read failures are meant to be treated as “no results”, consider aligning the logging level (e.g., `console.warn`) or using the same logging mechanism as the rest of the repository, so this method doesn’t signal a higher-severity error than intended.

```suggestion
	async getByStates(
		states: string[],
		options?: FindOptions,
	): Promise<Domain.Contexts.Listing.ItemListing.ItemListingEntityReference[]> {
		if (!states || states.length === 0) return [];
		try {
			const result = await this.mongoDataSource.find(
				{ state: { $in: states } },
				{
					...options,
				},
			);
			if (!result || result.length === 0) return [];
			return result.map((doc) => this.converter.toDomain(doc, this.passport));
		} catch (error) {
			console.warn('Error fetching listings by states:', error);
			return [];
		}
	}
```
</issue_to_address>


Follow-up commits:

  • …ethods — Use console.warn instead of console.error to match the logging level used by other methods like getBySharer.
  • Move the retention period calculation to a static constant (RETENTION_PERIOD_MS) on the Conversation class. Set to 180 days exactly (not 6*30) per security requirement 0012, so the retention period is defined in one place and stays in sync with the documentation.
  • Replace hard-coded 'Expired' and 'Cancelled' strings with references to Domain.Contexts.Listing.ItemListing.ValueObjects.ListingStateEnum. This prevents drift if listing states change and ensures consistency with the domain layer.
  • Refactor processConversationsForArchivedListings to process all conversations for each listing in a single withScopedTransaction call instead of creating a new transaction per conversation. This reduces database round-trips and improves performance for large archived-listing sets.
  • Remove the readonly constraint and 'as const' to match the getByStates signature, which expects a mutable string[] (see the sketch after this list).
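
The ListingStateEnum and mutable string[] notes above boil down to something like the following sketch; the import path and enum member names are assumptions inferred from the 'Expired' and 'Cancelled' states.

```ts
import { Domain } from '@sthrift/domain'; // import path is an assumption

const { ListingStateEnum } = Domain.Contexts.Listing.ItemListing.ValueObjects;

// Plain string[] (no readonly, no `as const`) so the value is assignable to
// ItemListingReadRepo.getByStates(states: string[], options?).
const ARCHIVED_LISTING_STATES: string[] = [
	ListingStateEnum.Expired, // member names assumed
	ListingStateEnum.Cancelled,
];

// Usage sketch:
// const archived = await ItemListingReadRepo.getByStates(ARCHIVED_LISTING_STATES);
```
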
@lian-jung
Contributor Author

@soucery-ai review

@lian-jung
Contributor Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 4 issues, and left some high level feedback:

  • In conversation-cleanup-handler.ts the error logging uses context.warn(...), but InvocationContext exposes context.log.warn(...); this will currently fail at runtime and should be updated to use the logger correctly.
  • The processConversationsForArchivedListings loop mixes per-conversation and per-listing increments of processedCount, which is logically correct but hard to follow; consider centralizing the processedCount update (e.g., once per listing based on conversations.length) to make the accounting clearer and less error-prone.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `conversation-cleanup-handler.ts` the error logging uses `context.warn(...)`, but `InvocationContext` exposes `context.log.warn(...)`; this will currently fail at runtime and should be updated to use the logger correctly.
- The `processConversationsForArchivedListings` loop mixes per-conversation and per-listing increments of `processedCount`, which is logically correct but hard to follow; consider centralizing the `processedCount` update (e.g., once per listing based on `conversations.length`) to make the accounting clearer and less error-prone.

## Individual Comments

### Comment 1
<location> `apps/api/src/handlers/conversation-cleanup-handler.ts:27-28` </location>
<code_context>
+			`[ConversationCleanup] Completed. Processed: ${result.processedCount}, Scheduled: ${result.scheduledCount}, Errors: ${result.errors.length}`,
+		);
+
+		if (result.errors.length > 0) {
+			context.warn(
+				`[ConversationCleanup] Errors: ${result.errors.join('; ')}`,
+			);
</code_context>

<issue_to_address>
**issue (bug_risk):** InvocationContext likely does not expose a top-level `warn` method; use the logger on `context.log` instead

`InvocationContext` exposes logging via `context.log` (and its level-specific methods), not `context.warn`. Calling `context.warn` will likely throw at runtime. Please update this block to use `context.log.warn`, e.g.:

```ts
if (result.errors.length > 0) {
  context.log.warn(
    `[ConversationCleanup] Errors: ${result.errors.join('; ')}`,
  );
}
```
</issue_to_address>

### Comment 2
<location> `packages/sthrift/persistence/src/datasources/readonly/conversation/conversation/conversation.read-repository.ts:149-158` </location>
<code_context>
 		return this.typeConverter.toDomain(mongoConversation, this.passport);
 	}

+	async getByListingId(
+		listingId: string,
+	): Promise<
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Swallowing all errors in getByListingId may hide systemic issues

Catching all errors here and returning an empty array (with only a warning) prevents callers from telling "no conversations" apart from a data source failure and can mask outages. Consider limiting the catch to the `ObjectId` construction/validation error you expect, and either rethrow or propagate other errors so they can be handled and monitored properly.
</issue_to_address>

### Comment 3
<location> `packages/sthrift/persistence/src/datasources/readonly/listing/item/item-listing.read-repository.ts:198-207` </location>
<code_context>
 		}
 	}
+
+	async getByStates(
+		states: string[],
+		options?: FindOptions,
+	): Promise<Domain.Contexts.Listing.ItemListing.ItemListingEntityReference[]> {
+		if (!states || states.length === 0) return [];
+		try {
+			const result = await this.mongoDataSource.find(
+				{ state: { $in: states } },
+				{
+					...options,
+				},
+			);
+			if (!result || result.length === 0) return [];
+			return result.map((doc) => this.converter.toDomain(doc, this.passport));
+		} catch (error) {
+			console.warn('Error fetching listings by states:', error);
+			return [];
</code_context>

<issue_to_address>
**suggestion (bug_risk):** getByStates conflates errors with "no results" by returning an empty array on any failure

By returning `[]` for any caught error and only logging a warning, callers can’t distinguish “no listings for these states” from “query failed” (e.g. DB outage). If you want resilience to known query failures, consider narrowing the `catch` to those cases or rethrowing unexpected errors so upstream layers/telemetry can detect and handle real failures.

Suggested implementation:

```typescript
	async getByStates(
		states: string[],
		options?: FindOptions,
	): Promise<Domain.Contexts.Listing.ItemListing.ItemListingEntityReference[]> {
		if (!states || states.length === 0) return [];

		const result = await this.mongoDataSource.find(
			{ state: { $in: states } },
			{
				...options,
			},
		);

		if (!result || result.length === 0) return [];

		return result.map((doc) => this.converter.toDomain(doc, this.passport));

```

If the rest of this repository layer uses structured error handling (e.g. wraps data source errors into domain-specific error types or logs them centrally), ensure that callers of `getByStates` are ready to handle propagated errors similarly to other read methods (like `getById`, `getByIds`, etc.) in this file. If other methods also swallow all errors, you may want to apply the same pattern there for consistency with this change.
</issue_to_address>

### Comment 4
<location> `packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup-archived-conversations.ts:21` </location>
<code_context>
+	errors: string[];
+}
+
+export const processConversationsForArchivedListings = (
+	dataSources: DataSources,
+) => {
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the tracing wrapper, core cleanup logic, and per-listing processing into separate helper functions to flatten control flow and isolate responsibilities.

You can reduce the complexity meaningfully without changing behavior by splitting responsibilities and pushing counting/error-handling into small helpers.

### 1. Thin traced wrapper + core cleanup function

Right now `processConversationsForArchivedListings` returns an async function that immediately calls `startActiveSpan` and mixes all logic inside it. You can keep the exported API exactly as-is but move the business logic to a separate function:

```ts
export const processConversationsForArchivedListings = (dataSources: DataSources) => {
  return async (): Promise<CleanupResult> => {
    return tracer.startActiveSpan(
      'conversation.processConversationsForArchivedListings',
      async (span) => {
        const result: CleanupResult = {
          processedCount: 0,
          scheduledCount: 0,
          timestamp: new Date(),
          errors: [],
        };

        try {
          await runCleanupForArchivedListings(dataSources, result);
          span.setAttribute('processedCount', result.processedCount);
          span.setAttribute('scheduledCount', result.scheduledCount);
          span.setAttribute('errorsCount', result.errors.length);
          span.setStatus({ code: SpanStatusCode.OK });
          console.log(
            `[ConversationCleanup] Cleanup complete. Processed: ${result.processedCount}, Scheduled: ${result.scheduledCount}, Errors: ${result.errors.length}`,
          );
        } catch (error) {
          span.setStatus({ code: SpanStatusCode.ERROR });
          if (error instanceof Error) {
            span.recordException(error);
            result.errors.push(error.message);
          }
          console.error('[ConversationCleanup] Cleanup failed:', error);
          throw error;
        } finally {
          span.end();
        }

        return result;
      },
    );
  };
};
```

Now the core logic can live in `runCleanupForArchivedListings`, making the span wrapper much flatter.

### 2. Per-listing helper to flatten control flow and counting

Move the per-listing logic (including the inner `try/catch`) into a dedicated helper that returns per-listing counts. This keeps counters local and then aggregated, and centralizes error handling:

```ts
async function runCleanupForArchivedListings(
  dataSources: DataSources,
  result: CleanupResult,
): Promise<void> {
  const archivedListings =
    await dataSources.readonlyDataSource.Listing.ItemListing.ItemListingReadRepo.getByStates(
      ARCHIVED_LISTING_STATES,
    );

  for (const listing of archivedListings) {
    const { processed, scheduled, error } =
      await processListingConversations(listing, dataSources);

    result.processedCount += processed;
    result.scheduledCount += scheduled;

    if (error) {
      result.errors.push(error);
    }
  }
}
```

And the listing-level helper:

```ts
async function processListingConversations(
  listing: { id: string; updatedAt: Date },
  dataSources: DataSources,
): Promise<{ processed: number; scheduled: number; error?: string }> {
  let processed = 0;
  let scheduled = 0;

  try {
    const conversations =
      await dataSources.readonlyDataSource.Conversation.Conversation.ConversationReadRepo
        .getByListingId(listing.id);

    const conversationsToSchedule = conversations.filter((c) => !c.expiresAt);

    if (conversationsToSchedule.length === 0) {
      processed += conversations.length;
      return { processed, scheduled };
    }

    await dataSources.domainDataSource.Conversation.Conversation.ConversationUnitOfWork
      .withScopedTransaction(async (repo) => {
        for (const conversationRef of conversationsToSchedule) {
          const conversation = await repo.get(conversationRef.id);
          if (conversation && !conversation.expiresAt) {
            conversation.scheduleForDeletion(listing.updatedAt);
            await repo.save(conversation);
            scheduled++;
          }
          processed++;
        }
      });

    processed += conversations.length - conversationsToSchedule.length;

    return { processed, scheduled };
  } catch (e) {
    const errorMsg = `Failed to process conversations for listing ${
      listing.id
    }: ${e instanceof Error ? e.message : String(e)}`;
    console.error(`[ConversationCleanup] ${errorMsg}`);
    return { processed, scheduled, error: errorMsg };
  }
}
```

Benefits:

- Removes nested `try/catch` from the main function; each function has a single concern.
- Counters are computed per listing (`processed`, `scheduled`) and then aggregated, avoiding interleaved mutations in multiple scopes.
- Error handling for a listing is localized; the main flow stays focused on orchestration and aggregation.
- Telemetry remains intact but is no longer mixed with the per-listing algorithm.
</issue_to_address>

@lian-jung
Contributor Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 5 issues

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup-archived-conversations.ts:61` </location>
<code_context>
+							}
+
+							// Batch all conversations for this listing in a single transaction
+							await dataSources.domainDataSource.Conversation.Conversation.ConversationUnitOfWork.withScopedTransaction(
+								async (repo) => {
+									for (const conversationRef of conversationsToSchedule) {
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Clarify and simplify processedCount accounting to avoid double-adding and edge-case drift.

`processedCount` is updated in two different places, which makes it easy for the count to diverge from what you actually processed:

- In the transaction you increment only for `conversationsToSchedule` without `expiresAt`.
- Afterward you add `conversations.length - conversationsToSchedule.length`, assuming that covers everything else.

If `repo.get` returns `null` or additional branches are added, this accounting can become incorrect. Consider instead:

- Increment `processedCount` once per conversation you iterate over.
- Increment `scheduledCount` only when you call `scheduleForDeletion`.
- Remove the final `processedCount += conversations.length - conversationsToSchedule.length` and depend solely on the per-conversation increments.

This will keep the metrics aligned with the actual work done, even as the logic evolves.

Suggested implementation:

```typescript
							// Batch all conversations for this listing in a single transaction
							await dataSources.domainDataSource.Conversation.Conversation.ConversationUnitOfWork.withScopedTransaction(
								async (repo) => {
									for (const conversationRef of conversationsToSchedule) {
										const conversation = await repo.get(conversationRef.id);
										if (conversation && !conversation.expiresAt) {
											conversation.scheduleForDeletion(listing.updatedAt);
											await repo.save(conversation);
											result.scheduledCount++;
										}
										// Count each conversation we actually process in this transaction
										result.processedCount++;
									}
								},
							);

```

1. Ensure there are no other aggregate `result.processedCount += ...` adjustments for this same `conversations` collection elsewhere in the function (beyond the `if (conversationsToSchedule.length === 0)` early-return case). Any such bulk “catch-up” increments should be removed so `processedCount` is derived solely from the per-conversation increments.
2. If you later introduce additional branches (e.g., different handling for already-expired conversations), increment `processedCount` exactly once in the same place you handle each conversation’s logic to keep the metric aligned.
</issue_to_address>

### Comment 2
<location> `packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup-archived-conversations.ts:83-87` </location>
<code_context>
+						}
+					}
+
+					span.setAttribute('processedCount', result.processedCount);
+					span.setAttribute('scheduledCount', result.scheduledCount);
+					span.setAttribute('errorsCount', result.errors.length);
+					span.setStatus({ code: SpanStatusCode.OK });
+					span.end();
+
</code_context>

<issue_to_address>
**suggestion:** Span status is set to OK even when there are per-listing errors recorded.

`span.setStatus({ code: SpanStatusCode.OK })` is called unconditionally, so traces won’t reflect partial failures when `result.errors.length > 0`, even though we record `errorsCount`.

Consider setting the status based on whether any errors occurred, for example:

```ts
if (result.errors.length > 0) {
  span.setStatus({
    code: SpanStatusCode.ERROR,
    message: `${result.errors.length} listing(s) failed during cleanup`,
  });
} else {
  span.setStatus({ code: SpanStatusCode.OK });
}
```

This preserves a successful `CleanupResult` while still indicating a degraded run in tracing.

```suggestion
					span.setAttribute('processedCount', result.processedCount);
					span.setAttribute('scheduledCount', result.scheduledCount);
					span.setAttribute('errorsCount', result.errors.length);

					if (result.errors.length > 0) {
						span.setStatus({
							code: SpanStatusCode.ERROR,
							message: `${result.errors.length} listing(s) failed during cleanup`,
						});
					} else {
						span.setStatus({ code: SpanStatusCode.OK });
					}

					span.end();
```
</issue_to_address>

### Comment 3
<location> `apps/docs/docs/security-requirements/0012-conversation-data-retention.md:38-39` </location>
<code_context>
+- `Conversation.scheduleForDeletion(archivalDate: Date)` method sets the expiration date
+- `Conversation.expiresAt` property stores the deletion timestamp
+- Authorization enforced through `ConversationDomainPermissions.isSystemAccount` permission
+- Only system passport can schedule conversations for deletion
+
+### Application Services
</code_context>

<issue_to_address>
**nitpick (typo):** Consider adding an article: "Only the system passport can schedule conversations for deletion" reads more naturally.

This keeps the bullet grammatically correct and consistent with the others in the list.

```suggestion
- Authorization enforced through `ConversationDomainPermissions.isSystemAccount` permission
- Only the system passport can schedule conversations for deletion
```
</issue_to_address>

### Comment 4
<location> `apps/docs/docs/security-requirements/0012-conversation-data-retention.md:46` </location>
<code_context>
+**ScheduleConversationDeletionByListing Service**
+- Triggered when a listing is archived
+- Finds all conversations associated with the listing
+- Schedules each conversation for deletion with 6-month retention period
+- Uses batch processing with configurable batch sizes
+- Includes OpenTelemetry tracing for observability
</code_context>

<issue_to_address>
**nitpick (typo):** Add an article: "with a 6-month retention period" is grammatically smoother.

Consider: "Schedules each conversation for deletion with a 6‑month retention period" so the bullet reads as a complete sentence.

```suggestion
- Schedules each conversation for deletion with a 6-month retention period
```
</issue_to_address>
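
The two documentation excerpts above describe the retention mechanics (a `scheduleForDeletion(archivalDate)` call that sets `expiresAt`, gated by the `isSystemAccount` permission, with a 6-month retention window), but the aggregate itself is not shown in this conversation. Below is a minimal sketch of what such a method could look like; the class shape, permissions object, and error message are illustrative assumptions for this example, not the actual `Conversation` aggregate API.

```ts
// Illustrative sketch only: names, the permissions shape, and the error
// message are assumptions for this example, not the real aggregate API.
const RETENTION_MONTHS = 6;

interface ConversationPermissions {
  isSystemAccount: boolean;
}

class ConversationExample {
  private _expiresAt: Date | undefined;

  constructor(private readonly permissions: ConversationPermissions) {}

  get expiresAt(): Date | undefined {
    return this._expiresAt;
  }

  // Sets expiresAt to the archival date plus the retention window.
  // Mirrors the "only the system passport can schedule deletion" rule.
  scheduleForDeletion(archivalDate: Date): void {
    if (!this.permissions.isSystemAccount) {
      throw new Error(
        'Only the system passport can schedule conversations for deletion',
      );
    }
    const expiresAt = new Date(archivalDate);
    expiresAt.setMonth(expiresAt.getMonth() + RETENTION_MONTHS);
    this._expiresAt = expiresAt;
  }
}
```

With a shape like this, the cleanup services only ever act on an `expiresAt` value that a system passport set, which is exactly what the permission bullet quoted above is guarding.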

### Comment 5
<location> `packages/sthrift/application-services/src/contexts/conversation/conversation/cleanup-archived-conversations.ts:21` </location>
<code_context>
+	errors: string[];
+}
+
+export const processConversationsForArchivedListings = (
+	dataSources: DataSources,
+) => {
</code_context>

<issue_to_address>
**issue (complexity):** Consider refactoring the cleanup function to centralize counting, extract per-listing processing and error handling into helpers, and use a single span finalization path to simplify control flow and tracing.

You can reduce complexity around counting, control flow, and span handling without changing behavior.

### 1. Centralize `processedCount` logic

Currently `processedCount` is mutated in two places (inside the transaction loop and after it), which is harder to reason about and easy to break in future edits.

You can define “processed” as “conversations we attempted to process for this listing” and increment once per listing:

```ts
for (const listing of archivedListings) {
  try {
    const conversations =
      await dataSources.readonlyDataSource.Conversation.Conversation.ConversationReadRepo.getByListingId(
        listing.id,
      );

    const conversationsToSchedule = conversations.filter((c) => !c.expiresAt);

    // Count all conversations for this listing once
    result.processedCount += conversations.length;

    if (conversationsToSchedule.length === 0) {
      continue;
    }

    await dataSources.domainDataSource.Conversation.Conversation.ConversationUnitOfWork.withScopedTransaction(
      async (repo) => {
        for (const conversationRef of conversationsToSchedule) {
          const conversation = await repo.get(conversationRef.id);
          if (conversation && !conversation.expiresAt) {
            conversation.scheduleForDeletion(listing.updatedAt);
            await repo.save(conversation);
            result.scheduledCount++;
          }
        }
      },
    );
  } catch (error) {
    recordListingError(listing.id, error, result);
  }
}
```

This keeps `processedCount` updated in a single, obvious place and ties `scheduledCount` to conversations that were actually scheduled.

### 2. Extract per‑listing processing into a helper

Pulling the per‑listing logic into a helper flattens the nesting and makes responsibilities clearer, while keeping the transaction where it is:

```ts
async function processListing(
  listing: Listing,
  dataSources: DataSources,
  result: CleanupResult,
): Promise<void> {
  const conversations =
    await dataSources.readonlyDataSource.Conversation.Conversation.ConversationReadRepo.getByListingId(
      listing.id,
    );

  result.processedCount += conversations.length;

  const conversationsToSchedule = conversations.filter((c) => !c.expiresAt);
  if (conversationsToSchedule.length === 0) return;

  await dataSources.domainDataSource.Conversation.Conversation.ConversationUnitOfWork.withScopedTransaction(
    async (repo) => {
      for (const conversationRef of conversationsToSchedule) {
        const conversation = await repo.get(conversationRef.id);
        if (conversation && !conversation.expiresAt) {
          conversation.scheduleForDeletion(listing.updatedAt);
          await repo.save(conversation);
          result.scheduledCount++;
        }
      }
    },
  );
}
```

Then the main loop becomes simpler:

```ts
for (const listing of archivedListings) {
  try {
    await processListing(listing, dataSources, result);
  } catch (error) {
    recordListingError(listing.id, error, result);
  }
}
```

### 3. Simplify span finalization

`span.end()` and status setting are currently duplicated in both `try` and `catch`. Putting them in a `finally` block keeps tracing concerns separate from business logic:

```ts
return await tracer.startActiveSpan(
  'conversation.processConversationsForArchivedListings',
  async (span) => {
    const result: CleanupResult = {
      processedCount: 0,
      scheduledCount: 0,
      timestamp: new Date(),
      errors: [],
    };

    try {
      const archivedListings =
        await dataSources.readonlyDataSource.Listing.ItemListing.ItemListingReadRepo.getByStates(
          ARCHIVED_LISTING_STATES,
        );

      span.setAttribute('archivedListingsCount', archivedListings.length);

      for (const listing of archivedListings) {
        try {
          await processListing(listing, dataSources, result);
        } catch (error) {
          recordListingError(listing.id, error, result);
        }
      }

      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.setStatus({ code: SpanStatusCode.ERROR });
      if (error instanceof Error) {
        span.recordException(error);
        result.errors.push(error.message);
      }
      console.error('[ConversationCleanup] Cleanup failed:', error);
      throw error;
    } finally {
      span.setAttribute('processedCount', result.processedCount);
      span.setAttribute('scheduledCount', result.scheduledCount);
      span.setAttribute('errorsCount', result.errors.length);
      span.end();
      console.log(
        `[ConversationCleanup] Cleanup complete. Processed: ${result.processedCount}, Scheduled: ${result.scheduledCount}, Errors: ${result.errors.length}`,
      );
    }
  },
);
```

### 4. Encapsulate per‑listing error logging

The per‑listing error logging is duplicated and string-based. A tiny helper reduces noise:

```ts
function recordListingError(
  listingId: string,
  error: unknown,
  result: CleanupResult,
): void {
  const message =
    error instanceof Error ? error.message : String(error);
  const errorMsg = `Failed to process conversations for listing ${listingId}: ${message}`;
  result.errors.push(errorMsg);
  console.error(`[ConversationCleanup] ${errorMsg}`);
}
```

These changes keep all behavior (including partial failures, logging, and tracing) intact while making the function easier to reason about and safer to modify.
</issue_to_address>

@lian-jung
Contributor Author

@sourcery-ai review

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 41 out of 42 changed files in this pull request and generated 5 comments.

Files not reviewed (1)
  • pnpm-lock.yaml: Language not supported

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 41 out of 42 changed files in this pull request and generated 4 comments.

Files not reviewed (1)
  • pnpm-lock.yaml: Language not supported

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 42 out of 43 changed files in this pull request and generated 3 comments.

Files not reviewed (1)
  • pnpm-lock.yaml: Language not supported

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 42 out of 43 changed files in this pull request and generated no new comments.

Files not reviewed (1)
  • pnpm-lock.yaml: Language not supported

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 48 out of 49 changed files in this pull request and generated 1 comment.

Files not reviewed (1)
  • pnpm-lock.yaml: Language not supported

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 51 out of 52 changed files in this pull request and generated no new comments.

Files not reviewed (1)
  • pnpm-lock.yaml: Language not supported

@lian-jung lian-jung requested a review from SevenThomas January 9, 2026 16:07
@lian-jung lian-jung marked this pull request as ready for review January 9, 2026 16:07
@lian-jung lian-jung requested a review from a team January 9, 2026 16:07
Contributor

@sourcery-ai sourcery-ai bot left a comment

Sorry @lian-jung, your pull request is larger than the review limit of 150000 diff characters

Successfully merging this pull request may close these issues.

Conversation - Conversation Deletion

2 participants