Fix bug in building balance group

This commit is contained in:
ZhenNan0 2023-07-13 15:12:24 +00:00 committed by ob-robot
parent 5e15f1b92a
commit dff6229dbd
11 changed files with 87 additions and 69 deletions

View File

@ -23,11 +23,11 @@
#define ISTAT(fmt, args...) FLOG_INFO("[BALANCE_GROUP_BUILDER] " fmt, K_(mod), ##args)
#define WSTAT(fmt, args...) FLOG_WARN("[BALANCE_GROUP_BUILDER] " fmt, K_(mod), ##args)
#define ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg) \
#define ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg, part_group_uid) \
do {\
if (OB_FAIL(add_new_part_(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg))) {\
if (OB_FAIL(add_new_part_(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg, part_group_uid))) {\
LOG_WARN("add new partition fail", KR(ret), K(bg), K(table_id), K(part_object_id), \
K(dest_ls_id), K(in_new_pg));\
K(dest_ls_id), K(in_new_pg), K(part_group_uid));\
}\
} while (0)
@ -315,6 +315,7 @@ int ObAllBalanceGroupBuilder::build_bg_for_tablegroup_sharding_none_(
} else {
ObLSID dest_ls_id; // binding to the first table first tablet
bool in_new_pg = true; // in new partition group
const uint64_t part_group_uid = 0; // all partitions belong to the same partition group for each LS
for (int64_t t = 0; OB_SUCC(ret) && t < table_schemas.count(); t++) {
const ObSimpleTableSchemaV2 *table_schema = table_schemas.at(t);
if (OB_ISNULL(table_schema)) {
@ -334,7 +335,7 @@ int ObAllBalanceGroupBuilder::build_bg_for_tablegroup_sharding_none_(
ObObjectID part_object_id = info.object_id_;
ObTabletID tablet_id = info.tablet_id_;
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg);
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg, part_group_uid);
}
}
}
@ -408,6 +409,7 @@ int ObAllBalanceGroupBuilder::build_bg_for_tablegroup_sharding_partition_(
// partitions/subpartitions of all tables with same one-level-partition-value, are in the same partition group.
// Here, partitions/subpartitions with same one-level-partition-index of all tables are in the same partition group
bool in_new_pg = true; // in new partition group
const uint64_t part_group_uid = p; // all partitions/subpartitions with same one-level part index belong to the same partition group for each LS
for (int64_t t = 0; OB_SUCC(ret) && t < table_schemas.count(); t++) {
const ObSimpleTableSchemaV2 &table_schema = *table_schemas.at(t);
const uint64_t table_id = table_schema.get_table_id();
@ -420,7 +422,7 @@ int ObAllBalanceGroupBuilder::build_bg_for_tablegroup_sharding_partition_(
ObObjectID part_object_id = part_info.get_part_id();
ObTabletID tablet_id = part_info.get_tablet_id();
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg);
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg, part_group_uid);
}
} else if (PARTITION_LEVEL_TWO == table_schema.get_part_level()) {
int64_t sub_part_num = 0;
@ -436,7 +438,7 @@ int ObAllBalanceGroupBuilder::build_bg_for_tablegroup_sharding_partition_(
ObObjectID part_object_id = part_info.get_part_id();
ObTabletID tablet_id = part_info.get_tablet_id();
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg);
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg, part_group_uid);
}
}
}
@ -487,6 +489,7 @@ int ObAllBalanceGroupBuilder::build_bg_for_tablegroup_sharding_subpart_(
for (int64_t sp = 0; OB_SUCC(ret) && sp < partition->get_sub_part_num(); sp++) {
ObLSID dest_ls_id;
bool in_new_pg = true; // in new partition group
const uint64_t part_group_uid = sp; // subpartitions with same sub_part_idx belong to the same partition group for each LS
for (int64_t t = 0; OB_SUCC(ret) && t < table_schemas.count(); t++) {
const ObSimpleTableSchemaV2 &table_schema = *table_schemas.at(t);
const uint64_t table_id = table_schema.get_table_id();
@ -498,7 +501,7 @@ int ObAllBalanceGroupBuilder::build_bg_for_tablegroup_sharding_subpart_(
ObObjectID part_object_id = part_info.get_part_id();
ObTabletID tablet_id = part_info.get_tablet_id();
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg);
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg, part_group_uid);
}
}
}
@ -527,8 +530,9 @@ int ObAllBalanceGroupBuilder::build_bg_for_partlevel_zero_(const ObSimpleTableSc
const uint64_t table_id = table_schema.get_table_id();
ObObjectID part_object_id = 0;
ObTabletID tablet_id = table_schema.get_tablet_id();
const uint64_t part_group_uid = table_id; // each table is an independent partition group
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg);
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg, part_group_uid);
}
return ret;
}
@ -555,8 +559,9 @@ int ObAllBalanceGroupBuilder::build_bg_for_partlevel_one_(const ObSimpleTableSch
ObLSID dest_ls_id;
ObObjectID part_object_id = part->get_part_id();
ObTabletID tablet_id = part->get_tablet_id();
const uint64_t part_group_uid = part_object_id; // each partition is an independent partition group
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg);
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg, part_group_uid);
}
}
}
@ -596,13 +601,14 @@ int ObAllBalanceGroupBuilder::build_bg_for_partlevel_two_(const ObSimpleTableSch
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sub partition is null", KR(ret), K(table_schema));
} else {
// every subpartition is a independent partition group
// every subpartition is an independent partition group
ObLSID dest_ls_id;
bool in_new_pg = true; // in new partition group
ObObjectID part_object_id = sub_part->get_sub_part_id();
ObTabletID tablet_id = sub_part->get_tablet_id();
const uint64_t part_group_uid = part_object_id; // each subpartition is an independent partition group
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg);
ADD_NEW_PART(bg, table_id, part_object_id, tablet_id, dest_ls_id, in_new_pg, part_group_uid);
}
}
}
@ -617,7 +623,8 @@ int ObAllBalanceGroupBuilder::add_new_part_(
const ObObjectID part_object_id,
const ObTabletID tablet_id,
ObLSID &dest_ls_id,
bool &in_new_partition_group)
bool &in_new_partition_group,
const uint64_t part_group_uid)
{
int ret = OB_SUCCESS;
ObLSID src_ls_id;
@ -637,11 +644,12 @@ int ObAllBalanceGroupBuilder::add_new_part_(
// skip this partition
}
} else if ((in_new_partition_group && dest_ls_id.is_valid())
|| (!in_new_partition_group && !dest_ls_id.is_valid())) {
|| (!in_new_partition_group && !dest_ls_id.is_valid())
|| !is_valid_id(part_group_uid)) {
// dest_ls_id must still be invalid for the first partition of a new partition group (it is assigned below),
// and must already be valid for every later partition of that group
ret = OB_INVALID_ARGUMENT;
LOG_WARN("dest_ls_id or in_new_partition_group is invalid", KR(ret), K(in_new_partition_group),
K(dest_ls_id), K(bg), K(table_id), K(part_object_id), K(tablet_id));
LOG_WARN("invalid args", KR(ret), K(in_new_partition_group),
K(dest_ls_id), K(bg), K(table_id), K(part_object_id), K(tablet_id), K(part_group_uid));
} else if (in_new_partition_group && FALSE_IT(dest_ls_id = src_ls_id)) {
// use first partition's LS as all other partitions' LS in same partition group
} else if (OB_FAIL(callback_->on_new_partition(
@ -652,9 +660,10 @@ int ObAllBalanceGroupBuilder::add_new_part_(
src_ls_id,
dest_ls_id,
tablet_size,
in_new_partition_group))) {
in_new_partition_group,
part_group_uid))) {
LOG_WARN("callback handle new partition fail", KR(ret), K(bg), K(table_id), K(part_object_id),
K(tablet_id), K(src_ls_id), K(dest_ls_id), K(tablet_size), K(in_new_partition_group));
K(tablet_id), K(src_ls_id), K(dest_ls_id), K(tablet_size), K(in_new_partition_group), K(part_group_uid));
} else {
// auto clear flag
in_new_partition_group = false;

View File

@ -85,6 +85,7 @@ public:
// @param [in] dest_ls_id the LS that partition should be located
// @param [in] tablet_size tablet data size
// @param [in] in_new_partition_group whether this partition is the first partition of a new partition group
// @param [in] part_group_uid partition group unique id
virtual int on_new_partition(
const ObBalanceGroup &bg,
const common::ObObjectID table_id,
@ -93,7 +94,8 @@ public:
const share::ObLSID &src_ls_id,
const share::ObLSID &dest_ls_id,
const int64_t tablet_size,
const bool in_new_partition_group) = 0;
const bool in_new_partition_group,
const uint64_t part_group_uid) = 0;
};
private:
@ -137,7 +139,8 @@ private:
const common::ObObjectID part_object_id,
const common::ObTabletID tablet_id,
share::ObLSID &dest_ls_id,
bool &in_new_partition_group);
bool &in_new_partition_group,
const uint64_t part_group_uid);
int prepare_tablet_data_size_();
private:

View File

@ -52,15 +52,15 @@ ObBalanceGroupInfo::~ObBalanceGroupInfo()
int ObBalanceGroupInfo::append_part(ObTransferPartInfo &part,
const int64_t data_size,
const bool need_create_new_part_group/* = false*/)
const uint64_t part_group_uid)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(! part.is_valid() || data_size < 0)) {
if (OB_UNLIKELY(! part.is_valid() || data_size < 0 || !is_valid_id(part_group_uid))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(part), K(data_size));
} else if (need_create_new_part_group && OB_FAIL(create_new_part_group_())) {
LOG_WARN("create new partition group fail", KR(ret), K(need_create_new_part_group));
} else if (part_groups_.count() <= 0) {
} else if (OB_FAIL(create_new_part_group_if_needed_(part_group_uid))) {
LOG_WARN("create new part group if needed failed", KR(ret), K(part_group_uid), K_(last_part_group_uid));
} else if (OB_UNLIKELY(part_groups_.count() <= 0)) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("no partition groups in this balance group", KPC(this), KR(ret), K(part));
} else {
@ -70,33 +70,39 @@ int ObBalanceGroupInfo::append_part(ObTransferPartInfo &part,
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid data", KR(ret), KPC(part_group), KPC(this));
} else if (OB_FAIL(part_group->add_part(part, data_size))) {
LOG_WARN("add part into partition group fail", KR(ret), KPC(part_group), K(part), K(data_size), KPC(this));
LOG_WARN("add part into partition group fail", KR(ret),
KPC(part_group), K(part), K(data_size), K(part_group_uid), KPC(this));
}
LOG_TRACE("[ObBalanceGroupInfo] append part", K(part), K(data_size),
K(need_create_new_part_group),
LOG_TRACE("[ObBalanceGroupInfo] append part", K(part), K(data_size), K(part_group_uid),
"part_group_count", part_groups_.count(), KPC(part_group));
}
return ret;
}
int ObBalanceGroupInfo::create_new_part_group_()
int ObBalanceGroupInfo::create_new_part_group_if_needed_(const uint64_t part_group_uid)
{
int ret = OB_SUCCESS;
ObTransferPartGroup *part_group = NULL;
const int64_t part_group_size = sizeof(ObTransferPartGroup);
void *buf = alloc_.alloc(part_group_size);
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for partition group fail", KR(ret), K(buf), K(part_group_size));
} else if (OB_ISNULL(part_group = new(buf) ObTransferPartGroup(alloc_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("construct ObTransferPartGroup fail", KR(ret), K(buf), K(part_group_size));
} else if (OB_FAIL(part_groups_.push_back(part_group))) {
LOG_WARN("push back new partition group fail", KR(ret), K(part_group), K(part_groups_));
} else {
// success
if (OB_UNLIKELY(!is_valid_id(part_group_uid))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid part_group_uid", KR(ret), K(part_group_uid));
} else if (part_group_uid != last_part_group_uid_) {
// only create a new part group when part_group_uid differs from last_part_group_uid_
// (this also covers a last_part_group_uid_ that is still invalid: part_group_uid was validated above, so it cannot match)
ObTransferPartGroup *part_group = NULL;
const int64_t part_group_size = sizeof(ObTransferPartGroup);
void *buf = alloc_.alloc(part_group_size);
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for partition group fail", KR(ret), K(buf), K(part_group_size));
} else if (OB_ISNULL(part_group = new(buf) ObTransferPartGroup(alloc_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("construct ObTransferPartGroup fail", KR(ret), K(buf), K(part_group_size));
} else if (OB_FAIL(part_groups_.push_back(part_group))) {
LOG_WARN("push back new partition group fail", KR(ret), K(part_group), K(part_groups_));
} else {
last_part_group_uid_ = part_group_uid;
}
}
return ret;
}
@ -123,7 +129,8 @@ int ObBalanceGroupInfo::pop_back(const int64_t part_group_count,
} else if (OB_FAIL(append(part, pg->get_part_list()))) {
LOG_WARN("append array to part list fail", KR(ret), K(part), KPC(pg));
} else {
// succ
// the last part group has been popped, reset the uid
last_part_group_uid_ = OB_INVALID_ID;
}
// free pg memory anyway

View File

@ -63,6 +63,7 @@ class ObBalanceGroupInfo final
public:
explicit ObBalanceGroupInfo(const ObBalanceGroupID &id, common::ObIAllocator &alloc) :
id_(id),
last_part_group_uid_(OB_INVALID_ID),
alloc_(alloc),
part_groups_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(alloc, "PartGroupArray"))
{
@ -79,14 +80,14 @@ public:
//
// @param [in] part target partition info which will be added
// @param [in] data_size partition data size
// @param [in] need_create_new_part_group whether to create new partition group
// @param [in] part_group_uid partition group unique id
//
// @return OB_SUCCESS success
// @return OB_ENTRY_NOT_EXIST no partition group found
// @return other fail
int append_part(share::ObTransferPartInfo &part,
const int64_t data_size,
const bool need_create_new_part_group = false);
const uint64_t part_group_uid);
// pop partition groups from back of array, and push back into part list
//
@ -100,10 +101,11 @@ public:
TO_STRING_KV(K_(id), "part_group_count", part_groups_.count());
private:
int create_new_part_group_();
int create_new_part_group_if_needed_(const uint64_t part_group_uid);
private:
ObBalanceGroupID id_;
int64_t last_part_group_uid_; // unique id of the last part group in part_groups_
ObIAllocator &alloc_; // allocator for ObTransferPartGroup
// Partition Group Array
common::ObArray<ObTransferPartGroup *> part_groups_;

View File

@ -59,7 +59,7 @@ void ObLSBalanceGroupInfo::destroy()
int ObLSBalanceGroupInfo::append_part_into_balance_group(const ObBalanceGroupID &bg_id,
share::ObTransferPartInfo &part,
const int64_t data_size,
const bool need_create_new_part_group)
const uint64_t part_group_uid)
{
int ret = OB_SUCCESS;
ObBalanceGroupInfo *bg = NULL;
@ -68,10 +68,9 @@ int ObLSBalanceGroupInfo::append_part_into_balance_group(const ObBalanceGroupID
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(inited_));
} else if (OB_UNLIKELY(! bg_id.is_valid())) {
} else if (OB_UNLIKELY(! bg_id.is_valid() || !is_valid_id(part_group_uid))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(bg_id), K(part), K(data_size),
K(need_create_new_part_group));
LOG_WARN("invalid argument", KR(ret), K(bg_id), K(part), K(data_size), K(part_group_uid));
} else if (OB_FAIL(bg_map_.get_refactored(bg_id, bg))) {
if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(create_new_balance_group_(bg_id, bg))) {
@ -86,9 +85,8 @@ int ObLSBalanceGroupInfo::append_part_into_balance_group(const ObBalanceGroupID
} else if (OB_ISNULL(bg)) {
ret = OB_INVALID_DATA;
LOG_WARN("balance group is invalid", KR(ret), KPC(bg), K(bg_id));
} else if (OB_FAIL(bg->append_part(part, data_size, need_create_new_part_group))) {
LOG_WARN("append part info balance group fail", KR(ret), K(part), K(data_size),
K(need_create_new_part_group));
} else if (OB_FAIL(bg->append_part(part, data_size, part_group_uid))) {
LOG_WARN("append part info balance group fail", KR(ret), K(part), K(data_size), K(part_group_uid));
} else if (FALSE_IT(part_group_cnt = bg->get_part_group_count())) {
} else if (OB_FAIL(orig_part_group_cnt_map_.set_refactored(bg_id, part_group_cnt, 1/*overwrite*/))) {
LOG_WARN("overwrite partition group count map fail", KR(ret), K(bg_id), K(part_group_cnt));

View File

@ -49,7 +49,7 @@ public:
// @param [in] bg_id target balance group id
// @param [in] part target partition info which will be added
// @param [in] data_size partition data size
// @param [in] need_create_new_part_group whether to create new partition group in balance group
// @param [in] part_group_uid target partition group unique id
//
// @return OB_SUCCESS success
// @return OB_ENTRY_NOT_EXIST no partition group found
@ -57,7 +57,7 @@ public:
int append_part_into_balance_group(const ObBalanceGroupID &bg_id,
share::ObTransferPartInfo &part,
const int64_t data_size,
const bool need_create_new_part_group);
const uint64_t part_group_uid);
////////////////////////////////////////////////
// Transfer out partition groups by specified factor

View File

@ -86,13 +86,13 @@ int ObTenantLSBalanceGroupInfo::on_new_partition(
const share::ObLSID &src_ls_id,
const share::ObLSID &dest_ls_id,
const int64_t tablet_size,
const bool in_new_partition_group)
const bool in_new_partition_group,
const uint64_t part_group_uid)
{
UNUSEDx(tablet_id, dest_ls_id, in_new_partition_group);
int ret = OB_SUCCESS;
ObLSBalanceGroupInfo *ls_bg_info = NULL;
ObTransferPartInfo part_info(table_id, part_object_id);
//This partition_group is iterated for the first time and needs to be newly created;
bool create_new_partition_group = in_new_partition_group;
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
@ -104,10 +104,6 @@ int ObTenantLSBalanceGroupInfo::on_new_partition(
LOG_WARN("create new ls balance group info fail", KR(ret), K(src_ls_id));
} else if (OB_FAIL(ls_bg_map_.set_refactored(src_ls_id, ls_bg_info))) {
LOG_WARN("set new ls balance group info fail", KR(ret), K(src_ls_id), K(ls_bg_info));
} else {
//It is not the first time that the partition in this partition_group has been iterated,
//but this partition is not on the same LS as the previous partition, and needs to be newly created.
create_new_partition_group = true;
}
}
@ -115,10 +111,9 @@ int ObTenantLSBalanceGroupInfo::on_new_partition(
} else if (OB_ISNULL(ls_bg_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid ls balance group info", KR(ret), K(ls_bg_info), K(src_ls_id));
} else if (OB_FAIL(ls_bg_info->append_part_into_balance_group(bg.id(), part_info, tablet_size,
create_new_partition_group))) {
} else if (OB_FAIL(ls_bg_info->append_part_into_balance_group(bg.id(), part_info, tablet_size, part_group_uid))) {
LOG_WARN("append part into balance group for LS balance group info fail", KR(ret), K(bg),
K(part_info), K(tablet_size), K(in_new_partition_group), K(create_new_partition_group));
K(part_info), K(tablet_size), K(part_group_uid));
}
return ret;
}

