diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..0b4bd42 Binary files /dev/null and b/.DS_Store differ diff --git a/Thread-1/BUILD b/Thread-1/BUILD index 28f4114..35b35b6 100644 --- a/Thread-1/BUILD +++ b/Thread-1/BUILD @@ -29,6 +29,17 @@ cc_binary( srcs = [ "q1.cc" ], + deps = [ + "//lib:embedding_lib", + "//lib:instruction_lib", + "//lib:utils_lib", + "//lib:model_lib" + ], + copts = [ + "-std=c++11", + ], + data = glob(["data/q1*"]), + linkopts = ["-lpthread"], ) cc_binary( @@ -36,6 +47,17 @@ cc_binary( srcs = [ "q2.cc" ], + deps = [ + "//lib:embedding_lib", + "//lib:instruction_lib", + "//lib:utils_lib", + "//lib:model_lib" + ], + copts = [ + "-std=c++11", + ], + data = glob(["data/q2*"]), + linkopts = ["-lpthread"], ) cc_binary( @@ -43,6 +65,53 @@ cc_binary( srcs = [ "q3.cc" ], + deps = [ + "//lib:embedding_lib", + "//lib:instruction_lib", + "//lib:utils_lib", + "//lib:model_lib" + ], + copts = [ + "-std=c++11", + ], + data = glob(["data/q3*"]), + linkopts = ["-lpthread"], +) + +cc_binary( + name = "q4", + srcs = [ + "q4.cc" + ], + deps = [ + "//lib:embedding_lib", + "//lib:instruction_lib", + "//lib:utils_lib", + "//lib:model_lib" + ], + copts = [ + "-std=c++11", + ], + data = glob(["data/q4*"]), + linkopts = ["-lpthread"], +) + +cc_binary( + name = "q5", + srcs = [ + "q5.cc" + ], + deps = [ + "//lib:embedding_lib", + "//lib:instruction_lib", + "//lib:utils_lib", + "//lib:model_lib" + ], + copts = [ + "-std=c++11", + ], + data = glob(["data/q4*"]), + linkopts = ["-lpthread"], ) cc_test( diff --git a/Thread-1/data/q4_instruction.tsv b/Thread-1/data/q4_instruction.tsv index 57d9916..0b35b4d 100644 --- a/Thread-1/data/q4_instruction.tsv +++ b/Thread-1/data/q4_instruction.tsv @@ -17,4 +17,9 @@ 2 3 2 1 7 8 9 1 1 3 1 2 1 2 4 0 2 -2 2 -1 4 5 6 3 7 9 \ No newline at end of file +2 2 -1 4 5 6 3 7 9 +1 8 2 1 6 +1 9 2 0 6 +2 6 7 0 2 3 5 +2 3 3 2 3 4 5 +1 6 8 0 3 \ No newline at end of file diff --git a/Thread-1/lib/embedding.cc b/Thread-1/lib/embedding.cc index 966a76d..66ca0cc 100644 --- a/Thread-1/lib/embedding.cc +++ b/Thread-1/lib/embedding.cc @@ -2,13 +2,20 @@ #include #include #include +#include +#include +#include +#include +#include #include "utils.h" #include "embedding.h" -namespace proj1 { +namespace proj1 { + RWLock write_to_stdout_lock = RWLock(); Embedding::Embedding(int length) { + embbedingAssert(length > 0, "Non-positive length encountered!", NON_POSITIVE_LEN); this->data = new double[length]; for (int i = 0; i < length; ++i) { this->data[i] = (double) i / 10.0; @@ -67,8 +74,10 @@ std::string Embedding::to_string() { } void Embedding::write_to_stdout() { + write_to_stdout_lock.write_lock(); std::string prefix("[OUTPUT]"); std::cout << prefix << this->to_string() << '\n'; + write_to_stdout_lock.write_unlock(); } Embedding Embedding::operator+(const Embedding &another) { @@ -137,7 +146,9 @@ Embedding Embedding::operator/(const double value) { bool Embedding::operator==(const Embedding &another) { for (int i = 0; i < this->length; ++i) { - if(fabs(this->data[i]-another.data[i])>1.0e-6)return false; + if(fabs(this->data[i]-another.data[i])>1.0e-6) { + return false; + } } return true; } @@ -214,8 +225,9 @@ void EmbeddingHolder::update_embedding( } bool EmbeddingHolder::operator==(const EmbeddingHolder &another) { - if (this->get_n_embeddings() != another.emb_matx.size()) + if (this->get_n_embeddings() != another.emb_matx.size()) { return false; + } for (int i = 0; i < (int)this->emb_matx.size(); ++i) { if(!(*(this->emb_matx[i]) == *(another.get_embedding(i)))){ return false; @@ -224,4 +236,4 @@ bool EmbeddingHolder::operator==(const EmbeddingHolder &another) { return true; } -} // namespace proj1 +} // namespace proj1 \ No newline at end of file diff --git a/Thread-1/lib/embedding.h b/Thread-1/lib/embedding.h index 6e13d14..4440790 100644 --- a/Thread-1/lib/embedding.h +++ b/Thread-1/lib/embedding.h @@ -3,6 +3,12 @@ #include #include +#include +#include +#include +#include +#include +#include namespace proj1 { @@ -11,9 +17,81 @@ enum EMBEDDING_ERROR { NON_POSITIVE_LEN }; +class RWLock { + public: + int AR=0, WW=0, AW=0, WR=0; + std::mutex *mtx; + std::condition_variable *okread; + std::condition_variable *okwrite; + RWLock(){ + mtx = new std::mutex; + okread = new std::condition_variable; + okwrite = new std::condition_variable; + } + + ~RWLock() { + delete mtx; + delete okread; + delete okwrite; + } + + void read_lock() { + std::unique_lock lck(*mtx); + //printf("Start read lock\n"); + while ((this->AW + this->WW) > 0) { + this->WR ++; + this->okread->wait(lck); + this->WR --; + } + this->AR ++; + lck.unlock(); + //printf("Finish read lock\n"); + } + + void read_unlock() { + std::unique_lock lck(*mtx); + //printf("Start read unlock\n"); + this->AR --; + if(this->AR == 0 && this->WW > 0) + this->okwrite->notify_all(); + lck.unlock(); + //printf("Finish read unlock\n"); + } + + void write_lock() { + std::unique_lock lck(*mtx); + //printf("Start write lock\n"); + while ((this->AW + this->AR) > 0) { + this->WW ++; + this->okwrite->wait(lck); + this->WW --; + } + this->AW ++; + lck.unlock(); + //printf("Finish write lock\n"); + } + + void write_unlock() { + std::unique_lock lck(*mtx); + //printf("Start write unlock\n"); + this->AW --; + if (this->WW > 0) + this->okwrite->notify_all(); + else if (this->WR > 0) + this->okread->notify_all(); + lck.unlock(); + //printf("Finish write unlock\n"); + } + + private: + + //volatile std::atomic AR=0, WW=0, AW=0, WR=0; is a possible choice +}; + class Embedding{ public: - Embedding() {} + RWLock lock = RWLock(); + Embedding(){} Embedding(int); // Random init an embedding Embedding(int, double*); Embedding(int, std::string); @@ -35,8 +113,8 @@ class Embedding{ Embedding operator/(const double); bool operator==(const Embedding&); private: - int length; double* data; + int length; }; using EmbeddingMatrix = std::vector; @@ -44,6 +122,7 @@ using EmbeddingGradient = Embedding; class EmbeddingHolder{ public: + RWLock lock = RWLock(); EmbeddingHolder(std::string filename); EmbeddingHolder(EmbeddingMatrix &data); ~EmbeddingHolder(); @@ -63,4 +142,4 @@ class EmbeddingHolder{ }; } // namespace proj1 -#endif // THREAD_LIB_EMBEDDING_H_ +#endif // THREAD_LIB_EMBEDDING_H_ \ No newline at end of file diff --git a/Thread-1/lib/model.cc b/Thread-1/lib/model.cc index e857e98..e50d462 100644 --- a/Thread-1/lib/model.cc +++ b/Thread-1/lib/model.cc @@ -16,17 +16,25 @@ double similarity(Embedding* embA, Embedding* embB) { return similarity; } +// NOTE: do not rely on this exact implementation -- it may get modified. EmbeddingGradient* calc_gradient(Embedding* embA, Embedding* embB, int label) { /* For simplicity, here we just simulate the gradient backprop for: 1. a dot product between embeddings 2. a sigmoid activation function 3. a binary cross entropy loss */ + embA->lock.read_lock(); + embB->lock.read_lock(); + if(label == -1) { + label = embB->get_data()[0] > 1e-8? 0: 1; + } double distance = similarity(embA, embB); + embA->lock.read_unlock(); double pred = sigmoid(distance); double loss = binary_cross_entropy_backward((double) label, pred); loss *= sigmoid_backward(distance); EmbeddingGradient *gradA = new Embedding((*embB) * loss); + embB->lock.read_unlock(); // Here we simulate a slow calculation a_slow_function(10); @@ -37,20 +45,23 @@ EmbeddingGradient* cold_start(Embedding* user, Embedding* item) { // Do some downstream work, e.g. let the user watch this video a_slow_function(10); // Then we collect a label, e.g. whether the user finished watching the video - int label = item->get_data()[0] > 1e-8? 0: 1; - return calc_gradient(user, item, label); + return calc_gradient(user, item, -1); } Embedding* recommend(Embedding* user, std::vector items) { Embedding* maxItem; double sim, maxSim = -inf; + user->lock.read_lock(); for (auto item: items) { + item->lock.read_lock(); sim = similarity(user, item); + item->lock.read_unlock(); if (sim > maxSim) { maxItem = item; maxSim = sim; } } + user->lock.read_unlock(); return maxItem; } diff --git a/Thread-1/lib/utils.cc b/Thread-1/lib/utils.cc index 3427314..4c89bd8 100644 --- a/Thread-1/lib/utils.cc +++ b/Thread-1/lib/utils.cc @@ -8,7 +8,7 @@ namespace proj1 { void a_slow_function(int seconds) { - //std::this_thread::sleep_for(std::chrono::seconds(seconds)); + std::this_thread::sleep_for(std::chrono::seconds(seconds)); } double sigmoid(double x) { @@ -42,4 +42,4 @@ AutoTimer::~AutoTimer() { std::cout << m_name << " : " << dur.count() << " usec\n"; } -} // namespace proj1 +} // namespace proj1 \ No newline at end of file diff --git a/Thread-1/q1.cc b/Thread-1/q1.cc index 974ad23..23fe1c5 100644 --- a/Thread-1/q1.cc +++ b/Thread-1/q1.cc @@ -1,5 +1,124 @@ +#include +#include + +#include // string +#include // timer #include // cout, endl +#include +#include +#include +#include +#include + +#include "lib/utils.h" +#include "lib/model.h" +#include "lib/embedding.h" +#include "lib/instruction.h" + +namespace proj1 { + +void run_one_instruction(Instruction inst, EmbeddingHolder* users, EmbeddingHolder* items) { + switch(inst.order) { + case INIT_EMB: { + // We need to init the embedding + users->lock.read_lock(); + int length = users->get_emb_length(); + users->lock.read_unlock(); + + Embedding* new_user = new Embedding(length); + + users->lock.write_lock(); + int user_idx = users->append(new_user); + users->lock.write_unlock(); + + for (int item_index: inst.payloads) { + Embedding* item_emb = items->get_embedding(item_index); + // Call cold start for downstream applications, slow + + EmbeddingGradient* gradient = cold_start(new_user, item_emb); + + new_user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + new_user->lock.write_unlock(); + delete gradient; + } + + break; + } + case UPDATE_EMB: { + int user_idx = inst.payloads[0]; + int item_idx = inst.payloads[1]; + int label = inst.payloads[2]; + // You might need to add this state in other questions. + // Here we just show you this as an example + // int epoch = -1; + //if (inst.payloads.size() > 3) { + // epoch = inst.payloads[3]; + //} + Embedding* user = users->get_embedding(user_idx); + Embedding* item = items->get_embedding(item_idx); + + EmbeddingGradient* gradient = calc_gradient(user, item, label); + user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + user->lock.write_unlock(); + delete gradient; + + gradient = calc_gradient(item, user, label); + item->lock.write_lock(); + items->update_embedding(item_idx, gradient, 0.001); + item->lock.write_unlock(); + delete gradient; + + break; + } + case RECOMMEND: { + int user_idx = inst.payloads[0]; + Embedding* user = users->get_embedding(user_idx); + std::vector item_pool; + int iter_idx = inst.payloads[1]; + for (unsigned int i = 2; i < inst.payloads.size(); ++i) { + int item_idx = inst.payloads[i]; + item_pool.push_back(items->get_embedding(item_idx)); + } + Embedding* recommendation = recommend(user, item_pool); + recommendation->write_to_stdout(); + break; + } + } + +} + +} // namespace proj1 + int main(int argc, char *argv[]) { - std::cout << "please implement this function\n"; - exit(1); + + proj1::EmbeddingHolder* users = new proj1::EmbeddingHolder("data/q1.in"); + proj1::EmbeddingHolder* items = new proj1::EmbeddingHolder("data/q1.in"); + proj1::Instructions instructions = proj1::read_instructrions("data/q1_instruction.tsv"); + { + proj1::AutoTimer timer("q1"); // using this to print out timing of the block + + // Run all the instructions + std::vector threadArr; + for (proj1::Instruction inst: instructions) { + //printf("%d\n", threadArr.size()); + std::thread *t = new std::thread(proj1::run_one_instruction, inst, users, items); + threadArr.push_back(t); + } + int len = threadArr.size(); + for (int j=0;jjoin(); + } + + // Write the result + users->write_to_stdout(); + items->write_to_stdout(); + + // We only need to delete the embedding holders, as the pointers are all + // pointing at the emb_matx of the holders. + delete users; + delete items; + + return 0; } \ No newline at end of file diff --git a/Thread-1/q2.cc b/Thread-1/q2.cc index c4b7413..a4f45bb 100644 --- a/Thread-1/q2.cc +++ b/Thread-1/q2.cc @@ -1,4 +1,136 @@ +#include +#include + +#include // string +#include // timer #include // cout, endl +#include +#include +#include +#include +#include + +#include "lib/utils.h" +#include "lib/model.h" +#include "lib/embedding.h" +#include "lib/instruction.h" + +namespace proj1 { + +void Coldstart(EmbeddingHolder* users, Embedding* new_user, Embedding* item_emb, int user_idx) { + // Call cold start for downstream applications, slow + + EmbeddingGradient* gradient = cold_start(new_user, item_emb); + + new_user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + new_user->lock.write_unlock(); + delete gradient; +} + +void run_one_instruction(Instruction inst, EmbeddingHolder* users, EmbeddingHolder* items) { + switch(inst.order) { + case INIT_EMB: { + // We need to init the embedding + users->lock.read_lock(); + int length = users->get_emb_length(); + users->lock.read_unlock(); + + Embedding* new_user = new Embedding(length); + + users->lock.write_lock(); + int user_idx = users->append(new_user); + users->lock.write_unlock(); + + std::vector multiple_coldstarts; + + for (int item_index: inst.payloads) { + Embedding* item_emb = items->get_embedding(item_index); + std::thread *t = new std::thread(Coldstart, users, new_user, item_emb, user_idx); + multiple_coldstarts.push_back(t); + } + int len = multiple_coldstarts.size(); + for (int j=0;jjoin(); + + break; + } + case UPDATE_EMB: { + int user_idx = inst.payloads[0]; + int item_idx = inst.payloads[1]; + int label = inst.payloads[2]; + // You might need to add this state in other questions. + // Here we just show you this as an example + // int epoch = -1; + //if (inst.payloads.size() > 3) { + // epoch = inst.payloads[3]; + //} + Embedding* user = users->get_embedding(user_idx); + Embedding* item = items->get_embedding(item_idx); + + + EmbeddingGradient* gradient = calc_gradient(user, item, label); + user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + user->lock.write_unlock(); + delete gradient; + + + gradient = calc_gradient(item, user, label); + item->lock.write_lock(); + items->update_embedding(item_idx, gradient, 0.001); + item->lock.write_unlock(); + delete gradient; + + break; + } + case RECOMMEND: { + int user_idx = inst.payloads[0]; + Embedding* user = users->get_embedding(user_idx); + std::vector item_pool; + int iter_idx = inst.payloads[1]; + for (unsigned int i = 2; i < inst.payloads.size(); ++i) { + int item_idx = inst.payloads[i]; + item_pool.push_back(items->get_embedding(item_idx)); + } + Embedding* recommendation = recommend(user, item_pool); + recommendation->write_to_stdout(); + break; + } + } + +} + +} // namespace proj1 + int main(int argc, char *argv[]) { - exit(1); + + proj1::EmbeddingHolder* users = new proj1::EmbeddingHolder("data/q2.in"); + proj1::EmbeddingHolder* items = new proj1::EmbeddingHolder("data/q2.in"); + proj1::Instructions instructions = proj1::read_instructrions("data/q2_instruction.tsv"); + { + proj1::AutoTimer timer("q2"); // using this to print out timing of the block + + // Run all the instructions + std::vector threadArr; + for (proj1::Instruction inst: instructions) { + //printf("%d\n", threadArr.size()); + std::thread *t = new std::thread(proj1::run_one_instruction, inst, users, items); + threadArr.push_back(t); + } + int len = threadArr.size(); + for (int j=0;jjoin(); + } + + // Write the result + users->write_to_stdout(); + items->write_to_stdout(); + + // We only need to delete the embedding holders, as the pointers are all + // pointing at the emb_matx of the holders. + delete users; + delete items; + + return 0; } \ No newline at end of file diff --git a/Thread-1/q3.cc b/Thread-1/q3.cc index c4b7413..6c575f3 100644 --- a/Thread-1/q3.cc +++ b/Thread-1/q3.cc @@ -1,4 +1,191 @@ +#include +#include + +#include // string +#include // timer #include // cout, endl +#include +#include +#include +#include +#include +#include + +#include "lib/utils.h" +#include "lib/model.h" +#include "lib/embedding.h" +#include "lib/instruction.h" + +namespace proj1 { + +void Coldstart(EmbeddingHolder* users, Embedding* new_user, Embedding* item_emb, int user_idx) { + // Call cold start for downstream applications, slow + + EmbeddingGradient* gradient = cold_start(new_user, item_emb); + + new_user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + new_user->lock.write_unlock(); + delete gradient; +} + +void run_one_instruction(Instruction inst, EmbeddingHolder* users, EmbeddingHolder* items, std::vector* counter, proj1::RWLock* counter_lock, std::vector* epoch_lock, int max_epoch) { + switch(inst.order) { + case INIT_EMB: { + // We need to init the embedding + users->lock.read_lock(); + int length = users->get_emb_length(); + users->lock.read_unlock(); + + Embedding* new_user = new Embedding(length); + + users->lock.write_lock(); + int user_idx = users->append(new_user); + users->lock.write_unlock(); + + std::vector multiple_coldstarts; + + for (int item_index: inst.payloads) { + Embedding* item_emb = items->get_embedding(item_index); + std::thread *t = new std::thread(Coldstart, users, new_user, item_emb, user_idx); + multiple_coldstarts.push_back(t); + } + int len = multiple_coldstarts.size(); + for (int j=0;jjoin(); + + break; + } + case UPDATE_EMB: { + int user_idx = inst.payloads[0]; + int item_idx = inst.payloads[1]; + int label = inst.payloads[2]; + // You might need to add this state in other questions. + // Here we just show you this as an example + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if (epoch != -1) { + epoch_lock->at(epoch)->read_lock(); + //printf("Started! epoch = %d\n", epoch); + } + + Embedding* user = users->get_embedding(user_idx); + Embedding* item = items->get_embedding(item_idx); + + EmbeddingGradient* gradient = calc_gradient(user, item, label); + user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + user->lock.write_unlock(); + delete gradient; + + gradient = calc_gradient(item, user, label); + item->lock.write_lock(); + items->update_embedding(item_idx, gradient, 0.001); + item->lock.write_unlock(); + delete gradient; + + if (epoch != -1) { + //printf("Finished! epoch = %d\n", epoch); + epoch_lock->at(epoch)->read_unlock(); + counter_lock->write_lock(); + (*(counter->at(epoch))) --; + if ((*(counter->at(epoch))) == 0) { + for(int i=epoch+1;i<=max_epoch;++i) { + epoch_lock->at(i)->write_unlock(); + if ((*(counter->at(i))) != 0) break; + } + } + counter_lock->write_unlock(); + } + break; + } + case RECOMMEND: { + int user_idx = inst.payloads[0]; + Embedding* user = users->get_embedding(user_idx); + std::vector item_pool; + int iter_idx = inst.payloads[1]; + for (unsigned int i = 2; i < inst.payloads.size(); ++i) { + int item_idx = inst.payloads[i]; + item_pool.push_back(items->get_embedding(item_idx)); + } + Embedding* recommendation = recommend(user, item_pool); + recommendation->write_to_stdout(); + break; + } + } + +} + +} // namespace proj1 + int main(int argc, char *argv[]) { - exit(1); + + proj1::EmbeddingHolder* users = new proj1::EmbeddingHolder("data/q3.in"); + proj1::EmbeddingHolder* items = new proj1::EmbeddingHolder("data/q3.in"); + proj1::Instructions instructions = proj1::read_instructrions("data/q3_instruction.tsv"); + { + proj1::AutoTimer timer("q3"); // using this to print out timing of the block + + // Preprocesssing + int max_epoch = -1; + for (proj1::Instruction inst: instructions) { + if (inst.order != proj1::UPDATE_EMB) continue; + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + max_epoch = epoch > max_epoch ? epoch : max_epoch; + } + std::vector counter; + std::vector epoch_lock; + proj1::RWLock *counter_lock = new proj1::RWLock; + for (int i=0;i<=max_epoch;++i) { + epoch_lock.push_back(new proj1::RWLock); + epoch_lock[i]->write_lock(); + counter.push_back(new int(0)); + } + for (proj1::Instruction inst: instructions) { + if (inst.order != proj1::UPDATE_EMB) continue; + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if(epoch >= 0) + (*counter[epoch]) ++; + } + for (int i=0;i<=max_epoch;++i) { + epoch_lock[i]->write_unlock(); + if ((*counter[i]) != 0) break; + } + // printf("%d\n", max_epoch); + + // Run all the instructions + std::vector threadArr; + for (proj1::Instruction inst: instructions) { + //printf("%d\n", threadArr.size()); + std::thread *t = new std::thread(proj1::run_one_instruction, inst, users, items, &counter, counter_lock, &epoch_lock, max_epoch); + threadArr.push_back(t); + } + int len = threadArr.size(); + for (int j=0;jjoin(); + delete counter_lock; + for (int i=0;i<=max_epoch;++i) { + delete counter[i]; + delete epoch_lock[i]; + } + } + + // Write the result + users->write_to_stdout(); + items->write_to_stdout(); + + // We only need to delete the embedding holders, as the pointers are all + // pointing at the emb_matx of the holders. + delete users; + delete items; + + return 0; } \ No newline at end of file diff --git a/Thread-1/q4.cc b/Thread-1/q4.cc new file mode 100644 index 0000000..56fdfef --- /dev/null +++ b/Thread-1/q4.cc @@ -0,0 +1,202 @@ +#include +#include + +#include // string +#include // timer +#include // cout, endl +#include +#include +#include +#include +#include +#include + +#include "lib/utils.h" +#include "lib/model.h" +#include "lib/embedding.h" +#include "lib/instruction.h" + +namespace proj1 { + +void Coldstart(EmbeddingHolder* users, Embedding* new_user, Embedding* item_emb, int user_idx) { + // Call cold start for downstream applications, slow + + EmbeddingGradient* gradient = cold_start(new_user, item_emb); + + new_user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + new_user->lock.write_unlock(); + delete gradient; +} + +void run_one_instruction(Instruction inst, EmbeddingHolder* users, EmbeddingHolder* items, std::vector* counter, proj1::RWLock* counter_lock, std::vector* epoch_lock, int max_epoch) { + switch(inst.order) { + case INIT_EMB: { + // We need to init the embedding + users->lock.read_lock(); + int length = users->get_emb_length(); + users->lock.read_unlock(); + + Embedding* new_user = new Embedding(length); + + users->lock.write_lock(); + int user_idx = users->append(new_user); + users->lock.write_unlock(); + + std::vector multiple_coldstarts; + + for (int item_index: inst.payloads) { + Embedding* item_emb = items->get_embedding(item_index); + std::thread *t = new std::thread(Coldstart, users, new_user, item_emb, user_idx); + multiple_coldstarts.push_back(t); + } + int len = multiple_coldstarts.size(); + for (int j=0;jjoin(); + + break; + } + case UPDATE_EMB: { + int user_idx = inst.payloads[0]; + int item_idx = inst.payloads[1]; + int label = inst.payloads[2]; + // You might need to add this state in other questions. + // Here we just show you this as an example + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if (epoch != -1) { + epoch_lock->at(epoch)->read_lock(); + printf("Started! epoch = %d\n", epoch); + } + + Embedding* user = users->get_embedding(user_idx); + Embedding* item = items->get_embedding(item_idx); + + EmbeddingGradient* gradient = calc_gradient(user, item, label); + user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + user->lock.write_unlock(); + delete gradient; + + gradient = calc_gradient(item, user, label); + item->lock.write_lock(); + items->update_embedding(item_idx, gradient, 0.001); + item->lock.write_unlock(); + delete gradient; + + if (epoch != -1) { + printf("Finished! epoch = %d\n", epoch); + epoch_lock->at(epoch)->read_unlock(); + counter_lock->write_lock(); + (*(counter->at(epoch))) --; + if ((*(counter->at(epoch))) == 0) { + for(int i=epoch+1;i<=max_epoch;++i) { + epoch_lock->at(i)->write_unlock(); + if ((*(counter->at(i))) != 0) break; + } + } + counter_lock->write_unlock(); + } + break; + } + case RECOMMEND: { + int user_idx = inst.payloads[0]; + Embedding* user = users->get_embedding(user_idx); + std::vector item_pool; + int iter_idx = inst.payloads[1] + 1; + epoch_lock->at(iter_idx)->read_lock(); + printf("Recommend! epoch = %d\n", iter_idx); + + for (unsigned int i = 2; i < inst.payloads.size(); ++i) { + int item_idx = inst.payloads[i]; + item_pool.push_back(items->get_embedding(item_idx)); + } + Embedding* recommendation = recommend(user, item_pool); + epoch_lock->at(iter_idx)->read_unlock(); + + recommendation->write_to_stdout(); + break; + } + } + +} + +} // namespace proj1 + +int main(int argc, char *argv[]) { + + proj1::EmbeddingHolder* users = new proj1::EmbeddingHolder("data/q4.in"); + proj1::EmbeddingHolder* items = new proj1::EmbeddingHolder("data/q4.in"); + proj1::Instructions instructions = proj1::read_instructrions("data/q4_instruction.tsv"); + { + proj1::AutoTimer timer("q4"); // using this to print out timing of the block + + // Preprocesssing + int max_epoch = -1; + for (proj1::Instruction inst: instructions) { + if (inst.order == proj1::UPDATE_EMB) continue; + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + max_epoch = epoch > max_epoch ? epoch : max_epoch; + } + std::vector counter; + std::vector epoch_lock; + proj1::RWLock *counter_lock = new proj1::RWLock; + for (int i=0;i<=max_epoch;++i) { + epoch_lock.push_back(new proj1::RWLock); + epoch_lock[i]->write_lock(); + counter.push_back(new int(0)); + } + for (proj1::Instruction inst: instructions) { + if (inst.order == proj1::INIT_EMB) continue; + if (inst.order == proj1::UPDATE_EMB) { + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if(epoch >= 0) + (*counter[epoch]) ++; + } + else { + int epoch = inst.payloads[1] + 1; + max_epoch = epoch > max_epoch ? epoch : max_epoch; + } + } + for (int i=0;i<=max_epoch;++i) { + epoch_lock[i]->write_unlock(); + if ((*counter[i]) != 0) break; + } + // printf("%d\n", max_epoch); + + // Run all the instructions + std::vector threadArr; + for (proj1::Instruction inst: instructions) { + //printf("%d\n", threadArr.size()); + std::thread *t = new std::thread(proj1::run_one_instruction, inst, users, items, &counter, counter_lock, &epoch_lock, max_epoch); + threadArr.push_back(t); + } + int len = threadArr.size(); + for (int j=0;jjoin(); + delete counter_lock; + for (int i=0;i<=max_epoch;++i) { + delete counter[i]; + delete epoch_lock[i]; + } + } + + // Write the result + users->write_to_stdout(); + items->write_to_stdout(); + + // We only need to delete the embedding holders, as the pointers are all + // pointing at the emb_matx of the holders. + delete users; + delete items; + + return 0; +} \ No newline at end of file diff --git a/Thread-1/q5.cc b/Thread-1/q5.cc index c4b7413..03f1513 100644 --- a/Thread-1/q5.cc +++ b/Thread-1/q5.cc @@ -1,4 +1,224 @@ +#include +#include + +#include // string +#include // timer #include // cout, endl +#include +#include +#include +#include +#include +#include + +#include "lib/utils.h" +#include "lib/model.h" +#include "lib/embedding.h" +#include "lib/instruction.h" + +namespace proj1 { + +void Coldstart(EmbeddingHolder* users, Embedding* new_user, Embedding* item_emb, int user_idx) { + // Call cold start for downstream applications, slow + + EmbeddingGradient* gradient = cold_start(new_user, item_emb); + + new_user->lock.write_lock(); + users->update_embedding(user_idx, gradient, 0.01); + new_user->lock.write_unlock(); + delete gradient; +} + +void run_one_instruction(Instruction inst, EmbeddingHolder* users, EmbeddingHolder* items, std::vector* counter, proj1::RWLock* counter_lock, std::vector* epoch_lock, int max_epoch) { + switch(inst.order) { + case INIT_EMB: { + // We need to init the embedding + users->lock.read_lock(); + int length = users->get_emb_length(); + users->lock.read_unlock(); + + Embedding* new_user = new Embedding(length); + + users->lock.write_lock(); + int user_idx = users->append(new_user); + users->lock.write_unlock(); + + std::vector multiple_coldstarts; + + for (int item_index: inst.payloads) { + Embedding* item_emb = items->get_embedding(item_index); + std::thread *t = new std::thread(Coldstart, users, new_user, item_emb, user_idx); + multiple_coldstarts.push_back(t); + } + int len = multiple_coldstarts.size(); + for (int j=0;jjoin(); + + break; + } + case UPDATE_EMB: { + int user_idx = inst.payloads[0]; + int item_idx = inst.payloads[1]; + int label = inst.payloads[2]; + // You might need to add this state in other questions. + // Here we just show you this as an example + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if (epoch != -1) { + epoch_lock->at(epoch)->read_lock(); + printf("Started! epoch = %d\n", epoch); + } + + Embedding* user = users->get_embedding(user_idx); + Embedding* item = items->get_embedding(item_idx); + + std::future t_user = std::async(proj1::calc_gradient, user, item, label); + std::future t_item = std::async(proj1::calc_gradient, item, user, label); + + EmbeddingGradient* gradient_user = t_user.get(); + EmbeddingGradient* gradient_item = t_item.get(); + + user->lock.write_lock(); + item->lock.write_lock(); + items->update_embedding(item_idx, gradient_user, 0.001); + users->update_embedding(user_idx, gradient_item, 0.01); + item->lock.write_unlock(); + user->lock.write_unlock(); + delete gradient_user; + delete gradient_item; + + if (epoch != -1) { + printf("Finished! epoch = %d\n", epoch); + epoch_lock->at(epoch)->read_unlock(); + counter_lock->write_lock(); + (*(counter->at(epoch))) --; + if ((*(counter->at(epoch))) == 0) { + for(int i=epoch+1;i<=max_epoch;++i) { + epoch_lock->at(i)->write_unlock(); + if ((*(counter->at(i))) != 0) break; + } + } + counter_lock->write_unlock(); + } + break; + } + case RECOMMEND: { + int user_idx = inst.payloads[0]; + std::vector item_pool; + int iter_idx = inst.payloads[1] + 1; + while(true) { + counter_lock->read_lock(); + bool flag = true; + if(iter_idx > 0) { + for(int i=0;iat(i))) != 0) { + flag = false; + break; + } + } + } + if(iter_idx == 0 || flag == true) { + counter_lock->read_unlock(); + printf("Recommend! epoch = %d\n", iter_idx); + proj1::EmbeddingHolder* users_copy = users; + proj1::EmbeddingHolder* items_copy = items; + Embedding* user = users_copy->get_embedding(user_idx); + for (unsigned int i = 2; i < inst.payloads.size(); ++i) { + int item_idx = inst.payloads[i]; + item_pool.push_back(items_copy->get_embedding(item_idx)); + } + Embedding* recommendation = recommend(user, item_pool); + recommendation->write_to_stdout(); + break; + } + else { + counter_lock->read_unlock(); + } + } + + //epoch_lock->at(iter_idx)->read_unlock(); + break; + } + } + +} + +} // namespace proj1 + int main(int argc, char *argv[]) { - exit(1); + + proj1::EmbeddingHolder* users = new proj1::EmbeddingHolder("data/q4.in"); + proj1::EmbeddingHolder* items = new proj1::EmbeddingHolder("data/q4.in"); + proj1::Instructions instructions = proj1::read_instructrions("data/q4_instruction.tsv"); + { + proj1::AutoTimer timer("q5"); // using this to print out timing of the block + + // Preprocesssing + int max_epoch = -1; + for (proj1::Instruction inst: instructions) { + if (inst.order == proj1::UPDATE_EMB) continue; + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + max_epoch = epoch > max_epoch ? epoch : max_epoch; + } + std::vector counter; + std::vector epoch_lock; + proj1::RWLock *counter_lock = new proj1::RWLock; + for (int i=0;i<=max_epoch;++i) { + epoch_lock.push_back(new proj1::RWLock); + epoch_lock[i]->write_lock(); + counter.push_back(new int(0)); + } + for (proj1::Instruction inst: instructions) { + if (inst.order == proj1::INIT_EMB) continue; + if (inst.order == proj1::UPDATE_EMB) { + int epoch = -1; + if (inst.payloads.size() > 3) { + epoch = inst.payloads[3]; + } + if(epoch >= 0) + (*counter[epoch]) ++; + } + else { + int epoch = inst.payloads[1] + 1; + max_epoch = epoch > max_epoch ? epoch : max_epoch; + } + } + for (int i=0;i<=max_epoch;++i) { + epoch_lock[i]->write_unlock(); + if ((*counter[i]) != 0) break; + } + // printf("%d\n", max_epoch); + + // Run all the instructions + std::vector threadArr; + for (proj1::Instruction inst: instructions) { + //printf("%d\n", threadArr.size()); + std::thread *t = new std::thread(proj1::run_one_instruction, inst, users, items, &counter, counter_lock, &epoch_lock, max_epoch); + threadArr.push_back(t); + } + int len = threadArr.size(); + for (int j=0;jjoin(); + delete counter_lock; + for (int i=0;i<=max_epoch;++i) { + delete counter[i]; + delete epoch_lock[i]; + } + } + + // Write the result + users->write_to_stdout(); + items->write_to_stdout(); + + // We only need to delete the embedding holders, as the pointers are all + // pointing at the emb_matx of the holders. + delete users; + delete items; + + return 0; } \ No newline at end of file diff --git a/Thread-2/.DS_Store b/Thread-2/.DS_Store new file mode 100644 index 0000000..0f6d81b Binary files /dev/null and b/Thread-2/.DS_Store differ diff --git a/Thread-2/boat/boat.cc b/Thread-2/boat/boat.cc index 7acb29b..03e802d 100644 --- a/Thread-2/boat/boat.cc +++ b/Thread-2/boat/boat.cc @@ -1,14 +1,123 @@ #include #include #include +#include +#include +#include #include "boat.h" namespace proj2{ Boat::Boat(){ + bool boatInO; + int num_children_O; + int num_adults_O; + int num_children_M; + int num_adults_M; + std::condition_variable children_condition_O; + std::condition_variable children_condition_M; + std::condition_variable adults_condition_O; + std::condition_variable adults_condition_M; + bool gameover; + bool is_pilot; + bool is_adult_go; } - void Boat:: begin(int a, int b, BoatGrader *bg){ -} + num_children_O = b; + num_adults_O = a; + num_children_M = 0; + num_adults_M = 0; + boatInO = true; + std::mutex mtx; + gameover = false; + is_pilot = true; + is_adult_go = false; + std::thread threads[a+b]; + for (int i = 0; i < b; ++i){ + threads[i] = std::thread(&Boat::child_itinerary, this, std::ref(bg)); + } + for (int i = 0; i < a; ++i){ + threads[i+b] = std::thread(&Boat::adult_itinerary, this, std::ref(bg)); + } + for (int i = 0; i < a+b; ++i){ + threads[i].join(); + } + return; +} +void Boat:: adult_itinerary(BoatGrader *bg){ + bg->initializeAdult(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::yield(); + std::unique_lock lck(mtx); + while(!(is_adult_go && boatInO)){ + adults_condition_O.wait(lck); + if(gameover) return; + } + bg->AdultRowToMolokai(); + num_adults_M++; + num_adults_O--; + boatInO = false; + is_adult_go = false; + children_condition_M.notify_one(); + adults_condition_M.wait(lck); + if(gameover) return; +} +void Boat:: child_itinerary(BoatGrader *bg){ + bg->initializeChild(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::yield(); + std::unique_lock lck(mtx); + while(!gameover){ + if(boatInO){ + if(is_adult_go){ + adults_condition_O.notify_one(); + children_condition_O.wait(lck); + if(gameover) return; + } + if(is_pilot){ + bg->ChildRowToMolokai(); + num_children_O--; + num_children_M++; + is_pilot = false; + children_condition_O.notify_one(); + children_condition_M.wait(lck); + if(gameover) return; + }else{ + bg->ChildRideToMolokai(); + boatInO = false; + num_children_O--; + num_children_M++; + is_pilot = true; + if(num_adults_O==0 && num_children_O==0){ + gameover = true; + } + if(gameover){ + std::cout<<"Success!"<ChildRowToOahu(); + boatInO = true; + num_children_O++; + num_children_M--; + if(is_adult_go){ + adults_condition_O.notify_one(); + }else{ + children_condition_O.notify_one(); + } + children_condition_O.wait(lck); + if(gameover) return; + } + } +} } \ No newline at end of file diff --git a/Thread-2/boat/boat.h b/Thread-2/boat/boat.h index efa1462..312f62a 100644 --- a/Thread-2/boat/boat.h +++ b/Thread-2/boat/boat.h @@ -3,7 +3,9 @@ #include #include +#include #include +#include #include #include "boatGrader.h" @@ -13,8 +15,25 @@ class Boat{ public: Boat(); ~Boat(){}; + bool boatInO; + int num_children_O; + int num_adults_O; + int num_children_M; + int num_adults_M; + std::mutex mtx; + std::condition_variable children_condition_O; + std::condition_variable children_condition_M; + std::condition_variable adults_condition_O; + std::condition_variable adults_condition_M; + bool gameover; + bool is_adult_go; + bool is_pilot; void begin(int, int, BoatGrader*); + void adult_itinerary(BoatGrader*); + void child_itinerary(BoatGrader*); }; +//void adult_itinerary(Boat *boat, BoatGrader *bg); +//void child_itinerary(Boat *boat, BoatGrader *bg); } #endif // BOAT_H_ diff --git a/Thread-2/deadlock/data/example.in b/Thread-2/deadlock/data/example.in index 567adc2..31b1927 100644 --- a/Thread-2/deadlock/data/example.in +++ b/Thread-2/deadlock/data/example.in @@ -1,3 +1,131 @@ 10 10 10 10 0 1 5 6 3 3 0 -1 0 5 6 3 3 0 \ No newline at end of file +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +0 1 5 6 3 3 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +0 1 5 6 3 3 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +0 1 5 6 3 3 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +0 1 5 6 3 3 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 +1 0 5 6 3 3 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +1 2 6 9 2 4 0 +1 2 7 8 2 4 0 +2 3 4 8 3 3 1 +2 3 9 1 -1 2 0 +3 2 8 9 -1 -1 0 \ No newline at end of file diff --git a/Thread-2/deadlock/lib/resource_manager.cc b/Thread-2/deadlock/lib/resource_manager.cc index 2231181..76a532b 100644 --- a/Thread-2/deadlock/lib/resource_manager.cc +++ b/Thread-2/deadlock/lib/resource_manager.cc @@ -1,48 +1,127 @@ #include +#include #include #include #include #include "resource_manager.h" +#include "thread_manager.h" namespace proj2 { int ResourceManager::request(RESOURCE r, int amount) { if (amount <= 0) return 1; + //std::unique_lock lock(this->resource_mutex); + while(true) { + std::unique_lock lock(this->resource_mutex); + if (amount > this->resource_amount[r]) { + std::cout<<"+++"<resource_cv.wait(lock); + std::cout<<"wakeup"< lock(this->resource_mutex); + } + else { + this->resource_amount[r] -= amount; + std::thread::id this_id = std::this_thread::get_id(); + if (r==0) GPU_alloc[this_id] += amount; + if (r==1) MEMORY_alloc[this_id] += amount; + if (r==2) DISK_alloc[this_id] += amount; + if (r==3) NETWORK_alloc[this_id] += amount; + int GPU_amount = this->resource_amount[GPU]; + int MEMORY_amount = this->resource_amount[MEMORY]; + int DISK_amount = this->resource_amount[DISK]; + int NETWORK_amount = this->resource_amount[NETWORK]; + //simulate by Banker's algorithm + bool s = safe(GPU_amount, MEMORY_amount, DISK_amount, NETWORK_amount, GPU_claim, MEMORY_claim, DISK_claim, NETWORK_claim, GPU_alloc, MEMORY_alloc, DISK_alloc, NETWORK_alloc); + if (s==true) { + std::cout<<"--"<resource_mutex.unlock(); + return 0; + } + else { + this->resource_amount[r] += amount; + if (r==0) GPU_alloc[std::this_thread::get_id()] -= amount; + if (r==1) MEMORY_alloc[std::this_thread::get_id()] -= amount; + if (r==2) DISK_alloc[std::this_thread::get_id()] -= amount; + if (r==3) NETWORK_alloc[std::this_thread::get_id()] -= amount; + std::cout<<"++"<resource_cv.wait(lock); + std::cout<<"wakeup"< lk(this->resource_mutex[r]); - while (true) { - if (this->resource_cv[r].wait_for( - lk, std::chrono::milliseconds(100), - [this, r, amount] { return this->resource_amount[r] >= amount; } - )) { - break; - } else { - auto this_id = std::this_thread::get_id(); - /* HINT: If you choose to detect the deadlock and recover, - implement your code here to kill and restart threads. - Note that you should release this thread's resources - properly. - */ - if (tmgr->is_killed(this_id)) { - return -1; +bool ResourceManager::safe(int GPU_amount, int MEMORY_amount, int DISK_amount, int NETWORK_amount, std::map GPU_claim, std::map MEMORY_claim, std::map DISK_claim, std::map NETWORK_claim, std::map GPU_alloc, std::map MEMORY_alloc, std::map DISK_alloc, std::map NETWORK_alloc) { + std::map GPU_claim_c = GPU_claim; + std::map MEMORY_claim_c = MEMORY_claim; + std::map DISK_claim_c = DISK_claim; + std::map NETWORK_claim_c = NETWORK_claim; + std::map GPU_alloc_c = GPU_alloc; + std::map MEMORY_alloc_c = MEMORY_alloc; + std::map DISK_alloc_c = DISK_alloc; + std::map NETWORK_alloc_c = NETWORK_alloc; + bool possible = true; + while (possible) { + bool flag = false; + std::map::iterator iter; + for (iter = GPU_claim_c.begin(); iter != GPU_claim_c.end(); iter++) { + std::thread::id this_id = iter->first; + if (tmgr->is_killed(this_id)==true) { + flag = true; + std::cout<<"%%"<resource_amount[r] -= amount; - this->resource_mutex[r].unlock(); - return 0; + if (GPU_claim_c.size() == 0) return true; + return false; } void ResourceManager::release(RESOURCE r, int amount) { if (amount <= 0) return; - std::unique_lock lk(this->resource_mutex[r]); + std::unique_lock lock(this->resource_mutex); this->resource_amount[r] += amount; - this->resource_cv[r].notify_all(); + std::thread::id this_id = std::this_thread::get_id(); + if (r==0) GPU_alloc[this_id] -= amount; + if (r==1) MEMORY_alloc[this_id] -= amount; + if (r==2) DISK_alloc[this_id] -= amount; + if (r==3) NETWORK_alloc[this_id] -= amount; + std::cout<<"!"<resource_cv.notify_all(); + return; } void ResourceManager::budget_claim(std::map budget) { // This function is called when some workload starts. // The workload will eventually consume all resources it claims + // set resource_claim, init resource_alloc, init is_suspend + std::unique_lock lock(this->resource_mutex); + std::thread::id this_id = std::this_thread::get_id(); + + GPU_claim.insert(std::map::value_type(this_id, (budget.count(GPU)==1)?budget[GPU]:0)); + MEMORY_claim.insert(std::map::value_type(this_id, (budget.count(MEMORY)==1)?budget[MEMORY]:0)); + DISK_claim.insert(std::map::value_type(this_id, (budget.count(DISK)==1)?budget[DISK]:0)); + NETWORK_claim.insert(std::map::value_type(this_id, (budget.count(NETWORK)==1)?budget[NETWORK]:0)); + + GPU_alloc.insert(std::map::value_type(this_id, 0)); + MEMORY_alloc.insert(std::map::value_type(this_id, 0)); + DISK_alloc.insert(std::map::value_type(this_id, 0)); + NETWORK_alloc.insert(std::map::value_type(this_id, 0)); + + std::cout<<"**"< init_count): \ resource_amount(init_count), tmgr(t) {} + std::map GPU_claim; + std::map MEMORY_claim; + std::map DISK_claim; + std::map NETWORK_claim; + //std::map GPU_request; + //std::map MEMORY_request; + //std::map DISK_request; + //std::map NETWORK_request; + std::map GPU_alloc; + std::map MEMORY_alloc; + std::map DISK_alloc; + std::map NETWORK_alloc; + std::map is_suspend; + //void request_claim_release(RESOURCE , std::thread::id); + bool safe(int GPU_amount, int MEMORY_amount, int DISK_amount, int NETWORK_amount, std::map GPU_request, std::map MEMORY_request, std::map DISK_request, std::map NETWORK_request, std::map GPU_alloc, std::map MEMORY_alloc, std::map DISK_alloc, std::map NETWORK_alloc); void budget_claim(std::map budget); int request(RESOURCE, int amount); void release(RESOURCE, int amount); private: std::map resource_amount; - std::map resource_mutex; - std::map resource_cv; + //std::map resource_mutex; + std::mutex resource_mutex; + std::condition_variable resource_cv; + //std::map resource_cv; ThreadManager *tmgr; }; diff --git a/Thread-2/deadlock/lib/resource_manager_test.cc b/Thread-2/deadlock/lib/resource_manager_test.cc new file mode 100644 index 0000000..e69de29 diff --git a/Thread-2/deadlock/lib/util_test.cc b/Thread-2/deadlock/lib/util_test.cc new file mode 100644 index 0000000..e69de29 diff --git a/Thread-2/deadlock/lib/utils.cc b/Thread-2/deadlock/lib/utils.cc index f27558c..fd9bac7 100644 --- a/Thread-2/deadlock/lib/utils.cc +++ b/Thread-2/deadlock/lib/utils.cc @@ -11,7 +11,7 @@ namespace proj2 { void a_slow_function(int seconds) { - std::this_thread::sleep_for(std::chrono::seconds(seconds)); + //std::this_thread::sleep_for(std::chrono::seconds(seconds)); } int randint(int lower, int upper) { diff --git a/Thread-2/deadlock/lib/workload_lib_test.cc b/Thread-2/deadlock/lib/workload_lib_test.cc new file mode 100644 index 0000000..e69de29