fix standby unit shrink
@@ -92,6 +92,11 @@ void ObCommonLSService::do_work()
      } else if (OB_TMP_FAIL(try_create_ls_(user_tenant_schema))) {
        LOG_WARN("failed to create ls", KR(ret), KR(tmp_ret), K(user_tenant_schema));
      }
      if (OB_SUCC(ret) && !user_tenant_schema.is_dropping()) {
        if (OB_TMP_FAIL(try_modify_ls_unit_group_(user_tenant_schema))) {
          LOG_WARN("failed to modify ls unit group", KR(ret), KR(tmp_ret), K(user_tenant_schema));
        }
      }
    }

    if (OB_TMP_FAIL(ObBalanceLSPrimaryZone::try_update_sys_ls_primary_zone(tenant_id_))) {
@@ -147,6 +152,56 @@ int ObCommonLSService::try_create_ls_(const share::schema::ObTenantSchema &tenan
  }
  return ret;
}
// On both the primary and the standby tenant, the log streams within one log stream group
// may record inconsistent unit_group values.
// Align all log streams with the log stream that has the smallest ls_id; this is not optimal,
// but it guarantees eventual consistency.
int ObCommonLSService::try_modify_ls_unit_group_(
    const share::schema::ObTenantSchema &tenant_schema)
{
  int ret = OB_SUCCESS;
  const uint64_t tenant_id = tenant_schema.get_tenant_id();
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (OB_ISNULL(GCTX.sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("ptr is null", KR(ret), KP(GCTX.sql_proxy_));
  } else if (!is_user_tenant(tenant_id) || !tenant_schema.is_valid()
             || tenant_schema.is_dropping()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("tenant is invalid", KR(ret), K(tenant_id), K(tenant_schema));
  } else {
    ObTenantLSInfo tenant_info(GCTX.sql_proxy_, &tenant_schema, tenant_id);
    share::ObLSStatusOperator status_op;
    if (OB_FAIL(tenant_info.gather_stat())) {
      LOG_WARN("failed to gather stat", KR(ret));
    } else {
      ObLSGroupInfoArray &ls_group_array = tenant_info.get_ls_group_array();
      int64_t index = 0;  // not used
      share::ObLSStatusInfo ls_status;
      for (int64_t i = 0; OB_SUCC(ret) && i < ls_group_array.count(); ++i) {
        const uint64_t unit_group_id = ls_group_array.at(i).unit_group_id_;
        const uint64_t ls_group_id = ls_group_array.at(i).ls_group_id_;
        const ObArray<share::ObLSID> &ls_ids = ls_group_array.at(i).ls_ids_;
        for (int64_t j = 0; OB_SUCC(ret) && j < ls_ids.count(); ++j) {
          const share::ObLSID ls_id = ls_ids.at(j);
          if (OB_FAIL(tenant_info.get_ls_status_info(ls_id, ls_status, index))) {
            LOG_WARN("failed to get ls status info", KR(ret), K(ls_id));
          } else if (ls_status.unit_group_id_ != unit_group_id) {
            FLOG_INFO("ls group has different unit group id, need process", K(ls_status), K(unit_group_id));
            if (OB_FAIL(status_op.alter_unit_group_id(tenant_id, ls_id, ls_group_id,
                    ls_status.unit_group_id_, unit_group_id, *GCTX.sql_proxy_))) {
              LOG_WARN("failed to alter unit group", KR(ret), K(tenant_id), K(ls_id),
                       K(ls_group_id), K(unit_group_id), K(ls_status));
            }
          }
        }//end for j
      }//end for i
    }
  }
  return ret;
}

int ObCommonLSService::do_create_user_ls(
    const share::schema::ObTenantSchema &tenant_schema,
    const share::ObLSStatusInfo &info, const SCN &create_scn,

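For illustration only, not part of this commit: a minimal, self-contained sketch of the alignment rule described in the comment above, using hypothetical types and ids. The real repair runs through ObLSStatusOperator::alter_unit_group_id as shown in try_modify_ls_unit_group_.

#include <cstdint>
#include <algorithm>
#include <vector>

// Hypothetical, simplified row of one log stream group (not the OceanBase types).
struct LSRecord {
  int64_t ls_id;
  uint64_t unit_group_id;
};

// Return the ls_ids whose unit_group_id disagrees with the log stream that has the
// smallest ls_id in the group; these are the rows the service would rewrite.
std::vector<int64_t> find_misaligned(const std::vector<LSRecord> &group)
{
  std::vector<int64_t> misaligned;
  if (group.empty()) {
    return misaligned;
  }
  const auto smallest = std::min_element(group.begin(), group.end(),
      [](const LSRecord &a, const LSRecord &b) { return a.ls_id < b.ls_id; });
  for (const LSRecord &r : group) {
    if (r.unit_group_id != smallest->unit_group_id) {
      misaligned.push_back(r.ls_id);
    }
  }
  return misaligned;
}
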
@@ -87,6 +87,8 @@ private:
  int try_force_drop_tenant_(
      const share::schema::ObTenantSchema &tenant_schema);
  int try_create_ls_(const share::schema::ObTenantSchema &tenant_schema);
  // an ls group may record more than one unit group; this needs to be fixed up
  int try_modify_ls_unit_group_(const share::schema::ObTenantSchema &tenant_schema);
public:
  //restore_service need create init ls too
  static int do_create_user_ls(const share::schema::ObTenantSchema &tenant_schema,

@@ -513,7 +513,8 @@ int ObLSServiceHelper::process_status_to_steady(
    //or ls status not equal, no need to next
  } else if (machine.ls_info_.ls_is_normal() && machine.ls_info_.get_ls_group_id() != machine.status_info_.ls_group_id_) {
    if (OB_TMP_FAIL(process_alter_ls(machine.ls_id_, machine.status_info_.ls_group_id_,
            machine.ls_info_.get_ls_group_id(), tenant_ls_info, *GCTX.sql_proxy_))) {
            machine.ls_info_.get_ls_group_id(), machine.status_info_.unit_group_id_,
            tenant_ls_info, *GCTX.sql_proxy_))) {
      LOG_WARN("failed to process alter ls", KR(ret), KR(tmp_ret), K(machine));
      ret = OB_SUCC(ret) ? tmp_ret : ret;
    }
@@ -695,6 +696,7 @@ int ObLSServiceHelper::revision_to_equal_status_(const ObLSStatusMachineParamete
int ObLSServiceHelper::process_alter_ls(const share::ObLSID &ls_id,
    const uint64_t &old_ls_group_id,
    const uint64_t &new_ls_group_id,
    const uint64_t &old_unit_group_id,
    ObTenantLSInfo& tenant_info,
    ObISQLClient &sql_proxy)
{
@@ -702,9 +704,10 @@ int ObLSServiceHelper::process_alter_ls(const share::ObLSID &ls_id,
  const uint64_t tenant_id = tenant_info.get_tenant_id();
  uint64_t unit_group_id = OB_INVALID_ID;
  if (OB_UNLIKELY(!ls_id.is_valid() || OB_INVALID_ID == old_ls_group_id || OB_INVALID_ID == new_ls_group_id
      || old_ls_group_id == new_ls_group_id)) {
      || old_ls_group_id == new_ls_group_id || OB_INVALID_ID == old_unit_group_id)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(ls_id), K(old_ls_group_id), K(new_ls_group_id));
    LOG_WARN("invalid argument", KR(ret), K(ls_id), K(old_ls_group_id),
             K(new_ls_group_id), K(old_unit_group_id));
  } else if (OB_FAIL(tenant_info.gather_stat())) {
    LOG_WARN("failed to gather stat", KR(ret));
  } else {
@@ -730,12 +733,12 @@ int ObLSServiceHelper::process_alter_ls(const share::ObLSID &ls_id,
    ObLSStatusOperator status_op;
    if (FAILEDx(status_op.alter_ls_group_id(
            tenant_id, ls_id, old_ls_group_id, new_ls_group_id,
            unit_group_id, sql_proxy))) {
            old_unit_group_id, unit_group_id, sql_proxy))) {
      LOG_WARN("failed to update ls group id", KR(ret), K(new_ls_group_id), K(old_ls_group_id),
               K(unit_group_id), K(tenant_info));
               K(unit_group_id), K(tenant_info), K(old_unit_group_id));
    }
    LOG_INFO("[LS_MGR] alter ls group id", KR(ret), K(old_ls_group_id),
             K(new_ls_group_id), K(unit_group_id));
             K(new_ls_group_id), K(unit_group_id), K(old_unit_group_id));
  }
  return ret;
}
@@ -828,8 +831,13 @@ int ObLSServiceHelper::create_new_ls_in_trans(
int ObLSServiceHelper::balance_ls_group(ObTenantLSInfo& tenant_ls_info)
{
  int ret = OB_SUCCESS;
  int64_t task_cnt = 0;
  if (OB_FAIL(tenant_ls_info.gather_stat())) {
    LOG_WARN("failed to gather stat", KR(ret));
  } else if (OB_FAIL(try_shrink_standby_unit_group_(tenant_ls_info, task_cnt))) {
    LOG_WARN("failed to shrink standby unit group", KR(ret), K(tenant_ls_info));
  } else if (0 != task_cnt) {
    LOG_INFO("has unit group need deleting, can not balance", K(task_cnt));
  } else {
    int64_t min_count = INT64_MAX, min_index = OB_INVALID_INDEX_INT64;
    int64_t max_count = 0, max_index = OB_INVALID_INDEX_INT64;
@@ -879,39 +887,112 @@ int ObLSServiceHelper::balance_ls_group_between_unit_group_(ObTenantLSInfo& tena
  } else {
    ObUnitGroupInfo &dest_info = tenant_ls_info.get_unit_group_array().at(min_index);
    ObUnitGroupInfo &src_info = tenant_ls_info.get_unit_group_array().at(max_index);
    const uint64_t tenant_id = tenant_ls_info.get_tenant_id();
    const int64_t ls_group_count = src_info.ls_group_ids_.count();
    if (ls_group_count < 1) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("src info is unexpected", KR(ret), K(ls_group_count), K(src_info));
    } else {
      const uint64_t ls_group_id = src_info.ls_group_ids_.at(ls_group_count - 1);
      ObLSGroupInfo ls_group_info;
      if (OB_FAIL(tenant_ls_info.get_ls_group_info(ls_group_id, ls_group_info))) {
        LOG_WARN("failed to get ls group_info", KR(ret), K(ls_group_id));
      } else {
        for (int64_t j = 0; OB_SUCC(ret) && j < ls_group_info.ls_ids_.count(); ++j) {
          share::ObLSStatusOperator status_op;
          ObLSID ls_id = ls_group_info.ls_ids_.at(j);
          if (OB_FAIL(status_op.alter_ls_group_id(tenant_id, ls_id,
                  ls_group_id, ls_group_id,
                  dest_info.unit_group_id_, *GCTX.sql_proxy_))) {
            LOG_WARN("failed to alter unit group", KR(ret), K(tenant_id), K(ls_id), K(ls_group_id),
                     K(src_info), K(dest_info));
          }
        }
        LOG_INFO("[LS_MGR]balance ls group to unit group", KR(ret), K(ls_group_id), K(src_info), K(dest_info));
      }
      if (FAILEDx(src_info.ls_group_ids_.remove(ls_group_count - 1))) {
        LOG_WARN("failed to remove", KR(ret), K(ls_group_count), K(dest_info));
      } else if (OB_FAIL(dest_info.ls_group_ids_.push_back(ls_group_id))) {
        LOG_WARN("failed to push back", KR(ret), K(ls_group_id), K(src_info));
      if (OB_FAIL(try_update_ls_unit_group_(tenant_ls_info, ls_group_id, src_info, dest_info))) {
        LOG_WARN("failed to update ls unit group", KR(ret), K(tenant_ls_info), K(ls_group_id), K(src_info), K(dest_info));
      }
    }
  }
  return ret;
}

int ObLSServiceHelper::try_update_ls_unit_group_(
    ObTenantLSInfo& tenant_ls_info,
    const uint64_t ls_group_id,
    ObUnitGroupInfo &src_info,
    ObUnitGroupInfo &dest_info)
{
  int ret = OB_SUCCESS;
  int64_t index = OB_INVALID_INDEX_INT64;
  if (OB_ISNULL(GCTX.sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("ptr is null", KR(ret), KP(GCTX.sql_proxy_));
  } else if (OB_UNLIKELY(!tenant_ls_info.is_valid()
             || OB_INVALID_ID == ls_group_id || !src_info.is_valid() || !dest_info.is_valid()
             || src_info == dest_info)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(tenant_ls_info), K(ls_group_id), K(src_info), K(dest_info));
  } else if (!has_exist_in_array(src_info.ls_group_ids_, ls_group_id, &index)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("ls group not in src unit group", KR(ret), K(ls_group_id), K(src_info));
  } else {
    ObLSGroupInfo ls_group_info;
    const uint64_t tenant_id = tenant_ls_info.get_tenant_id();
    if (OB_FAIL(tenant_ls_info.get_ls_group_info(ls_group_id, ls_group_info))) {
      LOG_WARN("failed to get ls group_info", KR(ret), K(ls_group_id));
    } else {
      for (int64_t j = 0; OB_SUCC(ret) && j < ls_group_info.ls_ids_.count(); ++j) {
        share::ObLSStatusOperator status_op;
        ObLSID ls_id = ls_group_info.ls_ids_.at(j);
        if (OB_FAIL(status_op.alter_unit_group_id(tenant_id, ls_id,
                ls_group_id, src_info.unit_group_id_,
                dest_info.unit_group_id_, *GCTX.sql_proxy_))) {
          LOG_WARN("failed to alter unit group", KR(ret), K(tenant_id), K(ls_id), K(ls_group_id),
                   K(src_info), K(dest_info));
        }
      }
      LOG_INFO("[LS_MGR]balance ls group to unit group", KR(ret), K(ls_group_id), K(src_info), K(dest_info));
    }
    if (FAILEDx(src_info.ls_group_ids_.remove(index))) {
      LOG_WARN("failed to remove", KR(ret), K(index), K(src_info));
    } else if (OB_FAIL(dest_info.ls_group_ids_.push_back(ls_group_id))) {
      LOG_WARN("failed to push back", KR(ret), K(ls_group_id), K(src_info));
    }
  }
  return ret;
}

int ObLSServiceHelper::try_shrink_standby_unit_group_(
    ObTenantLSInfo& tenant_ls_info, int64_t &task_cnt)
{
  int ret = OB_SUCCESS;
  task_cnt = 0;
  if (OB_ISNULL(GCTX.sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("ptr is null", KR(ret), KP(GCTX.sql_proxy_));
  } else if (OB_UNLIKELY(!tenant_ls_info.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("tenant info is invalid", KR(ret), K(tenant_ls_info));
  } else {
    ObUnitGroupInfoArray &unit_group_array = tenant_ls_info.get_unit_group_array();
    for (int64_t i = 0; OB_SUCC(ret) && i < unit_group_array.count() && task_cnt == 0; ++i) {
      ObUnitGroupInfo &unit_group_info = unit_group_array.at(i);
      if (share::ObUnit::UNIT_STATUS_ACTIVE != unit_group_info.unit_status_) {
        FLOG_INFO("has unit is in deleting status, need alter unit group id", K(unit_group_info));
        if (unit_group_info.ls_group_ids_.count() > 0) {
          task_cnt = 1;
          int64_t unit_group_index = OB_INVALID_INDEX_INT64;
          //for each ls group, get new unit group, process ls group one by one
          for (int64_t j = 0; OB_SUCC(ret) && j < unit_group_info.ls_group_ids_.count(); ++j) {
            if (OB_FAIL(tenant_ls_info.get_next_unit_group(unit_group_index))) {
              LOG_WARN("failed to get next unit group", KR(ret), K(tenant_ls_info));
            } else if (OB_UNLIKELY(OB_INVALID_INDEX_INT64 == unit_group_index
                       || unit_group_index >= unit_group_array.count())) {
              ret = OB_ERR_UNEXPECTED;
              LOG_WARN("failed to next unit group", KR(ret), K(unit_group_index), K(tenant_ls_info));
            } else {
              ObUnitGroupInfo &dest_info = unit_group_array.at(unit_group_index);
              const uint64_t ls_group_id = unit_group_info.ls_group_ids_.at(j);
              if (OB_FAIL(try_update_ls_unit_group_(tenant_ls_info, ls_group_id,
                      unit_group_info, dest_info))) {
                LOG_WARN("failed to update ls unit group", KR(ret), K(ls_group_id),
                         K(unit_group_info), K(dest_info), K(tenant_ls_info));
              }
            }
          }//end for j
        }
      }
    }//end for i
  }
  return ret;
}

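For illustration only, not part of this commit: a minimal sketch of the shrink step above under a simplified, hypothetical model. It shows only the round-robin reassignment of ls groups from deleting unit groups to the remaining active ones; the real implementation persists each move through try_update_ls_unit_group_ and alter_unit_group_id, handles one deleting unit group per round, and reports the work through task_cnt.

#include <cstdint>
#include <cstddef>
#include <vector>

// Simplified model of a unit group (hypothetical names, not the OceanBase types).
struct UnitGroup {
  uint64_t unit_group_id;
  bool is_active;                      // false stands in for a deleting unit status
  std::vector<uint64_t> ls_group_ids;  // ls groups currently bound to this unit group
};

// Move every ls group off the non-active unit groups, spreading them round-robin
// over the active ones, mirroring the intent of get_next_unit_group.
void shrink(std::vector<UnitGroup> &groups)
{
  std::vector<size_t> active;
  for (size_t i = 0; i < groups.size(); ++i) {
    if (groups[i].is_active) {
      active.push_back(i);
    }
  }
  if (active.empty()) {
    return;  // nothing to move to; the real code reports an error instead
  }
  size_t next = 0;  // round-robin cursor over the active unit groups
  for (UnitGroup &src : groups) {
    if (src.is_active) {
      continue;
    }
    for (uint64_t ls_group_id : src.ls_group_ids) {
      groups[active[next]].ls_group_ids.push_back(ls_group_id);
      next = (next + 1) % active.size();
    }
    src.ls_group_ids.clear();
  }
}
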
/////////////ObTenantLSInfo
void ObTenantLSInfo::reset()
{

@@ -171,6 +171,10 @@ public:
  {
    return primary_zone_;
  }
  ObLSGroupInfoArray& get_ls_group_array()
  {
    return ls_group_array_;
  }
  TO_STRING_KV(K_(tenant_id), K_(is_load), K_(status_array), K_(unit_group_array),
               K_(ls_group_array), K_(primary_zone));
private:
@@ -240,6 +244,7 @@ public:
  static int process_alter_ls(const share::ObLSID &ls_id,
      const uint64_t &old_ls_group_id,
      const uint64_t &new_ls_group_id,
      const uint64_t &old_unit_group_id,
      ObTenantLSInfo& tenant_ls_info,
      common::ObISQLClient &sql_proxy);
private:
@@ -250,6 +255,14 @@ private:
  static int balance_ls_group_between_unit_group_(
      ObTenantLSInfo& tenant_ls_info,
      const int64_t min_index, const int64_t max_index);
  static int try_shrink_standby_unit_group_(
      ObTenantLSInfo& tenant_ls_info,
      int64_t &task_cnt);
  static int try_update_ls_unit_group_(
      ObTenantLSInfo& tenant_ls_info,
      const uint64_t ls_group_id,
      ObUnitGroupInfo &src_info,
      ObUnitGroupInfo &dest_info);
};

@@ -1038,47 +1038,56 @@ int ObRecoveryLSService::do_ls_balance_task_()
  ObBalanceTaskHelper ls_balance_task;
  ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
  ObAllTenantInfo tenant_info;
  bool has_next_task = true;
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret), K(inited_));
  } else if (OB_ISNULL(proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql can't null", K(ret), K(proxy_));
  } else if (FALSE_IT(ret = ObBalanceTaskHelperTableOperator::pop_task(tenant_id_,
             *proxy_, ls_balance_task))) {
  } else if (OB_ENTRY_NOT_EXIST == ret) {
    ret = OB_SUCCESS;
  } else if (OB_FAIL(ret)) {
    LOG_WARN("failed to pop task", KR(ret), K(tenant_id_));
  } else if (OB_ISNULL(tenant_info_loader)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("mtl pointer is null", KR(ret), KP(tenant_info_loader));
  } else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) {
    LOG_WARN("get_tenant_info failed", K(ret));
  } else if (tenant_info.get_standby_scn() >= ls_balance_task.get_operation_scn()) {
    const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_);
    START_TRANSACTION(proxy_, exec_tenant_id)
    if (FAILEDx(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_,
        ls_balance_task.get_operation_scn(), trans))) {
      LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task));
    } else if (ls_balance_task.get_task_op().is_ls_alter()) {
      if (OB_FAIL(do_ls_balance_alter_task_(ls_balance_task, trans))) {
        LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task));
      }
    } else if (ls_balance_task.get_task_op().is_transfer_begin()
               || ls_balance_task.get_task_op().is_transfer_end()) {
      //nothing
    } else {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("ls balance task op is unexpected", KR(ret), K(ls_balance_task));
    }
    END_TRANSACTION(trans)
    LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
  } else {
    if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) { // 10s
      LOG_INFO("can not remove ls balance task helper", K(ls_balance_task), K(tenant_info));
    }
  }
  while (OB_SUCC(ret) && has_next_task) {
    ret = ObBalanceTaskHelperTableOperator::pop_task(tenant_id_,
        *proxy_, ls_balance_task);
    if (OB_ENTRY_NOT_EXIST == ret) {
      ret = OB_SUCCESS;
      has_next_task = false;
    } else if (OB_FAIL(ret)) {
      LOG_WARN("failed to get balance task", KR(ret), K(tenant_id_));
    } else if (has_set_stop()) {
      ret = OB_IN_STOP_STATE;
      LOG_WARN("thread is in stop state", KR(ret));
    } else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) {
      LOG_WARN("get_tenant_info failed", K(ret));
    } else if (tenant_info.get_standby_scn() >= ls_balance_task.get_operation_scn()) {
      const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_);
      START_TRANSACTION(proxy_, exec_tenant_id)
      if (FAILEDx(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_,
          ls_balance_task.get_operation_scn(), trans))) {
        LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task));
      } else if (ls_balance_task.get_task_op().is_ls_alter()) {
        if (OB_FAIL(do_ls_balance_alter_task_(ls_balance_task, trans))) {
          LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task));
        }
      } else if (ls_balance_task.get_task_op().is_transfer_begin()
                 || ls_balance_task.get_task_op().is_transfer_end()) {
        //nothing
      } else {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("ls balance task op is unexpected", KR(ret), K(ls_balance_task));
      }
      END_TRANSACTION(trans)
      LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
    } else {
      if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) { // 10s
        LOG_INFO("can not remove ls balance task helper", K(ls_balance_task), K(tenant_info));
      }
      has_next_task = false;
    }
  }//end while
  return ret;
}

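For illustration only, not part of this commit: the reworked do_ls_balance_task_ above no longer handles at most one task per call; it keeps popping tasks and stops at the first one whose operation_scn is not yet covered by the standby SCN, when the table is empty, or when the thread is asked to stop. A minimal sketch of that drain pattern, with hypothetical types:

#include <cstdint>
#include <queue>

// Hypothetical stand-in for the balance task helper row.
struct Task { uint64_t operation_scn; };

// Process every task already replayed on the standby (operation_scn <= standby_scn);
// stop at the first one that is not ready, as the real loop does.
template <typename ProcessFn>
int drain_ready_tasks(std::queue<Task> &tasks, uint64_t standby_scn, ProcessFn process)
{
  int processed = 0;
  while (!tasks.empty()) {
    const Task &task = tasks.front();
    if (standby_scn < task.operation_scn) {
      break;  // not ready yet, retry on the next round
    }
    process(task);  // in the real code: remove_task + do_ls_balance_alter_task_ in one trans
    tasks.pop();
    ++processed;
  }
  return processed;
}
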
@@ -1111,7 +1120,7 @@ int ObRecoveryLSService::do_ls_balance_alter_task_(const share::ObBalanceTaskHel
  } else {
    ObTenantLSInfo tenant_info(proxy_, &tenant_schema, tenant_id_);
    if (OB_FAIL(ObLSServiceHelper::process_alter_ls(ls_balance_task.get_src_ls(), status_info.ls_group_id_,
        ls_balance_task.get_ls_group_id(), tenant_info, trans))) {
        ls_balance_task.get_ls_group_id(), status_info.unit_group_id_, tenant_info, trans))) {
      LOG_WARN("failed to process alter ls", KR(ret), K(ls_balance_task), K(status_info));
    }
  }

@@ -1050,10 +1050,14 @@ int ObRestoreService::create_all_ls_(
  common::ObMySQLTransaction trans;
  const int64_t exec_tenant_id = ObLSLifeIAgent::get_exec_tenant_id(tenant_id_);

  ObTenantLSInfo tenant_stat(sql_proxy_, &tenant_schema, tenant_id_);
  if (OB_FAIL(trans.start(sql_proxy_, exec_tenant_id))) {
    LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
  } else {
    //must be in trans
    //Multiple LS groups will be created here.
    //In order to ensure that each LS group can be evenly distributed in the unit group,
    //it is necessary to read the distribution of LS groups within the transaction.
    ObTenantLSInfo tenant_stat(sql_proxy_, &tenant_schema, tenant_id_, &trans);
    for (int64_t i = 0; OB_SUCC(ret) && i < ls_attr_array.count(); ++i) {
      const ObLSAttr &ls_info = ls_attr_array.at(i);
      ObLSFlag ls_flag = ls_info.get_ls_flag();

@@ -409,6 +409,7 @@ int ObLSStatusOperator::update_ls_status_in_trans(
int ObLSStatusOperator::alter_ls_group_id(const uint64_t tenant_id, const ObLSID &id,
    const uint64_t old_ls_group_id,
    const uint64_t new_ls_group_id,
    const uint64_t old_unit_group_id,
    const uint64_t new_unit_group_id,
    ObISQLClient &client)
{
@@ -416,23 +417,23 @@ int ObLSStatusOperator::alter_ls_group_id(const uint64_t tenant_id, const ObLSID
  if (OB_UNLIKELY(!id.is_valid()
      || OB_INVALID_ID == old_ls_group_id
      || OB_INVALID_ID == new_ls_group_id
      || OB_INVALID_ID == old_unit_group_id
      || OB_INVALID_ID == new_unit_group_id
      || OB_INVALID_TENANT_ID == tenant_id)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid_argument", KR(ret), K(id), K(new_ls_group_id),
             K(old_ls_group_id), K(tenant_id));
             K(old_ls_group_id), K(tenant_id), K(old_unit_group_id));
  } else {
    //init_member_list is no need after create success
    common::ObSqlString sql;
    const uint64_t exec_tenant_id =
        ObLSLifeIAgent::get_exec_tenant_id(tenant_id);
    if (OB_FAIL(sql.assign_fmt("UPDATE %s set ls_group_id = %lu, unit_group_id = %lu "
        " where ls_id = %ld and tenant_id = %lu and ls_group_id = %lu",
    if (OB_FAIL(sql.assign_fmt("UPDATE %s set ls_group_id = %lu, unit_group_id = %lu where ls_id = %ld"
        " and tenant_id = %lu and ls_group_id = %lu and unit_group_id = %lu",
        OB_ALL_LS_STATUS_TNAME,
        new_ls_group_id, new_unit_group_id, id.id(),
        tenant_id, old_ls_group_id))) {
        tenant_id, old_ls_group_id, old_unit_group_id))) {
      LOG_WARN("failed to assign sql", KR(ret), K(id), K(new_ls_group_id),
               K(old_ls_group_id), K(tenant_id), K(sql));
               K(old_ls_group_id), K(tenant_id), K(sql), K(old_unit_group_id));
    } else if (OB_FAIL(exec_write(tenant_id, sql, this, client))) {
      LOG_WARN("failed to exec write", KR(ret), K(tenant_id), K(id), K(sql));
    }
@@ -441,6 +442,40 @@ int ObLSStatusOperator::alter_ls_group_id(const uint64_t tenant_id, const ObLSID
  return ret;
}

int ObLSStatusOperator::alter_unit_group_id(const uint64_t tenant_id, const ObLSID &id,
    const uint64_t ls_group_id,
    const uint64_t old_unit_group_id,
    const uint64_t new_unit_group_id,
    ObISQLClient &client)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!id.is_valid()
      || OB_INVALID_ID == ls_group_id
      || OB_INVALID_ID == old_unit_group_id
      || OB_INVALID_ID == new_unit_group_id
      || OB_INVALID_TENANT_ID == tenant_id)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid_argument", KR(ret), K(id), K(ls_group_id),
             K(tenant_id), K(old_unit_group_id));
  } else {
    common::ObSqlString sql;
    const uint64_t exec_tenant_id =
        ObLSLifeIAgent::get_exec_tenant_id(tenant_id);
    if (OB_FAIL(sql.assign_fmt("UPDATE %s set unit_group_id = %lu where ls_id = %ld"
        " and tenant_id = %lu and ls_group_id = %lu and unit_group_id = %lu",
        OB_ALL_LS_STATUS_TNAME,
        new_unit_group_id, id.id(), tenant_id,
        ls_group_id, old_unit_group_id))) {
      LOG_WARN("failed to assign sql", KR(ret), K(id), K(ls_group_id),
               K(tenant_id), K(sql), K(old_unit_group_id));
    } else if (OB_FAIL(exec_write(tenant_id, sql, this, client))) {
      LOG_WARN("failed to exec write", KR(ret), K(tenant_id), K(id), K(sql));
    }
    ALL_LS_EVENT_ADD(tenant_id, id, "alter_unit_group", ret, sql);
  }
  return ret;
}

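For illustration only, not part of this commit: the statement built by alter_unit_group_id above, rendered with hypothetical values and assuming OB_ALL_LS_STATUS_TNAME expands to __all_ls_status. Because the current ls_group_id and unit_group_id appear in the WHERE clause, the UPDATE only takes effect when the row still matches the caller's snapshot; a row changed concurrently is left untouched.

#include <cstdint>
#include <cstdio>

int main()
{
  // Hypothetical values: tenant 1002, ls 1001, ls group 1001,
  // moving from unit group 1003 to unit group 1004.
  const uint64_t tenant_id = 1002, ls_group_id = 1001;
  const uint64_t old_unit_group_id = 1003, new_unit_group_id = 1004;
  const int64_t ls_id = 1001;
  char sql[512];
  std::snprintf(sql, sizeof(sql),
      "UPDATE __all_ls_status set unit_group_id = %lu where ls_id = %ld"
      " and tenant_id = %lu and ls_group_id = %lu and unit_group_id = %lu",
      (unsigned long)new_unit_group_id, (long)ls_id, (unsigned long)tenant_id,
      (unsigned long)ls_group_id, (unsigned long)old_unit_group_id);
  // Prints (single line): UPDATE __all_ls_status set unit_group_id = 1004 where ls_id = 1001
  // and tenant_id = 1002 and ls_group_id = 1001 and unit_group_id = 1003
  std::puts(sql);
  return 0;
}
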
int ObLSStatusOperator::update_init_member_list(
    const uint64_t tenant_id,
    const ObLSID &id, const ObMemberList &member_list, ObISQLClient &client,

@@ -328,11 +328,26 @@ public:
   * @param[in] ls_id
   * @param[in] old_ls_group_id
   * @param[in] new_ls_group_id
   * @param[in] unit_group_id : the new ls group's target unit group
   * @param[in] old_unit_group_id : the ls group's current unit group
   * @param[in] new_unit_group_id : the new ls group's target unit group
   * @param[in] client*/
  int alter_ls_group_id(const uint64_t tenant_id, const ObLSID &id,
      const uint64_t old_ls_group_id,
      const uint64_t new_ls_group_id,
      const uint64_t old_unit_group_id,
      const uint64_t new_unit_group_id,
      ObISQLClient &client);
  /*
   * description: update ls's unit group id
   * @param[in] tenant_id
   * @param[in] ls_id
   * @param[in] ls_group_id
   * @param[in] old_unit_group_id
   * @param[in] new_unit_group_id : the new ls group's target unit group
   * @param[in] client*/
  int alter_unit_group_id(const uint64_t tenant_id, const ObLSID &id,
      const uint64_t ls_group_id,
      const uint64_t old_unit_group_id,
      const uint64_t new_unit_group_id,
      ObISQLClient &client);

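For illustration only, not part of this commit: a hypothetical call site for the alter_unit_group_id operation documented above. All ids are made up, status_op is any ObLSStatusOperator, and client can be any ObISQLClient (for example *GCTX.sql_proxy_); this only compiles inside the OceanBase codebase.

// Move one hypothetical log stream to a new unit group using the declared API.
int move_ls_to_new_unit_group(ObLSStatusOperator &status_op,
                              common::ObISQLClient &client)
{
  int ret = OB_SUCCESS;
  const uint64_t tenant_id = 1002;           // hypothetical user tenant
  const share::ObLSID ls_id(1001);           // hypothetical log stream
  const uint64_t ls_group_id = 1001;         // its ls group
  const uint64_t old_unit_group_id = 1003;   // unit group recorded today
  const uint64_t new_unit_group_id = 1004;   // target unit group
  if (OB_FAIL(status_op.alter_unit_group_id(tenant_id, ls_id, ls_group_id,
          old_unit_group_id, new_unit_group_id, client))) {
    LOG_WARN("failed to alter unit group", KR(ret), K(tenant_id), K(ls_id));
  }
  return ret;
}
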
@@ -254,7 +254,6 @@ inline bool is_schema_error(int err)
    case OB_SCHEMA_ERROR:
    case OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH:
    case OB_ERR_REMOTE_SCHEMA_NOT_FULL:
    case OB_PARTITION_IS_BLOCKED:
    case OB_ERR_SP_ALREADY_EXISTS:
    case OB_ERR_SP_DOES_NOT_EXIST:
    case OB_OBJECT_NAME_NOT_EXIST: