diff --git a/src/sql/engine/ob_exec_context.cpp b/src/sql/engine/ob_exec_context.cpp index 7f5099122..54aa7030d 100644 --- a/src/sql/engine/ob_exec_context.cpp +++ b/src/sql/engine/ob_exec_context.cpp @@ -108,7 +108,7 @@ ObExecContext::ObExecContext(ObIAllocator &allocator) frame_cnt_(0), op_kit_store_(), convert_allocator_(nullptr), - pwj_map_(nullptr), + group_pwj_map_(nullptr), calc_type_(CALC_NORMAL), fixed_id_(OB_INVALID_ID), check_status_times_(0), @@ -169,9 +169,9 @@ ObExecContext::~ObExecContext() package_guard_->~ObPLPackageGuard(); package_guard_ = NULL; } - if (OB_NOT_NULL(pwj_map_)) { - pwj_map_->destroy(); - pwj_map_ = NULL; + if (OB_NOT_NULL(group_pwj_map_)) { + group_pwj_map_->destroy(); + group_pwj_map_ = nullptr; } if (OB_NOT_NULL(vt_ift_)) { vt_ift_->~ObIVirtualTableIteratorFactory(); @@ -840,24 +840,50 @@ int ObExecContext::add_row_id_list(const common::ObIArray *row_id_list) return ret; } -int ObExecContext::get_pwj_map(PWJTabletIdMap *&pwj_map) +int ObExecContext::get_group_pwj_map(GroupPWJTabletIdMap *&group_pwj_map) { int ret = OB_SUCCESS; - pwj_map = nullptr; - if (nullptr == pwj_map_) { - void *buf = allocator_.alloc(sizeof(PWJTabletIdMap)); + group_pwj_map = nullptr; + if (nullptr == group_pwj_map_) { + void *buf = allocator_.alloc(sizeof(GroupPWJTabletIdMap)); if (nullptr == buf) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Failed to allocate memories", K(ret)); - } else if (FALSE_IT(pwj_map_ = new(buf) PWJTabletIdMap())) { - } else if (OB_FAIL(pwj_map_->create(PARTITION_WISE_JOIN_TSC_HASH_BUCKET_NUM, /* assume no more than 8 table scan in a plan */ - ObModIds::OB_SQL_PX))) { - LOG_WARN("Failed to create gi task map", K(ret)); } else { - pwj_map = pwj_map_; + group_pwj_map_ = new (buf) GroupPWJTabletIdMap(); + /* assume no more than 8table scan in a plan */ + if (OB_FAIL(group_pwj_map_->create(PARTITION_WISE_JOIN_TSC_HASH_BUCKET_NUM, ObModIds::OB_SQL_PX))) { + LOG_WARN("Failed to create group_pwj_map_", K(ret)); + } else { + group_pwj_map 
= group_pwj_map_; + } } } else { - pwj_map = pwj_map_; + group_pwj_map = group_pwj_map_; + } + return ret; +} + +int ObExecContext::deep_copy_group_pwj_map(const GroupPWJTabletIdMap *src) +{ + int ret = OB_SUCCESS; + GroupPWJTabletIdMap *des = nullptr; + if (OB_ISNULL(src)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null"); + } else if (OB_FAIL(get_group_pwj_map(des))) { + LOG_WARN("failed to get_group_pwj_map"); + } else if (des->size() > 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("size should be 0", K(des->size()), K(src->size())); + } else { + FOREACH_X(iter, *src, OB_SUCC(ret)) { + const uint64_t table_id = iter->first; + const GroupPWJTabletIdInfo &group_pwj_tablet_id_info = iter->second; + if (OB_FAIL(des->set_refactored(table_id, group_pwj_tablet_id_info))) { + LOG_WARN("failed to set refactored", K(table_id)); + } + } } return ret; } @@ -1040,6 +1066,78 @@ DEFINE_GET_SERIALIZE_SIZE(ObExecContext) return len; } +int64_t ObExecContext::get_group_pwj_map_serialize_size() const +{ + int64_t len = 0; + // add serialize size for group_pwj_map_ + int64_t pwj_map_element_count = 0; + if (group_pwj_map_ != nullptr) { + pwj_map_element_count = group_pwj_map_->size(); + OB_UNIS_ADD_LEN(pwj_map_element_count); + FOREACH(iter, *group_pwj_map_) { + const uint64_t table_id = iter->first; + const GroupPWJTabletIdInfo &group_pwj_tablet_id_info = iter->second; + OB_UNIS_ADD_LEN(table_id); + OB_UNIS_ADD_LEN(group_pwj_tablet_id_info); + } + } else { + OB_UNIS_ADD_LEN(pwj_map_element_count); + } + return len; +} + +int ObExecContext::serialize_group_pwj_map(char *buf, const int64_t buf_len, int64_t &pos) const +{ + int ret = OB_SUCCESS; + // serialize group_pwj_map_ + int64_t pwj_map_element_count = 0; + if (OB_SUCC(ret)) { + if (group_pwj_map_ != nullptr) { + pwj_map_element_count = group_pwj_map_->size(); + OB_UNIS_ENCODE(pwj_map_element_count); + FOREACH_X(iter, *group_pwj_map_, OB_SUCC(ret)) { + const uint64_t table_id = iter->first; + const GroupPWJTabletIdInfo 
&group_pwj_tablet_id_info = iter->second; + OB_UNIS_ENCODE(table_id); + OB_UNIS_ENCODE(group_pwj_tablet_id_info); + } + } else { + OB_UNIS_ENCODE(pwj_map_element_count); + } + } + return ret; +} + +int ObExecContext::deserialize_group_pwj_map(const char *buf, const int64_t data_len, int64_t &pos) +{ + int ret = OB_SUCCESS; + // deserialize size for group_pwj_map_ + int64_t pwj_map_element_count = 0; + OB_UNIS_DECODE(pwj_map_element_count); + if (OB_SUCC(ret) && pwj_map_element_count > 0) { + GroupPWJTabletIdMap *group_pwj_map = nullptr; + uint64_t table_id; + GroupPWJTabletIdInfo group_pwj_tablet_id_info; + if (OB_FAIL(get_group_pwj_map(group_pwj_map))) { + LOG_WARN("failed to get_group_pwj_map"); + } else { + for (int64_t i = 0; i < pwj_map_element_count && OB_SUCC(ret); ++i) { + OB_UNIS_DECODE(table_id); + OB_UNIS_DECODE(group_pwj_tablet_id_info); + if (OB_FAIL(ret)) { + } else if (OB_FAIL(group_pwj_map->set_refactored(table_id, group_pwj_tablet_id_info))) { + LOG_WARN("failed to set refactored", K(table_id), K(pwj_map_element_count), + K(group_pwj_map->size())); + } + } + } + if (OB_SUCC(ret)) { + group_pwj_map_ = group_pwj_map; + } + } + return ret; +} + int ObExecContext::get_sqludt_meta_by_subschema_id(uint16_t subschema_id, ObSqlUDTMeta &udt_meta) { int ret = OB_SUCCESS; diff --git a/src/sql/engine/ob_exec_context.h b/src/sql/engine/ob_exec_context.h index cdf862ea6..e143e00d0 100644 --- a/src/sql/engine/ob_exec_context.h +++ b/src/sql/engine/ob_exec_context.h @@ -452,8 +452,13 @@ public: ObIArray& get_temp_table_ctx() { return temp_ctx_; } - int get_pwj_map(PWJTabletIdMap *&pwj_map); - PWJTabletIdMap *get_pwj_map() { return pwj_map_; } + int get_group_pwj_map(GroupPWJTabletIdMap *&group_pwj_map); + inline GroupPWJTabletIdMap *get_group_pwj_map() { return group_pwj_map_; } + int deep_copy_group_pwj_map(const GroupPWJTabletIdMap *src); + int64_t get_group_pwj_map_serialize_size() const; + int serialize_group_pwj_map(char *buf, const int64_t buf_len, int64_t 
&pos) const; + int deserialize_group_pwj_map(const char *buf, const int64_t data_len, int64_t &pos); + void set_partition_id_calc_type(PartitionIdCalcType calc_type) { calc_type_ = calc_type; } PartitionIdCalcType get_partition_id_calc_type() { return calc_type_; } void set_fixed_id(ObObjectID fixed_id) { fixed_id_ = fixed_id; } @@ -648,7 +653,7 @@ protected: // just for convert charset in query response result lib::MemoryContext convert_allocator_; - PWJTabletIdMap* pwj_map_; + GroupPWJTabletIdMap *group_pwj_map_; // the following two parameters only used in calc_partition_id expr PartitionIdCalcType calc_type_; ObObjectID fixed_id_; // fixed part id or fixed subpart ids diff --git a/src/sql/engine/px/ob_dfo.cpp b/src/sql/engine/px/ob_dfo.cpp index 29c8da631..8092b23ee 100644 --- a/src/sql/engine/px/ob_dfo.cpp +++ b/src/sql/engine/px/ob_dfo.cpp @@ -653,6 +653,9 @@ OB_DEF_SERIALIZE(ObPxRpcInitSqcArgs) // can reuse cache from now on (const_cast(ser_cache_)).cache_serialized_ = ser_cache_.enable_serialize_cache_; LST_DO_CODE(OB_UNIS_ENCODE, qc_order_gi_tasks_); + if (OB_SUCC(ret) && sqc_.is_fulltree()) { + ret = exec_ctx_->serialize_group_pwj_map(buf, buf_len, pos); + } LOG_TRACE("serialize sqc", K_(sqc)); LOG_DEBUG("end trace sqc args", K(pos), K(buf_len), K(this->get_serialize_size())); return ret; @@ -705,6 +708,9 @@ OB_DEF_SERIALIZE_SIZE(ObPxRpcInitSqcArgs) LST_DO_CODE(OB_UNIS_ADD_LEN, sqc_); LST_DO_CODE(OB_UNIS_ADD_LEN, qc_order_gi_tasks_); } + if (OB_SUCC(ret) && sqc_.is_fulltree()) { + len += exec_ctx_->get_group_pwj_map_serialize_size(); + } return len; } @@ -789,6 +795,9 @@ int ObPxRpcInitSqcArgs::do_deserialize(int64_t &pos, const char *net_buf, int64_ // if version of qc is old, qc_order_gi_tasks_ will not be serialized and the value will be false. 
qc_order_gi_tasks_ = false; LST_DO_CODE(OB_UNIS_DECODE, qc_order_gi_tasks_); + if (OB_SUCC(ret) && sqc_.is_fulltree() && pos < data_len) { + ret = exec_ctx_->deserialize_group_pwj_map(buf, data_len, pos); + } LOG_TRACE("deserialize qc order gi tasks", K(qc_order_gi_tasks_), K(sqc_), K(this)); } return ret; @@ -1008,6 +1017,12 @@ int ObPxRpcInitTaskArgs::deep_copy_assign(ObPxRpcInitTaskArgs &src, ret = OB_DESERIALIZE_ERROR; LOG_WARN("data_len and pos mismatch", K(ser_arg_len), K(ser_pos), K(des_pos), K(ret)); } + if (OB_SUCC(ret)) { + if (sqc_handler_->get_sqc_init_arg().sqc_.is_fulltree() + && nullptr != src.exec_ctx_->get_group_pwj_map()) { + exec_ctx_->deep_copy_group_pwj_map(src.exec_ctx_->get_group_pwj_map()); + } + } return ret; } diff --git a/src/sql/engine/px/ob_px_util.cpp b/src/sql/engine/px/ob_px_util.cpp index 0e0f5fe70..0c0e45784 100644 --- a/src/sql/engine/px/ob_px_util.cpp +++ b/src/sql/engine/px/ob_px_util.cpp @@ -50,6 +50,83 @@ case ERR_CODE: { \ OB_SERIALIZE_MEMBER(ObExprExtraSerializeInfo, *current_time_, *last_trace_id_, *mview_ids_, *last_refresh_scns_); +ObBaseOrderMap::~ObBaseOrderMap() +{ + int ret = OB_SUCCESS; + ClearMapFunc clear_func; + if (OB_FAIL(map_.foreach_refactored(clear_func))) { + LOG_WARN("failed to clear"); + } + map_.destroy(); + allocator_.reset(); +} + +int ObBaseOrderMap::init(int64_t count) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(map_.create(count, ObModIds::OB_SQL_PX))) { + SQL_LOG(WARN, "Failed to create hash table", K(count)); + } + return ret; +} + +int ObBaseOrderMap::add_base_partition_order(int64_t pwj_group_id, + const TabletIdArray &tablet_id_array, + const DASTabletLocIArray &dst_locations) +{ + int ret = OB_SUCCESS; + void *buf = nullptr; + ObTMArray *base_order = nullptr; + if (OB_ISNULL(buf = reinterpret_cast *>( + allocator_.alloc(sizeof(ObTMArray))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate memory"); + } else if (FALSE_IT(base_order = new(buf) ObTMArray())) { + } else if 
(OB_FAIL(base_order->reserve(dst_locations.count()))) { + LOG_WARN("fail reserve base order", K(ret), K(dst_locations.count())); + } else if (OB_FAIL(map_.set_refactored(pwj_group_id, base_order))) { + base_order->destroy(); + LOG_WARN("failed to set", K(pwj_group_id)); + } else { + for (int i = 0; i < dst_locations.count() && OB_SUCC(ret); ++i) { + for (int j = 0; j < tablet_id_array.count() && OB_SUCC(ret); ++j) { + if (dst_locations.at(i)->tablet_id_.id() == tablet_id_array.at(j)) { + if (OB_FAIL(base_order->push_back(j))) { + LOG_WARN("fail to push idx into base order", K(ret)); + } + break; + } + } + } + } + return ret; +} + +int ObBaseOrderMap::reorder_partition_as_base_order(int64_t pwj_group_id, + const TabletIdArray &tablet_id_array, + DASTabletLocIArray &dst_locations) +{ + int ret = OB_SUCCESS; + ObIArray *base_order = nullptr; + if (OB_FAIL(map_.get_refactored(pwj_group_id, base_order))) { + LOG_WARN("hash not found", K(pwj_group_id)); + } else if (base_order->count() != dst_locations.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("not match count", K(base_order->count()), K(dst_locations.count())); + } else { + int index = 0; + for (int i = 0; i < base_order->count() && OB_SUCC(ret); ++i) { + for (int j = 0; j < dst_locations.count() && OB_SUCC(ret); ++j) { + if (dst_locations.at(j)->tablet_id_.id() == tablet_id_array.at(base_order->at(i))) { + std::swap(dst_locations.at(j), dst_locations.at(index++)); + break; + } + } + } + } + return ret; +} + // 物理分布策略:对于叶子节点,dfo 分布一般直接按照数据分布来 // Note:如果 dfo 中有两个及以上的 scan,仅仅考虑第一个。并且,要求其余 scan // 的副本分布和第一个 scan 完全一致,否则报错。 @@ -997,7 +1074,10 @@ int ObPXServerAddrUtil::set_dfo_accessed_location(ObExecContext &ctx, ObDASTableLoc *dml_table_loc = nullptr; ObTableID dml_table_location_key = OB_INVALID_ID; ObTableID dml_ref_table_id = OB_INVALID_ID; - ObSEArraybase_order; + ObBaseOrderMap base_order_map; + if (OB_FAIL(base_order_map.init(max(1, scan_ops.count())))) { + LOG_WARN("Failed to init base_order_map"); + } // 
处理insert op 对应的partition location信息 if (OB_FAIL(ret) || OB_ISNULL(dml_op)) { // pass @@ -1030,7 +1110,7 @@ int ObPXServerAddrUtil::set_dfo_accessed_location(ObExecContext &ctx, ret = OB_ERR_UNEXPECTED; LOG_WARN("table loc is null", K(ret)); } else if (OB_FAIL(set_sqcs_accessed_location(ctx, base_table_location_key, - dfo, base_order, table_loc, dml_op))) { + dfo, base_order_map, table_loc, dml_op))) { LOG_WARN("failed to set sqc accessed location", K(ret)); } dml_table_loc = table_loc; @@ -1060,7 +1140,7 @@ int ObPXServerAddrUtil::set_dfo_accessed_location(ObExecContext &ctx, // dml op has already set sqc.get_location information, // table scan does not need to be set again OB_ISNULL(dml_op) ? base_table_location_key : OB_INVALID_ID, - dfo, base_order, table_loc, scan_op))) { + dfo, base_order_map, table_loc, scan_op))) { LOG_WARN("failed to set sqc accessed location", K(ret), K(table_location_key), K(ref_table_id), KPC(table_loc)); } @@ -1077,13 +1157,10 @@ int ObPXServerAddrUtil::set_dfo_accessed_location(ObExecContext &ctx, return ret; } - -int ObPXServerAddrUtil::set_sqcs_accessed_location(ObExecContext &ctx, - int64_t base_table_location_key, - ObDfo &dfo, - ObIArray &base_order, - const ObDASTableLoc *table_loc, - const ObOpSpec *phy_op) +int ObPXServerAddrUtil::set_sqcs_accessed_location( + ObExecContext &ctx, int64_t base_table_location_key, ObDfo &dfo, + ObBaseOrderMap &base_order_map, + const ObDASTableLoc *table_loc, const ObOpSpec *phy_op) { int ret = OB_SUCCESS; common::ObArray sqcs; @@ -1111,7 +1188,7 @@ int ObPXServerAddrUtil::set_sqcs_accessed_location(ObExecContext &ctx, LOG_WARN("fail to get table scan partition order", K(ret)); } else if (OB_FAIL(ObPXServerAddrUtil::reorder_all_partitions(table_location_key, table_loc->get_ref_table_id(), locations, - temp_locations, asc_order, ctx, base_order))) { + temp_locations, asc_order, ctx, base_order_map))) { // 按照GI要求的访问顺序对当前SQC涉及到的分区进行排序 // 如果是partition wise join场景, 
需要根据partition_wise_join要求结合GI要求做asc/desc排序 LOG_WARN("fail to reorder all partitions", K(ret)); @@ -1225,11 +1302,10 @@ private: ObTabletIdxMap *map_; }; -int ObPXServerAddrUtil::reorder_all_partitions(int64_t table_location_key, - int64_t ref_table_id, - const DASTabletLocList &src_locations, - DASTabletLocIArray &dst_locations, - bool asc, ObExecContext &exec_ctx, ObIArray &base_order) +int ObPXServerAddrUtil::reorder_all_partitions( + int64_t table_location_key, int64_t ref_table_id, const DASTabletLocList &src_locations, + DASTabletLocIArray &dst_locations, bool asc, ObExecContext &exec_ctx, + ObBaseOrderMap &base_order_map) { int ret = OB_SUCCESS; dst_locations.reset(); @@ -1237,7 +1313,7 @@ int ObPXServerAddrUtil::reorder_all_partitions(int64_t table_location_key, ObTabletIdxMap tablet_order_map; if (OB_FAIL(dst_locations.reserve(src_locations.size()))) { LOG_WARN("fail reserve locations", K(ret), K(src_locations.size())); - // virtual table is list parition now, + // virtual table is list partition now, // no actual partition define, can't traverse // table schema for partition info } else if (!is_virtual_table(ref_table_id) && @@ -1264,41 +1340,40 @@ int ObPXServerAddrUtil::reorder_all_partitions(int64_t table_location_key, ret = OB_SCHEMA_ERROR; } } - PWJTabletIdMap *pwj_map = NULL; + GroupPWJTabletIdMap *group_pwj_map = nullptr; if (OB_FAIL(ret)) { LOG_WARN("fail to sort locations", K(ret)); - } else if (OB_NOT_NULL(pwj_map = exec_ctx.get_pwj_map())) { - TabletIdArray tablet_id_array; - if (OB_FAIL(pwj_map->get_refactored(table_location_key, tablet_id_array))) { + } else if (OB_NOT_NULL(group_pwj_map = exec_ctx.get_group_pwj_map())) { + GroupPWJTabletIdInfo group_pwj_tablet_id_info; + TabletIdArray &tablet_id_array = group_pwj_tablet_id_info.tablet_id_array_; + if (OB_FAIL(group_pwj_map->get_refactored(table_location_key, group_pwj_tablet_id_info))) { if (OB_HASH_NOT_EXIST == ret) { - // map中没有意味着不需要pwj调序 + // means this is not a partition wise join table, 
do not need to reorder partition ret = OB_SUCCESS; - } - } else if (0 == base_order.count()) { - //TODO @yishen 在partition数量较多的情况, 使用hash map优化. - if (OB_FAIL(base_order.reserve(dst_locations.count()))) { - LOG_WARN("fail reserve base order", K(ret), K(dst_locations.count())); - } - for (int i = 0; i < dst_locations.count() && OB_SUCC(ret); ++i) { - for (int j = 0; j < tablet_id_array.count() && OB_SUCC(ret); ++j) { - if (dst_locations.at(i)->tablet_id_.id() == tablet_id_array.at(j)) { - if (OB_FAIL(base_order.push_back(j))) { - LOG_WARN("fail to push idx into base order", K(ret)); - } - break; - } - } + } else { + LOG_WARN("failed to get_refactored", K(table_location_key)); } } else { - //TODO @yishen 在partition数量较多的情况, 使用hash map优化. - int index = 0; - for (int i = 0; i < base_order.count() && OB_SUCC(ret); ++i) { - for (int j = 0; j < dst_locations.count() && OB_SUCC(ret); ++j) { - if (dst_locations.at(j)->tablet_id_.id() == tablet_id_array.at(base_order.at(i))) { - std::swap(dst_locations.at(j), dst_locations.at(index++)); - break; + // set base order or reorder partition as base order + uint64_t pwj_group_id = group_pwj_tablet_id_info.group_id_; + ObIArray *base_order = nullptr; + if (OB_FAIL(base_order_map.get_map().get_refactored(pwj_group_id, base_order))) { + if (ret == OB_HASH_NOT_EXIST) { + ret = base_order_map.add_base_partition_order(pwj_group_id, tablet_id_array, + dst_locations); + if (ret != OB_SUCCESS) { + LOG_WARN("failed to add_base_partition_order"); + } else { + LOG_TRACE("succ to add_base_partition_order", K(pwj_group_id), K(table_location_key)); } + } else { + LOG_WARN("failed to get_refactored"); } + } else if (OB_FAIL(base_order_map.reorder_partition_as_base_order( + pwj_group_id, tablet_id_array, dst_locations))) { + LOG_WARN("failed to reorder_partition_as_base_order"); + } else { + LOG_TRACE("succ to reorder_partition_as_base_order", K(pwj_group_id), K(table_location_key)); } } } diff --git a/src/sql/engine/px/ob_px_util.h 
b/src/sql/engine/px/ob_px_util.h index 0c6faf08c..870f0e25c 100644 --- a/src/sql/engine/px/ob_px_util.h +++ b/src/sql/engine/px/ob_px_util.h @@ -60,6 +60,34 @@ public: common::ObFixedArray *last_refresh_scns_; }; +class ObBaseOrderMap +{ +public: + struct ClearMapFunc + { + int operator()(const hash::HashMapPair *> &entry) { + entry.second->destroy(); + return OB_SUCCESS; + } + }; + ObBaseOrderMap() { + } + ~ObBaseOrderMap(); + int init(int64_t count); + inline hash::ObHashMap *, hash::NoPthreadDefendMode> &get_map() + { + return map_; + } + int add_base_partition_order(int64_t pwj_group_id, const TabletIdArray &tablet_id_array, + const DASTabletLocIArray &dst_locations); + int reorder_partition_as_base_order(int64_t pwj_group_id, + const TabletIdArray &tablet_id_array, + DASTabletLocIArray &dst_locations); +private: + ObArenaAllocator allocator_; + hash::ObHashMap *, hash::NoPthreadDefendMode> map_; +}; + class ObPxSqcUtil { public: @@ -182,11 +210,10 @@ private: int64_t tenant_id, uint64_t ref_table_id, ObTabletIdxMap &idx_map); - static int reorder_all_partitions(int64_t location_key, - int64_t ref_table_id, - const DASTabletLocList &src_locations, - DASTabletLocIArray &tsc_locations, - bool asc, ObExecContext &exec_ctx, ObIArray &base_order); + static int reorder_all_partitions( + int64_t location_key, int64_t ref_table_id, const DASTabletLocList &src_locations, + DASTabletLocIArray &tsc_locations, bool asc, ObExecContext &exec_ctx, + ObBaseOrderMap &base_order_map); static int build_dynamic_partition_table_location(common::ObIArray &scan_ops, const ObIArray *table_locations, ObDfo &dfo); @@ -215,12 +242,10 @@ private: * Add the partition information (table_loc) involved in the * current phy_op to the corresponding SQC access location */ - static int set_sqcs_accessed_location(ObExecContext &ctx, - int64_t base_table_location_key, - ObDfo &dfo, - ObIArray &base_order, - const ObDASTableLoc *table_loc, - const ObOpSpec *phy_op); + static int 
set_sqcs_accessed_location( + ObExecContext &ctx, int64_t base_table_location_key, ObDfo &dfo, + ObBaseOrderMap &base_order_map, + const ObDASTableLoc *table_loc, const ObOpSpec *phy_op); /** * Get the access sequence of the partition of the current phy_op, * the access sequence of the phy_op partition is determined by diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index ee136441e..11723a8f3 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -11494,15 +11494,11 @@ int ObLogPlan::calc_and_set_exec_pwj_map(ObLocationConstraintContext &location_c ObIArray &base_location_cons = location_constraint.base_table_constraints_; ObIArray &strict_cons = location_constraint.strict_constraints_; const int64_t tbl_count = location_constraint.base_table_constraints_.count(); - ObSEArray pwj_tables; SMART_VAR(ObStrictPwjComparer, strict_pwj_comparer) { PWJTabletIdMap pwj_map; - if (OB_FAIL(pwj_tables.prepare_allocate(tbl_count))) { - LOG_WARN("failed to prepare allocate pwj tables", K(ret)); - } else if (OB_FAIL(pwj_map.create(8, ObModIds::OB_PLAN_EXECUTE))) { + if (OB_FAIL(pwj_map.create(8, ObModIds::OB_PLAN_EXECUTE))) { LOG_WARN("create pwj map failed", K(ret)); } - for (int64_t i = 0; OB_SUCC(ret) && i < strict_cons.count(); ++i) { const ObPwjConstraint *pwj_cons = strict_cons.at(i); if (OB_ISNULL(pwj_cons) || OB_UNLIKELY(pwj_cons->count() <= 1)) { @@ -11515,25 +11511,32 @@ int ObLogPlan::calc_and_set_exec_pwj_map(ObLocationConstraintContext &location_c } if (OB_SUCC(ret)) { - PWJTabletIdMap *exec_pwj_map = NULL; - if (OB_FAIL(exec_ctx->get_pwj_map(exec_pwj_map))) { - LOG_WARN("failed to get exec pwj map", K(ret)); - } else if (OB_FAIL(exec_pwj_map->reuse())) { - LOG_WARN("failed to reuse pwj map", K(ret)); + GroupPWJTabletIdMap *group_pwj_map = nullptr; + if (OB_FAIL(exec_ctx->get_group_pwj_map(group_pwj_map))) { + LOG_WARN("failed to get exec group pwj map", K(ret)); + } else if 
(OB_FAIL(group_pwj_map->reuse())) { + LOG_WARN("failed to reuse group pwj map", K(ret)); } - for (int64_t i = 0; OB_SUCC(ret) && i < base_location_cons.count(); ++i) { - if (!base_location_cons.at(i).is_multi_part_insert()) { - TabletIdArray tablet_id_array; - if (OB_FAIL(pwj_map.get_refactored(i, tablet_id_array))) { - if (OB_HASH_NOT_EXIST == ret) { - // 没找到说明当前表不需要做partition wise join - ret = OB_SUCCESS; - } else { - LOG_WARN("failed to get refactored", K(ret)); + GroupPWJTabletIdInfo group_pwj_tablet_id_info; + TabletIdArray &tablet_id_array = group_pwj_tablet_id_info.tablet_id_array_; + for (int64_t group_id = 0; OB_SUCC(ret) && group_id < strict_cons.count(); ++group_id) { + group_pwj_tablet_id_info.group_id_ = group_id; + const ObPwjConstraint *pwj_cons = strict_cons.at(group_id); + for (int64_t i = 0; OB_SUCC(ret) && i < pwj_cons->count(); ++i) { + const int64_t table_idx = pwj_cons->at(i); + uint64_t table_id = base_location_cons.at(table_idx).key_.table_id_; + tablet_id_array.reset(); + if (!base_location_cons.at(table_idx).is_multi_part_insert()) { + if (OB_FAIL(pwj_map.get_refactored(table_idx, tablet_id_array))) { + if (OB_HASH_NOT_EXIST == ret) { + // means this is not a partition wise join table + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to get refactored", K(ret)); + } + } else if (OB_FAIL(group_pwj_map->set_refactored(table_id, group_pwj_tablet_id_info))) { + LOG_WARN("failed to set refactored", K(ret)); } - } else if (OB_FAIL(exec_pwj_map->set_refactored(base_location_cons.at(i).key_.table_id_, - tablet_id_array))) { - LOG_WARN("failed to set refactored", K(ret)); } } } diff --git a/src/sql/optimizer/ob_pwj_comparer.cpp b/src/sql/optimizer/ob_pwj_comparer.cpp index 2cd7d26a0..060cd1f03 100644 --- a/src/sql/optimizer/ob_pwj_comparer.cpp +++ b/src/sql/optimizer/ob_pwj_comparer.cpp @@ -110,6 +110,8 @@ int PwjTable::assign(const PwjTable &other) return ret; } +OB_SERIALIZE_MEMBER(GroupPWJTabletIdInfo, group_id_, tablet_id_array_); + void 
// Per-table partition-wise-join information: the id of the partition wise join
// group the table belongs to, plus the tablet ids of that table.
// Serialized member-wise (group_id_, tablet_id_array_).
struct GroupPWJTabletIdInfo {
  OB_UNIS_VERSION(1);
public:
  TO_STRING_KV(K_(group_id), K_(tablet_id_array));
  /*
    for union all non strict partition wise join, there may be several partition wise join groups,
    for example:
                        union all
                   |               |
                  join            join
                |      |        |      |
        t1(p0-p15) t2(p0-p15) t3(p0-p8) t4(p0-p8)

    t1 and t2 are in group 0
    t3 and t4 are in group 1
  */
  // id of the partition wise join group this table belongs to, 0 by default
  int64_t group_id_{0};
  // tablet ids of the table
  TabletIdArray tablet_id_array_;
};
get exec group pwj map", K(ret)); + } else if (OB_FAIL(exec_group_pwj_map->reuse())) { + LOG_WARN("failed to reuse pwj map", K(ret)); } - for (int64_t i = 0; OB_SUCC(ret) && i < base_cons.count(); ++i) { - // in the case of multi part insert, only the location constraint is matched, but the - // corresponding phy table location information does not need to be added to table_locs - if (!base_cons.at(i).is_multi_part_insert()) { - ObCandiTableLoc &src_location = phy_tbl_infos.at(i); - if (use_pwj_map) { - TabletIdArray tablet_id_array; - if (OB_FAIL(pwj_map.get_refactored(i, tablet_id_array))) { + GroupPWJTabletIdInfo group_pwj_tablet_id_info; + TabletIdArray &tablet_id_array = group_pwj_tablet_id_info.tablet_id_array_; + for (int64_t group_id = 0; OB_SUCC(ret) && group_id < strict_cons.count(); ++group_id) { + group_pwj_tablet_id_info.group_id_ = group_id; + const ObPlanPwjConstraint &pwj_cons = strict_cons.at(group_id); + for (int64_t i = 0; OB_SUCC(ret) && i < pwj_cons.count(); ++i) { + const int64_t table_idx = pwj_cons.at(i); + uint64_t table_id = base_cons.at(table_idx).key_.table_id_; + tablet_id_array.reset(); + if (!base_cons.at(table_idx).is_multi_part_insert()) { + if (OB_FAIL(pwj_map.get_refactored(table_idx, tablet_id_array))) { if (OB_HASH_NOT_EXIST == ret) { - // 没找到说明当前表不需要做partition wise join + // means this is not a partition wise join table ret = OB_SUCCESS; } else { LOG_WARN("failed to get refactored", K(ret)); } - } else if (OB_FAIL(exec_pwj_map->set_refactored(base_cons.at(i).key_.table_id_, - tablet_id_array))) { + } else if (OB_FAIL(exec_group_pwj_map->set_refactored(table_id, group_pwj_tablet_id_info))) { LOG_WARN("failed to set refactored", K(ret)); } }