From b9d7c45e4062fce11de813f683e640f13c9d1760 Mon Sep 17 00:00:00 2001 From: gabime Date: Tue, 22 May 2018 21:59:27 +0300 Subject: [PATCH] Use blocking queue --- example/example.cpp | 26 +- include/spdlog/details/async_logger_impl.h | 2 +- include/spdlog/details/mpmc_blocking_q.h | 84 +++++ include/spdlog/details/mpmc_bounded_q.h | 183 ---------- include/spdlog/details/thread_pool.h | 371 ++++++++++----------- tests/CMakeLists.txt | 3 +- tests/test_async.cpp | 134 ++++++++ tests/test_sink.h | 48 +++ tests/tests.vcxproj | 1 + tests/tests.vcxproj.filters | 3 + 10 files changed, 458 insertions(+), 397 deletions(-) create mode 100644 include/spdlog/details/mpmc_blocking_q.h delete mode 100644 include/spdlog/details/mpmc_bounded_q.h create mode 100644 tests/test_async.cpp create mode 100644 tests/test_sink.h diff --git a/example/example.cpp b/example/example.cpp index bfa46d9a..f09a1a5c 100644 --- a/example/example.cpp +++ b/example/example.cpp @@ -28,6 +28,8 @@ int main(int, char *[]) try { + async_example(); + return 0; auto console = spdlog::stdout_color_st("console"); console->info("Welcome to spdlog!"); @@ -82,7 +84,7 @@ int main(int, char *[]) // Asynchronous logging is very fast.. // Just call spdlog::set_async_mode(q_size) and all created loggers from now on will be asynchronous.. - async_example(); + //async_example(); // Log user-defined types example user_defined_example(); @@ -107,15 +109,23 @@ int main(int, char *[]) #include "spdlog/async.h" void async_example() { - auto async_file = spd::basic_logger_mt("async_file_logger", "logs/async_log.txt"); - for (int i = 0; i < 100; ++i) - { - async_file->info("Async message #{}", i); - } + //auto async_file = spd::basic_logger_mt("async_file_logger", "logs/async_log.txt"); + + for (int j = 0; j < 1; j++) + { + spdlog::init_thread_pool(1024, 10); + auto async_file = spd::stderr_color_mt("console"); + for (int i = 0; i < 1024; ++i) + { + async_file->info("{} Async message #{}", j, i); + } + spdlog::drop_all(); + } + //std::this_thread::sleep_for(std::chrono::seconds(1)); + // you can also modify thread pool settings *before* creating the logger: - // spdlog::init_thread_pool(32768, 4); // queue with 32k of pre allocated items and 4 backing threads. - // if not called a defaults are: preallocated 8192 queue items and 1 worker thread. + // spdlog::init_thread_pool(32768, 4); // queue with max 32k items 4 backing threads. } // syslog example (linux/osx/freebsd) diff --git a/include/spdlog/details/async_logger_impl.h b/include/spdlog/details/async_logger_impl.h index 969ec72b..49f51c91 100644 --- a/include/spdlog/details/async_logger_impl.h +++ b/include/spdlog/details/async_logger_impl.h @@ -8,7 +8,7 @@ // async logger implementation // uses a thread pool to perform the actual logging -#include "../details/thread_pool.h" +#include "spdlog/details/thread_pool.h" #include #include diff --git a/include/spdlog/details/mpmc_blocking_q.h b/include/spdlog/details/mpmc_blocking_q.h new file mode 100644 index 00000000..d4a8a41c --- /dev/null +++ b/include/spdlog/details/mpmc_blocking_q.h @@ -0,0 +1,84 @@ +#pragma once + +// +// Copyright(c) 2018 Gabi Melman. +// Distributed under the MIT License (http://opensource.org/licenses/MIT) +// + +// async log helper : +// multi producer-multi consumer blocking queue +// enqueue(..) - will block until room found to put the new message +// enqueue_nowait(..) - will return immediatly with false if no room left in the queue +// dequeue_for(..) - will block until the queue is not empty or timeout passed + +#include +#include +#include + +namespace spdlog { +namespace details { + +template +class mpmc_bounded_queue +{ +public: + using item_type = T; + explicit mpmc_bounded_queue(size_t max_items) + : max_items_(max_items) + { + } + + // try to enqueue and block if no room left + void enqueue(T &&item) + { + { + std::unique_lock lock(queue_mutex_); + pop_cv_.wait(lock, [this] { return this->q_.size() < this->max_items_; }); + q_.push(std::move(item)); + } + push_cv_.notify_one(); + } + + // try to enqueue and return immdeialty false if no room left + bool enqueue_nowait(T &&item) + { + { + std::unique_lock lock(queue_mutex_); + if (q_.size() == this->max_items_) + { + return false; + } + q_.push(std::forward(item)); + } + push_cv_.notify_one(); + return true; + } + + // try to dequeue item. if no item found. wait upto timeout and try again + // Return true, if succeeded dequeue item, false otherwise + bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) + { + { + std::unique_lock lock(queue_mutex_); + if (!push_cv_.wait_for(lock, wait_duration, [this] { return this->q_.size() > 0; })) + { + return false; + } + + popped_item = std::move(q_.front()); + q_.pop(); + } + pop_cv_.notify_one(); + return true; + } + +private: + size_t max_items_; + std::mutex queue_mutex_; + std::condition_variable push_cv_; + std::condition_variable pop_cv_; + + std::queue q_; +}; +} // namespace details +} // namespace spdlog diff --git a/include/spdlog/details/mpmc_bounded_q.h b/include/spdlog/details/mpmc_bounded_q.h deleted file mode 100644 index ff8687b7..00000000 --- a/include/spdlog/details/mpmc_bounded_q.h +++ /dev/null @@ -1,183 +0,0 @@ -/* -A modified version of Bounded MPMC queue by Dmitry Vyukov. - -Original code from: -http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue - -licensed by Dmitry Vyukov under the terms below: - -Simplified BSD license - -Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this list of -conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, this list -of conditions and the following disclaimer in the documentation and/or other materials -provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED -WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT -SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, -OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE -OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF -ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -The views and conclusions contained in the software and documentation are those of the authors and -should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov. -*/ - -/* -The code in its current form adds the license below: - -Copyright(c) 2015 Gabi Melman. -Distributed under the MIT License (http://opensource.org/licenses/MIT) - -*/ - -#pragma once - -#include "spdlog/common.h" - -#include -#include - -namespace spdlog { -namespace details { - -template -class mpmc_bounded_queue -{ -public: - using item_type = T; - - explicit mpmc_bounded_queue(size_t buffer_size) - : max_size_(buffer_size) - , buffer_(new cell_t[buffer_size]) - , buffer_mask_(buffer_size - 1) - { - // queue size must be power of two - if (!((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0))) - { - throw spdlog_ex("async logger queue size must be power of two"); - } - - for (size_t i = 0; i != buffer_size; i += 1) - { - buffer_[i].sequence_.store(i, std::memory_order_relaxed); - } - enqueue_pos_.store(0, std::memory_order_relaxed); - dequeue_pos_.store(0, std::memory_order_relaxed); - } - - ~mpmc_bounded_queue() - { - delete[] buffer_; - } - - mpmc_bounded_queue(mpmc_bounded_queue const &) = delete; - void operator=(mpmc_bounded_queue const &) = delete; - - bool enqueue(T &&data) - { - cell_t *cell; - size_t pos = enqueue_pos_.load(std::memory_order_relaxed); - for (;;) - { - cell = &buffer_[pos & buffer_mask_]; - size_t seq = cell->sequence_.load(std::memory_order_acquire); - intptr_t dif = static_cast(seq) - static_cast(pos); - if (dif == 0) - { - if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) - { - break; - } - } - else if (dif < 0) - { - return false; - } - else - { - pos = enqueue_pos_.load(std::memory_order_relaxed); - } - } - cell->data_ = std::move(data); - cell->sequence_.store(pos + 1, std::memory_order_release); - return true; - } - - bool dequeue(T &data) - { - cell_t *cell; - size_t pos = dequeue_pos_.load(std::memory_order_relaxed); - for (;;) - { - cell = &buffer_[pos & buffer_mask_]; - size_t seq = cell->sequence_.load(std::memory_order_acquire); - intptr_t dif = static_cast(seq) - static_cast(pos + 1); - if (dif == 0) - { - if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) - { - break; - } - } - else if (dif < 0) - { - return false; - } - else - { - pos = dequeue_pos_.load(std::memory_order_relaxed); - } - } - data = std::move(cell->data_); - cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release); - return true; - } - - bool is_empty() - { - size_t front, front1, back; - // try to take a consistent snapshot of front/tail. - do - { - front = enqueue_pos_.load(std::memory_order_acquire); - back = dequeue_pos_.load(std::memory_order_acquire); - front1 = enqueue_pos_.load(std::memory_order_relaxed); - } while (front != front1); - return back == front; - } - -private: - struct cell_t - { - std::atomic sequence_; - T data_; - }; - - size_t const max_size_; - - static size_t const cacheline_size = 64; - using cacheline_pad_t = char[cacheline_size]; - - cacheline_pad_t pad0_; - cell_t *const buffer_; - size_t const buffer_mask_; - cacheline_pad_t pad1_; - std::atomic enqueue_pos_; - cacheline_pad_t pad2_; - std::atomic dequeue_pos_; - cacheline_pad_t pad3_; -}; - -} // namespace details -} // namespace spdlog diff --git a/include/spdlog/details/thread_pool.h b/include/spdlog/details/thread_pool.h index db8b562b..fb74c178 100644 --- a/include/spdlog/details/thread_pool.h +++ b/include/spdlog/details/thread_pool.h @@ -1,7 +1,7 @@ #pragma once #include "spdlog/details/log_msg.h" -#include "spdlog/details/mpmc_bounded_q.h" +#include "spdlog/details/mpmc_blocking_q.h" #include "spdlog/details/os.h" #include @@ -10,233 +10,196 @@ #include namespace spdlog { -namespace details { + namespace details { -using async_logger_ptr = std::shared_ptr; + using async_logger_ptr = std::shared_ptr; -enum class async_msg_type -{ - log, - flush, - terminate -}; + enum class async_msg_type + { + log, + flush, + terminate + }; -// Async msg to move to/from the queue -// Movable only. should never be copied -struct async_msg -{ - async_msg_type msg_type; - level::level_enum level; - log_clock::time_point time; - size_t thread_id; - fmt::MemoryWriter raw; + // Async msg to move to/from the queue + // Movable only. should never be copied + struct async_msg + { + async_msg_type msg_type; + level::level_enum level; + log_clock::time_point time; + size_t thread_id; + fmt::MemoryWriter raw; - size_t msg_id; - async_logger_ptr worker_ptr; + size_t msg_id; + async_logger_ptr worker_ptr; - async_msg() = default; - ~async_msg() = default; + async_msg() = default; + ~async_msg() = default; - // never copy or assign. should only be move assigned in to the queue.. - async_msg(const async_msg &) = delete; - async_msg &operator=(const async_msg &other) = delete; - async_msg(async_msg &&other) = delete; + // should only be moved in or out of the queue.. + async_msg(const async_msg &) = delete; + async_msg(async_msg &&other) = default; + async_msg &operator=(async_msg &&other) = default; - // construct from log_msg with given type - async_msg(async_logger_ptr &&worker, async_msg_type the_type, details::log_msg &&m) - : msg_type(the_type) - , level(m.level) - , time(m.time) - , thread_id(m.thread_id) - , raw(std::move(m.raw)) - , msg_id(m.msg_id) - , worker_ptr(std::forward(worker)) - { - } + // construct from log_msg with given type + async_msg(async_logger_ptr &&worker, async_msg_type the_type, details::log_msg &&m) + : msg_type(the_type) + , level(m.level) + , time(m.time) + , thread_id(m.thread_id) + , raw(std::move(m.raw)) + , msg_id(m.msg_id) + , worker_ptr(std::forward(worker)) + { + } - async_msg(async_logger_ptr &&worker, async_msg_type the_type) - : async_msg(std::forward(worker), the_type, details::log_msg()) - { - } + async_msg(async_logger_ptr &&worker, async_msg_type the_type) + : async_msg(std::forward(worker), the_type, details::log_msg()) + { + } - async_msg(async_msg_type the_type) - : async_msg(nullptr, the_type, details::log_msg()) - { - } + async_msg(async_msg_type the_type) + : async_msg(nullptr, the_type, details::log_msg()) + { + } - // used to move to the message queue - async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT - { - msg_type = other.msg_type; - level = other.level; - time = other.time; - thread_id = other.thread_id; - raw = std::move(other.raw); - msg_id = other.msg_id; - worker_ptr = std::move(other.worker_ptr); - return *this; - } + // copy into log_msg + void to_log_msg(log_msg &&msg) + { + msg.logger_name = &worker_ptr->name(); + msg.level = level; + msg.time = time; + msg.thread_id = thread_id; + msg.raw = std::move(raw); + msg.formatted.clear(); + msg.msg_id = msg_id; + msg.color_range_start = 0; + msg.color_range_end = 0; + } + }; - // copy into log_msg - void to_log_msg(log_msg &&msg) - { - msg.logger_name = &worker_ptr->name(); - msg.level = level; - msg.time = time; - msg.thread_id = thread_id; - msg.raw = std::move(raw); - msg.formatted.clear(); - msg.msg_id = msg_id; - msg.color_range_start = 0; - msg.color_range_end = 0; - } -}; + class thread_pool + { + public: + using item_type = async_msg; + using q_type = details::mpmc_bounded_queue; + using clock_type = std::chrono::steady_clock; -class thread_pool -{ -public: - using item_type = async_msg; - using q_type = details::mpmc_bounded_queue; - using clock_type = std::chrono::steady_clock; + thread_pool(size_t q_size_bytes, size_t threads_n) + : msg_counter_(0) + , _q(q_size_bytes) + { + // std::cout << "thread_pool() q_size_bytes: " << q_size_bytes << "\tthreads_n: " << threads_n << std::endl; + if (threads_n == 0 || threads_n > 1000) + { + throw spdlog_ex("spdlog::thread_pool(): invalid threads_n param (valid range is 1-1000)"); + } + for (size_t i = 0; i < threads_n; i++) + { + _threads.emplace_back(std::bind(&thread_pool::worker_loop, this)); + } + } - thread_pool(size_t q_size_bytes, size_t threads_n) - : _msg_counter(0) - , _q(q_size_bytes) - { - // std::cout << "thread_pool() q_size_bytes: " << q_size_bytes << "\tthreads_n: " << threads_n << std::endl; - if (threads_n == 0 || threads_n > 1000) - { - throw spdlog_ex("spdlog::thread_pool(): invalid threads_n param (valid range is 1-1000)"); - } - for (size_t i = 0; i < threads_n; i++) - { - _threads.emplace_back(std::bind(&thread_pool::_worker_loop, this)); - } - } + // message all threads to terminate gracefully join them + ~thread_pool() + { + try + { + for (size_t i = 0; i < _threads.size(); i++) + { + post_async_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry); + } - // message all threads to terminate gracefully join them - ~thread_pool() - { - try - { - for (size_t i = 0; i < _threads.size(); i++) - { - _post_async_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry); - } + for (auto &t : _threads) + { + t.join(); + } + // std::cout << "~thread_pool() msg_counter_: " << msg_counter_ << std::endl; + } + catch (...) + { + } + } - for (auto &t : _threads) - { - t.join(); - } - // std::cout << "~thread_pool() _msg_counter: " << _msg_counter << std::endl; - } - catch (...) - { - } - } + void post_log(async_logger_ptr &&worker_ptr, details::log_msg &&msg, async_overflow_policy overflow_policy) + { + async_msg async_m(std::forward(worker_ptr), async_msg_type::log, std::forward(msg)); + post_async_msg(std::move(async_m), overflow_policy); + } - void post_log(async_logger_ptr &&worker_ptr, details::log_msg &&msg, async_overflow_policy overflow_policy) - { - async_msg as_m(std::forward(worker_ptr), async_msg_type::log, std::forward(msg)); - _post_async_msg(std::move(as_m), overflow_policy); - } + void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy) + { + post_async_msg(async_msg(std::forward(worker_ptr), async_msg_type::flush), overflow_policy); + } - void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy) - { - _post_async_msg(async_msg(std::forward(worker_ptr), async_msg_type::flush), overflow_policy); - } + size_t msg_counter() + { + return msg_counter_.load(std::memory_order_relaxed); + } - size_t msg_counter() - { - return _msg_counter.load(std::memory_order_relaxed); - } + private: + std::atomic msg_counter_; // total # of messages processed in this pool + q_type _q; -private: - std::atomic _msg_counter; // total # of messages processed in this pool - q_type _q; + std::vector _threads; - std::vector _threads; + void post_async_msg(async_msg &&new_msg, async_overflow_policy overflow_policy) + { + if (overflow_policy == async_overflow_policy::block_retry) + { + _q.enqueue(std::move(new_msg)); + } + else + { + _q.enqueue_nowait(std::move(new_msg)); + } + } - void _post_async_msg(async_msg &&new_msg, async_overflow_policy overflow_policy) - { + void worker_loop() + { + while (process_next_msg()) + { + }; + } - if (!_q.enqueue(std::forward(new_msg)) && overflow_policy == async_overflow_policy::block_retry) - { - auto last_op_time = clock_type::now(); - auto now = last_op_time; - do - { - now = clock_type::now(); - sleep_or_yield(now, last_op_time); - } while (!_q.enqueue(std::move(new_msg))); - } - } + // process next message in the queue + // return true if this thread should still be active (while no terminate msg was received) + bool process_next_msg() + { + async_msg incoming_async_msg; + bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::seconds(10)); + if (!dequeued) + { + return true; + } - // pop log messages from the queue and send to the logger worker - void _worker_loop() - { - async_msg popped_async_msg; - log_msg msg; - bool active = true; - auto last_pop_time = clock_type::now(); - while (active) - { - if (_q.dequeue(popped_async_msg)) - { - last_pop_time = clock_type::now(); - switch (popped_async_msg.msg_type) - { - case async_msg_type::flush: - { - auto worker = std::move(popped_async_msg.worker_ptr); - worker->_backend_flush(); - break; - } + switch (incoming_async_msg.msg_type) + { + case async_msg_type::flush: + { + incoming_async_msg.worker_ptr->_backend_flush(); + return true; + } - case async_msg_type::terminate: - active = false; - break; + case async_msg_type::terminate: + { + return false; + } - default: - { - popped_async_msg.to_log_msg(std::move(msg)); - auto worker = std::move(popped_async_msg.worker_ptr); - worker->_backend_log(msg); - _msg_counter.fetch_add(1, std::memory_order_relaxed); - } - } - } - else // queue is empty - the only place we can terminate the thread if needed. - { - sleep_or_yield(clock_type::now(), last_pop_time); - } - } - } + default: + { + log_msg msg; + incoming_async_msg.to_log_msg(std::move(msg)); + incoming_async_msg.worker_ptr->_backend_log(msg); + msg_counter_.fetch_add(1, std::memory_order_relaxed); + return true; + } + } + assert(false); + return true; // should not be reached + } + }; - // spin, yield or sleep. use the time passed since last message as a hint - static void sleep_or_yield(const clock_type::time_point &now, const clock_type::time_point &last_op_time) - { - using std::chrono::microseconds; - using std::chrono::milliseconds; - - auto time_since_op = now - last_op_time; - - // yield upto 150 micros - if (time_since_op <= microseconds(150)) - { - return std::this_thread::yield(); - } - - // sleep for 20 ms upto 200 ms - if (time_since_op <= milliseconds(200)) - { - return details::os::sleep_for_millis(20); - } - - // sleep for 500 ms - return details::os::sleep_for_millis(500); - } -}; - -} // namespace details + } // namespace details } // namespace spdlog diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 119fab55..ba8b7487 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -7,7 +7,8 @@ set(SPDLOG_UTESTS_SOURCES file_helper.cpp file_log.cpp test_misc.cpp - test_pattern_formatter + test_pattern_formatter.cpp + test_async.cpp includes.h registry.cpp test_macros.cpp diff --git a/tests/test_async.cpp b/tests/test_async.cpp new file mode 100644 index 00000000..eb5840be --- /dev/null +++ b/tests/test_async.cpp @@ -0,0 +1,134 @@ +#include "includes.h" +#include "test_sink.h" +#include "spdlog/async.h" +#include "spdlog/sinks/simple_file_sink.h" + +//std::unique_ptr create_logger(size_t tp_queue_size, size_t tp_threads) +//{ +// auto tp = std::make_shared(8192, 1); +// auto logger = std::make_shared("as", test_sink, tp, async_overflow_policy::block_retry); +//} + +TEST_CASE("basic async test ", "[async]") +{ + using namespace spdlog; + auto test_sink = std::make_shared(); + size_t queue_size = 128; + size_t messages = 256; + { + auto tp = std::make_shared(queue_size, 1); + auto logger = std::make_shared("as", test_sink, tp, async_overflow_policy::block_retry); + for (size_t i = 0; i < messages; i++) + { + logger->info("Hello message #{}", i); + } + logger->flush(); + } + REQUIRE(test_sink->msg_counter() == messages); + REQUIRE(test_sink->flushed_msg_counter() == messages); +} + +TEST_CASE("discard policy ", "[async]") +{ + using namespace spdlog; + auto test_sink = std::make_shared(); + size_t queue_size = 2; + size_t messages = 1024; + { + auto tp = std::make_shared(queue_size, 1); + auto logger = std::make_shared("as", test_sink, tp, async_overflow_policy::discard_log_msg); + for (size_t i = 0; i < messages; i++) + { + logger->info("Hello message #{}", i); + } + } + + REQUIRE(test_sink->msg_counter() < messages); + REQUIRE(test_sink->flushed_msg_counter() < messages); +} + +TEST_CASE("flush", "[async]") +{ + using namespace spdlog; + auto test_sink = std::make_shared(); + size_t queue_size = 256; + size_t messages = 256; + { + auto tp = std::make_shared(queue_size, 1); + auto logger = std::make_shared("as", test_sink, tp, async_overflow_policy::block_retry); + for (size_t i = 0; i < messages; i++) + { + logger->info("Hello message #{}", i); + } + + logger->flush(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + REQUIRE(test_sink->msg_counter() == messages); + REQUIRE(test_sink->flushed_msg_counter() == messages); +} + +TEST_CASE("multi threads", "[async]") +{ + using namespace spdlog; + auto test_sink = std::make_shared(); + size_t queue_size = 128; + size_t messages = 256; + size_t n_threads = 10; + { + auto tp = std::make_shared(queue_size, 1); + auto logger = std::make_shared("as", test_sink, tp, async_overflow_policy::block_retry); + + std::vector threads; + for (size_t i = 0; i < n_threads; i++) + { + threads.emplace_back([logger, messages] { + for (size_t j = 0; j < messages; j++) + { + logger->info("Hello message #{}", j); + } + + }); + } + + for (auto &t : threads) + { + t.join(); + } + logger->flush(); + + } + + REQUIRE(test_sink->msg_counter() == messages * n_threads); + REQUIRE(test_sink->flushed_msg_counter() == messages * n_threads); +} + +TEST_CASE("to_file", "[async]") +{ + prepare_logdir(); + size_t queue_size = 512; + size_t messages = 512; + size_t n_threads = 4; + spdlog::init_thread_pool(queue_size, n_threads); + auto logger= spdlog::basic_logger_mt("as", "logs/async_test.log", true); + + std::vector threads; + for (size_t i = 0; i < n_threads; i++) + { + threads.emplace_back([logger, messages] { + for (size_t j = 0; j < messages; j++) + { + logger->info("Hello message #{}", j); + } + }); + } + + for (auto &t : threads) + { + t.join(); + } + logger.reset(); + spdlog::drop("as"); + std::this_thread::sleep_for(std::chrono::seconds(1)); + REQUIRE(count_lines("logs/async_test.log") == messages * n_threads); +} diff --git a/tests/test_sink.h b/tests/test_sink.h new file mode 100644 index 00000000..5beb0157 --- /dev/null +++ b/tests/test_sink.h @@ -0,0 +1,48 @@ +// +// Copyright(c) 2018 Gabi Melman. +// Distributed under the MIT License (http://opensource.org/licenses/MIT) +// + +#pragma once + +#include "spdlog/details/null_mutex.h" +#include "spdlog/sinks/base_sink.h" + +#include + +namespace spdlog { +namespace sinks { + +template +class test_sink : public base_sink +{ +public: + size_t msg_counter() + { + return msg_counter_; + } + + size_t flushed_msg_counter() + { + return flushed_msg_counter_; + } + +protected: + void _sink_it(const details::log_msg &) override + { + msg_counter_++; + } + + void _flush() override + { + flushed_msg_counter_ += msg_counter_; + } + size_t msg_counter_{0}; + size_t flushed_msg_counter_{0}; +}; + +using test_sink_mt = test_sink; +using test_sink_st = test_sink; + +} // namespace sinks +} // namespace spdlog diff --git a/tests/tests.vcxproj b/tests/tests.vcxproj index f13f9f9d..4565f197 100644 --- a/tests/tests.vcxproj +++ b/tests/tests.vcxproj @@ -129,6 +129,7 @@ + diff --git a/tests/tests.vcxproj.filters b/tests/tests.vcxproj.filters index d1fdf73d..c70cc496 100644 --- a/tests/tests.vcxproj.filters +++ b/tests/tests.vcxproj.filters @@ -42,6 +42,9 @@ Source Files + + Source Files +