diff --git a/src/sql/das/ob_das_task.cpp b/src/sql/das/ob_das_task.cpp index 192ebfba0b..f647673445 100644 --- a/src/sql/das/ob_das_task.cpp +++ b/src/sql/das/ob_das_task.cpp @@ -361,7 +361,74 @@ OB_SERIALIZE_MEMBER(ObIDASTaskOp, related_rtdefs_, related_tablet_ids_, attach_ctdef_, - attach_rtdef_); + attach_rtdef_, + das_gts_opt_info_); + +OB_DEF_SERIALIZE(ObDASGTSOptInfo) +{ + int ret = OB_SUCCESS; + bool serialize_specify_snapshot = specify_snapshot_ == nullptr ? false : true; + LST_DO_CODE(OB_UNIS_ENCODE, + use_specify_snapshot_, + isolation_level_, + serialize_specify_snapshot); + if (serialize_specify_snapshot) { + OB_UNIS_ENCODE(*specify_snapshot_); + } + return ret; +} + +OB_DEF_DESERIALIZE(ObDASGTSOptInfo) +{ + int ret = OB_SUCCESS; + bool serialize_specify_snapshot = false; + LST_DO_CODE(OB_UNIS_DECODE, + use_specify_snapshot_, + isolation_level_, + serialize_specify_snapshot); + if (serialize_specify_snapshot) { + if (OB_FAIL(init(isolation_level_))) { + LOG_WARN("fail to init gts_opt_info", K(ret)); + } else { + OB_UNIS_DECODE(*specify_snapshot_); + } + } + return ret; +} + +OB_DEF_SERIALIZE_SIZE(ObDASGTSOptInfo) +{ + int64_t len = 0; + bool serialize_specify_snapshot = specify_snapshot_ == nullptr ? false : true; + LST_DO_CODE(OB_UNIS_ADD_LEN, + use_specify_snapshot_, + isolation_level_, + serialize_specify_snapshot); + if (serialize_specify_snapshot) { + OB_UNIS_ADD_LEN(*specify_snapshot_); + } + return len; +} + +int ObDASGTSOptInfo::init(transaction::ObTxIsolationLevel isolation_level) +{ + int ret = OB_SUCCESS; + void *buf = nullptr; + void *buf2 = nullptr; + if (OB_ISNULL(buf = alloc_.alloc(sizeof(ObTxReadSnapshot)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate memory for ObTxReadSnapshot", K(ret), K(sizeof(ObTxReadSnapshot))); + } else if (OB_ISNULL(buf2 = alloc_.alloc(sizeof(ObTxReadSnapshot)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate memory for ObTxReadSnapshot", K(ret), K(sizeof(ObTxReadSnapshot))); + } else { + use_specify_snapshot_ = true; + isolation_level_ = isolation_level; + specify_snapshot_ = new(buf) ObTxReadSnapshot(); + response_snapshot_ = new(buf2) ObTxReadSnapshot(); + } + return ret; +} ObDASTaskArg::ObDASTaskArg() : timeout_ts_(0), diff --git a/src/sql/das/ob_das_task.h b/src/sql/das/ob_das_task.h index 385057c63f..d4fec3f4a7 100644 --- a/src/sql/das/ob_das_task.h +++ b/src/sql/das/ob_das_task.h @@ -42,6 +42,49 @@ class ObDasAggregatedTasks; typedef ObDLinkNode DasTaskNode; typedef ObDList DasTaskLinkedList; +struct ObDASGTSOptInfo +{ + OB_UNIS_VERSION(1); +public: + ObDASGTSOptInfo(common::ObIAllocator &alloc) + : alloc_(alloc), + use_specify_snapshot_(false), + isolation_level_(), + specify_snapshot_(nullptr), + response_snapshot_(nullptr) + { + } + + ~ObDASGTSOptInfo() + { + if (specify_snapshot_ != nullptr) { + specify_snapshot_->~ObTxReadSnapshot(); + } + if (response_snapshot_ != nullptr) { + response_snapshot_->~ObTxReadSnapshot(); + } + } + + int init(transaction::ObTxIsolationLevel isolation_level); + void set_use_specify_snapshot(bool v) + { + use_specify_snapshot_ = v; + } + bool get_use_specify_snapshot() { return use_specify_snapshot_; } + transaction::ObTxReadSnapshot *get_specify_snapshot() { return specify_snapshot_; } + transaction::ObTxReadSnapshot *get_response_snapshot() { return response_snapshot_; } + + TO_STRING_KV(K_(use_specify_snapshot), + K_(isolation_level), + KPC_(specify_snapshot), + KPC_(response_snapshot)); + common::ObIAllocator &alloc_; // inited by op_alloc_ in das_op + bool use_specify_snapshot_; + transaction::ObTxIsolationLevel isolation_level_; + transaction::ObTxReadSnapshot *specify_snapshot_; // 给task指定snapshot_version + transaction::ObTxReadSnapshot *response_snapshot_; // 远端或者本地获取到的snapshot信息 +}; + struct ObDASRemoteInfo { OB_UNIS_VERSION(1); @@ -124,7 +167,8 @@ public: cur_agg_list_(nullptr), op_result_(nullptr), attach_ctdef_(nullptr), - attach_rtdef_(nullptr) + attach_rtdef_(nullptr), + das_gts_opt_info_(op_alloc) { das_task_node_.get_data() = this; } @@ -285,6 +329,7 @@ protected: //rowkey merging for index merge operations, and so on. const ObDASBaseCtDef *attach_ctdef_; ObDASBaseRtDef *attach_rtdef_; + ObDASGTSOptInfo das_gts_opt_info_; }; typedef common::ObObjStore DasTaskList; typedef DasTaskList::Iterator DASTaskIter; diff --git a/src/sql/engine/dml/ob_table_modify_op.cpp b/src/sql/engine/dml/ob_table_modify_op.cpp index 4383ba9b03..743aedd89b 100644 --- a/src/sql/engine/dml/ob_table_modify_op.cpp +++ b/src/sql/engine/dml/ob_table_modify_op.cpp @@ -626,7 +626,8 @@ ObTableModifySpec::ObTableModifySpec(common::ObIAllocator &alloc, : ObOpSpec(alloc, type), expr_frame_info_(NULL), ab_stmt_id_(nullptr), - flags_(0) + flags_(0), + das_dop_(0) { } @@ -636,6 +637,7 @@ OB_DEF_SERIALIZE(ObTableModifySpec) BASE_SER((ObTableModifySpec, ObOpSpec)); OB_UNIS_ENCODE(flags_); OB_UNIS_ENCODE(ab_stmt_id_); + OB_UNIS_ENCODE(das_dop_); return ret; } @@ -645,6 +647,7 @@ OB_DEF_DESERIALIZE(ObTableModifySpec) BASE_DESER((ObTableModifySpec, ObOpSpec)); OB_UNIS_DECODE(flags_); OB_UNIS_DECODE(ab_stmt_id_); + OB_UNIS_DECODE(das_dop_); return ret; } @@ -654,6 +657,7 @@ OB_DEF_SERIALIZE_SIZE(ObTableModifySpec) BASE_ADD_LEN((ObTableModifySpec, ObOpSpec)); OB_UNIS_ADD_LEN(flags_); OB_UNIS_ADD_LEN(ab_stmt_id_); + OB_UNIS_ADD_LEN(das_dop_); return len; } diff --git a/src/sql/engine/dml/ob_table_modify_op.h b/src/sql/engine/dml/ob_table_modify_op.h index 9ba725d6a9..bc696585a3 100644 --- a/src/sql/engine/dml/ob_table_modify_op.h +++ b/src/sql/engine/dml/ob_table_modify_op.h @@ -137,6 +137,7 @@ public: uint64_t reserved_ : 55; }; }; + int64_t das_dop_; // default is 0 private: DISALLOW_COPY_AND_ASSIGN(ObTableModifySpec); };