fix shared hj hang when 1 of worker is interrupted in sync_open && fix 3-stage aggr do not reset distinct info leads 4005

This commit is contained in:
obdev
2023-02-08 11:11:16 +08:00
committed by ob-robot
parent c91a1e8f70
commit 9bd86a8db4
5 changed files with 57 additions and 4 deletions

View File

@ -141,7 +141,7 @@ void ObHashGroupByOp::reset()
first_batch_from_store_ = true;
is_init_distinct_data_ = false;
use_distinct_data_ = false;
distinct_data_set_.reset();
reset_distinct_info();
bypass_ctrl_.reset();
by_pass_nth_group_ = 0;
by_pass_child_brs_ = nullptr;
@ -736,6 +736,17 @@ int ObHashGroupByOp::init_distinct_info(bool is_part)
return ret;
}
void ObHashGroupByOp::reset_distinct_info()
{
hash_funcs_.destroy();
cmp_funcs_.destroy();
sort_collations_.destroy();
distinct_data_set_.destroy_my_skip();
distinct_data_set_.destroy_items();
distinct_data_set_.destroy_distinct_map();
distinct_data_set_.reset();
}
int ObHashGroupByOp::finish_insert_distinct_data()
{
int ret = OB_SUCCESS;
@ -760,6 +771,7 @@ int ObHashGroupByOp::insert_distinct_data()
bool inserted = false;
const ObChunkDatumStore::StoredRow *store_row = nullptr;
if (!is_init_distinct_data_ && OB_FAIL(init_distinct_info(false))) {
LOG_WARN("failed to init distinct info", K(ret));
} else if (OB_FAIL(distinct_data_set_.insert_row(distinct_origin_exprs_, has_exists, inserted))) {
LOG_WARN("failed to insert row", K(ret));
} else {

View File

@ -363,6 +363,7 @@ private:
int insert_distinct_data();
int finish_insert_distinct_data();
int init_distinct_info(bool is_part);
void reset_distinct_info();
int batch_insert_distinct_data(const ObBatchRows &child_brs);
int batch_insert_all_distinct_data(const int64_t batch_size);

View File

@ -582,6 +582,16 @@ public:
return ret;
}
void destroy_my_skip()
{
if (OB_NOT_NULL(mem_context_)) {
if (OB_NOT_NULL(alloc_)) {
alloc_->free(my_skip_);
my_skip_ = nullptr;
}
}
}
int init_items(const int64_t batch_size) {
int ret = OB_SUCCESS;
if (OB_ISNULL(alloc_)) {
@ -595,6 +605,16 @@ public:
return ret;
}
void destroy_items()
{
if (OB_NOT_NULL(mem_context_)) {
if (OB_NOT_NULL(alloc_)) {
alloc_->free(items_);
items_ = nullptr;
}
}
}
int init_distinct_map(const int64_t batch_size) {
int ret = OB_SUCCESS;
if (distinct_map_.is_inited()) {
@ -604,6 +624,11 @@ public:
return ret;
}
void destroy_distinct_map()
{
distinct_map_.destroy();
}
int set_funcs(const common::ObIArray<ObHashFunc> *hash_funcs,
const common::ObIArray<ObSortFieldCollation> *sort_collations,
const common::ObIArray<ObCmpFunc> *cmp_funcs,

View File

@ -32,7 +32,7 @@ OB_SERIALIZE_MEMBER(ObHashJoinInput, shared_hj_info_);
// The ctx is owned by thread
// sync_event is shared
int ObHashJoinInput::sync_wait(ObExecContext &ctx, int64_t &sync_event, EventPred pred, bool ignore_interrupt)
int ObHashJoinInput::sync_wait(ObExecContext &ctx, int64_t &sync_event, EventPred pred, bool ignore_interrupt, bool is_open)
{
int ret = OB_SUCCESS;
ObHashTableSharedTableInfo *shared_hj_info = reinterpret_cast<ObHashTableSharedTableInfo *>(shared_hj_info_);
@ -82,12 +82,18 @@ int ObHashJoinInput::sync_wait(ObExecContext &ctx, int64_t &sync_event, EventPre
LOG_DEBUG("debug sync event", K(ret), K(lbt()), K(sync_event),
K(loop), K(exit_cnt));
break;
} else if (ignore_interrupt /*close stage*/ && OB_SUCCESS != ATOMIC_LOAD(&shared_hj_info->open_ret_)) {
LOG_WARN("some op have failed in open stage", K(ATOMIC_LOAD(&shared_hj_info->open_ret_)));
break;
} else {
auto key = shared_hj_info->cond_.get_key();
// wait one time per 1000 us
shared_hj_info->cond_.wait(key, 1000);
}
} // end while
if (OB_FAIL(ret) && is_open) {
set_open_ret(ret);
}
}
return ret;
}
@ -2406,7 +2412,7 @@ int ObHashJoinOp::sync_wait_open()
ctx_, hj_input->get_open_cnt(),
[&](int64_t n_times) {
UNUSED(n_times);
}))) {
}, false /*ignore_interrupt*/, true /*is_open*/))) {
LOG_WARN("failed to sync open", K(ret), K(spec_.id_));
} else {
LOG_TRACE("debug sync sync open", K(ret), K(spec_.id_));

View File

@ -46,6 +46,7 @@ struct ObHashTableSharedTableInfo
int64_t total_memory_row_count_;
int64_t total_memory_size_;
int64_t open_cnt_;
int open_ret_;
};
class ObHashJoinInput : public ObOpInput
@ -73,7 +74,7 @@ public:
return ret;
}
int sync_wait(ObExecContext &ctx, int64_t &sys_event, EventPred pred, bool ignore_interrupt = false);
int sync_wait(ObExecContext &ctx, int64_t &sys_event, EventPred pred, bool ignore_interrupt = false, bool is_open = false);
int64_t get_sync_val()
{
ObHashTableSharedTableInfo *shared_hj_info = reinterpret_cast<ObHashTableSharedTableInfo *>(shared_hj_info_);
@ -161,6 +162,13 @@ public:
ObHashTableSharedTableInfo *shared_hj_info = reinterpret_cast<ObHashTableSharedTableInfo *>(shared_hj_info_);
ATOMIC_SET(&shared_hj_info->ret_, in_ret);
}
void set_open_ret(int in_ret)
{
ObHashTableSharedTableInfo *shared_hj_info = reinterpret_cast<ObHashTableSharedTableInfo *>(shared_hj_info_);
ATOMIC_SET(&shared_hj_info->open_ret_, in_ret);
}
virtual void reset() override
{
if (0 != shared_hj_info_) {
@ -185,6 +193,7 @@ public:
shared_hj_info->close_cnt_ = 0;
shared_hj_info->open_cnt_ = 0;
shared_hj_info->ret_ = OB_SUCCESS;
shared_hj_info->open_ret_ = OB_SUCCESS;
shared_hj_info->read_null_in_naaj_ = false;
new (&shared_hj_info->cond_)common::SimpleCond(common::ObWaitEventIds::SQL_SHARED_HJ_COND_WAIT);
new (&shared_hj_info->lock_)ObSpinLock(common::ObLatchIds::SQL_SHARED_HJ_COND_LOCK);