[fix](load_profile) fix rows stat and add close_wait in sink (#20181)

This commit is contained in:
Yongqiang YANG
2023-05-31 18:23:30 +08:00
committed by GitHub
parent 1aefc26ca0
commit 6ee99c4138
7 changed files with 67 additions and 29 deletions

View File

@ -349,16 +349,16 @@ void DeltaWriter::_reset_mem_table() {
COUNTER_UPDATE(_segment_num, 1);
_mem_table->set_callback([this](MemTableStat& stat) {
_memtable_stat += stat;
COUNTER_UPDATE(_sort_timer, _memtable_stat.sort_ns);
COUNTER_UPDATE(_agg_timer, _memtable_stat.agg_ns);
COUNTER_UPDATE(_memtable_duration_timer, _memtable_stat.duration_ns);
COUNTER_UPDATE(_segment_writer_timer, _memtable_stat.segment_writer_ns);
COUNTER_UPDATE(_delete_bitmap_timer, _memtable_stat.delete_bitmap_ns);
COUNTER_UPDATE(_put_into_output_timer, _memtable_stat.put_into_output_ns);
COUNTER_UPDATE(_sort_times, _memtable_stat.sort_times);
COUNTER_UPDATE(_agg_times, _memtable_stat.agg_times);
COUNTER_UPDATE(_raw_rows_num, _memtable_stat.raw_rows);
COUNTER_UPDATE(_merged_rows_num, _memtable_stat.merged_rows);
COUNTER_SET(_sort_timer, _memtable_stat.sort_ns);
COUNTER_SET(_agg_timer, _memtable_stat.agg_ns);
COUNTER_SET(_memtable_duration_timer, _memtable_stat.duration_ns);
COUNTER_SET(_segment_writer_timer, _memtable_stat.segment_writer_ns);
COUNTER_SET(_delete_bitmap_timer, _memtable_stat.delete_bitmap_ns);
COUNTER_SET(_put_into_output_timer, _memtable_stat.put_into_output_ns);
COUNTER_SET(_sort_times, _memtable_stat.sort_times);
COUNTER_SET(_agg_times, _memtable_stat.agg_times);
COUNTER_SET(_raw_rows_num, _memtable_stat.raw_rows);
COUNTER_SET(_merged_rows_num, _memtable_stat.merged_rows);
});
}

View File

@ -106,8 +106,8 @@ public:
int64_t delete_bitmap_ns = 0;
int64_t segment_writer_ns = 0;
int64_t duration_ns = 0;
int32_t sort_times = 0;
int32_t agg_times = 0;
int64_t sort_times = 0;
int64_t agg_times = 0;
};
class MemTable {

View File

@ -60,6 +60,7 @@ void LoadChannel::_init_profile() {
_add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT);
_peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES);
_add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime");
_handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", "AddBatchTime");
_add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", TUnit::UNIT);
}

View File

