Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cyber/base/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ cc_library(
cc_library(
name = "thread_safe_queue",
hdrs = ["thread_safe_queue.h"],
deps = [
"@com_google_absl//absl/synchronization",
],
)

cc_library(
Expand Down
31 changes: 17 additions & 14 deletions cyber/base/thread_safe_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
#ifndef CYBER_BASE_THREAD_SAFE_QUEUE_H_
#define CYBER_BASE_THREAD_SAFE_QUEUE_H_

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

#include "absl/synchronization/mutex.h"

namespace apollo {
namespace cyber {
namespace base {
Expand All @@ -37,13 +36,12 @@ class ThreadSafeQueue {
~ThreadSafeQueue() { BreakAllWait(); }

void Enqueue(const T& element) {
std::lock_guard<std::mutex> lock(mutex_);
absl::MutexLock lock(&mutex_);
queue_.emplace(element);
cv_.notify_one();
}

bool Dequeue(T* element) {
std::lock_guard<std::mutex> lock(mutex_);
absl::MutexLock lock(&mutex_);
if (queue_.empty()) {
return false;
}
Expand All @@ -53,8 +51,8 @@ class ThreadSafeQueue {
}

bool WaitDequeue(T* element) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return break_all_wait_ || !queue_.empty(); });
absl::MutexLock lock(&mutex_);
mutex_.Await(absl::Condition(this, &ThreadSafeQueue::IsReadyForDequeue));
if (break_all_wait_) {
return false;
}
Expand All @@ -64,25 +62,30 @@ class ThreadSafeQueue {
}

typename std::queue<T>::size_type Size() {
std::lock_guard<std::mutex> lock(mutex_);
absl::MutexLock lock(&mutex_);
return queue_.size();
}

bool Empty() {
std::lock_guard<std::mutex> lock(mutex_);
absl::MutexLock lock(&mutex_);
return queue_.empty();
}

void BreakAllWait() {
absl::MutexLock lock(&mutex_);
break_all_wait_ = true;
cv_.notify_all();
}

private:
volatile bool break_all_wait_ = false;
std::mutex mutex_;
// Returns true when the queue is ready for dequeue: either the queue has
// elements or BreakAllWait() has been called.
bool IsReadyForDequeue() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return break_all_wait_ || !queue_.empty();
}

bool break_all_wait_ ABSL_GUARDED_BY(mutex_) = false;
absl::Mutex mutex_;
std::queue<T> queue_;
std::condition_variable cv_;
};

} // namespace base
Expand Down
10 changes: 10 additions & 0 deletions cyber/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ cc_library(
srcs = ["scheduler.cc"],
hdrs = ["scheduler.h"],
deps = [
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
"//cyber/croutine",
"//cyber/scheduler:mutex_wrapper",
"//cyber/scheduler:pin_thread",
Expand Down Expand Up @@ -67,6 +70,8 @@ cc_library(
srcs = ["scheduler_factory.cc"],
hdrs = ["scheduler_factory.h"],
deps = [
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
"//cyber/proto:component_conf_cc_proto",
"//cyber/scheduler:scheduler_choreography",
"//cyber/scheduler:scheduler_classic",
Expand All @@ -78,6 +83,8 @@ cc_library(
srcs = ["policy/scheduler_choreography.cc"],
hdrs = ["policy/scheduler_choreography.h"],
deps = [
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/synchronization",
"//cyber/scheduler",
"//cyber/scheduler:choreography_context",
"//cyber/scheduler:classic_context",
Expand All @@ -89,6 +96,8 @@ cc_library(
srcs = ["policy/scheduler_classic.cc"],
hdrs = ["policy/scheduler_classic.h"],
deps = [
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/synchronization",
"//cyber/scheduler",
"//cyber/scheduler:classic_context",
],
Expand Down Expand Up @@ -123,6 +132,7 @@ cc_test(
size = "small",
srcs = ["scheduler_test.cc"],
deps = [
"@com_google_absl//absl/container:flat_hash_map",
"//cyber",
"@com_google_googletest//:gtest_main",
],
Expand Down
5 changes: 3 additions & 2 deletions cyber/scheduler/policy/scheduler_choreography.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <utility>

#include "absl/synchronization/mutex.h"
#include "cyber/common/environment.h"
#include "cyber/common/file.h"
#include "cyber/scheduler/policy/choreography_context.h"
Expand Down Expand Up @@ -126,7 +127,7 @@ bool SchedulerChoreography::DispatchTask(const std::shared_ptr<CRoutine>& cr) {
MutexWrapper* wrapper = nullptr;
if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
{
std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
absl::WriterMutexLock wl_lg(&cr_wl_mtx_);
if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
wrapper = new MutexWrapper();
id_map_mutex_.Set(cr->id(), wrapper);
Expand Down Expand Up @@ -194,7 +195,7 @@ bool SchedulerChoreography::RemoveCRoutine(uint64_t crid) {
MutexWrapper* wrapper = nullptr;
if (!id_map_mutex_.Get(crid, &wrapper)) {
{
std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
absl::WriterMutexLock wl_lg(&cr_wl_mtx_);
if (!id_map_mutex_.Get(crid, &wrapper)) {
wrapper = new MutexWrapper();
id_map_mutex_.Set(crid, wrapper);
Expand Down
4 changes: 2 additions & 2 deletions cyber/scheduler/policy/scheduler_choreography.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "absl/container/flat_hash_map.h"
#include "cyber/croutine/croutine.h"
#include "cyber/proto/choreography_conf.pb.h"
#include "cyber/scheduler/scheduler.h"
Expand All @@ -46,7 +46,7 @@ class SchedulerChoreography : public Scheduler {
void CreateProcessor();
bool NotifyProcessor(uint64_t crid) override;

std::unordered_map<std::string, ChoreographyTask> cr_confs_;
absl::flat_hash_map<std::string, ChoreographyTask> cr_confs_;

int32_t choreography_processor_prio_;
int32_t pool_processor_prio_;
Expand Down
5 changes: 3 additions & 2 deletions cyber/scheduler/policy/scheduler_classic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <utility>

#include "absl/synchronization/mutex.h"
#include "cyber/common/environment.h"
#include "cyber/common/file.h"
#include "cyber/scheduler/policy/classic_context.h"
Expand Down Expand Up @@ -115,7 +116,7 @@ bool SchedulerClassic::DispatchTask(const std::shared_ptr<CRoutine>& cr) {
MutexWrapper* wrapper = nullptr;
if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
{
std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
absl::WriterMutexLock wl_lg(&cr_wl_mtx_);
if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
wrapper = new MutexWrapper();
id_map_mutex_.Set(cr->id(), wrapper);
Expand Down Expand Up @@ -196,7 +197,7 @@ bool SchedulerClassic::RemoveCRoutine(uint64_t crid) {
MutexWrapper* wrapper = nullptr;
if (!id_map_mutex_.Get(crid, &wrapper)) {
{
std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
absl::WriterMutexLock wl_lg(&cr_wl_mtx_);
if (!id_map_mutex_.Get(crid, &wrapper)) {
wrapper = new MutexWrapper();
id_map_mutex_.Set(crid, wrapper);
Expand Down
4 changes: 2 additions & 2 deletions cyber/scheduler/policy/scheduler_classic.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "absl/container/flat_hash_map.h"
#include "cyber/croutine/croutine.h"
#include "cyber/proto/classic_conf.pb.h"
#include "cyber/scheduler/scheduler.h"
Expand All @@ -47,7 +47,7 @@ class SchedulerClassic : public Scheduler {
void CreateProcessor();
bool NotifyProcessor(uint64_t crid) override;

std::unordered_map<std::string, ClassicTask> cr_confs_;
absl::flat_hash_map<std::string, ClassicTask> cr_confs_;

ClassicConf classic_conf_;
};
Expand Down
18 changes: 9 additions & 9 deletions cyber/scheduler/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <utility>

#include "absl/strings/str_cat.h"
#include "cyber/common/environment.h"
#include "cyber/common/file.h"
#include "cyber/common/global_data.h"
Expand Down Expand Up @@ -106,18 +107,17 @@ void Scheduler::CheckSchedStatus() {
auto snap = processor->ProcSnapshot();
if (snap->execute_start_time.load()) {
auto execute_time = (now - snap->execute_start_time.load()) / 1000000;
snap_info.append(std::to_string(snap->processor_id.load()))
.append(":")
.append(snap->routine_name)
.append(":")
.append(std::to_string(execute_time));
absl::StrAppend(&snap_info,
std::to_string(snap->processor_id.load()), ":",
snap->routine_name, ":",
std::to_string(execute_time));
} else {
snap_info.append(std::to_string(snap->processor_id.load()))
.append(":idle");
absl::StrAppend(&snap_info,
std::to_string(snap->processor_id.load()), ":idle");
}
snap_info.append(", ");
absl::StrAppend(&snap_info, ", ");
}
snap_info.append("timestamp: ").append(std::to_string(now));
absl::StrAppend(&snap_info, "timestamp: ", std::to_string(now));
AINFO << snap_info;
snap_info.clear();
}
Expand Down
12 changes: 6 additions & 6 deletions cyber/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
#include <atomic>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
#include "cyber/proto/choreography_conf.pb.h"

#include "cyber/base/atomic_hash_map.h"
Expand Down Expand Up @@ -80,7 +80,7 @@ class Scheduler {
void CheckSchedStatus();

void SetInnerThreadConfs(
const std::unordered_map<std::string, InnerThread>& confs) {
const absl::flat_hash_map<std::string, InnerThread>& confs) {
inner_thr_confs_ = confs;
}

Expand All @@ -89,13 +89,13 @@ class Scheduler {

AtomicRWLock id_cr_lock_;
AtomicHashMap<uint64_t, MutexWrapper*> id_map_mutex_;
std::mutex cr_wl_mtx_;
absl::Mutex cr_wl_mtx_;

std::unordered_map<uint64_t, std::shared_ptr<CRoutine>> id_cr_;
absl::flat_hash_map<uint64_t, std::shared_ptr<CRoutine>> id_cr_;
std::vector<std::shared_ptr<ProcessorContext>> pctxs_;
std::vector<std::shared_ptr<Processor>> processors_;

std::unordered_map<std::string, InnerThread> inner_thr_confs_;
absl::flat_hash_map<std::string, InnerThread> inner_thr_confs_;

std::string process_level_cpuset_;
uint32_t proc_num_ = 0;
Expand Down
11 changes: 6 additions & 5 deletions cyber/scheduler/scheduler_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@

#include <atomic>
#include <string>
#include <unordered_map>

#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "cyber/common/environment.h"
#include "cyber/common/file.h"
#include "cyber/common/global_data.h"
Expand All @@ -43,18 +44,18 @@ using apollo::cyber::common::WorkRoot;

namespace {
std::atomic<Scheduler*> instance = {nullptr};
std::mutex mutex;
absl::Mutex mutex;
} // namespace

Scheduler* Instance() {
Scheduler* obj = instance.load(std::memory_order_acquire);
if (obj == nullptr) {
std::lock_guard<std::mutex> lock(mutex);
absl::WriterMutexLock lock(&mutex);
obj = instance.load(std::memory_order_relaxed);
if (obj == nullptr) {
std::string policy("classic");
std::string conf("conf/");
conf.append(GlobalData::Instance()->ProcessGroup()).append(".conf");
auto conf = absl::StrCat("conf/", GlobalData::Instance()->ProcessGroup(),
".conf");
auto cfg_file = GetAbsolutePath(WorkRoot(), conf);
apollo::cyber::proto::CyberConfig cfg;
if (PathExists(cfg_file) && GetProtoFromFile(cfg_file, &cfg)) {
Expand Down
3 changes: 2 additions & 1 deletion cyber/scheduler/scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "gtest/gtest.h"

#include "absl/container/flat_hash_map.h"
#include "cyber/proto/scheduler_conf.pb.h"

#include "cyber/common/global_data.h"
Expand Down Expand Up @@ -72,7 +73,7 @@ TEST(SchedulerTest, set_inner_thread_attr) {
auto sched = Instance();
cyber::Init("scheduler_test");
std::thread t = std::thread([]() {});
std::unordered_map<std::string, InnerThread> thread_confs;
absl::flat_hash_map<std::string, InnerThread> thread_confs;
InnerThread inner_thread;
inner_thread.set_cpuset("0-1");
inner_thread.set_policy("SCHED_FIFO");
Expand Down