fix hash join out of memory

This commit is contained in:
obdev
2024-02-21 12:45:51 +00:00
committed by ob-robot
parent 65f35839d3
commit f0bc656eae
2 changed files with 41 additions and 32 deletions

View File

@ -175,6 +175,7 @@ ObHashJoinVecOp::ObHashJoinVecOp(ObExecContext &ctx_, const ObOpSpec &spec, ObOp
null_random_hash_value_(0),
skip_left_null_(false),
skip_right_null_(false),
data_ratio_(1.0),
output_info_()
{
}
@ -428,7 +429,7 @@ int ObHashJoinVecOp::inner_rescan()
read_null_in_naaj_ = false;
non_preserved_side_is_not_empty_ = false;
}
LOG_TRACE("hash join rescan", K(ret));
LOG_TRACE("hash join rescan", K(ret), K(spec_.id_));
return ret;
}
@ -747,7 +748,7 @@ int ObHashJoinVecOp::get_next_left_row_batch(bool is_from_row_store, const ObBat
ret = tmp_ret;
}
}
LOG_TRACE("right naaj null break", K(ret));
LOG_TRACE("right naaj null break", K(ret), K(spec_.id_));
}
}
} else {
@ -794,7 +795,7 @@ int ObHashJoinVecOp::reuse_for_next_chunk()
LOG_WARN("failed to rescan right", K(ret));
}
LOG_TRACE("trace hash table", K(ret), K(profile_.get_bucket_size()),
K(profile_.get_row_count()));
K(profile_.get_row_count()), K(spec_.id_));
}
return ret;
}
@ -843,7 +844,7 @@ int ObHashJoinVecOp::build_hash_table_for_nest_loop(int64_t &num_left_rows)
}
}
LOG_TRACE("trace block hash join", K(nth_nest_loop_), K(part_level_), K(ret), K(num_left_rows),
K(join_table_.get_nbuckets()));
K(join_table_.get_nbuckets()), K(spec_.id_));
return ret;
}
@ -924,13 +925,14 @@ int ObHashJoinVecOp::get_max_memory_size(int64_t input_size)
}
LOG_TRACE("trace auto memory manager", K(hash_area_size), K(part_count_),
K(input_size), K(extra_memory_size), K(profile_.get_expect_size()),
K(profile_.get_cache_size()));
K(profile_.get_cache_size()), K(spec_.id_));
} else {
part_count_ = calc_partition_count_by_cache_aware(
profile_.get_row_count(), MAX_PART_COUNT_PER_LEVEL, sql_mem_processor_.get_mem_bound());
LOG_TRACE("trace auto memory manager", K(hash_area_size), K(part_count_),
K(input_size));
K(input_size), K(spec_.id_));
}
data_ratio_ = 1.0;
if (OB_SUCC(ret)) {
if (OB_FAIL(sync_wait_part_count())) {
LOG_WARN("failed to sync part count", K(ret));
@ -983,7 +985,7 @@ int ObHashJoinVecOp::calc_basic_info(bool global_info)
&ctx_, MY_SPEC.px_est_size_factor_, left_->get_spec().rows_, row_count))) {
LOG_WARN("failed to get px size", K(ret));
} else {
LOG_TRACE("trace left row count", K(row_count));
LOG_TRACE("trace left row count", K(row_count), K(spec_.id_));
if (row_count < MIN_ROW_COUNT) {
row_count = MIN_ROW_COUNT;
}
@ -1061,7 +1063,7 @@ int ObHashJoinVecOp::calc_basic_info(bool global_info)
}
LOG_TRACE("calc row count", K(left_part_->get_row_store().get_file_size()),
K(left_part_->get_row_store().get_row_cnt()),
K(remain_data_memory_size_));
K(remain_data_memory_size_), K(spec_.id_));
input_size = remain_data_memory_size_;
} else {
ret = OB_ERR_UNEXPECTED;
@ -1159,6 +1161,7 @@ int ObHashJoinVecOp::get_processor_type()
LOG_WARN("failed to get workarea size", K(ret));
}
remain_data_memory_size_ = hash_area_size * 10;
data_ratio_ = 1.0;
set_processor(IN_MEMORY);
} else if (!enable_nest_loop && !enable_in_memory
&& enable_recursive && MAX_PART_LEVEL > part_level_) {
@ -1172,7 +1175,7 @@ int ObHashJoinVecOp::get_processor_type()
K(is_shared_), K(part_level_), K(hj_processor_),
K(part_level_), K(part_count_), K(cur_join_table_->get_nbuckets()),
K(pre_total_size), K(recursive_cost), K(nest_loop_cost), K(is_skew),
K(l_size), K(r_size));
K(l_size), K(r_size), K(spec_.id_));
} else {
set_processor(NEST_LOOP);
}
@ -1197,7 +1200,7 @@ int ObHashJoinVecOp::get_processor_type()
K(pre_total_size), K(recursive_cost), K(nest_loop_cost), K(is_skew),
K(l_size), K(r_size), K(remain_data_memory_size_), K(profile_.get_expect_size()),
K(profile_.get_bucket_size()), K(profile_.get_cache_size()), K(profile_.get_row_count()),
K(profile_.get_input_size()), K(left_->get_spec().width_));
K(profile_.get_input_size()), K(left_->get_spec().width_), K(spec_.id_));
return ret;
}
@ -1230,7 +1233,7 @@ int ObHashJoinVecOp::build_hash_table_in_memory(int64_t &num_left_rows)
}
LOG_TRACE("trace to finish build hash table in memory", K(ret), K(num_left_rows),
K(hj_part->get_row_count_on_disk()), K(cur_join_table_->get_nbuckets()));
K(hj_part->get_row_count_on_disk()), K(cur_join_table_->get_nbuckets()), K(spec_.id_));
return ret;
}
@ -1303,9 +1306,9 @@ int ObHashJoinVecOp::init_join_partition()
}
if (nullptr != left_part_) {
LOG_TRACE("trace init partition", K(part_count_), K(part_level_),
K(left_part_->get_part_level()), K(left_part_->get_partno()));
K(left_part_->get_part_level()), K(left_part_->get_partno()), K(spec_.id_));
} else {
LOG_TRACE("trace init partition", K(part_count_), K(part_level_));
LOG_TRACE("trace init partition", K(part_count_), K(part_level_), K(spec_.id_));
}
return ret;
}
@ -1323,9 +1326,10 @@ void ObHashJoinVecOp::update_remain_data_memory_size(
need_more_remain_data_memory_size(row_count, total_mem_size, ratio);
int64_t estimate_remain_size = total_mem_size * ratio;
remain_data_memory_size_ = estimate_remain_size;
data_ratio_ = ratio;
tmp_need_dump = need_dump();
LOG_TRACE("trace need more remain memory size", K(total_mem_size), K(row_count),
K(estimate_remain_size), K(tmp_need_dump));
K(estimate_remain_size), K(tmp_need_dump), K(spec_.id_));
}
bool ObHashJoinVecOp::need_more_remain_data_memory_size(
@ -1352,7 +1356,7 @@ bool ObHashJoinVecOp::need_more_remain_data_memory_size(
}
LOG_TRACE("trace need more remain memory size", K(total_mem_size), K(predict_total_memory_size),
K(extra_memory_size), K(bucket_cnt), K(row_count), K(data_ratio), K(get_mem_used()),
K(guess_data_ratio), K(get_data_mem_used()), K(sql_mem_processor_.get_mem_bound()), K(lbt()));
K(guess_data_ratio), K(get_data_mem_used()), K(sql_mem_processor_.get_mem_bound()), K(lbt()), K(spec_.id_));
return need_more;
}
@ -1373,7 +1377,7 @@ int ObHashJoinVecOp::update_remain_data_memory_size_periodically(int64_t row_cou
sql_mem_processor_.set_periodic_cnt(tmp_periodic_row_count);
}
LOG_TRACE("trace need more remain memory size", K(profile_.get_expect_size()),
K(row_count), K(force_update));
K(row_count), K(force_update), K(spec_.id_));
}
return ret;
}
@ -1418,7 +1422,7 @@ int ObHashJoinVecOp::asyn_dump_partition(
}
LOG_TRACE("debug dump partition", K(is_left), K(start_dumped_part_idx),
K(last_dumped_partition_idx), K(cur_dumped_partition_),
K(pre_total_dumped_size), K(dumped_size), K(dump_all), K(lbt()));
K(pre_total_dumped_size), K(dumped_size), K(dump_all), K(lbt()), K(spec_.id_));
// secondly dump one buffer per partiton one by one
bool finish_dump = false;
while (OB_SUCC(ret) && !finish_dump) {
@ -1451,7 +1455,7 @@ int ObHashJoinVecOp::asyn_dump_partition(
} // end for
} // end while
LOG_TRACE("debug finish dump partition", K(is_left), K(start_dumped_part_idx),
K(last_dumped_partition_idx), K(cur_dumped_partition_), K(lbt()));
K(last_dumped_partition_idx), K(cur_dumped_partition_), K(lbt()), K(spec_.id_));
if (OB_SUCC(ret) && last_dumped_partition_idx - 1 < cur_dumped_partition_) {
cur_dumped_partition_ = last_dumped_partition_idx - 1;
}
@ -1488,7 +1492,8 @@ int ObHashJoinVecOp::dump_build_table(int64_t row_count, bool force_update)
K(profile_.get_expect_size()),
K(sql_mem_processor_.get_mem_bound()),
K(cur_dumped_partition_),
K(mem_used));
K(mem_used),
K(data_ratio_));
// dump from last partition to the first partition
int64_t cur_dumped_partition = part_count_ - 1;
if ((all_dumped() || need_dump(mem_used)) && 0 <= cur_dumped_partition && OB_SUCC(ret)) {
@ -1501,7 +1506,9 @@ int ObHashJoinVecOp::dump_build_table(int64_t row_count, bool force_update)
K(cur_dumped_partition),
K(sql_mem_processor_.get_mem_bound()),
K(cur_dumped_partition_),
K(mem_used));
K(mem_used),
K(data_ratio_),
K(spec_.id_));
}
}
}
@ -1825,7 +1832,8 @@ int ObHashJoinVecOp::dump_remain_partition()
LOG_WARN("failed to update dumped partition statistics", K(ret));
}
LOG_TRACE("dump remain partition", K(ret), K(cur_dumped_partition_),
"the last partition dumped size", left_part_array_[part_count_ - 1]->get_size_on_disk());
"the last partition dumped size", left_part_array_[part_count_ - 1]->get_size_on_disk(),
K(spec_.id_));
}
return ret;
}
@ -1987,7 +1995,7 @@ int ObHashJoinVecOp::split_partition(int64_t &num_left_rows)
}
}
LOG_TRACE("trace split partition", K(ret), K(num_left_rows), K(row_count_on_disk),
K(cur_dumped_partition_), K(read_null_in_naaj_));
K(cur_dumped_partition_), K(read_null_in_naaj_), K(spec_.id_));
return ret;
}
@ -2032,7 +2040,7 @@ int ObHashJoinVecOp::prepare_hash_table()
LOG_WARN("failed to update used mem size", K(ret));
}
LOG_TRACE("trace prepare hash table", K(ret), K(profile_.get_bucket_size()), K(profile_.get_row_count()),
K(part_count_), K(profile_.get_expect_size()));
K(part_count_), K(profile_.get_expect_size()), K(spec_.id_));
}
if (OB_SUCC(ret) && is_shared_ && OB_FAIL(sync_wait_init_build_hash(build_ht_thread_ptr))) {
LOG_WARN("failed to sync wait init hash table", K(ret));
@ -2048,7 +2056,7 @@ void ObHashJoinVecOp::trace_hash_table_collision(int64_t row_cnt)
int64_t nbuckets = cur_join_table_->get_nbuckets();
LOG_TRACE("trace hash table collision", K(spec_.get_id()), K(spec_.get_name()), K(nbuckets),
"avg_cnt", ((double)total_cnt/(double)used_bucket_cnt), K(total_cnt),
K(row_cnt), K(used_bucket_cnt));
K(row_cnt), K(used_bucket_cnt), K(spec_.id_));
op_monitor_info_.otherstat_1_value_ = 0;
op_monitor_info_.otherstat_2_value_ = 0;
op_monitor_info_.otherstat_3_value_ = total_cnt;
@ -2167,7 +2175,7 @@ int ObHashJoinVecOp::adaptive_process(int64_t &num_left_rows)
K(join_table_.get_nbuckets()));
}
LOG_TRACE("trace recursive process", K(part_level_), K(part_level_), K(part_count_),
K(join_table_.get_nbuckets()));
K(join_table_.get_nbuckets()), K(spec_.id_));
} else {
LOG_WARN("failed to process in memory", K(ret), K(part_level_), K(part_count_),
K(join_table_.get_nbuckets()));
@ -2184,7 +2192,7 @@ int ObHashJoinVecOp::adaptive_process(int64_t &num_left_rows)
}
LOG_TRACE("trace process type", K(hj_processor_), K(part_level_), K(part_count_),
K(num_left_rows), K(remain_data_memory_size_), K(is_shared_));
K(num_left_rows), K(remain_data_memory_size_), K(is_shared_), K(spec_.id_));
return ret;
}
@ -2192,7 +2200,7 @@ int ObHashJoinVecOp::get_next_right_batch()
{
int ret = OB_SUCCESS;
clear_evaluated_flag();
LOG_TRACE("hash join last traverse cnt", K(ret), K(right_batch_traverse_cnt_));
LOG_TRACE("hash join last traverse cnt", K(ret), K(right_batch_traverse_cnt_), K(spec_.id_));
right_batch_traverse_cnt_ = 0;
output_info_.reuse();
bool is_left = false;
@ -2229,7 +2237,7 @@ int ObHashJoinVecOp::get_next_right_batch()
brs_.end_ = true;
read_null_in_naaj_ = true;
ret = OB_SUCCESS;
LOG_TRACE("null break for left naaj", K(ret));
LOG_TRACE("null break for left naaj", K(ret), K(spec_.id_));
}
}
if (OB_SUCC(ret)) {
@ -2489,7 +2497,7 @@ int ObHashJoinVecOp::finish_dump(bool for_left, bool need_dump, bool force /* fa
}
LOG_TRACE("finish dump: ", K(part_level_),
K(cur_dumped_partition_), K(for_left), K(need_dump), K(force), K(total_size),
K(dumped_row_count), K(part_count_));
K(dumped_row_count), K(part_count_), K(spec_.id_));
}
return ret;
}

View File

@ -438,15 +438,15 @@ private:
// Returns the data size reported by sql_mem_processor_.
OB_INLINE int64_t get_data_mem_used() { return sql_mem_processor_.get_data_size(); }
// Decides whether a dump is needed for a caller-supplied memory usage figure.
// @param mem_used  memory usage to compare against the memory bound
// @return true when mem_used is strictly greater than the threshold
//
// NOTE(review): this span carries two return statements. They look like the
// before/after sides of a change rendered without +/- markers: the first
// compares against sql_mem_processor_.get_mem_bound() alone, the second
// against that bound multiplied by data_ratio_. As literally written here,
// only the first return can execute and the data_ratio_ scaling is
// unreachable -- confirm against the real header which line is live.
OB_INLINE bool need_dump(int64_t mem_used)
{
return mem_used > sql_mem_processor_.get_mem_bound() ;
return mem_used > sql_mem_processor_.get_mem_bound() * data_ratio_;
}
// Parameterless overload: same comparison as need_dump(int64_t), but the usage
// figure is taken from get_cur_mem_used().
// @return true when get_cur_mem_used() is strictly greater than the threshold
//
// NOTE(review): two return statements appear here as well, presumably the
// removed and added lines of a diff shown without +/- markers. Taken
// literally, the first return (unscaled bound) wins and the second
// (bound * data_ratio_) is never reached -- confirm against the real header.
OB_INLINE bool need_dump()
{
return get_cur_mem_used() > sql_mem_processor_.get_mem_bound();
return get_cur_mem_used() > sql_mem_processor_.get_mem_bound() * data_ratio_;
}
// Computes how much should be dumped for a given memory usage: the amount by
// which mem_used exceeds the threshold, plus a fixed 2 * 1024 * 1024 of slack
// (2 MiB if the unit is bytes -- TODO confirm the unit).
// @param mem_used  memory usage to measure against the memory bound
// @return the excess over the threshold plus the fixed slack
//
// NOTE(review): two return statements appear here, presumably the removed and
// added lines of a diff shown without +/- markers; taken literally only the
// first one executes. In the second form, multiplying by data_ratio_ (a
// double) makes the whole expression a double, which is then implicitly
// converted back to int64_t on return.
int64_t get_need_dump_size(int64_t mem_used)
{
return mem_used - sql_mem_processor_.get_mem_bound() + 2 * 1024 * 1024;
return mem_used - sql_mem_processor_.get_mem_bound() * data_ratio_ + 2 * 1024 * 1024;
}
// Reports whether the given size is strictly smaller than
// remain_data_memory_size_. Does not modify the object (const).
// @param size  size to compare against remain_data_memory_size_
// @return true when size < remain_data_memory_size_
OB_INLINE bool all_in_memory(int64_t size) const
{ return size < remain_data_memory_size_; }
@ -599,6 +599,7 @@ private:
bool skip_left_null_;
bool skip_right_null_;
double data_ratio_;
OutputInfo output_info_;
ObTempRowStore::IterationAge iter_age_;
};