make data version in log readable

This commit is contained in:
zhaoyiping0622 2024-07-16 14:37:08 +00:00 committed by ob-robot
parent 7d06184389
commit 5a0b7accb8
6 changed files with 69 additions and 50 deletions

View File

@ -569,14 +569,5 @@ int ObTenantDataVersionMgr::write_to_file_(char *buf, int64_t buf_length, int64_
return ret;
}
ObDataVersionPrinter::ObDataVersionPrinter(const uint64_t data_version)
: version_val_(data_version), version_str_{0}
{
if (OB_INVALID_INDEX ==
VersionUtil::print_version_str(version_str_, OB_SERVER_VERSION_LENGTH, data_version)) {
MEMSET(version_str_, 0, OB_SERVER_VERSION_LENGTH);
}
}
} // namespace common
} // namespace oceanbase

View File

@ -161,6 +161,4 @@ private:
} // namespace common
} // namespace oceanbase
#define ODV_MGR (::oceanbase::common::ObTenantDataVersionMgr::get_instance())
#define DVP(data_version) (::oceanbase::common::ObDataVersionPrinter(data_version))
#define KDV(data_version) "data_version", DVP(data_version)
#endif // OCEANBASE_OBSERVER_OB_TENANT_DATA_VERSION_H_

View File

@ -71,5 +71,14 @@ int64_t VersionUtil::print_version_str(char *buf, const int64_t buf_len, uint64_
return pos;
}
ObVersionPrinter::ObVersionPrinter(const uint64_t version)
: version_val_(version), version_str_{0}
{
if (OB_INVALID_INDEX ==
VersionUtil::print_version_str(version_str_, OB_SERVER_VERSION_LENGTH, version)) {
MEMSET(version_str_, 0, OB_SERVER_VERSION_LENGTH);
}
}
} // namespace common
} // namespace oceanbase

View File

@ -179,7 +179,24 @@ public:
static int64_t print_version_str(char *buf, const int64_t buf_len, uint64_t version);
};
class ObVersionPrinter
{
public:
ObVersionPrinter(const uint64_t version);
TO_STRING_KV(K_(version_str), K_(version_val));
private:
uint64_t version_val_;
char version_str_[OB_SERVER_VERSION_LENGTH];
};
} // namespace common
} // namespace oceanbase
#define VP(version) (::oceanbase::common::ObVersionPrinter(version))
#define DVP(version) VP(version)
#define CVP(version) VP(version)
// print data version in human readable way
#define KDV(x) #x, DVP(x)
// print cluster version in human readable way
#define KCV(x) #x, CVP(x)
#endif

View File

