diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index c5d74a6de5..ac114291a0 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -18,9 +18,9 @@ #include "olap/memtable_flush_executor.h" #include -#include #include +#include #include #include "common/config.h" @@ -29,12 +29,18 @@ #include "olap/memtable.h" #include "olap/rowset/rowset_writer.h" #include "util/doris_metrics.h" +#include "util/metrics.h" #include "util/stopwatch.hpp" #include "util/time.h" namespace doris { using namespace ErrorCode; +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOUNIT); + +bvar::Adder g_flush_task_num("memtable_flush_task_num"); + class MemtableFlushTask final : public Runnable { public: MemtableFlushTask(FlushToken* flush_token, std::unique_ptr memtable, @@ -42,9 +48,11 @@ public: : _flush_token(flush_token), _memtable(std::move(memtable)), _segment_id(segment_id), - _submit_task_time(submit_task_time) {} + _submit_task_time(submit_task_time) { + g_flush_task_num << 1; + } - ~MemtableFlushTask() override = default; + ~MemtableFlushTask() override { g_flush_task_num << -1; } void run() override { _flush_token->_flush_memtable(_memtable.get(), _segment_id, _submit_task_time); @@ -122,7 +130,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in return Status::OK(); } -void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t submit_task_time) { +void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id, + int64_t submit_task_time) { uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time; _stats.flush_wait_time_ns += flush_wait_time_ns; // If previous flush has failed, return directly @@ -135,10 +144,10 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t MonotonicStopWatch timer; timer.start(); - size_t memory_usage = memtable->memory_usage(); + size_t memory_usage = mem_table->memory_usage(); int64_t flush_size; - Status s = _do_flush_memtable(memtable, segment_id, &flush_size); + Status s = _do_flush_memtable(mem_table, segment_id, &flush_size); { std::shared_lock rdlk(_flush_status_lock); @@ -161,7 +170,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t _stats.flush_time_ns += timer.elapsed_time(); _stats.flush_finish_count++; _stats.flush_running_count--; - _stats.flush_size_bytes += memtable->memory_usage(); + _stats.flush_size_bytes += mem_table->memory_usage(); _stats.flush_disk_size_bytes += flush_size; } @@ -180,6 +189,7 @@ void MemTableFlushExecutor::init(const std::vector& data_dirs) { .set_min_threads(min_threads) .set_max_threads(max_threads) .build(&_high_prio_flush_pool)); + _register_metrics(); } // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. @@ -189,26 +199,38 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr& fl if (!is_high_priority) { if (rowset_writer->type() == BETA_ROWSET && !should_serial) { // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. - flush_token.reset( - new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT))); + flush_token = std::make_unique( + _flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)); } else { // alpha rowset do not support flush in CONCURRENT. - flush_token.reset( - new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL))); + flush_token = std::make_unique( + _flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)); } } else { if (rowset_writer->type() == BETA_ROWSET && !should_serial) { // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. - flush_token.reset(new FlushToken( - _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT))); + flush_token = std::make_unique( + _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)); } else { // alpha rowset do not support flush in CONCURRENT. - flush_token.reset(new FlushToken( - _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL))); + flush_token = std::make_unique( + _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)); } } flush_token->set_rowset_writer(rowset_writer); return Status::OK(); } +void MemTableFlushExecutor::_register_metrics() { + REGISTER_HOOK_METRIC(flush_thread_pool_queue_size, + [this]() { return _flush_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(flush_thread_pool_thread_num, + [this]() { return _flush_pool->num_threads(); }) +} + +void MemTableFlushExecutor::_deregister_metrics() { + DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num); +} + } // namespace doris diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index ee7194349f..d2039ce812 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -17,9 +17,8 @@ #pragma once -#include - #include +#include #include #include #include @@ -108,8 +107,9 @@ private: // ... class MemTableFlushExecutor { public: - MemTableFlushExecutor() {} + MemTableFlushExecutor() = default; ~MemTableFlushExecutor() { + _deregister_metrics(); _flush_pool->shutdown(); _high_prio_flush_pool->shutdown(); } @@ -122,6 +122,9 @@ public: bool should_serial, bool is_high_priority); private: + void _register_metrics(); + static void _deregister_metrics(); + std::unique_ptr _flush_pool; std::unique_ptr _high_prio_flush_pool; }; diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 60ca3b5107..409e6dae2b 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -228,6 +228,18 @@ public: UIntGauge* heavy_work_max_threads; UIntGauge* light_work_max_threads; + UIntGauge* flush_thread_pool_queue_size; + UIntGauge* flush_thread_pool_thread_num; + + UIntGauge* local_scan_thread_pool_queue_size; + UIntGauge* local_scan_thread_pool_thread_num; + UIntGauge* remote_scan_thread_pool_queue_size; + UIntGauge* remote_scan_thread_pool_thread_num; + UIntGauge* limited_scan_thread_pool_queue_size; + UIntGauge* limited_scan_thread_pool_thread_num; + UIntGauge* group_local_scan_thread_pool_queue_size; + UIntGauge* group_local_scan_thread_pool_thread_num; + static DorisMetrics* instance() { static DorisMetrics instance; return &instance; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 62fd599665..e78e8dceff 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -17,11 +17,11 @@ #include "scanner_scheduler.h" -#include - #include +#include #include #include +#include #include #include #include @@ -40,6 +40,7 @@ #include "util/blocking_queue.hpp" #include "util/cpu_info.h" #include "util/defer_op.h" +#include "util/doris_metrics.h" #include "util/runtime_profile.h" #include "util/thread.h" #include "util/threadpool.h" @@ -53,6 +54,15 @@ namespace doris::vectorized { +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_thread_num, MetricUnit::NOUNIT); + ScannerScheduler::ScannerScheduler() = default; ScannerScheduler::~ScannerScheduler() { @@ -64,6 +74,7 @@ ScannerScheduler::~ScannerScheduler() { delete _pending_queues[i]; } delete[] _pending_queues; + _deregister_metrics(); } void ScannerScheduler::stop() { @@ -107,9 +118,9 @@ Status ScannerScheduler::init(ExecEnv* env) { } // 2. local scan thread pool - _local_scan_thread_pool.reset( - new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, - config::doris_scanner_thread_pool_queue_size, "local_scan")); + _local_scan_thread_pool = std::make_unique( + config::doris_scanner_thread_pool_thread_num, + config::doris_scanner_thread_pool_queue_size, "local_scan"); // 3. remote scan thread pool static_cast( @@ -141,7 +152,7 @@ Status ScannerScheduler::init(ExecEnv* env) { this->_task_group_scanner_scan(this, _task_group_local_scan_queue.get()); })); } - + _register_metrics(); _is_init = true; return Status::OK(); } @@ -179,7 +190,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) { } [[maybe_unused]] static void* run_scanner_bthread(void* arg) { - auto f = reinterpret_cast*>(arg); + auto* f = reinterpret_cast*>(arg); (*f)(); delete f; return nullptr; @@ -463,4 +474,34 @@ void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler, } } +void ScannerScheduler::_register_metrics() { + REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size, + [this]() { return _local_scan_thread_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num, + [this]() { return _local_scan_thread_pool->get_active_threads(); }); + REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size, + [this]() { return _remote_scan_thread_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num, + [this]() { return _remote_scan_thread_pool->num_threads(); }); + REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size, + [this]() { return _limited_scan_thread_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num, + [this]() { return _limited_scan_thread_pool->num_threads(); }); + REGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size, + [this]() { return _group_local_scan_thread_pool->get_queue_size(); }) + REGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num, + [this]() { return _group_local_scan_thread_pool->num_threads(); }); +} + +void ScannerScheduler::_deregister_metrics() { + DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num); + DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num); + DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num); + DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num); +} + } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 25f79e89aa..a6d450bc22 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -86,6 +86,9 @@ private: void _task_group_scanner_scan(ScannerScheduler* scheduler, taskgroup::ScanTaskTaskGroupQueue* scan_queue); + void _register_metrics(); + + static void _deregister_metrics(); // Scheduling queue number. // TODO: make it configurable.