fix non strict constraints partition wise join reorder partition bug

This commit is contained in:
obdev 2024-08-26 09:48:20 +00:00 committed by ob-robot
parent 9895d573eb
commit bde6581619
9 changed files with 359 additions and 115 deletions

View File

@ -108,7 +108,7 @@ ObExecContext::ObExecContext(ObIAllocator &allocator)
frame_cnt_(0),
op_kit_store_(),
convert_allocator_(nullptr),
pwj_map_(nullptr),
group_pwj_map_(nullptr),
calc_type_(CALC_NORMAL),
fixed_id_(OB_INVALID_ID),
check_status_times_(0),
@ -169,9 +169,9 @@ ObExecContext::~ObExecContext()
package_guard_->~ObPLPackageGuard();
package_guard_ = NULL;
}
if (OB_NOT_NULL(pwj_map_)) {
pwj_map_->destroy();
pwj_map_ = NULL;
if (OB_NOT_NULL(group_pwj_map_)) {
group_pwj_map_->destroy();
group_pwj_map_ = nullptr;
}
if (OB_NOT_NULL(vt_ift_)) {
vt_ift_->~ObIVirtualTableIteratorFactory();
@ -840,24 +840,50 @@ int ObExecContext::add_row_id_list(const common::ObIArray<int64_t> *row_id_list)
return ret;
}
int ObExecContext::get_pwj_map(PWJTabletIdMap *&pwj_map)
int ObExecContext::get_group_pwj_map(GroupPWJTabletIdMap *&group_pwj_map)
{
int ret = OB_SUCCESS;
pwj_map = nullptr;
if (nullptr == pwj_map_) {
void *buf = allocator_.alloc(sizeof(PWJTabletIdMap));
group_pwj_map = nullptr;
if (nullptr == group_pwj_map_) {
void *buf = allocator_.alloc(sizeof(GroupPWJTabletIdMap));
if (nullptr == buf) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to allocate memories", K(ret));
} else if (FALSE_IT(pwj_map_ = new(buf) PWJTabletIdMap())) {
} else if (OB_FAIL(pwj_map_->create(PARTITION_WISE_JOIN_TSC_HASH_BUCKET_NUM, /* assume no more than 8 table scan in a plan */
ObModIds::OB_SQL_PX))) {
LOG_WARN("Failed to create gi task map", K(ret));
} else {
pwj_map = pwj_map_;
group_pwj_map_ = new (buf) GroupPWJTabletIdMap();
/* assume no more than 8table scan in a plan */
if (OB_FAIL(group_pwj_map_->create(PARTITION_WISE_JOIN_TSC_HASH_BUCKET_NUM, ObModIds::OB_SQL_PX))) {
LOG_WARN("Failed to create group_pwj_map_", K(ret));
} else {
group_pwj_map = group_pwj_map_;
}
}
} else {
pwj_map = pwj_map_;
group_pwj_map = group_pwj_map_;
}
return ret;
}
int ObExecContext::deep_copy_group_pwj_map(const GroupPWJTabletIdMap *src)
{
int ret = OB_SUCCESS;
GroupPWJTabletIdMap *des = nullptr;
if (OB_ISNULL(src)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null");
} else if (OB_FAIL(get_group_pwj_map(des))) {
LOG_WARN("failed to get_group_pwj_map");
} else if (des->size() > 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("size should be 0", K(des->size()), K(src->size()));
} else {
FOREACH_X(iter, *src, OB_SUCC(ret)) {
const uint64_t table_id = iter->first;
const GroupPWJTabletIdInfo &group_pwj_tablet_id_info = iter->second;
if (OB_FAIL(des->set_refactored(table_id, group_pwj_tablet_id_info))) {
LOG_WARN("failed to set refactored", K(table_id));
}
}
}
return ret;
}
@ -1040,6 +1066,78 @@ DEFINE_GET_SERIALIZE_SIZE(ObExecContext)
return len;
}
int64_t ObExecContext::get_group_pwj_map_serialize_size() const
{
int64_t len = 0;
// add serialize size for group_pwj_map_
int64_t pwj_map_element_count = 0;
if (group_pwj_map_ != nullptr) {
pwj_map_element_count = group_pwj_map_->size();
OB_UNIS_ADD_LEN(pwj_map_element_count);
FOREACH(iter, *group_pwj_map_) {
const uint64_t table_id = iter->first;
const GroupPWJTabletIdInfo &group_pwj_tablet_id_info = iter->second;
OB_UNIS_ADD_LEN(table_id);
OB_UNIS_ADD_LEN(group_pwj_tablet_id_info);
}
} else {
OB_UNIS_ADD_LEN(pwj_map_element_count);
}
return len;
}
int ObExecContext::serialize_group_pwj_map(char *buf, const int64_t buf_len, int64_t &pos) const
{
int ret = OB_SUCCESS;
// serialize group_pwj_map_
int64_t pwj_map_element_count = 0;
if (OB_SUCC(ret)) {
if (group_pwj_map_ != nullptr) {
pwj_map_element_count = group_pwj_map_->size();
OB_UNIS_ENCODE(pwj_map_element_count);
FOREACH_X(iter, *group_pwj_map_, OB_SUCC(ret)) {
const uint64_t table_id = iter->first;
const GroupPWJTabletIdInfo &group_pwj_tablet_id_info = iter->second;
OB_UNIS_ENCODE(table_id);
OB_UNIS_ENCODE(group_pwj_tablet_id_info);
}
} else {
OB_UNIS_ENCODE(pwj_map_element_count);
}
}
return ret;
}
int ObExecContext::deserialize_group_pwj_map(const char *buf, const int64_t data_len, int64_t &pos)
{
int ret = OB_SUCCESS;
// deserialize size for group_pwj_map_
int64_t pwj_map_element_count = 0;
OB_UNIS_DECODE(pwj_map_element_count);
if (OB_SUCC(ret) && pwj_map_element_count > 0) {
GroupPWJTabletIdMap *group_pwj_map = nullptr;
uint64_t table_id;
GroupPWJTabletIdInfo group_pwj_tablet_id_info;
if (OB_FAIL(get_group_pwj_map(group_pwj_map))) {
LOG_WARN("failed to get_group_pwj_map");
} else {
for (int64_t i = 0; i < pwj_map_element_count && OB_SUCC(ret); ++i) {
OB_UNIS_DECODE(table_id);
OB_UNIS_DECODE(group_pwj_tablet_id_info);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(group_pwj_map->set_refactored(table_id, group_pwj_tablet_id_info))) {
LOG_WARN("failed to set refactored", K(table_id), K(pwj_map_element_count),
K(group_pwj_map->size()));
}
}
}
if (OB_SUCC(ret)) {
group_pwj_map_ = group_pwj_map;
}
}
return ret;
}
int ObExecContext::get_sqludt_meta_by_subschema_id(uint16_t subschema_id, ObSqlUDTMeta &udt_meta)
{
int ret = OB_SUCCESS;

View File

@ -452,8 +452,13 @@ public:
ObIArray<ObSqlTempTableCtx>& get_temp_table_ctx() { return temp_ctx_; }
int get_pwj_map(PWJTabletIdMap *&pwj_map);
PWJTabletIdMap *get_pwj_map() { return pwj_map_; }
int get_group_pwj_map(GroupPWJTabletIdMap *&group_pwj_map);
inline GroupPWJTabletIdMap *get_group_pwj_map() { return group_pwj_map_; }
int deep_copy_group_pwj_map(const GroupPWJTabletIdMap *src);
int64_t get_group_pwj_map_serialize_size() const;
int serialize_group_pwj_map(char *buf, const int64_t buf_len, int64_t &pos) const;
int deserialize_group_pwj_map(const char *buf, const int64_t data_len, int64_t &pos);
void set_partition_id_calc_type(PartitionIdCalcType calc_type) { calc_type_ = calc_type; }
PartitionIdCalcType get_partition_id_calc_type() { return calc_type_; }
void set_fixed_id(ObObjectID fixed_id) { fixed_id_ = fixed_id; }
@ -648,7 +653,7 @@ protected:
// just for convert charset in query response result
lib::MemoryContext convert_allocator_;
PWJTabletIdMap* pwj_map_;
GroupPWJTabletIdMap *group_pwj_map_;
// the following two parameters only used in calc_partition_id expr
PartitionIdCalcType calc_type_;
ObObjectID fixed_id_; // fixed part id or fixed subpart ids

View File

@ -653,6 +653,9 @@ OB_DEF_SERIALIZE(ObPxRpcInitSqcArgs)
// can reuse cache from now on
(const_cast<ObSqcSerializeCache &>(ser_cache_)).cache_serialized_ = ser_cache_.enable_serialize_cache_;
LST_DO_CODE(OB_UNIS_ENCODE, qc_order_gi_tasks_);
if (OB_SUCC(ret) && sqc_.is_fulltree()) {
ret = exec_ctx_->serialize_group_pwj_map(buf, buf_len, pos);
}
LOG_TRACE("serialize sqc", K_(sqc));
LOG_DEBUG("end trace sqc args", K(pos), K(buf_len), K(this->get_serialize_size()));
return ret;
@ -705,6 +708,9 @@ OB_DEF_SERIALIZE_SIZE(ObPxRpcInitSqcArgs)
LST_DO_CODE(OB_UNIS_ADD_LEN, sqc_);
LST_DO_CODE(OB_UNIS_ADD_LEN, qc_order_gi_tasks_);
}
if (OB_SUCC(ret) && sqc_.is_fulltree()) {
len += exec_ctx_->get_group_pwj_map_serialize_size();
}
return len;
}
@ -789,6 +795,9 @@ int ObPxRpcInitSqcArgs::do_deserialize(int64_t &pos, const char *net_buf, int64_
// if version of qc is old, qc_order_gi_tasks_ will not be serialized and the value will be false.
qc_order_gi_tasks_ = false;
LST_DO_CODE(OB_UNIS_DECODE, qc_order_gi_tasks_);
if (OB_SUCC(ret) && sqc_.is_fulltree() && pos < data_len) {
ret = exec_ctx_->deserialize_group_pwj_map(buf, data_len, pos);
}
LOG_TRACE("deserialize qc order gi tasks", K(qc_order_gi_tasks_), K(sqc_), K(this));
}
return ret;
@ -1008,6 +1017,12 @@ int ObPxRpcInitTaskArgs::deep_copy_assign(ObPxRpcInitTaskArgs &src,
ret = OB_DESERIALIZE_ERROR;
LOG_WARN("data_len and pos mismatch", K(ser_arg_len), K(ser_pos), K(des_pos), K(ret));
}
if (OB_SUCC(ret)) {
if (sqc_handler_->get_sqc_init_arg().sqc_.is_fulltree()
&& nullptr != src.exec_ctx_->get_group_pwj_map()) {
exec_ctx_->deep_copy_group_pwj_map(src.exec_ctx_->get_group_pwj_map());
}
}
return ret;
}

