From 8e4710079d778393a521fd99da49fdea1a77ce31 Mon Sep 17 00:00:00 2001
From: Xinyi Zou
Date: Mon, 24 Apr 2023 09:41:57 +0800
Subject: [PATCH] [improvement](profile) Insert into add LoadChannel runtime profile (#18908)

TabletSink and LoadChannel in the BE have an M:N relationship. Every so often
a LoadChannel returns its own runtime profile to one randomly chosen TabletSink,
so over time each TabletSink holds copies of all LoadChannel runtime profiles,
and copies of the same LoadChannel profile held by different TabletSinks differ
in freshness. Each TabletSink periodically reports all the LoadChannel profiles
it holds to the FE, which keeps only the latest copy of each LoadChannel profile
by comparing timestamps.
---
 be/src/olap/delta_writer.cpp | 19 ++++---
 be/src/olap/delta_writer.h | 6 ++-
 be/src/pipeline/pipeline_fragment_context.cpp | 5 ++
 be/src/runtime/fragment_mgr.cpp | 14 +++--
 be/src/runtime/fragment_mgr.h | 1 +
 be/src/runtime/load_channel.cpp | 20 ++++++--
 be/src/runtime/load_channel.h | 46 ++++++++++++++++-
 be/src/runtime/load_channel_mgr.cpp | 3 +-
 be/src/runtime/plan_fragment_executor.cpp | 19 +++++--
 be/src/runtime/plan_fragment_executor.h | 4 +-
 be/src/runtime/runtime_state.cpp | 5 ++
 be/src/runtime/runtime_state.h | 2 +
 be/src/runtime/tablets_channel.cpp | 51 ++++++++++++++++---
 be/src/runtime/tablets_channel.h | 14 ++++-
 be/src/util/runtime_profile.cpp | 4 ++
 be/src/util/runtime_profile.h | 6 +++
 be/src/vec/exec/join/vhash_join_node.cpp | 2 +-
 be/src/vec/exec/scan/vscan_node.cpp | 2 +-
 be/src/vec/exec/vaggregation_node.cpp | 2 +-
 be/src/vec/exec/vanalytic_eval_node.cpp | 2 +-
 be/src/vec/exec/vsort_node.cpp | 2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp | 2 +-
 be/src/vec/sink/vtablet_sink.cpp | 14 +++++
 .../common/profile/ProfileTreeBuilder.java | 31 +++++++++--
 .../doris/common/util/RuntimeProfile.java | 7 +++
 .../java/org/apache/doris/qe/Coordinator.java | 19 +++++--
 gensrc/proto/internal_service.proto | 2 +
 gensrc/thrift/FrontendService.thrift | 2 +
 gensrc/thrift/RuntimeProfile.thrift | 2 +
 29 files changed, 265 insertions(+), 43 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index b846a5479f..54bb6a7fa0 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -307,8 +307,8 @@ void DeltaWriter::_reset_mem_table() {
 #endif
     {
         std::lock_guard l(_mem_table_tracker_lock);
-        _mem_table_tracker.push_back(mem_table_insert_tracker);
-        _mem_table_tracker.push_back(mem_table_flush_tracker);
+        _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
+        _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
     }
     _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots,
                                   _req.tuple_desc, _rowset_writer.get(), _delete_bitmap,
@@ -463,7 +463,7 @@ Status DeltaWriter::cancel_with_status(const Status& st) {

 void DeltaWriter::save_mem_consumption_snapshot() {
     std::lock_guard l(_lock);
-    _mem_consumption_snapshot = mem_consumption();
+    _mem_consumption_snapshot = mem_consumption(MemType::ALL);
     if (_mem_table == nullptr) {
         _memtable_consumption_snapshot = 0;
     } else {
@@ -480,7 +480,7 @@ int64_t DeltaWriter::get_memtable_consumption_snapshot() const {
     return _memtable_consumption_snapshot;
 }

-int64_t DeltaWriter::mem_consumption() {
+int64_t DeltaWriter::mem_consumption(MemType mem) {
     if (_flush_token == nullptr) {
         // This method may be called before this writer is initialized.
         // So _flush_token may be null.
@@ -489,8 +489,15 @@ int64_t DeltaWriter::mem_consumption() { int64_t mem_usage = 0; { std::lock_guard l(_mem_table_tracker_lock); - for (auto mem_table_tracker : _mem_table_tracker) { - mem_usage += mem_table_tracker->consumption(); + if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 2 = 2 + for (auto mem_table_tracker : _mem_table_insert_trackers) { + mem_usage += mem_table_tracker->consumption(); + } + } + if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 1 = 1 + for (auto mem_table_tracker : _mem_table_flush_trackers) { + mem_usage += mem_table_tracker->consumption(); + } } } return mem_usage; diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 03b30160fe..2e265b27c8 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -55,6 +55,7 @@ class Block; } // namespace vectorized enum WriteType { LOAD = 1, LOAD_DELETE = 2, DELETE = 3 }; +enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 }; struct WriteRequest { int64_t tablet_id; @@ -113,7 +114,7 @@ public: int64_t partition_id() const; - int64_t mem_consumption(); + int64_t mem_consumption(MemType mem); // Wait all memtable in flush queue to be flushed Status wait_flush(); @@ -169,7 +170,8 @@ private: StorageEngine* _storage_engine; UniqueId _load_id; std::unique_ptr _flush_token; - std::vector> _mem_table_tracker; + std::vector> _mem_table_insert_trackers; + std::vector> _mem_table_flush_trackers; SpinLock _mem_table_tracker_lock; std::atomic _mem_table_num = 1; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 5b4868e885..4fcd343047 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -394,6 +394,10 @@ void PipelineFragmentContext::report_profile() { std::stringstream ss; _runtime_state->runtime_profile()->compute_time_in_profile(); _runtime_state->runtime_profile()->pretty_print(&ss); + if (_runtime_state->load_channel_profile()) { + // _runtime_state->load_channel_profile()->compute_time_in_profile(); // TODO load channel profile add timer + _runtime_state->load_channel_profile()->pretty_print(&ss); + } VLOG_FILE << ss.str(); } @@ -752,6 +756,7 @@ void PipelineFragmentContext::send_report(bool done) { _report_status_cb( {exec_status, _is_report_success ? _runtime_state->runtime_profile() : nullptr, + _is_report_success ? 
_runtime_state->load_channel_profile() : nullptr, done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, _fragment_instance_id, _backend_num, _runtime_state.get(), std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 6c905afa9a..54a42740b9 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -172,7 +172,8 @@ public: void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; } private: - void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done); + void coordinator_callback(const Status& status, RuntimeProfile* profile, + RuntimeProfile* load_channel_profile, bool done); // Id of this query TUniqueId _query_id; @@ -214,7 +215,7 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id, _backend_num(backend_num), _executor(exec_env, std::bind(std::mem_fn(&FragmentExecState::coordinator_callback), this, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)), + std::placeholders::_3, std::placeholders::_4)), _set_rsc_info(false), _timeout_second(-1), _query_ctx(std::move(query_ctx)), @@ -297,10 +298,10 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const // Also, the reported status will always reflect the most recent execution status, // including the final status when execution finishes. void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile, - bool done) { + RuntimeProfile* load_channel_profile, bool done) { _report_status_cb_impl( - {status, profile, done, _coord_addr, _query_id, -1, _fragment_instance_id, _backend_num, - _executor.runtime_state(), + {status, profile, load_channel_profile, done, _coord_addr, _query_id, -1, + _fragment_instance_id, _backend_num, _executor.runtime_state(), std::bind(&FragmentExecState::update_status, this, std::placeholders::_1), std::bind(&PlanFragmentExecutor::cancel, &_executor, std::placeholders::_1, std::placeholders::_2)}); @@ -401,7 +402,10 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { params.__isset.profile = false; } else { req.profile->to_thrift(¶ms.profile); + if (req.load_channel_profile) + req.load_channel_profile->to_thrift(¶ms.loadChannelProfile); params.__isset.profile = true; + params.__isset.loadChannelProfile = true; } if (!req.runtime_state->output_files().empty()) { diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 2cc7fcc251..ad7f830a7f 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -68,6 +68,7 @@ std::string to_load_error_http_path(const std::string& file_name); struct ReportStatusRequest { const Status& status; RuntimeProfile* profile; + RuntimeProfile* load_channel_profile; bool done; TNetworkAddress coord_addr; TUniqueId query_id; diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 3df55c9eff..b29f760486 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -26,16 +26,19 @@ namespace doris { LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr mem_tracker, - int64_t timeout_s, bool is_high_priority, const std::string& sender_ip) + int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, + int64_t backend_id) : _load_id(load_id), _mem_tracker(std::move(mem_tracker)), _timeout_s(timeout_s), _is_high_priority(is_high_priority), - _sender_ip(sender_ip) { 
+ _sender_ip(sender_ip), + _backend_id(backend_id) { // _last_updated_time should be set before being inserted to // _load_channels in load_channel_mgr, or it may be erased // immediately by gc thread. _last_updated_time.store(time(nullptr)); + _init_profile(); } LoadChannel::~LoadChannel() { @@ -44,6 +47,17 @@ LoadChannel::~LoadChannel() { << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip; } +void LoadChannel::_init_profile() { + _profile = std::make_unique("LoadChannels"); + _self_profile = + _profile->create_child(fmt::format("LoadChannel load_id={} (host={}, backend_id={})", + _load_id.to_string(), _sender_ip, _backend_id), + true, true); + _profile->add_child(_self_profile, false, nullptr); + _add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT); + _peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES); +} + Status LoadChannel::open(const PTabletWriterOpenRequest& params) { int64_t index_id = params.index_id(); std::shared_ptr channel; @@ -55,7 +69,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { } else { // create a new tablets channel TabletsChannelKey key(params.id(), index_id); - channel.reset(new TabletsChannel(key, _load_id, _is_high_priority)); + channel.reset(new TabletsChannel(key, _load_id, _is_high_priority, _self_profile)); { std::lock_guard l(_tablets_channels_lock); _tablets_channels.insert({index_id, channel}); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 2abf822121..dc23defc4a 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -36,7 +36,9 @@ #include "common/status.h" #include "runtime/memory/mem_tracker.h" #include "runtime/tablets_channel.h" +#include "util/runtime_profile.h" #include "util/spinlock.h" +#include "util/thrift_util.h" #include "util/uid_util.h" namespace doris { @@ -48,7 +50,7 @@ class PTabletWriterOpenRequest; class LoadChannel { public: LoadChannel(const UniqueId& load_id, std::unique_ptr mem_tracker, int64_t timeout_s, - bool is_high_priority, const std::string& sender_ip); + bool is_high_priority, const std::string& sender_ip, int64_t backend_id); ~LoadChannel(); // open a new load channel if not exist @@ -135,11 +137,21 @@ protected: return Status::OK(); } + void _init_profile(); + template + // thread safety + void _report_profile(TabletWriterAddResult* response); + private: UniqueId _load_id; // Tracks the total memory consumed by current load job on this BE std::unique_ptr _mem_tracker; + std::unique_ptr _profile; + RuntimeProfile* _self_profile; + RuntimeProfile::Counter* _add_batch_number_counter = nullptr; + RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; + // lock protect the tablets channel map std::mutex _lock; // index id -> tablets channel @@ -160,7 +172,9 @@ private: bool _is_high_priority = false; // the ip where tablet sink locate - std::string _sender_ip = ""; + std::string _sender_ip; + + int64_t _backend_id; }; template @@ -178,19 +192,47 @@ Status LoadChannel::add_batch(const TabletWriterAddRequest& request, // 2. add block to tablets channel if (request.has_block()) { RETURN_IF_ERROR(channel->add_batch(request, response)); + _add_batch_number_counter->update(1); } // 3. 
handle eos
    if (request.has_eos() && request.eos()) {
        st = _handle_eos(channel, request, response);
+       _report_profile(response);
        if (!st.ok()) {
            return st;
        }
+   } else if (_add_batch_number_counter->value() % 10 == 1) {
+       _report_profile(response);
    }
    _last_updated_time.store(time(nullptr));
    return st;
}

+template <typename TabletWriterAddResult>
+void LoadChannel::_report_profile(TabletWriterAddResult* response) {
+    COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption());
+    // TabletSink and LoadChannel in the BE have an M:N relationship.
+    // Every so often a LoadChannel returns its own runtime profile to one randomly
+    // chosen TabletSink, so over time each TabletSink holds copies of all LoadChannel
+    // runtime profiles, and copies of the same LoadChannel profile held by different
+    // TabletSinks differ in freshness. Each TabletSink periodically reports all the
+    // LoadChannel profiles it holds to the FE, which keeps only the latest copy of
+    // each LoadChannel profile by comparing timestamps.
+    _self_profile->set_timestamp(_last_updated_time);
+
+    TRuntimeProfileTree tprofile;
+    _profile->to_thrift(&tprofile);
+    ThriftSerializer ser(false, 4096);
+    uint8_t* buf = nullptr;
+    uint32_t len = 0;
+    auto st = ser.serialize(&tprofile, &len, &buf);
+    if (st.ok()) {
+        response->set_load_channel_profile(std::string((const char*)buf, len));
+    } else {
+        LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, errmsg=" << st;
+    }
+}
+
 inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
     os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << load_channel.mem_consumption()
        << ", last_update_time=" << static_cast<uint64_t>(load_channel.last_updated_time())
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index f280e87abc..068ce2261c 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -124,7 +124,8 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
             "LoadChannel#senderIp={}#loadID={}", params.sender_ip(), load_id.to_string()));
 #endif
         channel.reset(new LoadChannel(load_id, std::move(channel_mem_tracker),
-                                      channel_timeout_s, is_high_priority, params.sender_ip()));
+                                      channel_timeout_s, is_high_priority, params.sender_ip(),
+                                      params.backend_id()));
         _load_channels.insert({load_id, channel});
     }
 }
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index ba1db4b69d..d80ce3d43c 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -421,6 +421,10 @@ void PlanFragmentExecutor::report_profile() {
         std::stringstream ss;
         profile()->compute_time_in_profile();
         profile()->pretty_print(&ss);
+        if (load_channel_profile()) {
+            // load_channel_profile()->compute_time_in_profile(); // TODO load channel profile add timer
+            load_channel_profile()->pretty_print(&ss);
+        }
         VLOG_FILE << ss.str();
     }

@@ -458,7 +462,8 @@ void PlanFragmentExecutor::send_report(bool done) {
     // This will send a report even if we are cancelled. If the query completed correctly
     // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
     // be waiting for a final report and profile.
-    _report_status_cb(status, _is_report_success ? profile() : nullptr, done || !status.ok());
+    _report_status_cb(status, _is_report_success ? profile() : nullptr,
+                      _is_report_success ?
load_channel_profile() : nullptr, done || !status.ok()); } void PlanFragmentExecutor::stop_report_thread() { @@ -504,6 +509,10 @@ RuntimeProfile* PlanFragmentExecutor::profile() { return _runtime_state->runtime_profile(); } +RuntimeProfile* PlanFragmentExecutor::load_channel_profile() { + return _runtime_state->load_channel_profile(); +} + void PlanFragmentExecutor::close() { if (_closed) { return; @@ -537,8 +546,12 @@ void PlanFragmentExecutor::close() { // After add the operation, the print out like that: // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%) // We can easily know the exec node execute time without child time consumed. - _runtime_state->runtime_profile()->compute_time_in_profile(); - _runtime_state->runtime_profile()->pretty_print(&ss); + profile()->compute_time_in_profile(); + profile()->pretty_print(&ss); + if (load_channel_profile()) { + // load_channel_profile()->compute_time_in_profile(); // TODO load channel profile add timer + load_channel_profile()->pretty_print(&ss); + } LOG(INFO) << ss.str(); } LOG(INFO) << "Close() fragment_instance_id=" diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index bc58a861d1..f74256cdcb 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -79,7 +79,8 @@ public: // Note: this does not take a const RuntimeProfile&, because it might need to call // functions like PrettyPrint() or to_thrift(), neither of which is const // because they take locks. - using report_status_callback = std::function; + using report_status_callback = + std::function; // report_status_cb, if !empty(), is used to report the accumulated profile // information periodically during execution (open() or get_next()). @@ -126,6 +127,7 @@ public: // Profile information for plan and output sink. 
RuntimeProfile* profile(); + RuntimeProfile* load_channel_profile(); const Status& status() const { return _status; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 69a1a560d9..57fb9e8972 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -48,6 +48,7 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env) : _profile("Fragment " + print_id(fragment_instance_id)), + _load_channel_profile(""), _obj_pool(new ObjectPool()), _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)), _data_stream_recvrs_pool(new ObjectPool()), @@ -72,6 +73,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env) : _profile("Fragment " + print_id(fragment_exec_params.fragment_instance_id)), + _load_channel_profile(""), _obj_pool(new ObjectPool()), _runtime_filter_mgr(new RuntimeFilterMgr(fragment_exec_params.query_id, this)), _data_stream_recvrs_pool(new ObjectPool()), @@ -100,6 +102,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params, const TUniqueId& query_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env) : _profile("Fragment " + print_id(pipeline_params.fragment_instance_id)), + _load_channel_profile(""), _obj_pool(new ObjectPool()), _runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)), _data_stream_recvrs_pool(new ObjectPool()), @@ -125,6 +128,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params, RuntimeState::RuntimeState(const TQueryGlobals& query_globals) : _profile(""), + _load_channel_profile(""), _obj_pool(new ObjectPool()), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), @@ -158,6 +162,7 @@ RuntimeState::RuntimeState(const TQueryGlobals& query_globals) RuntimeState::RuntimeState() : _profile(""), + _load_channel_profile(""), _obj_pool(new ObjectPool()), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 82f7263ae2..903e48f3f0 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -113,6 +113,7 @@ public: // Returns runtime state profile RuntimeProfile* runtime_profile() { return &_profile; } + RuntimeProfile* load_channel_profile() { return &_load_channel_profile; } bool enable_function_pushdown() const { return _query_options.__isset.enable_function_pushdown && @@ -395,6 +396,7 @@ private: // put runtime state before _obj_pool, so that it will be deconstructed after // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing. 
RuntimeProfile _profile; + RuntimeProfile _load_channel_profile; const DescriptorTbl* _desc_tbl; std::shared_ptr _obj_pool; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index b7f6f24953..5571ab9a87 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -49,13 +49,14 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT); std::atomic TabletsChannel::_s_tablet_writer_count; TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, - bool is_high_priority) + bool is_high_priority, RuntimeProfile* profile) : _key(key), _state(kInitialized), _load_id(load_id), _closed_senders(64), _is_high_priority(is_high_priority) { static std::once_flag once_flag; + _init_profile(profile); std::call_once(once_flag, [] { REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); }); }); @@ -69,6 +70,25 @@ TabletsChannel::~TabletsChannel() { delete _schema; } +void TabletsChannel::_init_profile(RuntimeProfile* profile) { + _profile = + profile->create_child(fmt::format("TabletsChannel {}", _key.to_string()), true, true); + profile->add_child(_profile, false, nullptr); + _add_batch_number_counter = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT); + + auto* memory_usage = _profile->create_child("PeakMemoryUsage", true, true); + _profile->add_child(memory_usage, false, nullptr); + _memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Total", TUnit::BYTES); + _write_memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Write", TUnit::BYTES); + _flush_memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Flush", TUnit::BYTES); + _max_tablet_memory_usage_counter = + memory_usage->AddHighWaterMarkCounter("MaxTablet", TUnit::BYTES); + _max_tablet_write_memory_usage_counter = + memory_usage->AddHighWaterMarkCounter("MaxTabletWrite", TUnit::BYTES); + _max_tablet_flush_memory_usage_counter = + memory_usage->AddHighWaterMarkCounter("MaxTabletFlush", TUnit::BYTES); +} + Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { std::lock_guard l(_lock); if (_state == kOpened) { @@ -221,17 +241,33 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, } int64_t TabletsChannel::mem_consumption() { - int64_t mem_usage = 0; + int64_t write_mem_usage = 0; + int64_t flush_mem_usage = 0; + int64_t max_tablet_mem_usage = 0; + int64_t max_tablet_write_mem_usage = 0; + int64_t max_tablet_flush_mem_usage = 0; { std::lock_guard l(_tablet_writers_lock); _mem_consumptions.clear(); for (auto& it : _tablet_writers) { - int64_t writer_mem = it.second->mem_consumption(); - mem_usage += writer_mem; - _mem_consumptions.emplace(writer_mem, it.first); + int64_t write_mem = it.second->mem_consumption(MemType::WRITE); + write_mem_usage += write_mem; + int64_t flush_mem = it.second->mem_consumption(MemType::FLUSH); + flush_mem_usage += flush_mem; + if (write_mem > max_tablet_write_mem_usage) max_tablet_write_mem_usage = write_mem; + if (flush_mem > max_tablet_flush_mem_usage) max_tablet_flush_mem_usage = flush_mem; + if (write_mem + flush_mem > max_tablet_mem_usage) + max_tablet_mem_usage = write_mem + flush_mem; + _mem_consumptions.emplace(write_mem + flush_mem, it.first); } } - return mem_usage; + COUNTER_SET(_memory_usage_counter, write_mem_usage + flush_mem_usage); + COUNTER_SET(_write_memory_usage_counter, write_mem_usage); + COUNTER_SET(_flush_memory_usage_counter, flush_mem_usage); + COUNTER_SET(_max_tablet_memory_usage_counter, 
max_tablet_mem_usage); + COUNTER_SET(_max_tablet_write_memory_usage_counter, max_tablet_write_mem_usage); + COUNTER_SET(_max_tablet_flush_memory_usage_counter, max_tablet_flush_mem_usage); + return write_mem_usage + flush_mem_usage; } Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) { @@ -305,7 +341,7 @@ std::string TabletsChannelKey::to_string() const { } std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) { - os << "(id=" << key.id << ",index_id=" << key.index_id << ")"; + os << "(load_id=" << key.id << ", index_id=" << key.index_id << ")"; return os; } @@ -313,6 +349,7 @@ template Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { int64_t cur_seq = 0; + _add_batch_number_counter->update(1); auto status = _get_current_seq(cur_seq, request); if (UNLIKELY(!status.ok())) { diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 954f1543b0..66c512f3d5 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -33,6 +33,7 @@ #include "common/status.h" #include "util/bitmap.h" +#include "util/runtime_profile.h" #include "util/spinlock.h" #include "util/uid_util.h" @@ -80,7 +81,8 @@ class LoadChannel; // Write channel for a particular (load, index). class TabletsChannel { public: - TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool is_high_priority); + TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool is_high_priority, + RuntimeProfile* profile); ~TabletsChannel(); @@ -134,6 +136,7 @@ private: void _add_broken_tablet(int64_t tablet_id); bool _is_broken_tablet(int64_t tablet_id); + void _init_profile(RuntimeProfile* profile); // id of this load channel TabletsChannelKey _key; @@ -189,6 +192,15 @@ private: // mem -> tablet_id // sort by memory size std::multimap> _mem_consumptions; + + RuntimeProfile* _profile; + RuntimeProfile::Counter* _add_batch_number_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _write_memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _flush_memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _max_tablet_memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _max_tablet_write_memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _max_tablet_flush_memory_usage_counter = nullptr; }; template diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 186c61fa56..050b8138ac 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -50,6 +50,7 @@ RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile : _pool(new ObjectPool()), _name(name), _metadata(-1), + _timestamp(-1), _is_averaged_profile(is_averaged_profile), _counter_total_time(TUnit::TIME_NS, 0), _local_time_percent(0) { @@ -115,6 +116,7 @@ void RuntimeProfile::merge(RuntimeProfile* other) { child = _pool->add(new RuntimeProfile(other_child->_name)); child->_local_time_percent = other_child->_local_time_percent; child->_metadata = other_child->_metadata; + child->_timestamp = other_child->_timestamp; bool indent_other_child = other->_children[i].second; _child_map[child->_name] = child; _children.push_back(std::make_pair(child, indent_other_child)); @@ -203,6 +205,7 @@ void RuntimeProfile::update(const std::vector& nodes, int* } else { child = _pool->add(new 
RuntimeProfile(tchild.name)); child->_metadata = tchild.metadata; + child->_timestamp = tchild.timestamp; _child_map[tchild.name] = child; _children.push_back(std::make_pair(child, tchild.indent)); } @@ -639,6 +642,7 @@ void RuntimeProfile::to_thrift(std::vector* nodes) { node.name = _name; node.num_children = _children.size(); node.metadata = _metadata; + node.timestamp = _timestamp; node.indent = true; CounterMap counter_map; diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index cd37f9fd43..74f8352a1b 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -352,6 +352,9 @@ public: int64_t metadata() const { return _metadata; } void set_metadata(int64_t md) { _metadata = md; } + time_t timestamp() const { return _timestamp; } + void set_timestamp(time_t ss) { _timestamp = ss; } + // Derived counter function: return measured throughput as input_value/second. static int64_t units_per_second(const Counter* total_counter, const Counter* timer); @@ -410,6 +413,9 @@ private: // user-supplied, uninterpreted metadata. int64_t _metadata; + // The timestamp when the profile was modified, make sure the update is up to date. + time_t _timestamp; + /// True if this profile is an average derived from other profiles. /// All counters in this profile must be of unit AveragedCounter. bool _is_averaged_profile; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index f1cba5c0c7..f811775d69 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -406,7 +406,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); - auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, true); + auto* memory_usage = runtime_profile()->create_child("PeakMemoryUsage", true, true); runtime_profile()->add_child(memory_usage, false, nullptr); _build_blocks_memory_usage = ADD_COUNTER(memory_usage, "BuildBlocks", TUnit::BYTES); _hash_table_memory_usage = ADD_COUNTER(memory_usage, "HashTable", TUnit::BYTES); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index fb2ade6485..704cd356ae 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -267,7 +267,7 @@ Status VScanNode::_init_profile() { _scanner_profile.reset(new RuntimeProfile("VScanner")); runtime_profile()->add_child(_scanner_profile.get(), true, nullptr); - auto* memory_usage = _scanner_profile->create_child("MemoryUsage", true, true); + auto* memory_usage = _scanner_profile->create_child("PeakMemoryUsage", true, true); _runtime_profile->add_child(memory_usage, false, nullptr); _queued_blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 56ec4a585f..12592a72fd 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -324,7 +324,7 @@ void AggregationNode::_init_hash_method(std::vector& probe_exprs) } Status AggregationNode::prepare_profile(RuntimeState* state) { - auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, true); + auto* memory_usage = runtime_profile()->create_child("PeakMemoryUsage", true, true); runtime_profile()->add_child(memory_usage, false, nullptr); _hash_table_memory_usage = ADD_COUNTER(memory_usage, 
"HashTable", TUnit::BYTES); _serialize_key_arena_memory_usage = diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index dba8f6e126..0deb226c5b 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -168,7 +168,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor)); - auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, true); + auto* memory_usage = runtime_profile()->create_child("PeakMemoryUsage", true, true); runtime_profile()->add_child(memory_usage, false, nullptr); _blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("Blocks", TUnit::BYTES); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 6fb7f36b2a..fd905df958 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -113,7 +113,7 @@ Status VSortNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); _runtime_profile->add_info_string("TOP-N", _limit == -1 ? "false" : "true"); - auto* memory_usage = _runtime_profile->create_child("MemoryUsage", true, true); + auto* memory_usage = _runtime_profile->create_child("PeakMemoryUsage", true, true); _runtime_profile->add_child(memory_usage, false, nullptr); _sort_blocks_memory_usage = ADD_COUNTER(memory_usage, "SortBlocks", TUnit::BYTES); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index ec0f4f9d88..c8d95a5053 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -327,7 +327,7 @@ VDataStreamRecvr::VDataStreamRecvr( } // Initialize the counters - auto* memory_usage = _profile->create_child("MemoryUsage", true, true); + auto* memory_usage = _profile->create_child("PeakMemoryUsage", true, true); _profile->add_child(memory_usage, false, nullptr); _blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("Blocks", TUnit::BYTES); _bytes_received_counter = ADD_COUNTER(_profile, "BytesReceived", TUnit::BYTES); diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 7718d000e0..398e2a0cb2 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -62,6 +62,7 @@ #include "util/telemetry/telemetry.h" #include "util/thread.h" #include "util/threadpool.h" +#include "util/thrift_util.h" #include "util/time.h" #include "util/uid_util.h" #include "vec/columns/column.h" @@ -320,6 +321,7 @@ void VNodeChannel::open() { request.set_is_high_priority(_parent->_is_high_priority); request.set_sender_ip(BackendOptions::get_localhost()); request.set_is_vectorized(true); + request.set_backend_id(_node_id); _open_closure = new RefCountClosure(); _open_closure->ref(); @@ -454,6 +456,18 @@ Status VNodeChannel::open_wait() { _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us(); _add_batch_counter.add_batch_num++; } + if (result.has_load_channel_profile()) { + TRuntimeProfileTree tprofile; + const uint8_t* buf = (const uint8_t*)result.load_channel_profile().data(); + uint32_t len = result.load_channel_profile().size(); + auto st = deserialize_thrift_msg(buf, &len, false, &tprofile); + if (st.ok()) { + _state->load_channel_profile()->update(tprofile); + } else { + LOG(WARNING) << "load channel TRuntimeProfileTree deserialize failed, errmsg=" + << 
st; + } + } }); return status; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java index e2bea7e39c..56a14e5b7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java @@ -132,7 +132,7 @@ public class ProfileTreeBuilder { public void build() throws UserException { reset(); checkProfile(); - analyzeAndBuildFragmentTrees(); + analyzeAndBuild(); assembleFragmentTrees(); } @@ -151,8 +151,33 @@ public class ProfileTreeBuilder { } } - private void analyzeAndBuildFragmentTrees() throws UserException { + private void analyzeAndBuild() throws UserException { List> childrenFragment = profileRoot.getChildList(); + for (Pair pair : childrenFragment) { + String name = pair.first.getName(); + if (name.equals("Fragments")) { + analyzeAndBuildFragmentTrees(pair.first); + } else if (name.equals("LoadChannels")) { + analyzeAndBuildLoadChannels(pair.first); + } else { + throw new UserException("Invalid execution profile name: " + name); + } + } + } + + private void analyzeAndBuildLoadChannels(RuntimeProfile loadChannelsProfile) throws UserException { + List> childrenFragment = loadChannelsProfile.getChildList(); + for (Pair pair : childrenFragment) { + analyzeAndBuildLoadChannel(pair.first); + } + } + + private void analyzeAndBuildLoadChannel(RuntimeProfile loadChannelsProfil) throws UserException { + // TODO, `show load profile` add load channel profile, or add `show load channel profile`. + } + + private void analyzeAndBuildFragmentTrees(RuntimeProfile fragmentsProfile) throws UserException { + List> childrenFragment = fragmentsProfile.getChildList(); for (Pair pair : childrenFragment) { analyzeAndBuildFragmentTree(pair.first); } @@ -191,7 +216,7 @@ public class ProfileTreeBuilder { fragmentTreeRoot = instanceTreeRoot; } - // 2. Build tree for each single instance + // 3. Build tree for each single instance int i = 0; Map instanceTrees = Maps.newHashMap(); for (Pair pair : fragmentChildren) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index e3e61faf18..9bb49363ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -64,6 +64,8 @@ public class RuntimeProfile { private String name; + private Long timestamp = -1L; + public RuntimeProfile(String name) { this(); this.name = name; @@ -137,6 +139,11 @@ public class RuntimeProfile { // preorder traversal, idx should be modified in the traversal process private void update(List nodes, Reference idx) { TRuntimeProfileNode node = nodes.get(idx.getRef()); + // Make sure to update the latest LoadChannel profile according to the timestamp. 
+ if (node.timestamp != -1 && node.timestamp < timestamp) { + return; + } + Preconditions.checkState(timestamp == -1 || node.timestamp != -1); // update this level's counters if (node.counters != null) { for (TCounter tcounter : node.counters) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index af0801e0ad..b83913c465 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -191,7 +191,9 @@ public class Coordinator { private RuntimeProfile queryProfile; + private RuntimeProfile fragmentsProfile; private List fragmentProfile; + private RuntimeProfile loadChannelProfile; private ProfileWriter profileWriter; @@ -526,12 +528,17 @@ public class Coordinator { int fragmentSize = fragments.size(); queryProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); + fragmentsProfile = new RuntimeProfile("Fragments"); + queryProfile.addChild(fragmentsProfile); fragmentProfile = new ArrayList(); for (int i = 0; i < fragmentSize; i++) { fragmentProfile.add(new RuntimeProfile("Fragment " + i)); - queryProfile.addChild(fragmentProfile.get(i)); + fragmentsProfile.addChild(fragmentProfile.get(i)); } + loadChannelProfile = new RuntimeProfile("LoadChannels"); + queryProfile.addChild(loadChannelProfile); + this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend(); if (LOG.isDebugEnabled()) { LOG.debug("idToBackend size={}", idToBackend.size()); @@ -729,7 +736,7 @@ public class Coordinator { for (TExecPlanFragmentParams tParam : tParams) { BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++, - profileFragmentId, tParam, this.addressToBackendID); + profileFragmentId, tParam, this.addressToBackendID, loadChannelProfile); // Each tParam will set the total number of Fragments that need to be executed on the same BE, // and the BE will determine whether all Fragments have been executed based on this information. // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons. 
@@ -2562,6 +2569,7 @@ public class Coordinator { boolean hasCanceled; int profileFragmentId; RuntimeProfile profile; + RuntimeProfile loadChannelProfile; TNetworkAddress brpcAddress; TNetworkAddress address; Backend backend; @@ -2569,7 +2577,8 @@ public class Coordinator { TUniqueId instanceId; public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFragmentId, - TExecPlanFragmentParams rpcParams, Map addressToBackendID) { + TExecPlanFragmentParams rpcParams, Map addressToBackendID, + RuntimeProfile loadChannelProfile) { this.profileFragmentId = profileFragmentId; this.fragmentId = fragmentId; this.rpcParams = rpcParams; @@ -2582,6 +2591,7 @@ public class Coordinator { this.brpcAddress = new TNetworkAddress(backend.getIp(), backend.getBrpcPort()); String name = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")"; + this.loadChannelProfile = loadChannelProfile; this.profile = new RuntimeProfile(name); this.hasCanceled = false; this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); @@ -2611,6 +2621,9 @@ public class Coordinator { if (params.isSetProfile()) { profile.update(params.profile); } + if (params.isSetLoadChannelProfile()) { + loadChannelProfile.update(params.loadChannelProfile); + } this.done = params.done; if (statsErrorEstimator != null) { statsErrorEstimator.updateExactReturnedRows(params); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index fb01e2adb4..d41a670052 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -83,6 +83,7 @@ message PTabletWriterOpenRequest { optional bool is_high_priority = 10 [default = false]; optional string sender_ip = 11 [default = ""]; optional bool is_vectorized = 12 [default = false]; + optional int64 backend_id = 13 [default = -1]; }; message PTabletWriterOpenResult { @@ -178,6 +179,7 @@ message PTabletWriterAddBlockResult { optional int64 wait_execution_time_us = 5; repeated PTabletError tablet_errors = 6; map success_slave_tablet_node_ids = 7; + optional bytes load_channel_profile = 8; }; // tablet writer cancel diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index b1fc35734a..54ec33c360 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -431,6 +431,8 @@ struct TReportExecStatusParams { 19: optional i32 fragment_id 20: optional PaloInternalService.TQueryType query_type + + 21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile } struct TFeResult { diff --git a/gensrc/thrift/RuntimeProfile.thrift b/gensrc/thrift/RuntimeProfile.thrift index 2b5a470136..365050954d 100644 --- a/gensrc/thrift/RuntimeProfile.thrift +++ b/gensrc/thrift/RuntimeProfile.thrift @@ -49,6 +49,8 @@ struct TRuntimeProfileNode { // map from parent counter name to child counter name 8: required map> child_counters_map + + 9: required i64 timestamp } // A flattened tree of runtime profiles, obtained by an
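
Editor's note: the heart of this patch on the FE side is the timestamp guard in RuntimeProfile.update(): because several TabletSinks report copies of the same LoadChannel profile, only the copy carrying the newest BE-side timestamp (set in LoadChannel::_report_profile) may overwrite the copy the coordinator already holds. The standalone Java sketch below only illustrates that "keep the newest copy per LoadChannel" rule; the class and method names (ProfileSnapshotMerger, ProfileSnapshot, mergeNewest) are hypothetical and are not part of the Doris API.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: a minimal model of "keep the newest copy per LoadChannel",
// assuming each reported profile carries the BE-side timestamp. Names here are
// hypothetical stand-ins, not Doris classes.
public class ProfileSnapshotMerger {
    public static final class ProfileSnapshot {
        final String loadChannelId; // e.g. "LoadChannel load_id=... (host=..., backend_id=...)"
        final long timestamp;       // seconds since epoch; -1 means "never reported"
        final String payload;       // stand-in for the serialized TRuntimeProfileTree

        ProfileSnapshot(String loadChannelId, long timestamp, String payload) {
            this.loadChannelId = loadChannelId;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    private final Map<String, ProfileSnapshot> latest = new HashMap<>();

    // Accept a snapshot reported by any TabletSink; keep it only if it is
    // strictly newer than what is already held for that LoadChannel.
    public void mergeNewest(ProfileSnapshot reported) {
        ProfileSnapshot current = latest.get(reported.loadChannelId);
        if (current == null || reported.timestamp > current.timestamp) {
            latest.put(reported.loadChannelId, reported);
        }
        // Older or equal timestamps are dropped, mirroring the early return in
        // RuntimeProfile.update() when node.timestamp < timestamp.
    }

    public ProfileSnapshot get(String loadChannelId) {
        return latest.get(loadChannelId);
    }
}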