Currently, many profiles use "add child profile" to organize a profile into blocks, but this is wrong: a child profile carries a total-time counter. What we should actually use is just a label, for example:
- MemoryUsage:
- HashTable: 23.98 KB
- SerializeKeyArena: 446.75 KB
Add a new macro ADD_LABEL_COUNTER to add just a label in the profile.
---------
Co-authored-by: yiguolei <yiguolei@gmail.com>
193 lines
6.9 KiB
C++
193 lines
6.9 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
#pragma once
|
|
|
|
#include <gen_cpp/PlanNodes_types.h>
|
|
#include <gen_cpp/Types_types.h>
|
|
#include <stddef.h>
|
|
#include <stdint.h>
|
|
|
|
#include <atomic>
|
|
#include <functional>
|
|
#include <memory>
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
#include "common/global_types.h"
|
|
#include "common/status.h"
|
|
#include "exec/exec_node.h"
|
|
#include "util/runtime_profile.h"
|
|
#include "vec/aggregate_functions/aggregate_function.h"
|
|
#include "vec/columns/column.h"
|
|
#include "vec/common/arena.h"
|
|
#include "vec/core/block.h"
|
|
#include "vec/data_types/data_type.h"
|
|
#include "vec/exprs/vexpr_fwd.h"
|
|
|
|
namespace doris {
|
|
class DescriptorTbl;
|
|
class ObjectPool;
|
|
class RuntimeState;
|
|
class TupleDescriptor;
|
|
|
|
} // namespace doris
|
|
|
|
namespace doris::vectorized {
|
|
|
|
/// Identifies a row by the block it belongs to, its row index inside that
/// block, and a flattened position across blocks. All fields start at zero.
struct BlockRowPos {
    BlockRowPos() = default;

    int64_t block_num = 0; // which block the position refers to
    int64_t row_num = 0;   // which row inside that block
    int64_t pos = 0;       // pos = size of all preceding blocks + row_num

