[pipelineX](profile) Add necessary metrics (#24820)
This commit is contained in:
@ -177,7 +177,6 @@ class MultiCastDataStreamSinkLocalState final
|
||||
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
|
||||
MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
|
||||
: Base(parent, state) {}
|
||||
std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer();
|
||||
friend class MultiCastDataStreamSinkOperatorX;
|
||||
friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
|
||||
using Base = PipelineXSinkLocalState<MultiCastDependency>;
|
||||
@ -209,6 +208,7 @@ public:
|
||||
SourceState source_state) override {
|
||||
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
|
||||
SCOPED_TIMER(local_state.profile()->total_time_counter());
|
||||
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
|
||||
if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
|
||||
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
|
||||
auto st = local_state._shared_state->_multi_cast_data_streamer->push(
|
||||
|
||||
@ -33,6 +33,8 @@ Status PartitionSortSourceLocalState::close(RuntimeState* state) {
|
||||
if (_closed) {
|
||||
return Status::OK();
|
||||
}
|
||||
SCOPED_TIMER(profile()->total_time_counter());
|
||||
SCOPED_TIMER(_close_timer);
|
||||
_shared_state->previous_row = nullptr;
|
||||
_shared_state->partition_sorts.clear();
|
||||
return PipelineXLocalState<PartitionSortDependency>::close(state);
|
||||
@ -42,6 +44,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
|
||||
SourceState& source_state) {
|
||||
RETURN_IF_CANCELLED(state);
|
||||
CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
|
||||
SCOPED_TIMER(local_state.profile()->total_time_counter());
|
||||
output_block->clear_column_data();
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
|
||||
|
||||
@ -61,6 +61,8 @@ public:
|
||||
|
||||
Status init(RuntimeState* state, LocalStateInfo& info) override {
|
||||
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortDependency>::init(state, info));
|
||||
SCOPED_TIMER(profile()->total_time_counter());
|
||||
SCOPED_TIMER(_open_timer);
|
||||
_get_next_timer = ADD_TIMER(profile(), "GetResultTime");
|
||||
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
|
||||
_shared_state->previous_row = std::make_unique<vectorized::SortCursorCmp>();
|
||||
|
||||
@ -698,6 +698,9 @@ public:
|
||||
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
|
||||
_async_writer_dependency = AsyncWriterDependency::create_shared(_parent->id());
|
||||
_writer->set_dependency(_async_writer_dependency.get());
|
||||
|
||||
_wait_for_dependency_timer = ADD_TIMER(
|
||||
_profile, "WaitForDependency[" + _async_writer_dependency->name() + "]Time");
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -717,6 +720,8 @@ public:
|
||||
if (_closed) {
|
||||
return Status::OK();
|
||||
}
|
||||
COUNTER_SET(_wait_for_dependency_timer,
|
||||
_async_writer_dependency->write_watcher_elapse_time());
|
||||
if (_writer->need_normal_close()) {
|
||||
if (exec_status.ok() && !state->is_cancelled()) {
|
||||
RETURN_IF_ERROR(_writer->commit_trans());
|
||||
|
||||
Reference in New Issue
Block a user