From 24d6c8b8186f70ee246c8d3d039fc8db4db789e3 Mon Sep 17 00:00:00 2001 From: obdev Date: Tue, 7 Mar 2023 11:10:56 +0000 Subject: [PATCH] [CP] reuse das task result --- src/sql/das/ob_das_delete_op.cpp | 7 ++++ src/sql/das/ob_das_delete_op.h | 1 + src/sql/das/ob_das_insert_op.cpp | 12 +++++++ src/sql/das/ob_das_insert_op.h | 1 + src/sql/das/ob_das_lock_op.cpp | 7 ++++ src/sql/das/ob_das_lock_op.h | 1 + src/sql/das/ob_das_scan_op.cpp | 8 +++++ src/sql/das/ob_das_scan_op.h | 1 + src/sql/das/ob_das_task.h | 6 ++-- src/sql/das/ob_das_update_op.cpp | 7 ++++ src/sql/das/ob_das_update_op.h | 1 + src/sql/das/ob_data_access_service.cpp | 47 ++++++++++++++++++-------- 12 files changed, 83 insertions(+), 16 deletions(-) diff --git a/src/sql/das/ob_das_delete_op.cpp b/src/sql/das/ob_das_delete_op.cpp index 616332bdd4..2872987d1b 100644 --- a/src/sql/das/ob_das_delete_op.cpp +++ b/src/sql/das/ob_das_delete_op.cpp @@ -194,6 +194,13 @@ int ObDASDeleteResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) return OB_SUCCESS; } +int ObDASDeleteResult::reuse() +{ + int ret = OB_SUCCESS; + affected_rows_ = 0; + return ret; +} + OB_SERIALIZE_MEMBER((ObDASDeleteResult, ObIDASTaskResult), affected_rows_); } // namespace sql diff --git a/src/sql/das/ob_das_delete_op.h b/src/sql/das/ob_das_delete_op.h index 79bb6345fe..4d277341cd 100644 --- a/src/sql/das/ob_das_delete_op.h +++ b/src/sql/das/ob_das_delete_op.h @@ -62,6 +62,7 @@ public: ObDASDeleteResult(); virtual ~ObDASDeleteResult(); virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override; + virtual int reuse() override; int64_t get_affected_rows() const { return affected_rows_; } void set_affected_rows(int64_t affected_rows) { affected_rows_ = affected_rows; } INHERIT_TO_STRING_KV("ObIDASTaskResult", ObIDASTaskResult, diff --git a/src/sql/das/ob_das_insert_op.cpp b/src/sql/das/ob_das_insert_op.cpp index db8a92f6e6..3e6f86e9a5 100644 --- a/src/sql/das/ob_das_insert_op.cpp +++ b/src/sql/das/ob_das_insert_op.cpp @@ -465,6 +465,18 @@ int ObDASInsertResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) return OB_SUCCESS; } +int ObDASInsertResult::reuse() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(0 != result_buffer_.get_row_cnt())) { + ret = OB_NOT_SUPPORTED; + } else { + affected_rows_ = 0; + is_duplicated_ = false; + } + return ret; +} + OB_SERIALIZE_MEMBER((ObDASInsertResult, ObIDASTaskResult), affected_rows_, result_buffer_, diff --git a/src/sql/das/ob_das_insert_op.h b/src/sql/das/ob_das_insert_op.h index c375d2f2ab..a602a4cdfd 100644 --- a/src/sql/das/ob_das_insert_op.h +++ b/src/sql/das/ob_das_insert_op.h @@ -104,6 +104,7 @@ public: ObDASInsertResult(); virtual ~ObDASInsertResult(); virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override; + virtual int reuse() override; virtual int get_next_row(ObNewRow *&row) override; virtual int get_next_row() override; virtual int get_next_rows(int64_t &count, int64_t capacity) override; diff --git a/src/sql/das/ob_das_lock_op.cpp b/src/sql/das/ob_das_lock_op.cpp index efbd796681..6953279dbf 100644 --- a/src/sql/das/ob_das_lock_op.cpp +++ b/src/sql/das/ob_das_lock_op.cpp @@ -169,6 +169,13 @@ int ObDASLockResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) return OB_SUCCESS; } +int ObDASLockResult::reuse() +{ + int ret = OB_SUCCESS; + affected_rows_ = 0; + return ret; +} + OB_SERIALIZE_MEMBER((ObDASLockResult, ObIDASTaskResult), affected_rows_); } // namespace sql diff --git a/src/sql/das/ob_das_lock_op.h b/src/sql/das/ob_das_lock_op.h index 4812974974..dad51525d5 100644 --- a/src/sql/das/ob_das_lock_op.h +++ b/src/sql/das/ob_das_lock_op.h @@ -62,6 +62,7 @@ public: ObDASLockResult(); virtual ~ObDASLockResult(); virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override; + virtual int reuse() override; int64_t get_affected_rows() const { return affected_rows_; } void set_affected_rows(int64_t affected_rows) { affected_rows_ = affected_rows; } INHERIT_TO_STRING_KV("ObIDASTaskResult", ObIDASTaskResult, diff --git a/src/sql/das/ob_das_scan_op.cpp b/src/sql/das/ob_das_scan_op.cpp index 34a1845f43..3a25485274 100644 --- a/src/sql/das/ob_das_scan_op.cpp +++ b/src/sql/das/ob_das_scan_op.cpp @@ -729,6 +729,14 @@ int ObDASScanResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) return ret; } +int ObDASScanResult::reuse() +{ + int ret = OB_SUCCESS; + result_iter_.reset(); + datum_store_.reset(); + return ret; +} + int ObDASScanResult::link_extra_result(ObDASExtraData &extra_result) { extra_result.set_output_info(output_exprs_, eval_ctx_); diff --git a/src/sql/das/ob_das_scan_op.h b/src/sql/das/ob_das_scan_op.h index 240171896e..8463214549 100644 --- a/src/sql/das/ob_das_scan_op.h +++ b/src/sql/das/ob_das_scan_op.h @@ -217,6 +217,7 @@ public: ObDASScanResult(); virtual ~ObDASScanResult(); virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override; + virtual int reuse() override; virtual int get_next_row(ObNewRow *&row) override; virtual int get_next_row() override; virtual int get_next_rows(int64_t &count, int64_t capacity) override; diff --git a/src/sql/das/ob_das_task.h b/src/sql/das/ob_das_task.h index e042b45b56..195bccc187 100644 --- a/src/sql/das/ob_das_task.h +++ b/src/sql/das/ob_das_task.h @@ -199,8 +199,9 @@ public: int state_advance(); void set_cur_agg_list(DasTaskLinkedList *list) { cur_agg_list_ = list; }; DasTaskLinkedList *get_cur_agg_list() { return cur_agg_list_; }; - void set_remote_op_result(ObIDASTaskResult *op_result) { op_result_ = op_result; }; + ObIDASTaskResult *get_op_result() const { return op_result_; } + void set_op_result(ObIDASTaskResult *op_result) { op_result_ = op_result; } protected: int start_das_task(); int end_das_task(); @@ -241,7 +242,7 @@ protected: DasTaskNode das_task_node_; // tasks's linked list node, do not serialize ObDasAggregatedTasks *agg_tasks_; // task's agg task, do not serialize DasTaskLinkedList *cur_agg_list_; // task's agg_list, do not serialize - ObIDASTaskResult* op_result_; // as async result, do not serialize + ObIDASTaskResult *op_result_; public: const static uint32_t DAS_ROW_EXTEND_SIZE = 16; @@ -256,6 +257,7 @@ public: ObIDASTaskResult() : task_id_(0) { } virtual ~ObIDASTaskResult() { } virtual int init(const ObIDASTaskOp &task_op, common::ObIAllocator &alloc) = 0; + virtual int reuse() = 0; virtual int link_extra_result(ObDASExtraData &extra_result) { UNUSED(extra_result); diff --git a/src/sql/das/ob_das_update_op.cpp b/src/sql/das/ob_das_update_op.cpp index fc2b09f889..51a57466cf 100644 --- a/src/sql/das/ob_das_update_op.cpp +++ b/src/sql/das/ob_das_update_op.cpp @@ -410,6 +410,13 @@ int ObDASUpdateResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) return OB_SUCCESS; } +int ObDASUpdateResult::reuse() +{ + int ret = OB_SUCCESS; + affected_rows_ = 0; + return ret; +} + OB_SERIALIZE_MEMBER((ObDASUpdateResult, ObIDASTaskResult), affected_rows_); } // namespace sql diff --git a/src/sql/das/ob_das_update_op.h b/src/sql/das/ob_das_update_op.h index 0d3993ff68..7506305209 100644 --- a/src/sql/das/ob_das_update_op.h +++ b/src/sql/das/ob_das_update_op.h @@ -60,6 +60,7 @@ public: ObDASUpdateResult(); virtual ~ObDASUpdateResult(); virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override; + virtual int reuse() override; int64_t get_affected_rows() const { return affected_rows_; } void set_affected_rows(int64_t affected_rows) { affected_rows_ = affected_rows; } INHERIT_TO_STRING_KV("ObIDASTaskResult", ObIDASTaskResult, diff --git a/src/sql/das/ob_data_access_service.cpp b/src/sql/das/ob_data_access_service.cpp index e10ae97732..dffff1da62 100644 --- a/src/sql/das/ob_data_access_service.cpp +++ b/src/sql/das/ob_data_access_service.cpp @@ -389,13 +389,12 @@ int ObDataAccessService::do_async_remote_das_task( timeout_ts = current_ts + timeout; } uint64_t tenant_id = session->get_rpc_tenant_id(); - ObIDASTaskOp *task_op = task_arg.get_task_op(); common::ObSEArray &task_ops = task_arg.get_task_ops(); ObDASRemoteInfo remote_info; remote_info.exec_ctx_ = &das_ref.get_exec_ctx(); remote_info.frame_info_ = das_ref.get_expr_frame_info(); remote_info.trans_desc_ = session->get_tx_desc(); - remote_info.snapshot_ = *task_op->get_snapshot(); + remote_info.snapshot_ = *task_arg.get_task_op()->get_snapshot(); remote_info.need_tx_ = (remote_info.trans_desc_ != nullptr); task_arg.set_remote_info(&remote_info); ObDASRemoteInfo::get_remote_info() = &remote_info; @@ -406,7 +405,13 @@ int ObDataAccessService::do_async_remote_das_task( } // prepare op result in advance avoiding racing condition. for (int64_t i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) { - if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) { + if (OB_UNLIKELY(NULL != (op_result = task_ops.at(i)->get_op_result()))) { + // currently, we either disable async mode or try async task once + // and fall back to sync mode in error + // thus, get_op_result() should always be null + ret = OB_ERR_UNEXPECTED; + LOG_WARN("op_result is not null", KR(ret), KP(op_result), KPC(task_ops.at(i))); + } else if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) { LOG_WARN("failed to create das task result", K(ret)); } else if (OB_ISNULL(op_result)) { ret = OB_ERR_UNEXPECTED; @@ -415,10 +420,13 @@ int ObDataAccessService::do_async_remote_das_task( LOG_WARN("failed to add task result", K(ret)); } else if (OB_FAIL(op_result->init(*task_ops.at(i), das_async_cb->get_result_alloc()))) { LOG_WARN("failed to init task result", K(ret)); + } else { + task_ops.at(i)->set_op_result(op_result); } } LOG_DEBUG("begin to do remote das task", K(task_arg)); if (OB_FAIL(ret)) { + // do nothing } else if (OB_FAIL(collect_das_task_info(task_arg, remote_info))) { LOG_WARN("collect das task info failed", K(ret)); } else if (OB_UNLIKELY(timeout <= 0)) { @@ -464,7 +472,6 @@ int ObDataAccessService::do_sync_remote_das_task( ObPhysicalPlanCtx *plan_ctx = das_ref.get_exec_ctx().get_physical_plan_ctx(); int64_t timeout = plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time(); uint64_t tenant_id = session->get_rpc_tenant_id(); - ObIDASTaskOp *task_op = task_arg.get_task_op(); common::ObSEArray &task_ops = task_arg.get_task_ops(); ObIDASTaskResult *op_result = nullptr; ObDASExtraData *extra_result = nullptr; @@ -472,7 +479,7 @@ int ObDataAccessService::do_sync_remote_das_task( remote_info.exec_ctx_ = &das_ref.get_exec_ctx(); remote_info.frame_info_ = das_ref.get_expr_frame_info(); remote_info.trans_desc_ = session->get_tx_desc(); - remote_info.snapshot_ = *task_op->get_snapshot(); + remote_info.snapshot_ = *task_arg.get_task_op()->get_snapshot(); remote_info.need_tx_ = (remote_info.trans_desc_ != nullptr); task_arg.set_remote_info(&remote_info); ObDASRemoteInfo::get_remote_info() = &remote_info; @@ -484,18 +491,30 @@ int ObDataAccessService::do_sync_remote_das_task( // prepare op result in advance avoiding racing condition. for (int64_t i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) { - if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) { - LOG_WARN("failed to create das task result", K(ret)); - } else if (OB_ISNULL(op_result)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("failed to get op result", K(ret)); - } else if (OB_FAIL(task_resp.get_op_results().push_back(op_result))) { - LOG_WARN("failed to add task result", K(ret)); - } else if (OB_FAIL(op_result->init(*task_ops.at(i), das_ref.get_das_alloc()))) { - LOG_WARN("failed to init task result", K(ret)); + if (NULL != (op_result = task_ops.at(i)->get_op_result())) { + if (OB_FAIL(op_result->reuse())) { + LOG_WARN("reuse task result failed", K(ret)); + } + } else { + if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) { + LOG_WARN("failed to create das task result", K(ret)); + } else if (OB_ISNULL(op_result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get op result", K(ret)); + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(task_resp.get_op_results().push_back(op_result))) { + LOG_WARN("failed to add task result", K(ret)); + } else if (OB_FAIL(op_result->init(*task_ops.at(i), das_ref.get_das_alloc()))) { + LOG_WARN("failed to init task result", K(ret)); + } else { + task_ops.at(i)->set_op_result(op_result); + } } } if (OB_FAIL(ret)) { + // do nothing } else if (OB_FAIL(collect_das_task_info(task_arg, remote_info))) { LOG_WARN("collect das task info failed", K(ret)); } else if (OB_UNLIKELY(timeout <= 0)) {