    /// Formats the three fields as
    /// "\t block_num: <n>\t row_num: <n>\t pos: <n>".
    /// Const-qualified because it only reads the fields, so it can be called
    /// through const references as well.
    std::string debug_string() const {
        std::string res = "\t block_num: ";
        res += std::to_string(block_num);
        res += "\t row_num: ";
        res += std::to_string(row_num);
        res += "\t pos: ";
        res += std::to_string(pos);
        return res;
    }
};
|
|
|
|
class AggFnEvaluator;
|
|
|
|
class VAnalyticEvalNode : public ExecNode {
|
|
public:
|
|
~VAnalyticEvalNode() override = default;
|
|
VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
|
|
|
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
|
|
Status prepare(RuntimeState* state) override;
|
|
Status open(RuntimeState* state) override;
|
|
Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
|
|
Status close(RuntimeState* state) override;
|
|
Status alloc_resource(RuntimeState* state) override;
|
|
void release_resource(RuntimeState* state) override;
|
|
Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override;
|
|
Status pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
|
|
bool can_read();
|
|
bool can_write();
|
|
|
|
protected:
|
|
using ExecNode::debug_string;
|
|
virtual std::string debug_string();
|
|
|
|
private:
|
|
Status _get_next_for_rows(size_t rows);
|
|
Status _get_next_for_range(size_t rows);
|
|
Status _get_next_for_partition(size_t rows);
|
|
|
|
void _execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start,
|
|
int64_t frame_end);
|
|
|
|
Status _reset_agg_status();
|
|
Status _init_result_columns();
|
|
Status _create_agg_status();
|
|
Status _destroy_agg_status();
|
|
Status _insert_range_column(vectorized::Block* block, const VExprContextSPtr& expr,
|
|
IColumn* dst_column, size_t length);
|
|
|
|
void _update_order_by_range();
|
|
bool _init_next_partition(BlockRowPos found_partition_end);
|
|
void _insert_result_info(int64_t current_block_rows);
|
|
Status _output_current_block(Block* block);
|
|
BlockRowPos _get_partition_by_end();
|
|
BlockRowPos _compare_row_to_find_end(int idx, BlockRowPos start, BlockRowPos end,
|
|
bool need_check_first = false);
|
|
|
|
Status _fetch_next_block_data(RuntimeState* state);
|
|
Status _consumed_block_and_init_partition(RuntimeState* state, bool* next_partition, bool* eos);
|
|
bool whether_need_next_partition(BlockRowPos found_partition_end);
|
|
|
|
std::string debug_window_bound_string(TAnalyticWindowBoundary b);
|
|
using vectorized_execute = std::function<void(int64_t peer_group_start, int64_t peer_group_end,
|
|
int64_t frame_start, int64_t frame_end)>;
|
|
using vectorized_get_next = std::function<Status(size_t rows)>;
|
|
using vectorized_get_result = std::function<void(int64_t current_block_rows)>;
|
|
using vectorized_closer = std::function<void()>;
|
|
|
|
struct executor {
|
|
vectorized_execute execute;
|
|
vectorized_get_next get_next;
|
|
vectorized_get_result insert_result;
|
|
vectorized_closer close;
|
|
};
|
|
|
|
executor _executor;
|
|
|
|
void _release_mem();
|
|
|
|
private:
|
|
enum AnalyticFnScope { PARTITION, RANGE, ROWS };
|
|
std::vector<Block> _input_blocks;
|
|
std::vector<int64_t> input_block_first_row_positions;
|
|
std::vector<AggFnEvaluator*> _agg_functions;
|
|
std::vector<VExprContextSPtrs> _agg_expr_ctxs;
|
|
VExprContextSPtrs _partition_by_eq_expr_ctxs;
|
|
VExprContextSPtrs _order_by_eq_expr_ctxs;
|
|
std::vector<std::vector<MutableColumnPtr>> _agg_intput_columns;
|
|
std::vector<MutableColumnPtr> _result_window_columns;
|
|
|
|
BlockRowPos _order_by_start;
|
|
BlockRowPos _order_by_end;
|
|
BlockRowPos _partition_by_start;
|
|
BlockRowPos _partition_by_end;
|
|
BlockRowPos _all_block_end;
|
|
std::vector<int64_t> _ordey_by_column_idxs;
|
|
std::vector<int64_t> _partition_by_column_idxs;
|
|
|
|
bool _input_eos = false;
|
|
bool _next_partition = false;
|
|
std::atomic_bool _need_more_input = true;
|
|
BlockRowPos _found_partition_end;
|
|
int64_t _input_total_rows = 0;
|
|
int64_t _output_block_index = 0;
|
|
int64_t _window_end_position = 0;
|
|
int64_t _current_row_position = 0;
|
|
int64_t _rows_start_offset = 0;
|
|
int64_t _rows_end_offset = 0;
|
|
size_t _agg_functions_size = 0;
|
|
bool _agg_functions_created = false;
|
|
|
|
/// The offset of the n-th functions.
|
|
std::vector<size_t> _offsets_of_aggregate_states;
|
|
/// The total size of the row from the functions.
|
|
size_t _total_size_of_aggregate_states = 0;
|
|
/// The max align size for functions
|
|
size_t _align_aggregate_states = 1;
|
|
std::unique_ptr<Arena> _agg_arena_pool;
|
|
AggregateDataPtr _fn_place_ptr;
|
|
|
|
TTupleId _buffered_tuple_id = 0;
|
|
TupleId _intermediate_tuple_id;
|
|
TupleId _output_tuple_id;
|
|
TAnalyticWindow _window;
|
|
AnalyticFnScope _fn_scope;
|
|
TupleDescriptor* _intermediate_tuple_desc;
|
|
TupleDescriptor* _output_tuple_desc;
|
|
std::vector<int64_t> _origin_cols;
|
|
|
|
RuntimeProfile::Counter* _evaluation_timer;
|
|
RuntimeProfile::Counter* _memory_usage_counter;
|
|
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
|
|
|
|
std::vector<bool> _change_to_nullable_flags;
|
|
};
|
|
} // namespace doris::vectorized
|