@ -128,6 +128,7 @@ protected:
Status _handle_eos(std::shared_ptr<TabletsChannel>& channel,
const PTabletWriterAddBlockRequest& request,
PTabletWriterAddBlockResult* response) {
_self_profile->add_info_string("EosHost", fmt::format("{}", request.backend_id()));
bool finished = false;
auto index_id = request.index_id();
RETURN_IF_ERROR(channel->close(
@ -163,6 +164,7 @@ private:
RuntimeProfile::Counter* _add_batch_times = nullptr;
RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr;
RuntimeProfile::Counter* _handle_mem_limit_timer = nullptr;
RuntimeProfile::Counter* _handle_eos_timer = nullptr;
// lock protect the tablets channel map
std::mutex _lock;

View File

@ -630,7 +630,6 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes) {
nodes->push_back(TRuntimeProfileNode());
TRuntimeProfileNode& node = (*nodes)[index];
node.name = _name;
node.num_children = _children.size();
node.metadata = _metadata;
node.timestamp = _timestamp;
node.indent = true;
@ -662,6 +661,7 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes) {
std::lock_guard<std::mutex> l(_children_lock);
children = _children;
}
node.num_children = children.size();
for (int i = 0; i < children.size(); ++i) {
int child_idx = nodes->size();

View File

@ -569,7 +569,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
// It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
while (!_cancelled && _pending_batches_num > 0 &&
_pending_batches_bytes > _max_pending_batches_bytes) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
@ -579,6 +579,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
std::unique_ptr<Payload> temp_payload = nullptr;
if (_index_channel != nullptr && _index_channel->get_where_clause() != nullptr) {
SCOPED_RAW_TIMER(&_stat.where_clause_ns);
temp_payload.reset(new Payload(
std::unique_ptr<vectorized::IColumn::Selector>(new vectorized::IColumn::Selector()),
std::vector<int64_t>()));
@ -622,6 +623,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
}
}
SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
if (is_append) {
// Do not split the data of the block by tablets but append it to a single delta writer.
// This is a faster way to send block than append_block_by_selector
@ -1055,6 +1057,10 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
_filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT);
_send_data_timer = ADD_TIMER(_profile, "SendDataTime");
_wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime");
_row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime", "SendDataTime");
_filter_timer = ADD_CHILD_TIMER(_profile, "FilterTime", "SendDataTime");
_where_clause_timer = ADD_CHILD_TIMER(_profile, "WhereClauseTime", "SendDataTime");
_append_node_channel_timer = ADD_CHILD_TIMER(_profile, "AppendNodeChannelTime", "SendDataTime");
_validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
_open_timer = ADD_TIMER(_profile, "OpenTime");
_close_timer = ADD_TIMER(_profile, "CloseWaitTime");
@ -1316,6 +1322,7 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
// Recalculate is needed
_partition_to_tablet_map.clear();
}
_row_distribution_watch.start();
for (int i = 0; i < num_rows; ++i) {
if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
continue;
@ -1336,11 +1343,13 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
_open_partition(partition);
}
}
_row_distribution_watch.stop();
// Random distribution and the block belongs to a single tablet, we could optimize to append the whole
// block into node channel.
bool load_block_to_single_tablet =
!_schema->is_dynamic_schema() && _partition_to_tablet_map.size() == 1;
if (load_block_to_single_tablet) {
SCOPED_RAW_TIMER(&_filter_ns);
// clear and release the references of columns
input_block->clear();
// Filter block
@ -1383,6 +1392,7 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return _close_status;
}
SCOPED_TIMER(_close_timer);
vectorized::VExpr::close(_output_vexpr_ctxs, state);
Status status = exec_status;
if (status.ok()) {
@ -1391,12 +1401,12 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(_profile->total_time_counter());
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0, queue_push_lock_ns = 0,
actual_consume_ns = 0, total_add_batch_exec_time_ns = 0,
max_add_batch_exec_time_ns = 0, total_wait_exec_time_ns = 0,
max_wait_exec_time_ns = 0, total_add_batch_num = 0, num_node_channels = 0;
int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, actual_consume_ns = 0,
total_add_batch_exec_time_ns = 0, max_add_batch_exec_time_ns = 0,
total_wait_exec_time_ns = 0, max_wait_exec_time_ns = 0, total_add_batch_num = 0,
num_node_channels = 0;
VNodeChannelStat channel_stat;
{
SCOPED_TIMER(_close_timer);
for (auto index_channel : _channels) {
index_channel->for_each_node_channel(
[](const std::shared_ptr<VNodeChannel>& ch) { ch->mark_close(); });
@ -1408,7 +1418,7 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
int64_t wait_exec_time = 0;
index_channel->for_each_node_channel(
[&index_channel, &state, &node_add_batch_counter_map, &serialize_batch_ns,
&mem_exceeded_block_ns, &queue_push_lock_ns, &actual_consume_ns,
&channel_stat, &queue_push_lock_ns, &actual_consume_ns,
&total_add_batch_exec_time_ns, &add_batch_exec_time,
&total_wait_exec_time_ns, &wait_exec_time,
&total_add_batch_num](const std::shared_ptr<VNodeChannel>& ch) {
@ -1423,10 +1433,10 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
<< ", close channel failed, err: " << err_msg;
}
ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
&mem_exceeded_block_ns, &queue_push_lock_ns,
&actual_consume_ns, &total_add_batch_exec_time_ns,
&add_batch_exec_time, &total_wait_exec_time_ns,
&wait_exec_time, &total_add_batch_num);
&channel_stat, &queue_push_lock_ns, &actual_consume_ns,
&total_add_batch_exec_time_ns, &add_batch_exec_time,
&total_wait_exec_time_ns, &wait_exec_time,
&total_add_batch_num);
});
if (add_batch_exec_time > max_add_batch_exec_time_ns) {
@ -1447,7 +1457,7 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
} // end for index channels
}
// TODO need to be improved
LOG(INFO) << "total mem_exceeded_block_ns=" << mem_exceeded_block_ns
LOG(INFO) << "total mem_exceeded_block_ns=" << channel_stat.mem_exceeded_block_ns
<< ", total queue_push_lock_ns=" << queue_push_lock_ns
<< ", total actual_consume_ns=" << actual_consume_ns
<< ", load id=" << print_id(_load_id);
@ -1456,7 +1466,11 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
COUNTER_SET(_output_rows_counter, _number_output_rows);
COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_wait_mem_limit_timer, mem_exceeded_block_ns);
COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_filter_timer, _filter_ns);
COUNTER_SET(_append_node_channel_timer, channel_stat.append_node_channel_ns);
COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
COUNTER_SET(_wait_mem_limit_timer, channel_stat.mem_exceeded_block_ns);
COUNTER_SET(_validate_data_timer, _validate_data_ns);
COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);

