[CP] [OBCDC] Support refresh schema to specified schema_version in online schema mode while sync from standby tenant
This commit is contained in:
@ -443,11 +443,16 @@ int ObLogInstance::init_logger_()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef ENABLE_SANITY
|
||||||
|
const char *extra_flags = "";
|
||||||
|
#else
|
||||||
|
const char *extra_flags = "|Sanity";
|
||||||
|
#endif
|
||||||
_LOG_INFO("====================libobcdc start====================");
|
_LOG_INFO("====================libobcdc start====================");
|
||||||
_LOG_INFO("libobcdc %s %s", PACKAGE_VERSION, RELEASEID);
|
_LOG_INFO("libobcdc %s %s", PACKAGE_VERSION, RELEASEID);
|
||||||
_LOG_INFO("BUILD_VERSION: %s", build_version());
|
_LOG_INFO("BUILD_VERSION: %s", build_version());
|
||||||
_LOG_INFO("BUILD_TIME: %s %s", build_date(), build_time());
|
_LOG_INFO("BUILD_TIME: %s %s", build_date(), build_time());
|
||||||
_LOG_INFO("BUILD_FLAGS: %s", build_flags());
|
_LOG_INFO("BUILD_FLAGS: %s%s", build_flags(), extra_flags);
|
||||||
_LOG_INFO("Copyright (c) 2022 Ant Group Co., Ltd.");
|
_LOG_INFO("Copyright (c) 2022 Ant Group Co., Ltd.");
|
||||||
_LOG_INFO("======================================================");
|
_LOG_INFO("======================================================");
|
||||||
_LOG_INFO("\n");
|
_LOG_INFO("\n");
|
||||||
@ -460,10 +465,15 @@ int ObLogInstance::init_logger_()
|
|||||||
|
|
||||||
void ObLogInstance::print_version()
|
void ObLogInstance::print_version()
|
||||||
{
|
{
|
||||||
|
#ifndef ENABLE_SANITY
|
||||||
|
const char *extra_flags = "";
|
||||||
|
#else
|
||||||
|
const char *extra_flags = "|Sanity";
|
||||||
|
#endif
|
||||||
MPRINT("libobcdc %s %s", PACKAGE_VERSION, RELEASEID);
|
MPRINT("libobcdc %s %s", PACKAGE_VERSION, RELEASEID);
|
||||||
MPRINT("REVISION: %s", build_version());
|
MPRINT("REVISION: %s", build_version());
|
||||||
MPRINT("BUILD_TIME: %s %s", build_date(), build_time());
|
MPRINT("BUILD_TIME: %s %s", build_date(), build_time());
|
||||||
MPRINT("BUILD_FLAGS: %s\n", build_flags());
|
MPRINT("BUILD_FLAGS: %s%s\n", build_flags(), extra_flags);
|
||||||
MPRINT("Copyright (c) 2022 Ant Group Co., Ltd.");
|
MPRINT("Copyright (c) 2022 Ant Group Co., Ltd.");
|
||||||
MPRINT();
|
MPRINT();
|
||||||
}
|
}
|
||||||
|
@ -25,6 +25,11 @@ const char* build_flags();
|
|||||||
|
|
||||||
int so_main()
|
int so_main()
|
||||||
{
|
{
|
||||||
|
#ifndef ENABLE_SANITY
|
||||||
|
const char *extra_flags = "";
|
||||||
|
#else
|
||||||
|
const char *extra_flags = "|Sanity";
|
||||||
|
#endif
|
||||||
fprintf(stdout, "\n");
|
fprintf(stdout, "\n");
|
||||||
|
|
||||||
fprintf(stdout, "libobcdc (%s %s)\n", PACKAGE_STRING, RELEASEID);
|
fprintf(stdout, "libobcdc (%s %s)\n", PACKAGE_STRING, RELEASEID);
|
||||||
@ -32,7 +37,7 @@ int so_main()
|
|||||||
|
|
||||||
fprintf(stdout, "BUILD_VERSION: %s\n", build_version());
|
fprintf(stdout, "BUILD_VERSION: %s\n", build_version());
|
||||||
fprintf(stdout, "BUILD_TIME: %s %s\n", build_date(), build_time());
|
fprintf(stdout, "BUILD_TIME: %s %s\n", build_date(), build_time());
|
||||||
fprintf(stdout, "BUILD_FLAGS: %s\n", build_flags());
|
fprintf(stdout, "BUILD_FLAGS: %s%s\n", build_flags(), extra_flags);
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -744,7 +744,9 @@ int ObLogSchemaGetter::get_schema_guard_and_simple_table_schema_(
|
|||||||
// @retval OB_TENANT_HAS_BEEN_DROPPED Tenant has been dropped
|
// @retval OB_TENANT_HAS_BEEN_DROPPED Tenant has been dropped
|
||||||
// @retval OB_TIMEOUT Timeout
|
// @retval OB_TIMEOUT Timeout
|
||||||
// @retval Other error codes Fail
|
// @retval Other error codes Fail
|
||||||
int ObLogSchemaGetter::refresh_to_expected_version_(const uint64_t tenant_id,
|
int ObLogSchemaGetter::refresh_to_expected_version_(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const bool specify_version_mode,
|
||||||
const int64_t expected_version,
|
const int64_t expected_version,
|
||||||
const int64_t timeout,
|
const int64_t timeout,
|
||||||
int64_t &latest_version)
|
int64_t &latest_version)
|
||||||
@ -769,23 +771,26 @@ int ObLogSchemaGetter::refresh_to_expected_version_(const uint64_t tenant_id,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ret)) {
|
|
||||||
// fail
|
|
||||||
} else if (OB_INVALID_VERSION == latest_version || latest_version < expected_version) {
|
|
||||||
// If the tenant version is invalid, or if the desired schema version is not reached, then a refresh of the schema is requested
|
|
||||||
LOG_INFO("begin refresh schema to expected version", K(tenant_id), K(expected_version),
|
|
||||||
K(latest_version));
|
|
||||||
|
|
||||||
RETRY_ON_FAIL_WITH_TENANT_ID(tenant_id, timeout, schema_service_, auto_switch_mode_and_refresh_schema, tenant_id,
|
|
||||||
expected_version);
|
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
// Get the latest schema version again
|
// If the tenant version is invalid, or if the desired schema version is not reached, then a refresh of the schema is requested
|
||||||
RETRY_ON_FAIL_WITH_TENANT_ID(tenant_id, timeout, schema_service_, get_tenant_refreshed_schema_version, tenant_id, latest_version);
|
LOG_INFO("[SCHEMA_GETTER] begin refresh schema to expected version", K(tenant_id), K(expected_version), K(latest_version));
|
||||||
|
bool need_refresh_schema = (OB_INVALID_VERSION == latest_version || latest_version < expected_version);
|
||||||
|
const static int64_t retry_print_interval = 10 * _SEC_;
|
||||||
|
|
||||||
|
while (need_refresh_schema) {
|
||||||
|
if (TC_REACH_TIME_INTERVAL(retry_print_interval)) {
|
||||||
|
LOG_INFO("[SCHEMA_GETTER] waiting for schema guard to be refreshed", K(tenant_id), K(expected_version), K(specify_version_mode),
|
||||||
|
K(latest_version), "delta", latest_version - expected_version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RETRY_ON_FAIL_WITH_TENANT_ID(tenant_id, timeout, schema_service_, auto_switch_mode_and_refresh_schema, tenant_id, expected_version);
|
||||||
|
// Get the latest schema version again
|
||||||
|
RETRY_ON_FAIL_WITH_TENANT_ID(tenant_id, timeout, schema_service_, get_tenant_refreshed_schema_version, tenant_id, latest_version);
|
||||||
|
|
||||||
|
need_refresh_schema = (OB_SUCCESS == ret) && ((OB_INVALID_VERSION == latest_version) || (specify_version_mode && latest_version < expected_version));
|
||||||
|
}
|
||||||
int64_t cost_time = get_timestamp() - start_time;
|
int64_t cost_time = get_timestamp() - start_time;
|
||||||
LOG_INFO("refresh schema to expected version", KR(ret), K(tenant_id),
|
LOG_INFO("[SCHEMA_GETTER] refresh schema to expected version", KR(ret), K(tenant_id),
|
||||||
K(latest_version), K(expected_version), "delta", latest_version - expected_version,
|
K(latest_version), K(expected_version), "delta", latest_version - expected_version,
|
||||||
"latest_version", TS_TO_STR(latest_version),
|
"latest_version", TS_TO_STR(latest_version),
|
||||||
"cost_time", TVAL_TO_STR(cost_time));
|
"cost_time", TVAL_TO_STR(cost_time));
|
||||||
@ -819,7 +824,7 @@ int ObLogSchemaGetter::get_schema_guard_(const uint64_t tenant_id,
|
|||||||
refreshed_version = OB_INVALID_VERSION;
|
refreshed_version = OB_INVALID_VERSION;
|
||||||
|
|
||||||
// First refresh the schema to ensure it is greater than or equal to expected_version
|
// First refresh the schema to ensure it is greater than or equal to expected_version
|
||||||
if (OB_FAIL(refresh_to_expected_version_(tenant_id, expected_version, timeout, refreshed_version))) {
|
if (OB_FAIL(refresh_to_expected_version_(tenant_id, specify_version_mode, expected_version, timeout, refreshed_version))) {
|
||||||
if (OB_TENANT_HAS_BEEN_DROPPED != ret && OB_TIMEOUT != ret) {
|
if (OB_TENANT_HAS_BEEN_DROPPED != ret && OB_TIMEOUT != ret) {
|
||||||
LOG_ERROR("refresh_to_expected_version_ fail", KR(ret), K(tenant_id), K(expected_version));
|
LOG_ERROR("refresh_to_expected_version_ fail", KR(ret), K(tenant_id), K(expected_version));
|
||||||
}
|
}
|
||||||
|
@ -651,7 +651,9 @@ private:
|
|||||||
IObLogSchemaGuard &guard,
|
IObLogSchemaGuard &guard,
|
||||||
const share::schema::ObMultiVersionSchemaService::RefreshSchemaMode refresh_schema_mode,
|
const share::schema::ObMultiVersionSchemaService::RefreshSchemaMode refresh_schema_mode,
|
||||||
int64_t &refreshed_version);
|
int64_t &refreshed_version);
|
||||||
int refresh_to_expected_version_(const uint64_t tenant_id,
|
int refresh_to_expected_version_(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const bool specify_version_mode,
|
||||||
const int64_t expected_version,
|
const int64_t expected_version,
|
||||||
const int64_t timeout,
|
const int64_t timeout,
|
||||||
int64_t &latest_version);
|
int64_t &latest_version);
|
||||||
|
Reference in New Issue
Block a user