fix wrong timeout error code in async das task

This commit is contained in:
obdev
2023-02-21 13:41:52 +00:00
committed by ob-robot
parent 37cb2f29ae
commit 0aa6a494c8
7 changed files with 44 additions and 21 deletions

View File

@ -278,15 +278,18 @@ int ObDASTaskFactory::create_das_extra_data(ObDASExtraData *&extra_result)
}
int ObDASTaskFactory::create_das_async_cb(
const common::ObSEArray<ObIDASTaskOp *, 2> &task_ops, const ObMemAttr &attr,
ObDASRef &das_ref, ObRpcDasAsyncAccessCallBack *&async_cb) {
const common::ObSEArray<ObIDASTaskOp *, 2> &task_ops,
const ObMemAttr &attr,
ObDASRef &das_ref,
ObRpcDasAsyncAccessCallBack *&async_cb,
int64_t timeout_ts) {
int ret = OB_SUCCESS;
void *buffer = nullptr;
ObDasAsyncRpcCallBackContext *context = nullptr;
if (OB_ISNULL(buffer = allocator_.alloc(sizeof(ObDasAsyncRpcCallBackContext)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate das async cb context memory", K(ret), K(sizeof(ObDasAsyncRpcCallBackContext)));
} else if (FALSE_IT(context = new (buffer) ObDasAsyncRpcCallBackContext(das_ref, task_ops))) {
} else if (FALSE_IT(context = new (buffer) ObDasAsyncRpcCallBackContext(das_ref, task_ops, timeout_ts))) {
} else if (OB_FAIL(context->init(attr))) {
LOG_WARN("fail to init das async cb context", K(ret));
} else if (OB_ISNULL(buffer = allocator_.alloc(sizeof(ObRpcDasAsyncAccessCallBack)))) {

View File

@ -39,8 +39,10 @@ public:
int create_das_ctdef(ObDASOpType op_type, ObDASBaseCtDef *&ctdef);
int create_das_rtdef(ObDASOpType op_type, ObDASBaseRtDef *&rtdef);
int create_das_async_cb(const common::ObSEArray<ObIDASTaskOp *, 2> &task_ops,
const ObMemAttr &attr, ObDASRef &das_ref,
ObRpcDasAsyncAccessCallBack *&async_cb);
const ObMemAttr &attr,
ObDASRef &das_ref,
ObRpcDasAsyncAccessCallBack *&async_cb,
int64_t timeout_ts);
static int create_das_ctdef(ObDASOpType op_type, common::ObIAllocator &alloc, ObDASBaseCtDef *&ctdef);
static int create_das_rtdef(ObDASOpType op_type, common::ObIAllocator &alloc, ObDASBaseRtDef *&rtdef);
template <typename CtDef>

View File

@ -376,12 +376,14 @@ int ObDASRef::wait_all_tasks()
return OB_UNIMPLEMENTED_FEATURE;
}
int ObDASRef::allocate_async_das_cb(ObRpcDasAsyncAccessCallBack *&async_cb, const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops)
int ObDASRef::allocate_async_das_cb(ObRpcDasAsyncAccessCallBack *&async_cb,
const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops,
int64_t timeout_ts)
{
int ret = OB_SUCCESS;
OB_ASSERT(async_cb == nullptr);
ObDASTaskFactory &das_factory = get_das_factory();
if (OB_FAIL(das_factory.create_das_async_cb(task_ops, das_alloc_.get_attr(), *this, async_cb))) {
if (OB_FAIL(das_factory.create_das_async_cb(task_ops, das_alloc_.get_attr(), *this, async_cb, timeout_ts))) {
LOG_WARN("failed to create das async cb", K(ret));
} else if (OB_ISNULL(async_cb)) {
ret = OB_ERR_UNEXPECTED;

View File

@ -143,7 +143,9 @@ public:
int acquire_task_execution_resource();
int get_aggregated_tasks_count() const { return aggregated_tasks_.get_size(); }
int wait_all_tasks();
int allocate_async_das_cb(ObRpcDasAsyncAccessCallBack *&async_cb, const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops);
int allocate_async_das_cb(ObRpcDasAsyncAccessCallBack *&async_cb,
const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops,
int64_t timeout_ts);
void remove_async_das_cb(ObRpcDasAsyncAccessCallBack *das_async_cb);
private:
DISABLE_COPY_ASSIGN(ObDASRef);

View File

@ -209,9 +209,17 @@ int ObDASAsyncAccessP::process()
// Timeout hook of the async DAS RPC callback: records an error code on result_, calls reuse() on
// the op-result container and signals the owning ObDASRef.
//
// NOTE(review): this listing is a commit diff rendered without +/- markers. Going by the hunk
// header (-209,9 +209,17), the first three body statements (ret = OB_SUCCESS, the LOG_WARN without
// KR(ret), and set_err_code(OB_TIMEOUT)) appear to be the removed pre-change lines, and everything
// from `int ret = OB_TIMEOUT;` through `result_.set_err_code(ret);` appears to be their replacement.
void ObRpcDasAsyncAccessCallBack::on_timeout()
{
int ret = OB_SUCCESS;
LOG_WARN("das async task timeout", K(get_task_ops()));
result_.set_err_code(OB_TIMEOUT);
// Start from OB_TIMEOUT; it is replaced below when the task deadline is still far away.
int ret = OB_TIMEOUT;
int64_t current_ts = ObTimeUtility::current_time();
// Deadline stored in the callback context when the callback was created.
int64_t timeout_ts = context_->get_timeout_ts();
// ESTIMATE_PS_RESERVE_TIME = 100 * 1000
// If more than 100 * 1000 time units remain before timeout_ts, the RPC layer reported a timeout
// before the task's own deadline, so OB_RPC_CONNECT_ERROR is reported instead of OB_TIMEOUT.
// (presumably the timestamps are in microseconds, i.e. a 100 ms margin -- TODO confirm)
if (timeout_ts - current_ts > 100 * 1000) {
LOG_DEBUG("rpc return OB_TIMEOUT before actual timeout, change error code to OB_RPC_CONNECT_ERROR", KR(ret),
K(timeout_ts), K(current_ts));
ret = OB_RPC_CONNECT_ERROR;
}
LOG_WARN("das async task timeout", KR(ret), K(get_task_ops()));
// Publish the chosen error code on the result object.
result_.set_err_code(ret);
// Calls reuse() on the op-result container (presumably clearing any results -- TODO confirm).
result_.get_op_results().reuse();
// Presumably gives this task's concurrency slot back to the ObDASRef and wakes a waiter -- confirm
// against ObDASRef::inc_concurrency_limit_with_signal().
context_->get_das_ref().inc_concurrency_limit_with_signal();
}

View File

@ -85,17 +85,21 @@ class ObDASAsyncAccessP final : public ObDASBaseAccessP<obrpc::OB_DAS_ASYNC_ACCE
// Context object created alongside an async DAS RPC callback. It holds a reference to the issuing
// ObDASRef, its own copy of the task-op pointer array, an arena allocator, and the timeout
// timestamp that the callback's on_timeout() reads via get_timeout_ts().
//
// NOTE(review): this listing is a commit diff rendered without +/- markers. Going by the hunk
// header (-85,17 +85,21), the two-argument constructor appears to be the removed pre-change
// version, and the three-argument constructor, get_timeout_ts() and timeout_ts_ appear to be the
// additions.
class ObDasAsyncRpcCallBackContext
{
public:
ObDasAsyncRpcCallBackContext(ObDASRef &das_ref, const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops)
: das_ref_(das_ref), task_ops_(task_ops), alloc_() {}
// Binds das_ref by reference, copies task_ops into the context, and stores timeout_ts as given.
ObDasAsyncRpcCallBackContext(ObDASRef &das_ref,
const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops,
int64_t timeout_ts)
: das_ref_(das_ref), task_ops_(task_ops), alloc_(), timeout_ts_(timeout_ts) {}
~ObDasAsyncRpcCallBackContext() = default;
// Defined elsewhere; takes the memory attribute passed by the factory (presumably applied to
// alloc_ -- TODO confirm against the definition).
int init(const ObMemAttr &attr);
ObDASRef &get_das_ref() { return das_ref_; };
const common::ObSEArray<ObIDASTaskOp*, 2> &get_task_ops() const { return task_ops_; };
common::ObArenaAllocator &get_alloc() { return alloc_; };
// Timestamp supplied at construction; on_timeout() compares it against the current time.
int64_t get_timeout_ts() const { return timeout_ts_; }
private:
ObDASRef &das_ref_;
// Stored by value: a copy of the array of task-op pointers, not a reference to the caller's array.
const common::ObSEArray<ObIDASTaskOp*, 2> task_ops_;
common::ObArenaAllocator alloc_; // used for async rpc result allocation.
// Timeout timestamp for the task (presumably an absolute time in microseconds -- TODO confirm).
int64_t timeout_ts_;
};
class ObRpcDasAsyncAccessCallBack

View File

@ -378,14 +378,16 @@ int ObDataAccessService::do_async_remote_das_task(
FLTSpanGuard(do_async_remote_das_task);
ObSQLSessionInfo *session = das_ref.get_exec_ctx().get_my_session();
ObPhysicalPlanCtx *plan_ctx = das_ref.get_exec_ctx().get_physical_plan_ctx();
int64_t timeout = plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time();
#ifdef ERRSIM
int inject_timeout = -OB_E(EventTable::EN_DAS_SIMULATE_ASYNC_RPC_TIMEOUT) OB_SUCCESS;
if (OB_SUCCESS != inject_timeout) {
LOG_INFO("das async rpc simulate timeout", K(inject_timeout));
timeout = inject_timeout - 10;
int64_t timeout_ts = plan_ctx->get_timeout_timestamp();
int64_t current_ts = ObTimeUtility::current_time();
int64_t timeout = timeout_ts - current_ts;
int64_t simulate_timeout = - EVENT_CALL(EventTable::EN_DAS_SIMULATE_ASYNC_RPC_TIMEOUT);
if (OB_UNLIKELY(simulate_timeout > 0)) {
LOG_INFO("das async rpc simulate timeout", K(simulate_timeout),
K(timeout), K(timeout_ts), K(current_ts));
timeout = simulate_timeout;
timeout_ts = current_ts + timeout;
}
#endif
uint64_t tenant_id = session->get_rpc_tenant_id();
ObIDASTaskOp *task_op = task_arg.get_task_op();
common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = task_arg.get_task_ops();
@ -399,7 +401,7 @@ int ObDataAccessService::do_async_remote_das_task(
ObDASRemoteInfo::get_remote_info() = &remote_info;
ObIDASTaskResult *op_result = nullptr;
ObRpcDasAsyncAccessCallBack *das_async_cb = nullptr;
if (OB_FAIL(das_ref.allocate_async_das_cb(das_async_cb, task_ops))) {
if (OB_FAIL(das_ref.allocate_async_das_cb(das_async_cb, task_ops, timeout_ts))) {
LOG_WARN("failed to allocate das async cb", K(ret));
}
// prepare op result in advance avoiding racing condition.