Added RW Mutex

This commit is contained in:
WolverinDEV 2020-03-04 16:27:07 +01:00
parent ae4751ee6f
commit 0d0d7dd192
5 changed files with 3432 additions and 3432 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,5 @@
// //
// Created by WolverinDEV on 03/03/2020. // Created by WolverinDEV on 03/03/2020.
// //
#include "rw_mutex.h" #include "rw_mutex.h"

File diff suppressed because it is too large Load Diff

View File

@ -1,269 +1,269 @@
// //
// Created by WolverinDEV on 03/03/2020. // Created by WolverinDEV on 03/03/2020.
// //
#include "src/lock/rw_mutex.h" #include "src/lock/rw_mutex.h"
#include <atomic> #include <atomic>
#include <vector> #include <vector>
#include <thread> #include <thread>
#include <iostream> #include <iostream>
#include <functional> #include <functional>
#include <algorithm> #include <algorithm>
namespace helper { namespace helper {
std::mutex threads_lock{}; std::mutex threads_lock{};
size_t running_threads{0}; size_t running_threads{0};
std::vector<std::thread> thread_handles{}; std::vector<std::thread> thread_handles{};
bool sync_pending{false}; bool sync_pending{false};
inline void sync_threads() { inline void sync_threads() {
static std::condition_variable sync_cv; static std::condition_variable sync_cv;
static size_t synced_threads{0}; static size_t synced_threads{0};
auto timeout = std::chrono::system_clock::now() + std::chrono::seconds{5}; auto timeout = std::chrono::system_clock::now() + std::chrono::seconds{5};
std::unique_lock lock{threads_lock}; std::unique_lock lock{threads_lock};
if(++synced_threads == running_threads) { if(++synced_threads == running_threads) {
sync_cv.notify_all(); sync_cv.notify_all();
synced_threads = 0; synced_threads = 0;
sync_pending = false; sync_pending = false;
return; return;
} }
sync_pending = true; sync_pending = true;
if(sync_cv.wait_until(lock, timeout) == std::cv_status::timeout) { if(sync_cv.wait_until(lock, timeout) == std::cv_status::timeout) {
std::cerr << "failed to sync threads" << std::endl; std::cerr << "failed to sync threads" << std::endl;
abort(); abort();
} }
} }
template <typename... args_t> template <typename... args_t>
inline void create_thread(args_t&&... arguments) { inline void create_thread(args_t&&... arguments) {
auto callback = std::bind(arguments...); auto callback = std::bind(arguments...);
std::lock_guard tlock{threads_lock}; std::lock_guard tlock{threads_lock};
thread_handles.emplace_back([callback]{ thread_handles.emplace_back([callback]{
callback(); callback();
std::lock_guard tlock{threads_lock}; std::lock_guard tlock{threads_lock};
auto it = std::find_if(thread_handles.begin(), thread_handles.end(), [](const auto& thread) { auto it = std::find_if(thread_handles.begin(), thread_handles.end(), [](const auto& thread) {
return thread.get_id() == std::this_thread::get_id(); return thread.get_id() == std::this_thread::get_id();
}); });
if(it == thread_handles.end()) if(it == thread_handles.end())
return; /* thread will be joined via await_all_threads */ return; /* thread will be joined via await_all_threads */
if(sync_pending) { if(sync_pending) {
std::cerr << "thread terminated while sync was pending" << std::endl; std::cerr << "thread terminated while sync was pending" << std::endl;
abort(); abort();
} }
running_threads--; running_threads--;
it->detach(); it->detach();
thread_handles.erase(it); thread_handles.erase(it);
}); });
running_threads++; running_threads++;
} }
inline void await_all_threads() { inline void await_all_threads() {
std::unique_lock thread_lock{threads_lock}; std::unique_lock thread_lock{threads_lock};
while(!thread_handles.empty()) { while(!thread_handles.empty()) {
auto thread = std::move(thread_handles.back()); auto thread = std::move(thread_handles.back());
thread_handles.pop_back(); thread_handles.pop_back();
thread_lock.unlock(); thread_lock.unlock();
thread.join(); thread.join();
thread_lock.lock(); thread_lock.lock();
if(sync_pending) { if(sync_pending) {
std::cerr << "thread terminated while sync was pending" << std::endl; std::cerr << "thread terminated while sync was pending" << std::endl;
abort(); abort();
} }
running_threads--; running_threads--;
} }
} }
} }
void testSharedLockLock() { void testSharedLockLock() {
ts::rw_mutex lock{}; ts::rw_mutex lock{};
lock.lock_shared(); lock.lock_shared();
lock.lock_shared(); lock.lock_shared();
lock.unlock_shared(); lock.unlock_shared();
lock.lock_shared(); lock.lock_shared();
lock.unlock_shared(); lock.unlock_shared();
lock.unlock_shared(); lock.unlock_shared();
} }
void testExclusiveLock() { void testExclusiveLock() {
ts::rw_mutex lock{}; ts::rw_mutex lock{};
size_t read_locked{0}, write_locked{0}; size_t read_locked{0}, write_locked{0};
for(int i = 0; i < 10; i++) { for(int i = 0; i < 10; i++) {
if(rand() % 2 == 0) { if(rand() % 2 == 0) {
write_locked++; write_locked++;
lock.lock(); lock.lock();
lock.unlock(); lock.unlock();
} else { } else {
read_locked++; read_locked++;
lock.lock_shared(); lock.lock_shared();
lock.unlock_shared(); lock.unlock_shared();
} }
} }
/* to ensure that the read and write function has been tested more than once */ /* to ensure that the read and write function has been tested more than once */
assert(read_locked > 1); assert(read_locked > 1);
assert(write_locked > 1); assert(write_locked > 1);
} }
void testRW_Multithread() { void testRW_Multithread() {
ts::rw_mutex lock{}; ts::rw_mutex lock{};
for(int i = 0; i < 20; i++) { for(int i = 0; i < 20; i++) {
const auto do_sync = i < 5; const auto do_sync = i < 5;
std::atomic<bool> write_locked{false}; std::atomic<bool> write_locked{false};
helper::create_thread([&]{ helper::create_thread([&]{
lock.lock(); lock.lock();
write_locked = true; write_locked = true;
if(do_sync) helper::sync_threads(); /* sync point 1 */ if(do_sync) helper::sync_threads(); /* sync point 1 */
std::this_thread::sleep_for(std::chrono::seconds{1}); std::this_thread::sleep_for(std::chrono::seconds{1});
write_locked = false; write_locked = false;
lock.unlock(); lock.unlock();
}); });
helper::create_thread([&]{ helper::create_thread([&]{
if(do_sync) helper::sync_threads(); /* sync point 1 */ if(do_sync) helper::sync_threads(); /* sync point 1 */
lock.lock_shared(); lock.lock_shared();
std::cout << "shared locked" << std::endl; std::cout << "shared locked" << std::endl;
if(write_locked) { if(write_locked) {
std::cerr << "Shared lock succeeded, but thread has been write locked!" << std::endl; std::cerr << "Shared lock succeeded, but thread has been write locked!" << std::endl;
return; return;
} }
std::this_thread::sleep_for(std::chrono::seconds{1}); std::this_thread::sleep_for(std::chrono::seconds{1});
std::cout << "unlocking shared lock" << std::endl; std::cout << "unlocking shared lock" << std::endl;
lock.unlock_shared(); lock.unlock_shared();
}); });
helper::await_all_threads(); helper::await_all_threads();
std::cout << "Loop " << i << " passed" << std::endl; std::cout << "Loop " << i << " passed" << std::endl;
} }
} }
void testLockUpgrade() { void testLockUpgrade() {
std::atomic<bool> lock_released{true}; std::atomic<bool> lock_released{true};
std::atomic<bool> lock_shared{false}; std::atomic<bool> lock_shared{false};
ts::rw_mutex lock{}; ts::rw_mutex lock{};
lock.lock_shared(); lock.lock_shared();
std::cout << "[main] Locked shared lock" << std::endl; std::cout << "[main] Locked shared lock" << std::endl;
lock_released = false; lock_released = false;
lock_shared = true; lock_shared = true;
helper::create_thread([&]{ helper::create_thread([&]{
std::cout << "Awaiting exclusive lock" << std::endl; std::cout << "Awaiting exclusive lock" << std::endl;
lock.lock(); lock.lock();
std::cout << "Received exclusive lock" << std::endl; std::cout << "Received exclusive lock" << std::endl;
if(!lock_released) { if(!lock_released) {
std::cerr << "Acquired write lock even thou is hasn't been released!" << std::endl; std::cerr << "Acquired write lock even thou is hasn't been released!" << std::endl;
abort(); abort();
} }
lock.unlock(); lock.unlock();
}); });
std::this_thread::sleep_for(std::chrono::milliseconds {100}); std::this_thread::sleep_for(std::chrono::milliseconds {100});
if(auto err = lock.upgrade_lock(); err) { if(auto err = lock.upgrade_lock(); err) {
std::cerr << "Failed to upgrade lock: " << err << std::endl; std::cerr << "Failed to upgrade lock: " << err << std::endl;
abort(); abort();
} }
std::cout << "[main] Upgraded shared lock" << std::endl; std::cout << "[main] Upgraded shared lock" << std::endl;
lock_shared = false; lock_shared = false;
helper::create_thread([&]{ helper::create_thread([&]{
std::cout << "Awaiting shared lock" << std::endl; std::cout << "Awaiting shared lock" << std::endl;
lock.lock_shared(); lock.lock_shared();
std::cout << "Received shared lock" << std::endl; std::cout << "Received shared lock" << std::endl;
if(!lock_shared) { if(!lock_shared) {
std::cerr << "Acquired write lock even thou is hasn't been lock in shared mode!" << std::endl; std::cerr << "Acquired write lock even thou is hasn't been lock in shared mode!" << std::endl;
abort(); abort();
} }
lock.unlock_shared(); lock.unlock_shared();
}); });
std::this_thread::sleep_for(std::chrono::milliseconds {100}); std::this_thread::sleep_for(std::chrono::milliseconds {100});
lock_shared = true; lock_shared = true;
std::cout << "[main] Downgrading exclusive lock" << std::endl; std::cout << "[main] Downgrading exclusive lock" << std::endl;
lock.downgrade_lock(); lock.downgrade_lock();
std::this_thread::sleep_for(std::chrono::milliseconds {10}); std::this_thread::sleep_for(std::chrono::milliseconds {10});
lock_released = true; lock_released = true;
std::cout << "[main] Releasing shared lock" << std::endl; std::cout << "[main] Releasing shared lock" << std::endl;
lock.unlock_shared(); lock.unlock_shared();
helper::await_all_threads(); helper::await_all_threads();
} }
void testLockUpgradeRelease() { void testLockUpgradeRelease() {
{ {
ts::rw_mutex lock{}; ts::rw_mutex lock{};
lock.lock_shared(); lock.lock_shared();
(void) lock.upgrade_lock(); (void) lock.upgrade_lock();
lock.unlock(); lock.unlock();
} }
{ {
ts::rw_mutex lock{}; ts::rw_mutex lock{};
lock.lock_shared(); lock.lock_shared();
(void) lock.upgrade_lock(); (void) lock.upgrade_lock();
lock.downgrade_lock(); lock.downgrade_lock();
lock.unlock_shared(); lock.unlock_shared();
} }
} }
void testLockGuard() { void testLockGuard() {
ts::rw_mutex mutex{}; ts::rw_mutex mutex{};
ts::rwshared_lock lock{mutex}; ts::rwshared_lock lock{mutex};
bool lock_allowed{false}; bool lock_allowed{false};
if(auto err = lock.auto_lock_exclusive(); err) { if(auto err = lock.auto_lock_exclusive(); err) {
std::cerr << "Failed to upgrade lock: " << err << std::endl; std::cerr << "Failed to upgrade lock: " << err << std::endl;
abort(); abort();
} }
assert(lock.exclusive_locked()); assert(lock.exclusive_locked());
helper::create_thread([&]{ helper::create_thread([&]{
std::cout << "Awaiting exclusive lock" << std::endl; std::cout << "Awaiting exclusive lock" << std::endl;
mutex.lock(); mutex.lock();
std::cout << "Received exclusive lock" << std::endl; std::cout << "Received exclusive lock" << std::endl;
if(!lock_allowed) { if(!lock_allowed) {
std::cerr << "Acquired write lock even thou is hasn't been released!" << std::endl; std::cerr << "Acquired write lock even thou is hasn't been released!" << std::endl;
abort(); abort();
} }
mutex.unlock(); mutex.unlock();
}); });
std::this_thread::sleep_for(std::chrono::milliseconds {100}); std::this_thread::sleep_for(std::chrono::milliseconds {100});
lock.auto_lock_shared(); lock.auto_lock_shared();
std::this_thread::sleep_for(std::chrono::milliseconds {100}); std::this_thread::sleep_for(std::chrono::milliseconds {100});
lock_allowed = true; lock_allowed = true;
lock.auto_unlock(); lock.auto_unlock();
helper::await_all_threads(); helper::await_all_threads();
lock.lock_exclusive(); lock.lock_exclusive();
} }
int main() { int main() {
{ /* get rid of the unused warnings */ { /* get rid of the unused warnings */
ts::rw_unsafe_mutex mutex_a{}; ts::rw_unsafe_mutex mutex_a{};
ts::rw_safe_mutex mutex_b{}; ts::rw_safe_mutex mutex_b{};
} }
testSharedLockLock(); testSharedLockLock();
testExclusiveLock(); testExclusiveLock();
//testRW_Multithread(); //testRW_Multithread();
//TODO: Test lock order //TODO: Test lock order
testLockUpgrade(); testLockUpgrade();
testLockUpgradeRelease(); testLockUpgradeRelease();
testLockGuard(); testLockGuard();
} }