misc(events-processor): Add an events reprocess pipeline #712

Merged
vincent-pochet merged 1 commit into main from
misc-reprocess-pipeline
Mar 6, 2026

Conversation

@vincent-pochet
Contributor

Context

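Two issues were recently identified in the events-processor:

  • Events with a timestamp formatted as an ISO 8601 string were not processed correctly and were pushed to the dead letter queue (fixed with getlago/lago#709).
  • Pricing group keys were not assigned correctly to events when no filters were present on a given charge, leading to inconsistent data in the events_enriched_expanded kafka topic and clickhouse table (fixed with getlago/lago#710).

Because of these two issues, some events will need to be reprocessed, either completely (for the timestamp issue, as no events_enriched records were created) or partially (for the grouped_by issue, as only the events_enriched_expanded record needs to be re-created).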

Description

This PR checks for the presence of a new reprocess flag on the event_raw kafka payload. When present, this flag bypasses some parts of the pipeline: producing the events_enriched kafka message, flagging the subscription for refresh, and expiring the cache. Only an events_enriched_expanded message is produced.
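The branching described above can be sketched as follows. This is a minimal illustration, not the code from the PR: the `Event` struct, the `processEvent` signature, and the step names are hypothetical, and the order of the regular steps is an assumption. Only the set of skipped steps comes from the description.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for the raw event; only the
// reprocess flag matters for this sketch.
type Event struct {
	TransactionID string
	Reprocess     bool
}

// processEvent returns the names of the pipeline steps that would run
// for the given event. The real processor performs the steps instead
// of listing them, and their exact order is assumed here.
func processEvent(ev Event) []string {
	steps := []string{}
	if !ev.Reprocess {
		// Regular flow: these side effects are skipped for reprocessed events.
		steps = append(steps,
			"produce events_enriched",
			"flag subscription for refresh",
			"expire cache",
		)
	}
	// Both flows end by producing the expanded message.
	steps = append(steps, "produce events_enriched_expanded")
	return steps
}

func main() {
	for _, ev := range []Event{
		{TransactionID: "tx_regular"},
		{TransactionID: "tx_replayed", Reprocess: true},
	} {
		fmt.Println(ev.TransactionID, processEvent(ev))
	}
}
```

Returning the step names rather than executing side effects keeps the sketch easy to check in isolation.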

Copilot AI review requested due to automatic review settings March 5, 2026 09:46

Copilot AI left a comment


Pull request overview

Adds a “reprocess” mode to the events-processor pipeline so that specific replayed events can skip parts of the normal flow and only regenerate events_enriched_expanded outputs (intended to address backfills after recent enrichment bugs).

Changes:

  • Introduce source_metadata.reprocess on the raw event payload and an Event.IsReprocess() helper.
  • Update EventProcessor.processEvent to bypass events_enriched production (and related side effects) when reprocess is set, producing only events_enriched_expanded.
  • Add a processor test covering the reprocess behavior.
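A rough sketch of what the flag and helper listed above might look like. The names `SourceMetadata`, `reprocess`, and `IsReprocess()` come from the review summary; the other field, the JSON tags, the pointer layout, and the `decodeEvent` helper are assumptions made for illustration and may differ from `events-processor/models/event.go`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SourceMetadata mirrors the source_metadata object on the raw event
// payload. The JSON tag is an assumption.
type SourceMetadata struct {
	Reprocess bool `json:"reprocess"`
}

// Event is a hypothetical, trimmed-down raw event.
type Event struct {
	TransactionID  string          `json:"transaction_id"`
	SourceMetadata *SourceMetadata `json:"source_metadata,omitempty"`
}

// IsReprocess reports whether the event carries the reprocess flag.
// A missing source_metadata object is treated as a regular event.
func (e *Event) IsReprocess() bool {
	return e != nil && e.SourceMetadata != nil && e.SourceMetadata.Reprocess
}

// decodeEvent is a hypothetical helper that parses a raw JSON payload.
func decodeEvent(payload []byte) (*Event, error) {
	var ev Event
	if err := json.Unmarshal(payload, &ev); err != nil {
		return nil, err
	}
	return &ev, nil
}

func main() {
	payloads := []string{
		`{"transaction_id":"tx_1"}`,
		`{"transaction_id":"tx_2","source_metadata":{"reprocess":true}}`,
	}
	for _, p := range payloads {
		ev, err := decodeEvent([]byte(p))
		if err != nil {
			fmt.Println("decode error:", err)
			continue
		}
		fmt.Printf("%s reprocess=%v\n", ev.TransactionID, ev.IsReprocess())
	}
}
```

Treating an absent `source_metadata` as "not a reprocess" keeps existing producers, which never set the flag, on the regular path.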

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| events-processor/processors/events_processor/processor.go | Adds the reprocess fast-path that only produces enriched-expanded messages. |
| events-processor/models/event.go | Extends SourceMetadata with reprocess and adds IsReprocess(). |
| events-processor/processors/events_processor/processor_test.go | Adds coverage for the reprocess path. |


vincent-pochet force-pushed the misc-reprocess-pipeline branch from 63b569d to a81bf0c, March 5, 2026 09:58
vincent-pochet merged commit 6048999 into main, Mar 6, 2026
1 check passed
vincent-pochet deleted the misc-reprocess-pipeline branch, March 6, 2026 08:52
vincent-pochet added a commit to getlago/lago-api that referenced this pull request Mar 6, 2026
## Context

This PR is related to getlago/lago#712

Two issues were recently identified in the events-processor:
- Events with `timestamp` formatted as ISO 8601 string were not
processed correctly and were pushed to the dead letter queue (fixed with
getlago/lago#709)
- Pricing group keys were not assigned correctly to the events when no
filters were present on a given charge, leading to inconsistent data in
the `events_enriched_expanded` kafka topic and clickhouse table (fixed
with getlago/lago#710)

Because of these two issues, some events will need to be reprocessed,
either completely (for the timestamp issue, as no `events_enriched`
records were created) or partially (for the grouped_by issue, as we only
need to re-create the `events_enriched_expanded` record)

## Description

This PR adds two rake tasks that will allow full or partial (to only
produce events_enriched_expanded records) processing of events_raw.

- `events:reprocess` will fetch clickhouse `events_raw` records and push
them for reprocessing in the kafka topic.
  It takes multiple arguments:
  - `ORGANIZATION_ID`
  - An optional `SUBSCRIPTION_IDS` to filter on a set of subscriptions
  - An optional `BM_CODES` to filter on a set of billable metrics
  - `REPROCESS`, which defaults to `true`, to only refresh the
  `events_enriched_expanded` records

- `events:deduplicate_enriched_expanded` will remove duplicated events
to ensure a coherent state of the `events_enriched_expanded` table,
mitigating the eventual consistency of Clickhouse