[pipelineX](partition sort) Add some nessacery metrics (#32020)
Add some necessary metrics
This commit is contained in:
@ -45,6 +45,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
|
||||
_build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
|
||||
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
|
||||
_emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
|
||||
_passthrough_rows_counter = ADD_COUNTER(_profile, "PassThroughRowsCounter", TUnit::UNIT);
|
||||
_partition_sort_info = std::make_shared<vectorized::PartitionSortInfo>(
|
||||
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first,
|
||||
p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit,
|
||||
@ -60,7 +61,11 @@ PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int ope
|
||||
_pool(pool),
|
||||
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
|
||||
_limit(tnode.limit),
|
||||
_topn_phase(tnode.partition_sort_node.ptopn_phase) {}
|
||||
_partition_exprs_num(tnode.partition_sort_node.partition_exprs.size()),
|
||||
_topn_phase(tnode.partition_sort_node.ptopn_phase),
|
||||
_has_global_limit(tnode.partition_sort_node.has_global_limit),
|
||||
_top_n_algorithm(tnode.partition_sort_node.top_n_algorithm),
|
||||
_partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {}
|
||||
|
||||
Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
|
||||
@ -75,12 +80,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st
|
||||
if (tnode.partition_sort_node.__isset.partition_exprs) {
|
||||
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
|
||||
tnode.partition_sort_node.partition_exprs, _partition_expr_ctxs));
|
||||
_partition_exprs_num = _partition_expr_ctxs.size();
|
||||
}
|
||||
|
||||
_has_global_limit = tnode.partition_sort_node.has_global_limit;
|
||||
_top_n_algorithm = tnode.partition_sort_node.top_n_algorithm;
|
||||
_partition_inner_limit = tnode.partition_sort_node.partition_inner_limit;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -101,15 +102,14 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
|
||||
auto& local_state = get_local_state(state);
|
||||
auto current_rows = input_block->rows();
|
||||
SCOPED_TIMER(local_state.exec_time_counter());
|
||||
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
|
||||
if (current_rows > 0) {
|
||||
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
|
||||
local_state.child_input_rows = local_state.child_input_rows + current_rows;
|
||||
if (UNLIKELY(_partition_exprs_num == 0)) {
|
||||
if (UNLIKELY(local_state._value_places.empty())) {
|
||||
local_state._value_places.push_back(_pool->add(new vectorized::PartitionBlocks(
|
||||
local_state._partition_sort_info, local_state._value_places.empty())));
|
||||
}
|
||||
//no partition key
|
||||
local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc());
|
||||
} else {
|
||||
//just simply use partition num to check
|
||||
@ -118,6 +118,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
|
||||
local_state._num_partition > config::partition_topn_partition_threshold &&
|
||||
local_state.child_input_rows < 10000 * local_state._num_partition) {
|
||||
{
|
||||
COUNTER_UPDATE(local_state._passthrough_rows_counter,
|
||||
(int64_t)input_block->rows());
|
||||
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
|
||||
local_state._shared_state->blocks_buffer.push(std::move(*input_block));
|
||||
// buffer have data, source could read this.
|
||||
|
||||
@ -78,8 +78,8 @@ private:
|
||||
RuntimeProfile::Counter* _build_timer = nullptr;
|
||||
RuntimeProfile::Counter* _emplace_key_timer = nullptr;
|
||||
RuntimeProfile::Counter* _selector_block_timer = nullptr;
|
||||
|
||||
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
|
||||
RuntimeProfile::Counter* _passthrough_rows_counter = nullptr;
|
||||
void _init_hash_method();
|
||||
};
|
||||
|
||||
@ -108,19 +108,18 @@ private:
|
||||
friend class PartitionSortSinkLocalState;
|
||||
ObjectPool* _pool = nullptr;
|
||||
const RowDescriptor _row_descriptor;
|
||||
int64_t _limit = -1;
|
||||
int _partition_exprs_num = 0;
|
||||
const int64_t _limit = -1;
|
||||
const int _partition_exprs_num = 0;
|
||||
const TPartTopNPhase::type _topn_phase;
|
||||
const bool _has_global_limit = false;
|
||||
const TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
|
||||
const int64_t _partition_inner_limit = 0;
|
||||
|
||||
vectorized::VExprContextSPtrs _partition_expr_ctxs;
|
||||
|
||||
TPartTopNPhase::type _topn_phase;
|
||||
|
||||
// Expressions and parameters used for build _sort_description
|
||||
vectorized::VSortExecExprs _vsort_exec_exprs;
|
||||
std::vector<bool> _is_asc_order;
|
||||
std::vector<bool> _nulls_first;
|
||||
TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
|
||||
bool _has_global_limit = false;
|
||||
int64_t _partition_inner_limit = 0;
|
||||
|
||||
Status _split_block_by_partition(vectorized::Block* input_block,
|
||||
PartitionSortSinkLocalState& local_state, bool eos);
|
||||
|
||||
@ -60,6 +60,10 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
|
||||
local_state._dependency->block();
|
||||
}
|
||||
}
|
||||
if (!output_block->empty()) {
|
||||
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
|
||||
COUNTER_UPDATE(local_state.rows_returned_counter(), output_block->rows());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
@ -78,6 +82,10 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
|
||||
*eos = local_state._shared_state->blocks_buffer.empty() &&
|
||||
local_state._sort_idx >= local_state._shared_state->partition_sorts.size();
|
||||
}
|
||||
if (!output_block->empty()) {
|
||||
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
|
||||
COUNTER_UPDATE(local_state.rows_returned_counter(), output_block->rows());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user