View File

@ -61,7 +61,8 @@ public:
const share::ObLSID &src_ls_id,
const share::ObLSID &dest_ls_id,
const int64_t tablet_size,
const bool in_new_partition_group);
const bool in_new_partition_group,
const uint64_t part_group_uid);
TO_STRING_KV(K_(inited), K_(tenant_id), "valid_ls_count", ls_bg_map_.size());

View File

@ -212,8 +212,10 @@ int ObPartitionBalance::on_new_partition(
const ObLSID &src_ls_id,
const ObLSID &dest_ls_id,
const int64_t tablet_size,
const bool in_new_partition_group)
const bool in_new_partition_group,
const uint64_t part_group_uid)
{
UNUSEDx(tablet_id, part_group_uid);
int ret = OB_SUCCESS;
ObBalanceGroup bg = bg_in; // get a copy
ObLSDesc *src_ls_desc = nullptr;

View File

@ -77,7 +77,8 @@ public:
const ObLSID &src_ls_id,
const ObLSID &dest_ls_id,
const int64_t tablet_size,
const bool in_new_partition_group);
const bool in_new_partition_group,
const uint64_t part_group_uid);
class ObLSPartGroupDesc
{

View File

@ -206,7 +206,7 @@ int ObBalanceJobTableOperator::get_balance_job(const uint64_t tenant_id,
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END == ret) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("empty balance job", KR(ret), K(sql));
LOG_INFO("empty balance job", KR(ret), K(sql));
} else {
LOG_WARN("failed to get balance job", KR(ret), K(sql));
}