diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index b846a5479f..54bb6a7fa0 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -307,8 +307,8 @@ void DeltaWriter::_reset_mem_table() { #endif { std::lock_guard l(_mem_table_tracker_lock); - _mem_table_tracker.push_back(mem_table_insert_tracker); - _mem_table_tracker.push_back(mem_table_flush_tracker); + _mem_table_insert_trackers.push_back(mem_table_insert_tracker); + _mem_table_flush_trackers.push_back(mem_table_flush_tracker); } _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots, _req.tuple_desc, _rowset_writer.get(), _delete_bitmap, @@ -463,7 +463,7 @@ Status DeltaWriter::cancel_with_status(const Status& st) { void DeltaWriter::save_mem_consumption_snapshot() { std::lock_guard l(_lock); - _mem_consumption_snapshot = mem_consumption(); + _mem_consumption_snapshot = mem_consumption(MemType::ALL); if (_mem_table == nullptr) { _memtable_consumption_snapshot = 0; } else { @@ -480,7 +480,7 @@ int64_t DeltaWriter::get_memtable_consumption_snapshot() const { return _memtable_consumption_snapshot; } -int64_t DeltaWriter::mem_consumption() { +int64_t DeltaWriter::mem_consumption(MemType mem) { if (_flush_token == nullptr) { // This method may be called before this writer is initialized. // So _flush_token may be null. 
@@ -489,8 +489,15 @@ int64_t DeltaWriter::mem_consumption() { int64_t mem_usage = 0; { std::lock_guard l(_mem_table_tracker_lock); - for (auto mem_table_tracker : _mem_table_tracker) { - mem_usage += mem_table_tracker->consumption(); + if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 1 = 1 + for (auto mem_table_tracker : _mem_table_insert_trackers) { + mem_usage += mem_table_tracker->consumption(); + } + } + if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 2 = 2 + for (auto mem_table_tracker : _mem_table_flush_trackers) { + mem_usage += mem_table_tracker->consumption(); + } } } return mem_usage; diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 03b30160fe..2e265b27c8 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -55,6 +55,7 @@ class Block; } // namespace vectorized enum WriteType { LOAD = 1, LOAD_DELETE = 2, DELETE = 3 }; +enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 }; struct WriteRequest { int64_t tablet_id; @@ -113,7 +114,7 @@ public: int64_t partition_id() const; - int64_t mem_consumption(); + int64_t mem_consumption(MemType mem); // Wait all memtable in flush queue to be flushed Status wait_flush(); @@ -169,7 +170,8 @@ private: StorageEngine* _storage_engine; UniqueId _load_id; std::unique_ptr _flush_token; - std::vector> _mem_table_tracker; + std::vector> _mem_table_insert_trackers; + std::vector> _mem_table_flush_trackers; SpinLock _mem_table_tracker_lock; std::atomic _mem_table_num = 1; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 5b4868e885..4fcd343047 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -394,6 +394,10 @@ void PipelineFragmentContext::report_profile() { std::stringstream ss; _runtime_state->runtime_profile()->compute_time_in_profile(); _runtime_state->runtime_profile()->pretty_print(&ss); + if (_runtime_state->load_channel_profile()) { + // 
_runtime_state->load_channel_profile()->compute_time_in_profile(); // TODO load channel profile add timer + _runtime_state->load_channel_profile()->pretty_print(&ss); + } VLOG_FILE << ss.str(); } @@ -752,6 +756,7 @@ void PipelineFragmentContext::send_report(bool done) { _report_status_cb( {exec_status, _is_report_success ? _runtime_state->runtime_profile() : nullptr, + _is_report_success ? _runtime_state->load_channel_profile() : nullptr, done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, _fragment_instance_id, _backend_num, _runtime_state.get(), std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 6c905afa9a..54a42740b9 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -172,7 +172,8 @@ public: void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; } private: - void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done); + void coordinator_callback(const Status& status, RuntimeProfile* profile, + RuntimeProfile* load_channel_profile, bool done); // Id of this query TUniqueId _query_id; @@ -214,7 +215,7 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id, _backend_num(backend_num), _executor(exec_env, std::bind(std::mem_fn(&FragmentExecState::coordinator_callback), this, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)), + std::placeholders::_3, std::placeholders::_4)), _set_rsc_info(false), _timeout_second(-1), _query_ctx(std::move(query_ctx)), @@ -297,10 +298,10 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const // Also, the reported status will always reflect the most recent execution status, // including the final status when execution finishes. 
void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile, - bool done) { + RuntimeProfile* load_channel_profile, bool done) { _report_status_cb_impl( - {status, profile, done, _coord_addr, _query_id, -1, _fragment_instance_id, _backend_num, - _executor.runtime_state(), + {status, profile, load_channel_profile, done, _coord_addr, _query_id, -1, + _fragment_instance_id, _backend_num, _executor.runtime_state(), std::bind(&FragmentExecState::update_status, this, std::placeholders::_1), std::bind(&PlanFragmentExecutor::cancel, &_executor, std::placeholders::_1, std::placeholders::_2)}); @@ -401,7 +402,10 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { params.__isset.profile = false; } else { req.profile->to_thrift(¶ms.profile); + if (req.load_channel_profile) + req.load_channel_profile->to_thrift(¶ms.loadChannelProfile); params.__isset.profile = true; + params.__isset.loadChannelProfile = true; } if (!req.runtime_state->output_files().empty()) { diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 2cc7fcc251..ad7f830a7f 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -68,6 +68,7 @@ std::string to_load_error_http_path(const std::string& file_name); struct ReportStatusRequest { const Status& status; RuntimeProfile* profile; + RuntimeProfile* load_channel_profile; bool done; TNetworkAddress coord_addr; TUniqueId query_id; diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 3df55c9eff..b29f760486 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -26,16 +26,19 @@ namespace doris { LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr mem_tracker, - int64_t timeout_s, bool is_high_priority, const std::string& sender_ip) + int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, + int64_t backend_id) : _load_id(load_id), _mem_tracker(std::move(mem_tracker)), 
_timeout_s(timeout_s), _is_high_priority(is_high_priority), - _sender_ip(sender_ip) { + _sender_ip(sender_ip), + _backend_id(backend_id) { // _last_updated_time should be set before being inserted to // _load_channels in load_channel_mgr, or it may be erased // immediately by gc thread. _last_updated_time.store(time(nullptr)); + _init_profile(); } LoadChannel::~LoadChannel() { @@ -44,6 +47,17 @@ LoadChannel::~LoadChannel() { << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip; } +void LoadChannel::_init_profile() { + _profile = std::make_unique("LoadChannels"); + _self_profile = + _profile->create_child(fmt::format("LoadChannel load_id={} (host={}, backend_id={})", + _load_id.to_string(), _sender_ip, _backend_id), + true, true); + _profile->add_child(_self_profile, false, nullptr); + _add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT); + _peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES); +} + Status LoadChannel::open(const PTabletWriterOpenRequest& params) { int64_t index_id = params.index_id(); std::shared_ptr channel; @@ -55,7 +69,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { } else { // create a new tablets channel TabletsChannelKey key(params.id(), index_id); - channel.reset(new TabletsChannel(key, _load_id, _is_high_priority)); + channel.reset(new TabletsChannel(key, _load_id, _is_high_priority, _self_profile)); { std::lock_guard l(_tablets_channels_lock); _tablets_channels.insert({index_id, channel}); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 2abf822121..dc23defc4a 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -36,7 +36,9 @@ #include "common/status.h" #include "runtime/memory/mem_tracker.h" #include "runtime/tablets_channel.h" +#include "util/runtime_profile.h" #include "util/spinlock.h" +#include "util/thrift_util.h" #include "util/uid_util.h" namespace doris 
{ @@ -48,7 +50,7 @@ class PTabletWriterOpenRequest; class LoadChannel { public: LoadChannel(const UniqueId& load_id, std::unique_ptr mem_tracker, int64_t timeout_s, - bool is_high_priority, const std::string& sender_ip); + bool is_high_priority, const std::string& sender_ip, int64_t backend_id); ~LoadChannel(); // open a new load channel if not exist @@ -135,11 +137,21 @@ protected: return Status::OK(); } + void _init_profile(); + template + // thread safety + void _report_profile(TabletWriterAddResult* response); + private: UniqueId _load_id; // Tracks the total memory consumed by current load job on this BE std::unique_ptr _mem_tracker; + std::unique_ptr _profile; + RuntimeProfile* _self_profile; + RuntimeProfile::Counter* _add_batch_number_counter = nullptr; + RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; + // lock protect the tablets channel map std::mutex _lock; // index id -> tablets channel @@ -160,7 +172,9 @@ private: bool _is_high_priority = false; // the ip where tablet sink locate - std::string _sender_ip = ""; + std::string _sender_ip; + + int64_t _backend_id; }; template @@ -178,19 +192,47 @@ Status LoadChannel::add_batch(const TabletWriterAddRequest& request, // 2. add block to tablets channel if (request.has_block()) { RETURN_IF_ERROR(channel->add_batch(request, response)); + _add_batch_number_counter->update(1); } // 3. 
handle eos if (request.has_eos() && request.eos()) { st = _handle_eos(channel, request, response); + _report_profile(response); if (!st.ok()) { return st; } + } else if (_add_batch_number_counter->value() % 10 == 1) { + _report_profile(response); } _last_updated_time.store(time(nullptr)); return st; } +template +void LoadChannel::_report_profile(TabletWriterAddResult* response) { + COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption()); + // TabletSink and LoadChannel in BE are M: N relationship, + // Every once in a while LoadChannel will randomly return its own runtime profile to a TabletSink, + // so usually all LoadChannel runtime profiles are saved on each TabletSink, + // and the timeliness of the same LoadChannel profile saved on different TabletSinks is different, + // and each TabletSink will periodically send fe reports all the LoadChannel profiles saved by itself, + // and ensures to update the latest LoadChannel profile according to the timestamp. + _self_profile->set_timestamp(_last_updated_time); + + TRuntimeProfileTree tprofile; + _profile->to_thrift(&tprofile); + ThriftSerializer ser(false, 4096); + uint8_t* buf = nullptr; + uint32_t len = 0; + auto st = ser.serialize(&tprofile, &len, &buf); + if (st.ok()) { + response->set_load_channel_profile(std::string((const char*)buf, len)); + } else { + LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, errmsg=" << st; + } +} + inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) { os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << load_channel.mem_consumption() << ", last_update_time=" << static_cast(load_channel.last_updated_time()) diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index f280e87abc..068ce2261c 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -124,7 +124,8 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { 
"LoadChannel#senderIp={}#loadID={}", params.sender_ip(), load_id.to_string())); #endif channel.reset(new LoadChannel(load_id, std::move(channel_mem_tracker), - channel_timeout_s, is_high_priority, params.sender_ip())); + channel_timeout_s, is_high_priority, params.sender_ip(), + params.backend_id())); _load_channels.insert({load_id, channel}); } } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index ba1db4b69d..d80ce3d43c 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -421,6 +421,10 @@ void PlanFragmentExecutor::report_profile() { std::stringstream ss; profile()->compute_time_in_profile(); profile()->pretty_print(&ss); + if (load_channel_profile()) { + // load_channel_profile()->compute_time_in_profile(); // TODO load channel profile add timer + load_channel_profile()->pretty_print(&ss); + } VLOG_FILE << ss.str(); } @@ -458,7 +462,8 @@ void PlanFragmentExecutor::send_report(bool done) { // This will send a report even if we are cancelled. If the query completed correctly // but fragments still need to be cancelled (e.g. limit reached), the coordinator will // be waiting for a final report and profile. - _report_status_cb(status, _is_report_success ? profile() : nullptr, done || !status.ok()); + _report_status_cb(status, _is_report_success ? profile() : nullptr, + _is_report_success ? 
load_channel_profile() : nullptr, done || !status.ok()); } void PlanFragmentExecutor::stop_report_thread() { @@ -504,6 +509,10 @@ RuntimeProfile* PlanFragmentExecutor::profile() { return _runtime_state->runtime_profile(); } +RuntimeProfile* PlanFragmentExecutor::load_channel_profile() { + return _runtime_state->load_channel_profile(); +} + void PlanFragmentExecutor::close() { if (_closed) { return; @@ -537,8 +546,12 @@ void PlanFragmentExecutor::close() { // After add the operation, the print out like that: // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%) // We can easily know the exec node execute time without child time consumed. - _runtime_state->runtime_profile()->compute_time_in_profile(); - _runtime_state->runtime_profile()->pretty_print(&ss); + profile()->compute_time_in_profile(); + profile()->pretty_print(&ss); + if (load_channel_profile()) { + // load_channel_profile()->compute_time_in_profile(); // TODO load channel profile add timer + load_channel_profile()->pretty_print(&ss); + } LOG(INFO) << ss.str(); } LOG(INFO) << "Close() fragment_instance_id=" diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index bc58a861d1..f74256cdcb 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -79,7 +79,8 @@ public: // Note: this does not take a const RuntimeProfile&, because it might need to call // functions like PrettyPrint() or to_thrift(), neither of which is const // because they take locks. - using report_status_callback = std::function; + using report_status_callback = + std::function; // report_status_cb, if !empty(), is used to report the accumulated profile // information periodically during execution (open() or get_next()). @@ -126,6 +127,7 @@ public: // Profile information for plan and output sink. 
RuntimeProfile* profile(); + RuntimeProfile* load_channel_profile(); const Status& status() const { return _status; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 69a1a560d9..57fb9e8972 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -48,6 +48,7 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env) : _profile("Fragment " + print_id(fragment_instance_id)), + _load_channel_profile(""), _obj_pool(new ObjectPool()), _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)), _data_stream_recvrs_pool(new ObjectPool()), @@ -72,6 +73,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env) : _profile("Fragment " + print_id(fragment_exec_params.fragment_instance_id)), + _load_channel_profile(""), _obj_pool(new ObjectPool()), _runtime_filter_mgr(new RuntimeFilterMgr(fragment_exec_params.query_id, this)), _data_stream_recvrs_pool(new ObjectPool()), @@ -100,6 +102,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params, const TUniqueId& query_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env) : _profile("Fragment " + print_id(pipeline_params.fragment_instance_id)), + _load_channel_profile(""), _obj_pool(new ObjectPool()), _runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)), _data_stream_recvrs_pool(new ObjectPool()), @@ -125,6 +128,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params, RuntimeState::RuntimeState(const TQueryGlobals& query_globals) : _profile(""), + _load_channel_profile(""), _obj_pool(new ObjectPool()), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), @@ -158,6 +162,7 @@ RuntimeState::RuntimeState(const TQueryGlobals& query_globals) 
RuntimeState::RuntimeState() : _profile(""), + _load_channel_profile(""), _obj_pool(new ObjectPool()), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 82f7263ae2..903e48f3f0 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -113,6 +113,7 @@ public: // Returns runtime state profile RuntimeProfile* runtime_profile() { return &_profile; } + RuntimeProfile* load_channel_profile() { return &_load_channel_profile; } bool enable_function_pushdown() const { return _query_options.__isset.enable_function_pushdown && @@ -395,6 +396,7 @@ private: // put runtime state before _obj_pool, so that it will be deconstructed after // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing. RuntimeProfile _profile; + RuntimeProfile _load_channel_profile; const DescriptorTbl* _desc_tbl; std::shared_ptr _obj_pool; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index b7f6f24953..5571ab9a87 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -49,13 +49,14 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT); std::atomic TabletsChannel::_s_tablet_writer_count; TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, - bool is_high_priority) + bool is_high_priority, RuntimeProfile* profile) : _key(key), _state(kInitialized), _load_id(load_id), _closed_senders(64), _is_high_priority(is_high_priority) { static std::once_flag once_flag; + _init_profile(profile); std::call_once(once_flag, [] { REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); }); }); @@ -69,6 +70,25 @@ TabletsChannel::~TabletsChannel() { delete _schema; } +void TabletsChannel::_init_profile(RuntimeProfile* profile) { + _profile = + profile->create_child(fmt::format("TabletsChannel {}", 
_key.to_string()), true, true); + profile->add_child(_profile, false, nullptr); + _add_batch_number_counter = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT); + + auto* memory_usage = _profile->create_child("PeakMemoryUsage", true, true); + _profile->add_child(memory_usage, false, nullptr); + _memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Total", TUnit::BYTES); + _write_memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Write", TUnit::BYTES); + _flush_memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Flush", TUnit::BYTES); + _max_tablet_memory_usage_counter = + memory_usage->AddHighWaterMarkCounter("MaxTablet", TUnit::BYTES); + _max_tablet_write_memory_usage_counter = + memory_usage->AddHighWaterMarkCounter("MaxTabletWrite", TUnit::BYTES); + _max_tablet_flush_memory_usage_counter = + memory_usage->AddHighWaterMarkCounter("MaxTabletFlush", TUnit::BYTES); +} + Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { std::lock_guard l(_lock); if (_state == kOpened) { @@ -221,17 +241,33 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, } int64_t TabletsChannel::mem_consumption() { - int64_t mem_usage = 0; + int64_t write_mem_usage = 0; + int64_t flush_mem_usage = 0; + int64_t max_tablet_mem_usage = 0; + int64_t max_tablet_write_mem_usage = 0; + int64_t max_tablet_flush_mem_usage = 0; { std::lock_guard l(_tablet_writers_lock); _mem_consumptions.clear(); for (auto& it : _tablet_writers) { - int64_t writer_mem = it.second->mem_consumption(); - mem_usage += writer_mem; - _mem_consumptions.emplace(writer_mem, it.first); + int64_t write_mem = it.second->mem_consumption(MemType::WRITE); + write_mem_usage += write_mem; + int64_t flush_mem = it.second->mem_consumption(MemType::FLUSH); + flush_mem_usage += flush_mem; + if (write_mem > max_tablet_write_mem_usage) max_tablet_write_mem_usage = write_mem; + if (flush_mem > max_tablet_flush_mem_usage) max_tablet_flush_mem_usage = flush_mem; + if (write_mem + flush_mem 
> max_tablet_mem_usage) + max_tablet_mem_usage = write_mem + flush_mem; + _mem_consumptions.emplace(write_mem + flush_mem, it.first); } } - return mem_usage; + COUNTER_SET(_memory_usage_counter, write_mem_usage + flush_mem_usage); + COUNTER_SET(_write_memory_usage_counter, write_mem_usage); + COUNTER_SET(_flush_memory_usage_counter, flush_mem_usage); + COUNTER_SET(_max_tablet_memory_usage_counter, max_tablet_mem_usage); + COUNTER_SET(_max_tablet_write_memory_usage_counter, max_tablet_write_mem_usage); + COUNTER_SET(_max_tablet_flush_memory_usage_counter, max_tablet_flush_mem_usage); + return write_mem_usage + flush_mem_usage; } Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) { @@ -305,7 +341,7 @@ std::string TabletsChannelKey::to_string() const { } std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) { - os << "(id=" << key.id << ",index_id=" << key.index_id << ")"; + os << "(load_id=" << key.id << ", index_id=" << key.index_id << ")"; return os; } @@ -313,6 +349,7 @@ template Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { int64_t cur_seq = 0; + _add_batch_number_counter->update(1); auto status = _get_current_seq(cur_seq, request); if (UNLIKELY(!status.ok())) { diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 954f1543b0..66c512f3d5 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -33,6 +33,7 @@ #include "common/status.h" #include "util/bitmap.h" +#include "util/runtime_profile.h" #include "util/spinlock.h" #include "util/uid_util.h" @@ -80,7 +81,8 @@ class LoadChannel; // Write channel for a particular (load, index). 
class TabletsChannel { public: - TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool is_high_priority); + TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool is_high_priority, + RuntimeProfile* profile); ~TabletsChannel(); @@ -134,6 +136,7 @@ private: void _add_broken_tablet(int64_t tablet_id); bool _is_broken_tablet(int64_t tablet_id); + void _init_profile(RuntimeProfile* profile); // id of this load channel TabletsChannelKey _key; @@ -189,6 +192,15 @@ private: // mem -> tablet_id // sort by memory size std::multimap> _mem_consumptions; + + RuntimeProfile* _profile; + RuntimeProfile::Counter* _add_batch_number_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _write_memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _flush_memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _max_tablet_memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _max_tablet_write_memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _max_tablet_flush_memory_usage_counter = nullptr; }; template diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 186c61fa56..050b8138ac 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -50,6 +50,7 @@ RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile : _pool(new ObjectPool()), _name(name), _metadata(-1), + _timestamp(-1), _is_averaged_profile(is_averaged_profile), _counter_total_time(TUnit::TIME_NS, 0), _local_time_percent(0) { @@ -115,6 +116,7 @@ void RuntimeProfile::merge(RuntimeProfile* other) { child = _pool->add(new RuntimeProfile(other_child->_name)); child->_local_time_percent = other_child->_local_time_percent; child->_metadata = other_child->_metadata; + child->_timestamp = other_child->_timestamp; bool indent_other_child = 
other->_children[i].second; _child_map[child->_name] = child; _children.push_back(std::make_pair(child, indent_other_child)); @@ -203,6 +205,7 @@ void RuntimeProfile::update(const std::vector& nodes, int* } else { child = _pool->add(new RuntimeProfile(tchild.name)); child->_metadata = tchild.metadata; + child->_timestamp = tchild.timestamp; _child_map[tchild.name] = child; _children.push_back(std::make_pair(child, tchild.indent)); } @@ -639,6 +642,7 @@ void RuntimeProfile::to_thrift(std::vector* nodes) { node.name = _name; node.num_children = _children.size(); node.metadata = _metadata; + node.timestamp = _timestamp; node.indent = true; CounterMap counter_map; diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index cd37f9fd43..74f8352a1b 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -352,6 +352,9 @@ public: int64_t metadata() const { return _metadata; } void set_metadata(int64_t md) { _metadata = md; } + time_t timestamp() const { return _timestamp; } + void set_timestamp(time_t ss) { _timestamp = ss; } + // Derived counter function: return measured throughput as input_value/second. static int64_t units_per_second(const Counter* total_counter, const Counter* timer); @@ -410,6 +413,9 @@ private: // user-supplied, uninterpreted metadata. int64_t _metadata; + // The timestamp when the profile was modified, make sure the update is up to date. + time_t _timestamp; + /// True if this profile is an average derived from other profiles. /// All counters in this profile must be of unit AveragedCounter. 
bool _is_averaged_profile; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index f1cba5c0c7..f811775d69 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -406,7 +406,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); - auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, true); + auto* memory_usage = runtime_profile()->create_child("PeakMemoryUsage", true, true); runtime_profile()->add_child(memory_usage, false, nullptr); _build_blocks_memory_usage = ADD_COUNTER(memory_usage, "BuildBlocks", TUnit::BYTES); _hash_table_memory_usage = ADD_COUNTER(memory_usage, "HashTable", TUnit::BYTES); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index fb2ade6485..704cd356ae 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -267,7 +267,7 @@ Status VScanNode::_init_profile() { _scanner_profile.reset(new RuntimeProfile("VScanner")); runtime_profile()->add_child(_scanner_profile.get(), true, nullptr); - auto* memory_usage = _scanner_profile->create_child("MemoryUsage", true, true); + auto* memory_usage = _scanner_profile->create_child("PeakMemoryUsage", true, true); _runtime_profile->add_child(memory_usage, false, nullptr); _queued_blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 56ec4a585f..12592a72fd 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -324,7 +324,7 @@ void AggregationNode::_init_hash_method(std::vector& probe_exprs) } Status AggregationNode::prepare_profile(RuntimeState* state) { - auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, 
true); + auto* memory_usage = runtime_profile()->create_child("PeakMemoryUsage", true, true); runtime_profile()->add_child(memory_usage, false, nullptr); _hash_table_memory_usage = ADD_COUNTER(memory_usage, "HashTable", TUnit::BYTES); _serialize_key_arena_memory_usage = diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index dba8f6e126..0deb226c5b 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -168,7 +168,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor)); - auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, true); + auto* memory_usage = runtime_profile()->create_child("PeakMemoryUsage", true, true); runtime_profile()->add_child(memory_usage, false, nullptr); _blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("Blocks", TUnit::BYTES); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 6fb7f36b2a..fd905df958 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -113,7 +113,7 @@ Status VSortNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); _runtime_profile->add_info_string("TOP-N", _limit == -1 ? 
"false" : "true"); - auto* memory_usage = _runtime_profile->create_child("MemoryUsage", true, true); + auto* memory_usage = _runtime_profile->create_child("PeakMemoryUsage", true, true); _runtime_profile->add_child(memory_usage, false, nullptr); _sort_blocks_memory_usage = ADD_COUNTER(memory_usage, "SortBlocks", TUnit::BYTES); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index ec0f4f9d88..c8d95a5053 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -327,7 +327,7 @@ VDataStreamRecvr::VDataStreamRecvr( } // Initialize the counters - auto* memory_usage = _profile->create_child("MemoryUsage", true, true); + auto* memory_usage = _profile->create_child("PeakMemoryUsage", true, true); _profile->add_child(memory_usage, false, nullptr); _blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("Blocks", TUnit::BYTES); _bytes_received_counter = ADD_COUNTER(_profile, "BytesReceived", TUnit::BYTES); diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 7718d000e0..398e2a0cb2 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -62,6 +62,7 @@ #include "util/telemetry/telemetry.h" #include "util/thread.h" #include "util/threadpool.h" +#include "util/thrift_util.h" #include "util/time.h" #include "util/uid_util.h" #include "vec/columns/column.h" @@ -320,6 +321,7 @@ void VNodeChannel::open() { request.set_is_high_priority(_parent->_is_high_priority); request.set_sender_ip(BackendOptions::get_localhost()); request.set_is_vectorized(true); + request.set_backend_id(_node_id); _open_closure = new RefCountClosure(); _open_closure->ref(); @@ -454,6 +456,18 @@ Status VNodeChannel::open_wait() { _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us(); _add_batch_counter.add_batch_num++; } + if (result.has_load_channel_profile()) { + TRuntimeProfileTree tprofile; + const uint8_t* 
buf = (const uint8_t*)result.load_channel_profile().data(); + uint32_t len = result.load_channel_profile().size(); + auto st = deserialize_thrift_msg(buf, &len, false, &tprofile); + if (st.ok()) { + _state->load_channel_profile()->update(tprofile); + } else { + LOG(WARNING) << "load channel TRuntimeProfileTree deserialize failed, errmsg=" + << st; + } + } }); return status; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java index e2bea7e39c..56a14e5b7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java @@ -132,7 +132,7 @@ public class ProfileTreeBuilder { public void build() throws UserException { reset(); checkProfile(); - analyzeAndBuildFragmentTrees(); + analyzeAndBuild(); assembleFragmentTrees(); } @@ -151,8 +151,33 @@ public class ProfileTreeBuilder { } } - private void analyzeAndBuildFragmentTrees() throws UserException { + private void analyzeAndBuild() throws UserException { List> childrenFragment = profileRoot.getChildList(); + for (Pair pair : childrenFragment) { + String name = pair.first.getName(); + if (name.equals("Fragments")) { + analyzeAndBuildFragmentTrees(pair.first); + } else if (name.equals("LoadChannels")) { + analyzeAndBuildLoadChannels(pair.first); + } else { + throw new UserException("Invalid execution profile name: " + name); + } + } + } + + private void analyzeAndBuildLoadChannels(RuntimeProfile loadChannelsProfile) throws UserException { + List> childrenFragment = loadChannelsProfile.getChildList(); + for (Pair pair : childrenFragment) { + analyzeAndBuildLoadChannel(pair.first); + } + } + + private void analyzeAndBuildLoadChannel(RuntimeProfile loadChannelProfile) throws UserException { + // TODO, `show load profile` add load channel profile, or add `show load channel profile`.
+ } + + private void analyzeAndBuildFragmentTrees(RuntimeProfile fragmentsProfile) throws UserException { + List> childrenFragment = fragmentsProfile.getChildList(); for (Pair pair : childrenFragment) { analyzeAndBuildFragmentTree(pair.first); } @@ -191,7 +216,7 @@ public class ProfileTreeBuilder { fragmentTreeRoot = instanceTreeRoot; } - // 2. Build tree for each single instance + // 3. Build tree for each single instance int i = 0; Map instanceTrees = Maps.newHashMap(); for (Pair pair : fragmentChildren) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index e3e61faf18..9bb49363ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -64,6 +64,8 @@ public class RuntimeProfile { private String name; + private Long timestamp = -1L; + public RuntimeProfile(String name) { this(); this.name = name; @@ -137,6 +139,11 @@ public class RuntimeProfile { // preorder traversal, idx should be modified in the traversal process private void update(List nodes, Reference idx) { TRuntimeProfileNode node = nodes.get(idx.getRef()); + // Make sure to update the latest LoadChannel profile according to the timestamp. 
+ if (node.timestamp != -1 && node.timestamp < timestamp) { + return; + } + Preconditions.checkState(timestamp == -1 || node.timestamp != -1); // update this level's counters if (node.counters != null) { for (TCounter tcounter : node.counters) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index af0801e0ad..b83913c465 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -191,7 +191,9 @@ public class Coordinator { private RuntimeProfile queryProfile; + private RuntimeProfile fragmentsProfile; private List fragmentProfile; + private RuntimeProfile loadChannelProfile; private ProfileWriter profileWriter; @@ -526,12 +528,17 @@ public class Coordinator { int fragmentSize = fragments.size(); queryProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); + fragmentsProfile = new RuntimeProfile("Fragments"); + queryProfile.addChild(fragmentsProfile); fragmentProfile = new ArrayList(); for (int i = 0; i < fragmentSize; i++) { fragmentProfile.add(new RuntimeProfile("Fragment " + i)); - queryProfile.addChild(fragmentProfile.get(i)); + fragmentsProfile.addChild(fragmentProfile.get(i)); } + loadChannelProfile = new RuntimeProfile("LoadChannels"); + queryProfile.addChild(loadChannelProfile); + this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend(); if (LOG.isDebugEnabled()) { LOG.debug("idToBackend size={}", idToBackend.size()); @@ -729,7 +736,7 @@ public class Coordinator { for (TExecPlanFragmentParams tParam : tParams) { BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++, - profileFragmentId, tParam, this.addressToBackendID); + profileFragmentId, tParam, this.addressToBackendID, loadChannelProfile); // Each tParam will set the total number of Fragments that need to be executed on the same BE, // and the BE will determine 
whether all Fragments have been executed based on this information. // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons. @@ -2562,6 +2569,7 @@ public class Coordinator { boolean hasCanceled; int profileFragmentId; RuntimeProfile profile; + RuntimeProfile loadChannelProfile; TNetworkAddress brpcAddress; TNetworkAddress address; Backend backend; @@ -2569,7 +2577,8 @@ public class Coordinator { TUniqueId instanceId; public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFragmentId, - TExecPlanFragmentParams rpcParams, Map addressToBackendID) { + TExecPlanFragmentParams rpcParams, Map addressToBackendID, + RuntimeProfile loadChannelProfile) { this.profileFragmentId = profileFragmentId; this.fragmentId = fragmentId; this.rpcParams = rpcParams; @@ -2582,6 +2591,7 @@ public class Coordinator { this.brpcAddress = new TNetworkAddress(backend.getIp(), backend.getBrpcPort()); String name = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")"; + this.loadChannelProfile = loadChannelProfile; this.profile = new RuntimeProfile(name); this.hasCanceled = false; this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); @@ -2611,6 +2621,9 @@ public class Coordinator { if (params.isSetProfile()) { profile.update(params.profile); } + if (params.isSetLoadChannelProfile()) { + loadChannelProfile.update(params.loadChannelProfile); + } this.done = params.done; if (statsErrorEstimator != null) { statsErrorEstimator.updateExactReturnedRows(params); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index fb01e2adb4..d41a670052 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -83,6 +83,7 @@ message PTabletWriterOpenRequest { optional bool is_high_priority = 10 [default = false]; optional string sender_ip = 11 [default = ""]; optional bool is_vectorized = 12 [default = false]; + optional int64 backend_id = 13 
[default = -1]; }; message PTabletWriterOpenResult { @@ -178,6 +179,7 @@ message PTabletWriterAddBlockResult { optional int64 wait_execution_time_us = 5; repeated PTabletError tablet_errors = 6; map success_slave_tablet_node_ids = 7; + optional bytes load_channel_profile = 8; }; // tablet writer cancel diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index b1fc35734a..54ec33c360 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -431,6 +431,8 @@ struct TReportExecStatusParams { 19: optional i32 fragment_id 20: optional PaloInternalService.TQueryType query_type + + 21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile } struct TFeResult { diff --git a/gensrc/thrift/RuntimeProfile.thrift b/gensrc/thrift/RuntimeProfile.thrift index 2b5a470136..365050954d 100644 --- a/gensrc/thrift/RuntimeProfile.thrift +++ b/gensrc/thrift/RuntimeProfile.thrift @@ -49,6 +49,8 @@ struct TRuntimeProfileNode { // map from parent counter name to child counter name 8: required map> child_counters_map + + 9: required i64 timestamp } // A flattened tree of runtime profiles, obtained by an