From b4cde3fc213a5541c3a9dc95a67647c9efd62ae7 Mon Sep 17 00:00:00 2001 From: gabime Date: Sat, 14 Apr 2018 04:11:03 +0300 Subject: [PATCH] Added missing files --- include/spdlog/async.h | 61 +++++++ include/spdlog/details/thread_pool.h | 257 +++++++++++++++++++++++++++ 2 files changed, 318 insertions(+) create mode 100644 include/spdlog/async.h create mode 100644 include/spdlog/details/thread_pool.h diff --git a/include/spdlog/async.h b/include/spdlog/async.h new file mode 100644 index 00000000..925976bc --- /dev/null +++ b/include/spdlog/async.h @@ -0,0 +1,61 @@ + +// +// Copyright(c) 2018 Gabi Melman. +// Distributed under the MIT License (http://opensource.org/licenses/MIT) +// + +#pragma once + +// +// async logging using global thread pool +// all loggers created here share same global thread pool. +// each log message is pushed to a queue along withe a shared pointer to the logger. +// If a logger gets out of scope or deleted while having pending messages in the queue, +// it's destruction will defer until all its messages are processed by the thread pool. + +#include "async_logger.h" +#include "details/registry.h" +#include "details/thread_pool.h" + +#include +namespace spdlog { + +// async logger factory- creates a-synchronous loggers +// creates a global thread pool with default queue size of 8192 items and single thread. +struct create_async +{ + template + static std::shared_ptr create(const std::string &logger_name, SinkArgs &&... args) + { + using details::registry; + + std::lock_guard lock(registry::instance().tp_mutex()); + auto tp = registry::instance().get_thread_pool(); + if (tp == nullptr) + { + tp = std::make_shared(8192, 1); + registry::instance().set_thread_pool(tp); + } + + auto sink = std::make_shared(std::forward(args)...); + auto new_logger = std::make_shared(logger_name, std::move(sink), std::move(tp), async_overflow_policy::block_retry); + registry::instance().register_and_init(new_logger); + return new_logger; + } +}; + +template +inline std::shared_ptr create_as(const std::string &logger_name, SinkArgs &&... sink_args) +{ + return create_async::create(logger_name, std::forward(sink_args)...); +} + +// set global thread pool. q_size must be power of 2 +inline void init_thread_pool(size_t q_size, size_t thread_count) +{ + using details::registry; + using details::thread_pool; + auto tp = std::make_shared(q_size, thread_count); + registry::instance().set_thread_pool(std::move(tp)); +} +} // namespace spdlog diff --git a/include/spdlog/details/thread_pool.h b/include/spdlog/details/thread_pool.h new file mode 100644 index 00000000..7c4990f7 --- /dev/null +++ b/include/spdlog/details/thread_pool.h @@ -0,0 +1,257 @@ +#pragma once + +#include "../details/log_msg.h" +#include "../details/mpmc_bounded_q.h" +#include "../details/os.h" + +#include +#include +#include +#include + +namespace spdlog { +namespace details { + +using async_logger_ptr = std::shared_ptr; + +enum class async_msg_type +{ + log, + flush, + terminate +}; + +// Async msg to move to/from the queue +// Movable only. should never be copied +struct async_msg +{ + async_msg_type msg_type; + level::level_enum level{level::info}; + log_clock::time_point time; + size_t thread_id; + fmt::MemoryWriter raw; + + size_t msg_id; + async_logger_ptr worker_ptr{nullptr}; + + async_msg() = default; + ~async_msg() = default; + + // never copy or assign. should only be move assigned in to the queue.. + async_msg(const async_msg &) = delete; + async_msg &operator=(const async_msg &other) = delete; + async_msg(async_msg &&other) = delete; + + // construct from log_msg with given type + async_msg(async_logger_ptr &&worker, async_msg_type the_type, details::log_msg &&m) + : msg_type(the_type) + , level(m.level) + , time(m.time) + , thread_id(m.thread_id) + , raw(std::move(m.raw)) + , msg_id(m.msg_id) + , worker_ptr(std::forward(worker)) + { + } + + async_msg(async_logger_ptr &&worker, async_msg_type the_type) + : async_msg(std::forward(worker), the_type, details::log_msg()) + { + } + + async_msg(async_msg_type the_type) + : async_msg(nullptr, the_type, details::log_msg()) + { + } + + // used to move to the message queue + async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT + { + msg_type = other.msg_type; + level = other.level; + time = other.time; + thread_id = other.thread_id; + raw = std::move(other.raw); + msg_id = other.msg_id; + worker_ptr = std::move(other.worker_ptr); + return *this; + } + + // copy into log_msg + void to_log_msg(log_msg &msg) + { + msg.logger_name = &worker_ptr->name(); + msg.level = level; + msg.time = time; + msg.thread_id = thread_id; + msg.raw = std::move(raw); + msg.formatted.clear(); + msg.msg_id = msg_id; + msg.color_range_start = 0; + msg.color_range_end = 0; + } +}; + +class thread_pool +{ +public: + using item_type = async_msg; + using q_type = details::mpmc_bounded_queue; + using clock_type = std::chrono::steady_clock; + + thread_pool(size_t q_size_bytes, size_t threads_n) + : _msg_counter(0) + , _q(q_size_bytes) + { + // std::cout << "thread_pool() q_size_bytes: " << q_size_bytes << "\tthreads_n: " << threads_n << std::endl; + if (threads_n == 0 || threads_n > 1000) + { + throw spdlog_ex("spdlog::thread_pool(): invalid threads_n param (valid range is 1-1000)"); + } + for (size_t i = 0; i < threads_n; i++) + { + _threads.emplace_back(std::bind(&thread_pool::_worker_loop, this)); + } + } + + // message all threads to terminate gracefully join them + ~thread_pool() + { + try + { + for (size_t i = 0; i < _threads.size(); i++) + { + _post_async_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry); + } + + for (auto &t : _threads) + { + t.join(); + } + // std::cout << "~thread_pool() _msg_counter: " << _msg_counter << std::endl; + } + catch (...) + { + } + } + + void post_log(async_logger_ptr &&worker_ptr, details::log_msg &&msg, async_overflow_policy overflow_policy) + { + async_msg as_m(std::forward(worker_ptr), async_msg_type::log, std::forward(msg)); + _post_async_msg(std::move(as_m), overflow_policy); + } + + void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy) + { + _post_async_msg(async_msg(std::forward(worker_ptr), async_msg_type::flush), overflow_policy); + } + + size_t msg_counter() + { + return _msg_counter.load(std::memory_order_relaxed); + } + + void wait_empty_q() + { + auto last_op = clock_type::now(); + while (!_q.is_empty()) + { + sleep_or_yield(clock_type::now(), last_op); + } + } + +private: + std::atomic _msg_counter; // total # of messages processed in this pool + q_type _q; + + std::vector _threads; + + void _post_async_msg(async_msg &&new_msg, async_overflow_policy overflow_policy) + { + + if (!_q.enqueue(std::forward(new_msg)) && overflow_policy == async_overflow_policy::block_retry) + { + auto last_op_time = clock_type::now(); + auto now = last_op_time; + do + { + now = clock_type::now(); + sleep_or_yield(now, last_op_time); + } while (!_q.enqueue(std::move(new_msg))); + } + } + + // pop log messages from the queue and send to the logger worker + void _worker_loop() + { + async_msg popped_async_msg; + log_msg msg; + bool active = true; + auto last_pop_time = clock_type::now(); + while (active) + { + if (_q.dequeue(popped_async_msg)) + { + last_pop_time = clock_type::now(); + switch (popped_async_msg.msg_type) + { + case async_msg_type::flush: + { + auto worker = std::move(popped_async_msg.worker_ptr); + worker->_backend_flush(); + break; + } + + case async_msg_type::terminate: + active = false; + break; + + default: + { + popped_async_msg.to_log_msg(msg); + auto worker = std::move(popped_async_msg.worker_ptr); + worker->_backend_log(msg); + _msg_counter.fetch_add(1, std::memory_order_relaxed); + } + } + } + else // queue is empty - the only place we can terminate the thread if needed. + { + sleep_or_yield(clock_type::now(), last_pop_time); + } + } + } + + // spin, yield or sleep. use the time passed since last message as a hint + static void sleep_or_yield(const clock_type::time_point &now, const clock_type::time_point &last_op_time) + { + using std::chrono::microseconds; + using std::chrono::milliseconds; + + auto time_since_op = now - last_op_time; + + // spin upto 50 micros + if (time_since_op <= microseconds(50)) + { + return; + } + + // yield upto 150 micros + if (time_since_op <= microseconds(100)) + { + return std::this_thread::yield(); + } + + // sleep for 20 ms upto 200 ms + if (time_since_op <= milliseconds(200)) + { + return details::os::sleep_for_millis(20); + } + + // sleep for 500 ms + return details::os::sleep_for_millis(500); + } +}; + +} // namespace details +} // namespace spdlog