placeholder for fts branch DAS iter structure refactor

This commit is contained in:
saltonz
2024-04-10 08:17:34 +00:00
committed by ob-robot
parent 3d4ef9741d
commit 7d11a2a10d
9 changed files with 179 additions and 158 deletions

View File

@ -0,0 +1,49 @@
/**
* Copyright (c) 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*
* ob_das_attach_define.cpp
*
* Author: yuming<>
*/
#define USING_LOG_PREFIX SQL_DAS
#include "sql/das/ob_das_attach_define.h"
#include "sql/das/ob_das_factory.h"
namespace oceanbase
{
namespace sql
{
OB_DEF_SERIALIZE(ObDASAttachSpec)
{
int ret = OB_SUCCESS;
bool has_attach_ctdef = false;
OB_UNIS_ENCODE(has_attach_ctdef);
return ret;
}
OB_DEF_DESERIALIZE(ObDASAttachSpec)
{
int ret = OB_SUCCESS;
bool has_attach_ctdef = false;
OB_UNIS_DECODE(has_attach_ctdef);
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObDASAttachSpec)
{
int64_t len = 0;
bool has_attach_ctdef = false;
OB_UNIS_ADD_LEN(has_attach_ctdef);
return len;
}
}
}

View File

@ -0,0 +1,50 @@
/**
* Copyright (c) 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*
* ob_das_attach_define.h
*
* Author: yuming<>
*/
#ifndef OBDEV_SRC_SQL_DAS_OB_DAS_ATTACH_DEFINE_H_
#define OBDEV_SRC_SQL_DAS_OB_DAS_ATTACH_DEFINE_H_
#include "sql/das/ob_das_define.h"
#include "share/ob_define.h"
#include "sql/engine/expr/ob_expr.h"
namespace oceanbase
{
namespace sql
{
struct ObDASAttachSpec
{
OB_UNIS_VERSION(1);
public:
ObDASAttachSpec(common::ObIAllocator &alloc, ObDASBaseCtDef *scan_ctdef)
: attach_loc_metas_(alloc),
scan_ctdef_(nullptr),
allocator_(alloc),
attach_ctdef_(nullptr)
{
}
common::ObList<ObDASTableLocMeta*, common::ObIAllocator> attach_loc_metas_;
ObDASBaseCtDef *scan_ctdef_; //This ctdef represents the main task information executed by the DAS Task.
common::ObIAllocator &allocator_;
ObDASBaseCtDef *attach_ctdef_; //The attach_ctdef represents the task information that is bound to and executed on the DAS Task.
TO_STRING_KV(K_(attach_loc_metas),
K_(attach_ctdef));
};
}
}
#endif

View File

