[CP] add timeout param for estimation related interfaces

This commit is contained in:
hiddenbomb
2024-02-06 19:59:58 +00:00
committed by ob-robot
parent e1eda7c927
commit 01729c848d
13 changed files with 106 additions and 54 deletions

View File

@ -49,7 +49,7 @@ OB_SERIALIZE_MEMBER(ObDASEmptyCtDef);
OB_SERIALIZE_MEMBER(ObDASEmptyRtDef);
ObDASSplitRangesOp::ObDASSplitRangesOp(ObIAllocator &op_alloc)
: ObDASSimpleOp(op_alloc), expected_task_count_(0) {}
: ObDASSimpleOp(op_alloc), expected_task_count_(0), timeout_us_(0) {}
int ObDASSplitRangesOp::open_op()
{
@ -57,6 +57,7 @@ int ObDASSplitRangesOp::open_op()
ObAccessService *access_service = MTL(ObAccessService *);
if (OB_FAIL(access_service->split_multi_ranges(ls_id_,
tablet_id_,
timeout_us_,
ranges_,
expected_task_count_,
op_alloc_,
@ -97,10 +98,11 @@ int ObDASSplitRangesOp::decode_task_result(ObIDASTaskResult *task_result)
return ret;
}
int ObDASSplitRangesOp::init(const common::ObIArray<ObStoreRange> &ranges, int64_t expected_task_count)
int ObDASSplitRangesOp::init(const common::ObIArray<ObStoreRange> &ranges, int64_t expected_task_count, const int64_t timeout_us)
{
int ret = OB_SUCCESS;
expected_task_count_ = expected_task_count;
timeout_us_ = timeout_us;
if (OB_FAIL(ranges_.assign(ranges))) {
LOG_WARN("failed to assign ranges array", K(ret));
}
@ -109,7 +111,8 @@ int ObDASSplitRangesOp::init(const common::ObIArray<ObStoreRange> &ranges, int64
OB_SERIALIZE_MEMBER((ObDASSplitRangesOp, ObIDASTaskOp),
ranges_,
expected_task_count_);
expected_task_count_,
timeout_us_);
ObDASSplitRangesResult::ObDASSplitRangesResult()
: ObIDASTaskResult(), result_alloc_(nullptr) {}
@ -196,7 +199,7 @@ OB_DEF_DESERIALIZE(ObDASSplitRangesResult)
}
ObDASRangesCostOp::ObDASRangesCostOp(common::ObIAllocator &op_alloc)
: ObDASSimpleOp(op_alloc), total_size_(0) {}
: ObDASSimpleOp(op_alloc), total_size_(0), timeout_us_(0) {}
int ObDASRangesCostOp::open_op()
{
@ -204,6 +207,7 @@ int ObDASRangesCostOp::open_op()
ObAccessService *access_service = MTL(ObAccessService *);
if (OB_FAIL(access_service->get_multi_ranges_cost(ls_id_,
tablet_id_,
timeout_us_,
ranges_,
total_size_))) {
LOG_WARN("failed to get multi ranges cost", K(ret), K_(ls_id), K_(tablet_id));
@ -240,18 +244,20 @@ int ObDASRangesCostOp::decode_task_result(ObIDASTaskResult *task_result)
return ret;
}
int ObDASRangesCostOp::init(const common::ObIArray<ObStoreRange> &ranges)
int ObDASRangesCostOp::init(const common::ObIArray<ObStoreRange> &ranges, const int64_t timeout_us)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ranges_.assign(ranges))) {
LOG_WARN("failed to assign ranges array", K(ret));
}
timeout_us_ = timeout_us;
return ret;
}
OB_SERIALIZE_MEMBER((ObDASRangesCostOp, ObIDASTaskOp),
ranges_,
total_size_);
total_size_,
timeout_us_);
ObDASRangesCostResult::ObDASRangesCostResult()
: ObIDASTaskResult(), total_size_(0) {}
@ -291,7 +297,13 @@ int ObDASSimpleUtils::split_multi_ranges(ObExecContext &exec_ctx,
} else {
split_ranges_op = static_cast<ObDASSplitRangesOp*>(task_op);
split_ranges_op->set_can_part_retry(GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_1_0);
if (OB_FAIL(split_ranges_op->init(ranges, expected_task_count))) {
ObPhysicalPlanCtx *plan_ctx = nullptr;
if (OB_ISNULL(plan_ctx = exec_ctx.get_physical_plan_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr", K(ret));
} else if (OB_FAIL(split_ranges_op->init(ranges,
expected_task_count,
plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time()))) {
LOG_WARN("failed to init das split ranges op", K(ret));
} else if (OB_FAIL(das_ref.execute_all_task())) {
LOG_WARN("execute das split_multi_ranges task failed", K(ret));
@ -340,7 +352,11 @@ int ObDASSimpleUtils::get_multi_ranges_cost(ObExecContext &exec_ctx,
} else {
ranges_cost_op = static_cast<ObDASRangesCostOp*>(task_op);
ranges_cost_op->set_can_part_retry(GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_1_0);
if (OB_FAIL(ranges_cost_op->init(ranges))) {
ObPhysicalPlanCtx *plan_ctx = nullptr;
if (OB_ISNULL(plan_ctx = exec_ctx.get_physical_plan_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr", K(ret));
} else if (OB_FAIL(ranges_cost_op->init(ranges, plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time()))) {
LOG_WARN("failed to init das ranges cost op", K(ret));
} else if (OB_FAIL(das_ref.execute_all_task())) {
LOG_WARN("execute das get_multi_ranges_cost task failed", K(ret));

View File

@ -63,7 +63,7 @@ public:
virtual int open_op() override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override;
virtual int decode_task_result(ObIDASTaskResult *task_result) override;
int init(const common::ObIArray<ObStoreRange> &ranges, int64_t expected_task_count);
int init(const common::ObIArray<ObStoreRange> &ranges, int64_t expected_task_count, const int64_t timeout_us);
const ObArrayArray<ObStoreRange> &get_split_array() { return multi_range_split_array_; }
INHERIT_TO_STRING_KV("parent", ObDASSimpleOp,
K_(ranges),
@ -73,6 +73,7 @@ private:
common::ObSEArray<ObStoreRange, 16> ranges_;
int64_t expected_task_count_;
ObArrayArray<ObStoreRange> multi_range_split_array_;
int64_t timeout_us_;
};
class ObDASSplitRangesResult : public ObIDASTaskResult
@ -102,7 +103,7 @@ public:
virtual int open_op() override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override;
virtual int decode_task_result(ObIDASTaskResult *task_result) override;
int init(const common::ObIArray<ObStoreRange> &ranges);
int init(const common::ObIArray<ObStoreRange> &ranges, const int64_t timeout_us);
int64_t get_total_size() const { return total_size_; }
INHERIT_TO_STRING_KV("parent", ObDASSimpleOp,
K_(ranges),
@ -110,6 +111,7 @@ public:
private:
common::ObSEArray<ObStoreRange, 16> ranges_;
int64_t total_size_;
int64_t timeout_us_;
};
class ObDASRangesCostResult : public ObIDASTaskResult