View File

@ -50,6 +50,83 @@ case ERR_CODE: { \
OB_SERIALIZE_MEMBER(ObExprExtraSerializeInfo, *current_time_, *last_trace_id_, *mview_ids_, *last_refresh_scns_);
ObBaseOrderMap::~ObBaseOrderMap()
{
int ret = OB_SUCCESS;
ClearMapFunc clear_func;
if (OB_FAIL(map_.foreach_refactored(clear_func))) {
LOG_WARN("failed to clear");
}
map_.destroy();
allocator_.reset();
}
int ObBaseOrderMap::init(int64_t count)
{
int ret = OB_SUCCESS;
if (OB_FAIL(map_.create(count, ObModIds::OB_SQL_PX))) {
SQL_LOG(WARN, "Failed to create hash table", K(count));
}
return ret;
}
int ObBaseOrderMap::add_base_partition_order(int64_t pwj_group_id,
const TabletIdArray &tablet_id_array,
const DASTabletLocIArray &dst_locations)
{
int ret = OB_SUCCESS;
void *buf = nullptr;
ObTMArray<int64_t> *base_order = nullptr;
if (OB_ISNULL(buf = reinterpret_cast<ObTMArray<int64_t> *>(
allocator_.alloc(sizeof(ObTMArray<int64_t>))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory");
} else if (FALSE_IT(base_order = new(buf) ObTMArray<int64_t>())) {
} else if (OB_FAIL(base_order->reserve(dst_locations.count()))) {
LOG_WARN("fail reserve base order", K(ret), K(dst_locations.count()));
} else if (OB_FAIL(map_.set_refactored(pwj_group_id, base_order))) {
base_order->destroy();
LOG_WARN("failed to set", K(pwj_group_id));
} else {
for (int i = 0; i < dst_locations.count() && OB_SUCC(ret); ++i) {
for (int j = 0; j < tablet_id_array.count() && OB_SUCC(ret); ++j) {
if (dst_locations.at(i)->tablet_id_.id() == tablet_id_array.at(j)) {
if (OB_FAIL(base_order->push_back(j))) {
LOG_WARN("fail to push idx into base order", K(ret));
}
break;
}
}
}
}
return ret;
}
int ObBaseOrderMap::reorder_partition_as_base_order(int64_t pwj_group_id,
const TabletIdArray &tablet_id_array,
DASTabletLocIArray &dst_locations)
{
int ret = OB_SUCCESS;
ObIArray<int64_t> *base_order = nullptr;
if (OB_FAIL(map_.get_refactored(pwj_group_id, base_order))) {
LOG_WARN("hash not found", K(pwj_group_id));
} else if (base_order->count() != dst_locations.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("not match count", K(base_order->count()), K(dst_locations.count()));
} else {
int index = 0;
for (int i = 0; i < base_order->count() && OB_SUCC(ret); ++i) {
for (int j = 0; j < dst_locations.count() && OB_SUCC(ret); ++j) {
if (dst_locations.at(j)->tablet_id_.id() == tablet_id_array.at(base_order->at(i))) {
std::swap(dst_locations.at(j), dst_locations.at(index++));
break;
}
}
}
}
return ret;
}
// 物理分布策略:对于叶子节点,dfo 分布一般直接按照数据分布来
// Note:如果 dfo 中有两个及以上的 scan,仅仅考虑第一个。并且,要求其余 scan
// 的副本分布和第一个 scan 完全一致,否则报错。
@ -997,7 +1074,10 @@ int ObPXServerAddrUtil::set_dfo_accessed_location(ObExecContext &ctx,
ObDASTableLoc *dml_table_loc = nullptr;
ObTableID dml_table_location_key = OB_INVALID_ID;
ObTableID dml_ref_table_id = OB_INVALID_ID;
ObSEArray<int64_t, 2>base_order;
ObBaseOrderMap base_order_map;
if (OB_FAIL(base_order_map.init(max(1, scan_ops.count())))) {
LOG_WARN("Failed to init base_order_map");
}
// 处理insert op 对应的partition location信息
if (OB_FAIL(ret) || OB_ISNULL(dml_op)) {
// pass
@ -1030,7 +1110,7 @@ int ObPXServerAddrUtil::set_dfo_accessed_location(ObExecContext &ctx,
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table loc is null", K(ret));
} else if (OB_FAIL(set_sqcs_accessed_location(ctx, base_table_location_key,
dfo, base_order, table_loc, dml_op))) {
dfo, base_order_map, table_loc, dml_op))) {
LOG_WARN("failed to set sqc accessed location", K(ret));
}
dml_table_loc = table_loc;
@ -1060,7 +1140,7 @@ int ObPXServerAddrUtil::set_dfo_accessed_location(ObExecContext &ctx,
// dml op has already set sqc.get_location information,
// table scan does not need to be set again
OB_ISNULL(dml_op) ? base_table_location_key : OB_INVALID_ID,
dfo, base_order, table_loc, scan_op))) {
dfo, base_order_map, table_loc, scan_op))) {
LOG_WARN("failed to set sqc accessed location", K(ret), K(table_location_key),
K(ref_table_id), KPC(table_loc));
}
@ -1077,13 +1157,10 @@ int ObPXServerAddrUtil::set_dfo_accessed_location(ObExecContext &ctx,
return ret;
}
int ObPXServerAddrUtil::set_sqcs_accessed_location(ObExecContext &ctx,
int64_t base_table_location_key,
ObDfo &dfo,
ObIArray<int64_t> &base_order,
const ObDASTableLoc *table_loc,
const ObOpSpec *phy_op)
int ObPXServerAddrUtil::set_sqcs_accessed_location(
ObExecContext &ctx, int64_t base_table_location_key, ObDfo &dfo,
ObBaseOrderMap &base_order_map,
const ObDASTableLoc *table_loc, const ObOpSpec *phy_op)
{
int ret = OB_SUCCESS;
common::ObArray<ObPxSqcMeta *> sqcs;
@ -1111,7 +1188,7 @@ int ObPXServerAddrUtil::set_sqcs_accessed_location(ObExecContext &ctx,
LOG_WARN("fail to get table scan partition order", K(ret));
} else if (OB_FAIL(ObPXServerAddrUtil::reorder_all_partitions(table_location_key,
table_loc->get_ref_table_id(), locations,
temp_locations, asc_order, ctx, base_order))) {
temp_locations, asc_order, ctx, base_order_map))) {
// 按照GI要求的访问顺序对当前SQC涉及到的分区进行排序
// 如果是partition wise join场景, 需要根据partition_wise_join要求结合GI要求做asc/desc排序
LOG_WARN("fail to reorder all partitions", K(ret));
@ -1225,11 +1302,10 @@ private:
ObTabletIdxMap *map_;
};
int ObPXServerAddrUtil::reorder_all_partitions(int64_t table_location_key,
int64_t ref_table_id,
const DASTabletLocList &src_locations,
DASTabletLocIArray &dst_locations,
bool asc, ObExecContext &exec_ctx, ObIArray<int64_t> &base_order)
int ObPXServerAddrUtil::reorder_all_partitions(
int64_t table_location_key, int64_t ref_table_id, const DASTabletLocList &src_locations,
DASTabletLocIArray &dst_locations, bool asc, ObExecContext &exec_ctx,
ObBaseOrderMap &base_order_map)
{
int ret = OB_SUCCESS;
dst_locations.reset();
@ -1237,7 +1313,7 @@ int ObPXServerAddrUtil::reorder_all_partitions(int64_t table_location_key,
ObTabletIdxMap tablet_order_map;
if (OB_FAIL(dst_locations.reserve(src_locations.size()))) {
LOG_WARN("fail reserve locations", K(ret), K(src_locations.size()));
// virtual table is list parition now,
// virtual table is list partition now,
// no actual partition define, can't traverse
// table schema for partition info
} else if (!is_virtual_table(ref_table_id) &&
@ -1264,41 +1340,40 @@ int ObPXServerAddrUtil::reorder_all_partitions(int64_t table_location_key,
ret = OB_SCHEMA_ERROR;
}
}
PWJTabletIdMap *pwj_map = NULL;
GroupPWJTabletIdMap *group_pwj_map = nullptr;
if (OB_FAIL(ret)) {
LOG_WARN("fail to sort locations", K(ret));
} else if (OB_NOT_NULL(pwj_map = exec_ctx.get_pwj_map())) {
TabletIdArray tablet_id_array;
if (OB_FAIL(pwj_map->get_refactored(table_location_key, tablet_id_array))) {
} else if (OB_NOT_NULL(group_pwj_map = exec_ctx.get_group_pwj_map())) {
GroupPWJTabletIdInfo group_pwj_tablet_id_info;
TabletIdArray &tablet_id_array = group_pwj_tablet_id_info.tablet_id_array_;
if (OB_FAIL(group_pwj_map->get_refactored(table_location_key, group_pwj_tablet_id_info))) {
if (OB_HASH_NOT_EXIST == ret) {
// map中没有意味着不需要pwj调序
// means this is not a partition wise join table, do not need to reorder partition
ret = OB_SUCCESS;
}
} else if (0 == base_order.count()) {
//TODO @yishen 在partition数量较多的情况, 使用hash map优化.
if (OB_FAIL(base_order.reserve(dst_locations.count()))) {
LOG_WARN("fail reserve base order", K(ret), K(dst_locations.count()));
}
for (int i = 0; i < dst_locations.count() && OB_SUCC(ret); ++i) {
for (int j = 0; j < tablet_id_array.count() && OB_SUCC(ret); ++j) {
if (dst_locations.at(i)->tablet_id_.id() == tablet_id_array.at(j)) {
if (OB_FAIL(base_order.push_back(j))) {
LOG_WARN("fail to push idx into base order", K(ret));
}
break;
}
}
} else {
LOG_WARN("failed to get_refactored", K(table_location_key));
}
} else {
//TODO @yishen 在partition数量较多的情况, 使用hash map优化.
int index = 0;
for (int i = 0; i < base_order.count() && OB_SUCC(ret); ++i) {
for (int j = 0; j < dst_locations.count() && OB_SUCC(ret); ++j) {
if (dst_locations.at(j)->tablet_id_.id() == tablet_id_array.at(base_order.at(i))) {
std::swap(dst_locations.at(j), dst_locations.at(index++));
break;
// set base order or reorder partition as base order
uint64_t pwj_group_id = group_pwj_tablet_id_info.group_id_;
ObIArray<int64_t> *base_order = nullptr;
if (OB_FAIL(base_order_map.get_map().get_refactored(pwj_group_id, base_order))) {
if (ret == OB_HASH_NOT_EXIST) {
ret = base_order_map.add_base_partition_order(pwj_group_id, tablet_id_array,
dst_locations);
if (ret != OB_SUCCESS) {
LOG_WARN("failed to add_base_partition_order");
} else {
LOG_TRACE("succ to add_base_partition_order", K(pwj_group_id), K(table_location_key));
}
} else {
LOG_WARN("failed to get_refactored");
}
} else if (OB_FAIL(base_order_map.reorder_partition_as_base_order(
pwj_group_id, tablet_id_array, dst_locations))) {
LOG_WARN("failed to reorder_partition_as_base_order");
} else {
LOG_TRACE("succ to reorder_partition_as_base_order", K(pwj_group_id), K(table_location_key));
}
}
}

View File

@ -60,6 +60,34 @@ public:
common::ObFixedArray<uint64_t, common::ObIAllocator> *last_refresh_scns_;
};
class ObBaseOrderMap
{
public:
struct ClearMapFunc
{
int operator()(const hash::HashMapPair<int64_t, ObIArray<int64_t> *> &entry) {
entry.second->destroy();
return OB_SUCCESS;
}
};
ObBaseOrderMap() {
}
~ObBaseOrderMap();
int init(int64_t count);
inline hash::ObHashMap<int64_t, ObIArray<int64_t> *, hash::NoPthreadDefendMode> &get_map()
{
return map_;
}
int add_base_partition_order(int64_t pwj_group_id, const TabletIdArray &tablet_id_array,
const DASTabletLocIArray &dst_locations);
int reorder_partition_as_base_order(int64_t pwj_group_id,
const TabletIdArray &tablet_id_array,
DASTabletLocIArray &dst_locations);
private:
ObArenaAllocator allocator_;
hash::ObHashMap<int64_t, ObIArray<int64_t> *, hash::NoPthreadDefendMode> map_;
};
class ObPxSqcUtil
{
public:
@ -182,11 +210,10 @@ private:
int64_t tenant_id,
uint64_t ref_table_id,
ObTabletIdxMap &idx_map);
static int reorder_all_partitions(int64_t location_key,
int64_t ref_table_id,
const DASTabletLocList &src_locations,
DASTabletLocIArray &tsc_locations,
bool asc, ObExecContext &exec_ctx, ObIArray<int64_t> &base_order);
static int reorder_all_partitions(
int64_t location_key, int64_t ref_table_id, const DASTabletLocList &src_locations,
DASTabletLocIArray &tsc_locations, bool asc, ObExecContext &exec_ctx,
ObBaseOrderMap &base_order_map);
static int build_dynamic_partition_table_location(common::ObIArray<const ObTableScanSpec*> &scan_ops,
const ObIArray<ObTableLocation> *table_locations, ObDfo &dfo);
@ -215,12 +242,10 @@ private:
* Add the partition information (table_loc) involved in the
* current phy_op to the corresponding SQC access location
*/
static int set_sqcs_accessed_location(ObExecContext &ctx,
int64_t base_table_location_key,
ObDfo &dfo,
ObIArray<int64_t> &base_order,
const ObDASTableLoc *table_loc,
const ObOpSpec *phy_op);
static int set_sqcs_accessed_location(
ObExecContext &ctx, int64_t base_table_location_key, ObDfo &dfo,
ObBaseOrderMap &base_order_map,
const ObDASTableLoc *table_loc, const ObOpSpec *phy_op);
/**
* Get the access sequence of the partition of the current phy_op,
* the access sequence of the phy_op partition is determined by

View File

@ -11494,15 +11494,11 @@ int ObLogPlan::calc_and_set_exec_pwj_map(ObLocationConstraintContext &location_c
ObIArray<LocationConstraint> &base_location_cons = location_constraint.base_table_constraints_;
ObIArray<ObPwjConstraint *> &strict_cons = location_constraint.strict_constraints_;
const int64_t tbl_count = location_constraint.base_table_constraints_.count();
ObSEArray<PwjTable, 4> pwj_tables;
SMART_VAR(ObStrictPwjComparer, strict_pwj_comparer) {
PWJTabletIdMap pwj_map;
if (OB_FAIL(pwj_tables.prepare_allocate(tbl_count))) {
LOG_WARN("failed to prepare allocate pwj tables", K(ret));
} else if (OB_FAIL(pwj_map.create(8, ObModIds::OB_PLAN_EXECUTE))) {
if (OB_FAIL(pwj_map.create(8, ObModIds::OB_PLAN_EXECUTE))) {
LOG_WARN("create pwj map failed", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < strict_cons.count(); ++i) {
const ObPwjConstraint *pwj_cons = strict_cons.at(i);
if (OB_ISNULL(pwj_cons) || OB_UNLIKELY(pwj_cons->count() <= 1)) {
@ -11515,25 +11511,32 @@ int ObLogPlan::calc_and_set_exec_pwj_map(ObLocationConstraintContext &location_c
}
if (OB_SUCC(ret)) {
PWJTabletIdMap *exec_pwj_map = NULL;
if (OB_FAIL(exec_ctx->get_pwj_map(exec_pwj_map))) {
LOG_WARN("failed to get exec pwj map", K(ret));
} else if (OB_FAIL(exec_pwj_map->reuse())) {
LOG_WARN("failed to reuse pwj map", K(ret));
GroupPWJTabletIdMap *group_pwj_map = nullptr;
if (OB_FAIL(exec_ctx->get_group_pwj_map(group_pwj_map))) {
LOG_WARN("failed to get exec group pwj map", K(ret));
} else if (OB_FAIL(group_pwj_map->reuse())) {
LOG_WARN("failed to reuse group pwj map", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < base_location_cons.count(); ++i) {
if (!base_location_cons.at(i).is_multi_part_insert()) {
TabletIdArray tablet_id_array;
if (OB_FAIL(pwj_map.get_refactored(i, tablet_id_array))) {
if (OB_HASH_NOT_EXIST == ret) {
// 没找到说明当前表不需要做partition wise join
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get refactored", K(ret));
GroupPWJTabletIdInfo group_pwj_tablet_id_info;
TabletIdArray &tablet_id_array = group_pwj_tablet_id_info.tablet_id_array_;
for (int64_t group_id = 0; OB_SUCC(ret) && group_id < strict_cons.count(); ++group_id) {
group_pwj_tablet_id_info.group_id_ = group_id;
const ObPwjConstraint *pwj_cons = strict_cons.at(group_id);
for (int64_t i = 0; OB_SUCC(ret) && i < pwj_cons->count(); ++i) {
const int64_t table_idx = pwj_cons->at(i);
uint64_t table_id = base_location_cons.at(table_idx).key_.table_id_;
tablet_id_array.reset();
if (!base_location_cons.at(table_idx).is_multi_part_insert()) {
if (OB_FAIL(pwj_map.get_refactored(table_idx, tablet_id_array))) {
if (OB_HASH_NOT_EXIST == ret) {
// means this is not a partition wise join table
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get refactored", K(ret));
}
} else if (OB_FAIL(group_pwj_map->set_refactored(table_id, group_pwj_tablet_id_info))) {
LOG_WARN("failed to set refactored", K(ret));
}
} else if (OB_FAIL(exec_pwj_map->set_refactored(base_location_cons.at(i).key_.table_id_,
tablet_id_array))) {
LOG_WARN("failed to set refactored", K(ret));
}
}
}

View File

@ -110,6 +110,8 @@ int PwjTable::assign(const PwjTable &other)
return ret;
}
OB_SERIALIZE_MEMBER(GroupPWJTabletIdInfo, group_id_, tablet_id_array_);
void ObPwjComparer::reset()
{
pwj_tables_.reset();

View File

@ -83,9 +83,31 @@ struct PwjTable {
common::ObSEArray<int64_t, 8> all_subpartition_indexes_;
};
// TODO yibo 用PartitionIdArray的指针作为value, 否则每次get都要拷贝一次array
typedef common::ObSEArray<uint64_t, 8> TabletIdArray;
struct GroupPWJTabletIdInfo {
OB_UNIS_VERSION(1);
public:
TO_STRING_KV(K_(group_id), K_(tablet_id_array));
/*
for union all non strict partition wise join, there may be several partition wise join groups,
for example:
union all
| |
join join
| | | |
t1(p0-p15) t2(p0-p15) t3(p0-p8) t4(p0-p8)
t1 and t2 are in group 0
t3 and t4 are in group 1
*/
int64_t group_id_{0};
TabletIdArray tablet_id_array_;
};
// TODO yibo 用PartitionIdArray的指针作为value, 否则每次get都要拷贝一次array
typedef common::hash::ObHashMap<uint64_t, TabletIdArray, common::hash::NoPthreadDefendMode> PWJTabletIdMap;
typedef common::hash::ObHashMap<uint64_t, GroupPWJTabletIdInfo, common::hash::NoPthreadDefendMode> GroupPWJTabletIdMap;
typedef common::hash::ObHashMap<uint64_t, uint64_t, common::hash::NoPthreadDefendMode,
common::hash::hash_func<uint64_t>,
common::hash::equal_to<uint64_t>,

View File

@ -82,32 +82,31 @@ int ObPlanMatchHelper::match_plan(const ObPlanCacheCtx &pc_ctx,
use_pwj_map = true;
}
if (OB_SUCC(ret) && is_matched) {
PWJTabletIdMap *exec_pwj_map = NULL;
ObDASCtx &das_ctx = DAS_CTX(pc_ctx.exec_ctx_);
if (use_pwj_map) {
if (OB_FAIL(pc_ctx.exec_ctx_.get_pwj_map(exec_pwj_map))) {
LOG_WARN("failed to get exec pwj map", K(ret));
} else if (OB_FAIL(exec_pwj_map->reuse())) {
LOG_WARN("failed to reuse pwj map", K(ret));
}
if (OB_SUCC(ret) && is_matched && use_pwj_map) {
GroupPWJTabletIdMap *exec_group_pwj_map = nullptr;
if (OB_FAIL(pc_ctx.exec_ctx_.get_group_pwj_map(exec_group_pwj_map))) {
LOG_WARN("failed to get exec group pwj map", K(ret));
} else if (OB_FAIL(exec_group_pwj_map->reuse())) {
LOG_WARN("failed to reuse pwj map", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < base_cons.count(); ++i) {
// in the case of multi part insert, only the location constraint is matched, but the
// corresponding phy table location information does not need to be added to table_locs
if (!base_cons.at(i).is_multi_part_insert()) {
ObCandiTableLoc &src_location = phy_tbl_infos.at(i);
if (use_pwj_map) {
TabletIdArray tablet_id_array;
if (OB_FAIL(pwj_map.get_refactored(i, tablet_id_array))) {
GroupPWJTabletIdInfo group_pwj_tablet_id_info;
TabletIdArray &tablet_id_array = group_pwj_tablet_id_info.tablet_id_array_;
for (int64_t group_id = 0; OB_SUCC(ret) && group_id < strict_cons.count(); ++group_id) {
group_pwj_tablet_id_info.group_id_ = group_id;
const ObPlanPwjConstraint &pwj_cons = strict_cons.at(group_id);
for (int64_t i = 0; OB_SUCC(ret) && i < pwj_cons.count(); ++i) {
const int64_t table_idx = pwj_cons.at(i);
uint64_t table_id = base_cons.at(table_idx).key_.table_id_;
tablet_id_array.reset();
if (!base_cons.at(table_idx).is_multi_part_insert()) {
if (OB_FAIL(pwj_map.get_refactored(table_idx, tablet_id_array))) {
if (OB_HASH_NOT_EXIST == ret) {
// 没找到说明当前表不需要做partition wise join
// means this is not a partition wise join table
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get refactored", K(ret));
}
} else if (OB_FAIL(exec_pwj_map->set_refactored(base_cons.at(i).key_.table_id_,
tablet_id_array))) {
} else if (OB_FAIL(exec_group_pwj_map->set_refactored(table_id, group_pwj_tablet_id_info))) {
LOG_WARN("failed to set refactored", K(ret));
}
}