fix try_gc_tablet_checksum about timeout and frequency
This commit is contained in:
parent
6236cfcdcf
commit
c411be8e33
@ -37,7 +37,9 @@ ObDailyMajorFreezeLauncher::ObDailyMajorFreezeLauncher()
|
||||
already_launch_(false),
|
||||
config_(nullptr),
|
||||
gc_freeze_info_last_timestamp_(0),
|
||||
freeze_info_mgr_(nullptr)
|
||||
freeze_info_mgr_(nullptr),
|
||||
last_check_tablet_ckm_us_(0),
|
||||
tablet_ckm_gc_compaction_scn_(SCN::invalid_scn())
|
||||
{
|
||||
}
|
||||
|
||||
@ -56,6 +58,8 @@ int ObDailyMajorFreezeLauncher::init(
|
||||
config_ = &config;
|
||||
gc_freeze_info_last_timestamp_ = ObTimeUtility::current_time();
|
||||
freeze_info_mgr_ = &freeze_info_manager;
|
||||
last_check_tablet_ckm_us_ = ObTimeUtility::current_time();
|
||||
tablet_ckm_gc_compaction_scn_ = SCN::invalid_scn();
|
||||
sql_proxy_ = &proxy;
|
||||
already_launch_ = false;
|
||||
is_inited_ = true;
|
||||
@ -195,37 +199,56 @@ int ObDailyMajorFreezeLauncher::try_gc_freeze_info()
|
||||
int ObDailyMajorFreezeLauncher::try_gc_tablet_checksum()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// keep 30 days for tablet_checksum whose (tablet_id, ls_id) is (1, 1)
|
||||
const int64_t MAX_KEEP_INTERVAL_NS = 30 * 24 * 60 * 60 * 1000L * 1000L * 1000L; // 30 day
|
||||
const int64_t MIN_RESERVED_COUNT = 8;
|
||||
SCN cur_gts_scn;
|
||||
SCN min_keep_compaction_scn;
|
||||
int64_t now = ObTimeUtility::current_time();
|
||||
const static int64_t BATCH_DELETE_CNT = 2000;
|
||||
|
||||
if (OB_UNLIKELY(IS_NOT_INIT || OB_ISNULL(sql_proxy_))) {
|
||||
if (OB_UNLIKELY(IS_NOT_INIT || OB_ISNULL(sql_proxy_) || OB_ISNULL(freeze_info_mgr_))) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", KR(ret), K_(is_inited));
|
||||
LOG_WARN("not init", KR(ret), K_(is_inited), KP(sql_proxy_), KP(freeze_info_mgr_));
|
||||
} else {
|
||||
ObMySQLTransaction trans;
|
||||
SMART_VAR(ObArray<SCN>, all_compaction_scn) {
|
||||
if (OB_FAIL(trans.start(sql_proxy_, tenant_id_))) {
|
||||
LOG_WARN("fail to start transaction", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(ObTabletChecksumOperator::load_all_compaction_scn(trans,
|
||||
tenant_id_, all_compaction_scn))) {
|
||||
// 1. load all distinct compaction_scn, when reach 30 min interval time and no valid
|
||||
// tablet_ckm_gc_compaction_scn exists
|
||||
if (((now - last_check_tablet_ckm_us_) < TABLET_CKM_CHECK_INTERVAL_US)
|
||||
|| tablet_ckm_gc_compaction_scn_.is_valid()) {
|
||||
// do nothing, so as to decrease the frequency of load all distinct compaction_scn
|
||||
} else if (OB_FAIL(ObTabletChecksumOperator::load_all_compaction_scn(*sql_proxy_,
|
||||
tenant_id_, all_compaction_scn))) {
|
||||
LOG_WARN("fail to load all compaction scn", KR(ret), K_(tenant_id));
|
||||
} else if (all_compaction_scn.count() > MIN_RESERVED_COUNT) {
|
||||
const int64_t snapshot_ver_cnt = all_compaction_scn.count();
|
||||
const SCN &gc_snapshot_scn = all_compaction_scn.at(snapshot_ver_cnt - MIN_RESERVED_COUNT - 1);
|
||||
|
||||
if (OB_FAIL(ObTabletChecksumOperator::delete_tablet_checksum_items(trans, tenant_id_, gc_snapshot_scn,
|
||||
BATCH_DELETE_CNT))) {
|
||||
LOG_WARN("fail to delete tablet checksum items", KR(ret), K_(tenant_id), K(gc_snapshot_scn));
|
||||
} else {
|
||||
last_check_tablet_ckm_us_ = now;
|
||||
// 2. check if need gc tablet_checksum
|
||||
if (all_compaction_scn.count() > MIN_RESERVED_COUNT) {
|
||||
const int64_t compaction_scn_cnt = all_compaction_scn.count();
|
||||
tablet_ckm_gc_compaction_scn_ = all_compaction_scn.at(compaction_scn_cnt - MIN_RESERVED_COUNT - 1);
|
||||
if (OB_FAIL(freeze_info_mgr_->get_gts(cur_gts_scn))) {
|
||||
LOG_WARN("fail to get_gts", KR(ret), K_(tenant_id));
|
||||
} else {
|
||||
min_keep_compaction_scn = SCN::minus(cur_gts_scn, MAX_KEEP_INTERVAL_NS);
|
||||
const SCN special_tablet_ckm_gc_compaction_scn = MIN(min_keep_compaction_scn, tablet_ckm_gc_compaction_scn_);
|
||||
if (OB_FAIL(ObTabletChecksumOperator::delete_special_tablet_checksum_items(*sql_proxy_,
|
||||
tenant_id_, special_tablet_ckm_gc_compaction_scn))) {
|
||||
LOG_WARN("fail to delete special tablet checksum items", KR(ret), K_(tenant_id),
|
||||
K(special_tablet_ckm_gc_compaction_scn));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (trans.is_started()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
|
||||
ret = ((OB_SUCC(ret)) ? tmp_ret : ret);
|
||||
LOG_WARN("fail to end trans", "is_commit", OB_SUCCESS == ret, KR(tmp_ret));
|
||||
// 3. gc tablet_checksum if need
|
||||
if (OB_SUCC(ret) && tablet_ckm_gc_compaction_scn_.is_valid()) {
|
||||
int64_t affected_rows = 0;
|
||||
if (OB_FAIL(ObTabletChecksumOperator::delete_tablet_checksum_items(*sql_proxy_, tenant_id_,
|
||||
tablet_ckm_gc_compaction_scn_, BATCH_DELETE_CNT, affected_rows))) {
|
||||
LOG_WARN("fail to delete tablet checksum items", KR(ret), K_(tenant_id), K_(tablet_ckm_gc_compaction_scn));
|
||||
} else if (0 == affected_rows) {
|
||||
// already deleted all tablet_checksum with compaction_scn <= tablet_ckm_gc_compaction_scn_
|
||||
tablet_ckm_gc_compaction_scn_.set_invalid();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "share/ob_define.h"
|
||||
#include "lib/net/ob_addr.h"
|
||||
#include "rootserver/freeze/ob_freeze_reentrant_thread.h"
|
||||
#include "share/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -59,12 +60,15 @@ private:
|
||||
static const int64_t LAUNCHER_INTERVAL_US = 5 * 1000 * 1000; // 5s
|
||||
static const int64_t MAJOR_FREEZE_RETRY_INTERVAL_US = 1000 * 1000; // 1s
|
||||
static const int64_t MODIFY_GC_INTERVAL = 24 * 60 * 60 * 1000 * 1000L; // 1 day
|
||||
static const int64_t TABLET_CKM_CHECK_INTERVAL_US = 30 * 60 * 1000 * 1000L; // 30 min
|
||||
|
||||
bool is_inited_;
|
||||
bool already_launch_;
|
||||
common::ObServerConfig *config_;
|
||||
int64_t gc_freeze_info_last_timestamp_;
|
||||
ObFreezeInfoManager *freeze_info_mgr_;
|
||||
int64_t last_check_tablet_ckm_us_;
|
||||
share::SCN tablet_ckm_gc_compaction_scn_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObDailyMajorFreezeLauncher);
|
||||
};
|
||||
|
@ -108,6 +108,7 @@ public:
|
||||
int adjust_global_merge_info(const int64_t expected_epoch);
|
||||
|
||||
void reset_freeze_info();
|
||||
int get_gts(share::SCN &gts_scn) const;
|
||||
|
||||
private:
|
||||
int inner_reload(ObFreezeInfo &freeze_info);
|
||||
@ -119,7 +120,6 @@ private:
|
||||
|
||||
int set_local_snapshot_gc_scn(const share::SCN &new_scn);
|
||||
|
||||
int get_gts(share::SCN &gts_scn) const;
|
||||
int get_schema_version(const share::SCN &frozen_scn, int64_t &schema_version) const;
|
||||
|
||||
int get_min_freeze_info(share::ObSimpleFrozenStatus &frozen_status);
|
||||
|
@ -13,6 +13,7 @@
|
||||
#define USING_LOG_PREFIX SHARE
|
||||
|
||||
#include "share/ob_tablet_checksum_operator.h"
|
||||
#include "share/config/ob_server_config.h"
|
||||
#include "share/inner_table/ob_inner_table_schema_constants.h"
|
||||
#include "share/tablet/ob_tablet_to_ls_operator.h"
|
||||
#include "share/ob_freeze_info_proxy.h"
|
||||
@ -21,6 +22,7 @@
|
||||
#include "lib/mysqlclient/ob_mysql_proxy.h"
|
||||
#include "lib/string/ob_sql_string.h"
|
||||
#include "common/ob_smart_var.h"
|
||||
#include "common/ob_timeout_ctx.h"
|
||||
#include "lib/mysqlclient/ob_mysql_transaction.h"
|
||||
|
||||
namespace oceanbase
|
||||
@ -495,11 +497,12 @@ int ObTabletChecksumOperator::delete_tablet_checksum_items(
|
||||
ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
const SCN &gc_compaction_scn,
|
||||
const int64_t limit_cnt)
|
||||
const int64_t limit_cnt,
|
||||
int64_t &affected_rows)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
int64_t affected_rows = 0;
|
||||
affected_rows = 0;
|
||||
const uint64_t extract_tenant_id = 0;
|
||||
const uint64_t gc_scn_val = gc_compaction_scn.is_valid() ? gc_compaction_scn.get_val_for_inner_table_field() : 0;
|
||||
if (OB_UNLIKELY((!is_valid_tenant_id(tenant_id)))
|
||||
@ -518,6 +521,32 @@ int ObTabletChecksumOperator::delete_tablet_checksum_items(
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Delete the tablet_checksum rows of the special tablet, i.e. the rows whose
// (tablet_id, ls_id) is (ObTabletID::MIN_VALID_TABLET_ID, ObLSID::SYS_LS_ID),
// and whose compaction_scn <= @gc_compaction_scn.
//
// @param sql_client         client used to execute the DELETE statement
// @param tenant_id          tenant passed to sql_client.write(); must be a valid tenant id
// @param gc_compaction_scn  upper bound (inclusive) of compaction_scn to delete; must be valid
// @return OB_SUCCESS on success, OB_INVALID_ARGUMENT on invalid input, otherwise the
//         error code returned while building or executing the statement
int ObTabletChecksumOperator::delete_special_tablet_checksum_items(
    ObISQLClient &sql_client,
    const uint64_t tenant_id,
    const SCN &gc_compaction_scn)
{
  int ret = OB_SUCCESS;
  const bool is_arg_valid = is_valid_tenant_id(tenant_id) && gc_compaction_scn.is_valid();
  if (OB_UNLIKELY(!is_arg_valid)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(gc_compaction_scn));
  } else {
    ObSqlString sql;
    int64_t affected_rows = 0;
    // the WHERE clause always matches tenant_id 0; the real tenant is only passed to write()
    const uint64_t tenant_id_in_sql = 0;
    // gc_compaction_scn is known to be valid here, so its field value can be taken directly
    const uint64_t scn_val_in_sql = gc_compaction_scn.get_val_for_inner_table_field();
    if (OB_FAIL(sql.assign_fmt("DELETE FROM %s WHERE tenant_id = '%lu' AND compaction_scn <= %lu"
        " AND tablet_id=%ld AND ls_id=%ld", OB_ALL_TABLET_CHECKSUM_TNAME, tenant_id_in_sql,
        scn_val_in_sql, ObTabletID::MIN_VALID_TABLET_ID, ObLSID::SYS_LS_ID))) {
      LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(gc_compaction_scn));
    } else if (OB_FAIL(sql_client.write(tenant_id, sql.ptr(), affected_rows))) {
      LOG_WARN("fail to execute sql", KR(ret), K(sql));
    } else {
      LOG_INFO("succ to delete special tablet checksum items", K(tenant_id), K(gc_compaction_scn), K(affected_rows));
    }
  }
  return ret;
}
|
||||
|
||||
int ObTabletChecksumOperator::delete_tablet_checksum_items(
|
||||
ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
@ -561,19 +590,30 @@ int ObTabletChecksumOperator::delete_tablet_checksum_items(
|
||||
}
|
||||
|
||||
int ObTabletChecksumOperator::load_all_compaction_scn(
|
||||
ObISQLClient &sql_client,
|
||||
ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
ObIArray<SCN> &compaction_scn_arr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
int64_t estimated_timeout_us = 0;
|
||||
ObTimeoutCtx timeout_ctx;
|
||||
int64_t start_time_us = ObTimeUtility::current_time();
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
ObSqlString sql;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
ObMySQLResult *result = nullptr;
|
||||
if (OB_FAIL(sql.assign_fmt("SELECT DISTINCT compaction_scn as dis_compaction_scn FROM %s"
|
||||
// set trx_timeout and query_timeout based on tablet_cnt
|
||||
if (OB_FAIL(ObTabletChecksumOperator::get_estimated_timeout_us(sql_client, tenant_id,
|
||||
estimated_timeout_us))) {
|
||||
LOG_WARN("fail to get estimated_timeout_us", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(timeout_ctx.set_trx_timeout_us(estimated_timeout_us))) {
|
||||
LOG_WARN("fail to set trx timeout", KR(ret), K(estimated_timeout_us));
|
||||
} else if (OB_FAIL(timeout_ctx.set_timeout(estimated_timeout_us))) {
|
||||
LOG_WARN("fail to set abs timeout", KR(ret), K(estimated_timeout_us));
|
||||
} else if (OB_FAIL(sql.assign_fmt("SELECT DISTINCT compaction_scn as dis_compaction_scn FROM %s"
|
||||
" WHERE tenant_id = 0 ORDER BY compaction_scn ASC", OB_ALL_TABLET_CHECKSUM_TNAME))) {
|
||||
LOG_WARN("fail to append sql", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) {
|
||||
@ -609,7 +649,10 @@ int ObTabletChecksumOperator::load_all_compaction_scn(
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
int64_t cost_time_us = ObTimeUtility::current_time() - start_time_us;
|
||||
LOG_INFO("finish to load all compaction_scn", KR(ret), K(tenant_id), K(cost_time_us),
|
||||
K(estimated_timeout_us), K(sql), K(compaction_scn_arr));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist(
|
||||
@ -660,5 +703,54 @@ int ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist(
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Count the rows of the tablet checksum inner table (OB_ALL_TABLET_CHECKSUM_TNAME).
//
// NOTE: the statement is a plain COUNT(*) without a WHERE clause, so the result is the
// number of rows in the table, not the number of distinct tablets.
//
// @param sql_client  client used to execute the SELECT statement
// @param tenant_id   tenant passed to sql_client.read(); must be a valid tenant id
// @param tablet_cnt  [out] value of the 'cnt' column; only meaningful when OB_SUCCESS is returned
// @return OB_SUCCESS on success, OB_INVALID_ARGUMENT on an invalid tenant id,
//         OB_ERR_UNEXPECTED when no result set is returned, otherwise the error code of
//         the failing step
int ObTabletChecksumOperator::get_tablet_cnt(
    ObISQLClient &sql_client,
    const uint64_t tenant_id,
    int64_t &tablet_cnt)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(tenant_id));
  } else {
    ObSqlString sql;
    SMART_VAR(ObISQLClient::ReadResult, res) {
      ObMySQLResult *result = nullptr;
      if (OB_FAIL(sql.append_fmt("SELECT COUNT(*) as cnt from %s", OB_ALL_TABLET_CHECKSUM_TNAME))) {
        LOG_WARN("failed to append fmt", KR(ret), K(tenant_id));
      } else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) {
        LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(sql));
      } else if (OB_ISNULL(result = res.get_result())) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("fail to get mysql result", KR(ret), K(tenant_id), K(sql));
      } else if (OB_FAIL(result->next())) {
        LOG_WARN("get next result failed", KR(ret), K(tenant_id), K(sql));
      } else {
        EXTRACT_INT_FIELD_MYSQL(*result, "cnt", tablet_cnt, int64_t);
        // the extract macro only sets ret; log here so that a failure is traceable
        if (OB_FAIL(ret)) {
          LOG_WARN("fail to extract tablet cnt", KR(ret), K(tenant_id), K(sql));
        }
      }
    }
  }
  return ret;
}
|
||||
|
||||
// Estimate a timeout (in microseconds) from the value returned by get_tablet_cnt().
//
// The estimate is 1ms per counted tablet, clamped to [9s, 1h], and finally raised to
// GCONF.rpc_timeout if that is larger (this last step is applied after the 1h cap).
//
// @param sql_client            client forwarded to get_tablet_cnt()
// @param tenant_id             tenant forwarded to get_tablet_cnt()
// @param estimated_timeout_us  [out] estimated timeout; left untouched on failure
// @return OB_SUCCESS on success, otherwise the error code of get_tablet_cnt()
int ObTabletChecksumOperator::get_estimated_timeout_us(
    ObISQLClient &sql_client,
    const uint64_t tenant_id,
    int64_t &estimated_timeout_us)
{
  int ret = OB_SUCCESS;
  int64_t tablet_cnt = 0;
  if (OB_FAIL(ObTabletChecksumOperator::get_tablet_cnt(sql_client, tenant_id, tablet_cnt))) {
    LOG_WARN("fail to get tablet replica cnt", KR(ret), K(tenant_id));
  } else {
    const int64_t cost_per_tablet_us = 1000L; // 1ms for each tablet
    const int64_t lower_bound_us = 9 * 1000 * 1000L; // 9s
    const int64_t upper_bound_us = 3600 * 1000 * 1000L; // 1h
    int64_t timeout_us = tablet_cnt * cost_per_tablet_us;
    timeout_us = MAX(timeout_us, lower_bound_us);
    timeout_us = MIN(timeout_us, upper_bound_us);
    // rpc_timeout is applied last, so it wins even over the 1h upper bound
    timeout_us = MAX(timeout_us, GCONF.rpc_timeout);
    estimated_timeout_us = timeout_us;
  }
  return ret;
}
|
||||
|
||||
} // namespace share
|
||||
} // namespace oceanbase
|
||||
|
@ -99,13 +99,19 @@ public:
|
||||
common::ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
common::ObIArray<ObTabletChecksumItem> &items);
|
||||
// delete limited records whose 'snapshot_version' <= min_snapshot_version
|
||||
// delete records whose compaction_scn <= @gc_compaction_scn and (tablet_id, ls_id) is (1, 1)
|
||||
static int delete_special_tablet_checksum_items(
|
||||
common::ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
const SCN &gc_compaction_scn);
|
||||
// delete limited records whose compaction_scn <= @gc_compaction_scn
|
||||
// , while the record of whose (tablet_id, ls_id) is (1, 1) can't be deleted.
|
||||
static int delete_tablet_checksum_items(
|
||||
common::ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
const SCN &gc_compaction_scn,
|
||||
const int64_t limit_cnt);
|
||||
const int64_t limit_cnt,
|
||||
int64_t &affected_rows);
|
||||
static int delete_tablet_checksum_items(
|
||||
common::ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
@ -139,6 +145,14 @@ private:
|
||||
const uint64_t tenant_id,
|
||||
common::ObIArray<ObTabletChecksumItem> &items,
|
||||
const bool is_update);
|
||||
static int get_tablet_cnt(
|
||||
ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
int64_t &tablet_cnt);
|
||||
static int get_estimated_timeout_us(
|
||||
ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
int64_t &estimated_timeout_us);
|
||||
|
||||
private:
|
||||
const static int64_t MAX_BATCH_COUNT = 99;
|
||||
|
Loading…
x
Reference in New Issue
Block a user