[CP] reuse das task result

This commit is contained in:
obdev
2023-03-07 11:10:56 +00:00
committed by ob-robot
parent f5e563764a
commit 24d6c8b818
12 changed files with 83 additions and 16 deletions

View File

@ -194,6 +194,13 @@ int ObDASDeleteResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc)
return OB_SUCCESS;
}
int ObDASDeleteResult::reuse()
{
int ret = OB_SUCCESS;
affected_rows_ = 0;
return ret;
}
OB_SERIALIZE_MEMBER((ObDASDeleteResult, ObIDASTaskResult),
affected_rows_);
} // namespace sql

View File

@ -62,6 +62,7 @@ public:
ObDASDeleteResult();
virtual ~ObDASDeleteResult();
virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override;
virtual int reuse() override;
int64_t get_affected_rows() const { return affected_rows_; }
void set_affected_rows(int64_t affected_rows) { affected_rows_ = affected_rows; }
INHERIT_TO_STRING_KV("ObIDASTaskResult", ObIDASTaskResult,

View File

@ -465,6 +465,18 @@ int ObDASInsertResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc)
return OB_SUCCESS;
}
int ObDASInsertResult::reuse()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(0 != result_buffer_.get_row_cnt())) {
ret = OB_NOT_SUPPORTED;
} else {
affected_rows_ = 0;
is_duplicated_ = false;
}
return ret;
}
OB_SERIALIZE_MEMBER((ObDASInsertResult, ObIDASTaskResult),
affected_rows_,
result_buffer_,

View File

@ -104,6 +104,7 @@ public:
ObDASInsertResult();
virtual ~ObDASInsertResult();
virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override;
virtual int reuse() override;
virtual int get_next_row(ObNewRow *&row) override;
virtual int get_next_row() override;
virtual int get_next_rows(int64_t &count, int64_t capacity) override;

View File

@ -169,6 +169,13 @@ int ObDASLockResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc)
return OB_SUCCESS;
}
int ObDASLockResult::reuse()
{
int ret = OB_SUCCESS;
affected_rows_ = 0;
return ret;
}
OB_SERIALIZE_MEMBER((ObDASLockResult, ObIDASTaskResult),
affected_rows_);
} // namespace sql

View File

@ -62,6 +62,7 @@ public:
ObDASLockResult();
virtual ~ObDASLockResult();
virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override;
virtual int reuse() override;
int64_t get_affected_rows() const { return affected_rows_; }
void set_affected_rows(int64_t affected_rows) { affected_rows_ = affected_rows; }
INHERIT_TO_STRING_KV("ObIDASTaskResult", ObIDASTaskResult,

View File

@ -729,6 +729,14 @@ int ObDASScanResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc)
return ret;
}
int ObDASScanResult::reuse()
{
int ret = OB_SUCCESS;
result_iter_.reset();
datum_store_.reset();
return ret;
}
int ObDASScanResult::link_extra_result(ObDASExtraData &extra_result)
{
extra_result.set_output_info(output_exprs_, eval_ctx_);

View File

@ -217,6 +217,7 @@ public:
ObDASScanResult();
virtual ~ObDASScanResult();
virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override;
virtual int reuse() override;
virtual int get_next_row(ObNewRow *&row) override;
virtual int get_next_row() override;
virtual int get_next_rows(int64_t &count, int64_t capacity) override;

View File

@ -199,8 +199,9 @@ public:
int state_advance();
void set_cur_agg_list(DasTaskLinkedList *list) { cur_agg_list_ = list; };
DasTaskLinkedList *get_cur_agg_list() { return cur_agg_list_; };
void set_remote_op_result(ObIDASTaskResult *op_result) { op_result_ = op_result; };
ObIDASTaskResult *get_op_result() const { return op_result_; }
void set_op_result(ObIDASTaskResult *op_result) { op_result_ = op_result; }
protected:
int start_das_task();
int end_das_task();
@ -241,7 +242,7 @@ protected:
DasTaskNode das_task_node_; // tasks's linked list node, do not serialize
ObDasAggregatedTasks *agg_tasks_; // task's agg task, do not serialize
DasTaskLinkedList *cur_agg_list_; // task's agg_list, do not serialize
ObIDASTaskResult* op_result_; // as async result, do not serialize
ObIDASTaskResult *op_result_;
public:
const static uint32_t DAS_ROW_EXTEND_SIZE = 16;
@ -256,6 +257,7 @@ public:
ObIDASTaskResult() : task_id_(0) { }
virtual ~ObIDASTaskResult() { }
virtual int init(const ObIDASTaskOp &task_op, common::ObIAllocator &alloc) = 0;
virtual int reuse() = 0;
virtual int link_extra_result(ObDASExtraData &extra_result)
{
UNUSED(extra_result);

View File

@ -410,6 +410,13 @@ int ObDASUpdateResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc)
return OB_SUCCESS;
}
int ObDASUpdateResult::reuse()
{
int ret = OB_SUCCESS;
affected_rows_ = 0;
return ret;
}
OB_SERIALIZE_MEMBER((ObDASUpdateResult, ObIDASTaskResult),
affected_rows_);
} // namespace sql

