-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAtomicTaskScheduler.cpp
More file actions
140 lines (120 loc) · 5.28 KB
/
AtomicTaskScheduler.cpp
File metadata and controls
140 lines (120 loc) · 5.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
An atomic task scheduler ensures that tasks are executed at specific times with thread safety and no overlapping execution. Below is an implementation of an atomic task scheduler in C++ using C++14, leveraging std::thread, std::mutex, and std::condition_variable for concurrency and synchronization
Explanation
1. Task Structure:
• Each task has a function (func) to execute and a time point (executeAt) indicating when it should run.
• A priority_queue is used to manage tasks, with the earliest task at the top.
2. Scheduler Thread:
• Continuously checks the task queue for tasks ready to execute.
• Waits until the next task's execution time or until a new task is added.
3. Thread Safety:
• A mutex ensures thread-safe access to the task queue.
• A condition_variable is used to notify the scheduler thread when new tasks are added.
4. Task Scheduling:
• schedule: Schedules a task to run at a specific time.
• scheduleAfter: Schedules a task to run after a delay (in milliseconds).
5. Main Function:
• Demonstrates scheduling three tasks with different delays.
• The main thread sleeps to allow the scheduler to execute tasks.
---
Output
For the above code, the output will look something like this (timestamps will vary):
Task 3 executed at 1234567890123
Task 1 executed at 1234567891123
Task 2 executed at 1234567892123
Key Features
1. Atomic Execution: Tasks are executed one at a time in the order of their scheduled time.
2. Concurrency: The scheduler runs in a separate thread.
3. Thread Safety: Uses mutex and condition_variable for safe access to shared resources.
4. Flexible Scheduling: Supports scheduling tasks at specific times or after delays.
---
Complexity
• Task Scheduling: O(log n) for adding tasks to the priority queue.
• Task Execution: O(1) for executing the top task.
• Space Complexity: O(n), where n is the number of tasks in the queue.
This implementation is efficient and ensures atomic execution of tasks.
*/
#include <iostream>
#include <queue>
#include <functional>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
using namespace std;
class AtomicTaskScheduler {
private:
struct Task {
function<void()> func; // The task to execute
chrono::time_point<chrono::steady_clock> executeAt; // Execution time
// Comparator for priority queue (earliest task first)
bool operator>(const Task& other) const {
return executeAt > other.executeAt;
}
};
priority_queue<Task, vector<Task>, greater<Task>> taskQueue; // Min-heap for tasks
mutex mtx; // Mutex for thread safety
condition_variable cv; // Condition variable for task scheduling
bool stopScheduler = false; // Flag to stop the scheduler
thread schedulerThread; // Scheduler thread
// Scheduler thread function
void run() {
while (true) {
unique_lock<mutex> lock(mtx);
// Wait until there is a task or the scheduler is stopped
cv.wait(lock, [this]() { return !taskQueue.empty() || stopScheduler; });
if (stopScheduler && taskQueue.empty()) {
break; // Exit the thread if the scheduler is stopped
}
auto now = chrono::steady_clock::now();
auto nextTask = taskQueue.top();
if (now >= nextTask.executeAt) {
// Execute the task
taskQueue.pop();
lock.unlock(); // Unlock before executing the task
nextTask.func();
} else {
// Wait until the next task's execution time
cv.wait_until(lock, nextTask.executeAt);
}
}
}
public:
AtomicTaskScheduler() {
// Start the scheduler thread
schedulerThread = thread([this]() { run(); });
}
~AtomicTaskScheduler() {
{
lock_guard<mutex> lock(mtx);
stopScheduler = true;
}
cv.notify_all(); // Notify all threads to stop
if (schedulerThread.joinable()) {
schedulerThread.join(); // Wait for the scheduler thread to finish
}
}
// Schedule a task to run at a specific time
void schedule(function<void()> func, chrono::time_point<chrono::steady_clock> time) {
{
lock_guard<mutex> lock(mtx);
taskQueue.push({func, time});
}
cv.notify_all(); // Notify the scheduler thread
}
// Schedule a task to run after a delay (in milliseconds)
void scheduleAfter(function<void()> func, int delayMs) {
auto executeAt = chrono::steady_clock::now() + chrono::milliseconds(delayMs);
schedule(func, executeAt);
}
};
int main() {
AtomicTaskScheduler scheduler;
// Schedule tasks
scheduler.scheduleAfter([]() { cout << "Task 1 executed at " << chrono::steady_clock::now().time_since_epoch().count() << endl; }, 1000); // 1 second delay
scheduler.scheduleAfter([]() { cout << "Task 2 executed at " << chrono::steady_clock::now().time_since_epoch().count() << endl; }, 2000); // 2 seconds delay
scheduler.scheduleAfter([]() { cout << "Task 3 executed at " << chrono::steady_clock::now().time_since_epoch().count() << endl; }, 500); // 0.5 second delay
// Keep the main thread alive for a while to let tasks execute
this_thread::sleep_for(chrono::seconds(3));
return 0;
}