-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproblemStatement.cpp
More file actions
161 lines (133 loc) · 4.48 KB
/
Copy pathproblemStatement.cpp
File metadata and controls
161 lines (133 loc) · 4.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Include necessary headers for threading, synchronization, and data structures
#include <iostream>
#include <vector>
#include <queue>
#include <functional> // For std::function
#include <mutex> // For thread synchronization
#include <condition_variable>
#include <thread>
#include <atomic> // For atomic operations
using namespace std;
/*
Thread Pool Class (Executor)
- Manages a group of worker threads
- Maintains a queue of tasks to execute
- Handles task distribution and thread synchronization
*/
class Executor {
private:
// Member Variables
vector<thread> threads; // Container for worker threads
queue<function<void()>> tasks; // Task queue storing functions to execute
mutex task_mutex; // Protects access to the task queue
condition_variable cv; // Coordinates thread wake/sleep
atomic<bool> stop; // Thread-safe flag for shutdown
public:
// Constructor: Creates worker threads
Executor(size_t num_threads = thread::hardware_concurrency())
: stop(false) // Initialize stop flag to false
{
// Create specified number of worker threads
for (size_t i = 0; i < num_threads; ++i) {
// Each thread runs the ExecuteTask member function
threads.emplace_back(&Executor::ExecuteTask, this);
}
}
// Destructor: Ensures proper cleanup
~Executor() {
shutdown();
}
// Submit a task to the thread pool
void SubmitTask(function<void()> fn) {
{
// Lock the task queue for safe access
lock_guard<mutex> lock(task_mutex);
// Add the task to the queue
tasks.push(fn);
}
// Notify one waiting thread that a task is available
cv.notify_one();
}
// Main worker thread function
void ExecuteTask() {
while (true) {
function<void()> task;
{
// Wait until tasks are available or shutdown is requested
unique_lock<mutex> lock(task_mutex);
cv.wait(lock, [this]() {
return stop || !tasks.empty();
});
// Exit if shutdown requested and queue is empty
if (stop && tasks.empty()) return;
// Get the next task from the queue
if (!tasks.empty()) {
task = tasks.front();
tasks.pop();
}
}
// Execute the task outside the lock
if (task) task();
}
}
// Shut down the thread pool gracefully
void shutdown() {
// Set stop flag
stop = true;
// Wake up all threads
cv.notify_all();
// Wait for all threads to finish
for (auto& t : threads) {
if (t.joinable()) t.join();
}
}
// Get current number of pending tasks
size_t GetQueueSize() {
lock_guard<mutex> lock(task_mutex);
return tasks.size();
}
// Check if shutdown has been requested
bool shutdown_requested() const {
return stop.load();
}
};
// Main program demonstrating thread pool usage
int main() {
// Create thread pool with default number of threads
Executor executor;
// Thread-safe counter to track completed tasks
atomic<int> counter{0};
// Number of tasks to process (1 million)
const int NUM_TASKS = 1000000;
// Create a task producer thread
thread producer([&]() {
for (int i = 0; i < NUM_TASKS; ++i) {
// Submit task to increment counter
executor.SubmitTask([&]() {
counter++;
});
}
});
// Create monitoring thread
thread monitor([&]() {
while (true) {
// Exit condition: shutdown requested and queue empty
if (executor.shutdown_requested() &&
executor.GetQueueSize() == 0) break;
// Print current status every 100ms
cout << "Tasks in queue: " << executor.GetQueueSize()
<< " | Completed tasks: " << counter.load() << endl;
this_thread::sleep_for(chrono::milliseconds(100));
}
});
// Wait for producer to finish submitting tasks
producer.join();
// Wait for monitor to finish
monitor.join();
// Shut down thread pool
executor.shutdown();
// Final output
cout << "\nAll tasks completed!\n";
cout << "Final counter value: " << counter << endl;
return 0;
}