@ -171,7 +171,7 @@ ObDASScanOp::ObDASScanOp(ObIAllocator &op_alloc)
result_(nullptr),
remain_row_cnt_(0),
retry_alloc_(nullptr),
ir_param_(op_alloc)
ir_param_()
{
}
@ -744,69 +744,7 @@ OB_SERIALIZE_MEMBER((ObDASScanOp, ObIDASTaskOp),
scan_param_.ss_key_ranges_,
ir_param_);
OB_DEF_SERIALIZE(ObDASIRParam)
{
int ret = OB_SUCCESS;
const bool ir_scan = is_ir_scan();
LST_DO_CODE(OB_UNIS_ENCODE, ir_scan);
if (OB_SUCC(ret) && ir_scan) {
if (OB_ISNULL(ctdef_) || OB_ISNULL(rtdef_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr", K(ret));
} else {
LST_DO_CODE(OB_UNIS_ENCODE,
*ctdef_,
*rtdef_,
ls_id_,
inv_idx_tablet_id_,
doc_id_idx_tablet_id_);
}
}
return ret;
}
OB_DEF_DESERIALIZE(ObDASIRParam)
{
int ret = OB_SUCCESS;
bool ir_scan = false;
LST_DO_CODE(OB_UNIS_DECODE, ir_scan);
if (OB_SUCC(ret) && ir_scan) {
if (OB_ISNULL(ctdef_ = OB_NEWx(ObDASIRCtDef, &allocator_, allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for ir ctdef", K(ret));
} else if (OB_ISNULL(rtdef_ = OB_NEWx(ObDASIRRtDef, &allocator_, allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for ir rtdef", K(ret));
} else {
LST_DO_CODE(OB_UNIS_DECODE,
*ctdef_,
*rtdef_,
ls_id_,
inv_idx_tablet_id_,
doc_id_idx_tablet_id_);
}
}
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObDASIRParam)
{
int64_t len = 0;
const bool ir_scan = is_ir_scan();
LST_DO_CODE(OB_UNIS_ADD_LEN, ir_scan);
if (ir_scan) {
if (nullptr != ctdef_ && nullptr != rtdef_) {
LST_DO_CODE(OB_UNIS_ADD_LEN,
*ctdef_,
*rtdef_);
}
LST_DO_CODE(OB_UNIS_ADD_LEN,
ls_id_,
inv_idx_tablet_id_,
doc_id_idx_tablet_id_);
}
return len;
}
OB_SERIALIZE_MEMBER(ObDASObsoletedObj, flag_);
ObDASScanResult::ObDASScanResult()
: ObIDASTaskResult(),

View File

@ -173,30 +173,13 @@ private:
};
};
struct ObDASIRParam
struct ObDASObsoletedObj
{
OB_UNIS_VERSION(1);
public:
ObDASIRParam(ObIAllocator &alloc)
: allocator_(alloc),
ctdef_(nullptr),
rtdef_(nullptr),
ls_id_(OB_INVALID_ID),
inv_idx_tablet_id_(),
fwd_idx_tablet_id_(),
doc_id_idx_tablet_id_() {}
virtual ~ObDASIRParam() {}
inline bool is_ir_scan() const { return false; } // always false on master
TO_STRING_KV(K_(ls_id), K_(inv_idx_tablet_id), K_(fwd_idx_tablet_id),
K_(doc_id_idx_tablet_id), KP_(ctdef), KP_(rtdef));
ObIAllocator &allocator_;
ObDASIRCtDef *ctdef_;
ObDASIRRtDef *rtdef_;
share::ObLSID ls_id_;
ObTabletID inv_idx_tablet_id_;
ObTabletID fwd_idx_tablet_id_;
ObTabletID doc_id_idx_tablet_id_;
ObDASObsoletedObj() : flag_(false) {}
TO_STRING_KV(K_(flag));
bool flag_;
};
class ObDASScanOp : public ObIDASTaskOp
@ -284,7 +267,7 @@ protected:
union {
common::ObArenaAllocator retry_alloc_buf_;
};
ObDASIRParam ir_param_;
ObDASObsoletedObj ir_param_; // obsoleted attribute, please gc me at next barrier version
};
class ObDASScanResult : public ObIDASTaskResult, public common::ObNewRowIterator

View File

@ -83,6 +83,10 @@ OB_DEF_SERIALIZE(ObDASRemoteInfo)
OB_UNIS_ENCODE(session_id_);
OB_UNIS_ENCODE(plan_id_);
OB_UNIS_ENCODE(plan_hash_);
// placeholder for serialize the reference relationship between ctdefs and rtdefs.
// Full logic here requires some complicated data structure refactor on fts branch for ver 4.3.1.
// double check compatiblity before merge to master
return ret;
}
@ -177,6 +181,10 @@ OB_DEF_DESERIALIZE(ObDASRemoteInfo)
OB_UNIS_DECODE(session_id_);
OB_UNIS_DECODE(plan_id_);
OB_UNIS_DECODE(plan_hash_);
// placeholder for serialize the reference relationship between ctdefs and rtdefs.
// Full logic here requires some complicated data structure refactor on fts branch for ver 4.3.1.
// double check compatiblity before merge to master
return ret;
}
@ -216,6 +224,10 @@ OB_DEF_SERIALIZE_SIZE(ObDASRemoteInfo)
OB_UNIS_ADD_LEN(session_id_);
OB_UNIS_ADD_LEN(plan_id_);
OB_UNIS_ADD_LEN(plan_hash_);
// placeholder for serialize the reference relationship between ctdefs and rtdefs.
// Full logic here requires some complicated data structure refactor on fts branch for ver 4.3.1.
// double check compatiblity before merge to master
return len;
}
@ -277,7 +289,9 @@ OB_SERIALIZE_MEMBER(ObIDASTaskOp,
ls_id_,
related_ctdefs_,
related_rtdefs_,
related_tablet_ids_);
related_tablet_ids_,
attach_ctdef_,
attach_rtdef_);
ObDASTaskArg::ObDASTaskArg()
: timeout_ts_(0),

View File

