From f0bc656eae2f42e12cc756301fb7ef9ecfdaa347 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 21 Feb 2024 12:45:51 +0000 Subject: [PATCH] fix hash join out of memory --- .../join/hash_join/ob_hash_join_vec_op.cpp | 66 +++++++++++-------- .../join/hash_join/ob_hash_join_vec_op.h | 7 +- 2 files changed, 41 insertions(+), 32 deletions(-) diff --git a/src/sql/engine/join/hash_join/ob_hash_join_vec_op.cpp b/src/sql/engine/join/hash_join/ob_hash_join_vec_op.cpp index 834895e1c0..a27f3ed12e 100644 --- a/src/sql/engine/join/hash_join/ob_hash_join_vec_op.cpp +++ b/src/sql/engine/join/hash_join/ob_hash_join_vec_op.cpp @@ -175,6 +175,7 @@ ObHashJoinVecOp::ObHashJoinVecOp(ObExecContext &ctx_, const ObOpSpec &spec, ObOp null_random_hash_value_(0), skip_left_null_(false), skip_right_null_(false), + data_ratio_(1.0), output_info_() { } @@ -428,7 +429,7 @@ int ObHashJoinVecOp::inner_rescan() read_null_in_naaj_ = false; non_preserved_side_is_not_empty_ = false; } - LOG_TRACE("hash join rescan", K(ret)); + LOG_TRACE("hash join rescan", K(ret), K(spec_.id_)); return ret; } @@ -747,7 +748,7 @@ int ObHashJoinVecOp::get_next_left_row_batch(bool is_from_row_store, const ObBat ret = tmp_ret; } } - LOG_TRACE("right naaj null break", K(ret)); + LOG_TRACE("right naaj null break", K(ret), K(spec_.id_)); } } } else { @@ -794,7 +795,7 @@ int ObHashJoinVecOp::reuse_for_next_chunk() LOG_WARN("failed to rescan right", K(ret)); } LOG_TRACE("trace hash table", K(ret), K(profile_.get_bucket_size()), - K(profile_.get_row_count())); + K(profile_.get_row_count()), K(spec_.id_)); } return ret; } @@ -843,7 +844,7 @@ int ObHashJoinVecOp::build_hash_table_for_nest_loop(int64_t &num_left_rows) } } LOG_TRACE("trace block hash join", K(nth_nest_loop_), K(part_level_), K(ret), K(num_left_rows), - K(join_table_.get_nbuckets())); + K(join_table_.get_nbuckets()), K(spec_.id_)); return ret; } @@ -924,13 +925,14 @@ int ObHashJoinVecOp::get_max_memory_size(int64_t input_size) } LOG_TRACE("trace auto memory manager", K(hash_area_size), K(part_count_), K(input_size), K(extra_memory_size), K(profile_.get_expect_size()), - K(profile_.get_cache_size())); + K(profile_.get_cache_size()), K(spec_.id_)); } else { part_count_ = calc_partition_count_by_cache_aware( profile_.get_row_count(), MAX_PART_COUNT_PER_LEVEL, sql_mem_processor_.get_mem_bound()); LOG_TRACE("trace auto memory manager", K(hash_area_size), K(part_count_), - K(input_size)); + K(input_size), K(spec_.id_)); } + data_ratio_ = 1.0; if (OB_SUCC(ret)) { if (OB_FAIL(sync_wait_part_count())) { LOG_WARN("failed to sync part count", K(ret)); @@ -983,7 +985,7 @@ int ObHashJoinVecOp::calc_basic_info(bool global_info) &ctx_, MY_SPEC.px_est_size_factor_, left_->get_spec().rows_, row_count))) { LOG_WARN("failed to get px size", K(ret)); } else { - LOG_TRACE("trace left row count", K(row_count)); + LOG_TRACE("trace left row count", K(row_count), K(spec_.id_)); if (row_count < MIN_ROW_COUNT) { row_count = MIN_ROW_COUNT; } @@ -1061,7 +1063,7 @@ int ObHashJoinVecOp::calc_basic_info(bool global_info) } LOG_TRACE("calc row count", K(left_part_->get_row_store().get_file_size()), K(left_part_->get_row_store().get_row_cnt()), - K(remain_data_memory_size_)); + K(remain_data_memory_size_), K(spec_.id_)); input_size = remain_data_memory_size_; } else { ret = OB_ERR_UNEXPECTED; @@ -1159,6 +1161,7 @@ int ObHashJoinVecOp::get_processor_type() LOG_WARN("failed to get workarea size", K(ret)); } remain_data_memory_size_ = hash_area_size * 10; + data_ratio_ = 1.0; set_processor(IN_MEMORY); } else if (!enable_nest_loop && !enable_in_memory && enable_recursive && MAX_PART_LEVEL > part_level_) { @@ -1172,7 +1175,7 @@ int ObHashJoinVecOp::get_processor_type() K(is_shared_), K(part_level_), K(hj_processor_), K(part_level_), K(part_count_), K(cur_join_table_->get_nbuckets()), K(pre_total_size), K(recursive_cost), K(nest_loop_cost), K(is_skew), - K(l_size), K(r_size)); + K(l_size), K(r_size), K(spec_.id_)); } else { set_processor(NEST_LOOP); } @@ -1197,7 +1200,7 @@ int ObHashJoinVecOp::get_processor_type() K(pre_total_size), K(recursive_cost), K(nest_loop_cost), K(is_skew), K(l_size), K(r_size), K(remain_data_memory_size_), K(profile_.get_expect_size()), K(profile_.get_bucket_size()), K(profile_.get_cache_size()), K(profile_.get_row_count()), - K(profile_.get_input_size()), K(left_->get_spec().width_)); + K(profile_.get_input_size()), K(left_->get_spec().width_), K(spec_.id_)); return ret; } @@ -1230,7 +1233,7 @@ int ObHashJoinVecOp::build_hash_table_in_memory(int64_t &num_left_rows) } LOG_TRACE("trace to finish build hash table in memory", K(ret), K(num_left_rows), - K(hj_part->get_row_count_on_disk()), K(cur_join_table_->get_nbuckets())); + K(hj_part->get_row_count_on_disk()), K(cur_join_table_->get_nbuckets()), K(spec_.id_)); return ret; } @@ -1303,9 +1306,9 @@ int ObHashJoinVecOp::init_join_partition() } if (nullptr != left_part_) { LOG_TRACE("trace init partition", K(part_count_), K(part_level_), - K(left_part_->get_part_level()), K(left_part_->get_partno())); + K(left_part_->get_part_level()), K(left_part_->get_partno()), K(spec_.id_)); } else { - LOG_TRACE("trace init partition", K(part_count_), K(part_level_)); + LOG_TRACE("trace init partition", K(part_count_), K(part_level_), K(spec_.id_)); } return ret; } @@ -1323,9 +1326,10 @@ void ObHashJoinVecOp::update_remain_data_memory_size( need_more_remain_data_memory_size(row_count, total_mem_size, ratio); int64_t estimate_remain_size = total_mem_size * ratio; remain_data_memory_size_ = estimate_remain_size; + data_ratio_ = ratio; tmp_need_dump = need_dump(); LOG_TRACE("trace need more remain memory size", K(total_mem_size), K(row_count), - K(estimate_remain_size), K(tmp_need_dump)); + K(estimate_remain_size), K(tmp_need_dump), K(spec_.id_)); } bool ObHashJoinVecOp::need_more_remain_data_memory_size( @@ -1352,7 +1356,7 @@ bool ObHashJoinVecOp::need_more_remain_data_memory_size( } LOG_TRACE("trace need more remain memory size", K(total_mem_size), K(predict_total_memory_size), K(extra_memory_size), K(bucket_cnt), K(row_count), K(data_ratio), K(get_mem_used()), - K(guess_data_ratio), K(get_data_mem_used()), K(sql_mem_processor_.get_mem_bound()), K(lbt())); + K(guess_data_ratio), K(get_data_mem_used()), K(sql_mem_processor_.get_mem_bound()), K(lbt()), K(spec_.id_)); return need_more; } @@ -1373,7 +1377,7 @@ int ObHashJoinVecOp::update_remain_data_memory_size_periodically(int64_t row_cou sql_mem_processor_.set_periodic_cnt(tmp_periodic_row_count); } LOG_TRACE("trace need more remain memory size", K(profile_.get_expect_size()), - K(row_count), K(force_update)); + K(row_count), K(force_update), K(spec_.id_)); } return ret; } @@ -1418,7 +1422,7 @@ int ObHashJoinVecOp::asyn_dump_partition( } LOG_TRACE("debug dump partition", K(is_left), K(start_dumped_part_idx), K(last_dumped_partition_idx), K(cur_dumped_partition_), - K(pre_total_dumped_size), K(dumped_size), K(dump_all), K(lbt())); + K(pre_total_dumped_size), K(dumped_size), K(dump_all), K(lbt()), K(spec_.id_)); // secondly dump one buffer per partiton one by one bool finish_dump = false; while (OB_SUCC(ret) && !finish_dump) { @@ -1451,7 +1455,7 @@ int ObHashJoinVecOp::asyn_dump_partition( } // end for } // end while LOG_TRACE("debug finish dump partition", K(is_left), K(start_dumped_part_idx), - K(last_dumped_partition_idx), K(cur_dumped_partition_), K(lbt())); + K(last_dumped_partition_idx), K(cur_dumped_partition_), K(lbt()), K(spec_.id_)); if (OB_SUCC(ret) && last_dumped_partition_idx - 1 < cur_dumped_partition_) { cur_dumped_partition_ = last_dumped_partition_idx - 1; } @@ -1488,7 +1492,8 @@ int ObHashJoinVecOp::dump_build_table(int64_t row_count, bool force_update) K(profile_.get_expect_size()), K(sql_mem_processor_.get_mem_bound()), K(cur_dumped_partition_), - K(mem_used)); + K(mem_used), + K(data_ratio_)); // dump from last partition to the first partition int64_t cur_dumped_partition = part_count_ - 1; if ((all_dumped() || need_dump(mem_used)) && 0 <= cur_dumped_partition && OB_SUCC(ret)) { @@ -1501,7 +1506,9 @@ int ObHashJoinVecOp::dump_build_table(int64_t row_count, bool force_update) K(cur_dumped_partition), K(sql_mem_processor_.get_mem_bound()), K(cur_dumped_partition_), - K(mem_used)); + K(mem_used), + K(data_ratio_), + K(spec_.id_)); } } } @@ -1825,7 +1832,8 @@ int ObHashJoinVecOp::dump_remain_partition() LOG_WARN("failed to update dumped partition statistics", K(ret)); } LOG_TRACE("dump remain partition", K(ret), K(cur_dumped_partition_), - "the last partition dumped size", left_part_array_[part_count_ - 1]->get_size_on_disk()); + "the last partition dumped size", left_part_array_[part_count_ - 1]->get_size_on_disk(), + K(spec_.id_)); } return ret; } @@ -1987,7 +1995,7 @@ int ObHashJoinVecOp::split_partition(int64_t &num_left_rows) } } LOG_TRACE("trace split partition", K(ret), K(num_left_rows), K(row_count_on_disk), - K(cur_dumped_partition_), K(read_null_in_naaj_)); + K(cur_dumped_partition_), K(read_null_in_naaj_), K(spec_.id_)); return ret; } @@ -2032,7 +2040,7 @@ int ObHashJoinVecOp::prepare_hash_table() LOG_WARN("failed to update used mem size", K(ret)); } LOG_TRACE("trace prepare hash table", K(ret), K(profile_.get_bucket_size()), K(profile_.get_row_count()), - K(part_count_), K(profile_.get_expect_size())); + K(part_count_), K(profile_.get_expect_size()), K(spec_.id_)); } if (OB_SUCC(ret) && is_shared_ && OB_FAIL(sync_wait_init_build_hash(build_ht_thread_ptr))) { LOG_WARN("failed to sync wait init hash table", K(ret)); @@ -2048,7 +2056,7 @@ void ObHashJoinVecOp::trace_hash_table_collision(int64_t row_cnt) int64_t nbuckets = cur_join_table_->get_nbuckets(); LOG_TRACE("trace hash table collision", K(spec_.get_id()), K(spec_.get_name()), K(nbuckets), "avg_cnt", ((double)total_cnt/(double)used_bucket_cnt), K(total_cnt), - K(row_cnt), K(used_bucket_cnt)); + K(row_cnt), K(used_bucket_cnt), K(spec_.id_)); op_monitor_info_.otherstat_1_value_ = 0; op_monitor_info_.otherstat_2_value_ = 0; op_monitor_info_.otherstat_3_value_ = total_cnt; @@ -2167,7 +2175,7 @@ int ObHashJoinVecOp::adaptive_process(int64_t &num_left_rows) K(join_table_.get_nbuckets())); } LOG_TRACE("trace recursive process", K(part_level_), K(part_level_), K(part_count_), - K(join_table_.get_nbuckets())); + K(join_table_.get_nbuckets()), K(spec_.id_)); } else { LOG_WARN("failed to process in memory", K(ret), K(part_level_), K(part_count_), K(join_table_.get_nbuckets())); @@ -2184,7 +2192,7 @@ int ObHashJoinVecOp::adaptive_process(int64_t &num_left_rows) } LOG_TRACE("trace process type", K(hj_processor_), K(part_level_), K(part_count_), - K(num_left_rows), K(remain_data_memory_size_), K(is_shared_)); + K(num_left_rows), K(remain_data_memory_size_), K(is_shared_), K(spec_.id_)); return ret; } @@ -2192,7 +2200,7 @@ int ObHashJoinVecOp::get_next_right_batch() { int ret = OB_SUCCESS; clear_evaluated_flag(); - LOG_TRACE("hash join last traverse cnt", K(ret), K(right_batch_traverse_cnt_)); + LOG_TRACE("hash join last traverse cnt", K(ret), K(right_batch_traverse_cnt_), K(spec_.id_)); right_batch_traverse_cnt_ = 0; output_info_.reuse(); bool is_left = false; @@ -2229,7 +2237,7 @@ int ObHashJoinVecOp::get_next_right_batch() brs_.end_ = true; read_null_in_naaj_ = true; ret = OB_SUCCESS; - LOG_TRACE("null break for left naaj", K(ret)); + LOG_TRACE("null break for left naaj", K(ret), K(spec_.id_)); } } if (OB_SUCC(ret)) { @@ -2489,7 +2497,7 @@ int ObHashJoinVecOp::finish_dump(bool for_left, bool need_dump, bool force /* fa } LOG_TRACE("finish dump: ", K(part_level_), K(cur_dumped_partition_), K(for_left), K(need_dump), K(force), K(total_size), - K(dumped_row_count), K(part_count_)); + K(dumped_row_count), K(part_count_), K(spec_.id_)); } return ret; } diff --git a/src/sql/engine/join/hash_join/ob_hash_join_vec_op.h b/src/sql/engine/join/hash_join/ob_hash_join_vec_op.h index 7a739acb00..a3241b4577 100644 --- a/src/sql/engine/join/hash_join/ob_hash_join_vec_op.h +++ b/src/sql/engine/join/hash_join/ob_hash_join_vec_op.h @@ -438,15 +438,15 @@ private: OB_INLINE int64_t get_data_mem_used() { return sql_mem_processor_.get_data_size(); } OB_INLINE bool need_dump(int64_t mem_used) { - return mem_used > sql_mem_processor_.get_mem_bound() ; + return mem_used > sql_mem_processor_.get_mem_bound() * data_ratio_; } OB_INLINE bool need_dump() { - return get_cur_mem_used() > sql_mem_processor_.get_mem_bound(); + return get_cur_mem_used() > sql_mem_processor_.get_mem_bound() * data_ratio_; } int64_t get_need_dump_size(int64_t mem_used) { - return mem_used - sql_mem_processor_.get_mem_bound() + 2 * 1024 * 1024; + return mem_used - sql_mem_processor_.get_mem_bound() * data_ratio_ + 2 * 1024 * 1024; } OB_INLINE bool all_in_memory(int64_t size) const { return size < remain_data_memory_size_; } @@ -599,6 +599,7 @@ private: bool skip_left_null_; bool skip_right_null_; + double data_ratio_; OutputInfo output_info_; ObTempRowStore::IterationAge iter_age_; };