[CP] add timeout param for estimation related interfaces
This commit is contained in:
		@ -49,7 +49,7 @@ OB_SERIALIZE_MEMBER(ObDASEmptyCtDef);
 | 
			
		||||
OB_SERIALIZE_MEMBER(ObDASEmptyRtDef);
 | 
			
		||||
 | 
			
		||||
ObDASSplitRangesOp::ObDASSplitRangesOp(ObIAllocator &op_alloc)
 | 
			
		||||
  : ObDASSimpleOp(op_alloc), expected_task_count_(0) {}
 | 
			
		||||
  : ObDASSimpleOp(op_alloc), expected_task_count_(0), timeout_us_(0) {}
 | 
			
		||||
 | 
			
		||||
int ObDASSplitRangesOp::open_op()
 | 
			
		||||
{
 | 
			
		||||
@ -57,6 +57,7 @@ int ObDASSplitRangesOp::open_op()
 | 
			
		||||
  ObAccessService *access_service = MTL(ObAccessService *);
 | 
			
		||||
  if (OB_FAIL(access_service->split_multi_ranges(ls_id_,
 | 
			
		||||
                                                 tablet_id_,
 | 
			
		||||
                                                 timeout_us_,
 | 
			
		||||
                                                 ranges_,
 | 
			
		||||
                                                 expected_task_count_,
 | 
			
		||||
                                                 op_alloc_,
 | 
			
		||||
@ -97,10 +98,11 @@ int ObDASSplitRangesOp::decode_task_result(ObIDASTaskResult *task_result)
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDASSplitRangesOp::init(const common::ObIArray<ObStoreRange> &ranges, int64_t expected_task_count)
 | 
			
		||||
int ObDASSplitRangesOp::init(const common::ObIArray<ObStoreRange> &ranges, int64_t expected_task_count, const int64_t timeout_us)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  expected_task_count_ = expected_task_count;
 | 
			
		||||
  timeout_us_ = timeout_us;
 | 
			
		||||
  if (OB_FAIL(ranges_.assign(ranges))) {
 | 
			
		||||
    LOG_WARN("failed to assign ranges array", K(ret));
 | 
			
		||||
  }
 | 
			
		||||
@ -109,7 +111,8 @@ int ObDASSplitRangesOp::init(const common::ObIArray<ObStoreRange> &ranges, int64
 | 
			
		||||
 | 
			
		||||
OB_SERIALIZE_MEMBER((ObDASSplitRangesOp, ObIDASTaskOp),
 | 
			
		||||
                     ranges_,
 | 
			
		||||
                     expected_task_count_);
 | 
			
		||||
                     expected_task_count_,
 | 
			
		||||
                     timeout_us_);
 | 
			
		||||
 | 
			
		||||
ObDASSplitRangesResult::ObDASSplitRangesResult()
 | 
			
		||||
  : ObIDASTaskResult(), result_alloc_(nullptr) {}
 | 
			
		||||
@ -196,7 +199,7 @@ OB_DEF_DESERIALIZE(ObDASSplitRangesResult)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
ObDASRangesCostOp::ObDASRangesCostOp(common::ObIAllocator &op_alloc)
 | 
			
		||||
  : ObDASSimpleOp(op_alloc), total_size_(0) {}
 | 
			
		||||
  : ObDASSimpleOp(op_alloc), total_size_(0), timeout_us_(0) {}
 | 
			
		||||
 | 
			
		||||
int ObDASRangesCostOp::open_op()
 | 
			
		||||
{
 | 
			
		||||
@ -204,6 +207,7 @@ int ObDASRangesCostOp::open_op()
 | 
			
		||||
  ObAccessService *access_service = MTL(ObAccessService *);
 | 
			
		||||
  if (OB_FAIL(access_service->get_multi_ranges_cost(ls_id_,
 | 
			
		||||
                                                    tablet_id_,
 | 
			
		||||
                                                    timeout_us_,
 | 
			
		||||
                                                    ranges_,
 | 
			
		||||
                                                    total_size_))) {
 | 
			
		||||
    LOG_WARN("failed to get multi ranges cost", K(ret), K_(ls_id), K_(tablet_id));
 | 
			
		||||
@ -240,18 +244,20 @@ int ObDASRangesCostOp::decode_task_result(ObIDASTaskResult *task_result)
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDASRangesCostOp::init(const common::ObIArray<ObStoreRange> &ranges)
 | 
			
		||||
int ObDASRangesCostOp::init(const common::ObIArray<ObStoreRange> &ranges, const int64_t timeout_us)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_FAIL(ranges_.assign(ranges))) {
 | 
			
		||||
    LOG_WARN("failed to assign ranges array", K(ret));
 | 
			
		||||
  }
 | 
			
		||||
  timeout_us_ = timeout_us;
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
OB_SERIALIZE_MEMBER((ObDASRangesCostOp, ObIDASTaskOp),
 | 
			
		||||
                     ranges_,
 | 
			
		||||
                     total_size_);
 | 
			
		||||
                     total_size_,
 | 
			
		||||
                     timeout_us_);
 | 
			
		||||
 | 
			
		||||
ObDASRangesCostResult::ObDASRangesCostResult()
 | 
			
		||||
  : ObIDASTaskResult(), total_size_(0) {}
 | 
			
		||||
@ -291,7 +297,13 @@ int ObDASSimpleUtils::split_multi_ranges(ObExecContext &exec_ctx,
 | 
			
		||||
  } else {
 | 
			
		||||
    split_ranges_op = static_cast<ObDASSplitRangesOp*>(task_op);
 | 
			
		||||
    split_ranges_op->set_can_part_retry(GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_1_0);
 | 
			
		||||
    if (OB_FAIL(split_ranges_op->init(ranges, expected_task_count))) {
 | 
			
		||||
    ObPhysicalPlanCtx *plan_ctx = nullptr;
 | 
			
		||||
    if (OB_ISNULL(plan_ctx = exec_ctx.get_physical_plan_ctx())) {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
      LOG_WARN("unexpected nullptr", K(ret));
 | 
			
		||||
    } else if (OB_FAIL(split_ranges_op->init(ranges,
 | 
			
		||||
                                             expected_task_count,
 | 
			
		||||
                                             plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time()))) {
 | 
			
		||||
      LOG_WARN("failed to init das split ranges op", K(ret));
 | 
			
		||||
    } else if (OB_FAIL(das_ref.execute_all_task())) {
 | 
			
		||||
      LOG_WARN("execute das split_multi_ranges task failed", K(ret));
 | 
			
		||||
@ -340,7 +352,11 @@ int ObDASSimpleUtils::get_multi_ranges_cost(ObExecContext &exec_ctx,
 | 
			
		||||
  } else {
 | 
			
		||||
    ranges_cost_op = static_cast<ObDASRangesCostOp*>(task_op);
 | 
			
		||||
    ranges_cost_op->set_can_part_retry(GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_1_0);
 | 
			
		||||
    if (OB_FAIL(ranges_cost_op->init(ranges))) {
 | 
			
		||||
    ObPhysicalPlanCtx *plan_ctx = nullptr;
 | 
			
		||||
    if (OB_ISNULL(plan_ctx = exec_ctx.get_physical_plan_ctx())) {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
      LOG_WARN("unexpected nullptr", K(ret));
 | 
			
		||||
    } else if (OB_FAIL(ranges_cost_op->init(ranges, plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time()))) {
 | 
			
		||||
      LOG_WARN("failed to init das ranges cost op", K(ret));
 | 
			
		||||
    } else if (OB_FAIL(das_ref.execute_all_task())) {
 | 
			
		||||
      LOG_WARN("execute das get_multi_ranges_cost task failed", K(ret));
 | 
			
		||||
 | 
			
		||||
@ -63,7 +63,7 @@ public:
 | 
			
		||||
  virtual int open_op() override;
 | 
			
		||||
  virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override;
 | 
			
		||||
  virtual int decode_task_result(ObIDASTaskResult *task_result) override;
 | 
			
		||||
  int init(const common::ObIArray<ObStoreRange> &ranges, int64_t expected_task_count);
 | 
			
		||||
  int init(const common::ObIArray<ObStoreRange> &ranges, int64_t expected_task_count, const int64_t timeout_us);
 | 
			
		||||
  const ObArrayArray<ObStoreRange> &get_split_array() { return multi_range_split_array_; }
 | 
			
		||||
  INHERIT_TO_STRING_KV("parent", ObDASSimpleOp,
 | 
			
		||||
                        K_(ranges),
 | 
			
		||||
@ -73,6 +73,7 @@ private:
 | 
			
		||||
  common::ObSEArray<ObStoreRange, 16> ranges_;
 | 
			
		||||
  int64_t expected_task_count_;
 | 
			
		||||
  ObArrayArray<ObStoreRange> multi_range_split_array_;
 | 
			
		||||
  int64_t timeout_us_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class ObDASSplitRangesResult : public ObIDASTaskResult
 | 
			
		||||
@ -102,7 +103,7 @@ public:
 | 
			
		||||
  virtual int open_op() override;
 | 
			
		||||
  virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override;
 | 
			
		||||
  virtual int decode_task_result(ObIDASTaskResult *task_result) override;
 | 
			
		||||
  int init(const common::ObIArray<ObStoreRange> &ranges);
 | 
			
		||||
  int init(const common::ObIArray<ObStoreRange> &ranges, const int64_t timeout_us);
 | 
			
		||||
  int64_t get_total_size() const { return total_size_; }
 | 
			
		||||
  INHERIT_TO_STRING_KV("parent", ObDASSimpleOp,
 | 
			
		||||
                        K_(ranges),
 | 
			
		||||
@ -110,6 +111,7 @@ public:
 | 
			
		||||
private:
 | 
			
		||||
  common::ObSEArray<ObStoreRange, 16> ranges_;
 | 
			
		||||
  int64_t total_size_;
 | 
			
		||||
  int64_t timeout_us_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class ObDASRangesCostResult : public ObIDASTaskResult
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user