@ -122,7 +122,9 @@ public:
das_task_node_(),
agg_tasks_(nullptr),
cur_agg_list_(nullptr),
op_result_(nullptr)
op_result_(nullptr),
attach_ctdef_(nullptr),
attach_rtdef_(nullptr)
{
das_task_node_.get_data() = this;
}
@ -264,6 +266,11 @@ protected:
ObDasAggregatedTasks *agg_tasks_; // task's agg task, do not serialize
DasTaskLinkedList *cur_agg_list_; // task's agg_list, do not serialize
ObIDASTaskResult *op_result_;
//The attach_ctdef describes the computations that are pushed down and executed as an attachment to the ObDASTaskOp,
//such as the back table operation for full-text indexes,
//rowkey merging for index merge operations, and so on.
const ObDASBaseCtDef *attach_ctdef_;
ObDASBaseRtDef *attach_rtdef_;
};
typedef common::ObObjStore<ObIDASTaskOp*, common::ObIAllocator&> DasTaskList;
typedef DasTaskList::Iterator DASTaskIter;
@ -412,17 +419,22 @@ struct DASCtEncoder
static int encode(char *buf, const int64_t buf_len, int64_t &pos, const T *val)
{
int ret = common::OB_SUCCESS;
int64_t idx = 0;
int64_t idx = common::OB_INVALID_INDEX;
const ObDASBaseCtDef *ctdef = val;
ObDASRemoteInfo *remote_info = ObDASRemoteInfo::get_remote_info();
if (OB_ISNULL(val) || OB_ISNULL(remote_info)) {
if (OB_ISNULL(remote_info)) {
ret = common::OB_ERR_UNEXPECTED;
SQL_DAS_LOG(WARN, "val is nullptr", K(ret), K(val), K(remote_info));
SQL_DAS_LOG(WARN, "val is nullptr", K(ret), K(remote_info));
} else if (OB_ISNULL(val)) {
idx = common::OB_INVALID_INDEX;
} else if (!common::has_exist_in_array(remote_info->ctdefs_, ctdef, &idx)) {
ret = common::OB_ERR_UNEXPECTED;
SQL_DAS_LOG(WARN, "val not found in ctdefs", K(ret), K(val), KPC(val));
} else if (OB_FAIL(common::serialization::encode_i32(buf, buf_len, pos, static_cast<int32_t>(idx)))) {
SQL_DAS_LOG(WARN, "encode idx failed", K(ret), K(idx));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(common::serialization::encode_i32(buf, buf_len, pos, static_cast<int32_t>(idx)))) {
SQL_DAS_LOG(WARN, "encode idx failed", K(ret), K(idx));
}
}
return ret;
}
@ -430,13 +442,15 @@ struct DASCtEncoder
static int decode(const char *buf, const int64_t data_len, int64_t &pos, const T *&val)
{
int ret = common::OB_SUCCESS;
int32_t idx = 0;
int32_t idx = common::OB_INVALID_INDEX;
ObDASRemoteInfo *remote_info = ObDASRemoteInfo::get_remote_info();
if (OB_ISNULL(remote_info)) {
ret = common::OB_ERR_UNEXPECTED;
SQL_DAS_LOG(WARN, "remote_info is nullptr", K(ret), K(remote_info));
} else if (OB_FAIL(common::serialization::decode_i32(buf, data_len, pos, &idx))) {
SQL_DAS_LOG(WARN, "decode idx failed", K(ret), K(idx));
} else if (OB_UNLIKELY(common::OB_INVALID_INDEX == idx)) {
val = nullptr;
} else if (OB_UNLIKELY(idx < 0) || OB_UNLIKELY(idx >= remote_info->ctdefs_.count())) {
ret = common::OB_ERR_UNEXPECTED;
SQL_DAS_LOG(WARN, "idx is invalid", K(ret), K(idx), K(remote_info->ctdefs_.count()));
@ -449,7 +463,7 @@ struct DASCtEncoder
static int64_t encoded_length(const T *val)
{
UNUSED(val);
int32_t idx = 0;
int32_t idx = common::OB_INVALID_INDEX;
return common::serialization::encoded_length_i32(idx);
}
};
@ -460,17 +474,22 @@ struct DASRtEncoder
static int encode(char *buf, const int64_t buf_len, int64_t &pos, const T *val)
{
int ret = common::OB_SUCCESS;
int64_t idx = 0;
int64_t idx = common::OB_INVALID_INDEX;
ObDASBaseRtDef *rtdef = const_cast<T*>(val);
ObDASRemoteInfo *remote_info = ObDASRemoteInfo::get_remote_info();
if (OB_ISNULL(val) || OB_ISNULL(remote_info)) {
if (OB_ISNULL(remote_info)) {
ret = common::OB_ERR_UNEXPECTED;
SQL_DAS_LOG(WARN, "val is nullptr", K(ret), K(val), K(remote_info));
} else if (OB_ISNULL(val)) {
idx = common::OB_INVALID_INDEX;
} else if (!common::has_exist_in_array(remote_info->rtdefs_, rtdef, &idx)) {
ret = common::OB_ERR_UNEXPECTED;
SQL_DAS_LOG(WARN, "val not found in rtdefs", K(ret), K(val), KPC(val));
} else if (OB_FAIL(common::serialization::encode_i32(buf, buf_len, pos, static_cast<int32_t>(idx)))) {
SQL_DAS_LOG(WARN, "encode idx failed", K(ret), K(idx));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(common::serialization::encode_i32(buf, buf_len, pos, static_cast<int32_t>(idx)))) {
SQL_DAS_LOG(WARN, "encode idx failed", K(ret), K(idx));
}
}
return ret;
}
@ -485,6 +504,8 @@ struct DASRtEncoder
SQL_DAS_LOG(WARN, "remote_info is nullptr", K(ret), K(remote_info));
} else if (OB_FAIL(common::serialization::decode_i32(buf, data_len, pos, &idx))) {
SQL_DAS_LOG(WARN, "decode idx failed", K(ret), K(idx));
} else if (OB_UNLIKELY(common::OB_INVALID_INDEX == idx)) {
val = nullptr;
} else if (OB_UNLIKELY(idx < 0) || OB_UNLIKELY(idx >= remote_info->rtdefs_.count())) {
ret = common::OB_ERR_UNEXPECTED;
SQL_DAS_LOG(WARN, "idx is invalid", K(ret), K(idx), K(remote_info->rtdefs_.count()));