diff --git a/deps/oblib/src/lib/container/ob_array_array.h b/deps/oblib/src/lib/container/ob_array_array.h index 1ad6447c3f..fd7e374337 100644 --- a/deps/oblib/src/lib/container/ob_array_array.h +++ b/deps/oblib/src/lib/container/ob_array_array.h @@ -21,6 +21,7 @@ #include "lib/container/ob_se_array.h" #include "lib/utility/ob_template_utils.h" #include "lib/utility/ob_hang_fatal_error.h" +#include "lib/utility/serialization.h" namespace oceanbase { @@ -47,6 +48,9 @@ public: int reserve(const int64_t size); int at(const int64_t array_idx, ObIArray &array); int at(const int64_t array_idx, const int64_t idx, T &obj); + int assign(const ObArrayArray &other); + NEED_SERIALIZE_AND_DESERIALIZE; + OB_INLINE T &at(const int64_t array_idx, const int64_t idx) { if (OB_UNLIKELY(0 > array_idx || array_idx >= count_)) { @@ -71,6 +75,18 @@ public: } return *array_ptrs_[array_idx]; } + OB_INLINE const ObIArray &at(const int64_t array_idx) const + { + if (OB_UNLIKELY(0 > array_idx || array_idx >= count_)) { + LIB_LOG_RET(ERROR, common::OB_INVALID_ARGUMENT, "Unexpected array idx", K_(count), K_(capacity), K(array_idx)); + right_to_die_or_duty_to_live(); + } else if (OB_ISNULL(array_ptrs_) || OB_ISNULL(array_ptrs_[array_idx])) { + LIB_LOG_RET(ERROR, common::OB_ERR_UNEXPECTED, "Unexpected null array array ptr", K_(count), K_(capacity), K(array_idx), + KP_(array_ptrs)); + right_to_die_or_duty_to_live(); + } + return *array_ptrs_[array_idx]; + } int push_back(const ObIArray &obj_array); int push_back(const int64_t array_idx, const T &obj); OB_INLINE int64_t count() const { return count_; } @@ -210,7 +226,7 @@ int ObArrayArray::reserv } if (OB_SUCC(ret)) { void *ptr = nullptr; - for (int64_t i = count_; OB_SUCC(ret) && i < capacity; i++) { + for (int64_t i = capacity_; OB_SUCC(ret) && i < capacity; i++) { if (OB_NOT_NULL(new_array_ptrs[i])) { ret = OB_ERR_UNEXPECTED; LIB_LOG(ERROR, "Unexpecte not null array array ptr", K(i), K_(count), K_(capacity), K(capacity), @@ -334,8 +350,80 @@ int64_t ObArrayArray::to return pos; } +template +int ObArrayArray::assign(const ObArrayArray &other) +{ + int ret = OB_SUCCESS; + if (OB_LIKELY(this != &other)) { + this->reuse(); + int64_t N = other.count(); + ret = reserve(N); + for (int64_t i = 0; OB_SUCC(ret) && i < N; i++) { + if (OB_FAIL(array_ptrs_[i]->assign(other.at(i)))) { + LIB_LOG(WARN, "fail to assign array", K(ret)); + } + } + if (OB_SUCC(ret)) { + count_ = N; + } + } + return ret; +} +template +int ObArrayArray::serialize( + char *buf, const int64_t buf_len, int64_t &pos) const +{ + int ret = OB_SUCCESS; + if (OB_SUCCESS != (ret = serialization::encode_vi64(buf, buf_len, pos, count()))) { + LIB_LOG(WARN, "fail to encode ob array count", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < count(); i++) { + const LocalArrayT &item = *array_ptrs_[i]; + if (OB_SUCCESS != (ret = serialization::encode(buf, buf_len, pos, item))) { + LIB_LOG(WARN, "fail to encode item", K(i), K(ret)); + } + } + return ret; +} + +template +int ObArrayArray::deserialize( + const char *buf, int64_t data_len, int64_t &pos) +{ + int ret = OB_SUCCESS; + int64_t count = 0; + reset(); + if (OB_SUCCESS != (ret = serialization::decode_vi64(buf, data_len, pos, &count))) { + LIB_LOG(WARN, "fail to decode ob array count", K(ret)); + } else if (OB_SUCCESS != (ret = reserve(count))) { + LIB_LOG(WARN, "fail to allocate space", K(ret), K(count)); + } else { + count_ = count; + } + for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) { + LocalArrayT &item = *array_ptrs_[i]; + if (OB_SUCCESS != (ret = serialization::decode(buf, data_len, pos, item))) { + LIB_LOG(WARN, "fail to decode array item", K(ret), K(i), K(count)); + } + } + return ret; +} + +template +int64_t ObArrayArray::get_serialize_size() +const +{ + int64_t size = 0; + size += serialization::encoded_length_vi64(count()); + for (int64_t i = 0; i < count(); i++) { + const LocalArrayT &item = *array_ptrs_[i]; + size += serialization::encoded_length(item); + } + return size; +} + } // common } // oceanbase diff --git a/deps/oblib/unittest/lib/container/test_array_array.cpp b/deps/oblib/unittest/lib/container/test_array_array.cpp index 1ffdd41513..49e72508ab 100644 --- a/deps/oblib/unittest/lib/container/test_array_array.cpp +++ b/deps/oblib/unittest/lib/container/test_array_array.cpp @@ -47,6 +47,7 @@ private: protected: // function members void setup(int64_t N, ObIArray &arr); + void setup(int64_t N, ObIArray &arr, ObArrayArray &arr_arr); void verify(int64_t N, const ObIArray &arr); void verify(const ObIArray &arr1, const ObIArray &arr2); void extend_to(const int64_t N, ObIArray &arr); @@ -80,6 +81,16 @@ void TestArrayArray::setup(int64_t N, ObIArray &arr) } // end for } +void TestArrayArray::setup(int64_t N, ObIArray &arr, ObArrayArray &arr_arr) +{ + arr.reset(); + arr_arr.reset(); + for (int64_t i = 0; i < N; ++i) { + setup(i, arr); + OK(arr_arr.push_back(arr)); + } // end for +} + void TestArrayArray::verify(int64_t N, const ObIArray &arr) { ASSERT_EQ(N, arr.count()); @@ -155,6 +166,36 @@ TEST_F(TestArrayArray, array_push) } +TEST_F(TestArrayArray, assign) +{ + int N = 10; + ObArrayArray src_arr_arr; + ObSEArray arr; + ObArrayArray dst_arr_arr; + for (int64_t i = 1; i < N; i++) { + setup(i, arr, src_arr_arr); + OK(dst_arr_arr.assign(src_arr_arr)); + ASSERT_EQ(dst_arr_arr.count(), src_arr_arr.count()); + for (int64_t j = 0; j < src_arr_arr.count(); j++) { + ASSERT_EQ(dst_arr_arr.count(j), src_arr_arr.count(j)); + verify(dst_arr_arr.at(j), src_arr_arr.at(j)); + } + } + COMMON_LOG(INFO, "print array array", K(dst_arr_arr)); + + N = 20; + for (int64_t i = 1; i < N; i++) { + setup(i, arr, src_arr_arr); + OK(dst_arr_arr.assign(src_arr_arr)); + ASSERT_EQ(dst_arr_arr.count(), src_arr_arr.count()); + for (int64_t j = 0; j < src_arr_arr.count(); j++) { + ASSERT_EQ(dst_arr_arr.count(j), src_arr_arr.count(j)); + verify(dst_arr_arr.at(j), src_arr_arr.at(j)); + } + } + COMMON_LOG(INFO, "print array array", K(dst_arr_arr)); +} + int main(int argc, char **argv) { //system("rm -rf test_array_array.log*"); diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index 95f2a0819f..1d5c9e5bee 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -64,6 +64,7 @@ ob_set_subtarget(ob_sql das das/ob_das_task_result.cpp das/ob_das_spatial_index_lookup_op.cpp das/ob_das_retry_ctrl.cpp + das/ob_das_simple_op.cpp ) ob_set_subtarget(ob_sql dtl diff --git a/src/sql/das/ob_das_def_reg.h b/src/sql/das/ob_das_def_reg.h index 6afc3648bc..bd4ca6c3be 100644 --- a/src/sql/das/ob_das_def_reg.h +++ b/src/sql/das/ob_das_def_reg.h @@ -89,6 +89,16 @@ struct ObDASScanCtDef; struct ObDASScanRtDef; REGISTER_DAS_OP(DAS_OP_TABLE_BATCH_SCAN, ObDASGroupScanOp, ObDASScanResult, ObDASScanCtDef, ObDASScanRtDef); +class ObDASSplitRangesOp; +class ObDASSplitRangesResult; +class ObDASEmptyCtDef; +class ObDASEmptyRtDef; +REGISTER_DAS_OP(DAS_OP_SPLIT_MULTI_RANGES, ObDASSplitRangesOp, ObDASSplitRangesResult, ObDASEmptyCtDef, ObDASEmptyRtDef); + +class ObDASRangesCostOp; +class ObDASRangesCostResult; +REGISTER_DAS_OP(DAS_OP_GET_RANGES_COST, ObDASRangesCostOp, ObDASRangesCostResult, ObDASEmptyCtDef, ObDASEmptyRtDef); + #undef REGISTER_DAS_OP } // namespace sql } // namespace oceanbase diff --git a/src/sql/das/ob_das_define.h b/src/sql/das/ob_das_define.h index cf79c8acb9..360fe36c40 100644 --- a/src/sql/das/ob_das_define.h +++ b/src/sql/das/ob_das_define.h @@ -78,6 +78,8 @@ enum ObDASOpType DAS_OP_TABLE_DELETE, DAS_OP_TABLE_LOCK, DAS_OP_TABLE_BATCH_SCAN, + DAS_OP_SPLIT_MULTI_RANGES, + DAS_OP_GET_RANGES_COST, //append OpType before me DAS_OP_MAX }; diff --git a/src/sql/das/ob_das_factory.cpp b/src/sql/das/ob_das_factory.cpp index e6b539e8a9..7c78fa6968 100644 --- a/src/sql/das/ob_das_factory.cpp +++ b/src/sql/das/ob_das_factory.cpp @@ -18,6 +18,7 @@ #include "sql/das/ob_das_delete_op.h" #include "sql/das/ob_das_update_op.h" #include "sql/das/ob_das_lock_op.h" +#include "sql/das/ob_das_simple_op.h" #include "sql/das/ob_das_extra_data.h" #include "sql/das/ob_das_def_reg.h" #include "sql/das/ob_das_rpc_processor.h" diff --git a/src/sql/das/ob_das_simple_op.cpp b/src/sql/das/ob_das_simple_op.cpp new file mode 100644 index 0000000000..5ec0210fbf --- /dev/null +++ b/src/sql/das/ob_das_simple_op.cpp @@ -0,0 +1,280 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SQL_DAS +#include "sql/das/ob_das_simple_op.h" +#include "sql/das/ob_das_ref.h" +#include "storage/tx_storage/ob_access_service.h" + +namespace oceanbase +{ +namespace sql +{ + +ObDASSimpleOp::ObDASSimpleOp(ObIAllocator &op_alloc) + : ObIDASTaskOp(op_alloc) {} + +int ObDASSimpleOp::release_op() +{ + int ret = OB_SUCCESS; + return ret; +} + +int ObDASSimpleOp::init_task_info(uint32_t row_extend_size) +{ + int ret = OB_SUCCESS; + UNUSED(row_extend_size); + return ret; +} + +int ObDASSimpleOp::swizzling_remote_task(ObDASRemoteInfo *remote_info) +{ + int ret = OB_SUCCESS; + UNUSED(remote_info); + return ret; +} +OB_SERIALIZE_MEMBER((ObDASSimpleOp, ObIDASTaskOp)); + +OB_SERIALIZE_MEMBER(ObDASEmptyCtDef); +OB_SERIALIZE_MEMBER(ObDASEmptyRtDef); + +ObDASSplitRangesOp::ObDASSplitRangesOp(ObIAllocator &op_alloc) + : ObDASSimpleOp(op_alloc), expected_task_count_(0) {} + +int ObDASSplitRangesOp::open_op() +{ + int ret = OB_SUCCESS; + ObAccessService *access_service = MTL(ObAccessService *); + if (OB_FAIL(access_service->split_multi_ranges(ls_id_, + tablet_id_, + ranges_, + expected_task_count_, + op_alloc_, + multi_range_split_array_))) { + LOG_WARN("failed to split multi ranges", K(ret), K_(ls_id), K_(tablet_id)); + } + return ret; +} + +int ObDASSplitRangesOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) +{ + int ret = OB_SUCCESS; + UNUSED(memory_limit); +#if !defined(NDEBUG) + CK(typeid(task_result) == typeid(ObDASSplitRangesResult)); +#endif + if (OB_SUCC(ret)) { + ObDASSplitRangesResult &result = static_cast(task_result); + result.assign(multi_range_split_array_); + has_more = false; + } + return ret; +} + +int ObDASSplitRangesOp::decode_task_result(ObIDASTaskResult *task_result) +{ + int ret = OB_SUCCESS; +#if !defined(NDEBUG) + CK(typeid(*task_result) == typeid(ObDASSplitRangesResult)); + CK(task_id_ == task_result->get_task_id()); +#endif + if (OB_SUCC(ret)) { + ObDASSplitRangesResult *result = static_cast(task_result); + if (OB_FAIL(multi_range_split_array_.assign(result->get_split_array()))) { + LOG_WARN("failed to decode multi_range_split_array", K(ret)); + } + } + return ret; +} + +int ObDASSplitRangesOp::init(const common::ObIArray &ranges, int64_t expected_task_count) +{ + int ret = OB_SUCCESS; + expected_task_count_ = expected_task_count; + if (OB_FAIL(ranges_.assign(ranges))) { + LOG_WARN("failed to assign ranges array", K(ret)); + } + return ret; +} + +OB_SERIALIZE_MEMBER((ObDASSplitRangesOp, ObIDASTaskOp), + ranges_, + expected_task_count_); + +ObDASSplitRangesResult::ObDASSplitRangesResult() + : ObIDASTaskResult() {} + +ObDASSplitRangesResult::~ObDASSplitRangesResult() +{ + multi_range_split_array_.reset(); +} + +int ObDASSplitRangesResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) +{ + int ret = OB_SUCCESS; + UNUSED(op); + UNUSED(alloc); + multi_range_split_array_.reset(); + return ret; +} + +int ObDASSplitRangesResult::reuse() +{ + int ret = OB_SUCCESS; + multi_range_split_array_.reuse(); + return ret; +} + +int ObDASSplitRangesResult::assign(const ObArrayArray &array) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(multi_range_split_array_.assign(array))) { + LOG_WARN("failed to assign multi ranges array", K(ret)); + } + return ret; +} + +OB_SERIALIZE_MEMBER((ObDASSplitRangesResult, ObIDASTaskResult), + multi_range_split_array_); + +ObDASRangesCostOp::ObDASRangesCostOp(common::ObIAllocator &op_alloc) + : ObDASSimpleOp(op_alloc), total_size_(0) {} + +int ObDASRangesCostOp::open_op() +{ + int ret = OB_SUCCESS; + ObAccessService *access_service = MTL(ObAccessService *); + if (OB_FAIL(access_service->get_multi_ranges_cost(ls_id_, + tablet_id_, + ranges_, + total_size_))) { + LOG_WARN("failed to get multi ranges cost", K(ret), K_(ls_id), K_(tablet_id)); + } + return ret; +} + +int ObDASRangesCostOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) +{ + int ret = OB_SUCCESS; + UNUSED(memory_limit); +#if !defined(NDEBUG) + CK(typeid(task_result) == typeid(ObDASSplitRangesResult)); +#endif + if (OB_SUCC(ret)) { + ObDASRangesCostResult &result = static_cast(task_result); + result.set_total_size(total_size_); + has_more = false; + } + return ret; +} + +int ObDASRangesCostOp::decode_task_result(ObIDASTaskResult *task_result) +{ + int ret = OB_SUCCESS; +#if !defined(NDEBUG) + CK(typeid(*task_result) == typeid(ObDASSplitRangesResult)); + CK(task_id_ == task_result->get_task_id()); +#endif + if (OB_SUCC(ret)) { + ObDASRangesCostResult *result = static_cast(task_result); + total_size_ = result->get_total_size(); + } + return ret; +} + +int ObDASRangesCostOp::init(const common::ObIArray &ranges) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ranges_.assign(ranges))) { + LOG_WARN("failed to assign ranges array", K(ret)); + } + return ret; +} + +OB_SERIALIZE_MEMBER((ObDASRangesCostOp, ObIDASTaskOp), + ranges_, + total_size_); + +ObDASRangesCostResult::ObDASRangesCostResult() + : ObIDASTaskResult(), total_size_(0) {} + +int ObDASRangesCostResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) +{ + int ret = OB_SUCCESS; + UNUSED(op); + UNUSED(alloc); + total_size_ = 0; + return ret; +} + +int ObDASRangesCostResult::reuse() +{ + int ret = OB_SUCCESS; + return ret; +} + +OB_SERIALIZE_MEMBER((ObDASRangesCostResult, ObIDASTaskResult), + total_size_); + +int ObDASSimpleUtils::split_multi_ranges(ObExecContext &exec_ctx, + ObDASTabletLoc *tablet_loc, + const common::ObIArray &ranges, + const int64_t expected_task_count, + ObArrayArray &multi_range_split_array) +{ + int ret = OB_SUCCESS; + ObIDASTaskOp *task_op = nullptr; + ObDASSplitRangesOp *split_ranges_op = nullptr; + ObEvalCtx eval_ctx(exec_ctx); + ObDASRef das_ref(eval_ctx, exec_ctx); + if (OB_FAIL(das_ref.create_das_task(tablet_loc, DAS_OP_SPLIT_MULTI_RANGES, task_op))) { + LOG_WARN("prepare das split_multi_ranges task failed", K(ret)); + } else { + split_ranges_op = static_cast(task_op); + if (OB_FAIL(split_ranges_op->init(ranges, expected_task_count))) { + LOG_WARN("failed to init das split ranges op", K(ret)); + } else if (OB_FAIL(das_ref.execute_all_task())) { + LOG_WARN("execute das split_multi_ranges task failed", K(ret)); + } else if (OB_FAIL(multi_range_split_array.assign(split_ranges_op->get_split_array()))) { + LOG_WARN("assgin split multi ranges array failed", K(ret)); + } + } + return ret; +} + +int ObDASSimpleUtils::get_multi_ranges_cost(ObExecContext &exec_ctx, + ObDASTabletLoc *tablet_loc, + const common::ObIArray &ranges, + int64_t &total_size) +{ + int ret = OB_SUCCESS; + ObIDASTaskOp *task_op = nullptr; + ObDASRangesCostOp *ranges_cost_op = nullptr; + ObEvalCtx eval_ctx(exec_ctx); + ObDASRef das_ref(eval_ctx, exec_ctx); + if (OB_FAIL(das_ref.create_das_task(tablet_loc, DAS_OP_GET_RANGES_COST, task_op))) { + LOG_WARN("prepare das get_multi_ranges_cost task failed", K(ret)); + } else { + ranges_cost_op = static_cast(task_op); + if (OB_FAIL(ranges_cost_op->init(ranges))) { + LOG_WARN("failed to init das ranges cost op", K(ret)); + } else if (OB_FAIL(das_ref.execute_all_task())) { + LOG_WARN("execute das get_multi_ranges_cost task failed", K(ret)); + } else { + total_size = ranges_cost_op->get_total_size(); + } + } + return ret; +} + +} // namespace sql +} // namespace oceanbase diff --git a/src/sql/das/ob_das_simple_op.h b/src/sql/das/ob_das_simple_op.h new file mode 100644 index 0000000000..d15db053e4 --- /dev/null +++ b/src/sql/das/ob_das_simple_op.h @@ -0,0 +1,147 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OBDEV_SRC_SQL_DAS_OB_DAS_SIMPLE_OP_H +#define OBDEV_SRC_SQL_DAS_OB_DAS_SIMPLE_OP_H +#include "sql/das/ob_das_task.h" + +namespace oceanbase +{ +namespace common +{ +class ObStoreRange; +} +namespace sql +{ + +class ObDASSimpleOp : public ObIDASTaskOp +{ + OB_UNIS_VERSION(1); +public: + ObDASSimpleOp(common::ObIAllocator &op_alloc); + virtual ~ObDASSimpleOp() = default; + virtual int open_op() = 0; + virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) = 0; + virtual int decode_task_result(ObIDASTaskResult *task_result) = 0; + + virtual int release_op() override; + virtual int init_task_info(uint32_t row_extend_size) override; + virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override; +}; + +struct ObDASEmptyCtDef : ObDASBaseCtDef +{ + OB_UNIS_VERSION(1); +public: + ObDASEmptyCtDef(common::ObIAllocator &alloc) + : ObDASBaseCtDef(DAS_OP_INVALID) {} +}; + +struct ObDASEmptyRtDef : ObDASBaseRtDef +{ + OB_UNIS_VERSION(1); +public: + ObDASEmptyRtDef() + : ObDASBaseRtDef(DAS_OP_INVALID) {} +}; + +class ObDASSplitRangesOp : public ObDASSimpleOp +{ + OB_UNIS_VERSION(1); +public: + ObDASSplitRangesOp(common::ObIAllocator &op_alloc); + virtual ~ObDASSplitRangesOp() = default; + virtual int open_op() override; + virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override; + virtual int decode_task_result(ObIDASTaskResult *task_result) override; + int init(const common::ObIArray &ranges, int64_t expected_task_count); + const ObArrayArray &get_split_array() { return multi_range_split_array_; } + INHERIT_TO_STRING_KV("parent", ObDASSimpleOp, + K_(ranges), + K_(expected_task_count), + K_(multi_range_split_array)); +private: + common::ObSEArray ranges_; + int64_t expected_task_count_; + ObArrayArray multi_range_split_array_; +}; + +class ObDASSplitRangesResult : public ObIDASTaskResult +{ + OB_UNIS_VERSION_V(1); +public: + ObDASSplitRangesResult(); + virtual ~ObDASSplitRangesResult(); + virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override; + virtual int reuse() override; + const ObArrayArray &get_split_array() const { return multi_range_split_array_; } + ObArrayArray &get_split_array() { return multi_range_split_array_; } + int assign(const ObArrayArray &array); + INHERIT_TO_STRING_KV("parent", ObIDASTaskResult, + K_(multi_range_split_array)); +private: + ObArrayArray multi_range_split_array_; +}; + +class ObDASRangesCostOp : public ObDASSimpleOp +{ + OB_UNIS_VERSION(1); +public: + ObDASRangesCostOp(common::ObIAllocator &op_alloc); + virtual ~ObDASRangesCostOp() = default; + virtual int open_op() override; + virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override; + virtual int decode_task_result(ObIDASTaskResult *task_result) override; + int init(const common::ObIArray &ranges); + int64_t get_total_size() const { return total_size_; } + INHERIT_TO_STRING_KV("parent", ObDASSimpleOp, + K_(ranges), + K_(total_size)); +private: + common::ObSEArray ranges_; + int64_t total_size_; +}; + +class ObDASRangesCostResult : public ObIDASTaskResult +{ + OB_UNIS_VERSION_V(1); +public: + ObDASRangesCostResult(); + virtual ~ObDASRangesCostResult() = default; + virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override; + virtual int reuse() override; + int64_t get_total_size() const { return total_size_; } + void set_total_size(int64_t total_size) { total_size_ = total_size; } + INHERIT_TO_STRING_KV("parent", ObIDASTaskResult, + K_(total_size)); +private: + int64_t total_size_; +}; + +class ObDASSimpleUtils +{ +public: + static int split_multi_ranges(ObExecContext &exec_ctx, + ObDASTabletLoc *tablet_loc, + const common::ObIArray &ranges, + const int64_t expected_task_count, + ObArrayArray &multi_range_split_array); + + static int get_multi_ranges_cost(ObExecContext &exec_ctx, + ObDASTabletLoc *tablet_loc, + const common::ObIArray &ranges, + int64_t &total_size); +}; + +} // namespace sql +} // namespace oceanbase +#endif /* OBDEV_SRC_SQL_DAS_OB_DAS_DELETE_OP_H_ */ diff --git a/src/sql/engine/px/ob_granule_pump.cpp b/src/sql/engine/px/ob_granule_pump.cpp index f1380f8a47..8e4d7a43bd 100644 --- a/src/sql/engine/px/ob_granule_pump.cpp +++ b/src/sql/engine/px/ob_granule_pump.cpp @@ -752,7 +752,8 @@ int ObGranuleSplitter::split_gi_task(ObGranulePumpArgs &args, taskset_ranges, taskset_idxs); } else { - ret = ObGranuleUtil::split_block_ranges(args.ctx_->get_allocator(), + ret = ObGranuleUtil::split_block_ranges(*args.ctx_, + args.ctx_->get_allocator(), tsc, ranges, tablets, @@ -1321,7 +1322,8 @@ int ObPartitionWiseGranuleSplitter::split_insert_gi_task(ObGranulePumpArgs &args LOG_WARN("failed to make whole range", K(ret)); } else if (OB_FAIL(ranges.push_back(each_partition_range))) { LOG_WARN("failed to push partition range to ranges", K(ret)); - } else if (OB_FAIL(ObGranuleUtil::split_block_ranges(args.ctx_->get_allocator(), + } else if (OB_FAIL(ObGranuleUtil::split_block_ranges(*args.ctx_, + args.ctx_->get_allocator(), NULL, ranges, tablets, diff --git a/src/sql/engine/px/ob_granule_util.cpp b/src/sql/engine/px/ob_granule_util.cpp index 20a31c892f..670607f7c0 100644 --- a/src/sql/engine/px/ob_granule_util.cpp +++ b/src/sql/engine/px/ob_granule_util.cpp @@ -27,6 +27,7 @@ #include "share/external_table/ob_external_table_file_mgr.h" #include "share/external_table/ob_external_table_utils.h" #include "sql/engine/table/ob_external_table_access_service.h" +#include "sql/das/ob_das_simple_op.h" using namespace oceanbase::common; using namespace oceanbase::share; @@ -132,7 +133,8 @@ int ObGranuleUtil::split_granule_for_external_table(ObIAllocator &allocator, return ret; } -int ObGranuleUtil::split_block_ranges(ObIAllocator &allocator, +int ObGranuleUtil::split_block_ranges(ObExecContext &exec_ctx, + ObIAllocator &allocator, const ObTableScanSpec *tsc,//may be is null, attention use const ObIArray &in_ranges, const ObIArray &tablets, @@ -184,7 +186,8 @@ int ObGranuleUtil::split_block_ranges(ObIAllocator &allocator, } } LOG_TRACE("gi partition granule"); - } else if (OB_FAIL(split_block_granule(allocator, + } else if (OB_FAIL(split_block_granule(exec_ctx, + allocator, tsc, ranges, tablets, @@ -227,7 +230,8 @@ int ObGranuleUtil::remove_empty_range(const common::ObIArray return ret; } -int ObGranuleUtil::split_block_granule(ObIAllocator &allocator, +int ObGranuleUtil::split_block_granule(ObExecContext &exec_ctx, + ObIAllocator &allocator, const ObTableScanSpec *tsc,//may be is null, attention use! const ObIArray &input_ranges, const ObIArray &tablets, @@ -273,10 +277,9 @@ int ObGranuleUtil::split_block_granule(ObIAllocator &allocator, input_store_ranges, need_convert_new_range))) { LOG_WARN("failed to convert new range to store range", K(ret)); - } else if (OB_FAIL(access_service->get_multi_ranges_cost(tablet.ls_id_, - tablet.tablet_id_, - input_store_ranges, - partition_size))) { + } else if (OB_FAIL(ObDASSimpleUtils::get_multi_ranges_cost(exec_ctx, tablets.at(i), + input_store_ranges, + partition_size))) { LOG_WARN("failed to get multi ranges cost", K(ret), K(tablet)); } else { // B to MB @@ -339,7 +342,8 @@ int ObGranuleUtil::split_block_granule(ObIAllocator &allocator, input_store_ranges, need_convert_new_range))) { LOG_WARN("failed to convert new range to store range", K(ret)); - } else if (OB_FAIL(get_tasks_for_partition(allocator, + } else if (OB_FAIL(get_tasks_for_partition(exec_ctx, + allocator, expected_task_cnt, *tablet, input_store_ranges, @@ -470,7 +474,8 @@ int ObGranuleUtil::compute_task_count_each_partition(int64_t total_size, return ret; } -int ObGranuleUtil::get_tasks_for_partition(ObIAllocator &allocator, +int ObGranuleUtil::get_tasks_for_partition(ObExecContext &exec_ctx, + ObIAllocator &allocator, int64_t expected_task_cnt, ObDASTabletLoc &tablet, ObIArray &input_storage_ranges, @@ -504,12 +509,11 @@ int ObGranuleUtil::get_tasks_for_partition(ObIAllocator &allocator, if (!range_independent) { tablet_idx++; } - } else if (OB_FAIL(access_service->split_multi_ranges(tablet.ls_id_, - tablet.tablet_id_, - input_storage_ranges, - expected_task_cnt, - allocator, - multi_range_split_array))) { + } else if (OB_FAIL(ObDASSimpleUtils::split_multi_ranges(exec_ctx, + &tablet, + input_storage_ranges, + expected_task_cnt, + multi_range_split_array))) { LOG_WARN("failed to split multi ranges", K(ret), K(tablet), K(expected_task_cnt)); } else { LOG_TRACE("split multi ranges", diff --git a/src/sql/engine/px/ob_granule_util.h b/src/sql/engine/px/ob_granule_util.h index 469f8448d8..0b3938428d 100644 --- a/src/sql/engine/px/ob_granule_util.h +++ b/src/sql/engine/px/ob_granule_util.h @@ -171,7 +171,8 @@ public: * range_independent IN the random type witch affects the granule_idx * */ - static int split_block_ranges(common::ObIAllocator &allocator, + static int split_block_ranges(ObExecContext &exec_ctx, + common::ObIAllocator &allocator, const ObTableScanSpec *tsc, const common::ObIArray &ranges, const common::ObIArray &tablets, @@ -259,7 +260,8 @@ public: * range_independent IN the random type witch affects the granule_idx * */ - static int split_block_granule(common::ObIAllocator &allocator, + static int split_block_granule(ObExecContext &exec_ctx, + common::ObIAllocator &allocator, const ObTableScanSpec *tsc, const common::ObIArray &input_ranges, const common::ObIArray &tablet_array, @@ -319,7 +321,8 @@ private: * pkey_idx OUT the idx in granule ranges * range_independent IN the random type witch affects the granule_idx */ - static int get_tasks_for_partition(common::ObIAllocator &allocator, + static int get_tasks_for_partition(ObExecContext &exec_ctx, + common::ObIAllocator &allocator, int64_t expected_task_cnt, ObDASTabletLoc &tablet, common::ObIArray &input_storage_ranges,