Added a RW lock
This commit is contained in:
parent
a086eefdc6
commit
ece70e4df4
@ -95,6 +95,8 @@ set(SOURCE_FILES
|
||||
src/misc/digest.cpp
|
||||
src/misc/base64.cpp
|
||||
|
||||
src/lock/rw_mutex.cpp
|
||||
|
||||
#Logger
|
||||
src/log/LogUtils.cpp
|
||||
src/log/LogSinks.cpp
|
||||
@ -299,5 +301,8 @@ if(BUILD_TESTS)
|
||||
|
||||
add_executable(GenerationTest test/generationTest.cpp ${SOURCE_FILES} ../server/MySQLLibSSLFix.c)
|
||||
target_link_libraries(GenerationTest ${TEST_LIBRARIES})
|
||||
|
||||
add_executable(RW-Lock-Test test/rw_lock.cpp src/lock/rw_mutex.cpp)
|
||||
target_link_libraries(GenerationTest ${TEST_LIBRARIES})
|
||||
endif()
|
||||
endif()
|
||||
|
5
src/lock/rw_mutex.cpp
Normal file
5
src/lock/rw_mutex.cpp
Normal file
@ -0,0 +1,5 @@
|
||||
//
|
||||
// Created by WolverinDEV on 03/03/2020.
|
||||
//
|
||||
|
||||
#include "rw_mutex.h"
|
671
src/lock/rw_mutex.h
Normal file
671
src/lock/rw_mutex.h
Normal file
@ -0,0 +1,671 @@
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
#include <cassert>
|
||||
#include <algorithm>
|
||||
#include <condition_variable>
|
||||
#include <thread>
|
||||
|
||||
#if !defined(NDEBUG) or 1
|
||||
#define DEBUG_RW_MUTEX
|
||||
#define rw_mutex_assert(arg) assert(arg)
|
||||
#else
|
||||
#define rw_mutex_assert(arg)
|
||||
#endif
|
||||
|
||||
namespace ts {
|
||||
enum rw_action_result {
|
||||
success = 0,
|
||||
resource_error = -1,
|
||||
timeout = -2,
|
||||
would_deadlock = -3,
|
||||
};
|
||||
|
||||
template <bool write_preferred_>
|
||||
struct rw_mutex_options {
|
||||
constexpr static auto write_preferred{write_preferred_};
|
||||
};
|
||||
|
||||
struct mutex_action_validator {
|
||||
public:
|
||||
inline void try_exclusive_lock_acquire();
|
||||
inline void try_shared_lock_acquire();
|
||||
|
||||
inline void exclusive_lock_acquire();
|
||||
inline void shared_lock_acquire();
|
||||
|
||||
|
||||
inline void try_exclusive_lock_release();
|
||||
inline void try_shared_lock_release();
|
||||
|
||||
inline void exclusive_lock_release();
|
||||
inline void shared_lock_release();
|
||||
|
||||
|
||||
inline void try_shared_lock_upgrade();
|
||||
inline void try_exclusive_lock_downgrade();
|
||||
|
||||
inline void shared_lock_upgrade();
|
||||
inline void exclusive_lock_downgrade();
|
||||
|
||||
private:
|
||||
std::thread::id exclusive_lock{};
|
||||
bool exclusive_lock_upgraded{false};
|
||||
std::vector<std::thread::id> shared_lockers{};
|
||||
};
|
||||
|
||||
struct dummy_action_validator {
|
||||
inline void try_exclusive_lock_acquire() {}
|
||||
inline void try_shared_lock_acquire() {}
|
||||
|
||||
inline void exclusive_lock_acquire() {}
|
||||
inline void shared_lock_acquire() {}
|
||||
|
||||
|
||||
inline void try_exclusive_lock_release() {}
|
||||
inline void try_shared_lock_release() {}
|
||||
|
||||
inline void exclusive_lock_release() {}
|
||||
inline void shared_lock_release() {}
|
||||
|
||||
|
||||
inline void try_shared_lock_upgrade() {}
|
||||
inline void try_exclusive_lock_downgrade() {}
|
||||
|
||||
inline void shared_lock_upgrade() {}
|
||||
inline void exclusive_lock_downgrade() {}
|
||||
};
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
class rw_mutex_impl {
|
||||
public:
|
||||
rw_mutex_impl() = default;
|
||||
~rw_mutex_impl();
|
||||
|
||||
template <bool flag>
|
||||
rw_mutex_impl(const rw_mutex_impl<options_t, action_validator_t>&) = delete;
|
||||
|
||||
template <bool flag>
|
||||
rw_mutex_impl&operator=(const rw_mutex_impl<options_t, action_validator_t>&) = delete;
|
||||
|
||||
/**
|
||||
* Acquire the write lock
|
||||
*/
|
||||
inline void lock();
|
||||
|
||||
/**
|
||||
* Acquire the write lock with a timeout
|
||||
*/
|
||||
[[nodiscard]] inline std::cv_status lock_until(const std::chrono::system_clock::time_point& timeout);
|
||||
|
||||
/**
|
||||
* Acquire the write lock with a timeout
|
||||
*/
|
||||
template <typename duration_t>
|
||||
[[nodiscard]] inline std::cv_status lock_for(const duration_t& duration) {
|
||||
auto now = std::chrono::system_clock::now();
|
||||
now += duration;
|
||||
return this->lock_until(now);
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire the read lock
|
||||
*/
|
||||
inline void lock_shared();
|
||||
|
||||
/**
|
||||
* Acquire the read lock with a timeout
|
||||
*/
|
||||
[[nodiscard]] inline std::cv_status lock_shared_until(const std::chrono::system_clock::time_point& timeout);
|
||||
|
||||
/**
|
||||
* Acquire the read lock with a timeout
|
||||
*/
|
||||
template <typename duration_t>
|
||||
[[nodiscard]] inline std::cv_status lock_shared_for(const duration_t& duration) {
|
||||
auto now = std::chrono::system_clock::now();
|
||||
now += duration;
|
||||
return this->lock_shared_until(now);
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the write lock
|
||||
*/
|
||||
inline void unlock();
|
||||
|
||||
/**
|
||||
* Release the read lock
|
||||
*/
|
||||
inline void unlock_shared();
|
||||
|
||||
/**
|
||||
* Upgrade from a shared lock to an exclusive lock.
|
||||
* Could fail due to the following reasons:
|
||||
* would_deadlock: Another thread already tried to upgrade the lock and we're not supposed to release the shared lock
|
||||
* resource_error: Another thread already claimed the write lock while we're waiting
|
||||
*
|
||||
* Note: This will cause a deadlock if the lock has been locked shared twice
|
||||
*/
|
||||
[[nodiscard]] inline rw_action_result upgrade_lock();
|
||||
|
||||
/**
|
||||
* Upgrade from a shared lock to an exclusive lock.
|
||||
* Could fail due to the following reasons:
|
||||
* would_deadlock: Another thread already tried to upgrade the lock and we're not supposed to release the shared lock
|
||||
* resource_error: Another thread already claimed the write lock while we're waiting
|
||||
* timeout: If the action could not be performed within the given time frame
|
||||
* Note: This will cause a deadlock if the lock has been locked shared twice
|
||||
*/
|
||||
[[nodiscard]] inline rw_action_result upgrade_lock_until(const std::chrono::system_clock::time_point& timeout);
|
||||
|
||||
/**
|
||||
* Upgrade from a shared lock to an exclusive lock with an timeout.
|
||||
* For return codes see upgrade_lock_until.
|
||||
*/
|
||||
template <typename duration_t>
|
||||
[[nodiscard]] inline rw_action_result upgrade_lock_for(const duration_t& duration) {
|
||||
auto now = std::chrono::system_clock::now();
|
||||
now += duration;
|
||||
return this->upgrade_lock_until(now);
|
||||
}
|
||||
|
||||
/**
|
||||
* Downgrade from an exclusive lock to a shared lock.
|
||||
* No other thread could lock the lock exclusive until unlock_shared() has been called.
|
||||
*/
|
||||
inline void downgrade_lock();
|
||||
private:
|
||||
action_validator_t action_validator_{};
|
||||
|
||||
std::mutex status_lock_{};
|
||||
std::condition_variable upgrade_update_{};
|
||||
std::condition_variable write_update_{};
|
||||
std::condition_variable read_update_{};
|
||||
|
||||
bool write_locked{false};
|
||||
bool upgrade_pending{false}, write_lock_upgraded{false};
|
||||
uint32_t write_lock_pending{0};
|
||||
uint32_t read_lock_pending{0};
|
||||
uint32_t read_lock_count{0};
|
||||
};
|
||||
|
||||
typedef rw_mutex_impl<rw_mutex_options<true>, mutex_action_validator> rw_safe_mutex;
|
||||
typedef rw_mutex_impl<rw_mutex_options<true>, mutex_action_validator> rw_unsafe_mutex;
|
||||
typedef rw_safe_mutex rw_mutex;
|
||||
|
||||
template <typename lock_t>
|
||||
struct rwshared_lock {
|
||||
public:
|
||||
explicit rwshared_lock(lock_t& lock) : lock_{lock} {
|
||||
this->lock_.lock_shared();
|
||||
this->lock_type_ = locked_shared;
|
||||
}
|
||||
|
||||
~rwshared_lock() {
|
||||
if(this->lock_type_ == locked_shared) {
|
||||
this->lock_.unlock_shared();
|
||||
} else if(this->lock_type_ == locked_exclusive) {
|
||||
this->lock_.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
rwshared_lock(const rwshared_lock&) = delete;
|
||||
rwshared_lock&operator=(const rwshared_lock&) = delete;
|
||||
|
||||
/* state testers */
|
||||
[[nodiscard]] inline bool exclusive_locked() const {
|
||||
return this->lock_type_ == locked_exclusive;
|
||||
}
|
||||
|
||||
[[nodiscard]] inline bool shared_locked() const {
|
||||
return this->lock_type_ == locked_shared;
|
||||
}
|
||||
|
||||
/* basic lock functions */
|
||||
inline void lock_shared() {
|
||||
rw_mutex_assert(this->lock_type_ == unlocked);
|
||||
this->lock_.lock_shared();
|
||||
this->lock_type_ = locked_shared;
|
||||
}
|
||||
|
||||
[[nodiscard]] inline std::cv_status lock_shared_until(const std::chrono::system_clock::time_point& timeout) {
|
||||
rw_mutex_assert(this->lock_type_ == unlocked);
|
||||
if(auto error = this->lock_.lock_shared_until(timeout); error == std::cv_status::timeout)
|
||||
return error;
|
||||
this->lock_type_ = locked_shared;
|
||||
return std::cv_status::no_timeout;
|
||||
}
|
||||
|
||||
inline void unlock_shared() {
|
||||
rw_mutex_assert(this->lock_type_ == locked_shared);
|
||||
this->lock_.unlock_shared();
|
||||
this->lock_type_ = unlocked;
|
||||
}
|
||||
|
||||
inline void lock_exclusive() {
|
||||
rw_mutex_assert(this->lock_type_ == unlocked);
|
||||
this->lock_.lock();
|
||||
this->lock_type_ = locked_exclusive;
|
||||
}
|
||||
|
||||
[[nodiscard]] inline std::cv_status lock_exclusive_until(const std::chrono::system_clock::time_point& timeout) {
|
||||
rw_mutex_assert(this->lock_type_ == unlocked);
|
||||
if(auto error = this->lock_.lock_until(timeout); error == std::cv_status::timeout)
|
||||
return error;
|
||||
this->lock_type_ = locked_exclusive;
|
||||
return std::cv_status::no_timeout;
|
||||
}
|
||||
|
||||
inline void unlock_exclusive() {
|
||||
rw_mutex_assert(this->lock_type_ == locked_exclusive);
|
||||
this->lock_.unlock();
|
||||
this->lock_type_ = unlocked;
|
||||
}
|
||||
|
||||
/* upgrade/downgrade functions */
|
||||
[[nodiscard]] inline rw_action_result upgrade_lock() {
|
||||
rw_mutex_assert(this->lock_type_ == locked_shared);
|
||||
auto err = this->lock_.upgrade_lock();
|
||||
if(err != rw_action_result::success) return err;
|
||||
|
||||
this->lock_type_ = locked_exclusive;
|
||||
return rw_action_result::success;
|
||||
}
|
||||
|
||||
[[nodiscard]] inline rw_action_result upgrade_lock_until(const std::chrono::system_clock::time_point& timeout) {
|
||||
rw_mutex_assert(this->lock_type_ == locked_shared);
|
||||
auto err = this->lock_.upgrade_lock_until(timeout);
|
||||
if(err != rw_action_result::success) return err;
|
||||
|
||||
this->lock_type_ = locked_exclusive;
|
||||
return rw_action_result::success;
|
||||
}
|
||||
|
||||
inline void downgrade_lock() {
|
||||
rw_mutex_assert(this->lock_type_ == locked_exclusive);
|
||||
this->lock_.downgrade_lock();
|
||||
this->lock_type_ = locked_shared;
|
||||
}
|
||||
|
||||
/* auto lock to the target state */
|
||||
[[nodiscard]] inline rw_action_result auto_lock_exclusive() {
|
||||
if(this->lock_type_ == unlocked)
|
||||
this->lock_exclusive();
|
||||
else if(this->lock_type_ == locked_shared)
|
||||
return this->upgrade_lock();
|
||||
return rw_action_result::success;
|
||||
}
|
||||
|
||||
[[nodiscard]] inline rw_action_result auto_lock_exclusive_until(const std::chrono::system_clock::time_point& timeout) {
|
||||
if(this->lock_type_ == unlocked)
|
||||
this->lock_exclusive_until(timeout);
|
||||
else if(this->lock_type_ == locked_shared)
|
||||
return this->upgrade_lock_until(timeout);
|
||||
return rw_action_result::success;
|
||||
}
|
||||
|
||||
inline void auto_lock_shared() {
|
||||
if(this->lock_type_ == unlocked)
|
||||
this->lock_shared();
|
||||
else if(this->lock_type_ == locked_exclusive)
|
||||
this->downgrade_lock();
|
||||
}
|
||||
|
||||
[[nodiscard]] inline std::cv_status auto_lock_shared_until(const std::chrono::system_clock::time_point& timeout) {
|
||||
if(this->lock_type_ == unlocked)
|
||||
return this->lock_shared_until(timeout);
|
||||
else if(this->lock_type_ == locked_exclusive)
|
||||
this->downgrade_lock_until();
|
||||
return std::cv_status::no_timeout;
|
||||
}
|
||||
|
||||
void auto_unlock() {
|
||||
if(this->lock_type_ == locked_shared)
|
||||
this->unlock_shared();
|
||||
else if(this->lock_type_ == locked_exclusive)
|
||||
this->unlock_exclusive();
|
||||
}
|
||||
private:
|
||||
enum {
|
||||
unlocked,
|
||||
locked_shared,
|
||||
locked_exclusive
|
||||
} lock_type_;
|
||||
|
||||
lock_t& lock_;
|
||||
};
|
||||
}
|
||||
|
||||
/* the implementation */
|
||||
namespace ts {
|
||||
#if __cplusplus > 201703L and 0
|
||||
#define unlikely_annotation [[unlikely]]
|
||||
#else
|
||||
#define unlikely_annotation
|
||||
#endif
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
#ifndef DEBUG_RW_MUTEX
|
||||
rw_mutex_impl<options_t, action_validator_t>::~rw_mutex_impl() = default;
|
||||
#else
|
||||
rw_mutex_impl<options_t, action_validator_t>::~rw_mutex_impl() {
|
||||
rw_mutex_assert(!this->write_locked);
|
||||
rw_mutex_assert(!this->upgrade_pending);
|
||||
rw_mutex_assert(this->write_lock_pending == 0);
|
||||
rw_mutex_assert(this->read_lock_pending == 0);
|
||||
rw_mutex_assert(this->read_lock_count == 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
void rw_mutex_impl<options_t, action_validator_t>::lock() {
|
||||
std::unique_lock slock{this->status_lock_};
|
||||
this->action_validator_.try_exclusive_lock_acquire();
|
||||
|
||||
this->write_lock_pending++;
|
||||
acquire: {
|
||||
while (this->read_lock_count > 0) unlikely_annotation
|
||||
this->write_update_.wait(slock);
|
||||
|
||||
while (this->write_locked) unlikely_annotation
|
||||
this->write_update_.wait(slock);
|
||||
|
||||
/*
|
||||
* Even when write_preferred is enabled, another thread which wants to read lock claims the mutex
|
||||
* and read locks the lock before the write could be acquired.
|
||||
*/
|
||||
if(this->read_lock_count > 0) /* this could be changed while waiting for the write lock to unlock */
|
||||
goto acquire;
|
||||
}
|
||||
|
||||
this->write_locked = true;
|
||||
this->write_lock_pending--;
|
||||
this->action_validator_.exclusive_lock_acquire();
|
||||
}
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
std::cv_status rw_mutex_impl<options_t, action_validator_t>::lock_until(const std::chrono::system_clock::time_point& timeout) {
|
||||
std::unique_lock slock{this->status_lock_};
|
||||
this->action_validator_.try_exclusive_lock_acquire();
|
||||
this->write_lock_pending++;
|
||||
acquire: {
|
||||
while(this->read_lock_count > 0) unlikely_annotation
|
||||
if(auto err = this->write_update_.wait_until(slock, timeout); err != std::cv_status::no_timeout) {
|
||||
assert(this->write_lock_pending > 0);
|
||||
this->write_lock_pending--;
|
||||
return std::cv_status::timeout;
|
||||
}
|
||||
|
||||
while(this->write_locked) unlikely_annotation
|
||||
if(auto err = this->write_update_.wait_until(slock, timeout); err != std::cv_status::no_timeout) {
|
||||
assert(this->write_lock_pending > 0);
|
||||
this->write_lock_pending--;
|
||||
return std::cv_status::timeout;
|
||||
}
|
||||
|
||||
/*
|
||||
* Even when write_preferred is enabled, another thread which wants to read lock claims the mutex
|
||||
* and read locks the lock before the write could be acquired.
|
||||
*/
|
||||
if(this->read_lock_count > 0) /* this could be changed while waiting for the write lock to unlock */
|
||||
goto acquire;
|
||||
}
|
||||
|
||||
this->write_locked = true;
|
||||
this->write_lock_pending--;
|
||||
this->action_validator_.exclusive_lock_acquire();
|
||||
}
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
void rw_mutex_impl<options_t, action_validator_t>::lock_shared() {
|
||||
std::unique_lock slock{this->status_lock_};
|
||||
this->action_validator_.try_shared_lock_acquire();
|
||||
this->read_lock_pending++;
|
||||
while(this->write_locked) unlikely_annotation
|
||||
this->read_update_.wait(slock);
|
||||
|
||||
this->read_lock_count++;
|
||||
this->read_lock_pending--;
|
||||
this->action_validator_.shared_lock_acquire();
|
||||
}
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
std::cv_status rw_mutex_impl<options_t, action_validator_t>::lock_shared_until(const std::chrono::system_clock::time_point& timeout) {
|
||||
std::unique_lock slock{this->status_lock_};
|
||||
this->action_validator_.try_shared_lock_acquire();
|
||||
this->read_lock_pending++;
|
||||
while(this->write_locked) unlikely_annotation
|
||||
if(auto err = this->read_update_.wait_until(slock, timeout); err != std::cv_status::no_timeout) {
|
||||
rw_mutex_assert(this->read_lock_pending > 0);
|
||||
this->read_lock_pending--;
|
||||
return std::cv_status::timeout;
|
||||
}
|
||||
|
||||
this->read_lock_count++;
|
||||
this->read_lock_pending--;
|
||||
this->action_validator_.shared_lock_acquire();
|
||||
}
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
void rw_mutex_impl<options_t, action_validator_t>::unlock() {
|
||||
std::lock_guard slock{this->status_lock_};
|
||||
this->action_validator_.try_exclusive_lock_release();
|
||||
rw_mutex_assert(this->write_locked);
|
||||
|
||||
this->write_locked = false;
|
||||
if(this->write_lock_upgraded) unlikely_annotation {
|
||||
rw_mutex_assert(this->read_lock_count == 1); /* is was a upgraded lock */
|
||||
this->read_lock_count--;
|
||||
this->write_lock_upgraded = false;
|
||||
} else {
|
||||
rw_mutex_assert(this->read_lock_count == 0);
|
||||
}
|
||||
|
||||
if(options_t::write_preferred) {
|
||||
if(this->write_lock_pending > 0)
|
||||
this->write_update_.notify_one();
|
||||
else if(this->read_lock_pending > 0)
|
||||
this->read_update_.notify_one();
|
||||
} else {
|
||||
if(this->read_lock_pending > 0)
|
||||
this->read_update_.notify_all();
|
||||
else if(this->write_lock_pending > 0)
|
||||
this->write_update_.notify_one(); /* only one thread could write at the time */
|
||||
}
|
||||
this->action_validator_.exclusive_lock_release();
|
||||
}
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
void rw_mutex_impl<options_t, action_validator_t>::unlock_shared() {
|
||||
std::lock_guard slock{this->status_lock_};
|
||||
this->action_validator_.try_shared_lock_release();
|
||||
rw_mutex_assert(!this->write_locked);
|
||||
rw_mutex_assert(this->read_lock_count > 0);
|
||||
|
||||
this->read_lock_count--;
|
||||
if(this->read_lock_count == 0) {
|
||||
if(this->write_lock_pending > 0) {
|
||||
/* notify all writers that we're ready to lock */
|
||||
rw_mutex_assert(!this->upgrade_pending);
|
||||
this->write_update_.notify_one(); /* only one thread could write at the time */
|
||||
}
|
||||
} else if(this->read_lock_count == 1) unlikely_annotation {
|
||||
if(this->upgrade_pending) {
|
||||
this->write_locked = true;
|
||||
this->upgrade_pending = false;
|
||||
this->upgrade_update_.notify_one();
|
||||
}
|
||||
}
|
||||
this->action_validator_.shared_lock_release();
|
||||
}
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
rw_action_result rw_mutex_impl<options_t, action_validator_t>::upgrade_lock() {
|
||||
std::unique_lock slock{this->status_lock_};
|
||||
this->action_validator_.try_shared_lock_upgrade();
|
||||
rw_mutex_assert(!this->write_locked);
|
||||
rw_mutex_assert(this->read_lock_count > 0);
|
||||
|
||||
if(this->upgrade_pending)
|
||||
return rw_action_result::would_deadlock;
|
||||
if(this->write_locked)
|
||||
return rw_action_result::resource_error;
|
||||
|
||||
this->upgrade_pending = true;
|
||||
if(this->read_lock_count > 1) {
|
||||
while(this->read_lock_count > 1) unlikely_annotation
|
||||
this->upgrade_update_.wait(slock);
|
||||
|
||||
/* whoever allowed us to upgrade should have done that already */
|
||||
rw_mutex_assert(this->write_locked);
|
||||
rw_mutex_assert(!this->upgrade_pending);
|
||||
}
|
||||
this->upgrade_pending = false;
|
||||
this->write_lock_upgraded = true;
|
||||
this->write_locked = true;
|
||||
|
||||
this->action_validator_.shared_lock_upgrade();
|
||||
return rw_action_result::success;
|
||||
}
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
rw_action_result rw_mutex_impl<options_t, action_validator_t>::upgrade_lock_until(const std::chrono::system_clock::time_point& timeout) {
|
||||
std::unique_lock slock{this->status_lock_};
|
||||
this->action_validator_.try_shared_lock_upgrade();
|
||||
rw_mutex_assert(!this->write_locked);
|
||||
rw_mutex_assert(this->read_lock_count > 0);
|
||||
|
||||
if(this->upgrade_pending)
|
||||
return rw_action_result::would_deadlock;
|
||||
if(this->write_locked)
|
||||
return rw_action_result::resource_error;
|
||||
|
||||
this->upgrade_pending = true;
|
||||
if(this->read_lock_count > 1) {
|
||||
while(this->read_lock_count > 1) unlikely_annotation
|
||||
if(auto err = this->upgrade_update_.wait_until(slock, timeout); err != std::cv_status::no_timeout) {
|
||||
this->upgrade_pending = false;
|
||||
return rw_action_result::timeout;
|
||||
}
|
||||
|
||||
/* whoever allowed us to upgrade should have done that already */
|
||||
rw_mutex_assert(this->write_locked);
|
||||
rw_mutex_assert(!this->upgrade_pending);
|
||||
}
|
||||
this->upgrade_pending = false;
|
||||
this->write_lock_upgraded = true;
|
||||
this->write_locked = true;
|
||||
this->action_validator_.shared_lock_upgrade();
|
||||
|
||||
return rw_action_result::success;
|
||||
}
|
||||
|
||||
template <typename options_t, typename action_validator_t>
|
||||
void rw_mutex_impl<options_t, action_validator_t>::downgrade_lock() {
|
||||
std::unique_lock slock{this->status_lock_};
|
||||
this->action_validator_.try_exclusive_lock_downgrade();
|
||||
rw_mutex_assert(this->write_locked);
|
||||
|
||||
if(this->write_lock_upgraded) {
|
||||
rw_mutex_assert(this->read_lock_count == 1); /* our own thread */
|
||||
} else unlikely_annotation {
|
||||
this->read_lock_count++;
|
||||
}
|
||||
|
||||
this->write_lock_upgraded = false;
|
||||
this->write_locked = false;
|
||||
this->read_update_.notify_all();
|
||||
this->action_validator_.exclusive_lock_downgrade();
|
||||
}
|
||||
|
||||
/* action validator */
|
||||
/* shared lock part */
|
||||
void mutex_action_validator::try_shared_lock_acquire() {
|
||||
if(this->exclusive_lock == std::this_thread::get_id())
|
||||
throw std::logic_error{"mutex has been locked in exclusive lock by current thread"};
|
||||
}
|
||||
|
||||
void mutex_action_validator::shared_lock_acquire() {
|
||||
this->shared_lockers.push_back(std::this_thread::get_id());
|
||||
}
|
||||
|
||||
void mutex_action_validator::try_shared_lock_upgrade() {
|
||||
if(this->exclusive_lock == std::this_thread::get_id())
|
||||
throw std::logic_error{"mutex has been upgraded by current thread"};
|
||||
|
||||
auto current_id = std::this_thread::get_id();
|
||||
auto count = std::count_if(this->shared_lockers.begin(), this->shared_lockers.end(), [&](const auto& id) {
|
||||
return id == current_id;
|
||||
});
|
||||
|
||||
if(count == 0)
|
||||
throw std::logic_error{"upgrade not possible because shared mutex is not locked by current thread"};
|
||||
else if(count > 1)
|
||||
throw std::logic_error{"upgrade not possible because shared mutex is locked more than once by current thread"};
|
||||
}
|
||||
|
||||
void mutex_action_validator::shared_lock_upgrade() {
|
||||
this->exclusive_lock_upgraded = true;
|
||||
this->exclusive_lock = std::this_thread::get_id();
|
||||
}
|
||||
|
||||
void mutex_action_validator::try_shared_lock_release() {
|
||||
if(this->exclusive_lock == std::this_thread::get_id())
|
||||
throw std::logic_error{this->exclusive_lock_upgraded ? "mutex has been upgraded" : "mutex has been locked in exclusive mode, not in shared mode"};
|
||||
|
||||
auto it = std::find(this->shared_lockers.begin(), this->shared_lockers.end(), std::this_thread::get_id());
|
||||
if(it == this->shared_lockers.end())
|
||||
throw std::logic_error{"mutex not locked by current thread"};
|
||||
}
|
||||
|
||||
void mutex_action_validator::shared_lock_release() {
|
||||
auto it = std::find(this->shared_lockers.begin(), this->shared_lockers.end(), std::this_thread::get_id());
|
||||
rw_mutex_assert(it != this->shared_lockers.end()); /* this should never happen (try_shared_lock_release has been called before) */
|
||||
this->shared_lockers.erase(it);
|
||||
}
|
||||
|
||||
/* exclusive lock part */
|
||||
void mutex_action_validator::try_exclusive_lock_acquire() {
|
||||
if(this->exclusive_lock == std::this_thread::get_id())
|
||||
throw std::logic_error{"mutex has been exclusive locked by current thread"};
|
||||
}
|
||||
|
||||
void mutex_action_validator::exclusive_lock_acquire() {
|
||||
this->exclusive_lock = std::this_thread::get_id();
|
||||
this->exclusive_lock_upgraded = false;
|
||||
}
|
||||
|
||||
void mutex_action_validator::try_exclusive_lock_downgrade() {
|
||||
if(this->exclusive_lock != std::this_thread::get_id())
|
||||
throw std::logic_error{"mutex hasn't been locked in exclusive mode by this thread"};
|
||||
}
|
||||
|
||||
void mutex_action_validator::exclusive_lock_downgrade() {
|
||||
if(!this->exclusive_lock_upgraded) {
|
||||
this->shared_lockers.push_back(std::this_thread::get_id());
|
||||
} else {
|
||||
/* internal state error check */
|
||||
rw_mutex_assert(std::find(this->shared_lockers.begin(), this->shared_lockers.end(), std::this_thread::get_id()) != this->shared_lockers.end());
|
||||
}
|
||||
|
||||
this->exclusive_lock_upgraded = false;
|
||||
this->exclusive_lock = std::thread::id{};
|
||||
}
|
||||
|
||||
void mutex_action_validator::try_exclusive_lock_release() {
|
||||
if(this->exclusive_lock != std::this_thread::get_id())
|
||||
throw std::logic_error{"mutex hasn't been locked in exclusive mode by this thread"};
|
||||
}
|
||||
|
||||
void mutex_action_validator::exclusive_lock_release() {
|
||||
if(this->exclusive_lock_upgraded) {
|
||||
auto it = std::find(this->shared_lockers.begin(), this->shared_lockers.end(), std::this_thread::get_id());
|
||||
assert(it != this->shared_lockers.end());
|
||||
this->shared_lockers.erase(it);
|
||||
}
|
||||
this->exclusive_lock_upgraded = false;
|
||||
this->exclusive_lock = std::thread::id{};
|
||||
}
|
||||
}
|
||||
#undef rw_mutex_assert
|
269
test/rw_lock.cpp
Normal file
269
test/rw_lock.cpp
Normal file
@ -0,0 +1,269 @@
|
||||
//
|
||||
// Created by WolverinDEV on 03/03/2020.
|
||||
//
|
||||
|
||||
#include "src/lock/rw_mutex.h"
|
||||
#include <atomic>
|
||||
#include <vector>
|
||||
#include <thread>
|
||||
#include <iostream>
|
||||
#include <functional>
|
||||
#include <algorithm>
|
||||
|
||||
namespace helper {
|
||||
std::mutex threads_lock{};
|
||||
size_t running_threads{0};
|
||||
std::vector<std::thread> thread_handles{};
|
||||
bool sync_pending{false};
|
||||
|
||||
inline void sync_threads() {
|
||||
static std::condition_variable sync_cv;
|
||||
static size_t synced_threads{0};
|
||||
|
||||
auto timeout = std::chrono::system_clock::now() + std::chrono::seconds{5};
|
||||
std::unique_lock lock{threads_lock};
|
||||
if(++synced_threads == running_threads) {
|
||||
sync_cv.notify_all();
|
||||
synced_threads = 0;
|
||||
sync_pending = false;
|
||||
return;
|
||||
}
|
||||
|
||||
sync_pending = true;
|
||||
if(sync_cv.wait_until(lock, timeout) == std::cv_status::timeout) {
|
||||
std::cerr << "failed to sync threads" << std::endl;
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
template <typename... args_t>
|
||||
inline void create_thread(args_t&&... arguments) {
|
||||
auto callback = std::bind(arguments...);
|
||||
|
||||
std::lock_guard tlock{threads_lock};
|
||||
thread_handles.emplace_back([callback]{
|
||||
callback();
|
||||
|
||||
std::lock_guard tlock{threads_lock};
|
||||
auto it = std::find_if(thread_handles.begin(), thread_handles.end(), [](const auto& thread) {
|
||||
return thread.get_id() == std::this_thread::get_id();
|
||||
});
|
||||
if(it == thread_handles.end())
|
||||
return; /* thread will be joined via await_all_threads */
|
||||
|
||||
if(sync_pending) {
|
||||
std::cerr << "thread terminated while sync was pending" << std::endl;
|
||||
abort();
|
||||
}
|
||||
|
||||
running_threads--;
|
||||
it->detach();
|
||||
thread_handles.erase(it);
|
||||
});
|
||||
running_threads++;
|
||||
}
|
||||
|
||||
inline void await_all_threads() {
|
||||
std::unique_lock thread_lock{threads_lock};
|
||||
while(!thread_handles.empty()) {
|
||||
auto thread = std::move(thread_handles.back());
|
||||
thread_handles.pop_back();
|
||||
thread_lock.unlock();
|
||||
thread.join();
|
||||
thread_lock.lock();
|
||||
|
||||
if(sync_pending) {
|
||||
std::cerr << "thread terminated while sync was pending" << std::endl;
|
||||
abort();
|
||||
}
|
||||
running_threads--;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
void testSharedLockLock() {
|
||||
ts::rw_mutex lock{};
|
||||
|
||||
lock.lock_shared();
|
||||
lock.lock_shared();
|
||||
|
||||
lock.unlock_shared();
|
||||
lock.lock_shared();
|
||||
lock.unlock_shared();
|
||||
lock.unlock_shared();
|
||||
}
|
||||
|
||||
void testExclusiveLock() {
|
||||
ts::rw_mutex lock{};
|
||||
|
||||
size_t read_locked{0}, write_locked{0};
|
||||
for(int i = 0; i < 10; i++) {
|
||||
if(rand() % 2 == 0) {
|
||||
write_locked++;
|
||||
lock.lock();
|
||||
lock.unlock();
|
||||
} else {
|
||||
read_locked++;
|
||||
lock.lock_shared();
|
||||
lock.unlock_shared();
|
||||
}
|
||||
}
|
||||
|
||||
/* to ensure that the read and write function has been tested more than once */
|
||||
assert(read_locked > 1);
|
||||
assert(write_locked > 1);
|
||||
}
|
||||
|
||||
void testRW_Multithread() {
|
||||
ts::rw_mutex lock{};
|
||||
|
||||
for(int i = 0; i < 20; i++) {
|
||||
const auto do_sync = i < 5;
|
||||
|
||||
std::atomic<bool> write_locked{false};
|
||||
helper::create_thread([&]{
|
||||
lock.lock();
|
||||
write_locked = true;
|
||||
if(do_sync) helper::sync_threads(); /* sync point 1 */
|
||||
std::this_thread::sleep_for(std::chrono::seconds{1});
|
||||
write_locked = false;
|
||||
lock.unlock();
|
||||
});
|
||||
|
||||
helper::create_thread([&]{
|
||||
if(do_sync) helper::sync_threads(); /* sync point 1 */
|
||||
lock.lock_shared();
|
||||
std::cout << "shared locked" << std::endl;
|
||||
if(write_locked) {
|
||||
std::cerr << "Shared lock succeeded, but thread has been write locked!" << std::endl;
|
||||
return;
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::seconds{1});
|
||||
std::cout << "unlocking shared lock" << std::endl;
|
||||
lock.unlock_shared();
|
||||
});
|
||||
helper::await_all_threads();
|
||||
std::cout << "Loop " << i << " passed" << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
void testLockUpgrade() {
|
||||
std::atomic<bool> lock_released{true};
|
||||
std::atomic<bool> lock_shared{false};
|
||||
ts::rw_mutex lock{};
|
||||
|
||||
lock.lock_shared();
|
||||
std::cout << "[main] Locked shared lock" << std::endl;
|
||||
lock_released = false;
|
||||
lock_shared = true;
|
||||
|
||||
helper::create_thread([&]{
|
||||
std::cout << "Awaiting exclusive lock" << std::endl;
|
||||
lock.lock();
|
||||
std::cout << "Received exclusive lock" << std::endl;
|
||||
if(!lock_released) {
|
||||
std::cerr << "Acquired write lock even thou is hasn't been released!" << std::endl;
|
||||
abort();
|
||||
}
|
||||
|
||||
lock.unlock();
|
||||
});
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds {100});
|
||||
|
||||
if(auto err = lock.upgrade_lock(); err) {
|
||||
std::cerr << "Failed to upgrade lock: " << err << std::endl;
|
||||
abort();
|
||||
}
|
||||
std::cout << "[main] Upgraded shared lock" << std::endl;
|
||||
lock_shared = false;
|
||||
|
||||
helper::create_thread([&]{
|
||||
std::cout << "Awaiting shared lock" << std::endl;
|
||||
lock.lock_shared();
|
||||
std::cout << "Received shared lock" << std::endl;
|
||||
if(!lock_shared) {
|
||||
std::cerr << "Acquired write lock even thou is hasn't been lock in shared mode!" << std::endl;
|
||||
abort();
|
||||
}
|
||||
|
||||
lock.unlock_shared();
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds {100});
|
||||
lock_shared = true;
|
||||
std::cout << "[main] Downgrading exclusive lock" << std::endl;
|
||||
lock.downgrade_lock();
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds {10});
|
||||
lock_released = true;
|
||||
std::cout << "[main] Releasing shared lock" << std::endl;
|
||||
lock.unlock_shared();
|
||||
|
||||
helper::await_all_threads();
|
||||
}
|
||||
|
||||
void testLockUpgradeRelease() {
|
||||
{
|
||||
ts::rw_mutex lock{};
|
||||
lock.lock_shared();
|
||||
(void) lock.upgrade_lock();
|
||||
lock.unlock();
|
||||
}
|
||||
{
|
||||
ts::rw_mutex lock{};
|
||||
lock.lock_shared();
|
||||
(void) lock.upgrade_lock();
|
||||
lock.downgrade_lock();
|
||||
lock.unlock_shared();
|
||||
}
|
||||
}
|
||||
|
||||
void testLockGuard() {
|
||||
ts::rw_mutex mutex{};
|
||||
ts::rwshared_lock lock{mutex};
|
||||
bool lock_allowed{false};
|
||||
|
||||
if(auto err = lock.auto_lock_exclusive(); err) {
|
||||
std::cerr << "Failed to upgrade lock: " << err << std::endl;
|
||||
abort();
|
||||
}
|
||||
assert(lock.exclusive_locked());
|
||||
|
||||
helper::create_thread([&]{
|
||||
std::cout << "Awaiting exclusive lock" << std::endl;
|
||||
mutex.lock();
|
||||
std::cout << "Received exclusive lock" << std::endl;
|
||||
if(!lock_allowed) {
|
||||
std::cerr << "Acquired write lock even thou is hasn't been released!" << std::endl;
|
||||
abort();
|
||||
}
|
||||
|
||||
mutex.unlock();
|
||||
});
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds {100});
|
||||
|
||||
lock.auto_lock_shared();
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds {100});
|
||||
lock_allowed = true;
|
||||
lock.auto_unlock();
|
||||
helper::await_all_threads();
|
||||
|
||||
lock.lock_exclusive();
|
||||
}
|
||||
|
||||
int main() {
|
||||
{ /* get rid of the unused warnings */
|
||||
ts::rw_unsafe_mutex mutex_a{};
|
||||
ts::rw_safe_mutex mutex_b{};
|
||||
}
|
||||
|
||||
testSharedLockLock();
|
||||
testExclusiveLock();
|
||||
//testRW_Multithread();
|
||||
//TODO: Test lock order
|
||||
testLockUpgrade();
|
||||
testLockUpgradeRelease();
|
||||
testLockGuard();
|
||||
}
|
Loading…
Reference in New Issue
Block a user