From cab795f6b2811f13a2f64676e7876129fd1116c5 Mon Sep 17 00:00:00 2001 From: Itisfun0 <1549861585@qq.com> Date: Wed, 24 Nov 2021 21:41:20 +0800 Subject: [PATCH 1/6] init --- .DS_Store | Bin 0 -> 6148 bytes Thread-2/.DS_Store | Bin 0 -> 6148 bytes Thread-2/boat/boat.cc | 113 ++++++++++++++- Thread-2/boat/boat.h | 19 +++ Thread-2/deadlock/data/example.in | 40 ++++- Thread-2/deadlock/lib/resource_manager.cc | 137 +++++++++++++++++- Thread-2/deadlock/lib/resource_manager.h | 14 ++ .../deadlock/lib/resource_manager_test.cc | 0 Thread-2/deadlock/lib/util_test.cc | 0 Thread-2/deadlock/lib/workload_lib_test.cc | 0 10 files changed, 313 insertions(+), 10 deletions(-) create mode 100644 .DS_Store create mode 100644 Thread-2/.DS_Store create mode 100644 Thread-2/deadlock/lib/resource_manager_test.cc create mode 100644 Thread-2/deadlock/lib/util_test.cc create mode 100644 Thread-2/deadlock/lib/workload_lib_test.cc diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..0b4bd42684e760a97ff4a390a3c29ddeea4a35e2 GIT binary patch literal 6148 zcmeHK%}T>S5T0$TZYyFBf<5NqtwR4Oco0IZ2X8`(9#q>LK79Fp-Z1ho_U_#egK=D`e27dox3Re;WkD8p3OCLmnmVx? zPsS~GbVa>$Hy(J#?nKdWVY}W%(4TgTdnZvCyFu6=sp7!zW6Jeq5c<)y6-`2atj2M4 zK+3$#cZ;PYsWmHQtKMkL%2skzL%UIL&SrVJe^5O>>)ky*CC{^$SIw@#A5qD!!8yF4 zv9izyXA};j@D^k9x_Le$Gr$Zm18dBH+4AJ}8n2O;#tbk6zhi**2Md+ZHkfKuTL(6D zeWZAfkOXadOAuNHZG)*sjGzczil|G4d144%j(*GJ*#=XMx*UX>8OJd*3-dw|YIgKn zDjkHaky~bf8CYf@tGh)y|4)B@|1TGDj~QSF)`|gS5Z-O8ZYp9Af<5Nqt%v?m@F0X*58i}`9#q=I6dR0LX;X{VNM1wV$S3f1 zoY~zLOZDPGq|CtVH#$l#>Icu|3v25eLKH=Dt9a!O#b_VZZw-|#h&ih!EQEMX=;hxz1qQX@9yzw`kcPJYIZsNh)T8%&fpD< zrG-AY!#I-h4a8=3vwT8gfEXYKR+s^^<%!J|UL!4y7$63I#{ljRHYlQFFw>~E4ruWD zi17j<3fTCTK(q}y1~ZKi0pYq7P?vJ^#NfId{I4z#;>A-7VqyfAsVFf3b*q!~ij{QVj4y$L+LXN#<-_S{$CWHfRqN1@khE l^Aa%BQ4G0w6qiAjfZs*~&@q^41P=&Z1QZR_5Ceb8z$Z$6Qw9J4 literal 0 HcmV?d00001 diff --git a/Thread-2/boat/boat.cc b/Thread-2/boat/boat.cc index 7acb29b..03e802d 100644 --- a/Thread-2/boat/boat.cc +++ b/Thread-2/boat/boat.cc @@ -1,14 +1,123 @@ #include #include #include +#include +#include +#include #include "boat.h" namespace proj2{ Boat::Boat(){ + bool boatInO; + int num_children_O; + int num_adults_O; + int num_children_M; + int num_adults_M; + std::condition_variable children_condition_O; + std::condition_variable children_condition_M; + std::condition_variable adults_condition_O; + std::condition_variable adults_condition_M; + bool gameover; + bool is_pilot; + bool is_adult_go; } - void Boat:: begin(int a, int b, BoatGrader *bg){ -} + num_children_O = b; + num_adults_O = a; + num_children_M = 0; + num_adults_M = 0; + boatInO = true; + std::mutex mtx; + gameover = false; + is_pilot = true; + is_adult_go = false; + std::thread threads[a+b]; + for (int i = 0; i < b; ++i){ + threads[i] = std::thread(&Boat::child_itinerary, this, std::ref(bg)); + } + for (int i = 0; i < a; ++i){ + threads[i+b] = std::thread(&Boat::adult_itinerary, this, std::ref(bg)); + } + for (int i = 0; i < a+b; ++i){ + threads[i].join(); + } + return; +} +void Boat:: adult_itinerary(BoatGrader *bg){ + bg->initializeAdult(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::yield(); + std::unique_lock lck(mtx); + while(!(is_adult_go && boatInO)){ + adults_condition_O.wait(lck); + if(gameover) return; + } + bg->AdultRowToMolokai(); + num_adults_M++; + num_adults_O--; + boatInO = false; + is_adult_go = false; + children_condition_M.notify_one(); + adults_condition_M.wait(lck); + if(gameover) return; +} +void Boat:: child_itinerary(BoatGrader *bg){ + bg->initializeChild(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::yield(); + std::unique_lock lck(mtx); + while(!gameover){ + if(boatInO){ + if(is_adult_go){ + adults_condition_O.notify_one(); + children_condition_O.wait(lck); + if(gameover) return; + } + if(is_pilot){ + bg->ChildRowToMolokai(); + num_children_O--; + num_children_M++; + is_pilot = false; + children_condition_O.notify_one(); + children_condition_M.wait(lck); + if(gameover) return; + }else{ + bg->ChildRideToMolokai(); + boatInO = false; + num_children_O--; + num_children_M++; + is_pilot = true; + if(num_adults_O==0 && num_children_O==0){ + gameover = true; + } + if(gameover){ + std::cout<<"Success!"<ChildRowToOahu(); + boatInO = true; + num_children_O++; + num_children_M--; + if(is_adult_go){ + adults_condition_O.notify_one(); + }else{ + children_condition_O.notify_one(); + } + children_condition_O.wait(lck); + if(gameover) return; + } + } +} } \ No newline at end of file diff --git a/Thread-2/boat/boat.h b/Thread-2/boat/boat.h index efa1462..312f62a 100644 --- a/Thread-2/boat/boat.h +++ b/Thread-2/boat/boat.h @@ -3,7 +3,9 @@ #include #include +#include #include +#include #include #include "boatGrader.h" @@ -13,8 +15,25 @@ class Boat{ public: Boat(); ~Boat(){}; + bool boatInO; + int num_children_O; + int num_adults_O; + int num_children_M; + int num_adults_M; + std::mutex mtx; + std::condition_variable children_condition_O; + std::condition_variable children_condition_M; + std::condition_variable adults_condition_O; + std::condition_variable adults_condition_M; + bool gameover; + bool is_adult_go; + bool is_pilot; void begin(int, int, BoatGrader*); + void adult_itinerary(BoatGrader*); + void child_itinerary(BoatGrader*); }; +//void adult_itinerary(Boat *boat, BoatGrader *bg); +//void child_itinerary(Boat *boat, BoatGrader *bg); } #endif // BOAT_H_ diff --git a/Thread-2/deadlock/data/example.in b/Thread-2/deadlock/data/example.in index 567adc2..774c8a9 100644 --- a/Thread-2/deadlock/data/example.in +++ b/Thread-2/deadlock/data/example.in @@ -1,3 +1,41 @@ 10 10 10 10 0 1 5 6 3 3 0 -1 0 5 6 3 3 0 \ No newline at end of file +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +2 3 4 8 3 3 1 +3 2 10 9 3 2 0 +0 3 9 2 3 3 1 +2 0 9 10 2 2 1 +3 0 9 8 2 2 0 +2 3 9 1 -1 2 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +2 3 4 8 3 3 1 +3 2 10 9 3 2 0 +0 3 9 2 3 3 1 +2 0 9 10 2 2 1 +3 2 10 9 3 2 0 +0 3 9 2 3 3 1 +2 0 9 10 2 2 1 +3 0 9 8 2 2 0 +2 3 9 1 -1 2 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +2 3 4 8 3 3 1 +3 2 10 9 3 2 0 +0 3 9 2 3 3 1 +2 0 9 10 2 2 1 \ No newline at end of file diff --git a/Thread-2/deadlock/lib/resource_manager.cc b/Thread-2/deadlock/lib/resource_manager.cc index 2231181..289e009 100644 --- a/Thread-2/deadlock/lib/resource_manager.cc +++ b/Thread-2/deadlock/lib/resource_manager.cc @@ -1,16 +1,18 @@ #include +#include #include #include #include #include "resource_manager.h" namespace proj2 { - int ResourceManager::request(RESOURCE r, int amount) { if (amount <= 0) return 1; - std::unique_lock lk(this->resource_mutex[r]); - while (true) { + std::unique_lock lck(is_suspend_mutex); + is_suspend[std::this_thread::get_id()] = false; + is_suspend_mutex.unlock(); + /* while (true) { if (this->resource_cv[r].wait_for( lk, std::chrono::milliseconds(100), [this, r, amount] { return this->resource_amount[r] >= amount; } @@ -23,26 +25,147 @@ int ResourceManager::request(RESOURCE r, int amount) { Note that you should release this thread's resources properly. */ - if (tmgr->is_killed(this_id)) { + /* if (tmgr->is_killed(this_id)) { return -1; } } + }*/ + while(true) { + //std::unique_lock lock(request_mutex); + std::unique_lock lck(is_suspend_mutex); + if (amount > this->resource_amount[r]) { + //std::unique_lock lck(is_suspend_mutex); + is_suspend[std::this_thread::get_id()] = true; + is_suspend_mutex.unlock(); + std::cout<<"+++"<resource_cv[r].wait(lk); + std::unique_lock lck(is_suspend_mutex); + is_suspend[std::this_thread::get_id()] = false; + is_suspend_mutex.unlock(); + std::cout<<"wakeup"<resource_amount[r] -= amount; + std::thread::id this_id = std::this_thread::get_id(); + if (r==0) GPU_request[this_id] -= amount; + if (r==1) MEMORY_request[this_id] -= amount; + if (r==2) DISK_request[this_id] -= amount; + if (r==3) NETWORK_request[this_id] -= amount; + if (r==0) GPU_alloc[this_id] += amount; + if (r==1) MEMORY_alloc[this_id] += amount; + if (r==2) DISK_alloc[this_id] += amount; + if (r==3) NETWORK_alloc[this_id] += amount; + int GPU_amount = this->resource_amount[GPU]; + int MEMORY_amount = this->resource_amount[MEMORY]; + int DISK_amount = this->resource_amount[DISK]; + int NETWORK_amount = this->resource_amount[NETWORK]; + //simulate by Banker's algorithm + //std::lock_guard lock(request_mutex); + bool s = safe(GPU_amount, MEMORY_amount, DISK_amount, NETWORK_amount, GPU_request, MEMORY_request, DISK_request, NETWORK_request, GPU_alloc, MEMORY_alloc, DISK_alloc, NETWORK_alloc); + if (s==true) { + //request_mutex.unlock(); + is_suspend_mutex.unlock(); + std::cout<<"--"<resource_amount[r] += amount; + if (r==0) GPU_alloc[std::this_thread::get_id()] -= amount; + if (r==1) MEMORY_alloc[std::this_thread::get_id()] -= amount; + if (r==2) DISK_alloc[std::this_thread::get_id()] -= amount; + if (r==3) NETWORK_alloc[std::this_thread::get_id()] -= amount; + is_suspend[std::this_thread::get_id()] = true; + //request_mutex.unlock(); + is_suspend_mutex.unlock(); + std::cout<<"++"<resource_cv[r].wait(lk); + std::unique_lock lck(is_suspend_mutex); + is_suspend[std::this_thread::get_id()] = false; + is_suspend_mutex.unlock(); + std::cout<<"wakeup"< GPU_request, std::map MEMORY_request, std::map DISK_request, std::map NETWORK_request, std::map GPU_alloc, std::map MEMORY_alloc, std::map DISK_alloc, std::map NETWORK_alloc) { + std::map GPU_request_c = GPU_request; + std::map MEMORY_request_c = MEMORY_request; + std::map DISK_request_c = DISK_request; + std::map NETWORK_request_c = NETWORK_request; + std::map GPU_alloc_c = GPU_alloc; + std::map MEMORY_alloc_c = MEMORY_alloc; + std::map DISK_alloc_c = DISK_alloc; + std::map NETWORK_alloc_c = NETWORK_alloc; + bool possible = true; + while (possible) { + bool flag = false; + std::map::iterator iter; + for (iter = GPU_request_c.begin(); iter != GPU_request_c.end(); iter++) { + std::thread::id this_id = iter->first; + if (is_suspend[this_id]==true) { + flag = true; + std::cout<<"%%"<resource_amount[r] -= amount; - this->resource_mutex[r].unlock(); - return 0; + if (GPU_request_c.size() == 0) return true; + return false; } void ResourceManager::release(RESOURCE r, int amount) { if (amount <= 0) return; std::unique_lock lk(this->resource_mutex[r]); this->resource_amount[r] += amount; + std::thread::id this_id = std::this_thread::get_id(); + if (r==0) GPU_request[this_id] += amount; + if (r==1) MEMORY_request[this_id] += amount; + if (r==2) DISK_request[this_id] += amount; + if (r==3) NETWORK_request[this_id] += amount; + if (r==0) GPU_alloc[this_id] -= amount; + if (r==1) MEMORY_alloc[this_id] -= amount; + if (r==2) DISK_alloc[this_id] -= amount; + if (r==3) NETWORK_alloc[this_id] -= amount; + std::cout<<"!"< lck(is_suspend_mutex); + is_suspend[std::this_thread::get_id()] = true; + is_suspend_mutex.unlock(); this->resource_cv[r].notify_all(); + return; } void ResourceManager::budget_claim(std::map budget) { // This function is called when some workload starts. // The workload will eventually consume all resources it claims + std::thread::id this_id = std::this_thread::get_id(); + GPU_request.insert(std::map::value_type(this_id, (budget.count(GPU)==1)?budget[GPU]:0)); + MEMORY_request.insert(std::map::value_type(this_id, (budget.count(MEMORY)==1)?budget[MEMORY]:0)); + DISK_request.insert(std::map::value_type(this_id, (budget.count(DISK)==1)?budget[DISK]:0)); + NETWORK_request.insert(std::map::value_type(this_id, (budget.count(NETWORK)==1)?budget[NETWORK]:0)); + GPU_alloc.insert(std::map::value_type(this_id, 0)); + MEMORY_alloc.insert(std::map::value_type(this_id, 0)); + DISK_alloc.insert(std::map::value_type(this_id, 0)); + NETWORK_alloc.insert(std::map::value_type(this_id, 0)); + std::cout<<"**"< lck(is_suspend_mutex); + is_suspend.insert(std::map::value_type(this_id, false)); + is_suspend_mutex.unlock(); + return; } } // namespace: proj2 diff --git a/Thread-2/deadlock/lib/resource_manager.h b/Thread-2/deadlock/lib/resource_manager.h index 54b45a6..77947e3 100644 --- a/Thread-2/deadlock/lib/resource_manager.h +++ b/Thread-2/deadlock/lib/resource_manager.h @@ -20,12 +20,26 @@ class ResourceManager { public: ResourceManager(ThreadManager *t, std::map init_count): \ resource_amount(init_count), tmgr(t) {} + std::map GPU_request; + std::map MEMORY_request; + std::map DISK_request; + std::map NETWORK_request; + std::map GPU_alloc; + std::map MEMORY_alloc; + std::map DISK_alloc; + std::map NETWORK_alloc; + std::map is_suspend; + //void request_claim_release(RESOURCE , std::thread::id); + bool safe(int GPU_amount, int MEMORY_amount, int DISK_amount, int NETWORK_amount, std::map GPU_request, std::map MEMORY_request, std::map DISK_request, std::map NETWORK_request, std::map GPU_alloc, std::map MEMORY_alloc, std::map DISK_alloc, std::map NETWORK_alloc); void budget_claim(std::map budget); int request(RESOURCE, int amount); void release(RESOURCE, int amount); private: std::map resource_amount; std::map resource_mutex; + std::condition_variable request_cv; + std::mutex request_mutex; + std::mutex is_suspend_mutex; std::map resource_cv; ThreadManager *tmgr; }; diff --git a/Thread-2/deadlock/lib/resource_manager_test.cc b/Thread-2/deadlock/lib/resource_manager_test.cc new file mode 100644 index 0000000..e69de29 diff --git a/Thread-2/deadlock/lib/util_test.cc b/Thread-2/deadlock/lib/util_test.cc new file mode 100644 index 0000000..e69de29 diff --git a/Thread-2/deadlock/lib/workload_lib_test.cc b/Thread-2/deadlock/lib/workload_lib_test.cc new file mode 100644 index 0000000..e69de29 From 85538acdcbb6bc723099096033308f79d1a11076 Mon Sep 17 00:00:00 2001 From: Itisfun0 <1549861585@qq.com> Date: Fri, 26 Nov 2021 01:10:23 +0800 Subject: [PATCH 2/6] q5 and boat --- Thread-1/BUILD | 69 ++++++++++ Thread-1/data/q4_instruction.tsv | 7 +- Thread-1/lib/embedding.cc | 20 ++- Thread-1/lib/embedding.h | 85 +++++++++++- Thread-1/lib/model.cc | 15 +- Thread-1/lib/utils.cc | 4 +- Thread-1/q1.cc | 123 ++++++++++++++++- Thread-1/q2.cc | 134 +++++++++++++++++- Thread-1/q3.cc | 189 ++++++++++++++++++++++++- Thread-1/q4.cc | 202 +++++++++++++++++++++++++++ Thread-1/q5.cc | 222 +++++++++++++++++++++++++++++- Thread-2/deadlock/data/example.in | 30 ++-- Thread-2/deadlock/lib/utils.cc | 2 +- 13 files changed, 1066 insertions(+), 36 deletions(-) create mode 100644 Thread-1/q4.cc diff --git a/Thread-1/BUILD b/Thread-1/BUILD index 28f4114..35b35b6 100644 --- a/Thread-1/BUILD +++ b/Thread-1/BUILD @@ -29,6 +29,17 @@ cc_binary( srcs = [ "q1.cc" ], + deps = [ + "//lib:embedding_lib", + "//lib:instruction_lib", + "//lib:utils_lib", + "//lib:model_lib" + ], + copts = [ + "-std=c++11", + ], + data = glob(["data/q1*"]), + linkopts = ["-lpthread"], ) cc_binary( @@ -36,6 +47,17 @@ cc_binary( srcs = [ "q2.cc" ], + deps = [ + "//lib:embedding_lib", + "//lib:instruction_lib", + "//lib:utils_lib", + "//lib:model_lib" + ], + copts = [ + "-std=c++11", + ], + data = glob(["data/q2*"]), + linkopts = ["-lpthread"], ) cc_binary( @@ -43,6 +65,53 @@ cc_binary( srcs = [ "q3.cc" ], + deps = [ + "//lib:embedding_lib", + "//lib:instruction_lib", + "//lib:utils_lib", + "//lib:model_lib" + ], + copts = [ + "-std=c++11", + ], + data = glob(["data/q3*"]), + linkopts = ["-lpthread"], +) + +cc_binary( + name = "q4", + srcs = [ + "q4.cc" + ], + deps = [ + "//lib:embedding_lib", + "//lib:instruction_lib", + "//lib:utils_lib", + "//lib:model_lib" + ], + copts = [ + "-std=c++11", + ], + data = glob(["data/q4*"]), + linkopts = ["-lpthread"], +) + +cc_binary( + name = "q5", + srcs = [ + "q5.cc" + ], + deps = [ + "//lib:embedding_lib", + "//lib:instruction_lib", + "//lib:utils_lib", + "//lib:model_lib" + ], + copts = [ + "-std=c++11", + ], + data = glob(["data/q4*"]), + linkopts = ["-lpthread"], ) cc_test( diff --git a/Thread-1/data/q4_instruction.tsv b/Thread-1/data/q4_instruction.tsv index 57d9916..0b35b4d 100644 --- a/Thread-1/data/q4_instruction.tsv +++ b/Thread-1/data/q4_instruction.tsv @@ -17,4 +17,9 @@ 2 3 2 1 7 8 9 1 1 3 1 2 1 2 4 0 2 -2 2 -1 4 5 6 3 7 9 \ No newline at end of file +2 2 -1 4 5 6 3 7 9 +1 8 2 1 6 +1 9 2 0 6 +2 6 7 0 2 3 5 +2 3 3 2 3 4 5 +1 6 8 0 3 \ No newline at end of file diff --git a/Thread-1/lib/embedding.cc b/Thread-1/lib/embedding.cc index 966a76d..66ca0cc 100644 --- a/Thread-1/lib/embedding.cc +++ b/Thread-1/lib/embedding.cc @@ -2,13 +2,20 @@ #include #include #include +#include +#include +#include +#include +#include #include "utils.h" #include "embedding.h" -namespace proj1 { +namespace proj1 { + RWLock write_to_stdout_lock = RWLock(); Embedding::Embedding(int length) { + embbedingAssert(length > 0, "Non-positive length encountered!", NON_POSITIVE_LEN); this->data = new double[length]; for (int i = 0; i < length; ++i) { this->data[i] = (double) i / 10.0; @@ -67,8 +74,10 @@ std::string Embedding::to_string() { } void Embedding::write_to_stdout() { + write_to_stdout_lock.write_lock(); std::string prefix("[OUTPUT]"); std::cout << prefix << this->to_string() << '\n'; + write_to_stdout_lock.write_unlock(); } Embedding Embedding::operator+(const Embedding &another) { @@ -137,7 +146,9 @@ Embedding Embedding::operator/(const double value) { bool Embedding::operator==(const Embedding &another) { for (int i = 0; i < this->length; ++i) { - if(fabs(this->data[i]-another.data[i])>1.0e-6)return false; + if(fabs(this->data[i]-another.data[i])>1.0e-6) { + return false; + } } return true; } @@ -214,8 +225,9 @@ void EmbeddingHolder::update_embedding( } bool EmbeddingHolder::operator==(const EmbeddingHolder &another) { - if (this->get_n_embeddings() != another.emb_matx.size()) + if (this->get_n_embeddings() != another.emb_matx.size()) { return false; + } for (int i = 0; i < (int)this->emb_matx.size(); ++i) { if(!(*(this->emb_matx[i]) == *(another.get_embedding(i)))){ return false; @@ -224,4 +236,4 @@ bool EmbeddingHolder::operator==(const EmbeddingHolder &another) { return true; } -} // namespace proj1 +} // namespace proj1 \ No newline at end of file diff --git a/Thread-1/lib/embedding.h b/Thread-1/lib/embedding.h index 6e13d14..4440790 100644 --- a/Thread-1/lib/embedding.h +++ b/Thread-1/lib/embedding.h @@ -3,6 +3,12 @@ #include #include +#include +#include +#include +#include +#include +#include namespace proj1 { @@ -11,9 +17,81 @@ enum EMBEDDING_ERROR { NON_POSITIVE_LEN }; +class RWLock { + public: + int AR=0, WW=0, AW=0, WR=0; + std::mutex *mtx; + std::condition_variable *okread; + std::condition_variable *okwrite; + RWLock(){ + mtx = new std::mutex; + okread = new std::condition_variable; + okwrite = new std::condition_variable; + } + + ~RWLock() { + delete mtx; + delete okread; + delete okwrite; + } + + void read_lock() { + std::unique_lock lck(*mtx); + //printf("Start read lock\n"); + while ((this->AW + this->WW) > 0) { + this->WR ++; + this->okread->wait(lck); + this->WR --; + } + this->AR ++; + lck.unlock(); + //printf("Finish read lock\n"); + } + + void read_unlock() { + std::unique_lock lck(*mtx); + //printf("Start read unlock\n"); + this->AR --; + if(this->AR == 0 && this->WW > 0) + this->okwrite->notify_all(); + lck.unlock(); + //printf("Finish read unlock\n"); + } + + void write_lock() { + std::unique_lock lck(*mtx); + //printf("Start write lock\n"); + while ((this->AW + this->AR) > 0) { + this->WW ++; + this->okwrite->wait(lck); + this->WW --; + } + this->AW ++; + lck.unlock(); + //printf("Finish write lock\n"); + } + + void write_unlock() { + std::unique_lock lck(*mtx); + //printf("Start write unlock\n"); + this->AW --; + if (this->WW > 0) + this->okwrite->notify_all(); + else if (this->WR > 0) + this->okread->notify_all(); + lck.unlock(); + //printf("Finish write unlock\n"); + } + + private: + + //volatile std::atomic AR=0, WW=0, AW=0, WR=0; is a possible choice +}; + class Embedding{ public: - Embedding() {} + RWLock lock = RWLock(); + Embedding(){} Embedding(int); // Random init an embedding Embedding(int, double*); Embedding(int, std::string); @@ -35,8 +113,8 @@ class Embedding{ Embedding operator/(const double); bool operator==(const Embedding&); private: - int length; double* data; + int length; }; using EmbeddingMatrix = std::vector; @@ -44,6 +122,7 @@ using EmbeddingGradient = Embedding; class EmbeddingHolder{ public: + RWLock lock = RWLock(); EmbeddingHolder(std::string filename); EmbeddingHolder(EmbeddingMatrix &data); ~EmbeddingHolder(); @@ -63,4 +142,4 @@ class EmbeddingHolder{ }; } // namespace proj1 -#endif // THREAD_LIB_EMBEDDING_H_ +#endif // THREAD_LIB_EMBEDDING_H_ \ No newline at end of file diff --git a/Thread-1/lib/model.cc b/Thread-1/lib/model.cc index e857e98..e50d462 100644 --- a/Thread-1/lib/model.cc +++ b/Thread-1/lib/model.cc @@ -16,17 +16,25 @@ double similarity(Embedding* embA, Embedding* embB) { return similarity; } +// NOTE: do not rely on this exact implementation -- it may get modified. EmbeddingGradient* calc_gradient(Embedding* embA, Embedding* embB, int label) { /* For simplicity, here we just simulate the gradient backprop for: 1. a dot product between embeddings 2. a sigmoid activation function 3. a binary cross entropy loss */ + embA->lock.read_lock(); + embB->lock.read_lock(); + if(label == -1) { + label = embB->get_data()[0] > 1e-8? 0: 1; + } double distance = similarity(embA, embB); + embA->lock.read_unlock(); double pred = sigmoid(distance); double loss = binary_cross_entropy_backward((double) label, pred); loss *= sigmoid_backward(distance); EmbeddingGradient *gradA = new Embedding((*embB) * loss); + embB->lock.read_unlock(); // Here we simulate a slow calculation a_slow_function(10); @@ -37,20 +45,23 @@ EmbeddingGradient* cold_start(Embedding* user, Embedding* item) { // Do some downstream work, e.g. let the user watch this video a_slow_function(10); // Then we collect a label, e.g. whether the user finished watching the video - int label = item->get_data()[0] > 1e-8? 0: 1; - return calc_gradient(user, item, label); + return calc_gradient(user, item, -1); } Embedding* recommend(Embedding* user, std::vector items) { Embedding* maxItem; double sim, maxSim = -inf; + user->lock.read_lock(); for (auto item: items) { + item->lock.read_lock(); sim = similarity(user, item); + item->lock.read_unlock(); if (sim > maxSim) { maxItem = item; maxSim = sim; } } + user->lock.read_unlock(); return maxItem; } diff --git a/Thread-1/lib/utils.cc b/Thread-1/lib/utils.cc index 3427314..4c89bd8 100644 --- a/Thread-1/lib/utils.cc +++ b/Thread-1/lib/utils.cc @@ -8,7 +8,7 @@ namespace proj1 { void a_slow_function(int seconds) { - //std::this_thread::sleep_for(std::chrono::seconds(seconds)); + std::this_thread::sleep_for(std::chrono::seconds(seconds)); } double sigmoid(double x) { @@ -42,4 +42,4 @@ AutoTimer::~AutoTimer() { std::cout << m_name << " : " << dur.count() << " usec\n"; } -} // namespace proj1 +} // namespace proj1 \ No newline at end of file diff --git a/Thread-1/q1.cc b/Thread-1/q1.cc index 974ad23..23fe1c5 100644 --- a/Thread-1/q1.cc +++ b/Thread-1/q1.cc @@ -1,5 +1,124 @@ +#include +#include + +#include // string +#include // timer #include // cout, endl +#include +#include +#include +#include +#include + +#include "lib/utils.h" +#include "lib/model.h" +#include "lib/embedding.h" +#include "lib/instruction.h" + +namespace proj1 { + +void run_one_instruction(Instruction inst, EmbeddingHolder* users, EmbeddingHolder* items) { + switch(inst.order) { + case INIT_EMB: { + // We need to init the embedding + users->lock.read_lock(); + int length = users->get_emb_length(); + users->lock.read_unlock(); + + Embedding* new_user = new Embedding(length); + + users->lock.write_lock(); + int user_idx = users->append(new_user); + users->lock.write_unlock(); + + for (int item_index: inst.payloads) { + Embedding* item_emb = items->get_embedding(item_index); + // Call cold start for downstream applications, slow + + EmbeddingGradient* gradient = cold_start(new_user, item_emb); + + new_user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + new_user->lock.write_unlock(); + delete gradient; + } + + break; + } + case UPDATE_EMB: { + int user_idx = inst.payloads[0]; + int item_idx = inst.payloads[1]; + int label = inst.payloads[2]; + // You might need to add this state in other questions. + // Here we just show you this as an example + // int epoch = -1; + //if (inst.payloads.size() > 3) { + // epoch = inst.payloads[3]; + //} + Embedding* user = users->get_embedding(user_idx); + Embedding* item = items->get_embedding(item_idx); + + EmbeddingGradient* gradient = calc_gradient(user, item, label); + user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + user->lock.write_unlock(); + delete gradient; + + gradient = calc_gradient(item, user, label); + item->lock.write_lock(); + items->update_embedding(item_idx, gradient, 0.001); + item->lock.write_unlock(); + delete gradient; + + break; + } + case RECOMMEND: { + int user_idx = inst.payloads[0]; + Embedding* user = users->get_embedding(user_idx); + std::vector item_pool; + int iter_idx = inst.payloads[1]; + for (unsigned int i = 2; i < inst.payloads.size(); ++i) { + int item_idx = inst.payloads[i]; + item_pool.push_back(items->get_embedding(item_idx)); + } + Embedding* recommendation = recommend(user, item_pool); + recommendation->write_to_stdout(); + break; + } + } + +} + +} // namespace proj1 + int main(int argc, char *argv[]) { - std::cout << "please implement this function\n"; - exit(1); + + proj1::EmbeddingHolder* users = new proj1::EmbeddingHolder("data/q1.in"); + proj1::EmbeddingHolder* items = new proj1::EmbeddingHolder("data/q1.in"); + proj1::Instructions instructions = proj1::read_instructrions("data/q1_instruction.tsv"); + { + proj1::AutoTimer timer("q1"); // using this to print out timing of the block + + // Run all the instructions + std::vector threadArr; + for (proj1::Instruction inst: instructions) { + //printf("%d\n", threadArr.size()); + std::thread *t = new std::thread(proj1::run_one_instruction, inst, users, items); + threadArr.push_back(t); + } + int len = threadArr.size(); + for (int j=0;jjoin(); + } + + // Write the result + users->write_to_stdout(); + items->write_to_stdout(); + + // We only need to delete the embedding holders, as the pointers are all + // pointing at the emb_matx of the holders. + delete users; + delete items; + + return 0; } \ No newline at end of file diff --git a/Thread-1/q2.cc b/Thread-1/q2.cc index c4b7413..a4f45bb 100644 --- a/Thread-1/q2.cc +++ b/Thread-1/q2.cc @@ -1,4 +1,136 @@ +#include +#include + +#include // string +#include // timer #include // cout, endl +#include +#include +#include +#include +#include + +#include "lib/utils.h" +#include "lib/model.h" +#include "lib/embedding.h" +#include "lib/instruction.h" + +namespace proj1 { + +void Coldstart(EmbeddingHolder* users, Embedding* new_user, Embedding* item_emb, int user_idx) { + // Call cold start for downstream applications, slow + + EmbeddingGradient* gradient = cold_start(new_user, item_emb); + + new_user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + new_user->lock.write_unlock(); + delete gradient; +} + +void run_one_instruction(Instruction inst, EmbeddingHolder* users, EmbeddingHolder* items) { + switch(inst.order) { + case INIT_EMB: { + // We need to init the embedding + users->lock.read_lock(); + int length = users->get_emb_length(); + users->lock.read_unlock(); + + Embedding* new_user = new Embedding(length); + + users->lock.write_lock(); + int user_idx = users->append(new_user); + users->lock.write_unlock(); + + std::vector multiple_coldstarts; + + for (int item_index: inst.payloads) { + Embedding* item_emb = items->get_embedding(item_index); + std::thread *t = new std::thread(Coldstart, users, new_user, item_emb, user_idx); + multiple_coldstarts.push_back(t); + } + int len = multiple_coldstarts.size(); + for (int j=0;jjoin(); + + break; + } + case UPDATE_EMB: { + int user_idx = inst.payloads[0]; + int item_idx = inst.payloads[1]; + int label = inst.payloads[2]; + // You might need to add this state in other questions. + // Here we just show you this as an example + // int epoch = -1; + //if (inst.payloads.size() > 3) { + // epoch = inst.payloads[3]; + //} + Embedding* user = users->get_embedding(user_idx); + Embedding* item = items->get_embedding(item_idx); + + + EmbeddingGradient* gradient = calc_gradient(user, item, label); + user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + user->lock.write_unlock(); + delete gradient; + + + gradient = calc_gradient(item, user, label); + item->lock.write_lock(); + items->update_embedding(item_idx, gradient, 0.001); + item->lock.write_unlock(); + delete gradient; + + break; + } + case RECOMMEND: { + int user_idx = inst.payloads[0]; + Embedding* user = users->get_embedding(user_idx); + std::vector item_pool; + int iter_idx = inst.payloads[1]; + for (unsigned int i = 2; i < inst.payloads.size(); ++i) { + int item_idx = inst.payloads[i]; + item_pool.push_back(items->get_embedding(item_idx)); + } + Embedding* recommendation = recommend(user, item_pool); + recommendation->write_to_stdout(); + break; + } + } + +} + +} // namespace proj1 + int main(int argc, char *argv[]) { - exit(1); + + proj1::EmbeddingHolder* users = new proj1::EmbeddingHolder("data/q2.in"); + proj1::EmbeddingHolder* items = new proj1::EmbeddingHolder("data/q2.in"); + proj1::Instructions instructions = proj1::read_instructrions("data/q2_instruction.tsv"); + { + proj1::AutoTimer timer("q2"); // using this to print out timing of the block + + // Run all the instructions + std::vector threadArr; + for (proj1::Instruction inst: instructions) { + //printf("%d\n", threadArr.size()); + std::thread *t = new std::thread(proj1::run_one_instruction, inst, users, items); + threadArr.push_back(t); + } + int len = threadArr.size(); + for (int j=0;jjoin(); + } + + // Write the result + users->write_to_stdout(); + items->write_to_stdout(); + + // We only need to delete the embedding holders, as the pointers are all + // pointing at the emb_matx of the holders. + delete users; + delete items; + + return 0; } \ No newline at end of file diff --git a/Thread-1/q3.cc b/Thread-1/q3.cc index c4b7413..6c575f3 100644 --- a/Thread-1/q3.cc +++ b/Thread-1/q3.cc @@ -1,4 +1,191 @@ +#include +#include + +#include // string +#include // timer #include // cout, endl +#include +#include +#include +#include +#include +#include + +#include "lib/utils.h" +#include "lib/model.h" +#include "lib/embedding.h" +#include "lib/instruction.h" + +namespace proj1 { + +void Coldstart(EmbeddingHolder* users, Embedding* new_user, Embedding* item_emb, int user_idx) { + // Call cold start for downstream applications, slow + + EmbeddingGradient* gradient = cold_start(new_user, item_emb); + + new_user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + new_user->lock.write_unlock(); + delete gradient; +} + +void run_one_instruction(Instruction inst, EmbeddingHolder* users, EmbeddingHolder* items, std::vector* counter, proj1::RWLock* counter_lock, std::vector* epoch_lock, int max_epoch) { + switch(inst.order) { + case INIT_EMB: { + // We need to init the embedding + users->lock.read_lock(); + int length = users->get_emb_length(); + users->lock.read_unlock(); + + Embedding* new_user = new Embedding(length); + + users->lock.write_lock(); + int user_idx = users->append(new_user); + users->lock.write_unlock(); + + std::vector multiple_coldstarts; + + for (int item_index: inst.payloads) { + Embedding* item_emb = items->get_embedding(item_index); + std::thread *t = new std::thread(Coldstart, users, new_user, item_emb, user_idx); + multiple_coldstarts.push_back(t); + } + int len = multiple_coldstarts.size(); + for (int j=0;jjoin(); + + break; + } + case UPDATE_EMB: { + int user_idx = inst.payloads[0]; + int item_idx = inst.payloads[1]; + int label = inst.payloads[2]; + // You might need to add this state in other questions. + // Here we just show you this as an example + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if (epoch != -1) { + epoch_lock->at(epoch)->read_lock(); + //printf("Started! epoch = %d\n", epoch); + } + + Embedding* user = users->get_embedding(user_idx); + Embedding* item = items->get_embedding(item_idx); + + EmbeddingGradient* gradient = calc_gradient(user, item, label); + user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + user->lock.write_unlock(); + delete gradient; + + gradient = calc_gradient(item, user, label); + item->lock.write_lock(); + items->update_embedding(item_idx, gradient, 0.001); + item->lock.write_unlock(); + delete gradient; + + if (epoch != -1) { + //printf("Finished! epoch = %d\n", epoch); + epoch_lock->at(epoch)->read_unlock(); + counter_lock->write_lock(); + (*(counter->at(epoch))) --; + if ((*(counter->at(epoch))) == 0) { + for(int i=epoch+1;i<=max_epoch;++i) { + epoch_lock->at(i)->write_unlock(); + if ((*(counter->at(i))) != 0) break; + } + } + counter_lock->write_unlock(); + } + break; + } + case RECOMMEND: { + int user_idx = inst.payloads[0]; + Embedding* user = users->get_embedding(user_idx); + std::vector item_pool; + int iter_idx = inst.payloads[1]; + for (unsigned int i = 2; i < inst.payloads.size(); ++i) { + int item_idx = inst.payloads[i]; + item_pool.push_back(items->get_embedding(item_idx)); + } + Embedding* recommendation = recommend(user, item_pool); + recommendation->write_to_stdout(); + break; + } + } + +} + +} // namespace proj1 + int main(int argc, char *argv[]) { - exit(1); + + proj1::EmbeddingHolder* users = new proj1::EmbeddingHolder("data/q3.in"); + proj1::EmbeddingHolder* items = new proj1::EmbeddingHolder("data/q3.in"); + proj1::Instructions instructions = proj1::read_instructrions("data/q3_instruction.tsv"); + { + proj1::AutoTimer timer("q3"); // using this to print out timing of the block + + // Preprocesssing + int max_epoch = -1; + for (proj1::Instruction inst: instructions) { + if (inst.order != proj1::UPDATE_EMB) continue; + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + max_epoch = epoch > max_epoch ? epoch : max_epoch; + } + std::vector counter; + std::vector epoch_lock; + proj1::RWLock *counter_lock = new proj1::RWLock; + for (int i=0;i<=max_epoch;++i) { + epoch_lock.push_back(new proj1::RWLock); + epoch_lock[i]->write_lock(); + counter.push_back(new int(0)); + } + for (proj1::Instruction inst: instructions) { + if (inst.order != proj1::UPDATE_EMB) continue; + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if(epoch >= 0) + (*counter[epoch]) ++; + } + for (int i=0;i<=max_epoch;++i) { + epoch_lock[i]->write_unlock(); + if ((*counter[i]) != 0) break; + } + // printf("%d\n", max_epoch); + + // Run all the instructions + std::vector threadArr; + for (proj1::Instruction inst: instructions) { + //printf("%d\n", threadArr.size()); + std::thread *t = new std::thread(proj1::run_one_instruction, inst, users, items, &counter, counter_lock, &epoch_lock, max_epoch); + threadArr.push_back(t); + } + int len = threadArr.size(); + for (int j=0;jjoin(); + delete counter_lock; + for (int i=0;i<=max_epoch;++i) { + delete counter[i]; + delete epoch_lock[i]; + } + } + + // Write the result + users->write_to_stdout(); + items->write_to_stdout(); + + // We only need to delete the embedding holders, as the pointers are all + // pointing at the emb_matx of the holders. + delete users; + delete items; + + return 0; } \ No newline at end of file diff --git a/Thread-1/q4.cc b/Thread-1/q4.cc new file mode 100644 index 0000000..56fdfef --- /dev/null +++ b/Thread-1/q4.cc @@ -0,0 +1,202 @@ +#include +#include + +#include // string +#include // timer +#include // cout, endl +#include +#include +#include +#include +#include +#include + +#include "lib/utils.h" +#include "lib/model.h" +#include "lib/embedding.h" +#include "lib/instruction.h" + +namespace proj1 { + +void Coldstart(EmbeddingHolder* users, Embedding* new_user, Embedding* item_emb, int user_idx) { + // Call cold start for downstream applications, slow + + EmbeddingGradient* gradient = cold_start(new_user, item_emb); + + new_user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + new_user->lock.write_unlock(); + delete gradient; +} + +void run_one_instruction(Instruction inst, EmbeddingHolder* users, EmbeddingHolder* items, std::vector* counter, proj1::RWLock* counter_lock, std::vector* epoch_lock, int max_epoch) { + switch(inst.order) { + case INIT_EMB: { + // We need to init the embedding + users->lock.read_lock(); + int length = users->get_emb_length(); + users->lock.read_unlock(); + + Embedding* new_user = new Embedding(length); + + users->lock.write_lock(); + int user_idx = users->append(new_user); + users->lock.write_unlock(); + + std::vector multiple_coldstarts; + + for (int item_index: inst.payloads) { + Embedding* item_emb = items->get_embedding(item_index); + std::thread *t = new std::thread(Coldstart, users, new_user, item_emb, user_idx); + multiple_coldstarts.push_back(t); + } + int len = multiple_coldstarts.size(); + for (int j=0;jjoin(); + + break; + } + case UPDATE_EMB: { + int user_idx = inst.payloads[0]; + int item_idx = inst.payloads[1]; + int label = inst.payloads[2]; + // You might need to add this state in other questions. + // Here we just show you this as an example + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if (epoch != -1) { + epoch_lock->at(epoch)->read_lock(); + printf("Started! epoch = %d\n", epoch); + } + + Embedding* user = users->get_embedding(user_idx); + Embedding* item = items->get_embedding(item_idx); + + EmbeddingGradient* gradient = calc_gradient(user, item, label); + user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + user->lock.write_unlock(); + delete gradient; + + gradient = calc_gradient(item, user, label); + item->lock.write_lock(); + items->update_embedding(item_idx, gradient, 0.001); + item->lock.write_unlock(); + delete gradient; + + if (epoch != -1) { + printf("Finished! epoch = %d\n", epoch); + epoch_lock->at(epoch)->read_unlock(); + counter_lock->write_lock(); + (*(counter->at(epoch))) --; + if ((*(counter->at(epoch))) == 0) { + for(int i=epoch+1;i<=max_epoch;++i) { + epoch_lock->at(i)->write_unlock(); + if ((*(counter->at(i))) != 0) break; + } + } + counter_lock->write_unlock(); + } + break; + } + case RECOMMEND: { + int user_idx = inst.payloads[0]; + Embedding* user = users->get_embedding(user_idx); + std::vector item_pool; + int iter_idx = inst.payloads[1] + 1; + epoch_lock->at(iter_idx)->read_lock(); + printf("Recommend! epoch = %d\n", iter_idx); + + for (unsigned int i = 2; i < inst.payloads.size(); ++i) { + int item_idx = inst.payloads[i]; + item_pool.push_back(items->get_embedding(item_idx)); + } + Embedding* recommendation = recommend(user, item_pool); + epoch_lock->at(iter_idx)->read_unlock(); + + recommendation->write_to_stdout(); + break; + } + } + +} + +} // namespace proj1 + +int main(int argc, char *argv[]) { + + proj1::EmbeddingHolder* users = new proj1::EmbeddingHolder("data/q4.in"); + proj1::EmbeddingHolder* items = new proj1::EmbeddingHolder("data/q4.in"); + proj1::Instructions instructions = proj1::read_instructrions("data/q4_instruction.tsv"); + { + proj1::AutoTimer timer("q4"); // using this to print out timing of the block + + // Preprocesssing + int max_epoch = -1; + for (proj1::Instruction inst: instructions) { + if (inst.order == proj1::UPDATE_EMB) continue; + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + max_epoch = epoch > max_epoch ? epoch : max_epoch; + } + std::vector counter; + std::vector epoch_lock; + proj1::RWLock *counter_lock = new proj1::RWLock; + for (int i=0;i<=max_epoch;++i) { + epoch_lock.push_back(new proj1::RWLock); + epoch_lock[i]->write_lock(); + counter.push_back(new int(0)); + } + for (proj1::Instruction inst: instructions) { + if (inst.order == proj1::INIT_EMB) continue; + if (inst.order == proj1::UPDATE_EMB) { + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if(epoch >= 0) + (*counter[epoch]) ++; + } + else { + int epoch = inst.payloads[1] + 1; + max_epoch = epoch > max_epoch ? epoch : max_epoch; + } + } + for (int i=0;i<=max_epoch;++i) { + epoch_lock[i]->write_unlock(); + if ((*counter[i]) != 0) break; + } + // printf("%d\n", max_epoch); + + // Run all the instructions + std::vector threadArr; + for (proj1::Instruction inst: instructions) { + //printf("%d\n", threadArr.size()); + std::thread *t = new std::thread(proj1::run_one_instruction, inst, users, items, &counter, counter_lock, &epoch_lock, max_epoch); + threadArr.push_back(t); + } + int len = threadArr.size(); + for (int j=0;jjoin(); + delete counter_lock; + for (int i=0;i<=max_epoch;++i) { + delete counter[i]; + delete epoch_lock[i]; + } + } + + // Write the result + users->write_to_stdout(); + items->write_to_stdout(); + + // We only need to delete the embedding holders, as the pointers are all + // pointing at the emb_matx of the holders. + delete users; + delete items; + + return 0; +} \ No newline at end of file diff --git a/Thread-1/q5.cc b/Thread-1/q5.cc index c4b7413..03f1513 100644 --- a/Thread-1/q5.cc +++ b/Thread-1/q5.cc @@ -1,4 +1,224 @@ +#include +#include + +#include // string +#include // timer #include // cout, endl +#include +#include +#include +#include +#include +#include + +#include "lib/utils.h" +#include "lib/model.h" +#include "lib/embedding.h" +#include "lib/instruction.h" + +namespace proj1 { + +void Coldstart(EmbeddingHolder* users, Embedding* new_user, Embedding* item_emb, int user_idx) { + // Call cold start for downstream applications, slow + + EmbeddingGradient* gradient = cold_start(new_user, item_emb); + + new_user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + new_user->lock.write_unlock(); + delete gradient; +} + +void run_one_instruction(Instruction inst, EmbeddingHolder* users, EmbeddingHolder* items, std::vector* counter, proj1::RWLock* counter_lock, std::vector* epoch_lock, int max_epoch) { + switch(inst.order) { + case INIT_EMB: { + // We need to init the embedding + users->lock.read_lock(); + int length = users->get_emb_length(); + users->lock.read_unlock(); + + Embedding* new_user = new Embedding(length); + + users->lock.write_lock(); + int user_idx = users->append(new_user); + users->lock.write_unlock(); + + std::vector multiple_coldstarts; + + for (int item_index: inst.payloads) { + Embedding* item_emb = items->get_embedding(item_index); + std::thread *t = new std::thread(Coldstart, users, new_user, item_emb, user_idx); + multiple_coldstarts.push_back(t); + } + int len = multiple_coldstarts.size(); + for (int j=0;jjoin(); + + break; + } + case UPDATE_EMB: { + int user_idx = inst.payloads[0]; + int item_idx = inst.payloads[1]; + int label = inst.payloads[2]; + // You might need to add this state in other questions. + // Here we just show you this as an example + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if (epoch != -1) { + epoch_lock->at(epoch)->read_lock(); + printf("Started! epoch = %d\n", epoch); + } + + Embedding* user = users->get_embedding(user_idx); + Embedding* item = items->get_embedding(item_idx); + + std::future t_user = std::async(proj1::calc_gradient, user, item, label); + std::future t_item = std::async(proj1::calc_gradient, item, user, label); + + EmbeddingGradient* gradient_user = t_user.get(); + EmbeddingGradient* gradient_item = t_item.get(); + + user->lock.write_lock(); + item->lock.write_lock(); + items->update_embedding(item_idx, gradient_user, 0.001); + users->update_embedding(user_idx, gradient_item, 0.01); + item->lock.write_unlock(); + user->lock.write_unlock(); + delete gradient_user; + delete gradient_item; + + if (epoch != -1) { + printf("Finished! epoch = %d\n", epoch); + epoch_lock->at(epoch)->read_unlock(); + counter_lock->write_lock(); + (*(counter->at(epoch))) --; + if ((*(counter->at(epoch))) == 0) { + for(int i=epoch+1;i<=max_epoch;++i) { + epoch_lock->at(i)->write_unlock(); + if ((*(counter->at(i))) != 0) break; + } + } + counter_lock->write_unlock(); + } + break; + } + case RECOMMEND: { + int user_idx = inst.payloads[0]; + std::vector item_pool; + int iter_idx = inst.payloads[1] + 1; + while(true) { + counter_lock->read_lock(); + bool flag = true; + if(iter_idx > 0) { + for(int i=0;iat(i))) != 0) { + flag = false; + break; + } + } + } + if(iter_idx == 0 || flag == true) { + counter_lock->read_unlock(); + printf("Recommend! epoch = %d\n", iter_idx); + proj1::EmbeddingHolder* users_copy = users; + proj1::EmbeddingHolder* items_copy = items; + Embedding* user = users_copy->get_embedding(user_idx); + for (unsigned int i = 2; i < inst.payloads.size(); ++i) { + int item_idx = inst.payloads[i]; + item_pool.push_back(items_copy->get_embedding(item_idx)); + } + Embedding* recommendation = recommend(user, item_pool); + recommendation->write_to_stdout(); + break; + } + else { + counter_lock->read_unlock(); + } + } + + //epoch_lock->at(iter_idx)->read_unlock(); + break; + } + } + +} + +} // namespace proj1 + int main(int argc, char *argv[]) { - exit(1); + + proj1::EmbeddingHolder* users = new proj1::EmbeddingHolder("data/q4.in"); + proj1::EmbeddingHolder* items = new proj1::EmbeddingHolder("data/q4.in"); + proj1::Instructions instructions = proj1::read_instructrions("data/q4_instruction.tsv"); + { + proj1::AutoTimer timer("q5"); // using this to print out timing of the block + + // Preprocesssing + int max_epoch = -1; + for (proj1::Instruction inst: instructions) { + if (inst.order == proj1::UPDATE_EMB) continue; + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + max_epoch = epoch > max_epoch ? epoch : max_epoch; + } + std::vector counter; + std::vector epoch_lock; + proj1::RWLock *counter_lock = new proj1::RWLock; + for (int i=0;i<=max_epoch;++i) { + epoch_lock.push_back(new proj1::RWLock); + epoch_lock[i]->write_lock(); + counter.push_back(new int(0)); + } + for (proj1::Instruction inst: instructions) { + if (inst.order == proj1::INIT_EMB) continue; + if (inst.order == proj1::UPDATE_EMB) { + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if(epoch >= 0) + (*counter[epoch]) ++; + } + else { + int epoch = inst.payloads[1] + 1; + max_epoch = epoch > max_epoch ? epoch : max_epoch; + } + } + for (int i=0;i<=max_epoch;++i) { + epoch_lock[i]->write_unlock(); + if ((*counter[i]) != 0) break; + } + // printf("%d\n", max_epoch); + + // Run all the instructions + std::vector threadArr; + for (proj1::Instruction inst: instructions) { + //printf("%d\n", threadArr.size()); + std::thread *t = new std::thread(proj1::run_one_instruction, inst, users, items, &counter, counter_lock, &epoch_lock, max_epoch); + threadArr.push_back(t); + } + int len = threadArr.size(); + for (int j=0;jjoin(); + delete counter_lock; + for (int i=0;i<=max_epoch;++i) { + delete counter[i]; + delete epoch_lock[i]; + } + } + + // Write the result + users->write_to_stdout(); + items->write_to_stdout(); + + // We only need to delete the embedding holders, as the pointers are all + // pointing at the emb_matx of the holders. + delete users; + delete items; + + return 0; } \ No newline at end of file diff --git a/Thread-2/deadlock/data/example.in b/Thread-2/deadlock/data/example.in index 774c8a9..4c9eed8 100644 --- a/Thread-2/deadlock/data/example.in +++ b/Thread-2/deadlock/data/example.in @@ -8,34 +8,28 @@ 2 3 4 8 3 3 1 2 3 9 1 -1 2 0 3 2 8 9 -1 -1 0 -2 3 4 8 3 3 1 -3 2 10 9 3 2 0 -0 3 9 2 3 3 1 -2 0 9 10 2 2 1 -3 0 9 8 2 2 0 -2 3 9 1 -1 2 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 1 2 7 8 2 4 0 1 2 6 9 2 4 0 1 2 7 8 2 4 0 2 3 4 8 3 3 1 2 3 9 1 -1 2 0 3 2 8 9 -1 -1 0 -2 3 4 8 3 3 1 -3 2 10 9 3 2 0 -0 3 9 2 3 3 1 -2 0 9 10 2 2 1 -3 2 10 9 3 2 0 -0 3 9 2 3 3 1 -2 0 9 10 2 2 1 -3 0 9 8 2 2 0 -2 3 9 1 -1 2 0 +0 1 5 6 3 3 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 1 2 7 8 2 4 0 1 2 6 9 2 4 0 1 2 7 8 2 4 0 2 3 4 8 3 3 1 2 3 9 1 -1 2 0 3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 2 3 4 8 3 3 1 -3 2 10 9 3 2 0 -0 3 9 2 3 3 1 -2 0 9 10 2 2 1 \ No newline at end of file +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 \ No newline at end of file diff --git a/Thread-2/deadlock/lib/utils.cc b/Thread-2/deadlock/lib/utils.cc index f27558c..fd9bac7 100644 --- a/Thread-2/deadlock/lib/utils.cc +++ b/Thread-2/deadlock/lib/utils.cc @@ -11,7 +11,7 @@ namespace proj2 { void a_slow_function(int seconds) { - std::this_thread::sleep_for(std::chrono::seconds(seconds)); + //std::this_thread::sleep_for(std::chrono::seconds(seconds)); } int randint(int lower, int upper) { From 43593ed77c68c346a07b9b06434429facdb6606f Mon Sep 17 00:00:00 2001 From: Itisfun0 <1549861585@qq.com> Date: Fri, 26 Nov 2021 12:18:44 +0800 Subject: [PATCH 3/6] q5; boat; deadlock --- Thread-2/deadlock/data/example.in | 48 ++++++++- Thread-2/deadlock/lib/resource_manager.cc | 114 +++++++++++++++++----- Thread-2/deadlock/lib/resource_manager.h | 25 +++-- 3 files changed, 150 insertions(+), 37 deletions(-) diff --git a/Thread-2/deadlock/data/example.in b/Thread-2/deadlock/data/example.in index 4c9eed8..a07dc62 100644 --- a/Thread-2/deadlock/data/example.in +++ b/Thread-2/deadlock/data/example.in @@ -1,6 +1,18 @@ 10 10 10 10 0 1 5 6 3 3 0 1 0 5 6 3 3 0 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +0 1 5 6 3 3 0 +1 0 5 6 3 3 0 1 2 6 9 2 4 0 1 2 7 8 2 4 0 1 2 6 9 2 4 0 @@ -18,6 +30,18 @@ 3 2 8 9 -1 -1 0 0 1 5 6 3 3 0 1 0 5 6 3 3 0 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +0 1 5 6 3 3 0 +1 0 5 6 3 3 0 1 2 6 9 2 4 0 1 2 7 8 2 4 0 1 2 6 9 2 4 0 @@ -31,5 +55,27 @@ 1 2 6 9 2 4 0 1 2 7 8 2 4 0 2 3 4 8 3 3 1 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +0 1 5 6 3 3 0 +1 0 5 6 3 3 0 2 3 9 1 -1 2 0 -3 2 8 9 -1 -1 0 \ No newline at end of file +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +0 1 5 6 3 3 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 \ No newline at end of file diff --git a/Thread-2/deadlock/lib/resource_manager.cc b/Thread-2/deadlock/lib/resource_manager.cc index 289e009..aa90a55 100644 --- a/Thread-2/deadlock/lib/resource_manager.cc +++ b/Thread-2/deadlock/lib/resource_manager.cc @@ -4,14 +4,15 @@ #include #include #include "resource_manager.h" +#include "thread_manager.h" namespace proj2 { -int ResourceManager::request(RESOURCE r, int amount) { +/*int ResourceManager::request(RESOURCE r, int amount) { if (amount <= 0) return 1; std::unique_lock lk(this->resource_mutex[r]); std::unique_lock lck(is_suspend_mutex); is_suspend[std::this_thread::get_id()] = false; - is_suspend_mutex.unlock(); + is_suspend_mutex.unlock();*/ /* while (true) { if (this->resource_cv[r].wait_for( lk, std::chrono::milliseconds(100), @@ -30,7 +31,7 @@ int ResourceManager::request(RESOURCE r, int amount) { } } }*/ - while(true) { + /*while(true) { //std::unique_lock lock(request_mutex); std::unique_lock lck(is_suspend_mutex); if (amount > this->resource_amount[r]) { @@ -88,13 +89,57 @@ int ResourceManager::request(RESOURCE r, int amount) { } } } +}*/ + +int ResourceManager::request(RESOURCE r, int amount) { + if (amount <= 0) return 1; + //std::unique_lock lock(this->resource_mutex); + while(true) { + std::unique_lock lock(this->resource_mutex); + if (amount > this->resource_amount[r]) { + std::cout<<"+++"<resource_cv.wait(lock); + std::cout<<"wakeup"< lock(this->resource_mutex); + } + else { + this->resource_amount[r] -= amount; + std::thread::id this_id = std::this_thread::get_id(); + if (r==0) GPU_alloc[this_id] += amount; + if (r==1) MEMORY_alloc[this_id] += amount; + if (r==2) DISK_alloc[this_id] += amount; + if (r==3) NETWORK_alloc[this_id] += amount; + int GPU_amount = this->resource_amount[GPU]; + int MEMORY_amount = this->resource_amount[MEMORY]; + int DISK_amount = this->resource_amount[DISK]; + int NETWORK_amount = this->resource_amount[NETWORK]; + //simulate by Banker's algorithm + bool s = safe(GPU_amount, MEMORY_amount, DISK_amount, NETWORK_amount, GPU_claim, MEMORY_claim, DISK_claim, NETWORK_claim, GPU_alloc, MEMORY_alloc, DISK_alloc, NETWORK_alloc); + if (s==true) { + std::cout<<"--"<resource_mutex.unlock(); + return 0; + } + else { + this->resource_amount[r] += amount; + if (r==0) GPU_alloc[std::this_thread::get_id()] -= amount; + if (r==1) MEMORY_alloc[std::this_thread::get_id()] -= amount; + if (r==2) DISK_alloc[std::this_thread::get_id()] -= amount; + if (r==3) NETWORK_alloc[std::this_thread::get_id()] -= amount; + std::cout<<"++"<resource_cv.wait(lock); + //std::unique_lock lock(this->resource_mutex); + std::cout<<"wakeup"< GPU_request, std::map MEMORY_request, std::map DISK_request, std::map NETWORK_request, std::map GPU_alloc, std::map MEMORY_alloc, std::map DISK_alloc, std::map NETWORK_alloc) { - std::map GPU_request_c = GPU_request; - std::map MEMORY_request_c = MEMORY_request; - std::map DISK_request_c = DISK_request; - std::map NETWORK_request_c = NETWORK_request; +bool ResourceManager::safe(int GPU_amount, int MEMORY_amount, int DISK_amount, int NETWORK_amount, std::map GPU_claim, std::map MEMORY_claim, std::map DISK_claim, std::map NETWORK_claim, std::map GPU_alloc, std::map MEMORY_alloc, std::map DISK_alloc, std::map NETWORK_alloc) { + std::map GPU_claim_c = GPU_claim; + std::map MEMORY_claim_c = MEMORY_claim; + std::map DISK_claim_c = DISK_claim; + std::map NETWORK_claim_c = NETWORK_claim; std::map GPU_alloc_c = GPU_alloc; std::map MEMORY_alloc_c = MEMORY_alloc; std::map DISK_alloc_c = DISK_alloc; @@ -103,53 +148,46 @@ bool ResourceManager::safe(int GPU_amount, int MEMORY_amount, int DISK_amount, i while (possible) { bool flag = false; std::map::iterator iter; - for (iter = GPU_request_c.begin(); iter != GPU_request_c.end(); iter++) { + for (iter = GPU_claim_c.begin(); iter != GPU_claim_c.end(); iter++) { std::thread::id this_id = iter->first; - if (is_suspend[this_id]==true) { + if (tmgr->is_killed(this_id)==true) { flag = true; - std::cout<<"%%"< lk(this->resource_mutex[r]); + std::unique_lock lock(this->resource_mutex); this->resource_amount[r] += amount; std::thread::id this_id = std::this_thread::get_id(); - if (r==0) GPU_request[this_id] += amount; - if (r==1) MEMORY_request[this_id] += amount; - if (r==2) DISK_request[this_id] += amount; - if (r==3) NETWORK_request[this_id] += amount; if (r==0) GPU_alloc[this_id] -= amount; if (r==1) MEMORY_alloc[this_id] -= amount; if (r==2) DISK_alloc[this_id] -= amount; if (r==3) NETWORK_alloc[this_id] -= amount; - std::cout<<"!"< lck(is_suspend_mutex); - is_suspend[std::this_thread::get_id()] = true; - is_suspend_mutex.unlock(); - this->resource_cv[r].notify_all(); + std::cout<<"!"<resource_cv.notify_all(); return; } -void ResourceManager::budget_claim(std::map budget) { +/*void ResourceManager::budget_claim(std::map budget) { // This function is called when some workload starts. // The workload will eventually consume all resources it claims std::thread::id this_id = std::this_thread::get_id(); @@ -166,6 +204,28 @@ void ResourceManager::budget_claim(std::map budget) { is_suspend.insert(std::map::value_type(this_id, false)); is_suspend_mutex.unlock(); return; +}*/ + +void ResourceManager::budget_claim(std::map budget) { + // This function is called when some workload starts. + // The workload will eventually consume all resources it claims + // set resource_claim, init resource_alloc, init is_suspend + std::unique_lock lock(this->resource_mutex); + std::thread::id this_id = std::this_thread::get_id(); + + GPU_claim.insert(std::map::value_type(this_id, (budget.count(GPU)==1)?budget[GPU]:0)); + MEMORY_claim.insert(std::map::value_type(this_id, (budget.count(MEMORY)==1)?budget[MEMORY]:0)); + DISK_claim.insert(std::map::value_type(this_id, (budget.count(DISK)==1)?budget[DISK]:0)); + NETWORK_claim.insert(std::map::value_type(this_id, (budget.count(NETWORK)==1)?budget[NETWORK]:0)); + + GPU_alloc.insert(std::map::value_type(this_id, 0)); + MEMORY_alloc.insert(std::map::value_type(this_id, 0)); + DISK_alloc.insert(std::map::value_type(this_id, 0)); + NETWORK_alloc.insert(std::map::value_type(this_id, 0)); + + std::cout<<"**"<resource_mutex.unlock(); + return; } } // namespace: proj2 diff --git a/Thread-2/deadlock/lib/resource_manager.h b/Thread-2/deadlock/lib/resource_manager.h index 77947e3..d1696d9 100644 --- a/Thread-2/deadlock/lib/resource_manager.h +++ b/Thread-2/deadlock/lib/resource_manager.h @@ -20,10 +20,14 @@ class ResourceManager { public: ResourceManager(ThreadManager *t, std::map init_count): \ resource_amount(init_count), tmgr(t) {} - std::map GPU_request; - std::map MEMORY_request; - std::map DISK_request; - std::map NETWORK_request; + std::map GPU_claim; + std::map MEMORY_claim; + std::map DISK_claim; + std::map NETWORK_claim; + //std::map GPU_request; + //std::map MEMORY_request; + //std::map DISK_request; + //std::map NETWORK_request; std::map GPU_alloc; std::map MEMORY_alloc; std::map DISK_alloc; @@ -36,11 +40,14 @@ class ResourceManager { void release(RESOURCE, int amount); private: std::map resource_amount; - std::map resource_mutex; - std::condition_variable request_cv; - std::mutex request_mutex; - std::mutex is_suspend_mutex; - std::map resource_cv; + //std::map resource_mutex; + //std::condition_variable alloc_cv; + //std::mutex alloc_mutex; + //std::mutex is_suspend_mutex; + //std::mutex avail_mutex; + std::mutex resource_mutex; + std::condition_variable resource_cv; + //std::map resource_cv; ThreadManager *tmgr; }; From 20df3ec324f0284c920247dc294196b27e5934bc Mon Sep 17 00:00:00 2001 From: Itisfun0 <1549861585@qq.com> Date: Fri, 26 Nov 2021 12:29:39 +0800 Subject: [PATCH 4/6] q5; boat; deadlock; --- Thread-2/deadlock/data/example.in | 56 +++++++++++++++++++++-- Thread-2/deadlock/lib/resource_manager.cc | 4 +- Thread-2/deadlock/lib/resource_manager.h | 4 -- 3 files changed, 55 insertions(+), 9 deletions(-) diff --git a/Thread-2/deadlock/data/example.in b/Thread-2/deadlock/data/example.in index a07dc62..31b1927 100644 --- a/Thread-2/deadlock/data/example.in +++ b/Thread-2/deadlock/data/example.in @@ -1,6 +1,11 @@ 10 10 10 10 0 1 5 6 3 3 0 1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 2 3 9 1 -1 2 0 3 2 8 9 -1 -1 0 1 0 5 6 3 3 0 @@ -28,8 +33,11 @@ 2 3 4 8 3 3 1 2 3 9 1 -1 2 0 3 2 8 9 -1 -1 0 -0 1 5 6 3 3 0 -1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 2 3 9 1 -1 2 0 3 2 8 9 -1 -1 0 1 0 5 6 3 3 0 @@ -55,6 +63,14 @@ 1 2 6 9 2 4 0 1 2 7 8 2 4 0 2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 3 2 8 9 -1 -1 0 1 0 5 6 3 3 0 1 2 6 9 2 4 0 @@ -66,6 +82,26 @@ 3 2 8 9 -1 -1 0 0 1 5 6 3 3 0 1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 2 3 9 1 -1 2 0 3 2 8 9 -1 -1 0 1 0 5 6 3 3 0 @@ -78,4 +114,18 @@ 3 2 8 9 -1 -1 0 0 1 5 6 3 3 0 1 0 5 6 3 3 0 -1 2 6 9 2 4 0 \ No newline at end of file +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 \ No newline at end of file diff --git a/Thread-2/deadlock/lib/resource_manager.cc b/Thread-2/deadlock/lib/resource_manager.cc index aa90a55..7b7ef1e 100644 --- a/Thread-2/deadlock/lib/resource_manager.cc +++ b/Thread-2/deadlock/lib/resource_manager.cc @@ -117,7 +117,7 @@ int ResourceManager::request(RESOURCE r, int amount) { bool s = safe(GPU_amount, MEMORY_amount, DISK_amount, NETWORK_amount, GPU_claim, MEMORY_claim, DISK_claim, NETWORK_claim, GPU_alloc, MEMORY_alloc, DISK_alloc, NETWORK_alloc); if (s==true) { std::cout<<"--"<resource_mutex.unlock(); + //this->resource_mutex.unlock(); return 0; } else { @@ -224,7 +224,7 @@ void ResourceManager::budget_claim(std::map budget) { NETWORK_alloc.insert(std::map::value_type(this_id, 0)); std::cout<<"**"<resource_mutex.unlock(); + //this->resource_mutex.unlock(); return; } diff --git a/Thread-2/deadlock/lib/resource_manager.h b/Thread-2/deadlock/lib/resource_manager.h index d1696d9..6596ad2 100644 --- a/Thread-2/deadlock/lib/resource_manager.h +++ b/Thread-2/deadlock/lib/resource_manager.h @@ -41,10 +41,6 @@ class ResourceManager { private: std::map resource_amount; //std::map resource_mutex; - //std::condition_variable alloc_cv; - //std::mutex alloc_mutex; - //std::mutex is_suspend_mutex; - //std::mutex avail_mutex; std::mutex resource_mutex; std::condition_variable resource_cv; //std::map resource_cv; From 1a3c31af561dd84275fd2d2c34d68ef8141d3da9 Mon Sep 17 00:00:00 2001 From: Itisfun0 <1549861585@qq.com> Date: Fri, 26 Nov 2021 15:03:51 +0800 Subject: [PATCH 5/6] q5; boat; deadlock --- Thread-2/deadlock/lib/resource_manager.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/Thread-2/deadlock/lib/resource_manager.cc b/Thread-2/deadlock/lib/resource_manager.cc index 7b7ef1e..7252b41 100644 --- a/Thread-2/deadlock/lib/resource_manager.cc +++ b/Thread-2/deadlock/lib/resource_manager.cc @@ -128,7 +128,6 @@ int ResourceManager::request(RESOURCE r, int amount) { if (r==3) NETWORK_alloc[std::this_thread::get_id()] -= amount; std::cout<<"++"<resource_cv.wait(lock); - //std::unique_lock lock(this->resource_mutex); std::cout<<"wakeup"< Date: Fri, 26 Nov 2021 15:19:56 +0800 Subject: [PATCH 6/6] finished --- Thread-2/deadlock/lib/resource_manager.cc | 103 ---------------------- 1 file changed, 103 deletions(-) diff --git a/Thread-2/deadlock/lib/resource_manager.cc b/Thread-2/deadlock/lib/resource_manager.cc index 7252b41..76a532b 100644 --- a/Thread-2/deadlock/lib/resource_manager.cc +++ b/Thread-2/deadlock/lib/resource_manager.cc @@ -7,89 +7,6 @@ #include "thread_manager.h" namespace proj2 { -/*int ResourceManager::request(RESOURCE r, int amount) { - if (amount <= 0) return 1; - std::unique_lock lk(this->resource_mutex[r]); - std::unique_lock lck(is_suspend_mutex); - is_suspend[std::this_thread::get_id()] = false; - is_suspend_mutex.unlock();*/ - /* while (true) { - if (this->resource_cv[r].wait_for( - lk, std::chrono::milliseconds(100), - [this, r, amount] { return this->resource_amount[r] >= amount; } - )) { - break; - } else { - auto this_id = std::this_thread::get_id(); - /* HINT: If you choose to detect the deadlock and recover, - implement your code here to kill and restart threads. - Note that you should release this thread's resources - properly. - */ - /* if (tmgr->is_killed(this_id)) { - return -1; - } - } - }*/ - /*while(true) { - //std::unique_lock lock(request_mutex); - std::unique_lock lck(is_suspend_mutex); - if (amount > this->resource_amount[r]) { - //std::unique_lock lck(is_suspend_mutex); - is_suspend[std::this_thread::get_id()] = true; - is_suspend_mutex.unlock(); - std::cout<<"+++"<resource_cv[r].wait(lk); - std::unique_lock lck(is_suspend_mutex); - is_suspend[std::this_thread::get_id()] = false; - is_suspend_mutex.unlock(); - std::cout<<"wakeup"<resource_amount[r] -= amount; - std::thread::id this_id = std::this_thread::get_id(); - if (r==0) GPU_request[this_id] -= amount; - if (r==1) MEMORY_request[this_id] -= amount; - if (r==2) DISK_request[this_id] -= amount; - if (r==3) NETWORK_request[this_id] -= amount; - if (r==0) GPU_alloc[this_id] += amount; - if (r==1) MEMORY_alloc[this_id] += amount; - if (r==2) DISK_alloc[this_id] += amount; - if (r==3) NETWORK_alloc[this_id] += amount; - int GPU_amount = this->resource_amount[GPU]; - int MEMORY_amount = this->resource_amount[MEMORY]; - int DISK_amount = this->resource_amount[DISK]; - int NETWORK_amount = this->resource_amount[NETWORK]; - //simulate by Banker's algorithm - //std::lock_guard lock(request_mutex); - bool s = safe(GPU_amount, MEMORY_amount, DISK_amount, NETWORK_amount, GPU_request, MEMORY_request, DISK_request, NETWORK_request, GPU_alloc, MEMORY_alloc, DISK_alloc, NETWORK_alloc); - if (s==true) { - //request_mutex.unlock(); - is_suspend_mutex.unlock(); - std::cout<<"--"<resource_amount[r] += amount; - if (r==0) GPU_alloc[std::this_thread::get_id()] -= amount; - if (r==1) MEMORY_alloc[std::this_thread::get_id()] -= amount; - if (r==2) DISK_alloc[std::this_thread::get_id()] -= amount; - if (r==3) NETWORK_alloc[std::this_thread::get_id()] -= amount; - is_suspend[std::this_thread::get_id()] = true; - //request_mutex.unlock(); - is_suspend_mutex.unlock(); - std::cout<<"++"<resource_cv[r].wait(lk); - std::unique_lock lck(is_suspend_mutex); - is_suspend[std::this_thread::get_id()] = false; - is_suspend_mutex.unlock(); - std::cout<<"wakeup"< budget) { - // This function is called when some workload starts. - // The workload will eventually consume all resources it claims - std::thread::id this_id = std::this_thread::get_id(); - GPU_request.insert(std::map::value_type(this_id, (budget.count(GPU)==1)?budget[GPU]:0)); - MEMORY_request.insert(std::map::value_type(this_id, (budget.count(MEMORY)==1)?budget[MEMORY]:0)); - DISK_request.insert(std::map::value_type(this_id, (budget.count(DISK)==1)?budget[DISK]:0)); - NETWORK_request.insert(std::map::value_type(this_id, (budget.count(NETWORK)==1)?budget[NETWORK]:0)); - GPU_alloc.insert(std::map::value_type(this_id, 0)); - MEMORY_alloc.insert(std::map::value_type(this_id, 0)); - DISK_alloc.insert(std::map::value_type(this_id, 0)); - NETWORK_alloc.insert(std::map::value_type(this_id, 0)); - std::cout<<"**"< lck(is_suspend_mutex); - is_suspend.insert(std::map::value_type(this_id, false)); - is_suspend_mutex.unlock(); - return; -}*/ - void ResourceManager::budget_claim(std::map budget) { // This function is called when some workload starts. // The workload will eventually consume all resources it claims @@ -223,7 +121,6 @@ void ResourceManager::budget_claim(std::map budget) { NETWORK_alloc.insert(std::map::value_type(this_id, 0)); std::cout<<"**"<resource_mutex.unlock(); return; }