[improve](distinct agg) add check of hash table to decide whether emplace value (#32063)
* [improve](distinct agg) add check of hash table to emplace value
This commit is contained in:
@ -31,6 +31,30 @@ class RuntimeState;
|
||||
|
||||
namespace doris::pipeline {
|
||||
|
||||
struct StreamingHtMinReductionEntry {
|
||||
// Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories in
|
||||
// bytes is greater than this threshold.
|
||||
int min_ht_mem;
|
||||
// The minimum reduction factor to expand the hash tables.
|
||||
double streaming_ht_min_reduction;
|
||||
};
|
||||
|
||||
// TODO: experimentally tune these values and also programmatically get the cache size
|
||||
// of the machine that we're running on.
|
||||
static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
|
||||
// Expand up to L2 cache always.
|
||||
{0, 0.0},
|
||||
// Expand into L3 cache if we look like we're getting some reduction.
|
||||
// At present, The L2 cache is generally 1024k or more
|
||||
{1024 * 1024, 1.1},
|
||||
// Expand into main memory if we're getting a significant reduction.
|
||||
// The L3 cache is generally 16MB or more
|
||||
{16 * 1024 * 1024, 2.0},
|
||||
};
|
||||
|
||||
static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
|
||||
sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
|
||||
|
||||
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* state,
|
||||
OperatorXBase* parent)
|
||||
: PipelineXLocalState<FakeSharedState>(state, parent),
|
||||
@ -69,6 +93,65 @@ Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo&
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
|
||||
if (!_should_expand_hash_table) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return std::visit(
|
||||
[&](auto&& agg_method) -> bool {
|
||||
auto& hash_tbl = *agg_method.hash_table;
|
||||
auto [ht_mem, ht_rows] =
|
||||
std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
|
||||
|
||||
// Need some rows in tables to have valid statistics.
|
||||
if (ht_rows == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Find the appropriate reduction factor in our table for the current hash table sizes.
|
||||
int cache_level = 0;
|
||||
while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
|
||||
ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
|
||||
++cache_level;
|
||||
}
|
||||
|
||||
// Compare the number of rows in the hash table with the number of input rows that
|
||||
// were aggregated into it. Exclude passed through rows from this calculation since
|
||||
// they were not in hash tables.
|
||||
const int64_t input_rows = _input_num_rows;
|
||||
const int64_t aggregated_input_rows = input_rows - _cur_num_rows_returned;
|
||||
// TODO chenhao
|
||||
// const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
|
||||
double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows;
|
||||
|
||||
// TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be
|
||||
// inaccurate, which could lead to a divide by zero below.
|
||||
if (aggregated_input_rows <= 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Extrapolate the current reduction factor (r) using the formula
|
||||
// R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data
|
||||
// set, N is the number of input rows, excluding passed-through rows, and n is the
|
||||
// number of rows inserted or merged into the hash tables. This is a very rough
|
||||
// approximation but is good enough to be useful.
|
||||
// TODO: consider collecting more statistics to better estimate reduction.
|
||||
// double estimated_reduction = aggregated_input_rows >= expected_input_rows
|
||||
// ? current_reduction
|
||||
// : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1);
|
||||
double min_reduction =
|
||||
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
|
||||
|
||||
// COUNTER_SET(preagg_estimated_reduction_, estimated_reduction);
|
||||
// COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
|
||||
// return estimated_reduction > min_reduction;
|
||||
_should_expand_hash_table = current_reduction > min_reduction;
|
||||
return _should_expand_hash_table;
|
||||
},
|
||||
_agg_data->method_variant);
|
||||
}
|
||||
|
||||
void DistinctStreamingAggLocalState::_init_hash_method(
|
||||
const vectorized::VExprContextSPtrs& probe_exprs) {
|
||||
DCHECK(probe_exprs.size() >= 1);
|
||||
@ -144,6 +227,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
|
||||
|
||||
size_t key_size = _probe_expr_ctxs.size();
|
||||
vectorized::ColumnRawPtrs key_columns(key_size);
|
||||
std::vector<int> result_idxs(key_size);
|
||||
{
|
||||
SCOPED_TIMER(_expr_timer);
|
||||
for (size_t i = 0; i < key_size; ++i) {
|
||||
@ -153,6 +237,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
|
||||
in_block->get_by_position(result_column_id)
|
||||
.column->convert_to_full_column_if_const();
|
||||
key_columns[i] = in_block->get_by_position(result_column_id).column.get();
|
||||
result_idxs[i] = result_column_id;
|
||||
}
|
||||
}
|
||||
|
||||
@ -162,24 +247,41 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
|
||||
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
|
||||
// need use _cur_num_rows_returned to decide whether to do continue emplace into hash table
|
||||
_cur_num_rows_returned += _distinct_row.size();
|
||||
|
||||
bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
|
||||
out_block->mem_reuse();
|
||||
if (mem_reuse) {
|
||||
for (int i = 0; i < key_size; ++i) {
|
||||
auto dst = out_block->get_by_position(i).column->assume_mutable();
|
||||
key_columns[i]->append_data_by_selector(dst, _distinct_row);
|
||||
auto output_column = out_block->get_by_position(i).column;
|
||||
if (_stop_emplace_flag) { // swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1)
|
||||
out_block->replace_by_position(i, key_columns[i]->assume_mutable());
|
||||
in_block->replace_by_position(result_idxs[i], output_column);
|
||||
} else {
|
||||
auto dst = output_column->assume_mutable();
|
||||
key_columns[i]->append_data_by_selector(dst, _distinct_row);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
vectorized::ColumnsWithTypeAndName columns_with_schema;
|
||||
for (int i = 0; i < key_size; ++i) {
|
||||
auto distinct_column = key_columns[i]->clone_empty();
|
||||
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
|
||||
columns_with_schema.emplace_back(std::move(distinct_column),
|
||||
_probe_expr_ctxs[i]->root()->data_type(),
|
||||
_probe_expr_ctxs[i]->root()->expr_name());
|
||||
if (_stop_emplace_flag) {
|
||||
columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
|
||||
_probe_expr_ctxs[i]->root()->data_type(),
|
||||
_probe_expr_ctxs[i]->root()->expr_name());
|
||||
} else {
|
||||
auto distinct_column = key_columns[i]->clone_empty();
|
||||
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
|
||||
columns_with_schema.emplace_back(std::move(distinct_column),
|
||||
_probe_expr_ctxs[i]->root()->data_type(),
|
||||
_probe_expr_ctxs[i]->root()->expr_name());
|
||||
}
|
||||
}
|
||||
out_block->swap(vectorized::Block(columns_with_schema));
|
||||
if (_stop_emplace_flag) {
|
||||
in_block->clear(); // clear the column ref with stop_emplace_flag = true
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -201,6 +303,14 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
|
||||
SCOPED_TIMER(_hash_table_compute_timer);
|
||||
using HashMethodType = std::decay_t<decltype(agg_method)>;
|
||||
using AggState = typename HashMethodType::State;
|
||||
auto& hash_tbl = *agg_method.hash_table;
|
||||
if (_parent->cast<DistinctStreamingAggOperatorX>()._is_streaming_preagg &&
|
||||
hash_tbl.add_elem_size_overflow(num_rows)) {
|
||||
if (!_should_expand_preagg_hash_tables()) {
|
||||
_stop_emplace_flag = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
AggState state(key_columns);
|
||||
agg_method.init_serialized_keys(key_columns, num_rows);
|
||||
size_t row = 0;
|
||||
@ -339,12 +449,12 @@ Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Bloc
|
||||
in_block, local_state._aggregated_block.get()));
|
||||
|
||||
// get enough data or reached limit rows, need push block to queue
|
||||
if (_limit != -1 &&
|
||||
if (!local_state._stop_emplace_flag && _limit != -1 &&
|
||||
(local_state._aggregated_block->rows() + local_state._output_distinct_rows) >= _limit) {
|
||||
auto limit_rows = _limit - local_state._output_distinct_rows;
|
||||
local_state._aggregated_block->set_num_rows(limit_rows);
|
||||
local_state._output_distinct_rows += limit_rows;
|
||||
} else if (local_state._aggregated_block->rows() >= state->batch_size()) {
|
||||
} else if (!local_state._stop_emplace_flag) {
|
||||
local_state._output_distinct_rows += local_state._aggregated_block->rows();
|
||||
}
|
||||
}
|
||||
@ -371,7 +481,7 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc
|
||||
RETURN_IF_ERROR(
|
||||
vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
|
||||
}
|
||||
|
||||
local_state.add_num_rows_returned(block->rows());
|
||||
*eos = local_state._child_eos || (_limit != -1 && local_state._output_distinct_rows >= _limit);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -56,12 +56,16 @@ private:
|
||||
vectorized::ColumnRawPtrs& key_columns,
|
||||
const size_t num_rows);
|
||||
void _make_nullable_output_key(vectorized::Block* block);
|
||||
bool _should_expand_preagg_hash_tables();
|
||||
|
||||
std::shared_ptr<char> dummy_mapped_data;
|
||||
vectorized::IColumn::Selector _distinct_row;
|
||||
vectorized::Arena _arena;
|
||||
int64_t _output_distinct_rows = 0;
|
||||
size_t _input_num_rows = 0;
|
||||
bool _should_expand_hash_table = true;
|
||||
int64_t _cur_num_rows_returned = 0;
|
||||
bool _stop_emplace_flag = false;
|
||||
|
||||
std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
|
||||
vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr;
|
||||
|
||||
@ -52,16 +52,18 @@ Status DistinctStreamingAggSinkOperator::sink(RuntimeState* state, vectorized::B
|
||||
}
|
||||
RETURN_IF_ERROR(
|
||||
_node->_distinct_pre_agg_with_serialized_key(in_block, _output_block.get()));
|
||||
|
||||
bool stop_emplace_flag = _node->is_stop_emplace_flag();
|
||||
// get enough data or reached limit rows, need push block to queue
|
||||
if (_node->limit() != -1 &&
|
||||
if (!stop_emplace_flag && _node->limit() != -1 &&
|
||||
(_output_block->rows() + _output_distinct_rows) >= _node->limit()) {
|
||||
auto limit_rows = _node->limit() - _output_distinct_rows;
|
||||
_output_block->set_num_rows(limit_rows);
|
||||
_output_distinct_rows += limit_rows;
|
||||
_data_queue->push_block(std::move(_output_block));
|
||||
} else if (_output_block->rows() >= state->batch_size()) {
|
||||
_output_distinct_rows += _output_block->rows();
|
||||
} else if (stop_emplace_flag || _output_block->rows() >= state->batch_size()) {
|
||||
if (!stop_emplace_flag) { // if stop_emplace_flag = true, will be return rows directly, not get distinct
|
||||
_output_distinct_rows += _output_block->rows();
|
||||
}
|
||||
_data_queue->push_block(std::move(_output_block));
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,7 +59,7 @@ Status DistinctStreamingAggSourceOperator::pull_data(RuntimeState* state, vector
|
||||
block->columns()));
|
||||
}
|
||||
|
||||
rows_have_returned += block->rows();
|
||||
_node->add_num_rows_returned(block->rows());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -71,7 +71,6 @@ Status DistinctStreamingAggSourceOperator::get_block(RuntimeState* state, vector
|
||||
std::bind(&DistinctStreamingAggSourceOperator::pull_data, this, std::placeholders::_1,
|
||||
std::placeholders::_2, std::placeholders::_3)));
|
||||
if (UNLIKELY(eos)) {
|
||||
_node->set_num_rows_returned(rows_have_returned);
|
||||
source_state = SourceState::FINISHED;
|
||||
} else {
|
||||
source_state = SourceState::DEPEND_ON_SOURCE;
|
||||
|
||||
@ -60,7 +60,6 @@ public:
|
||||
Status pull_data(RuntimeState* state, vectorized::Block* output_block, bool* eos);
|
||||
|
||||
private:
|
||||
int64_t rows_have_returned = 0;
|
||||
std::shared_ptr<DataQueue> _data_queue;
|
||||
};
|
||||
|
||||
|
||||
@ -700,7 +700,7 @@ void Block::clear_column_data(int column_size) noexcept {
|
||||
}
|
||||
}
|
||||
for (auto& d : data) {
|
||||
DCHECK_EQ(d.column->use_count(), 1);
|
||||
DCHECK_EQ(d.column->use_count(), 1) << " " << print_use_count();
|
||||
(*std::move(d.column)).assume_mutable()->clear();
|
||||
}
|
||||
row_same_bit.clear();
|
||||
|
||||
@ -39,6 +39,7 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
|
||||
|
||||
size_t key_size = _probe_expr_ctxs.size();
|
||||
ColumnRawPtrs key_columns(key_size);
|
||||
std::vector<int> result_idxs(key_size);
|
||||
{
|
||||
SCOPED_TIMER(_expr_timer);
|
||||
for (size_t i = 0; i < key_size; ++i) {
|
||||
@ -48,6 +49,7 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
|
||||
in_block->get_by_position(result_column_id)
|
||||
.column->convert_to_full_column_if_const();
|
||||
key_columns[i] = in_block->get_by_position(result_column_id).column.get();
|
||||
result_idxs[i] = result_column_id;
|
||||
}
|
||||
}
|
||||
|
||||
@ -57,24 +59,40 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
|
||||
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
|
||||
|
||||
// if get stop_emplace_flag = true, means have no need to emplace value into hash table
|
||||
// so return block directly and notice the column ref is 2, need deal with.
|
||||
SCOPED_TIMER(_insert_keys_to_column_timer);
|
||||
bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse();
|
||||
if (mem_reuse) {
|
||||
for (int i = 0; i < key_size; ++i) {
|
||||
auto dst = out_block->get_by_position(i).column->assume_mutable();
|
||||
key_columns[i]->append_data_by_selector(dst, _distinct_row);
|
||||
auto output_column = out_block->get_by_position(i).column;
|
||||
if (_stop_emplace_flag) { // swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1)
|
||||
out_block->replace_by_position(i, key_columns[i]->assume_mutable());
|
||||
in_block->replace_by_position(result_idxs[i], output_column);
|
||||
} else {
|
||||
auto dst = output_column->assume_mutable();
|
||||
key_columns[i]->append_data_by_selector(dst, _distinct_row);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ColumnsWithTypeAndName columns_with_schema;
|
||||
for (int i = 0; i < key_size; ++i) {
|
||||
auto distinct_column = key_columns[i]->clone_empty();
|
||||
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
|
||||
columns_with_schema.emplace_back(std::move(distinct_column),
|
||||
_probe_expr_ctxs[i]->root()->data_type(),
|
||||
_probe_expr_ctxs[i]->root()->expr_name());
|
||||
if (_stop_emplace_flag) {
|
||||
columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
|
||||
_probe_expr_ctxs[i]->root()->data_type(),
|
||||
_probe_expr_ctxs[i]->root()->expr_name());
|
||||
} else {
|
||||
auto distinct_column = key_columns[i]->clone_empty();
|
||||
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
|
||||
columns_with_schema.emplace_back(std::move(distinct_column),
|
||||
_probe_expr_ctxs[i]->root()->data_type(),
|
||||
_probe_expr_ctxs[i]->root()->expr_name());
|
||||
}
|
||||
}
|
||||
out_block->swap(Block(columns_with_schema));
|
||||
if (_stop_emplace_flag) {
|
||||
in_block->clear(); // clear the column ref with stop_emplace_flag = true
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -88,6 +106,13 @@ void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Sele
|
||||
SCOPED_TIMER(_hash_table_compute_timer);
|
||||
using HashMethodType = std::decay_t<decltype(agg_method)>;
|
||||
using AggState = typename HashMethodType::State;
|
||||
auto& hash_tbl = *agg_method.hash_table;
|
||||
if (is_streaming_preagg() && hash_tbl.add_elem_size_overflow(num_rows)) {
|
||||
if (!_should_expand_preagg_hash_tables()) {
|
||||
_stop_emplace_flag = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
AggState state(key_columns);
|
||||
agg_method.init_serialized_keys(key_columns, num_rows);
|
||||
|
||||
|
||||
@ -43,8 +43,9 @@ public:
|
||||
DistinctAggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
~DistinctAggregationNode() override = default;
|
||||
Status _distinct_pre_agg_with_serialized_key(Block* in_block, Block* out_block);
|
||||
void set_num_rows_returned(int64_t rows) { _num_rows_returned = rows; }
|
||||
void add_num_rows_returned(int64_t rows) { _num_rows_returned += rows; }
|
||||
vectorized::VExprContextSPtrs get_conjuncts() { return _conjuncts; }
|
||||
bool is_stop_emplace_flag() const { return _stop_emplace_flag; }
|
||||
|
||||
private:
|
||||
void _emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
|
||||
@ -52,6 +53,7 @@ private:
|
||||
|
||||
char* dummy_mapped_data = nullptr;
|
||||
IColumn::Selector _distinct_row;
|
||||
bool _stop_emplace_flag = false;
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
@ -420,6 +420,9 @@ public:
|
||||
bool is_streaming_preagg() const { return _is_streaming_preagg; }
|
||||
bool is_aggregate_evaluators_empty() const { return _aggregate_evaluators.empty(); }
|
||||
void _make_nullable_output_key(Block* block);
|
||||
/// Return true if we should keep expanding hash tables in the preagg. If false,
|
||||
/// the preagg should pass through any rows it can't fit in its tables.
|
||||
bool _should_expand_preagg_hash_tables();
|
||||
|
||||
protected:
|
||||
bool _is_streaming_preagg;
|
||||
@ -498,9 +501,6 @@ private:
|
||||
std::unique_ptr<AggregateDataContainer> _aggregate_data_container;
|
||||
|
||||
void _release_self_resource(RuntimeState* state);
|
||||
/// Return true if we should keep expanding hash tables in the preagg. If false,
|
||||
/// the preagg should pass through any rows it can't fit in its tables.
|
||||
bool _should_expand_preagg_hash_tables();
|
||||
|
||||
size_t _get_hash_table_size();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user