cherry-pick from 3_1_x_release to 3.1_opensource_release

This commit is contained in:
yy0
2021-07-26 10:42:25 +08:00
committed by wangzelin.wzl
parent f2f91f98c6
commit 821da37d63
29 changed files with 318 additions and 368 deletions

View File

@ -161,7 +161,8 @@ ObExecContext::ObExecContext(ObIAllocator& allocator)
pwj_map_(nullptr),
calc_type_(CALC_NORMAL),
fixed_id_(OB_INVALID_ID),
expr_partition_id_(OB_INVALID_ID)
expr_partition_id_(OB_INVALID_ID),
iters_(256, allocator)
{}
ObExecContext::ObExecContext()
@ -220,7 +221,8 @@ ObExecContext::ObExecContext()
pwj_map_(nullptr),
calc_type_(CALC_NORMAL),
fixed_id_(OB_INVALID_ID),
expr_partition_id_(OB_INVALID_ID)
expr_partition_id_(OB_INVALID_ID),
iters_(256, allocator_)
{}
ObExecContext::~ObExecContext()
@ -252,6 +254,7 @@ ObExecContext::~ObExecContext()
DESTROY_CONTEXT(lob_fake_allocator_);
lob_fake_allocator_ = NULL;
}
iters_.reset();
}
void ObExecContext::clean_resolve_ctx()
@ -267,6 +270,30 @@ void ObExecContext::clean_resolve_ctx()
sql_ctx_ = nullptr;
}
int ObExecContext::push_back_iter(common::ObNewRowIterator *iter)
{
int ret = OB_SUCCESS;
if (OB_FAIL(iters_.push_back(iter))) {
LOG_WARN("failed to push back iter", K(ret));
}
return ret;
}
int ObExecContext::remove_iter(common::ObNewRowIterator *iter)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < iters_.count(); i++) {
if (iters_.at(i) == iter) {
if (OB_FAIL(iters_.remove(i))) {
LOG_WARN("failed to remove iter", K(ret), K(i));
} else {
break;
}
}
}
return ret;
}
uint64_t ObExecContext::get_ser_version() const
{
return GET_UNIS_CLUSTER_VERSION() < CLUSTER_VERSION_2250 ? SER_VERSION_0 : SER_VERSION_1;
@ -707,6 +734,8 @@ int ObExecContext::check_status()
LOG_WARN("session info is null");
} else if (my_session_->is_terminate(ret)) {
LOG_WARN("execution was terminated", K(ret));
} else if (OB_FAIL(release_table_ref())) {
LOG_WARN("failed to refresh table on demand", K(ret));
} else if (IS_INTERRUPTED()) {
ObInterruptCode& ic = GET_INTERRUPT_CODE();
ret = ic.code_;
@ -1016,6 +1045,23 @@ int ObExecContext::get_pwj_map(PWJPartitionIdMap*& pwj_map)
return ret;
}
// Currently, there are some limitations
// Only iterator in ITER_END can be released because of memory allocation problem
// iterator in merge sort join cannot work properly,
// because iterator may not go to end
int ObExecContext::release_table_ref()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < iters_.count(); i++) {
if (OB_FAIL(iters_.at(i)->release_table_ref())) {
LOG_WARN("failed to release table ref", K(ret), K(i));
} else {
LOG_DEBUG("succ to release_table_ref");
}
}
return ret;
}
DEFINE_SERIALIZE(ObExecContext)
{
int ret = OB_SUCCESS;

View File

@ -662,6 +662,8 @@ public:
return expr_partition_id_;
}
int push_back_iter(common::ObNewRowIterator *iter);
int remove_iter(common::ObNewRowIterator *iter);
private:
int set_phy_op_ctx_ptr(uint64_t index, void* phy_op);
void* get_phy_op_ctx_ptr(uint64_t index) const;
@ -675,7 +677,7 @@ private:
int serialize_operator_input_recursively(const ObPhyOperator* op, char*& buf, int64_t buf_len, int64_t& pos,
int32_t& real_input_count, bool is_full_tree) const;
int serialize_operator_input_len_recursively(const ObPhyOperator* op, int64_t& len, bool is_full_tree) const;
int release_table_ref();
protected:
/**
* @brief the memory of exec context.
@ -806,6 +808,7 @@ protected:
// for expr values op use
int64_t expr_partition_id_;
ObSEArray<common::ObNewRowIterator*, 1, common::ObIAllocator&> iters_;
private:
DISALLOW_COPY_AND_ASSIGN(ObExecContext);
};

View File

@ -861,6 +861,8 @@ int ObTableScan::inner_close(ObExecContext& ctx) const
LOG_WARN("fail to get physical plan ctx", K(ret));
} else if (get_batch_scan_flag() ? NULL == scan_ctx->result_iters_ : NULL == scan_ctx->result_) {
// this is normal case, so we need NOT set ret or LOG_WARN.
} else if (OB_FAIL(ctx.remove_iter(scan_ctx->result_))) {
LOG_WARN("fail to remove iter", K(ret));
} else {
if (OB_FAIL(fill_storage_feedback_info(ctx))) {
LOG_WARN("failed to fill storage feedback info", K(ret));
@ -1607,6 +1609,14 @@ inline int ObTableScan::do_table_scan(ObExecContext& ctx, bool is_rescan, bool n
LOG_DEBUG("table_scan begin", K(scan_ctx->scan_param_), K(*scan_ctx->result_), "op_id", get_id());
}
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(scan_ctx->result_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result should not be NULL", K(ret), K(scan_ctx->scan_param_));
} else if (OB_FAIL(ctx.push_back_iter(scan_ctx->result_))) {
LOG_WARN("failed to push back iter", K(ret));
}
}
}
if (OB_SUCC(ret)) {
ret = sim_err(my_session);

View File

@ -866,6 +866,8 @@ int ObTableScanOp::inner_close()
if (MY_SPEC.batch_scan_flag_ ? NULL == bnl_iters_ : NULL == result_) {
// this is normal case, so we need NOT set ret or LOG_WARN.
} else if (OB_FAIL(ctx_.remove_iter(result_))) {
LOG_WARN("fail to remove iter", K(ret));
} else {
if (OB_FAIL(fill_storage_feedback_info())) {
LOG_WARN("failed to fill storage feedback info", K(ret));
@ -1482,6 +1484,14 @@ inline int ObTableScanOp::do_table_scan(bool is_rescan, bool need_prepare /*=tru
LOG_DEBUG("debug trans", K(*scan_param_.trans_desc_), K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(result_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result should not be NULL", K(ret), K(scan_param_));
} else if (OB_FAIL(ctx_.push_back_iter(result_))) {
LOG_WARN("failed to push back iter", K(ret), K(scan_param_));
}
}
}
return ret;
}