From 97b3bfab8b83115a08de591ba760b327271c4cdb Mon Sep 17 00:00:00 2001 From: raywill Date: Mon, 21 Mar 2022 14:34:15 +0800 Subject: [PATCH] result not in order when range partition truncated or split --- src/sql/engine/px/ob_granule_iterator.cpp | 26 ------ src/sql/engine/px/ob_granule_iterator.h | 8 +- src/sql/engine/px/ob_granule_iterator_op.cpp | 28 +----- src/sql/engine/px/ob_granule_iterator_op.h | 8 +- src/sql/engine/px/ob_px_util.cpp | 97 ++++++++++++++++++-- src/sql/engine/px/ob_px_util.h | 10 +- src/sql/executor/ob_task_executor_ctx.cpp | 3 +- 7 files changed, 106 insertions(+), 74 deletions(-) diff --git a/src/sql/engine/px/ob_granule_iterator.cpp b/src/sql/engine/px/ob_granule_iterator.cpp index a9b71490e..c4c01dbbf 100644 --- a/src/sql/engine/px/ob_granule_iterator.cpp +++ b/src/sql/engine/px/ob_granule_iterator.cpp @@ -42,30 +42,6 @@ ObPhyOperatorType ObGIInput::get_phy_op_type() const return PHY_GRANULE_ITERATOR; } -int ObGIInput::assign_ranges(const common::ObIArray& ranges) -{ - int ret = OB_SUCCESS; - FOREACH_CNT_X(it, ranges, OB_SUCC(ret)) - { - if (OB_FAIL(ranges_.push_back(*it))) { - LOG_WARN("failed to push range", K(ret)); - } - } - return ret; -} - -int ObGIInput::assign_pkeys(const common::ObIArray& pkeys) -{ - int ret = OB_SUCCESS; - FOREACH_CNT_X(it, pkeys, OB_SUCC(ret)) - { - if (OB_FAIL(pkeys_.push_back(*it))) { - LOG_WARN("failed to push range", K(ret)); - } - } - return ret; -} - int ObGIInput::deep_copy_range(ObIAllocator* allocator, const ObNewRange& src, ObNewRange& dst) { int ret = OB_SUCCESS; @@ -324,8 +300,6 @@ int ObGranuleIterator::rescan(ObExecContext& ctx) const gi_ctx->state_ = GI_GET_NEXT_GRANULE_TASK; } } else { - // 在partition_wise_join的情况, 按woker第一次完整执行所抢占的任务执行. - // 在执行过程中缓存住了自己的任务队列. 
if (GI_UNINITIALIZED == gi_ctx->state_ || GI_PREPARED == gi_ctx->state_) { /*do nothing*/ } else { diff --git a/src/sql/engine/px/ob_granule_iterator.h b/src/sql/engine/px/ob_granule_iterator.h index 892b6c1c3..2e8c9716d 100644 --- a/src/sql/engine/px/ob_granule_iterator.h +++ b/src/sql/engine/px/ob_granule_iterator.h @@ -53,8 +53,6 @@ public: { return parallelism_; } - int assign_ranges(const common::ObIArray& ranges); - int assign_pkeys(const common::ObIArray& pkeys); void set_granule_pump(ObGranulePump* pump) { pump_ = pump; @@ -137,9 +135,9 @@ public: int64_t tablet_size_; int64_t worker_id_; uint64_t tsc_op_id_; - common::ObSEArray ranges_; - common::ObSEArray pkeys_; - ObGranulePump* pump_; + common::ObSEArray ranges_; + common::ObSEArray pkeys_; + ObGranulePump *pump_; ObGranuleIteratorState state_; bool all_task_fetched_; diff --git a/src/sql/engine/px/ob_granule_iterator_op.cpp b/src/sql/engine/px/ob_granule_iterator_op.cpp index d73d6e0d3..78b1f8ea5 100644 --- a/src/sql/engine/px/ob_granule_iterator_op.cpp +++ b/src/sql/engine/px/ob_granule_iterator_op.cpp @@ -48,31 +48,7 @@ void ObGIOpInput::set_deserialize_allocator(common::ObIAllocator* allocator) deserialize_allocator_ = allocator; } -int ObGIOpInput::assign_ranges(const common::ObIArray& ranges) -{ - int ret = OB_SUCCESS; - FOREACH_CNT_X(it, ranges, OB_SUCC(ret)) - { - if (OB_FAIL(ranges_.push_back(*it))) { - LOG_WARN("failed to push range", K(ret)); - } - } - return ret; -} - -int ObGIOpInput::assign_pkeys(const common::ObIArray& pkeys) -{ - int ret = OB_SUCCESS; - FOREACH_CNT_X(it, pkeys, OB_SUCC(ret)) - { - if (OB_FAIL(pkeys_.push_back(*it))) { - LOG_WARN("failed to push range", K(ret)); - } - } - return ret; -} - -int ObGIOpInput::deep_copy_range(ObIAllocator* allocator, const ObNewRange& src, ObNewRange& dst) +int ObGIOpInput::deep_copy_range(ObIAllocator *allocator, const ObNewRange &src, ObNewRange &dst) { int ret = OB_SUCCESS; if (OB_ISNULL(allocator)) { @@ -362,8 +338,6 @@ int 
ObGranuleIteratorOp::rescan() state_ = GI_GET_NEXT_GRANULE_TASK; } } else { - // 在partition_wise_join的情况, 按woker第一次完整执行所抢占的任务执行. - // 在执行过程中缓存住了自己的任务队列. if (GI_UNINITIALIZED == state_ || GI_PREPARED == state_) { /*do nothing*/ } else { diff --git a/src/sql/engine/px/ob_granule_iterator_op.h b/src/sql/engine/px/ob_granule_iterator_op.h index 4fdb79766..f78dbbdca 100644 --- a/src/sql/engine/px/ob_granule_iterator_op.h +++ b/src/sql/engine/px/ob_granule_iterator_op.h @@ -59,7 +59,6 @@ public: { return worker_id_; } - private: int deep_copy_range(ObIAllocator* allocator, const ObNewRange& src, ObNewRange& dst); @@ -71,9 +70,8 @@ public: int64_t worker_id_; // Need serialize - common::ObSEArray ranges_; - // use partition key/partition idx to tag partition - common::ObSEArray pkeys_; + common::ObSEArray ranges_; + common::ObSEArray pkeys_; ObGranulePump* pump_; private: @@ -232,8 +230,6 @@ private: const ObGITaskSet* rescan_taskset_ = NULL; common::ObSEArray rescan_tasks_; int64_t rescan_task_idx_; - // full pwj场景下, 在执行过程中缓存住了自己的任务队列. 
- // 供GI rescan使用 common::ObSEArray pwj_rescan_task_infos_; }; diff --git a/src/sql/engine/px/ob_px_util.cpp b/src/sql/engine/px/ob_px_util.cpp index 8d05c60cb..e6fae54a7 100644 --- a/src/sql/engine/px/ob_px_util.cpp +++ b/src/sql/engine/px/ob_px_util.cpp @@ -583,7 +583,8 @@ int ObPXServerAddrUtil::set_sqcs_accessed_location(ObExecContext& ctx, ObDfo& df } else if (OB_FAIL(get_access_partition_order(dfo, phy_op, asc_order))) { LOG_WARN("fail to get table scan partition order", K(ret)); } else if (OB_FAIL(ObPXServerAddrUtil::reorder_all_partitions( - table_location_key, locations, temp_locations, asc_order, ctx, base_order))) { + table_location_key, table_loc->get_ref_table_id(), + locations, temp_locations, asc_order, ctx, base_order))) { LOG_WARN("fail to reorder all partitions", K(ret)); } else { LOG_TRACE("sqc partition order is", K(asc_order)); @@ -619,24 +620,106 @@ int ObPXServerAddrUtil::set_sqcs_accessed_location(ObExecContext& ctx, ObDfo& df return ret; } -int ObPXServerAddrUtil::reorder_all_partitions(int64_t table_location_key, + +// used to fast lookup from phy partition id to partition order(index) +// for a range partition, the greater the range, the greater the partition_index +// for a hash partition, the index means nothing +int ObPXServerAddrUtil::build_partition_index_lookup_map(ObTaskExecutorCtx &task_exec_ctx, + uint64_t ref_table_id, + ObPartitionIndexMap &idx_map) +{ + int ret = OB_SUCCESS; + schema::ObSchemaGetterGuard schema_guard; + const schema::ObSimpleTableSchemaV2 *table_schema = NULL; + if (OB_ISNULL(task_exec_ctx.schema_service_)) { + } else if (OB_FAIL(task_exec_ctx.schema_service_->get_schema_guard(schema_guard))) { + LOG_WARN("fail get schema guard", K(ret)); + } else if (OB_FAIL(schema_guard.get_table_schema(ref_table_id, table_schema))) { + LOG_WARN("fail get table schema", K(ref_table_id), K(ret)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_SCHEMA_ERROR; + LOG_WARN("fail get schema", K(ref_table_id), K(ret)); + } else 
if (OB_FAIL(idx_map.create(table_schema->get_all_part_num(), "PartOrderIdx"))) { + LOG_WARN("fail create index map", K(ret), "cnt", table_schema->get_all_part_num()); + } else { + schema::ObTablePartItemIterator iter(*table_schema); + schema::ObPartitionItem item; + do { + if (OB_FAIL(iter.next(item))) { + if (OB_ITER_END != ret) { + LOG_WARN("fail get next partition item from iterator", K(ref_table_id), K(ret)); + } else { + ret = OB_SUCCESS; + break; + } + } else if (OB_FAIL(idx_map.set_refactored(item.partition_id_, item.partition_idx_))) { + LOG_WARN("fail set value to hashmap", K(item), K(ret)); + } + } while (OB_SUCC(ret)); + } + return ret; +} + + +class ObPXPartitionOrderIndexCmp +{ +public: + ObPXPartitionOrderIndexCmp(bool asc, ObPartitionIndexMap *map) + : asc_(asc), map_(map) + {} + bool operator() (const ObPartitionReplicaLocation &left, const ObPartitionReplicaLocation &right) + { + int ret = OB_SUCCESS; + bool bret = false; + int64_t lv, rv; + if (OB_FAIL(map_->get_refactored(left.get_partition_id(), lv))) { + LOG_WARN("fail get partition index", K(asc_), K(left), K(right), K(ret)); + throw OB_EXCEPTION(); + } else if (OB_FAIL(map_->get_refactored(right.get_partition_id(), rv))) { + LOG_WARN("fail get partition index", K(asc_), K(left), K(right), K(ret)); + throw OB_EXCEPTION(); + } else { + bret = asc_ ? 
(lv < rv) : (lv > rv); + } + return bret; + } +private: + bool asc_; + ObPartitionIndexMap *map_; +}; + +int ObPXServerAddrUtil::reorder_all_partitions(int64_t table_location_key, int64_t ref_table_id, const ObPartitionReplicaLocationIArray& src_locations, ObPartitionReplicaLocationIArray& dst_locations, bool asc, ObExecContext& exec_ctx, ObIArray& base_order) { int ret = OB_SUCCESS; dst_locations.reset(); if (src_locations.count() > 1) { + ObPartitionIndexMap part_order_map; + if (OB_FAIL(build_partition_index_lookup_map(exec_ctx.get_task_exec_ctx(), + ref_table_id, part_order_map))) { + LOG_WARN("fail build index lookup map", K(ret)); + } for (int i = 0; i < src_locations.count() && OB_SUCC(ret); ++i) { if (OB_FAIL(dst_locations.push_back(src_locations.at(i)))) { LOG_WARN("fail to push dst locations", K(ret)); } } if (OB_SUCC(ret)) { - std::sort(&dst_locations.at(0), - &dst_locations.at(0) + dst_locations.count(), - asc ? ObPartitionReplicaLocation::compare_part_loc_asc : ObPartitionReplicaLocation::compare_part_loc_desc); - PWJPartitionIdMap* pwj_map = NULL; - if (OB_NOT_NULL(pwj_map = exec_ctx.get_pwj_map())) { + try { + std::sort(&dst_locations.at(0), + &dst_locations.at(0) + dst_locations.count(), + ObPXPartitionOrderIndexCmp(asc, &part_order_map)); + } catch (OB_BASE_EXCEPTION &except) { + if (OB_HASH_NOT_EXIST == (ret = except.get_errno())) { + // schema changed during execution, notify to retry + ret = OB_SCHEMA_ERROR; + } + } + PWJPartitionIdMap *pwj_map = NULL; + if (OB_FAIL(ret)) { + LOG_WARN("fail to sort locations", K(ret)); + } else if (OB_NOT_NULL(pwj_map = exec_ctx.get_pwj_map())) { PartitionIdArray partition_id_array; if (OB_FAIL(pwj_map->get_refactored(table_location_key, partition_id_array))) { if (OB_HASH_NOT_EXIST == ret) { diff --git a/src/sql/engine/px/ob_px_util.h b/src/sql/engine/px/ob_px_util.h index e1862afa5..aa17e6487 100644 --- a/src/sql/engine/px/ob_px_util.h +++ b/src/sql/engine/px/ob_px_util.h @@ -96,6 +96,8 @@ public: #define 
ENG_OP typename ObEngineOpTraits +typedef common::hash::ObHashMap ObPartitionIndexMap; + class ObPXServerAddrUtil { class ObPxSqcTaskCountMeta { public: @@ -132,8 +134,12 @@ private: template static int find_dml_ops_inner(common::ObIArray& insert_ops, const ENG_OP::Root& op); static int check_partition_wise_location_valid(ObPartitionReplicaLocationIArray& tsc_locations); - - static int reorder_all_partitions(int64_t location_key, const ObPartitionReplicaLocationIArray& src_locations, + static int build_partition_index_lookup_map( + ObTaskExecutorCtx &task_exec_ctx, + uint64_t ref_table_id, + ObPartitionIndexMap &idx_map); + static int reorder_all_partitions(int64_t location_key, int64_t ref_table_id, + const ObPartitionReplicaLocationIArray& src_locations, ObPartitionReplicaLocationIArray& tsc_locations, bool asc, ObExecContext& exec_ctx, ObIArray& base_order); diff --git a/src/sql/executor/ob_task_executor_ctx.cpp b/src/sql/executor/ob_task_executor_ctx.cpp index dec171dad..1aeb5b00e 100644 --- a/src/sql/executor/ob_task_executor_ctx.cpp +++ b/src/sql/executor/ob_task_executor_ctx.cpp @@ -335,7 +335,6 @@ int ObTaskExecutorCtxUtil::get_part_runner_server( return ret; } -// 每次调用都会 allocate 一个 table_location int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext &ctx, uint64_t table_location_key, uint64_t ref_table_id, bool is_weak, ObPhyTableLocationGuard &table_location) { @@ -385,6 +384,8 @@ int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext &ctx, LOG_WARN("fail alloc new location", K(ret)); } else if (OB_FAIL(table_location.get_loc()->add_partition_locations(phy_location_info))) { LOG_WARN("add partition locations failed", K(ret), K(phy_location_info)); + } else { + table_location.get_loc()->set_table_location_key(table_location_key, ref_table_id); } return ret; }