diff --git a/src/sql/das/ob_das_insert_op.cpp b/src/sql/das/ob_das_insert_op.cpp index 3e6f86e9a5..6d7c40aa2a 100644 --- a/src/sql/das/ob_das_insert_op.cpp +++ b/src/sql/das/ob_das_insert_op.cpp @@ -473,6 +473,8 @@ int ObDASInsertResult::reuse() } else { affected_rows_ = 0; is_duplicated_ = false; + result_buffer_.~ObDASWriteBuffer(); + new(&result_buffer_) ObDASWriteBuffer(); } return ret; } diff --git a/src/sql/das/ob_data_access_service.cpp b/src/sql/das/ob_data_access_service.cpp index dffff1da62..52ac233a17 100644 --- a/src/sql/das/ob_data_access_service.cpp +++ b/src/sql/das/ob_data_access_service.cpp @@ -405,23 +405,29 @@ int ObDataAccessService::do_async_remote_das_task( } // prepare op result in advance avoiding racing condition. for (int64_t i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) { - if (OB_UNLIKELY(NULL != (op_result = task_ops.at(i)->get_op_result()))) { - // currently, we either disable async mode or try async task once - // and fall back to sync mode in error - // thus, get_op_result() should always be null + if (OB_UNLIKELY(ObDasTaskStatus::UNSTART != task_ops.at(i)->get_task_status())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("op_result is not null", KR(ret), KP(op_result), KPC(task_ops.at(i))); - } else if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) { - LOG_WARN("failed to create das task result", K(ret)); - } else if (OB_ISNULL(op_result)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("failed to get op result", K(ret)); - } else if (OB_FAIL(das_async_cb->get_op_results().push_back(op_result))) { - LOG_WARN("failed to add task result", K(ret)); - } else if (OB_FAIL(op_result->init(*task_ops.at(i), das_async_cb->get_result_alloc()))) { - LOG_WARN("failed to init task result", K(ret)); + LOG_WARN("task status unexpected", KR(ret), K(task_ops.at(i)->get_task_status()), KPC(task_ops.at(i))); + } else if (NULL != (op_result = task_ops.at(i)->get_op_result())) { + if (OB_FAIL(op_result->reuse())) { + LOG_WARN("reuse task result failed", K(ret)); + } } else { - task_ops.at(i)->set_op_result(op_result); + if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) { + LOG_WARN("failed to create das task result", K(ret)); + } else if (OB_ISNULL(op_result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get op result", K(ret)); + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(das_async_cb->get_op_results().push_back(op_result))) { + LOG_WARN("failed to add task result", K(ret)); + } else if (OB_FAIL(op_result->init(*task_ops.at(i), das_async_cb->get_result_alloc()))) { + LOG_WARN("failed to init task result", K(ret)); + } else { + task_ops.at(i)->set_op_result(op_result); + } } } LOG_DEBUG("begin to do remote das task", K(task_arg)); @@ -491,7 +497,10 @@ int ObDataAccessService::do_sync_remote_das_task( // prepare op result in advance avoiding racing condition. for (int64_t i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) { - if (NULL != (op_result = task_ops.at(i)->get_op_result())) { + if (OB_UNLIKELY(ObDasTaskStatus::UNSTART != task_ops.at(i)->get_task_status())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("task status unexpected", KR(ret), K(task_ops.at(i)->get_task_status()), KPC(task_ops.at(i))); + } else if (NULL != (op_result = task_ops.at(i)->get_op_result())) { if (OB_FAIL(op_result->reuse())) { LOG_WARN("reuse task result failed", K(ret)); }