Skip to content

Commit 8fb5f1e

Browse files
Add mutex
1 parent 9e86503 commit 8fb5f1e

File tree

2 files changed

+66
-6
lines changed

2 files changed

+66
-6
lines changed

c/typedb_driver.i

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,73 @@ struct TransactionCallbackDirector {
118118
#include <memory>
119119
#include <iostream>
120120
#include <unordered_map>
121-
// TODO: ADD MUTEX
122-
static std::unordered_map<size_t, TransactionCallbackDirector*> transactionOnCloseCallbacks {};
121+
122+
class ThreadSafeTransactionCallbacks {
123+
private:
124+
// 1. The static map to protect
125+
static std::unordered_map<size_t, TransactionCallbackDirector*> s_transactionOnCloseCallbacks;
126+
127+
// 2. The static mutex to manage access
128+
static std::mutex s_mutex;
129+
130+
public:
131+
// Delete copy/move constructors and assignment operators
132+
// to prevent accidental copying of the singleton-like structure
133+
ThreadSafeTransactionCallbacks(const ThreadSafeTransactionCallbacks&) = delete;
134+
ThreadSafeTransactionCallbacks& operator=(const ThreadSafeTransactionCallbacks&) = delete;
135+
136+
// --- Core Operations ---
137+
138+
/**
139+
* @brief Inserts a key-value pair into the map in a thread-safe manner.
140+
*/
141+
static void insert(size_t key, TransactionCallbackDirector* value) {
142+
// Lock the mutex for the duration of this scope
143+
std::lock_guard<std::mutex> lock(s_mutex);
144+
145+
// Thread-safe insertion
146+
s_transactionOnCloseCallbacks[key] = value;
147+
}
148+
149+
/**
150+
* @brief Retrieves a value associated with a key in a thread-safe manner.
151+
* @returns The value pointer, or nullptr if the key is not found.
152+
*/
153+
static TransactionCallbackDirector* find(size_t key) {
154+
// Lock the mutex for the duration of this scope
155+
std::lock_guard<std::mutex> lock(s_mutex);
156+
157+
// Thread-safe lookup
158+
auto it = s_transactionOnCloseCallbacks.find(key);
159+
if (it != s_transactionOnCloseCallbacks.end()) {
160+
return it->second;
161+
}
162+
return nullptr; // Return nullptr if not found
163+
}
164+
165+
/**
166+
* @brief Removes a key-value pair from the map in a thread-safe manner.
167+
*/
168+
static void remove(size_t key) {
169+
// Lock the mutex for the duration of this scope
170+
std::lock_guard<std::mutex> lock(s_mutex);
171+
172+
// Thread-safe removal
173+
s_transactionOnCloseCallbacks.erase(key);
174+
}
175+
176+
// Add other necessary map operations (e.g., size(), contains(), clear()) here...
177+
};
178+
179+
// Initialize the static members
180+
std::unordered_map<size_t, TransactionCallbackDirector*> ThreadSafeTransactionCallbacks::s_transactionOnCloseCallbacks;
181+
std::mutex ThreadSafeTransactionCallbacks::s_mutex;
182+
123183
static void transaction_callback_execute(size_t ID, Error* error) {
124184
try {
125-
auto cb = transactionOnCloseCallbacks.at(ID);
185+
auto cb = ThreadSafeTransactionCallbacks::find(ID);
126186
cb->callback(error);
127-
transactionOnCloseCallbacks.erase(ID);
187+
ThreadSafeTransactionCallbacks::remove(ID);
128188
} catch (std::exception const& e) {
129189
std::cerr << "[ERROR] " << e.what() << std::endl;
130190
}
@@ -138,7 +198,7 @@ static void transaction_callback_execute(size_t ID, Error* error) {
138198
void transaction_on_close_register(const Transaction* transaction, TransactionCallbackDirector* handler) {
139199
static std::atomic_size_t nextID;
140200
std::size_t ID = nextID.fetch_add(1);
141-
transactionOnCloseCallbacks.insert({ID, handler});
201+
ThreadSafeTransactionCallbacks::insert(ID, handler);
142202
transaction_on_close(transaction, ID, &transaction_callback_execute);
143203
}
144204
%}

rust/tests/integration/driver.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ fn transaction_callback() {
7272
}
7373
}));
7474

75-
drop(transaction); // TODO: drop isn't blocking... so we need to spin?
75+
drop(transaction); // TODO: drop isn't blocking... so we need to spin? or is there an alternative?
7676

7777
while !close_called.load(Ordering::Acquire) {
7878
// Yield the current time slice to the OS scheduler.

0 commit comments

Comments
 (0)