Skip to content

fix(sync): avoid startup image publish backpressure#2

Merged
Jiu-xiao merged 2 commits intomasterfrom
codex/sync-cleanup-20260503
May 4, 2026
Merged

fix(sync): avoid startup image publish backpressure#2
Jiu-xiao merged 2 commits intomasterfrom
codex/sync-cleanup-20260503

Conversation

@Jiu-xiao
Copy link
Copy Markdown
Contributor

@Jiu-xiao Jiu-xiao commented May 4, 2026

问题

RAW_PROBE 启动阶段 detector 从第 0 帧接入时,消费者可能先拿到未同步图像并等待 IMU,导致图像共享 topic 出现 publish backpressure。

修改

  • CameraFrameSyncSubscriber::Wait() 改为先等待同步后 IMU,再按相同 timestamp_us 取图像。
  • 图像订阅使用 BROADCAST_DROP_OLD,消费滞后时丢旧描述符,不把图像流变成 backlog。
  • README 补充消费者启动语义。

验证

  • Ubuntu24: /home/xiao/runs/cfs_latency_dropold_fix_20260504T000356Z
  • raw_probe_start_attached: detected=144 published=200 startup_ok=1
  • 无 image publish failed,无 SYNCED -> OBSERVING
  • RAW_PROBE publish-before 到 detector Wait return p99 7.133us
  • sequence tests: /home/xiao/runs/cfs_sequence_test_20260504T000521Z,10/10 PASS

依赖 libxr PR:修 BROADCAST_DROP_OLD 并发竞态。

Summary by Sourcery

Refine camera frame/IMU sync to avoid image-topic backpressure during RAW_PROBE startup and clarify consumer behavior.

New Features:

  • Add drop-old FIFO handling for IMU ingress and image event queues to prevent backlog when consumers lag.
  • Introduce a BROADCAST_DROP_OLD subscriber mode for image consumption so stale frames are discarded instead of blocking publishers.

Enhancements:

  • Rework RAW_PROBE sync state machine data structures and naming for clearer separation of observed periods, locked sync, and pending frames.
  • Change CameraFrameSyncSubscriber to wait on synced IMU first, then fetch matching images based on timestamps to avoid holding unsynced frames.
  • Standardize gyro/accl/quat topic naming to derive from camera name and simplify runtime configuration.
  • Add extensive inline documentation for CameraFrameSync internals and subscriber behavior.

Documentation:

  • Document the new subscriber wait semantics, drop-old image subscription behavior, and simplified raw IMU topic naming in the README.

Tests:

  • Update and extend sequence harness tests to reflect the new sync-period tracking and pending frame structures.

@sourcery-ai
Copy link
Copy Markdown

sourcery-ai Bot commented May 4, 2026

Reviewer's Guide

Refactors CameraFrameSync’s RAW_PROBE sync state machine and subscriber to use IMU-first waiting and drop-old image semantics, restructures IMU ingress/pending queues and sync relations for clearer state, and updates documentation and tests to match the new behavior and naming.

Class diagram for updated CameraFrameSync RAW_PROBE state and queues

classDiagram
class CameraFrameSync {
  +SyncMode sync_mode_
  +int32 offset_us_
  +SyncState state_
  +ImageData current_image_
  +DropOldestQueue_GyroSample_ pending_gyros_
  +DropOldestQueue_AcclSample_ pending_accls_
  +DropOldestQueue_QuatReading_ pending_quats_
  +DropOldestQueue_ImageSample_ image_events_
  +SampleHistory_AssembledImu_ imu_history_
  +ObservedPeriods periods_
  +LockedSync locked_sync_
  +bool last_image_valid_
  +uint64 last_image_timestamp_us_
  +PendingFrame pending_frame_
  +bool overflowed_
  +ProcessImageEvents()
  +ProcessLatestImage(frame)
  +ProcessRawProbeImage(frame)
  +TryProbeImage(frame)
  +TryLatestImuMatch(frame)
  +TrySyncedImage(frame)
  +ResumePendingMatch(frame)
  +PublishOrRememberMatch(frame,match)
  +PublishMatchedImage(image,match)
}

class PendingFrame {
  +bool valid
  +ImageSample image
  +bool cadence_consumed
  +PendingMatch match
}

class PendingMatch {
  +bool valid
  +uint64 imu_timestamp_us
  +uint64 period_us
}

class ImageSample {
  +uint64 sensor_timestamp_us
}

class GyroSample {
  +uint64 sensor_timestamp_us
  +ImuVector angular_velocity_xyz
}

class AcclSample {
  +uint64 sensor_timestamp_us
  +ImuVector linear_acceleration_xyz
}

class QuatReading {
  +uint64 sensor_timestamp_us
  +QuatSample rotation_wxyz
}

class AssembledImu {
  +uint64 sensor_timestamp_us
  +QuatSample rotation_wxyz
  +ImuVector angular_velocity_xyz
  +ImuVector linear_acceleration_xyz
}

class ObservedPeriods {
  +uint64 image_us
  +uint64 imu_us
}

class LockedSync {
  +uint64 period_us
  +uint64 last_imu_timestamp_us
}

class SyncMatch {
  +AssembledImu* imu
  +uint64 period_us
}

class DropOldestQueue_T_ {
  +DropOldestQueue_T_(capacity)
  +bool Empty()
  +bool Front(out)
  +void PopFront()
  +void Clear()
  +bool PushBackDropOldest(value)
}

CameraFrameSync --> PendingFrame : has
CameraFrameSync --> ObservedPeriods : has
CameraFrameSync --> LockedSync : has
CameraFrameSync --> "*" AssembledImu : history
CameraFrameSync --> "*" GyroSample : queues
CameraFrameSync --> "*" AcclSample : queues
CameraFrameSync --> "*" QuatReading : queues
PendingFrame --> ImageSample : image
PendingFrame --> PendingMatch : match
SyncMatch --> AssembledImu : imu

