fix check_merge_interval_time in case when cluster has been restarted
This commit is contained in:
@ -31,6 +31,7 @@
|
|||||||
#include "share/ob_global_stat_proxy.h"
|
#include "share/ob_global_stat_proxy.h"
|
||||||
#include "share/ob_service_epoch_proxy.h"
|
#include "share/ob_service_epoch_proxy.h"
|
||||||
#include "share/ob_column_checksum_error_operator.h"
|
#include "share/ob_column_checksum_error_operator.h"
|
||||||
|
#include "share/ob_server_table_operator.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -784,6 +785,8 @@ void ObMajorMergeScheduler::check_merge_interval_time(const bool is_merging)
|
|||||||
int64_t global_last_merged_time = -1;
|
int64_t global_last_merged_time = -1;
|
||||||
int64_t global_merge_start_time = -1;
|
int64_t global_merge_start_time = -1;
|
||||||
int64_t max_merge_time = -1;
|
int64_t max_merge_time = -1;
|
||||||
|
int64_t start_service_time = -1;
|
||||||
|
int64_t all_service_time = -1;
|
||||||
if (OB_ISNULL(zone_merge_mgr_)) {
|
if (OB_ISNULL(zone_merge_mgr_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("zone_merge_mgr_ is unexpected nullptr", KR(ret), K_(tenant_id));
|
LOG_WARN("zone_merge_mgr_ is unexpected nullptr", KR(ret), K_(tenant_id));
|
||||||
@ -805,12 +808,25 @@ void ObMajorMergeScheduler::check_merge_interval_time(const bool is_merging)
|
|||||||
} else {
|
} else {
|
||||||
max_merge_time = MAX(global_last_merged_time, global_merge_start_time);
|
max_merge_time = MAX(global_last_merged_time, global_merge_start_time);
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret) && !is_paused()) {
|
if (OB_SUCC(ret)) {
|
||||||
|
ObServerTableOperator st_operator;
|
||||||
|
if (OB_FAIL(st_operator.init(sql_proxy_))) {
|
||||||
|
LOG_WARN("fail to init server table operator", K(ret), K_(tenant_id));
|
||||||
|
} else if (OB_FAIL(st_operator.get_start_service_time(GCONF.self_addr_, start_service_time))) {
|
||||||
|
LOG_WARN("fail to get start service time", KR(ret), K_(tenant_id));
|
||||||
|
} else {
|
||||||
|
all_service_time = now - start_service_time;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// LOG_ERROR should satisfy one additional condition: all_service_time > MAX_NO_MERGE_INTERVAL.
|
||||||
|
// So as to avoid LOG_ERROR when the tenant miss daily merge due to the cluster restarted.
|
||||||
|
if (OB_SUCC(ret) && !is_paused() && (all_service_time > MAX_NO_MERGE_INTERVAL)) {
|
||||||
if (is_merging) {
|
if (is_merging) {
|
||||||
if ((now - max_merge_time) > MAX_NO_MERGE_INTERVAL) {
|
if ((now - max_merge_time) > MAX_NO_MERGE_INTERVAL) {
|
||||||
if (TC_REACH_TIME_INTERVAL(30 * 60 * 1000 * 1000)) {
|
if (TC_REACH_TIME_INTERVAL(30 * 60 * 1000 * 1000)) {
|
||||||
LOG_ERROR("long time major freeze not finish, please check it", KR(ret), K(global_last_merged_time),
|
LOG_ERROR("long time major freeze not finish, please check it", KR(ret),
|
||||||
K(global_merge_start_time), K(max_merge_time), K(now), K_(tenant_id), K(is_merging));
|
K(global_last_merged_time), K(global_merge_start_time), K(max_merge_time),
|
||||||
|
K(now), K_(tenant_id), K(is_merging), K(start_service_time), K(all_service_time));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -822,8 +838,9 @@ void ObMajorMergeScheduler::check_merge_interval_time(const bool is_merging)
|
|||||||
(GCONF.enable_major_freeze) &&
|
(GCONF.enable_major_freeze) &&
|
||||||
(!tenant_config->major_freeze_duty_time.disable())) {
|
(!tenant_config->major_freeze_duty_time.disable())) {
|
||||||
if (TC_REACH_TIME_INTERVAL(30 * 60 * 1000 * 1000)) {
|
if (TC_REACH_TIME_INTERVAL(30 * 60 * 1000 * 1000)) {
|
||||||
LOG_ERROR("long time no major freeze, please check it", KR(ret), K(global_last_merged_time),
|
LOG_ERROR("long time no major freeze, please check it", KR(ret),
|
||||||
K(global_merge_start_time), K(max_merge_time), K(now), K_(tenant_id), K(is_merging));
|
K(global_last_merged_time), K(global_merge_start_time), K(max_merge_time),
|
||||||
|
K(now), K_(tenant_id), K(is_merging), K(start_service_time), K(all_service_time));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -456,6 +456,53 @@ int ObServerTableOperator::update_with_partition(const common::ObAddr &server,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObServerTableOperator::get_start_service_time(
|
||||||
|
const common::ObAddr &server,
|
||||||
|
int64_t &start_service_time) const
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
char svr_ip[OB_IP_STR_BUFF] = "";
|
||||||
|
if (!inited_) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
LOG_WARN("not init", K(ret));
|
||||||
|
} else if (!server.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", K(ret), K(server));
|
||||||
|
} else if (false == server.ip_to_string(svr_ip, sizeof(svr_ip))) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("convert server ip to string failed", K(ret), K(server));
|
||||||
|
} else {
|
||||||
|
ObSqlString sql;
|
||||||
|
ObTimeoutCtx ctx;
|
||||||
|
if (OB_FAIL(ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
|
||||||
|
LOG_WARN("fail to get timeout ctx", K(ret), K(ctx));
|
||||||
|
} else if (OB_FAIL(sql.assign_fmt("SELECT start_service_time FROM %s WHERE svr_ip = '%s' AND"
|
||||||
|
" svr_port = %d", OB_ALL_SERVER_TNAME, svr_ip, server.get_port()))) {
|
||||||
|
LOG_WARN("fail to append sql", K(ret));
|
||||||
|
} else {
|
||||||
|
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||||
|
int tmp_ret = OB_SUCCESS;
|
||||||
|
ObMySQLResult *result = NULL;
|
||||||
|
if (OB_FAIL(proxy_->read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
|
||||||
|
LOG_WARN("fail to execute sql", K(sql), K(ret));
|
||||||
|
} else if (OB_ISNULL(result = res.get_result())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("fail to get sql result", K(sql), K(ret));
|
||||||
|
} else if (OB_FAIL(result->next())) {
|
||||||
|
LOG_WARN("fail to get next", KR(ret), K(sql));;
|
||||||
|
} else {
|
||||||
|
EXTRACT_INT_FIELD_MYSQL(*result, "start_service_time", start_service_time, int64_t);
|
||||||
|
}
|
||||||
|
if (OB_SUCC(ret) && (OB_ITER_END != (tmp_ret = result->next()))) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("get more row than one", KR(ret), KR(tmp_ret), K(sql));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
}//end namespace rootserver
|
}//end namespace rootserver
|
||||||
}//end namespace oceanbase
|
}//end namespace oceanbase
|
||||||
|
|
||||||
|
|||||||
@ -53,6 +53,7 @@ public:
|
|||||||
virtual int update_stop_time(const common::ObAddr &server,
|
virtual int update_stop_time(const common::ObAddr &server,
|
||||||
const int64_t stop_time);
|
const int64_t stop_time);
|
||||||
virtual int update_with_partition(const common::ObAddr &server, bool with_partition);
|
virtual int update_with_partition(const common::ObAddr &server, bool with_partition);
|
||||||
|
int get_start_service_time(const common::ObAddr &server, int64_t &start_service_time) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int build_server_status(const common::sqlclient::ObMySQLResult &res,
|
int build_server_status(const common::sqlclient::ObMySQLResult &res,
|
||||||
|
|||||||
Reference in New Issue
Block a user