The das module is a placeholder for compatibility in the master branch
This commit is contained in:
committed by
ob-robot
parent
3d09cc1628
commit
1a647628f1
@ -361,7 +361,74 @@ OB_SERIALIZE_MEMBER(ObIDASTaskOp,
|
||||
related_rtdefs_,
|
||||
related_tablet_ids_,
|
||||
attach_ctdef_,
|
||||
attach_rtdef_);
|
||||
attach_rtdef_,
|
||||
das_gts_opt_info_);
|
||||
|
||||
OB_DEF_SERIALIZE(ObDASGTSOptInfo)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool serialize_specify_snapshot = specify_snapshot_ == nullptr ? false : true;
|
||||
LST_DO_CODE(OB_UNIS_ENCODE,
|
||||
use_specify_snapshot_,
|
||||
isolation_level_,
|
||||
serialize_specify_snapshot);
|
||||
if (serialize_specify_snapshot) {
|
||||
OB_UNIS_ENCODE(*specify_snapshot_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_DEF_DESERIALIZE(ObDASGTSOptInfo)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool serialize_specify_snapshot = false;
|
||||
LST_DO_CODE(OB_UNIS_DECODE,
|
||||
use_specify_snapshot_,
|
||||
isolation_level_,
|
||||
serialize_specify_snapshot);
|
||||
if (serialize_specify_snapshot) {
|
||||
if (OB_FAIL(init(isolation_level_))) {
|
||||
LOG_WARN("fail to init gts_opt_info", K(ret));
|
||||
} else {
|
||||
OB_UNIS_DECODE(*specify_snapshot_);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_DEF_SERIALIZE_SIZE(ObDASGTSOptInfo)
|
||||
{
|
||||
int64_t len = 0;
|
||||
bool serialize_specify_snapshot = specify_snapshot_ == nullptr ? false : true;
|
||||
LST_DO_CODE(OB_UNIS_ADD_LEN,
|
||||
use_specify_snapshot_,
|
||||
isolation_level_,
|
||||
serialize_specify_snapshot);
|
||||
if (serialize_specify_snapshot) {
|
||||
OB_UNIS_ADD_LEN(*specify_snapshot_);
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
int ObDASGTSOptInfo::init(transaction::ObTxIsolationLevel isolation_level)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
void *buf = nullptr;
|
||||
void *buf2 = nullptr;
|
||||
if (OB_ISNULL(buf = alloc_.alloc(sizeof(ObTxReadSnapshot)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to allocate memory for ObTxReadSnapshot", K(ret), K(sizeof(ObTxReadSnapshot)));
|
||||
} else if (OB_ISNULL(buf2 = alloc_.alloc(sizeof(ObTxReadSnapshot)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to allocate memory for ObTxReadSnapshot", K(ret), K(sizeof(ObTxReadSnapshot)));
|
||||
} else {
|
||||
use_specify_snapshot_ = true;
|
||||
isolation_level_ = isolation_level;
|
||||
specify_snapshot_ = new(buf) ObTxReadSnapshot();
|
||||
response_snapshot_ = new(buf2) ObTxReadSnapshot();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObDASTaskArg::ObDASTaskArg()
|
||||
: timeout_ts_(0),
|
||||
|
||||
@ -42,6 +42,49 @@ class ObDasAggregatedTasks;
|
||||
typedef ObDLinkNode<ObIDASTaskOp*> DasTaskNode;
|
||||
typedef ObDList<DasTaskNode> DasTaskLinkedList;
|
||||
|
||||
struct ObDASGTSOptInfo
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObDASGTSOptInfo(common::ObIAllocator &alloc)
|
||||
: alloc_(alloc),
|
||||
use_specify_snapshot_(false),
|
||||
isolation_level_(),
|
||||
specify_snapshot_(nullptr),
|
||||
response_snapshot_(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
~ObDASGTSOptInfo()
|
||||
{
|
||||
if (specify_snapshot_ != nullptr) {
|
||||
specify_snapshot_->~ObTxReadSnapshot();
|
||||
}
|
||||
if (response_snapshot_ != nullptr) {
|
||||
response_snapshot_->~ObTxReadSnapshot();
|
||||
}
|
||||
}
|
||||
|
||||
int init(transaction::ObTxIsolationLevel isolation_level);
|
||||
void set_use_specify_snapshot(bool v)
|
||||
{
|
||||
use_specify_snapshot_ = v;
|
||||
}
|
||||
bool get_use_specify_snapshot() { return use_specify_snapshot_; }
|
||||
transaction::ObTxReadSnapshot *get_specify_snapshot() { return specify_snapshot_; }
|
||||
transaction::ObTxReadSnapshot *get_response_snapshot() { return response_snapshot_; }
|
||||
|
||||
TO_STRING_KV(K_(use_specify_snapshot),
|
||||
K_(isolation_level),
|
||||
KPC_(specify_snapshot),
|
||||
KPC_(response_snapshot));
|
||||
common::ObIAllocator &alloc_; // inited by op_alloc_ in das_op
|
||||
bool use_specify_snapshot_;
|
||||
transaction::ObTxIsolationLevel isolation_level_;
|
||||
transaction::ObTxReadSnapshot *specify_snapshot_; // 给task指定snapshot_version
|
||||
transaction::ObTxReadSnapshot *response_snapshot_; // 远端或者本地获取到的snapshot信息
|
||||
};
|
||||
|
||||
struct ObDASRemoteInfo
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
@ -124,7 +167,8 @@ public:
|
||||
cur_agg_list_(nullptr),
|
||||
op_result_(nullptr),
|
||||
attach_ctdef_(nullptr),
|
||||
attach_rtdef_(nullptr)
|
||||
attach_rtdef_(nullptr),
|
||||
das_gts_opt_info_(op_alloc)
|
||||
{
|
||||
das_task_node_.get_data() = this;
|
||||
}
|
||||
@ -285,6 +329,7 @@ protected:
|
||||
//rowkey merging for index merge operations, and so on.
|
||||
const ObDASBaseCtDef *attach_ctdef_;
|
||||
ObDASBaseRtDef *attach_rtdef_;
|
||||
ObDASGTSOptInfo das_gts_opt_info_;
|
||||
};
|
||||
typedef common::ObObjStore<ObIDASTaskOp*, common::ObIAllocator&> DasTaskList;
|
||||
typedef DasTaskList::Iterator DASTaskIter;
|
||||
|
||||
@ -626,7 +626,8 @@ ObTableModifySpec::ObTableModifySpec(common::ObIAllocator &alloc,
|
||||
: ObOpSpec(alloc, type),
|
||||
expr_frame_info_(NULL),
|
||||
ab_stmt_id_(nullptr),
|
||||
flags_(0)
|
||||
flags_(0),
|
||||
das_dop_(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -636,6 +637,7 @@ OB_DEF_SERIALIZE(ObTableModifySpec)
|
||||
BASE_SER((ObTableModifySpec, ObOpSpec));
|
||||
OB_UNIS_ENCODE(flags_);
|
||||
OB_UNIS_ENCODE(ab_stmt_id_);
|
||||
OB_UNIS_ENCODE(das_dop_);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -645,6 +647,7 @@ OB_DEF_DESERIALIZE(ObTableModifySpec)
|
||||
BASE_DESER((ObTableModifySpec, ObOpSpec));
|
||||
OB_UNIS_DECODE(flags_);
|
||||
OB_UNIS_DECODE(ab_stmt_id_);
|
||||
OB_UNIS_DECODE(das_dop_);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -654,6 +657,7 @@ OB_DEF_SERIALIZE_SIZE(ObTableModifySpec)
|
||||
BASE_ADD_LEN((ObTableModifySpec, ObOpSpec));
|
||||
OB_UNIS_ADD_LEN(flags_);
|
||||
OB_UNIS_ADD_LEN(ab_stmt_id_);
|
||||
OB_UNIS_ADD_LEN(das_dop_);
|
||||
return len;
|
||||
}
|
||||
|
||||
|
||||
@ -137,6 +137,7 @@ public:
|
||||
uint64_t reserved_ : 55;
|
||||
};
|
||||
};
|
||||
int64_t das_dop_; // default is 0
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTableModifySpec);
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user