[enhancement](metrics) enhance visibility of flush thread pool (#26544)

This commit is contained in:
Siyang Tang
2023-11-11 19:53:24 +08:00
committed by GitHub
parent 8b33b0c4a4
commit 196fadc044
5 changed files with 106 additions and 25 deletions

View File

@ -18,9 +18,9 @@
#include "olap/memtable_flush_executor.h"
#include <gen_cpp/olap_file.pb.h>
#include <stddef.h>
#include <algorithm>
#include <cstddef>
#include <ostream>
#include "common/config.h"
@ -29,12 +29,18 @@
#include "olap/memtable.h"
#include "olap/rowset/rowset_writer.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
namespace doris {
using namespace ErrorCode;
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOUNIT);
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
class MemtableFlushTask final : public Runnable {
public:
MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable,
@ -42,9 +48,11 @@ public:
: _flush_token(flush_token),
_memtable(std::move(memtable)),
_segment_id(segment_id),
_submit_task_time(submit_task_time) {}
_submit_task_time(submit_task_time) {
g_flush_task_num << 1;
}
~MemtableFlushTask() override = default;
~MemtableFlushTask() override { g_flush_task_num << -1; }
void run() override {
_flush_token->_flush_memtable(_memtable.get(), _segment_id, _submit_task_time);
@ -122,7 +130,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
return Status::OK();
}
void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t submit_task_time) {
void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id,
int64_t submit_task_time) {
uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
_stats.flush_wait_time_ns += flush_wait_time_ns;
// If previous flush has failed, return directly
@ -135,10 +144,10 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t
MonotonicStopWatch timer;
timer.start();
size_t memory_usage = memtable->memory_usage();
size_t memory_usage = mem_table->memory_usage();
int64_t flush_size;
Status s = _do_flush_memtable(memtable, segment_id, &flush_size);
Status s = _do_flush_memtable(mem_table, segment_id, &flush_size);
{
std::shared_lock rdlk(_flush_status_lock);
@ -161,7 +170,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t
_stats.flush_time_ns += timer.elapsed_time();
_stats.flush_finish_count++;
_stats.flush_running_count--;
_stats.flush_size_bytes += memtable->memory_usage();
_stats.flush_size_bytes += mem_table->memory_usage();
_stats.flush_disk_size_bytes += flush_size;
}
@ -180,6 +189,7 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_high_prio_flush_pool));
_register_metrics();
}
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
@ -189,26 +199,38 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
if (!is_high_priority) {
if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
// beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
flush_token.reset(
new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
flush_token = std::make_unique<FlushToken>(
_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
} else {
// alpha rowset do not support flush in CONCURRENT.
flush_token.reset(
new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
flush_token = std::make_unique<FlushToken>(
_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
}
} else {
if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
// beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
flush_token.reset(new FlushToken(
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
flush_token = std::make_unique<FlushToken>(
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
} else {
// alpha rowset do not support flush in CONCURRENT.
flush_token.reset(new FlushToken(
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
flush_token = std::make_unique<FlushToken>(
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
}
}
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}
void MemTableFlushExecutor::_register_metrics() {
REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
[this]() { return _flush_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(flush_thread_pool_thread_num,
[this]() { return _flush_pool->num_threads(); })
}
void MemTableFlushExecutor::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num);
}
} // namespace doris

View File

@ -17,9 +17,8 @@
#pragma once
#include <stdint.h>
#include <atomic>
#include <cstdint>
#include <iosfwd>
#include <memory>
#include <utility>
@ -108,8 +107,9 @@ private:
// ...
class MemTableFlushExecutor {
public:
MemTableFlushExecutor() {}
MemTableFlushExecutor() = default;
~MemTableFlushExecutor() {
_deregister_metrics();
_flush_pool->shutdown();
_high_prio_flush_pool->shutdown();
}
@ -122,6 +122,9 @@ public:
bool should_serial, bool is_high_priority);
private:
void _register_metrics();
static void _deregister_metrics();
std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
};

View File

@ -228,6 +228,18 @@ public:
UIntGauge* heavy_work_max_threads;
UIntGauge* light_work_max_threads;
UIntGauge* flush_thread_pool_queue_size;
UIntGauge* flush_thread_pool_thread_num;
UIntGauge* local_scan_thread_pool_queue_size;
UIntGauge* local_scan_thread_pool_thread_num;
UIntGauge* remote_scan_thread_pool_queue_size;
UIntGauge* remote_scan_thread_pool_thread_num;
UIntGauge* limited_scan_thread_pool_queue_size;
UIntGauge* limited_scan_thread_pool_thread_num;
UIntGauge* group_local_scan_thread_pool_queue_size;
UIntGauge* group_local_scan_thread_pool_thread_num;
static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;

View File

@ -17,11 +17,11 @@
#include "scanner_scheduler.h"
#include <stdint.h>
#include <algorithm>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <ostream>
#include <string>
#include <typeinfo>
@ -40,6 +40,7 @@
#include "util/blocking_queue.hpp"
#include "util/cpu_info.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
@ -53,6 +54,15 @@
namespace doris::vectorized {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
ScannerScheduler::ScannerScheduler() = default;
ScannerScheduler::~ScannerScheduler() {
@ -64,6 +74,7 @@ ScannerScheduler::~ScannerScheduler() {
delete _pending_queues[i];
}
delete[] _pending_queues;
_deregister_metrics();
}
void ScannerScheduler::stop() {
@ -107,9 +118,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
}
// 2. local scan thread pool
_local_scan_thread_pool.reset(
new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size, "local_scan"));
_local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size, "local_scan");
// 3. remote scan thread pool
static_cast<void>(
@ -141,7 +152,7 @@ Status ScannerScheduler::init(ExecEnv* env) {
this->_task_group_scanner_scan(this, _task_group_local_scan_queue.get());
}));
}
_register_metrics();
_is_init = true;
return Status::OK();
}
@ -179,7 +190,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
}
[[maybe_unused]] static void* run_scanner_bthread(void* arg) {
auto f = reinterpret_cast<std::function<void()>*>(arg);
auto* f = reinterpret_cast<std::function<void()>*>(arg);
(*f)();
delete f;
return nullptr;
@ -463,4 +474,34 @@ void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
}
}
void ScannerScheduler::_register_metrics() {
REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size,
[this]() { return _local_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num,
[this]() { return _local_scan_thread_pool->get_active_threads(); });
REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
[this]() { return _remote_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
[this]() { return _remote_scan_thread_pool->num_threads(); });
REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
[this]() { return _limited_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
[this]() { return _limited_scan_thread_pool->num_threads(); });
REGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size,
[this]() { return _group_local_scan_thread_pool->get_queue_size(); })
REGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num,
[this]() { return _group_local_scan_thread_pool->num_threads(); });
}
void ScannerScheduler::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num);
}
} // namespace doris::vectorized

View File

@ -86,6 +86,9 @@ private:
void _task_group_scanner_scan(ScannerScheduler* scheduler,
taskgroup::ScanTaskTaskGroupQueue* scan_queue);
void _register_metrics();
static void _deregister_metrics();
// Scheduling queue number.
// TODO: make it configurable.