Fix v$ob_archive_dest_status
This commit is contained in:
@ -146,9 +146,9 @@ int ObVirtualArchiveDestStatus::inner_get_next_row(common::ObNewRow *&row)
|
|||||||
if (ls_end_map_inited_ && ls_end_map_.size() != 0) {
|
if (ls_end_map_inited_ && ls_end_map_.size() != 0) {
|
||||||
ls_end_map_.reset();
|
ls_end_map_.reset();
|
||||||
}
|
}
|
||||||
// get ls end scn via tenant_id
|
// get ls max scn via tenant_id
|
||||||
if (OB_FAIL(get_ls_end_scn_(curr_tenant))) {
|
if (OB_FAIL(get_ls_max_scn_(curr_tenant))) {
|
||||||
SERVER_LOG(WARN, "get ls end scn failed", K(curr_tenant), K(ret));
|
SERVER_LOG(WARN, "get ls max scn failed", K(curr_tenant), K(ret));
|
||||||
} else if (ls_checkpoint_map_.count() == 0 || ls_end_map_.count() == 0 || ls_checkpoint_map_.count() != ls_end_map_.count()) {
|
} else if (ls_checkpoint_map_.count() == 0 || ls_end_map_.count() == 0 || ls_checkpoint_map_.count() != ls_end_map_.count()) {
|
||||||
SERVER_LOG(WARN, "map may be empty", K(ls_end_map_.count()), K(ls_checkpoint_map_.count()));
|
SERVER_LOG(WARN, "map may be empty", K(ls_end_map_.count()), K(ls_checkpoint_map_.count()));
|
||||||
if (OB_FAIL(dest_status_info.synchronized_.assign("NO"))) {
|
if (OB_FAIL(dest_status_info.synchronized_.assign("NO"))) {
|
||||||
@ -293,7 +293,7 @@ int ObVirtualArchiveDestStatus::get_all_tenant_ls_(const uint64_t tenant_id)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObVirtualArchiveDestStatus::get_ls_end_scn_(const uint64_t tenant_id)
|
int ObVirtualArchiveDestStatus::get_ls_max_scn_(const uint64_t tenant_id)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_);
|
ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_);
|
||||||
@ -306,7 +306,7 @@ int ObVirtualArchiveDestStatus::get_ls_end_scn_(const uint64_t tenant_id)
|
|||||||
ObMySQLResult *result = NULL;
|
ObMySQLResult *result = NULL;
|
||||||
ObSqlString sql;
|
ObSqlString sql;
|
||||||
|
|
||||||
const static char *SELECT_LS_BY_TENANT = "SELECT ls_id, end_scn FROM %s WHERE tenant_id=%d and role='LEADER'";
|
const static char *SELECT_LS_BY_TENANT = "SELECT ls_id, max_scn FROM %s WHERE tenant_id=%d and role='LEADER'";
|
||||||
if (OB_FAIL(sql.append_fmt(SELECT_LS_BY_TENANT, OB_ALL_VIRTUAL_LOG_STAT_TNAME, tenant_id))) {
|
if (OB_FAIL(sql.append_fmt(SELECT_LS_BY_TENANT, OB_ALL_VIRTUAL_LOG_STAT_TNAME, tenant_id))) {
|
||||||
SERVER_LOG(WARN, "failed to append table name", K(ret));
|
SERVER_LOG(WARN, "failed to append table name", K(ret));
|
||||||
} else if (OB_FAIL(sql_client_retry_weak.read(res, sql.ptr()))) {
|
} else if (OB_FAIL(sql_client_retry_weak.read(res, sql.ptr()))) {
|
||||||
@ -317,23 +317,24 @@ int ObVirtualArchiveDestStatus::get_ls_end_scn_(const uint64_t tenant_id)
|
|||||||
} else {
|
} else {
|
||||||
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
|
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
|
||||||
int64_t ls_id;
|
int64_t ls_id;
|
||||||
uint64_t end_scn;
|
uint64_t max_scn;
|
||||||
ObArchiveSCNValue *scn_value = NULL;
|
ObArchiveSCNValue *scn_value = NULL;
|
||||||
|
|
||||||
EXTRACT_INT_FIELD_MYSQL(*result, "ls_id", ls_id, int64_t);
|
EXTRACT_INT_FIELD_MYSQL(*result, "ls_id", ls_id, int64_t);
|
||||||
EXTRACT_UINT_FIELD_MYSQL(*result, "end_scn", end_scn, uint64_t);
|
EXTRACT_UINT_FIELD_MYSQL(*result, "max_scn", max_scn, uint64_t);
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_FAIL(ls_end_map_.alloc_value(scn_value))) {
|
if (OB_FAIL(ls_end_map_.alloc_value(scn_value))) {
|
||||||
SERVER_LOG(WARN, "alloc_value fail", K(ret), K(ls_id), K(end_scn));
|
SERVER_LOG(WARN, "alloc_value fail", K(ret), K(ls_id), K(max_scn));
|
||||||
} else if (OB_ISNULL(scn_value)) {
|
} else if (OB_ISNULL(scn_value)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
SERVER_LOG(WARN, "scn_value is NULL", K(ret), K(ls_id), K(scn_value));
|
SERVER_LOG(WARN, "scn_value is NULL", K(ret), K(ls_id), K(scn_value));
|
||||||
} else if (OB_FAIL(scn_value->set(end_scn))) {
|
} else if (OB_FAIL(scn_value->set(max_scn))) {
|
||||||
SERVER_LOG(WARN, "scn_value set failed", K(ret), K(ls_id), K(end_scn));
|
SERVER_LOG(WARN, "scn_value set failed", K(ret), K(ls_id), K(max_scn));
|
||||||
} else if (OB_FAIL(ls_end_map_.insert_and_get(ObLSID(ls_id), scn_value))) {
|
} else if (OB_FAIL(ls_end_map_.insert_and_get(ObLSID(ls_id), scn_value))) {
|
||||||
SERVER_LOG(WARN, "ls_end_map insert and get failed", K(ret), K(ls_id), K(scn_value));
|
SERVER_LOG(WARN, "ls_end_map insert and get failed", K(ret), K(ls_id), K(scn_value));
|
||||||
} else {
|
} else {
|
||||||
|
SERVER_LOG(INFO, "get ls max_scn succ", K(ls_id), K(max_scn));
|
||||||
ls_end_map_.revert(scn_value);
|
ls_end_map_.revert(scn_value);
|
||||||
scn_value = NULL;
|
scn_value = NULL;
|
||||||
}
|
}
|
||||||
@ -346,7 +347,7 @@ int ObVirtualArchiveDestStatus::get_ls_end_scn_(const uint64_t tenant_id)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_ITER_END != ret) {
|
if (OB_ITER_END != ret) {
|
||||||
SERVER_LOG(WARN, "failed to get ls end scn", K(ret));
|
SERVER_LOG(WARN, "failed to get ls max scn", K(ret));
|
||||||
} else {
|
} else {
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
}
|
}
|
||||||
@ -507,29 +508,29 @@ int ObVirtualArchiveDestStatus::get_status_info_(const uint64_t tenant_id,
|
|||||||
int ObVirtualArchiveDestStatus::compare_scn_map_()
|
int ObVirtualArchiveDestStatus::compare_scn_map_()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObArchiveSCNValue *end_scn_val = NULL;
|
ObArchiveSCNValue *max_scn_val = NULL;
|
||||||
ObArchiveSCNValue *ckpt_scn_val = NULL;
|
ObArchiveSCNValue *ckpt_scn_val = NULL;
|
||||||
uint64_t end_scn;
|
uint64_t max_scn;
|
||||||
uint64_t ckpt_scn;
|
uint64_t ckpt_scn;
|
||||||
is_synced_ = true;
|
is_synced_ = true;
|
||||||
|
|
||||||
for (auto ls_id : ls_array_) {
|
for (auto ls_id : ls_array_) {
|
||||||
if (OB_FAIL(ls_end_map_.get(ObLSID(ls_id), end_scn_val))) {
|
if (OB_FAIL(ls_end_map_.get(ObLSID(ls_id), max_scn_val))) {
|
||||||
is_synced_ = false;
|
is_synced_ = false;
|
||||||
SERVER_LOG(WARN, "get ls end scn from ls_end_map failed", K(ls_id), K(ret));
|
SERVER_LOG(WARN, "get ls max scn from ls_end_map failed", K(ls_id), K(ret));
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
ls_end_map_.revert(end_scn_val);
|
ls_end_map_.revert(max_scn_val);
|
||||||
if (OB_FAIL(ls_checkpoint_map_.get(ObLSID(ls_id), ckpt_scn_val))) {
|
if (OB_FAIL(ls_checkpoint_map_.get(ObLSID(ls_id), ckpt_scn_val))) {
|
||||||
is_synced_ = false;
|
is_synced_ = false;
|
||||||
SERVER_LOG(WARN, "get ls checkpoint scn from ls_checkpoint_map failed", K(ls_id), K(ret));
|
SERVER_LOG(WARN, "get ls checkpoint scn from ls_checkpoint_map failed", K(ls_id), K(ret));
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
ls_checkpoint_map_.revert(ckpt_scn_val);
|
ls_checkpoint_map_.revert(ckpt_scn_val);
|
||||||
end_scn_val->get(end_scn);
|
max_scn_val->get(max_scn);
|
||||||
ckpt_scn_val->get(ckpt_scn);
|
ckpt_scn_val->get(ckpt_scn);
|
||||||
|
|
||||||
if (end_scn > ckpt_scn) {
|
if (max_scn > ckpt_scn) {
|
||||||
is_synced_ = false;
|
is_synced_ = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -70,8 +70,8 @@ private:
|
|||||||
int get_all_tenant_();
|
int get_all_tenant_();
|
||||||
// get all ls list
|
// get all ls list
|
||||||
int get_all_tenant_ls_(const uint64_t tenant_id);
|
int get_all_tenant_ls_(const uint64_t tenant_id);
|
||||||
// get ls end scn and store them into ls_end_map_
|
// get ls max scn and store them into ls_end_map_
|
||||||
int get_ls_end_scn_(const uint64_t tenant_id); // k means ls_id, v means end_scn
|
int get_ls_max_scn_(const uint64_t tenant_id); // k means ls_id, v means max_scn
|
||||||
// get ls checkpoint scn and store them into ls_checkpoint_map_
|
// get ls checkpoint scn and store them into ls_checkpoint_map_
|
||||||
int get_ls_checkpoint_scn_(const uint64_t tenant_id, const int64_t dest_id);
|
int get_ls_checkpoint_scn_(const uint64_t tenant_id, const int64_t dest_id);
|
||||||
int get_full_row_(const share::schema::ObTableSchema *table,
|
int get_full_row_(const share::schema::ObTableSchema *table,
|
||||||
@ -96,4 +96,4 @@ private:
|
|||||||
};
|
};
|
||||||
}//end namespace observer
|
}//end namespace observer
|
||||||
}//end namespace oceanbase
|
}//end namespace oceanbase
|
||||||
#endif //OCEANBASE_OBSERVER_OB_ALL_VIRTUAL_ARCHIVE_DEST_STATUS_H_
|
#endif //OCEANBASE_OBSERVER_OB_ALL_VIRTUAL_ARCHIVE_DEST_STATUS_H_
|
||||||
|
|||||||
Reference in New Issue
Block a user