[BUG FIX] fix the compatibility issue with incorrect primary keys in older version
This commit is contained in:
@ -64,6 +64,7 @@ ob_set_subtarget(ob_sql das
|
||||
das/ob_das_domain_utils.cpp
|
||||
das/ob_text_retrieval_op.cpp
|
||||
das/ob_das_attach_define.cpp
|
||||
das/ob_group_scan_iter.cpp
|
||||
das/iter/ob_das_iter.cpp
|
||||
das/iter/ob_das_merge_iter.cpp
|
||||
das/iter/ob_das_lookup_iter.cpp
|
||||
|
@ -174,14 +174,17 @@ int ObDASMergeIter::create_das_task(const ObDASTabletLoc *tablet_loc, ObDASScanO
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObIDASTaskOp *task_op = nullptr;
|
||||
// when the cluster version is less than 4.3.1, a DAS_OP_TABLE_BATCH_SCAN task is sent on group rescan situtations
|
||||
// for compatibility considerations.
|
||||
ObDASOpType op_type = (nullptr != group_id_expr_ && GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_3_1_0) ? DAS_OP_TABLE_BATCH_SCAN : DAS_OP_TABLE_SCAN;
|
||||
reuse_op = false;
|
||||
if (OB_ISNULL(das_ref_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected nullptr das ref", K(ret));
|
||||
} else if (OB_NOT_NULL(task_op = das_ref_->find_das_task(tablet_loc, DAS_OP_TABLE_SCAN))) {
|
||||
} else if (OB_NOT_NULL(task_op = das_ref_->find_das_task(tablet_loc, op_type))) {
|
||||
// reuse scan op
|
||||
reuse_op = true;
|
||||
} else if (OB_FAIL(das_ref_->create_das_task(tablet_loc, DAS_OP_TABLE_SCAN, task_op))) {
|
||||
} else if (OB_FAIL(das_ref_->create_das_task(tablet_loc, op_type, task_op))) {
|
||||
LOG_WARN("das ref failed to create das task", K(ret));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
|
@ -20,7 +20,8 @@
|
||||
#include "rpc/obrpc/ob_rpc_result_code.h"
|
||||
|
||||
#define DAS_SCAN_OP(_task_op) \
|
||||
(::oceanbase::sql::DAS_OP_TABLE_SCAN != (_task_op)->get_type() ? \
|
||||
(::oceanbase::sql::DAS_OP_TABLE_SCAN != (_task_op)->get_type() && \
|
||||
::oceanbase::sql::DAS_OP_TABLE_BATCH_SCAN != (_task_op)->get_type() ? \
|
||||
nullptr : static_cast<::oceanbase::sql::ObDASScanOp*>(_task_op))
|
||||
#define DAS_GROUP_SCAN_OP(_task_op) \
|
||||
(::oceanbase::sql::DAS_OP_TABLE_BATCH_SCAN != (_task_op)->get_type() ? \
|
||||
|
@ -1,381 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#define USING_LOG_PREFIX SQL_DAS
|
||||
#include "sql/das/ob_das_group_scan_op.h"
|
||||
#include "sql/engine/ob_exec_context.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace common;
|
||||
using namespace storage;
|
||||
namespace sql
|
||||
{
|
||||
ObDASGroupScanOp::ObDASGroupScanOp(ObIAllocator &op_alloc)
|
||||
: ObDASScanOp(op_alloc),
|
||||
group_lookup_op_(NULL),
|
||||
iter_(),
|
||||
result_iter_(&iter_),
|
||||
is_exec_remote_(false),
|
||||
cur_group_idx_(0),
|
||||
group_size_(0)
|
||||
{
|
||||
}
|
||||
|
||||
ObDASGroupScanOp::~ObDASGroupScanOp()
|
||||
{
|
||||
if (nullptr != group_lookup_op_) {
|
||||
group_lookup_op_->~ObGroupLookupOp();
|
||||
//Memory of lookupop come from op_alloc,We do not need free,just set ptr to null.
|
||||
group_lookup_op_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
int ObDASGroupScanOp::rescan()
|
||||
{
|
||||
int &ret = errcode_;
|
||||
if (OB_FAIL(ObDASScanOp::rescan())) {
|
||||
LOG_WARN("rescan the table iterator failed", K(ret));
|
||||
} else {
|
||||
iter_.init_group_range(cur_group_idx_, group_size_);
|
||||
if (group_lookup_op_ != nullptr) {
|
||||
group_lookup_op_->reset();
|
||||
ObGroupScanIter *group_iter = NULL;
|
||||
group_iter = static_cast<ObGroupScanIter *>(group_lookup_op_->get_lookup_iter());
|
||||
if (NULL == group_iter) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(group_iter), K(*group_lookup_op_), K(ret));
|
||||
} else {
|
||||
group_iter->init_group_range(iter_.get_cur_group_idx(), iter_.get_group_size());
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDASGroupScanOp::release_op()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int group_ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(ObDASScanOp::release_op())) {
|
||||
LOG_WARN("release das scan op fail", K(ret));
|
||||
}
|
||||
|
||||
if (nullptr != group_lookup_op_) {
|
||||
group_ret = group_lookup_op_->revert_iter();
|
||||
if (OB_SUCCESS != group_ret) {
|
||||
LOG_WARN("revert lookup iterator failed", K(group_ret));
|
||||
}
|
||||
//Only cover ret code when DASScanOp release success.
|
||||
//In current implement group lookup op revert always return OB_SUCCESS.
|
||||
if (OB_SUCCESS == ret) {
|
||||
ret = group_ret;
|
||||
}
|
||||
}
|
||||
group_lookup_op_ = NULL;
|
||||
iter_.reset();
|
||||
result_iter_ = &iter_;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDASGroupScanOp::open_op()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t max_size = scan_rtdef_->eval_ctx_->is_vectorized()
|
||||
? scan_rtdef_->eval_ctx_->max_batch_size_
|
||||
:1;
|
||||
|
||||
ObMemAttr attr = scan_rtdef_->stmt_allocator_.get_attr();
|
||||
iter_.init_group_range(cur_group_idx_, group_size_);
|
||||
if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_,
|
||||
*scan_rtdef_->eval_ctx_,
|
||||
scan_ctdef_->pd_expr_spec_.access_exprs_,
|
||||
scan_rtdef_->stmt_allocator_,
|
||||
max_size,
|
||||
scan_ctdef_->group_id_expr_,
|
||||
&this->get_scan_result(),
|
||||
scan_rtdef_->need_check_output_datum_,
|
||||
attr))) {
|
||||
LOG_WARN("fail to init iter", K(ret));
|
||||
} else if (OB_FAIL(ObDASScanOp::open_op())) {
|
||||
LOG_WARN("fail to open op", K(ret));
|
||||
}
|
||||
LOG_DEBUG("group scan open op", K(*this), K(this));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDASGroupScanOp::do_local_index_lookup()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t size = sizeof(ObGroupLookupOp);
|
||||
void *buf = op_alloc_.alloc(size);
|
||||
if (OB_ISNULL(buf)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("lookup op buf allocated failed", K(ret));
|
||||
} else {
|
||||
ObGroupLookupOp *op = new(buf) ObGroupLookupOp();
|
||||
op->set_rowkey_iter(&this->iter_);
|
||||
group_lookup_op_ = op;
|
||||
result_iter_ = group_lookup_op_;
|
||||
OZ(op->init(get_lookup_ctdef(),
|
||||
get_lookup_rtdef(),
|
||||
scan_ctdef_,
|
||||
scan_rtdef_,
|
||||
trans_desc_,
|
||||
snapshot_));
|
||||
if (OB_SUCC(ret)) {
|
||||
op->set_tablet_id(related_tablet_ids_.at(0));
|
||||
op->set_ls_id(ls_id_);
|
||||
op->set_is_group_scan(true);
|
||||
OZ(op->init_group_scan_iter(cur_group_idx_,
|
||||
group_size_,
|
||||
scan_ctdef_->group_id_expr_));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObDASGroupScanOp::init_group_range(int64_t cur_group_idx, int64_t group_size)
|
||||
{
|
||||
cur_group_idx_ = cur_group_idx;
|
||||
group_size_ = group_size;
|
||||
LOG_DEBUG("init group range", K(group_size), K(cur_group_idx));
|
||||
}
|
||||
|
||||
int ObDASGroupScanOp::switch_scan_group()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLocalIndexLookupOp *lookup_op = get_lookup_op();
|
||||
// for lookup group scan, switch lookup group scan iter
|
||||
if (NULL != lookup_op) {
|
||||
ret = lookup_op->switch_lookup_scan_group();
|
||||
} else {
|
||||
ret = iter_.switch_scan_group();
|
||||
}
|
||||
if (OB_FAIL(ret) && OB_ITER_END != ret) {
|
||||
LOG_WARN("switch scan group failed", K(ret), KP(lookup_op), K(iter_));
|
||||
}
|
||||
LOG_DEBUG("switch scan group", K(iter_), K(ret), KP(lookup_op));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDASGroupScanOp::set_scan_group(int64_t group_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLocalIndexLookupOp *lookup_op = get_lookup_op();
|
||||
// for lookup group scan, switch lookup group scan iter
|
||||
if (NULL != lookup_op) {
|
||||
ret = lookup_op->set_lookup_scan_group(group_id);
|
||||
} else {
|
||||
ret = iter_.set_scan_group(group_id);
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret) && OB_ITER_END != ret) {
|
||||
LOG_WARN("set scan group failed", K(ret), KP(lookup_op), K(iter_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObNewRowIterator *ObDASGroupScanOp::get_storage_scan_iter()
|
||||
{
|
||||
ObNewRowIterator *iter = NULL;
|
||||
if (related_ctdefs_.empty()) {
|
||||
iter = result_;
|
||||
} else { // has lookup
|
||||
if (NULL == group_lookup_op_ || NULL == group_lookup_op_->get_rowkey_iter()) {
|
||||
iter = NULL;
|
||||
} else {
|
||||
iter = static_cast<ObGroupScanIter *>(group_lookup_op_->get_rowkey_iter())->get_iter();
|
||||
}
|
||||
}
|
||||
return iter;
|
||||
}
|
||||
|
||||
int ObDASGroupScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (NULL == group_lookup_op_) {
|
||||
result_iter_ = result_;
|
||||
} else {
|
||||
result_iter_ = group_lookup_op_;
|
||||
set_is_exec_remote(true);
|
||||
}
|
||||
if (OB_FAIL(ObDASScanOp::fill_task_result(task_result, has_more, memory_limit))) {
|
||||
LOG_WARN("fail to fill task result", K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
// init some thing before das_op->get_next_row() in local server for remote das
|
||||
int ObDASGroupScanOp::decode_task_result(ObIDASTaskResult *task_result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// init result_ by ObDASScanResult
|
||||
if (OB_FAIL(ObDASScanOp::decode_task_result(task_result))) {
|
||||
LOG_WARN("fail to decode task result", K(ret));
|
||||
} else {
|
||||
// because for remote das, local server not call open_op, so here we need to init
|
||||
// something for group scan op
|
||||
//
|
||||
// init group_lookup_op_ for lookup
|
||||
if (NULL != get_lookup_ctdef() && NULL == group_lookup_op_) {
|
||||
if (OB_FAIL(do_local_index_lookup())) {
|
||||
LOG_WARN("do local index lookup failed", K(ret));
|
||||
}
|
||||
}
|
||||
// init scan iter_
|
||||
int64_t max_size = scan_rtdef_->eval_ctx_->is_vectorized()
|
||||
? scan_rtdef_->eval_ctx_->max_batch_size_
|
||||
:1;
|
||||
if (OB_SUCC(ret) && NULL == iter_.get_group_id_expr()) {
|
||||
ObMemAttr attr = scan_rtdef_->stmt_allocator_.get_attr();
|
||||
iter_.init_group_range(cur_group_idx_, group_size_);
|
||||
if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_,
|
||||
*scan_rtdef_->eval_ctx_,
|
||||
scan_ctdef_->pd_expr_spec_.access_exprs_,
|
||||
scan_rtdef_->stmt_allocator_,
|
||||
max_size,
|
||||
scan_ctdef_->group_id_expr_,
|
||||
&this->get_scan_result(),
|
||||
scan_rtdef_->need_check_output_datum_,
|
||||
attr))) {
|
||||
LOG_WARN("fail to init iter", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
// set ObDASScanResult as the input of ObGroupScanIter
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (NULL == group_lookup_op_) {
|
||||
iter_.get_iter() = result_;
|
||||
result_iter_ = &iter_;
|
||||
} else {
|
||||
static_cast<ObGroupScanIter *>(group_lookup_op_->get_lookup_iter())->get_iter() = result_;
|
||||
result_iter_ = group_lookup_op_->get_lookup_iter();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
ObGroupLookupOp::~ObGroupLookupOp()
|
||||
{
|
||||
}
|
||||
|
||||
int ObGroupLookupOp::init_group_range(int64_t cur_group_idx, int64_t group_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObGroupScanIter *group_iter = static_cast<ObGroupScanIter*>(lookup_iter_);
|
||||
if (NULL == group_iter) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret));
|
||||
} else {
|
||||
group_iter->init_group_range(cur_group_idx, group_size);
|
||||
LOG_DEBUG("set group info",
|
||||
"scan_range", scan_param_.key_ranges_,
|
||||
K(*group_iter));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGroupLookupOp::init_group_scan_iter(int64_t cur_group_idx,
|
||||
int64_t group_size,
|
||||
ObExpr *group_id_expr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_vectorized = lookup_rtdef_->p_pd_expr_op_->is_vectorized();
|
||||
|
||||
int64_t max_row_store_size = is_vectorized ? lookup_rtdef_->eval_ctx_->max_batch_size_: 1;
|
||||
ObMemAttr attr = lookup_rtdef_->stmt_allocator_.get_attr();
|
||||
group_iter_.init_group_range(cur_group_idx, group_size);
|
||||
OZ(group_iter_.init_row_store(lookup_ctdef_->result_output_,
|
||||
*lookup_rtdef_->eval_ctx_,
|
||||
lookup_ctdef_->pd_expr_spec_.access_exprs_,
|
||||
lookup_rtdef_->stmt_allocator_,
|
||||
max_row_store_size,
|
||||
group_id_expr,
|
||||
&group_iter_.get_result_tmp_iter(),
|
||||
lookup_rtdef_->need_check_output_datum_,
|
||||
attr));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObGroupLookupOp::need_next_index_batch() const
|
||||
{
|
||||
bool bret = false;
|
||||
if (lookup_group_cnt_ >= index_group_cnt_) {
|
||||
bret = !index_end_;
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
ObNewRowIterator *&ObGroupLookupOp::get_lookup_storage_iter()
|
||||
{
|
||||
return static_cast<ObGroupScanIter *>(lookup_iter_)->get_iter();
|
||||
}
|
||||
|
||||
int ObGroupLookupOp::switch_lookup_scan_group()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
state_ = OUTPUT_ROWS;
|
||||
ObGroupScanIter *group_iter = NULL;
|
||||
group_iter = static_cast<ObGroupScanIter *>(get_lookup_iter());
|
||||
if (NULL == group_iter) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(group_iter), K(ret));
|
||||
} else {
|
||||
ret = group_iter->switch_scan_group();
|
||||
++lookup_group_cnt_;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGroupLookupOp::set_lookup_scan_group(int64_t group_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
state_ = OUTPUT_ROWS;
|
||||
ObGroupScanIter *group_iter = NULL;
|
||||
group_iter = static_cast<ObGroupScanIter *>(get_lookup_iter());
|
||||
if (NULL == group_iter) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(group_iter), K(ret));
|
||||
} else {
|
||||
ret = group_iter->set_scan_group(group_id);
|
||||
if(-1 == group_id) {
|
||||
++lookup_group_cnt_;
|
||||
} else {
|
||||
lookup_group_cnt_ = group_id + 1;
|
||||
}
|
||||
|
||||
if(lookup_group_cnt_ >= index_group_cnt_ && OB_ITER_END == ret && !index_end_) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGroupLookupOp::revert_iter()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObLocalIndexLookupOp::revert_iter())) {
|
||||
LOG_WARN("revert local index lookup iter from group fail.", K(ret));
|
||||
}
|
||||
group_iter_.reset();
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER((ObDASGroupScanOp, ObDASScanOp), iter_, cur_group_idx_, group_size_);
|
||||
|
||||
} // namespace sql
|
||||
} // namespace oceanbase
|
@ -1092,16 +1092,25 @@ OB_SERIALIZE_MEMBER((ObDASScanOp, ObIDASTaskOp),
|
||||
OB_SERIALIZE_MEMBER(ObDASObsoletedObj, flag_);
|
||||
|
||||
ObDASGroupScanOp::ObDASGroupScanOp(ObIAllocator &op_alloc)
|
||||
: ObDASScanOp(op_alloc)
|
||||
: ObDASScanOp(op_alloc),
|
||||
iter_(),
|
||||
cur_group_idx_(0),
|
||||
group_size_(0)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void ObDASGroupScanOp::init_group_range(int64_t cur_group_idx, int64_t group_size)
|
||||
{
|
||||
cur_group_idx_ = cur_group_idx;
|
||||
group_size_ = group_size;
|
||||
}
|
||||
|
||||
ObDASGroupScanOp::~ObDASGroupScanOp()
|
||||
{
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER((ObDASGroupScanOp, ObDASScanOp));
|
||||
OB_SERIALIZE_MEMBER((ObDASGroupScanOp, ObDASScanOp), iter_, cur_group_idx_, group_size_);
|
||||
|
||||
ObDASScanResult::ObDASScanResult()
|
||||
: ObIDASTaskResult(),
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "storage/access/ob_dml_param.h"
|
||||
#include "sql/engine/basic/ob_chunk_datum_store.h"
|
||||
#include "sql/engine/table/ob_index_lookup_op_impl.h"
|
||||
#include "sql/das/ob_group_scan_iter.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -398,14 +399,22 @@ protected:
|
||||
};
|
||||
|
||||
// NOTE: ObDASGroupScanOp defined here is For cross-version compatibility, and it will be removed in future barrier-version;
|
||||
// For das remote execution in upgrade stage, ctrl(4.2.1) -> executor (4.2.3)
|
||||
// the executor will execute das group-rescan op as the logic of das-scan op, and return the result to ctr;
|
||||
// For das remote execution in upgrade stage,
|
||||
// 1. ctrl(4.2.1) -> executor(4.2.3):
|
||||
// the executor will execute group scan task as the logic of das scan op, and return the result to ctr;
|
||||
// 2. ctrl(4.2.3) -> executor(4.2.1):
|
||||
// the ctrl will send group scan task to executor to ensure exectuor will execute succeed;
|
||||
class ObDASGroupScanOp : public ObDASScanOp
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObDASGroupScanOp(common::ObIAllocator &op_alloc);
|
||||
virtual ~ObDASGroupScanOp();
|
||||
void init_group_range(int64_t cur_group_idx, int64_t group_size);
|
||||
private:
|
||||
ObGroupScanIter iter_;
|
||||
int64_t cur_group_idx_;
|
||||
int64_t group_size_;
|
||||
};
|
||||
|
||||
|
||||
|
@ -11,7 +11,7 @@
|
||||
*/
|
||||
|
||||
#define USING_LOG_PREFIX SQL_DAS
|
||||
#include "sql/das/ob_das_group_scan_op.h"
|
||||
#include "sql/das/ob_das_scan_op.h"
|
||||
#include "sql/engine/ob_exec_context.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
|
@ -690,6 +690,8 @@ OB_INLINE int ObTableScanOp::create_one_das_task(ObDASTabletLoc *tablet_loc)
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(cherry_pick_range_by_tablet_id(scan_op))) {
|
||||
LOG_WARN("prune query range by partition id failed", K(ret), KPC(tablet_loc));
|
||||
} else if (OB_NOT_NULL(DAS_GROUP_SCAN_OP(scan_op))) {
|
||||
static_cast<ObDASGroupScanOp*>(scan_op)->init_group_range(0, tsc_rtdef_.group_size_);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -885,6 +887,7 @@ int ObTableScanOp::prepare_all_das_tasks()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1640,7 +1643,12 @@ int ObTableScanOp::fill_storage_feedback_info()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// fill storage feedback info for acs
|
||||
ObTableScanParam &scan_param = DAS_SCAN_OP(*scan_iter_->begin_task_iter())->get_scan_param();
|
||||
ObDASScanOp *scan_op = DAS_SCAN_OP(*scan_iter_->begin_task_iter());
|
||||
if (OB_ISNULL(scan_op)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected nullptr das scan op", K(ret));
|
||||
} else {
|
||||
ObTableScanParam &scan_param = scan_op->get_scan_param();
|
||||
bool is_index_back = scan_param.scan_flag_.index_back_;
|
||||
ObTableScanStat &table_scan_stat = GET_PHY_PLAN_CTX(ctx_)->get_table_scan_stat();
|
||||
if (MY_SPEC.should_scan_index()) {
|
||||
@ -1698,6 +1706,8 @@ int ObTableScanOp::fill_storage_feedback_info()
|
||||
"is_need_feedback", scan_param.scan_flag_.is_need_feedback(),
|
||||
"idx access row count", scan_param.idx_table_scan_stat_.access_row_cnt_,
|
||||
"main access row count", scan_param.main_table_scan_stat_.access_row_cnt_);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1806,7 +1816,10 @@ int ObTableScanOp::local_iter_rescan()
|
||||
DASTaskIter task_iter = scan_iter_->begin_task_iter();
|
||||
for (; OB_SUCC(ret) && !task_iter.is_end(); ++task_iter) {
|
||||
ObDASScanOp *scan_op = DAS_SCAN_OP(*task_iter);
|
||||
if (MY_SPEC.gi_above_) {
|
||||
if (OB_ISNULL(scan_op)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected nullptr das scan op", K(ret));
|
||||
} else if (MY_SPEC.gi_above_) {
|
||||
if (!MY_SPEC.is_index_global_ && MY_CTDEF.lookup_ctdef_ != nullptr) {
|
||||
//is local index lookup, need to set the lookup ctdef to the das scan op
|
||||
if (OB_FAIL(pushdown_normal_lookup_to_das(*scan_op))) {
|
||||
@ -1851,6 +1864,10 @@ int ObTableScanOp::local_iter_reuse()
|
||||
for (DASTaskIter task_iter = scan_iter_->begin_task_iter();
|
||||
!task_iter.is_end(); ++task_iter) {
|
||||
ObDASScanOp *scan_op = DAS_SCAN_OP(*task_iter);
|
||||
if (OB_ISNULL(scan_op)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected nullptr das scan op", K(ret));
|
||||
} else {
|
||||
bool need_switch_param = (scan_op->get_tablet_loc() != MY_INPUT.tablet_loc_ &&
|
||||
MY_INPUT.tablet_loc_ != nullptr);
|
||||
if (MY_INPUT.tablet_loc_ != nullptr) {
|
||||
@ -1860,7 +1877,9 @@ int ObTableScanOp::local_iter_reuse()
|
||||
}
|
||||
scan_op->reuse_iter();
|
||||
}
|
||||
if (OB_FAIL(reuse_table_rescan_allocator())) {
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(reuse_table_rescan_allocator())) {
|
||||
LOG_WARN("get table allocator", K(ret));
|
||||
} else {
|
||||
tsc_rtdef_.scan_rtdef_.scan_allocator_.set_alloc(table_rescan_allocator_);
|
||||
@ -2191,7 +2210,12 @@ int ObTableScanOp::inner_get_next_row_for_tsc()
|
||||
// } else if (DAS_SCAN_OP->get_scan_param().main_table_scan_stat_.bf_access_cnt_ > 0) {
|
||||
// partition->feedback_scan_access_stat(DAS_SCAN_OP->get_scan_param());
|
||||
// }
|
||||
ObTableScanParam &scan_param = DAS_SCAN_OP(*scan_iter_->begin_task_iter())->get_scan_param();
|
||||
ObDASScanOp *scan_op = DAS_SCAN_OP(*scan_iter_->begin_task_iter());
|
||||
if (OB_ISNULL(scan_op)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected nullptr das scan op", K(ret));
|
||||
} else {
|
||||
ObTableScanParam &scan_param = scan_op->get_scan_param();
|
||||
ObTableScanStat &table_scan_stat = GET_PHY_PLAN_CTX(ctx_)->get_table_scan_stat();
|
||||
fill_table_scan_stat(scan_param.main_table_scan_stat_, table_scan_stat);
|
||||
if (MY_SPEC.should_scan_index() && scan_param.scan_flag_.index_back_) {
|
||||
@ -2206,6 +2230,7 @@ int ObTableScanOp::inner_get_next_row_for_tsc()
|
||||
ret = OB_ITER_END;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user