@ -324,7 +324,7 @@ int ObUpgradeExecutor::execute(
} else if (OB_FAIL(fill_extra_info_(tenant_id, version,
current_data_version, BUF_LEN, extra_buf))) {
LOG_WARN("fail to fill extra info", KR(ret),
K(tenant_id), K(version), K(current_data_version));
K(tenant_id), KDV(version), KDV(current_data_version));
} else if (OB_FAIL(RS_JOB_CREATE_WITH_RET(
job_id, job_type, *sql_proxy_, "tenant_id", tenant_id,
"extra_info", ObHexEscapeSqlStr(ObString(strlen(extra_buf), extra_buf))))) {
@ -336,7 +336,7 @@ int ObUpgradeExecutor::execute(
switch (action) {
case obrpc::ObUpgradeJobArg::UPGRADE_POST_ACTION: {
if (OB_FAIL(run_upgrade_post_job_(tenant_ids, version))) {
LOG_WARN("fail to run upgrade post job", KR(ret), K(version));
LOG_WARN("fail to run upgrade post job", KR(ret), KDV(version));
}
break;
}
@ -412,11 +412,11 @@ int ObUpgradeExecutor::execute(
if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
min_cluster_version_str, BUF_LEN, min_cluster_version)) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("fail to print version str", KR(ret), K(min_cluster_version));
LOG_WARN("fail to print version str", KR(ret), KCV(min_cluster_version));
} else if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
targe_data_version_str, BUF_LEN, target_data_version)) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("fail to print version str", KR(ret), K(target_data_version));
LOG_WARN("fail to print version str", KR(ret), KDV(target_data_version));
} else if (OB_FAIL(observer::ObService::get_build_version(build_version))) {
LOG_WARN("fail to get build version", KR(ret));
} else if (0 != tenant_id) {
@ -424,7 +424,7 @@ int ObUpgradeExecutor::execute(
if (OB_INVALID_INDEX == ObClusterVersion::print_version_str(
current_data_version_str, BUF_LEN, current_data_version)) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("fail to print version str", KR(ret), K(current_data_version));
LOG_WARN("fail to print version str", KR(ret), KDV(current_data_version));
}
CLUSTER_EVENT_SYNC_ADD("UPGRADE",
ObRsJobTableOperator::get_job_type_str(job_type),
@ -481,7 +481,7 @@ int ObUpgradeExecutor::fill_extra_info_(
if (OB_INVALID_INDEX == (version_len = ObClusterVersion::print_version_str(
version_buf, VERSION_LEN, target_data_version))) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("fail to print version", KR(ret), K(target_data_version));
LOG_WARN("fail to print version", KR(ret), KDV(target_data_version));
} else if (OB_FAIL(databuff_printf(buf, buf_len, len,
"TARGET_DATA_VERSION: '%s'", version_buf))) {
LOG_WARN("fail to print string", KR(ret), K(len));
@ -496,7 +496,7 @@ int ObUpgradeExecutor::fill_extra_info_(
} else if (OB_INVALID_INDEX == (version_len = ObClusterVersion::print_version_str(
version_buf, VERSION_LEN, current_data_version))) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("fail to print version", KR(ret), K(current_data_version));
LOG_WARN("fail to print version", KR(ret), KDV(current_data_version));
} else if (OB_FAIL(databuff_printf(buf, buf_len, len,
", CURRENT_DATA_VERSION: '%s'", version_buf))) {
LOG_WARN("fail to print string", KR(ret), K(len));
@ -519,14 +519,14 @@ int ObUpgradeExecutor::run_upgrade_post_job_(
LOG_WARN("executor should stopped", KR(ret));
} else if (!ObUpgradeChecker::check_data_version_exist(version)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version));
LOG_WARN("unsupported version to run upgrade job", KR(ret), KDV(version));
} else {
ObBaseUpgradeProcessor *processor = NULL;
int64_t backup_ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (OB_FAIL(upgrade_processors_.get_processor_by_version(
version, processor))) {
LOG_WARN("fail to get processor by version", KR(ret), K(version));
LOG_WARN("fail to get processor by version", KR(ret), KDV(version));
} else {
for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
const uint64_t tenant_id = tenant_ids.at(i);
@ -534,16 +534,16 @@ int ObUpgradeExecutor::run_upgrade_post_job_(
int64_t current_version = processor->get_version();
processor->set_tenant_id(tenant_id);
FLOG_INFO("[UPGRADE] start to run post upgrade job by version",
K(tenant_id), K(current_version));
K(tenant_id), KDV(current_version));
if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else if (OB_TMP_FAIL(processor->post_upgrade())) {
LOG_WARN("run post upgrade by version failed",
KR(tmp_ret), K(tenant_id), K(current_version));
KR(tmp_ret), K(tenant_id), KDV(current_version));
backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
}
FLOG_INFO("[UPGRADE] finish post upgrade job by version",
KR(tmp_ret), K(tenant_id), K(current_version),
KR(tmp_ret), K(tenant_id), KDV(current_version),
"cost", ObTimeUtility::current_time() - start_ts);
} // end for
}
@ -622,15 +622,15 @@ int ObUpgradeExecutor::run_upgrade_begin_action_(
if (OB_ERR_NULL_VALUE != ret) {
ret = OB_SUCC(ret) ? OB_ERR_UNEXPECTED : ret;
LOG_WARN("current data version should be not exist",
KR(ret), K(tenant_id), K(current_data_version));
KR(ret), K(tenant_id), KDV(current_data_version));
} else if (OB_FAIL(proxy.update_current_data_version(DEFAULT_DATA_VERSION))) {
// overwrite ret
LOG_WARN("fail to init current data version",
KR(ret), K(tenant_id), K(DEFAULT_DATA_VERSION));
KR(ret), K(tenant_id), KDV(DEFAULT_DATA_VERSION));
} else {
target_data_version = DEFAULT_DATA_VERSION;
LOG_INFO("[UPGRADE] init missing current data version",
KR(ret), K(tenant_id), K(DEFAULT_DATA_VERSION));
KR(ret), K(tenant_id), KDV(DEFAULT_DATA_VERSION));
}
} else {
LOG_WARN("fail to get target data version", KR(ret), K(tenant_id));
@ -645,18 +645,18 @@ int ObUpgradeExecutor::run_upgrade_begin_action_(
if (OB_FAIL(ret)) {
} else if (target_data_version >= DATA_CURRENT_VERSION) {
LOG_INFO("[UPGRADE] target data version is new enough, just skip",
KR(ret), K(tenant_id), K(target_data_version));
KR(ret), K(tenant_id), KDV(target_data_version));
} else if (OB_FAIL(proxy.update_target_data_version(DATA_CURRENT_VERSION))) {
LOG_WARN("fail to update target data version",
KR(ret), K(tenant_id), "version", DATA_CURRENT_VERSION);
KR(ret), K(tenant_id), "version", DVP(DATA_CURRENT_VERSION));
} else if (is_user_tenant(tenant_id)
&& OB_FAIL(OB_PRIMARY_STANDBY_SERVICE.write_upgrade_barrier_log(
trans, tenant_id, DATA_CURRENT_VERSION))) {
LOG_WARN("fail to write_upgrade_barrier_log",
KR(ret), K(tenant_id), "version", DATA_CURRENT_VERSION);
KR(ret), K(tenant_id), "version", DVP(DATA_CURRENT_VERSION));
} else {
LOG_INFO("[UPGRADE] update target data version",
KR(ret), K(tenant_id), "version", DATA_CURRENT_VERSION);
KR(ret), K(tenant_id), "version", DVP(DATA_CURRENT_VERSION));
}
}
if (trans.is_started()) {
@ -1071,11 +1071,15 @@ int ObUpgradeExecutor::run_upgrade_all_post_action_(
ObGlobalStatProxy proxy(*sql_proxy_, tenant_id);
if (OB_FAIL(proxy.get_current_data_version(current_data_version))) {
LOG_WARN("fail to get current data version",
KR(ret), K(tenant_id), K(current_data_version));
KR(ret), K(tenant_id), KDV(current_data_version));
} else if (!ObUpgradeChecker::check_data_version_exist(current_data_version)) {
// current_data_version not exists in UPGRADE_PATH, maybe you should refresh master code
ret = OB_ERR_UNDEFINED;
LOG_WARN("current_data_version not exists in UPGRADE_PATH", KR(ret), KDV(current_data_version));
} else if (OB_FAIL(upgrade_processors_.get_processor_idx_by_range(
current_data_version, DATA_CURRENT_VERSION,
start_idx, end_idx))) {
LOG_WARN("fail to get processor by version", KR(ret), K(current_data_version));
LOG_WARN("fail to get processor by version", KR(ret), KDV(current_data_version));
}
int64_t version = OB_INVALID_VERSION;
for (int64_t i = start_idx + 1; OB_SUCC(ret) && i <= end_idx; i++) {
@ -1089,15 +1093,15 @@ int ObUpgradeExecutor::run_upgrade_all_post_action_(
} else if (OB_FAIL(check_schema_sync_(tenant_id))) {
LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
} else if (OB_FAIL(processor->post_upgrade())) {
LOG_WARN("run post upgrade by version failed", KR(ret), K(tenant_id), K(version));
LOG_WARN("run post upgrade by version failed", KR(ret), K(tenant_id), KDV(version));
} else if (OB_FAIL(check_schema_sync_(tenant_id))) {
LOG_WARN("fail to check schema sync", KR(ret), K(tenant_id));
} else if (i < end_idx) {
if (OB_FAIL(proxy.update_current_data_version(version))) {
LOG_WARN("fail to update current data version", KR(ret), K(tenant_id), K(version));
LOG_WARN("fail to update current data version", KR(ret), K(tenant_id), KDV(version));
}
} else if (OB_FAIL(update_final_current_data_version_(tenant_id, version))) {
LOG_WARN("fail to update final current data version", KR(ret), K(tenant_id), K(version));
LOG_WARN("fail to update final current data version", KR(ret), K(tenant_id), KDV(version));
}
} // end for
}
@ -1117,11 +1121,11 @@ int ObUpgradeExecutor::update_final_current_data_version_(const uint64_t tenant_
} else {
ObGlobalStatProxy end_proxy(trans, tenant_id);
if (OB_FAIL(end_proxy.update_current_data_version(version))) {
LOG_WARN("fail to update current data version", KR(ret), K(tenant_id), K(version));
LOG_WARN("fail to update current data version", KR(ret), K(tenant_id), KDV(version));
} else if (is_user_tenant(tenant_id) &&
OB_FAIL(OB_PRIMARY_STANDBY_SERVICE.write_upgrade_data_version_barrier_log(
trans, tenant_id, version))) {
LOG_WARN("fail to write_upgrade_data_version_barrier_log", KR(ret), K(tenant_id), K(version));
LOG_WARN("fail to write_upgrade_data_version_barrier_log", KR(ret), K(tenant_id), KDV(version));
}
}
if (trans.is_started()) {
@ -1219,14 +1223,14 @@ int ObUpgradeExecutor::run_upgrade_end_action_(
|| target_data_version != DATA_CURRENT_VERSION) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("data_version not match, upgrade process should be run",
KR(ret), K(tenant_id), K(target_data_version), K(current_data_version));
KR(ret), K(tenant_id), KDV(target_data_version), KDV(current_data_version));
} else {
// target_data_version == current_data_version == DATA_CURRENT_VERSION
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("fail to get min data version", KR(ret), K(tenant_id));
} else if (data_version >= current_data_version) {
LOG_INFO("[UPGRADE] data version is not less than current data version, just skip",
K(tenant_id), K(data_version), K(current_data_version));
K(tenant_id), KDV(data_version), KDV(current_data_version));
} else {
HEAP_VAR(obrpc::ObAdminSetConfigItem, item) {
ObSchemaGetterGuard guard;
@ -1240,7 +1244,7 @@ int ObUpgradeExecutor::run_upgrade_end_action_(
if (pos <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("current_data_version is invalid",
KR(ret), K(tenant_id), K(current_data_version));
KR(ret), K(tenant_id), KDV(current_data_version));
} else if (OB_FAIL(GSCHEMASERVICE.get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
LOG_WARN("fail to get schema guard", KR(ret));
} else if (OB_FAIL(guard.get_tenant_info(tenant_id, tenant))) {

View File

@ -95,7 +95,7 @@ int ObUpgradeChecker::get_data_version_by_cluster_version(
#undef CONVERT_CLUSTER_VERSION_TO_DATA_VERSION
default: {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid cluster_version", KR(ret), K(cluster_version));
LOG_WARN("invalid cluster_version", KR(ret), KCV(cluster_version));
}
}
return ret;
@ -634,9 +634,9 @@ int ObUpgradeProcesserSet::init(
LOG_WARN("fail to new upgrade processor", KR(ret)); \
} else if (OB_FAIL(processor->init(version, mode, sql_proxy, oracle_sql_proxy, rpc_proxy, common_proxy, \
schema_service, check_server_provider))) { \
LOG_WARN("fail to init processor", KR(ret), K(version)); \
LOG_WARN("fail to init processor", KR(ret), KDV(version)); \
} else if (OB_FAIL(processor_list_.push_back(processor))) { \
LOG_WARN("fail to push back processor", KR(ret), K(version)); \
LOG_WARN("fail to push back processor", KR(ret), KDV(version)); \
} \
if (OB_FAIL(ret)) { \
if (OB_NOT_NULL(processor)) { \
@ -722,9 +722,9 @@ int ObUpgradeProcesserSet::get_processor_by_version(
int ret = OB_SUCCESS;
int64_t idx = OB_INVALID_INDEX;
if (OB_FAIL(get_processor_idx_by_version(version, idx))) {
LOG_WARN("fail to get processor idx by version", KR(ret), K(version));
LOG_WARN("fail to get processor idx by version", KR(ret), KDV(version));
} else if (OB_FAIL(get_processor_by_idx(idx, processor))) {
LOG_WARN("fail to get processor by idx", KR(ret), K(version));
LOG_WARN("fail to get processor by idx", KR(ret), KDV(version));
}
return ret;
}
@ -743,11 +743,11 @@ int ObUpgradeProcesserSet::get_processor_idx_by_range(
LOG_WARN("check inner stat failed", KR(ret));
} else if (start_version <= 0 || end_version <= 0 || start_version > end_version) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid version", KR(ret), K(start_version), K(end_version));
LOG_WARN("invalid version", KR(ret), KDV(start_version), KDV(end_version));
} else if (OB_FAIL(get_processor_idx_by_version(start_version, start_idx))) {
LOG_WARN("fail to get processor idx by version", KR(ret), K(start_version));
LOG_WARN("fail to get processor idx by version", KR(ret), KDV(start_version));
} else if (OB_FAIL(get_processor_idx_by_version(end_version, end_idx))) {
LOG_WARN("fail to get processor idx by version", KR(ret), K(end_version));
LOG_WARN("fail to get processor idx by version", KR(ret), KDV(end_version));
}
return ret;
}
@ -762,7 +762,7 @@ int ObUpgradeProcesserSet::get_processor_idx_by_version(
LOG_WARN("check inner stat failed", KR(ret));
} else if (version <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid version", KR(ret), K(version));
LOG_WARN("invalid version", KR(ret), KDV(version));
} else if (processor_list_.count() <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("processor_list cnt shoud greator than 0", KR(ret));
@ -785,7 +785,7 @@ int ObUpgradeProcesserSet::get_processor_idx_by_version(
}
if (OB_SUCC(ret) && OB_INVALID_INDEX == idx) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("fail to find processor by version", KR(ret), K(version));
LOG_WARN("fail to find processor by version", KR(ret), KDV(version));
}
}
return ret;