diff --git a/src/sql/das/ob_das_factory.cpp b/src/sql/das/ob_das_factory.cpp index 8285a7823d..e6b539e8a9 100644 --- a/src/sql/das/ob_das_factory.cpp +++ b/src/sql/das/ob_das_factory.cpp @@ -278,15 +278,18 @@ int ObDASTaskFactory::create_das_extra_data(ObDASExtraData *&extra_result) } int ObDASTaskFactory::create_das_async_cb( - const common::ObSEArray &task_ops, const ObMemAttr &attr, - ObDASRef &das_ref, ObRpcDasAsyncAccessCallBack *&async_cb) { + const common::ObSEArray &task_ops, + const ObMemAttr &attr, + ObDASRef &das_ref, + ObRpcDasAsyncAccessCallBack *&async_cb, + int64_t timeout_ts) { int ret = OB_SUCCESS; void *buffer = nullptr; ObDasAsyncRpcCallBackContext *context = nullptr; if (OB_ISNULL(buffer = allocator_.alloc(sizeof(ObDasAsyncRpcCallBackContext)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate das async cb context memory", K(ret), K(sizeof(ObDasAsyncRpcCallBackContext))); - } else if (FALSE_IT(context = new (buffer) ObDasAsyncRpcCallBackContext(das_ref, task_ops))) { + } else if (FALSE_IT(context = new (buffer) ObDasAsyncRpcCallBackContext(das_ref, task_ops, timeout_ts))) { } else if (OB_FAIL(context->init(attr))) { LOG_WARN("fail to init das async cb context", K(ret)); } else if (OB_ISNULL(buffer = allocator_.alloc(sizeof(ObRpcDasAsyncAccessCallBack)))) { diff --git a/src/sql/das/ob_das_factory.h b/src/sql/das/ob_das_factory.h index 5dfbd350fd..0aa5fba237 100644 --- a/src/sql/das/ob_das_factory.h +++ b/src/sql/das/ob_das_factory.h @@ -39,8 +39,10 @@ public: int create_das_ctdef(ObDASOpType op_type, ObDASBaseCtDef *&ctdef); int create_das_rtdef(ObDASOpType op_type, ObDASBaseRtDef *&rtdef); int create_das_async_cb(const common::ObSEArray &task_ops, - const ObMemAttr &attr, ObDASRef &das_ref, - ObRpcDasAsyncAccessCallBack *&async_cb); + const ObMemAttr &attr, + ObDASRef &das_ref, + ObRpcDasAsyncAccessCallBack *&async_cb, + int64_t timeout_ts); static int create_das_ctdef(ObDASOpType op_type, common::ObIAllocator &alloc, ObDASBaseCtDef *&ctdef); static int create_das_rtdef(ObDASOpType op_type, common::ObIAllocator &alloc, ObDASBaseRtDef *&rtdef); template diff --git a/src/sql/das/ob_das_ref.cpp b/src/sql/das/ob_das_ref.cpp index 01bf4ab2eb..0151def90e 100644 --- a/src/sql/das/ob_das_ref.cpp +++ b/src/sql/das/ob_das_ref.cpp @@ -376,12 +376,14 @@ int ObDASRef::wait_all_tasks() return OB_UNIMPLEMENTED_FEATURE; } -int ObDASRef::allocate_async_das_cb(ObRpcDasAsyncAccessCallBack *&async_cb, const common::ObSEArray &task_ops) +int ObDASRef::allocate_async_das_cb(ObRpcDasAsyncAccessCallBack *&async_cb, + const common::ObSEArray &task_ops, + int64_t timeout_ts) { int ret = OB_SUCCESS; OB_ASSERT(async_cb == nullptr); ObDASTaskFactory &das_factory = get_das_factory(); - if (OB_FAIL(das_factory.create_das_async_cb(task_ops, das_alloc_.get_attr(), *this, async_cb))) { + if (OB_FAIL(das_factory.create_das_async_cb(task_ops, das_alloc_.get_attr(), *this, async_cb, timeout_ts))) { LOG_WARN("failed to create das async cb", K(ret)); } else if (OB_ISNULL(async_cb)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/sql/das/ob_das_ref.h b/src/sql/das/ob_das_ref.h index 360fdd342a..17ae1e2981 100644 --- a/src/sql/das/ob_das_ref.h +++ b/src/sql/das/ob_das_ref.h @@ -143,7 +143,9 @@ public: int acquire_task_execution_resource(); int get_aggregated_tasks_count() const { return aggregated_tasks_.get_size(); } int wait_all_tasks(); - int allocate_async_das_cb(ObRpcDasAsyncAccessCallBack *&async_cb, const common::ObSEArray &task_ops); + int allocate_async_das_cb(ObRpcDasAsyncAccessCallBack *&async_cb, + const common::ObSEArray &task_ops, + int64_t timeout_ts); void remove_async_das_cb(ObRpcDasAsyncAccessCallBack *das_async_cb); private: DISABLE_COPY_ASSIGN(ObDASRef); diff --git a/src/sql/das/ob_das_rpc_processor.cpp b/src/sql/das/ob_das_rpc_processor.cpp index a9b2aee2b0..fa3ef072a9 100644 --- a/src/sql/das/ob_das_rpc_processor.cpp +++ b/src/sql/das/ob_das_rpc_processor.cpp @@ -209,9 +209,17 @@ int ObDASAsyncAccessP::process() void ObRpcDasAsyncAccessCallBack::on_timeout() { - int ret = OB_SUCCESS; - LOG_WARN("das async task timeout", K(get_task_ops())); - result_.set_err_code(OB_TIMEOUT); + int ret = OB_TIMEOUT; + int64_t current_ts = ObTimeUtility::current_time(); + int64_t timeout_ts = context_->get_timeout_ts(); + // ESTIMATE_PS_RESERVE_TIME = 100 * 1000 + if (timeout_ts - current_ts > 100 * 1000) { + LOG_DEBUG("rpc return OB_TIMEOUT before actual timeout, change error code to OB_RPC_CONNECT_ERROR", KR(ret), + K(timeout_ts), K(current_ts)); + ret = OB_RPC_CONNECT_ERROR; + } + LOG_WARN("das async task timeout", KR(ret), K(get_task_ops())); + result_.set_err_code(ret); result_.get_op_results().reuse(); context_->get_das_ref().inc_concurrency_limit_with_signal(); } diff --git a/src/sql/das/ob_das_rpc_processor.h b/src/sql/das/ob_das_rpc_processor.h index 1f0a25b47d..36b2da4bf7 100644 --- a/src/sql/das/ob_das_rpc_processor.h +++ b/src/sql/das/ob_das_rpc_processor.h @@ -85,17 +85,21 @@ class ObDASAsyncAccessP final : public ObDASBaseAccessP &task_ops) - : das_ref_(das_ref), task_ops_(task_ops), alloc_() {} + ObDasAsyncRpcCallBackContext(ObDASRef &das_ref, + const common::ObSEArray &task_ops, + int64_t timeout_ts) + : das_ref_(das_ref), task_ops_(task_ops), alloc_(), timeout_ts_(timeout_ts) {} ~ObDasAsyncRpcCallBackContext() = default; int init(const ObMemAttr &attr); ObDASRef &get_das_ref() { return das_ref_; }; const common::ObSEArray &get_task_ops() const { return task_ops_; }; common::ObArenaAllocator &get_alloc() { return alloc_; }; + int64_t get_timeout_ts() const { return timeout_ts_; } private: ObDASRef &das_ref_; const common::ObSEArray task_ops_; common::ObArenaAllocator alloc_; // used for async rpc result allocation. + int64_t timeout_ts_; }; class ObRpcDasAsyncAccessCallBack diff --git a/src/sql/das/ob_data_access_service.cpp b/src/sql/das/ob_data_access_service.cpp index fa17c65849..e10ae97732 100644 --- a/src/sql/das/ob_data_access_service.cpp +++ b/src/sql/das/ob_data_access_service.cpp @@ -378,14 +378,16 @@ int ObDataAccessService::do_async_remote_das_task( FLTSpanGuard(do_async_remote_das_task); ObSQLSessionInfo *session = das_ref.get_exec_ctx().get_my_session(); ObPhysicalPlanCtx *plan_ctx = das_ref.get_exec_ctx().get_physical_plan_ctx(); - int64_t timeout = plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time(); -#ifdef ERRSIM - int inject_timeout = -OB_E(EventTable::EN_DAS_SIMULATE_ASYNC_RPC_TIMEOUT) OB_SUCCESS; - if (OB_SUCCESS != inject_timeout) { - LOG_INFO("das async rpc simulate timeout", K(inject_timeout)); - timeout = inject_timeout - 10; + int64_t timeout_ts = plan_ctx->get_timeout_timestamp(); + int64_t current_ts = ObTimeUtility::current_time(); + int64_t timeout = timeout_ts - current_ts; + int64_t simulate_timeout = - EVENT_CALL(EventTable::EN_DAS_SIMULATE_ASYNC_RPC_TIMEOUT); + if (OB_UNLIKELY(simulate_timeout > 0)) { + LOG_INFO("das async rpc simulate timeout", K(simulate_timeout), + K(timeout), K(timeout_ts), K(current_ts)); + timeout = simulate_timeout; + timeout_ts = current_ts + timeout; } -#endif uint64_t tenant_id = session->get_rpc_tenant_id(); ObIDASTaskOp *task_op = task_arg.get_task_op(); common::ObSEArray &task_ops = task_arg.get_task_ops(); @@ -399,7 +401,7 @@ int ObDataAccessService::do_async_remote_das_task( ObDASRemoteInfo::get_remote_info() = &remote_info; ObIDASTaskResult *op_result = nullptr; ObRpcDasAsyncAccessCallBack *das_async_cb = nullptr; - if (OB_FAIL(das_ref.allocate_async_das_cb(das_async_cb, task_ops))) { + if (OB_FAIL(das_ref.allocate_async_das_cb(das_async_cb, task_ops, timeout_ts))) { LOG_WARN("failed to allocate das async cb", K(ret)); } // prepare op result in advance avoiding racing condition.