View File

@ -60,6 +60,7 @@ public:
ObDASUpdateResult();
virtual ~ObDASUpdateResult();
virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override;
virtual int reuse() override;
int64_t get_affected_rows() const { return affected_rows_; }
void set_affected_rows(int64_t affected_rows) { affected_rows_ = affected_rows; }
INHERIT_TO_STRING_KV("ObIDASTaskResult", ObIDASTaskResult,

View File

@ -389,13 +389,12 @@ int ObDataAccessService::do_async_remote_das_task(
timeout_ts = current_ts + timeout;
}
uint64_t tenant_id = session->get_rpc_tenant_id();
ObIDASTaskOp *task_op = task_arg.get_task_op();
common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = task_arg.get_task_ops();
ObDASRemoteInfo remote_info;
remote_info.exec_ctx_ = &das_ref.get_exec_ctx();
remote_info.frame_info_ = das_ref.get_expr_frame_info();
remote_info.trans_desc_ = session->get_tx_desc();
remote_info.snapshot_ = *task_op->get_snapshot();
remote_info.snapshot_ = *task_arg.get_task_op()->get_snapshot();
remote_info.need_tx_ = (remote_info.trans_desc_ != nullptr);
task_arg.set_remote_info(&remote_info);
ObDASRemoteInfo::get_remote_info() = &remote_info;
@ -406,7 +405,13 @@ int ObDataAccessService::do_async_remote_das_task(
}
// prepare op result in advance avoiding racing condition.
for (int64_t i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) {
if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) {
if (OB_UNLIKELY(NULL != (op_result = task_ops.at(i)->get_op_result()))) {
// currently, we either disable async mode or try async task once
// and fall back to sync mode in error
// thus, get_op_result() should always be null
ret = OB_ERR_UNEXPECTED;
LOG_WARN("op_result is not null", KR(ret), KP(op_result), KPC(task_ops.at(i)));
} else if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) {
LOG_WARN("failed to create das task result", K(ret));
} else if (OB_ISNULL(op_result)) {
ret = OB_ERR_UNEXPECTED;
@ -415,10 +420,13 @@ int ObDataAccessService::do_async_remote_das_task(
LOG_WARN("failed to add task result", K(ret));
} else if (OB_FAIL(op_result->init(*task_ops.at(i), das_async_cb->get_result_alloc()))) {
LOG_WARN("failed to init task result", K(ret));
} else {
task_ops.at(i)->set_op_result(op_result);
}
}
LOG_DEBUG("begin to do remote das task", K(task_arg));
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(collect_das_task_info(task_arg, remote_info))) {
LOG_WARN("collect das task info failed", K(ret));
} else if (OB_UNLIKELY(timeout <= 0)) {
@ -464,7 +472,6 @@ int ObDataAccessService::do_sync_remote_das_task(
ObPhysicalPlanCtx *plan_ctx = das_ref.get_exec_ctx().get_physical_plan_ctx();
int64_t timeout = plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time();
uint64_t tenant_id = session->get_rpc_tenant_id();
ObIDASTaskOp *task_op = task_arg.get_task_op();
common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = task_arg.get_task_ops();
ObIDASTaskResult *op_result = nullptr;
ObDASExtraData *extra_result = nullptr;
@ -472,7 +479,7 @@ int ObDataAccessService::do_sync_remote_das_task(
remote_info.exec_ctx_ = &das_ref.get_exec_ctx();
remote_info.frame_info_ = das_ref.get_expr_frame_info();
remote_info.trans_desc_ = session->get_tx_desc();
remote_info.snapshot_ = *task_op->get_snapshot();
remote_info.snapshot_ = *task_arg.get_task_op()->get_snapshot();
remote_info.need_tx_ = (remote_info.trans_desc_ != nullptr);
task_arg.set_remote_info(&remote_info);
ObDASRemoteInfo::get_remote_info() = &remote_info;
@ -484,18 +491,30 @@ int ObDataAccessService::do_sync_remote_das_task(
// prepare op result in advance avoiding racing condition.
for (int64_t i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) {
if (NULL != (op_result = task_ops.at(i)->get_op_result())) {
if (OB_FAIL(op_result->reuse())) {
LOG_WARN("reuse task result failed", K(ret));
}
} else {
if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) {
LOG_WARN("failed to create das task result", K(ret));
} else if (OB_ISNULL(op_result)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get op result", K(ret));
} else if (OB_FAIL(task_resp.get_op_results().push_back(op_result))) {
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(task_resp.get_op_results().push_back(op_result))) {
LOG_WARN("failed to add task result", K(ret));
} else if (OB_FAIL(op_result->init(*task_ops.at(i), das_ref.get_das_alloc()))) {
LOG_WARN("failed to init task result", K(ret));
} else {
task_ops.at(i)->set_op_result(op_result);
}
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(collect_das_task_info(task_arg, remote_info))) {
LOG_WARN("collect das task info failed", K(ret));
} else if (OB_UNLIKELY(timeout <= 0)) {