View File

@ -195,6 +195,20 @@ class VOlapTableSink;
// pair<row_id,tablet_id>
using Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>, std::vector<int64_t>>;
class VNodeChannelStat {
public:
VNodeChannelStat& operator+=(const VNodeChannelStat& stat) {
mem_exceeded_block_ns += stat.mem_exceeded_block_ns;
where_clause_ns += stat.where_clause_ns;
append_node_channel_ns += stat.append_node_channel_ns;
return *this;
};
int64_t mem_exceeded_block_ns = 0;
int64_t where_clause_ns = 0;
int64_t append_node_channel_ns = 0;
};
class VNodeChannel {
public:
VNodeChannel(VOlapTableSink* parent, IndexChannel* index_channel, int64_t node_id);
@ -238,7 +252,7 @@ public:
void cancel(const std::string& cancel_msg);
void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns,
int64_t* serialize_batch_ns, VNodeChannelStat* stat,
int64_t* queue_push_lock_ns, int64_t* actual_consume_ns,
int64_t* total_add_batch_exec_time_ns, int64_t* add_batch_exec_time_ns,
int64_t* total_wait_exec_time_ns, int64_t* wait_exec_time_ns,
@ -246,7 +260,7 @@ public:
(*add_batch_counter_map)[_node_id] += _add_batch_counter;
(*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms;
*serialize_batch_ns += _serialize_batch_ns;
*mem_exceeded_block_ns += _mem_exceeded_block_ns;
*stat += _stat;
*queue_push_lock_ns += _queue_push_lock_ns;
*actual_consume_ns += _actual_consume_ns;
*add_batch_exec_time_ns = (_add_batch_counter.add_batch_execution_time_us * 1000);
@ -325,10 +339,10 @@ protected:
AddBatchCounter _add_batch_counter;
std::atomic<int64_t> _serialize_batch_ns {0};
std::atomic<int64_t> _mem_exceeded_block_ns {0};
std::atomic<int64_t> _queue_push_lock_ns {0};
std::atomic<int64_t> _actual_consume_ns {0};
VNodeChannelStat _stat;
// lock to protect _is_closed.
// The methods in the IndexChannel are called back in the RpcClosure in the NodeChannel.
// However, this rpc callback may occur after the whole task is finished (e.g. due to network latency),
@ -563,11 +577,18 @@ private:
int64_t _number_output_rows = 0;
int64_t _number_filtered_rows = 0;
int64_t _number_immutable_partition_filtered_rows = 0;
int64_t _filter_ns = 0;
MonotonicStopWatch _row_distribution_watch;
RuntimeProfile::Counter* _input_rows_counter = nullptr;
RuntimeProfile::Counter* _output_rows_counter = nullptr;
RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
RuntimeProfile::Counter* _send_data_timer = nullptr;
RuntimeProfile::Counter* _row_distribution_timer = nullptr;
RuntimeProfile::Counter* _append_node_channel_timer = nullptr;
RuntimeProfile::Counter* _filter_timer = nullptr;
RuntimeProfile::Counter* _where_clause_timer = nullptr;
RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
RuntimeProfile::Counter* _validate_data_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;