async flush now waits for queue to be empty before returning

This commit is contained in:
gabime 2016-08-22 22:07:29 +03:00
parent dfa2c7a950
commit 4f52cc4dec
2 changed files with 95 additions and 84 deletions

View File

@ -1,76 +1,77 @@
// //
// Copyright(c) 2015 Gabi Melman. // Copyright(c) 2015 Gabi Melman.
// Distributed under the MIT License (http://opensource.org/licenses/MIT) // Distributed under the MIT License (http://opensource.org/licenses/MIT)
// //
#pragma once #pragma once
// Very fast asynchronous logger (millions of logs per second on an average desktop) // Very fast asynchronous logger (millions of logs per second on an average desktop)
// Uses pre allocated lockfree queue for maximum throughput even under large number of threads. // Uses pre allocated lockfree queue for maximum throughput even under large number of threads.
// Creates a single back thread to pop messages from the queue and log them. // Creates a single back thread to pop messages from the queue and log them.
// //
// Upon each log write the logger: // Upon each log write the logger:
// 1. Checks if its log level is enough to log the message // 1. Checks if its log level is enough to log the message
// 2. Push a new copy of the message to a queue (or block the caller until space is available in the queue) // 2. Push a new copy of the message to a queue (or block the caller until space is available in the queue)
// 3. will throw spdlog_ex upon log exceptions // 3. will throw spdlog_ex upon log exceptions
// Upon destruction, logs all remaining messages in the queue before destructing.. // Upon destruction, logs all remaining messages in the queue before destructing..
#include <spdlog/common.h> #include <spdlog/common.h>
#include <spdlog/logger.h> #include <spdlog/logger.h>
#include <chrono> #include <chrono>
#include <functional> #include <functional>
#include <string> #include <string>
#include <memory> #include <memory>
namespace spdlog namespace spdlog
{ {
namespace details namespace details
{ {
class async_log_helper; class async_log_helper;
} }
class async_logger :public logger class async_logger :public logger
{ {
public: public:
template<class It> template<class It>
async_logger(const std::string& name, async_logger(const std::string& name,
const It& begin, const It& begin,
const It& end, const It& end,
size_t queue_size, size_t queue_size,
const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
const std::function<void()>& worker_warmup_cb = nullptr, const std::function<void()>& worker_warmup_cb = nullptr,
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(), const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(),
const std::function<void()>& worker_teardown_cb = nullptr); const std::function<void()>& worker_teardown_cb = nullptr);
async_logger(const std::string& logger_name, async_logger(const std::string& logger_name,
sinks_init_list sinks, sinks_init_list sinks,
size_t queue_size, size_t queue_size,
const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
const std::function<void()>& worker_warmup_cb = nullptr, const std::function<void()>& worker_warmup_cb = nullptr,
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(), const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(),
const std::function<void()>& worker_teardown_cb = nullptr); const std::function<void()>& worker_teardown_cb = nullptr);
async_logger(const std::string& logger_name, async_logger(const std::string& logger_name,
sink_ptr single_sink, sink_ptr single_sink,
size_t queue_size, size_t queue_size,
const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
const std::function<void()>& worker_warmup_cb = nullptr, const std::function<void()>& worker_warmup_cb = nullptr,
const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(), const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero(),
const std::function<void()>& worker_teardown_cb = nullptr); const std::function<void()>& worker_teardown_cb = nullptr);
//Wait for the queue to be empty, and flush synchronously
void flush() override; //Warning: this can potentialy last forever as we wait it to complete
protected: void flush() override;
void _sink_it(details::log_msg& msg) override; protected:
void _set_formatter(spdlog::formatter_ptr msg_formatter) override; void _sink_it(details::log_msg& msg) override;
void _set_pattern(const std::string& pattern) override; void _set_formatter(spdlog::formatter_ptr msg_formatter) override;
void _set_pattern(const std::string& pattern) override;
private:
std::unique_ptr<details::async_log_helper> _async_log_helper; private:
}; std::unique_ptr<details::async_log_helper> _async_log_helper;
} };
}
#include <spdlog/details/async_logger_impl.h>
#include <spdlog/details/async_logger_impl.h>

View File

@ -77,8 +77,8 @@ async_msg(async_msg&& other) SPDLOG_NOEXCEPT:
thread_id = other.thread_id; thread_id = other.thread_id;
txt = std::move(other.txt); txt = std::move(other.txt);
msg_type = other.msg_type; msg_type = other.msg_type;
return *this; return *this;
} }
// never copy or assign. should only be moved.. // never copy or assign. should only be moved..
async_msg(const async_msg&) = delete; async_msg(const async_msg&) = delete;
@ -179,6 +179,9 @@ private:
// sleep,yield or return immediatly using the time passed since last message as a hint // sleep,yield or return immediatly using the time passed since last message as a hint
static void sleep_or_yield(const spdlog::log_clock::time_point& now, const log_clock::time_point& last_op_time); static void sleep_or_yield(const spdlog::log_clock::time_point& now, const log_clock::time_point& last_op_time);
// wait until the queue is empty
void wait_empty_q();
}; };
} }
} }
@ -249,12 +252,9 @@ inline void spdlog::details::async_log_helper::push_msg(details::async_log_helpe
//wait for the queue be empty and request flush from its sinks //wait for the queue be empty and request flush from its sinks
inline void spdlog::details::async_log_helper::flush() inline void spdlog::details::async_log_helper::flush()
{ {
auto last_op = details::os::now(); wait_empty_q();
while (_q.approx_size() > 0)
{
sleep_or_yield(details::os::now(), last_op);
}
push_msg(async_msg(async_msg_type::flush)); push_msg(async_msg(async_msg_type::flush));
wait_empty_q(); //make sure the above flush message was processed
} }
inline void spdlog::details::async_log_helper::worker_loop() inline void spdlog::details::async_log_helper::worker_loop()
@ -304,7 +304,7 @@ inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_
incoming_async_msg.fill_log_msg(incoming_log_msg); incoming_async_msg.fill_log_msg(incoming_log_msg);
_formatter->format(incoming_log_msg); _formatter->format(incoming_log_msg);
for (auto &s : _sinks) for (auto &s : _sinks)
s->log(incoming_log_msg); s->log(incoming_log_msg);
} }
return true; return true;
} }
@ -366,6 +366,16 @@ inline void spdlog::details::async_log_helper::sleep_or_yield(const spdlog::log_
return sleep_for(milliseconds(200)); return sleep_for(milliseconds(200));
} }
// wait for the queue to be empty
inline void spdlog::details::async_log_helper::wait_empty_q()
{
auto last_op = details::os::now();
while (_q.approx_size() > 0) {
sleep_or_yield(details::os::now(), last_op);
}
}