From 9bd86a8db44389eed9bd8cf83623b46516eac148 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 8 Feb 2023 11:11:16 +0800 Subject: [PATCH] fix shared hj hang when 1 of worker is interrupted in sync_open && fix 3-stage aggr do not reset distinct info leads 4005 --- .../engine/aggregate/ob_hash_groupby_op.cpp | 14 ++++++++++- src/sql/engine/aggregate/ob_hash_groupby_op.h | 1 + .../ob_hash_partitioning_infrastructure_op.h | 25 +++++++++++++++++++ src/sql/engine/join/ob_hash_join_op.cpp | 10 ++++++-- src/sql/engine/join/ob_hash_join_op.h | 11 +++++++- 5 files changed, 57 insertions(+), 4 deletions(-) diff --git a/src/sql/engine/aggregate/ob_hash_groupby_op.cpp b/src/sql/engine/aggregate/ob_hash_groupby_op.cpp index 619c4fc005..f32dba0ab2 100644 --- a/src/sql/engine/aggregate/ob_hash_groupby_op.cpp +++ b/src/sql/engine/aggregate/ob_hash_groupby_op.cpp @@ -141,7 +141,7 @@ void ObHashGroupByOp::reset() first_batch_from_store_ = true; is_init_distinct_data_ = false; use_distinct_data_ = false; - distinct_data_set_.reset(); + reset_distinct_info(); bypass_ctrl_.reset(); by_pass_nth_group_ = 0; by_pass_child_brs_ = nullptr; @@ -736,6 +736,17 @@ int ObHashGroupByOp::init_distinct_info(bool is_part) return ret; } +void ObHashGroupByOp::reset_distinct_info() +{ + hash_funcs_.destroy(); + cmp_funcs_.destroy(); + sort_collations_.destroy(); + distinct_data_set_.destroy_my_skip(); + distinct_data_set_.destroy_items(); + distinct_data_set_.destroy_distinct_map(); + distinct_data_set_.reset(); +} + int ObHashGroupByOp::finish_insert_distinct_data() { int ret = OB_SUCCESS; @@ -760,6 +771,7 @@ int ObHashGroupByOp::insert_distinct_data() bool inserted = false; const ObChunkDatumStore::StoredRow *store_row = nullptr; if (!is_init_distinct_data_ && OB_FAIL(init_distinct_info(false))) { + LOG_WARN("failed to init distinct info", K(ret)); } else if (OB_FAIL(distinct_data_set_.insert_row(distinct_origin_exprs_, has_exists, inserted))) { LOG_WARN("failed to insert row", K(ret)); } else { diff --git a/src/sql/engine/aggregate/ob_hash_groupby_op.h b/src/sql/engine/aggregate/ob_hash_groupby_op.h index 5aeded11af..071074fe5f 100644 --- a/src/sql/engine/aggregate/ob_hash_groupby_op.h +++ b/src/sql/engine/aggregate/ob_hash_groupby_op.h @@ -363,6 +363,7 @@ private: int insert_distinct_data(); int finish_insert_distinct_data(); int init_distinct_info(bool is_part); + void reset_distinct_info(); int batch_insert_distinct_data(const ObBatchRows &child_brs); int batch_insert_all_distinct_data(const int64_t batch_size); diff --git a/src/sql/engine/basic/ob_hash_partitioning_infrastructure_op.h b/src/sql/engine/basic/ob_hash_partitioning_infrastructure_op.h index 2702ed6067..d3f16faf5c 100644 --- a/src/sql/engine/basic/ob_hash_partitioning_infrastructure_op.h +++ b/src/sql/engine/basic/ob_hash_partitioning_infrastructure_op.h @@ -582,6 +582,16 @@ public: return ret; } + void destroy_my_skip() + { + if (OB_NOT_NULL(mem_context_)) { + if (OB_NOT_NULL(alloc_)) { + alloc_->free(my_skip_); + my_skip_ = nullptr; + } + } + } + int init_items(const int64_t batch_size) { int ret = OB_SUCCESS; if (OB_ISNULL(alloc_)) { @@ -595,6 +605,16 @@ public: return ret; } + void destroy_items() + { + if (OB_NOT_NULL(mem_context_)) { + if (OB_NOT_NULL(alloc_)) { + alloc_->free(items_); + items_ = nullptr; + } + } + } + int init_distinct_map(const int64_t batch_size) { int ret = OB_SUCCESS; if (distinct_map_.is_inited()) { @@ -604,6 +624,11 @@ public: return ret; } + void destroy_distinct_map() + { + distinct_map_.destroy(); + } + int set_funcs(const common::ObIArray *hash_funcs, const common::ObIArray *sort_collations, const common::ObIArray *cmp_funcs, diff --git a/src/sql/engine/join/ob_hash_join_op.cpp b/src/sql/engine/join/ob_hash_join_op.cpp index 325c9d3855..3c447c4b5c 100644 --- a/src/sql/engine/join/ob_hash_join_op.cpp +++ b/src/sql/engine/join/ob_hash_join_op.cpp @@ -32,7 +32,7 @@ OB_SERIALIZE_MEMBER(ObHashJoinInput, shared_hj_info_); // The ctx is owned by thread // sync_event is shared -int ObHashJoinInput::sync_wait(ObExecContext &ctx, int64_t &sync_event, EventPred pred, bool ignore_interrupt) +int ObHashJoinInput::sync_wait(ObExecContext &ctx, int64_t &sync_event, EventPred pred, bool ignore_interrupt, bool is_open) { int ret = OB_SUCCESS; ObHashTableSharedTableInfo *shared_hj_info = reinterpret_cast(shared_hj_info_); @@ -82,12 +82,18 @@ int ObHashJoinInput::sync_wait(ObExecContext &ctx, int64_t &sync_event, EventPre LOG_DEBUG("debug sync event", K(ret), K(lbt()), K(sync_event), K(loop), K(exit_cnt)); break; + } else if (ignore_interrupt /*close stage*/ && OB_SUCCESS != ATOMIC_LOAD(&shared_hj_info->open_ret_)) { + LOG_WARN("some op have failed in open stage", K(ATOMIC_LOAD(&shared_hj_info->open_ret_))); + break; } else { auto key = shared_hj_info->cond_.get_key(); // wait one time per 1000 us shared_hj_info->cond_.wait(key, 1000); } } // end while + if (OB_FAIL(ret) && is_open) { + set_open_ret(ret); + } } return ret; } @@ -2406,7 +2412,7 @@ int ObHashJoinOp::sync_wait_open() ctx_, hj_input->get_open_cnt(), [&](int64_t n_times) { UNUSED(n_times); - }))) { + }, false /*ignore_interrupt*/, true /*is_open*/))) { LOG_WARN("failed to sync open", K(ret), K(spec_.id_)); } else { LOG_TRACE("debug sync sync open", K(ret), K(spec_.id_)); diff --git a/src/sql/engine/join/ob_hash_join_op.h b/src/sql/engine/join/ob_hash_join_op.h index c7f1a376f2..915f42d8a4 100644 --- a/src/sql/engine/join/ob_hash_join_op.h +++ b/src/sql/engine/join/ob_hash_join_op.h @@ -46,6 +46,7 @@ struct ObHashTableSharedTableInfo int64_t total_memory_row_count_; int64_t total_memory_size_; int64_t open_cnt_; + int open_ret_; }; class ObHashJoinInput : public ObOpInput @@ -73,7 +74,7 @@ public: return ret; } - int sync_wait(ObExecContext &ctx, int64_t &sys_event, EventPred pred, bool ignore_interrupt = false); + int sync_wait(ObExecContext &ctx, int64_t &sys_event, EventPred pred, bool ignore_interrupt = false, bool is_open = false); int64_t get_sync_val() { ObHashTableSharedTableInfo *shared_hj_info = reinterpret_cast(shared_hj_info_); @@ -161,6 +162,13 @@ public: ObHashTableSharedTableInfo *shared_hj_info = reinterpret_cast(shared_hj_info_); ATOMIC_SET(&shared_hj_info->ret_, in_ret); } + + void set_open_ret(int in_ret) + { + ObHashTableSharedTableInfo *shared_hj_info = reinterpret_cast(shared_hj_info_); + ATOMIC_SET(&shared_hj_info->open_ret_, in_ret); + } + virtual void reset() override { if (0 != shared_hj_info_) { @@ -185,6 +193,7 @@ public: shared_hj_info->close_cnt_ = 0; shared_hj_info->open_cnt_ = 0; shared_hj_info->ret_ = OB_SUCCESS; + shared_hj_info->open_ret_ = OB_SUCCESS; shared_hj_info->read_null_in_naaj_ = false; new (&shared_hj_info->cond_)common::SimpleCond(common::ObWaitEventIds::SQL_SHARED_HJ_COND_WAIT); new (&shared_hj_info->lock_)ObSpinLock(common::ObLatchIds::SQL_SHARED_HJ_COND_LOCK);