diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index f72f2e9ceb..e198ad09a6 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -349,16 +349,16 @@ void DeltaWriter::_reset_mem_table() {
     COUNTER_UPDATE(_segment_num, 1);
     _mem_table->set_callback([this](MemTableStat& stat) {
         _memtable_stat += stat;
-        COUNTER_UPDATE(_sort_timer, _memtable_stat.sort_ns);
-        COUNTER_UPDATE(_agg_timer, _memtable_stat.agg_ns);
-        COUNTER_UPDATE(_memtable_duration_timer, _memtable_stat.duration_ns);
-        COUNTER_UPDATE(_segment_writer_timer, _memtable_stat.segment_writer_ns);
-        COUNTER_UPDATE(_delete_bitmap_timer, _memtable_stat.delete_bitmap_ns);
-        COUNTER_UPDATE(_put_into_output_timer, _memtable_stat.put_into_output_ns);
-        COUNTER_UPDATE(_sort_times, _memtable_stat.sort_times);
-        COUNTER_UPDATE(_agg_times, _memtable_stat.agg_times);
-        COUNTER_UPDATE(_raw_rows_num, _memtable_stat.raw_rows);
-        COUNTER_UPDATE(_merged_rows_num, _memtable_stat.merged_rows);
+        COUNTER_SET(_sort_timer, _memtable_stat.sort_ns);
+        COUNTER_SET(_agg_timer, _memtable_stat.agg_ns);
+        COUNTER_SET(_memtable_duration_timer, _memtable_stat.duration_ns);
+        COUNTER_SET(_segment_writer_timer, _memtable_stat.segment_writer_ns);
+        COUNTER_SET(_delete_bitmap_timer, _memtable_stat.delete_bitmap_ns);
+        COUNTER_SET(_put_into_output_timer, _memtable_stat.put_into_output_ns);
+        COUNTER_SET(_sort_times, _memtable_stat.sort_times);
+        COUNTER_SET(_agg_times, _memtable_stat.agg_times);
+        COUNTER_SET(_raw_rows_num, _memtable_stat.raw_rows);
+        COUNTER_SET(_merged_rows_num, _memtable_stat.merged_rows);
     });
 }
 
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 41e7ffa9ad..b99127bbf6 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -106,8 +106,8 @@ public:
     int64_t delete_bitmap_ns = 0;
     int64_t segment_writer_ns = 0;
     int64_t duration_ns = 0;
-    int32_t sort_times = 0;
-    int32_t agg_times = 0;
+    int64_t sort_times = 0;
+    int64_t agg_times = 0;
 };
 
 class MemTable {
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index d51accacd4..24701c04b3 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -60,6 +60,7 @@ void LoadChannel::_init_profile() {
     _add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT);
     _peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES);
     _add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime");
+    _handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", "AddBatchTime");
     _add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", TUnit::UNIT);
 }
 
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 6c75699a7a..e746014fde 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -128,6 +128,7 @@ protected:
     Status _handle_eos(std::shared_ptr<TabletsChannel>& channel,
                        const PTabletWriterAddBlockRequest& request,
                        PTabletWriterAddBlockResult* response) {
+        _self_profile->add_info_string("EosHost", fmt::format("{}", request.backend_id()));
         bool finished = false;
         auto index_id = request.index_id();
         RETURN_IF_ERROR(channel->close(
@@ -163,6 +164,7 @@ private:
     RuntimeProfile::Counter* _add_batch_times = nullptr;
     RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr;
     RuntimeProfile::Counter* _handle_mem_limit_timer = nullptr;
+    RuntimeProfile::Counter* _handle_eos_timer = nullptr;
 
     // lock protect the tablets channel map
     std::mutex _lock;
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index c7ccf802db..b7cd3a63af 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -630,7 +630,6 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes) {
     nodes->push_back(TRuntimeProfileNode());
     TRuntimeProfileNode& node = (*nodes)[index];
     node.name = _name;
-    node.num_children = _children.size();
     node.metadata = _metadata;
     node.timestamp = _timestamp;
     node.indent = true;
@@ -662,6 +661,7 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes) {
     {
         std::lock_guard<std::mutex> l(_children_lock);
         children = _children;
     }
+    node.num_children = children.size();
     for (int i = 0; i < children.size(); ++i) {
         int child_idx = nodes->size();
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index a934ce86cf..9533ad639e 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -569,7 +569,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
     // It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
     while (!_cancelled && _pending_batches_num > 0 &&
            _pending_batches_bytes > _max_pending_batches_bytes) {
-        SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
+        SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
     }
 
@@ -579,6 +579,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
 
     std::unique_ptr<Payload> temp_payload = nullptr;
     if (_index_channel != nullptr && _index_channel->get_where_clause() != nullptr) {
+        SCOPED_RAW_TIMER(&_stat.where_clause_ns);
         temp_payload.reset(new Payload(
                 std::unique_ptr<vectorized::IColumn::Selector>(new vectorized::IColumn::Selector()),
                 std::vector<int64_t>()));
@@ -622,6 +623,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
         }
     }
 
+    SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
     if (is_append) {
         // Do not split the data of the block by tablets but append it to a single delta writer.
         // This is a faster way to send block than append_block_by_selector
@@ -1055,6 +1057,10 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
     _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT);
     _send_data_timer = ADD_TIMER(_profile, "SendDataTime");
     _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime");
+    _row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime", "SendDataTime");
+    _filter_timer = ADD_CHILD_TIMER(_profile, "FilterTime", "SendDataTime");
+    _where_clause_timer = ADD_CHILD_TIMER(_profile, "WhereClauseTime", "SendDataTime");
+    _append_node_channel_timer = ADD_CHILD_TIMER(_profile, "AppendNodeChannelTime", "SendDataTime");
     _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
     _open_timer = ADD_TIMER(_profile, "OpenTime");
     _close_timer = ADD_TIMER(_profile, "CloseWaitTime");
@@ -1316,6 +1322,7 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
         // Recaculate is needed
         _partition_to_tablet_map.clear();
     }
+    _row_distribution_watch.start();
     for (int i = 0; i < num_rows; ++i) {
         if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
             continue;
@@ -1336,11 +1343,13 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
             _open_partition(partition);
         }
     }
+    _row_distribution_watch.stop();
     // Random distribution and the block belongs to a single tablet, we could optimize to append the whole
     // block into node channel.
     bool load_block_to_single_tablet =
             !_schema->is_dynamic_schema() && _partition_to_tablet_map.size() == 1;
     if (load_block_to_single_tablet) {
+        SCOPED_RAW_TIMER(&_filter_ns);
         // clear and release the references of columns
         input_block->clear();
         // Filter block
@@ -1383,6 +1392,7 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return _close_status;
     }
+    SCOPED_TIMER(_close_timer);
     vectorized::VExpr::close(_output_vexpr_ctxs, state);
     Status status = exec_status;
     if (status.ok()) {
@@ -1391,12 +1401,12 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
         SCOPED_TIMER(_profile->total_time_counter());
         // BE id -> add_batch method counter
         std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
-        int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0, queue_push_lock_ns = 0,
-                actual_consume_ns = 0, total_add_batch_exec_time_ns = 0,
-                max_add_batch_exec_time_ns = 0, total_wait_exec_time_ns = 0,
-                max_wait_exec_time_ns = 0, total_add_batch_num = 0, num_node_channels = 0;
+        int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, actual_consume_ns = 0,
+                total_add_batch_exec_time_ns = 0, max_add_batch_exec_time_ns = 0,
+                total_wait_exec_time_ns = 0, max_wait_exec_time_ns = 0, total_add_batch_num = 0,
+                num_node_channels = 0;
+        VNodeChannelStat channel_stat;
         {
-            SCOPED_TIMER(_close_timer);
             for (auto index_channel : _channels) {
                 index_channel->for_each_node_channel(
                         [](const std::shared_ptr<VNodeChannel>& ch) { ch->mark_close(); });
@@ -1408,7 +1418,7 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
             int64_t wait_exec_time = 0;
             index_channel->for_each_node_channel(
                     [&index_channel, &state, &node_add_batch_counter_map, &serialize_batch_ns,
-                     &mem_exceeded_block_ns, &queue_push_lock_ns, &actual_consume_ns,
+                     &channel_stat, &queue_push_lock_ns, &actual_consume_ns,
                      &total_add_batch_exec_time_ns, &add_batch_exec_time, &total_wait_exec_time_ns,
                      &wait_exec_time, &total_add_batch_num](const std::shared_ptr<VNodeChannel>& ch) {
@@ -1423,10 +1433,10 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
                                << ", close channel failed, err: " << err_msg;
                         }
                         ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
-                                        &mem_exceeded_block_ns, &queue_push_lock_ns,
-                                        &actual_consume_ns, &total_add_batch_exec_time_ns,
-                                        &add_batch_exec_time, &total_wait_exec_time_ns,
-                                        &wait_exec_time, &total_add_batch_num);
+                                        &channel_stat, &queue_push_lock_ns, &actual_consume_ns,
+                                        &total_add_batch_exec_time_ns, &add_batch_exec_time,
+                                        &total_wait_exec_time_ns, &wait_exec_time,
+                                        &total_add_batch_num);
                     });
 
             if (add_batch_exec_time > max_add_batch_exec_time_ns) {
@@ -1447,7 +1457,7 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
             } // end for index channels
         }
         // TODO need to be improved
-        LOG(INFO) << "total mem_exceeded_block_ns=" << mem_exceeded_block_ns
+        LOG(INFO) << "total mem_exceeded_block_ns=" << channel_stat.mem_exceeded_block_ns
                   << ", total queue_push_lock_ns=" << queue_push_lock_ns
                   << ", total actual_consume_ns=" << actual_consume_ns
                   << ", load id=" << print_id(_load_id);
@@ -1456,7 +1466,11 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
         COUNTER_SET(_output_rows_counter, _number_output_rows);
         COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
         COUNTER_SET(_send_data_timer, _send_data_ns);
-        COUNTER_SET(_wait_mem_limit_timer, mem_exceeded_block_ns);
+        COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
+        COUNTER_SET(_filter_timer, _filter_ns);
+        COUNTER_SET(_append_node_channel_timer, channel_stat.append_node_channel_ns);
+        COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
+        COUNTER_SET(_wait_mem_limit_timer, channel_stat.mem_exceeded_block_ns);
         COUNTER_SET(_validate_data_timer, _validate_data_ns);
         COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
         COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index d6ac07068e..f9edeff693 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -195,6 +195,20 @@ class VOlapTableSink;
 // pair<row_ids, tablet_ids>
 using Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>, std::vector<int64_t>>;
 
+class VNodeChannelStat {
+public:
+    VNodeChannelStat& operator+=(const VNodeChannelStat& stat) {
+        mem_exceeded_block_ns += stat.mem_exceeded_block_ns;
+        where_clause_ns += stat.where_clause_ns;
+        append_node_channel_ns += stat.append_node_channel_ns;
+        return *this;
+    };
+
+    int64_t mem_exceeded_block_ns = 0;
+    int64_t where_clause_ns = 0;
+    int64_t append_node_channel_ns = 0;
+};
+
 class VNodeChannel {
 public:
     VNodeChannel(VOlapTableSink* parent, IndexChannel* index_channel, int64_t node_id);
@@ -238,7 +252,7 @@ public:
     void cancel(const std::string& cancel_msg);
 
     void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
-                     int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns,
+                     int64_t* serialize_batch_ns, VNodeChannelStat* stat,
                      int64_t* queue_push_lock_ns, int64_t* actual_consume_ns,
                      int64_t* total_add_batch_exec_time_ns, int64_t* add_batch_exec_time_ns,
                      int64_t* total_wait_exec_time_ns, int64_t* wait_exec_time_ns,
@@ -246,7 +260,7 @@ public:
         (*add_batch_counter_map)[_node_id] += _add_batch_counter;
         (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms;
         *serialize_batch_ns += _serialize_batch_ns;
-        *mem_exceeded_block_ns += _mem_exceeded_block_ns;
+        *stat += _stat;
         *queue_push_lock_ns += _queue_push_lock_ns;
         *actual_consume_ns += _actual_consume_ns;
         *add_batch_exec_time_ns = (_add_batch_counter.add_batch_execution_time_us * 1000);
@@ -325,10 +339,10 @@ protected:
     AddBatchCounter _add_batch_counter;
 
     std::atomic<int64_t> _serialize_batch_ns {0};
-    std::atomic<int64_t> _mem_exceeded_block_ns {0};
     std::atomic<int64_t> _queue_push_lock_ns {0};
     std::atomic<int64_t> _actual_consume_ns {0};
 
+    VNodeChannelStat _stat;
     // lock to protect _is_closed.
    // The methods in the IndexChannel are called back in the RpcClosure in the NodeChannel.
     // However, this rpc callback may occur after the whole task is finished (e.g. due to network latency),
@@ -563,11 +577,18 @@ private:
     int64_t _number_output_rows = 0;
     int64_t _number_filtered_rows = 0;
     int64_t _number_immutable_partition_filtered_rows = 0;
+    int64_t _filter_ns = 0;
+
+    MonotonicStopWatch _row_distribution_watch;
 
     RuntimeProfile::Counter* _input_rows_counter = nullptr;
     RuntimeProfile::Counter* _output_rows_counter = nullptr;
     RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
     RuntimeProfile::Counter* _send_data_timer = nullptr;
+    RuntimeProfile::Counter* _row_distribution_timer = nullptr;
+    RuntimeProfile::Counter* _append_node_channel_timer = nullptr;
+    RuntimeProfile::Counter* _filter_timer = nullptr;
+    RuntimeProfile::Counter* _where_clause_timer = nullptr;
     RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
     RuntimeProfile::Counter* _validate_data_timer = nullptr;
     RuntimeProfile::Counter* _open_timer = nullptr;
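The profiling rework in this patch follows one pattern: each VNodeChannel accumulates raw nanosecond timings into a plain VNodeChannelStat via SCOPED_RAW_TIMER, the per-channel stats are folded into one total with operator+= during close(), and the totals are published once with COUNTER_SET. A minimal self-contained sketch of that pattern (illustrative only, not Doris code: Stat and ScopedRawTimer are stand-ins for VNodeChannelStat and the SCOPED_RAW_TIMER macro):

// sketch.cpp -- timing-aggregation pattern used by the patch above
#include <chrono>
#include <cstdint>
#include <iostream>

struct Stat {
    int64_t mem_exceeded_block_ns = 0;
    int64_t where_clause_ns = 0;
    int64_t append_node_channel_ns = 0;

    Stat& operator+=(const Stat& other) {
        mem_exceeded_block_ns += other.mem_exceeded_block_ns;
        where_clause_ns += other.where_clause_ns;
        append_node_channel_ns += other.append_node_channel_ns;
        return *this;
    }
};

// RAII timer: on scope exit, adds the elapsed nanoseconds to *target.
class ScopedRawTimer {
public:
    explicit ScopedRawTimer(int64_t* target)
            : _target(target), _start(std::chrono::steady_clock::now()) {}
    ~ScopedRawTimer() {
        *_target += std::chrono::duration_cast<std::chrono::nanoseconds>(
                            std::chrono::steady_clock::now() - _start)
                            .count();
    }

private:
    int64_t* _target;
    std::chrono::steady_clock::time_point _start;
};

int main() {
    Stat per_channel[2]; // one Stat per node channel, as VNodeChannel::_stat
    for (Stat& s : per_channel) {
        ScopedRawTimer timer(&s.append_node_channel_ns); // times this block
        volatile int64_t work = 0;
        for (int i = 0; i < 1000000; ++i) work += i; // stand-in for real work
    }

    // At close time, each channel's stat is merged via operator+= and the
    // totals would be published once (COUNTER_SET in the real code).
    Stat total;
    for (const Stat& s : per_channel) total += s;
    std::cout << "append_node_channel_ns=" << total.append_node_channel_ns << "\n";
    return 0;
}

A plain struct plus one merge at close is cheaper than the per-field std::atomic counters it replaces, since each field is only ever written from the channel's own send path.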