diff --git a/include/spdlog/details/mpmc_blocking_q.h b/include/spdlog/details/mpmc_blocking_q.h index 860fdc67..32248ecf 100644 --- a/include/spdlog/details/mpmc_blocking_q.h +++ b/include/spdlog/details/mpmc_blocking_q.h @@ -72,6 +72,13 @@ public: return true; } + // wait until the queue is empty + void wait_empty() + { + std::unique_lock lock(queue_mutex_); + pop_cv_.wait(lock, [this] { return this->q_.empty(); }); + } + private: size_t max_items_; std::mutex queue_mutex_; diff --git a/include/spdlog/details/thread_pool.h b/include/spdlog/details/thread_pool.h index 205403c5..6739f731 100644 --- a/include/spdlog/details/thread_pool.h +++ b/include/spdlog/details/thread_pool.h @@ -139,6 +139,11 @@ public: return msg_counter_.load(std::memory_order_relaxed); } + void wait_empty() + { + q_.wait_empty(); + } + private: std::atomic msg_counter_; // total # of messages processed in this pool q_type q_; diff --git a/tests/test_async.cpp b/tests/test_async.cpp index a57d1969..f1f261ab 100644 --- a/tests/test_async.cpp +++ b/tests/test_async.cpp @@ -3,12 +3,6 @@ #include "spdlog/sinks/simple_file_sink.h" #include "test_sink.h" -// std::unique_ptr create_logger(size_t tp_queue_size, size_t tp_threads) -//{ -// auto tp = std::make_shared(8192, 1); -// auto logger = std::make_shared("as", test_sink, tp, async_overflow_policy::block_retry); -//} - TEST_CASE("basic async test ", "[async]") { using namespace spdlog; @@ -25,7 +19,7 @@ TEST_CASE("basic async test ", "[async]") logger->flush(); } REQUIRE(test_sink->msg_counter() == messages); - REQUIRE(test_sink->flushed_msg_counter() == messages); + REQUIRE(test_sink->flush_counter() == 1); } TEST_CASE("discard policy ", "[async]") @@ -43,8 +37,7 @@ TEST_CASE("discard policy ", "[async]") } } - REQUIRE(test_sink->msg_counter() < messages); - REQUIRE(test_sink->flushed_msg_counter() < messages); + REQUIRE(test_sink->msg_counter() < messages); } TEST_CASE("flush", "[async]") @@ -65,7 +58,27 @@ TEST_CASE("flush", "[async]") } std::this_thread::sleep_for(std::chrono::milliseconds(250)); REQUIRE(test_sink->msg_counter() == messages); - REQUIRE(test_sink->flushed_msg_counter() == messages); + REQUIRE(test_sink->flush_counter() == 1); +} + +TEST_CASE("tp->wait_empty() ", "[async]") +{ + using namespace spdlog; + auto test_sink = std::make_shared(); + test_sink->set_delay(std::chrono::milliseconds(5)); + size_t messages = 50; + + auto tp = std::make_shared(messages, 2); + auto logger = std::make_shared("as", test_sink, tp, async_overflow_policy::block_retry); + for (size_t i = 0; i < messages; i++) + { + logger->info("Hello message #{}", i); + } + logger->flush(); + tp->wait_empty(); + + REQUIRE(test_sink->msg_counter() == messages); + REQUIRE(test_sink->flush_counter() == 1); } TEST_CASE("multi threads", "[async]") @@ -88,17 +101,18 @@ TEST_CASE("multi threads", "[async]") logger->info("Hello message #{}", j); } }); + logger->flush(); } for (auto &t : threads) { t.join(); } - logger->flush(); + } REQUIRE(test_sink->msg_counter() == messages * n_threads); - REQUIRE(test_sink->flushed_msg_counter() == messages * n_threads); + REQUIRE(test_sink->flush_counter() == n_threads); } TEST_CASE("to_file", "[async]") diff --git a/tests/test_sink.h b/tests/test_sink.h index 5beb0157..89d59b1e 100644 --- a/tests/test_sink.h +++ b/tests/test_sink.h @@ -8,7 +8,9 @@ #include "spdlog/details/null_mutex.h" #include "spdlog/sinks/base_sink.h" +#include #include +#include namespace spdlog { namespace sinks { @@ -22,23 +24,30 @@ public: return msg_counter_; } - size_t flushed_msg_counter() + size_t flush_counter() { - return flushed_msg_counter_; + return flush_counter_; + } + + void set_delay(std::chrono::milliseconds delay) + { + delay_ = delay; } protected: void _sink_it(const details::log_msg &) override { msg_counter_++; + std::this_thread::sleep_for(delay_); } void _flush() override { - flushed_msg_counter_ += msg_counter_; + flush_counter_++; } size_t msg_counter_{0}; - size_t flushed_msg_counter_{0}; + size_t flush_counter_{0}; + std::chrono::milliseconds delay_{0}; }; using test_sink_mt = test_sink;