From 808248c8408252415f42736e946f23f6ba747a10 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 26 Feb 2026 12:30:12 +0000 Subject: [PATCH 1/2] Initial plan From 704692c99d7a6b67dd3809c5b0743000055aace9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 26 Feb 2026 12:39:40 +0000 Subject: [PATCH 2/2] Refactor Apollo C++ code to use Abseil for better performance - Replace std::unordered_map with absl::flat_hash_map in scheduler headers - Replace std::mutex with absl::Mutex in scheduler and thread_safe_queue - Use absl::WriterMutexLock for exclusive lock guards - Replace std::condition_variable with absl::Mutex::Await() in ThreadSafeQueue - Replace string .append() chains with absl::StrAppend() in scheduler - Use absl::StrCat for string construction in scheduler_factory - Add ABSL_GUARDED_BY annotations for thread safety documentation - Update BUILD files with absl dependencies - Update scheduler test to use absl::flat_hash_map Co-authored-by: daohu527 <10419854+daohu527@users.noreply.github.com> --- cyber/base/BUILD | 3 ++ cyber/base/thread_safe_queue.h | 31 ++++++++++--------- cyber/scheduler/BUILD | 10 ++++++ .../policy/scheduler_choreography.cc | 5 +-- .../scheduler/policy/scheduler_choreography.h | 4 +-- cyber/scheduler/policy/scheduler_classic.cc | 5 +-- cyber/scheduler/policy/scheduler_classic.h | 4 +-- cyber/scheduler/scheduler.cc | 18 +++++------ cyber/scheduler/scheduler.h | 12 +++---- cyber/scheduler/scheduler_factory.cc | 11 ++++--- cyber/scheduler/scheduler_test.cc | 3 +- 11 files changed, 63 insertions(+), 43 deletions(-) diff --git a/cyber/base/BUILD b/cyber/base/BUILD index 62b9ab81..1cd01860 100644 --- a/cyber/base/BUILD +++ b/cyber/base/BUILD @@ -170,6 +170,9 @@ cc_library( cc_library( name = "thread_safe_queue", hdrs = ["thread_safe_queue.h"], + deps = [ + "@com_google_absl//absl/synchronization", + ], ) cc_library( diff --git a/cyber/base/thread_safe_queue.h b/cyber/base/thread_safe_queue.h index a17343fe..eb17ac97 100644 --- a/cyber/base/thread_safe_queue.h +++ b/cyber/base/thread_safe_queue.h @@ -17,12 +17,11 @@ #ifndef CYBER_BASE_THREAD_SAFE_QUEUE_H_ #define CYBER_BASE_THREAD_SAFE_QUEUE_H_ -#include -#include #include -#include #include +#include "absl/synchronization/mutex.h" + namespace apollo { namespace cyber { namespace base { @@ -37,13 +36,12 @@ class ThreadSafeQueue { ~ThreadSafeQueue() { BreakAllWait(); } void Enqueue(const T& element) { - std::lock_guard lock(mutex_); + absl::MutexLock lock(&mutex_); queue_.emplace(element); - cv_.notify_one(); } bool Dequeue(T* element) { - std::lock_guard lock(mutex_); + absl::MutexLock lock(&mutex_); if (queue_.empty()) { return false; } @@ -53,8 +51,8 @@ class ThreadSafeQueue { } bool WaitDequeue(T* element) { - std::unique_lock lock(mutex_); - cv_.wait(lock, [this]() { return break_all_wait_ || !queue_.empty(); }); + absl::MutexLock lock(&mutex_); + mutex_.Await(absl::Condition(this, &ThreadSafeQueue::IsReadyForDequeue)); if (break_all_wait_) { return false; } @@ -64,25 +62,30 @@ class ThreadSafeQueue { } typename std::queue::size_type Size() { - std::lock_guard lock(mutex_); + absl::MutexLock lock(&mutex_); return queue_.size(); } bool Empty() { - std::lock_guard lock(mutex_); + absl::MutexLock lock(&mutex_); return queue_.empty(); } void BreakAllWait() { + absl::MutexLock lock(&mutex_); break_all_wait_ = true; - cv_.notify_all(); } private: - volatile bool break_all_wait_ = false; - std::mutex mutex_; + // Returns true when the queue is ready for dequeue: either the queue has + // elements or BreakAllWait() has been called. + bool IsReadyForDequeue() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { + return break_all_wait_ || !queue_.empty(); + } + + bool break_all_wait_ ABSL_GUARDED_BY(mutex_) = false; + absl::Mutex mutex_; std::queue queue_; - std::condition_variable cv_; }; } // namespace base diff --git a/cyber/scheduler/BUILD b/cyber/scheduler/BUILD index 1a580703..f535ff3a 100644 --- a/cyber/scheduler/BUILD +++ b/cyber/scheduler/BUILD @@ -36,6 +36,9 @@ cc_library( srcs = ["scheduler.cc"], hdrs = ["scheduler.h"], deps = [ + "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/strings", + "@com_google_absl//absl/synchronization", "//cyber/croutine", "//cyber/scheduler:mutex_wrapper", "//cyber/scheduler:pin_thread", @@ -67,6 +70,8 @@ cc_library( srcs = ["scheduler_factory.cc"], hdrs = ["scheduler_factory.h"], deps = [ + "@com_google_absl//absl/strings", + "@com_google_absl//absl/synchronization", "//cyber/proto:component_conf_cc_proto", "//cyber/scheduler:scheduler_choreography", "//cyber/scheduler:scheduler_classic", @@ -78,6 +83,8 @@ cc_library( srcs = ["policy/scheduler_choreography.cc"], hdrs = ["policy/scheduler_choreography.h"], deps = [ + "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/synchronization", "//cyber/scheduler", "//cyber/scheduler:choreography_context", "//cyber/scheduler:classic_context", @@ -89,6 +96,8 @@ cc_library( srcs = ["policy/scheduler_classic.cc"], hdrs = ["policy/scheduler_classic.h"], deps = [ + "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/synchronization", "//cyber/scheduler", "//cyber/scheduler:classic_context", ], @@ -123,6 +132,7 @@ cc_test( size = "small", srcs = ["scheduler_test.cc"], deps = [ + "@com_google_absl//absl/container:flat_hash_map", "//cyber", "@com_google_googletest//:gtest_main", ], diff --git a/cyber/scheduler/policy/scheduler_choreography.cc b/cyber/scheduler/policy/scheduler_choreography.cc index 0e2307d5..6b166535 100644 --- a/cyber/scheduler/policy/scheduler_choreography.cc +++ b/cyber/scheduler/policy/scheduler_choreography.cc @@ -20,6 +20,7 @@ #include #include +#include "absl/synchronization/mutex.h" #include "cyber/common/environment.h" #include "cyber/common/file.h" #include "cyber/scheduler/policy/choreography_context.h" @@ -126,7 +127,7 @@ bool SchedulerChoreography::DispatchTask(const std::shared_ptr& cr) { MutexWrapper* wrapper = nullptr; if (!id_map_mutex_.Get(cr->id(), &wrapper)) { { - std::lock_guard wl_lg(cr_wl_mtx_); + absl::WriterMutexLock wl_lg(&cr_wl_mtx_); if (!id_map_mutex_.Get(cr->id(), &wrapper)) { wrapper = new MutexWrapper(); id_map_mutex_.Set(cr->id(), wrapper); @@ -194,7 +195,7 @@ bool SchedulerChoreography::RemoveCRoutine(uint64_t crid) { MutexWrapper* wrapper = nullptr; if (!id_map_mutex_.Get(crid, &wrapper)) { { - std::lock_guard wl_lg(cr_wl_mtx_); + absl::WriterMutexLock wl_lg(&cr_wl_mtx_); if (!id_map_mutex_.Get(crid, &wrapper)) { wrapper = new MutexWrapper(); id_map_mutex_.Set(crid, wrapper); diff --git a/cyber/scheduler/policy/scheduler_choreography.h b/cyber/scheduler/policy/scheduler_choreography.h index a9b29d6e..a6fa2b3a 100644 --- a/cyber/scheduler/policy/scheduler_choreography.h +++ b/cyber/scheduler/policy/scheduler_choreography.h @@ -19,9 +19,9 @@ #include #include -#include #include +#include "absl/container/flat_hash_map.h" #include "cyber/croutine/croutine.h" #include "cyber/proto/choreography_conf.pb.h" #include "cyber/scheduler/scheduler.h" @@ -46,7 +46,7 @@ class SchedulerChoreography : public Scheduler { void CreateProcessor(); bool NotifyProcessor(uint64_t crid) override; - std::unordered_map cr_confs_; + absl::flat_hash_map cr_confs_; int32_t choreography_processor_prio_; int32_t pool_processor_prio_; diff --git a/cyber/scheduler/policy/scheduler_classic.cc b/cyber/scheduler/policy/scheduler_classic.cc index 51a56c54..0afcd1f6 100644 --- a/cyber/scheduler/policy/scheduler_classic.cc +++ b/cyber/scheduler/policy/scheduler_classic.cc @@ -20,6 +20,7 @@ #include #include +#include "absl/synchronization/mutex.h" #include "cyber/common/environment.h" #include "cyber/common/file.h" #include "cyber/scheduler/policy/classic_context.h" @@ -115,7 +116,7 @@ bool SchedulerClassic::DispatchTask(const std::shared_ptr& cr) { MutexWrapper* wrapper = nullptr; if (!id_map_mutex_.Get(cr->id(), &wrapper)) { { - std::lock_guard wl_lg(cr_wl_mtx_); + absl::WriterMutexLock wl_lg(&cr_wl_mtx_); if (!id_map_mutex_.Get(cr->id(), &wrapper)) { wrapper = new MutexWrapper(); id_map_mutex_.Set(cr->id(), wrapper); @@ -196,7 +197,7 @@ bool SchedulerClassic::RemoveCRoutine(uint64_t crid) { MutexWrapper* wrapper = nullptr; if (!id_map_mutex_.Get(crid, &wrapper)) { { - std::lock_guard wl_lg(cr_wl_mtx_); + absl::WriterMutexLock wl_lg(&cr_wl_mtx_); if (!id_map_mutex_.Get(crid, &wrapper)) { wrapper = new MutexWrapper(); id_map_mutex_.Set(crid, wrapper); diff --git a/cyber/scheduler/policy/scheduler_classic.h b/cyber/scheduler/policy/scheduler_classic.h index f4a3a82c..6bc4bce7 100644 --- a/cyber/scheduler/policy/scheduler_classic.h +++ b/cyber/scheduler/policy/scheduler_classic.h @@ -19,9 +19,9 @@ #include #include -#include #include +#include "absl/container/flat_hash_map.h" #include "cyber/croutine/croutine.h" #include "cyber/proto/classic_conf.pb.h" #include "cyber/scheduler/scheduler.h" @@ -47,7 +47,7 @@ class SchedulerClassic : public Scheduler { void CreateProcessor(); bool NotifyProcessor(uint64_t crid) override; - std::unordered_map cr_confs_; + absl::flat_hash_map cr_confs_; ClassicConf classic_conf_; }; diff --git a/cyber/scheduler/scheduler.cc b/cyber/scheduler/scheduler.cc index 8b8043ed..23143350 100644 --- a/cyber/scheduler/scheduler.cc +++ b/cyber/scheduler/scheduler.cc @@ -20,6 +20,7 @@ #include +#include "absl/strings/str_cat.h" #include "cyber/common/environment.h" #include "cyber/common/file.h" #include "cyber/common/global_data.h" @@ -106,18 +107,17 @@ void Scheduler::CheckSchedStatus() { auto snap = processor->ProcSnapshot(); if (snap->execute_start_time.load()) { auto execute_time = (now - snap->execute_start_time.load()) / 1000000; - snap_info.append(std::to_string(snap->processor_id.load())) - .append(":") - .append(snap->routine_name) - .append(":") - .append(std::to_string(execute_time)); + absl::StrAppend(&snap_info, + std::to_string(snap->processor_id.load()), ":", + snap->routine_name, ":", + std::to_string(execute_time)); } else { - snap_info.append(std::to_string(snap->processor_id.load())) - .append(":idle"); + absl::StrAppend(&snap_info, + std::to_string(snap->processor_id.load()), ":idle"); } - snap_info.append(", "); + absl::StrAppend(&snap_info, ", "); } - snap_info.append("timestamp: ").append(std::to_string(now)); + absl::StrAppend(&snap_info, "timestamp: ", std::to_string(now)); AINFO << snap_info; snap_info.clear(); } diff --git a/cyber/scheduler/scheduler.h b/cyber/scheduler/scheduler.h index 8e53c0a2..2036e76b 100644 --- a/cyber/scheduler/scheduler.h +++ b/cyber/scheduler/scheduler.h @@ -22,12 +22,12 @@ #include #include #include -#include #include #include -#include #include +#include "absl/container/flat_hash_map.h" +#include "absl/synchronization/mutex.h" #include "cyber/proto/choreography_conf.pb.h" #include "cyber/base/atomic_hash_map.h" @@ -80,7 +80,7 @@ class Scheduler { void CheckSchedStatus(); void SetInnerThreadConfs( - const std::unordered_map& confs) { + const absl::flat_hash_map& confs) { inner_thr_confs_ = confs; } @@ -89,13 +89,13 @@ class Scheduler { AtomicRWLock id_cr_lock_; AtomicHashMap id_map_mutex_; - std::mutex cr_wl_mtx_; + absl::Mutex cr_wl_mtx_; - std::unordered_map> id_cr_; + absl::flat_hash_map> id_cr_; std::vector> pctxs_; std::vector> processors_; - std::unordered_map inner_thr_confs_; + absl::flat_hash_map inner_thr_confs_; std::string process_level_cpuset_; uint32_t proc_num_ = 0; diff --git a/cyber/scheduler/scheduler_factory.cc b/cyber/scheduler/scheduler_factory.cc index 9d0cbf1b..8afe5780 100644 --- a/cyber/scheduler/scheduler_factory.cc +++ b/cyber/scheduler/scheduler_factory.cc @@ -21,8 +21,9 @@ #include #include -#include +#include "absl/strings/str_cat.h" +#include "absl/synchronization/mutex.h" #include "cyber/common/environment.h" #include "cyber/common/file.h" #include "cyber/common/global_data.h" @@ -43,18 +44,18 @@ using apollo::cyber::common::WorkRoot; namespace { std::atomic instance = {nullptr}; -std::mutex mutex; +absl::Mutex mutex; } // namespace Scheduler* Instance() { Scheduler* obj = instance.load(std::memory_order_acquire); if (obj == nullptr) { - std::lock_guard lock(mutex); + absl::WriterMutexLock lock(&mutex); obj = instance.load(std::memory_order_relaxed); if (obj == nullptr) { std::string policy("classic"); - std::string conf("conf/"); - conf.append(GlobalData::Instance()->ProcessGroup()).append(".conf"); + auto conf = absl::StrCat("conf/", GlobalData::Instance()->ProcessGroup(), + ".conf"); auto cfg_file = GetAbsolutePath(WorkRoot(), conf); apollo::cyber::proto::CyberConfig cfg; if (PathExists(cfg_file) && GetProtoFromFile(cfg_file, &cfg)) { diff --git a/cyber/scheduler/scheduler_test.cc b/cyber/scheduler/scheduler_test.cc index db911a15..eaf6fc31 100644 --- a/cyber/scheduler/scheduler_test.cc +++ b/cyber/scheduler/scheduler_test.cc @@ -21,6 +21,7 @@ #include "gtest/gtest.h" +#include "absl/container/flat_hash_map.h" #include "cyber/proto/scheduler_conf.pb.h" #include "cyber/common/global_data.h" @@ -72,7 +73,7 @@ TEST(SchedulerTest, set_inner_thread_attr) { auto sched = Instance(); cyber::Init("scheduler_test"); std::thread t = std::thread([]() {}); - std::unordered_map thread_confs; + absl::flat_hash_map thread_confs; InnerThread inner_thread; inner_thread.set_cpuset("0-1"); inner_thread.set_policy("SCHED_FIFO");