[refactor](profilev2) add BlocksProduced RowsProduced counter #27291

This commit is contained in:
Mryange
2023-11-21 12:01:11 +08:00
committed by GitHub
parent 3e8177bbbd
commit d809bee46e
4 changed files with 12 additions and 1 deletions

View File

@ -114,6 +114,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_split_block_distribute_by_channel_timer =
ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
@ -309,6 +310,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
SourceState source_state) {
auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
SCOPED_TIMER(local_state.exec_time_counter());
local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
bool all_receiver_eof = true;

View File

@ -146,6 +146,7 @@ public:
RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; }
RuntimeProfile::Counter* local_bytes_send_counter() { return _local_bytes_send_counter; }
RuntimeProfile::Counter* local_sent_rows() { return _local_sent_rows; }
@ -192,6 +193,7 @@ private:
RuntimeProfile::Counter* _split_block_hash_compute_timer = nullptr;
RuntimeProfile::Counter* _split_block_distribute_by_channel_timer = nullptr;
RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
RuntimeProfile::Counter* _rows_sent_counter = nullptr;
// Throughput per total time spent in sender
RuntimeProfile::Counter* _overall_throughput = nullptr;
// Used to counter send bytes under local data exchange

View File

@ -64,7 +64,8 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
state->execution_timeout()));
_result_sink_dependency =
ResultSinkDependency::create_shared(_parent->operator_id(), _parent->node_id());
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
return Status::OK();
}
@ -131,6 +132,8 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
COUNTER_UPDATE(local_state.blocks_sent_counter(), 1);
if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
}

View File

@ -62,6 +62,8 @@ public:
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
WriteDependency* dependency() override { return _result_sink_dependency.get(); }
RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
private:
friend class ResultSinkOperatorX;
@ -71,6 +73,8 @@ private:
std::shared_ptr<BufferControlBlock> _sender;
std::shared_ptr<ResultWriter> _writer;
std::shared_ptr<ResultSinkDependency> _result_sink_dependency;
RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
RuntimeProfile::Counter* _rows_sent_counter = nullptr;
};
class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState> {