[LogService] Support the ls delay GC
This commit is contained in:
@ -8,8 +8,6 @@
|
|||||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||||
* See the Mulan PubL v2 for more details.
|
* See the Mulan PubL v2 for more details.
|
||||||
*
|
|
||||||
* Sequencer: sequence trans.
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define USING_LOG_PREFIX OBLOG_SEQUENCER
|
#define USING_LOG_PREFIX OBLOG_SEQUENCER
|
||||||
@ -382,7 +380,7 @@ int ObLogSequencer::handle_to_be_sequenced_trans_(TrxSortElem &trx_sort_elem,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG("handle_to_be_sequenced_trans_ end", KR(ret), K(trans_id));
|
LOG_TRACE("handle_to_be_sequenced_trans_ end", KR(ret), K(trans_id), K(trx_sort_elem));
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -8,8 +8,6 @@
|
|||||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||||
* See the Mulan PubL v2 for more details.
|
* See the Mulan PubL v2 for more details.
|
||||||
*
|
|
||||||
* Sequencer: sequence trans.
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef OCEANBASE_LIBOBCDC_SEQUENCER_H__
|
#ifndef OCEANBASE_LIBOBCDC_SEQUENCER_H__
|
||||||
|
@ -45,6 +45,7 @@ public:
|
|||||||
void destroy();
|
void destroy();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
// SELECT SVR_IP, SVR_PORT, ROLE, BEGIN_LSN, END_LSN FROM GV$OB_LOG_STAT
|
||||||
int get_ls_log_info(
|
int get_ls_log_info(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const share::ObLSID &ls_id,
|
const share::ObLSID &ls_id,
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#include "ob_log_handler.h"
|
#include "ob_log_handler.h"
|
||||||
#include "ob_log_service.h"
|
#include "ob_log_service.h"
|
||||||
#include "ob_switch_leader_adapter.h"
|
#include "ob_switch_leader_adapter.h"
|
||||||
|
#include "common_util/ob_log_time_utils.h"
|
||||||
#include "archiveservice/ob_archive_service.h"
|
#include "archiveservice/ob_archive_service.h"
|
||||||
#include "share/scn.h"
|
#include "share/scn.h"
|
||||||
#include "rpc/obrpc/ob_rpc_net_handler.h"
|
#include "rpc/obrpc/ob_rpc_net_handler.h"
|
||||||
@ -617,11 +618,10 @@ void ObGCHandler::try_check_and_set_wait_gc_(ObGarbageCollector::LSStatus &ls_st
|
|||||||
} else if (OB_FAIL(check_if_tenant_in_archive_(tenant_in_archive))) {
|
} else if (OB_FAIL(check_if_tenant_in_archive_(tenant_in_archive))) {
|
||||||
CLOG_LOG(WARN, "check_if_tenant_in_archive_ failed", K(ret), K(ls_id), K(gc_state));
|
CLOG_LOG(WARN, "check_if_tenant_in_archive_ failed", K(ret), K(ls_id), K(gc_state));
|
||||||
} else if (! tenant_in_archive) {
|
} else if (! tenant_in_archive) {
|
||||||
if (OB_FAIL(ls_->set_gc_state(LSGCState::WAIT_GC))) {
|
if (OB_FAIL(try_check_and_set_wait_gc_when_log_archive_is_off_(gc_state, readable_scn, offline_scn, ls_status))) {
|
||||||
CLOG_LOG(WARN, "set_gc_state failed", K(ls_id), K(gc_state), K(ret));
|
CLOG_LOG(WARN, "try_check_and_set_wait_gc_when_log_archive_is_off_ failed", K(ret), K(ls_id), K(gc_state),
|
||||||
|
K(readable_scn), K(offline_scn), K(ls_status));
|
||||||
}
|
}
|
||||||
ls_status = ObGarbageCollector::LSStatus::LS_NEED_DELETE_ENTRY;
|
|
||||||
CLOG_LOG(INFO, "try_check_and_set_wait_gc_ success", K(ls_id), K(gc_state), K(offline_scn), K(readable_scn));
|
|
||||||
} else if (OB_FAIL(archive_service->get_ls_archive_progress(ls_id, lsn, scn, force_wait, ignore))){
|
} else if (OB_FAIL(archive_service->get_ls_archive_progress(ls_id, lsn, scn, force_wait, ignore))){
|
||||||
CLOG_LOG(WARN, "get_ls_archive_progress failed", K(ls_id), K(gc_state), K(offline_scn), K(ret));
|
CLOG_LOG(WARN, "get_ls_archive_progress failed", K(ls_id), K(gc_state), K(offline_scn), K(ret));
|
||||||
} else if (ignore) {
|
} else if (ignore) {
|
||||||
@ -641,6 +641,96 @@ void ObGCHandler::try_check_and_set_wait_gc_(ObGarbageCollector::LSStatus &ls_st
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObGCHandler::try_check_and_set_wait_gc_when_log_archive_is_off_(
|
||||||
|
const LSGCState &gc_state,
|
||||||
|
const share::SCN &readable_scn,
|
||||||
|
const share::SCN &offline_scn,
|
||||||
|
ObGarbageCollector::LSStatus &ls_status)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
if (OB_ISNULL(ls_)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
CLOG_LOG(WARN, "ls_ is nullptr", KR(ret));
|
||||||
|
} else {
|
||||||
|
const uint64_t tenant_id = MTL_ID();
|
||||||
|
int tmp_ret = OB_SUCCESS;
|
||||||
|
bool is_tenant_dropping_or_dropped = false;
|
||||||
|
|
||||||
|
if (OB_SUCCESS != (tmp_ret = check_if_tenant_is_dropping_or_dropped_(tenant_id, is_tenant_dropping_or_dropped))) {
|
||||||
|
CLOG_LOG(WARN, "check_if_tenant_has_been_dropped_ failed", K(tmp_ret), K(tenant_id), K(ls_id));
|
||||||
|
} else if (is_tenant_dropping_or_dropped) {
|
||||||
|
// The LS delay deletion mechanism will no longer take effect when the tenant is dropped.
|
||||||
|
if (OB_FAIL(ls_->set_gc_state(LSGCState::WAIT_GC))) {
|
||||||
|
CLOG_LOG(WARN, "set_gc_state failed", K(ls_id), K(gc_state), K(ret));
|
||||||
|
}
|
||||||
|
ls_status = ObGarbageCollector::LSStatus::LS_NEED_DELETE_ENTRY;
|
||||||
|
CLOG_LOG(INFO, "Tenant is dropped and the log stream can be removed, try_check_and_set_wait_gc_ success",
|
||||||
|
K(tenant_id), K(ls_id), K(gc_state), K(offline_scn), K(readable_scn));
|
||||||
|
} else {
|
||||||
|
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
||||||
|
// The LS delay deletion mechanism will take effect when the tenant is not dropped.
|
||||||
|
if (! tenant_config.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
CLOG_LOG(WARN, "tenant_config is not valid", K(ret), K(tenant_id));
|
||||||
|
} else if (OB_UNLIKELY(OB_INVALID_TIMESTAMP == gc_start_ts_)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
CLOG_LOG(WARN, "gc_start_ts_ is not valid", KR(ret), K(ls_), K(gc_start_ts_));
|
||||||
|
} else {
|
||||||
|
const int64_t ls_gc_delay_time = tenant_config->ls_gc_delay_time;
|
||||||
|
const int64_t current_time_us = common::ObTimeUtility::current_time();
|
||||||
|
|
||||||
|
if ((current_time_us - gc_start_ts_) >= ls_gc_delay_time) {
|
||||||
|
if (OB_FAIL(ls_->set_gc_state(LSGCState::WAIT_GC))) {
|
||||||
|
CLOG_LOG(WARN, "set_gc_state failed", K(ls_id), K(gc_state), K(ret));
|
||||||
|
}
|
||||||
|
ls_status = ObGarbageCollector::LSStatus::LS_NEED_DELETE_ENTRY;
|
||||||
|
CLOG_LOG(INFO, "The log stream can be removed, try_check_and_set_wait_gc_ success",
|
||||||
|
K(ls_id), K(gc_state), K(offline_scn), K(readable_scn), K(ls_gc_delay_time));
|
||||||
|
} else {
|
||||||
|
CLOG_LOG(INFO, "The log stream requires delayed gc", K(ls_id),
|
||||||
|
K(ls_gc_delay_time), K(gc_start_ts_),
|
||||||
|
"gc_start_ts", TS_TO_STR(gc_start_ts_),
|
||||||
|
"current_time", TS_TO_STR(current_time_us));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObGCHandler::check_if_tenant_is_dropping_or_dropped_(const uint64_t tenant_id,
|
||||||
|
bool &is_tenant_dropping_or_dropped)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
schema::ObMultiVersionSchemaService *schema_service = GCTX.schema_service_;
|
||||||
|
schema::ObSchemaGetterGuard guard;
|
||||||
|
is_tenant_dropping_or_dropped = false;
|
||||||
|
const ObTenantSchema *tenant_schema = nullptr;
|
||||||
|
|
||||||
|
if (OB_ISNULL(schema_service)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
CLOG_LOG(WARN, "schema_service is nullptr", KR(ret));
|
||||||
|
} else if (OB_FAIL(schema_service->get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
|
||||||
|
CLOG_LOG(WARN, "fail to get schema guard", KR(ret), K(tenant_id));
|
||||||
|
} else if (OB_FAIL(guard.get_tenant_info(tenant_id, tenant_schema))) {
|
||||||
|
CLOG_LOG(WARN, "get tenant info failed", KR(ret), K(tenant_id));
|
||||||
|
} else if (OB_ISNULL(tenant_schema)) {
|
||||||
|
// Double check the tenant status to avoid any potential problems in the schema module.
|
||||||
|
if (OB_FAIL(guard.check_if_tenant_has_been_dropped(tenant_id, is_tenant_dropping_or_dropped))) {
|
||||||
|
CLOG_LOG(WARN, "fail to check if tenant has been dropped", KR(ret), K(tenant_id));
|
||||||
|
} else {
|
||||||
|
CLOG_LOG(INFO, "tenant info is nullptr, check the tenant status",
|
||||||
|
K(tenant_id), K(is_tenant_dropping_or_dropped));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
is_tenant_dropping_or_dropped = tenant_schema->is_dropping();
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObGCHandler::get_tenant_readable_scn_(SCN &readable_scn)
|
int ObGCHandler::get_tenant_readable_scn_(SCN &readable_scn)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -817,7 +907,11 @@ void ObGCHandler::handle_gc_ls_dropping_(const ObGarbageCollector::LSStatus &ls_
|
|||||||
ObRole role;
|
ObRole role;
|
||||||
ObLSID ls_id = ls_->get_ls_id();
|
ObLSID ls_id = ls_->get_ls_id();
|
||||||
LSGCState gc_state = INVALID_LS_GC_STATE;
|
LSGCState gc_state = INVALID_LS_GC_STATE;
|
||||||
gc_start_ts_ = ObTimeUtility::current_time();
|
// If gc_start_ts_ is an invalid value, it is necessary to get the current time again to avoid a situation
|
||||||
|
// where gc_start_ts_ remains an invalid value after ObServer restart, which may affect the GC logic.
|
||||||
|
if (OB_INVALID_TIMESTAMP == gc_start_ts_) {
|
||||||
|
gc_start_ts_ = ObTimeUtility::current_time();
|
||||||
|
}
|
||||||
if (OB_FAIL(get_palf_role_(role))) {
|
if (OB_FAIL(get_palf_role_(role))) {
|
||||||
CLOG_LOG(WARN, "get_palf_role_ failed", K(ls_id));
|
CLOG_LOG(WARN, "get_palf_role_ failed", K(ls_id));
|
||||||
} else if (ObRole::LEADER != role) {
|
} else if (ObRole::LEADER != role) {
|
||||||
@ -849,6 +943,12 @@ void ObGCHandler::handle_gc_ls_offline_(ObGarbageCollector::LSStatus &ls_status)
|
|||||||
ObRole role;
|
ObRole role;
|
||||||
ObLSID ls_id = ls_->get_ls_id();
|
ObLSID ls_id = ls_->get_ls_id();
|
||||||
LSGCState gc_state = INVALID_LS_GC_STATE;
|
LSGCState gc_state = INVALID_LS_GC_STATE;
|
||||||
|
// If gc_start_ts_ is an invalid value, it is necessary to get the current time again to avoid a situation
|
||||||
|
// where gc_start_ts_ remains an invalid value after ObServer restart, which may affect the GC logic.
|
||||||
|
if (OB_INVALID_TIMESTAMP == gc_start_ts_) {
|
||||||
|
gc_start_ts_ = ObTimeUtility::current_time();
|
||||||
|
}
|
||||||
|
|
||||||
if (OB_FAIL(get_palf_role_(role))) {
|
if (OB_FAIL(get_palf_role_(role))) {
|
||||||
CLOG_LOG(WARN, "get_palf_role_ failed", K(ls_id));
|
CLOG_LOG(WARN, "get_palf_role_ failed", K(ls_id));
|
||||||
} else if (ObRole::LEADER != role) {
|
} else if (ObRole::LEADER != role) {
|
||||||
|
@ -299,6 +299,13 @@ private:
|
|||||||
bool is_ls_offline_finished_(const LSGCState &state);
|
bool is_ls_offline_finished_(const LSGCState &state);
|
||||||
bool is_tablet_clear_(const ObGarbageCollector::LSStatus &ls_status);
|
bool is_tablet_clear_(const ObGarbageCollector::LSStatus &ls_status);
|
||||||
void try_check_and_set_wait_gc_(ObGarbageCollector::LSStatus &ls_status);
|
void try_check_and_set_wait_gc_(ObGarbageCollector::LSStatus &ls_status);
|
||||||
|
int try_check_and_set_wait_gc_when_log_archive_is_off_(
|
||||||
|
const LSGCState &gc_state,
|
||||||
|
const share::SCN &readable_scn,
|
||||||
|
const share::SCN &offline_scn,
|
||||||
|
ObGarbageCollector::LSStatus &ls_status);
|
||||||
|
int check_if_tenant_is_dropping_or_dropped_(const uint64_t tenant_id,
|
||||||
|
bool &is_tenant_dropping_or_dropped);
|
||||||
int get_tenant_readable_scn_(share::SCN &readable_scn);
|
int get_tenant_readable_scn_(share::SCN &readable_scn);
|
||||||
int check_if_tenant_in_archive_(bool &in_archive);
|
int check_if_tenant_in_archive_(bool &in_archive);
|
||||||
void submit_log_(const ObGCLSLOGType log_type);
|
void submit_log_(const ObGCLSLOGType log_type);
|
||||||
|
@ -558,6 +558,12 @@ DEF_TIME(log_storage_warning_tolerance_time, OB_CLUSTER_PARAMETER, "5s",
|
|||||||
"Range: [1s,300s]",
|
"Range: [1s,300s]",
|
||||||
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||||
|
|
||||||
|
DEF_TIME(ls_gc_delay_time, OB_TENANT_PARAMETER, "1h",
|
||||||
|
"[0s,)",
|
||||||
|
"The max delay time for ls gc when log archive is off. The default value is 3600s. Range: [0s, +∞). "
|
||||||
|
"The ls delay deletion mechanism will no longer take effect when the tenant is dropped.",
|
||||||
|
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||||
|
|
||||||
DEF_TIME(standby_db_fetch_log_rpc_timeout, OB_TENANT_PARAMETER, "15s",
|
DEF_TIME(standby_db_fetch_log_rpc_timeout, OB_TENANT_PARAMETER, "15s",
|
||||||
"[2s,)",
|
"[2s,)",
|
||||||
"The threshold for detecting the RPC timeout for the standby tenant to fetch log from the log restore source tenant. "
|
"The threshold for detecting the RPC timeout for the standby tenant to fetch log from the log restore source tenant. "
|
||||||
|
@ -127,6 +127,7 @@ log_restore_concurrency
|
|||||||
log_storage_warning_tolerance_time
|
log_storage_warning_tolerance_time
|
||||||
log_transport_compress_all
|
log_transport_compress_all
|
||||||
log_transport_compress_func
|
log_transport_compress_func
|
||||||
|
ls_gc_delay_time
|
||||||
ls_meta_table_check_interval
|
ls_meta_table_check_interval
|
||||||
major_compact_trigger
|
major_compact_trigger
|
||||||
major_freeze_duty_time
|
major_freeze_duty_time
|
||||||
|
Reference in New Issue
Block a user