class DropOldestQueue_GyroSample_ {
}
class DropOldestQueue_AcclSample_ {
}
class DropOldestQueue_QuatReading_ {
}
class DropOldestQueue_ImageSample_ {
}

DropOldestQueue_GyroSample_ --|> DropOldestQueue_T_ : T=GyroSample
DropOldestQueue_AcclSample_ --|> DropOldestQueue_T_ : T=AcclSample
DropOldestQueue_QuatReading_ --|> DropOldestQueue_T_ : T=QuatReading
DropOldestQueue_ImageSample_ --|> DropOldestQueue_T_ : T=ImageSample
Loading

Class diagram for updated CameraFrameSyncSubscriber image/IMU waiting

classDiagram
class CameraFrameSyncSubscriber {
  +string image_topic_name_
  +string imu_topic_name_
  +string host_topic_domain_name_
  +LinuxSharedTopicSubscriber image_sub_
  +ImuStampedT latest_imu_
  +SyncSubscriber_ImuStampedT_ imu_sub_
  +bool last_imu_valid_
  +uint64 last_imu_timestamp_us_
  +ImageData pending_image_
  +Wait(out,timeout_ms)
  +WaitForNextImu(deadline_ms,timeout_ms,imu)
  +WaitForMatchingImage(imu_timestamp_us,deadline_ms,timeout_ms,image)
}

class SyncedFrame {
  +ImageData image
  +ImuStampedT imu
}

class ImageData {
  +ImageFrame* GetData()
  +void Reset()
}

class ImuStampedT {
  +uint64 timestamp_us
}

class LinuxSharedTopicSubscriber {
  +Wait(timeout_ms)
}

class SyncSubscriber_ImuStampedT_ {
  +Wait(timeout_ms)
}

CameraFrameSyncSubscriber --> SyncedFrame : fills
CameraFrameSyncSubscriber --> ImageData : uses
CameraFrameSyncSubscriber --> ImuStampedT : uses
CameraFrameSyncSubscriber --> LinuxSharedTopicSubscriber : image_sub_
CameraFrameSyncSubscriber --> SyncSubscriber_ImuStampedT_ : imu_sub_
Loading

File-Level Changes

Change Details Files
Rework CameraFrameSync RAW_PROBE state machine, IMU assembly, and sync relation data structures for clearer responsibilities and safer recovery on overflow/out-of-order data.
  • Rename and document many type aliases and structs, introducing PendingFrame, PendingMatch, ObservedPeriods, and LockedSync to separate cadence observation from locked sync relationships.
  • Replace QueuedGyro/Accl/Quat wrappers with direct GyroSample/AcclSample/QuatReading in lock-free ingress queues and pending DropOldestQueue buffers.
  • Adjust RAW_PROBE and LATEST_IMU image processing flow to track per-frame cadence consumption and sync matches, ensure images are processed in order, and treat probe gaps separately from normal cadence.
  • Refine IMU history assembly to use gyro as the master timestamp, aggressively drop older accl/quat, and enforce strictly monotonic assembled IMU timestamps.
  • Handle cadence observation and sync lock reset via new ObservedPeriods/LockedSync fields, updating EstimatedSyncPeriod, normal/probe gap checks, and reset paths accordingly.
  • Tighten overflow recovery hooks by marking overflow when DropOldestQueue drops samples and resetting sync lock on cadence breaks or history anomalies.
CameraFrameSync.hpp
camera_frame_sync_state_machine.hpp
camera_frame_sync_impl.hpp
tests/sequence_test.cpp
Change CameraFrameSyncSubscriber semantics to wait on IMU first and use BROADCAST_DROP_OLD image subscription, preventing image-topic backpressure when consumers lag or before sync is locked.
  • Introduce a static image_subscriber_mode set to LinuxSharedSubscriberMode::BROADCAST_DROP_OLD and construct image_sub_ with this mode to drop old image descriptors.
  • Rewrite Wait() to first wait for the next monotonic synced IMU, then find a matching image by timestamp, skipping images older than the IMU and handling the case where images run ahead of IMU.
  • Add WaitForNextImu and WaitForMatchingImage helpers, tracking last_imu_timestamp_us_ and pending_image_ to avoid stale semaphore counts and to retain a single candidate image for future IMU arrivals.
camera_frame_sync_subscriber.hpp
Align tests and documentation with the new sync relation naming and subscriber semantics, and simplify raw IMU topic naming to derive from camera name instead of a runtime prefix.
  • Update sequence harness structs to mirror production types (PendingFrame, PendingMatch, ObservedPeriods, LockedSync), adjusting logic to use periods_/locked_sync_ instead of relation_.
  • Adapt tests’ estimated period, gap detection, and sync lock logic to the new field names and behavior, including PublishOrRemember and Resume paths.
  • Remove raw_imu_topic_prefix from runtime parameters, always deriving raw IMU topic names from camera.NameView().
  • Update README to describe the new IMU topic naming convention, consumer Wait behavior (IMU-first), and drop-old image subscriber semantics.
  • Clarify construction, topics, and logging comments in CameraFrameSync implementation to document assumptions and failure handling paths.
CameraFrameSync.hpp
tests/sequence_test.cpp
camera_frame_sync_impl.hpp
README.md

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've reviewed your changes and they look great!


Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@Jiu-xiao Jiu-xiao merged commit ca49a23 into master May 4, 2026
3 checks passed
@Jiu-xiao Jiu-xiao deleted the codex/sync-cleanup-20260503 branch May 4, 2026 00:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant