record min_start_scn in KeepAliveLog
This commit is contained in:
@ -176,14 +176,15 @@ int ObLSTxService::revert_store_ctx(storage::ObStoreCtx &store_ctx) const
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObLSTxService::check_scheduler_status(share::ObLSID ls_id)
|
int ObLSTxService::check_scheduler_status(int64_t &min_start_scn,
|
||||||
|
transaction::MinStartScnStatus &status)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_ISNULL(trans_service_)) {
|
if (OB_ISNULL(trans_service_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
TRANS_LOG(WARN, "not init", K(ret));
|
TRANS_LOG(WARN, "not init", K(ret));
|
||||||
} else {
|
} else {
|
||||||
ret = trans_service_->check_scheduler_status(ls_id);
|
ret = mgr_->check_scheduler_status(min_start_scn, status);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,6 +21,7 @@
|
|||||||
#include "storage/tablelock/ob_table_lock_common.h"
|
#include "storage/tablelock/ob_table_lock_common.h"
|
||||||
#include "logservice/ob_log_base_type.h"
|
#include "logservice/ob_log_base_type.h"
|
||||||
#include "logservice/rcservice/ob_role_change_handler.h"
|
#include "logservice/rcservice/ob_role_change_handler.h"
|
||||||
|
#include "storage/tx/ob_keep_alive_ls_handler.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -91,7 +92,7 @@ public:
|
|||||||
// submit next log when all trx in frozen memtable have submitted log
|
// submit next log when all trx in frozen memtable have submitted log
|
||||||
int traverse_trans_to_submit_next_log();
|
int traverse_trans_to_submit_next_log();
|
||||||
// check schduler status for gc
|
// check schduler status for gc
|
||||||
int check_scheduler_status(share::ObLSID ls_id);
|
int check_scheduler_status(int64_t &min_start_scn, transaction::MinStartScnStatus &status);
|
||||||
|
|
||||||
// for ls gc
|
// for ls gc
|
||||||
// @return OB_SUCCESS, all the tx of this ls cleaned up
|
// @return OB_SUCCESS, all the tx of this ls cleaned up
|
||||||
|
|||||||
@ -23,11 +23,11 @@ using namespace share;
|
|||||||
namespace transaction
|
namespace transaction
|
||||||
{
|
{
|
||||||
|
|
||||||
OB_SERIALIZE_MEMBER(ObKeepAliveLogBody, compat_bit_);
|
OB_SERIALIZE_MEMBER(ObKeepAliveLogBody, compat_bit_, min_start_scn_, min_start_status_);
|
||||||
|
|
||||||
int64_t ObKeepAliveLogBody::get_max_serialize_size()
|
int64_t ObKeepAliveLogBody::get_max_serialize_size()
|
||||||
{
|
{
|
||||||
ObKeepAliveLogBody max_log_body(INT64_MAX);
|
ObKeepAliveLogBody max_log_body(INT64_MAX, INT64_MAX, MinStartScnStatus::MAX);
|
||||||
return max_log_body.get_serialize_size();
|
return max_log_body.get_serialize_size();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -35,8 +35,7 @@ int ObKeepAliveLSHandler::init(const ObLSID &ls_id, logservice::ObLogHandler *lo
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
logservice::ObLogBaseHeader base_header(ObLogBaseType::KEEP_ALIVE_LOG_BASE_TYPE,
|
logservice::ObLogBaseHeader base_header(ObLogBaseType::KEEP_ALIVE_LOG_BASE_TYPE,
|
||||||
ObReplayBarrierType::NO_NEED_BARRIER);
|
ObReplayBarrierType::NO_NEED_BARRIER,INT64_MAX);
|
||||||
ObKeepAliveLogBody log_body;
|
|
||||||
submit_buf_len_ = base_header.get_serialize_size() + ObKeepAliveLogBody::get_max_serialize_size();
|
submit_buf_len_ = base_header.get_serialize_size() + ObKeepAliveLogBody::get_max_serialize_size();
|
||||||
submit_buf_pos_ = 0;
|
submit_buf_pos_ = 0;
|
||||||
|
|
||||||
@ -49,12 +48,6 @@ int ObKeepAliveLSHandler::init(const ObLSID &ls_id, logservice::ObLogHandler *lo
|
|||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
TRANS_LOG(WARN, "[Keep Alive] submit_buf alloc failed", K(ret), KP(submit_buf_),
|
TRANS_LOG(WARN, "[Keep Alive] submit_buf alloc failed", K(ret), KP(submit_buf_),
|
||||||
K(base_header));
|
K(base_header));
|
||||||
} else if (OB_FAIL(base_header.serialize(submit_buf_, submit_buf_len_, submit_buf_pos_))) {
|
|
||||||
TRANS_LOG(WARN, "[Keep Alive] serialize base header error", K(ret),
|
|
||||||
K(base_header.get_serialize_size()), K(submit_buf_len_), K(submit_buf_pos_));
|
|
||||||
} else if (OB_FAIL(log_body.serialize(submit_buf_, submit_buf_len_, submit_buf_pos_))) {
|
|
||||||
TRANS_LOG(WARN, "[Keep Alive] serialize keep alive log body failed", K(ret),
|
|
||||||
K(log_body.get_serialize_size()), K(submit_buf_len_), K(submit_buf_pos_));
|
|
||||||
} else {
|
} else {
|
||||||
ls_id_ = ls_id;
|
ls_id_ = ls_id;
|
||||||
is_busy_ = false;
|
is_busy_ = false;
|
||||||
@ -93,15 +86,19 @@ void ObKeepAliveLSHandler::reset()
|
|||||||
is_stopped_ = false;
|
is_stopped_ = false;
|
||||||
last_gts_ = 0;
|
last_gts_ = 0;
|
||||||
ls_id_.reset();
|
ls_id_.reset();
|
||||||
|
tmp_keep_alive_info_.reset();
|
||||||
|
durable_keep_alive_info_.reset();
|
||||||
stat_info_.reset();
|
stat_info_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObKeepAliveLSHandler::try_submit_log()
|
int ObKeepAliveLSHandler::try_submit_log(int64_t min_start_scn, MinStartScnStatus min_start_status)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
palf::LSN lsn;
|
palf::LSN lsn;
|
||||||
int64_t ts_ns = 0;
|
int64_t ts_ns = 0;
|
||||||
|
|
||||||
|
SpinWLockGuard guard(lock_);
|
||||||
|
|
||||||
if (OB_ISNULL(log_handler_ptr_)) {
|
if (OB_ISNULL(log_handler_ptr_)) {
|
||||||
stat_info_.other_error_cnt += 1;
|
stat_info_.other_error_cnt += 1;
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -111,7 +108,7 @@ int ObKeepAliveLSHandler::try_submit_log()
|
|||||||
} else if (ATOMIC_LOAD(&is_busy_)) {
|
} else if (ATOMIC_LOAD(&is_busy_)) {
|
||||||
stat_info_.cb_busy_cnt += 1;
|
stat_info_.cb_busy_cnt += 1;
|
||||||
// ret = OB_TX_NOLOGCB;
|
// ret = OB_TX_NOLOGCB;
|
||||||
} else if (!check_gts_()) {
|
} else if (!check_gts_() && min_start_status == MinStartScnStatus::UNKOWN) {
|
||||||
stat_info_.near_to_gts_cnt += 1;
|
stat_info_.near_to_gts_cnt += 1;
|
||||||
// ret = OB_OP_NOT_ALLOW;
|
// ret = OB_OP_NOT_ALLOW;
|
||||||
} else {
|
} else {
|
||||||
@ -119,6 +116,9 @@ int ObKeepAliveLSHandler::try_submit_log()
|
|||||||
if (ATOMIC_LOAD(&is_stopped_)) {
|
if (ATOMIC_LOAD(&is_stopped_)) {
|
||||||
ATOMIC_STORE(&is_busy_, false);
|
ATOMIC_STORE(&is_busy_, false);
|
||||||
TRANS_LOG(INFO, "ls hash stopped", K(ret));
|
TRANS_LOG(INFO, "ls hash stopped", K(ret));
|
||||||
|
} else if (OB_FAIL(serialize_keep_alive_log_(min_start_scn, min_start_status))) {
|
||||||
|
ATOMIC_STORE(&is_busy_, false);
|
||||||
|
TRANS_LOG(WARN, "[Keep Alive] serialize keep alive log failed", K(ret), K(ls_id_));
|
||||||
} else if (OB_FAIL(log_handler_ptr_->append(submit_buf_, submit_buf_pos_, last_gts_, true, this,
|
} else if (OB_FAIL(log_handler_ptr_->append(submit_buf_, submit_buf_pos_, last_gts_, true, this,
|
||||||
lsn, ts_ns))) {
|
lsn, ts_ns))) {
|
||||||
stat_info_.other_error_cnt += 1;
|
stat_info_.other_error_cnt += 1;
|
||||||
@ -126,27 +126,99 @@ int ObKeepAliveLSHandler::try_submit_log()
|
|||||||
TRANS_LOG(WARN, "[Keep Alive] submit keep alive log failed", K(ret), K(ls_id_));
|
TRANS_LOG(WARN, "[Keep Alive] submit keep alive log failed", K(ret), K(ls_id_));
|
||||||
} else {
|
} else {
|
||||||
stat_info_.submit_succ_cnt += 1;
|
stat_info_.submit_succ_cnt += 1;
|
||||||
stat_info_.last_log_ts_ = ts_ns;
|
tmp_keep_alive_info_.log_ts_ = ts_ns;
|
||||||
stat_info_.last_lsn_ = lsn;
|
tmp_keep_alive_info_.lsn_ = lsn;
|
||||||
|
tmp_keep_alive_info_.min_start_status_ = min_start_status;
|
||||||
|
tmp_keep_alive_info_.min_start_scn_ = min_start_scn;
|
||||||
|
TRANS_LOG(DEBUG, "[Keep Alive] submit keep alive log success", K(ret), K(ls_id_),
|
||||||
|
K(tmp_keep_alive_info_), K(min_start_scn), K(min_start_status));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObKeepAliveLSHandler::on_success()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
SpinWLockGuard guard(lock_);
|
||||||
|
|
||||||
|
durable_keep_alive_info_ = tmp_keep_alive_info_;
|
||||||
|
stat_info_.stat_keepalive_info_ = durable_keep_alive_info_;
|
||||||
|
|
||||||
|
ATOMIC_STORE(&is_busy_,false);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObKeepAliveLSHandler::on_failure()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
ATOMIC_STORE(&is_busy_,false);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObKeepAliveLSHandler::replay(const void *buffer,
|
||||||
|
const int64_t nbytes,
|
||||||
|
const palf::LSN &lsn,
|
||||||
|
const int64_t ts_ns)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
logservice::ObLogBaseHeader base_header;
|
||||||
|
ObKeepAliveLogBody log_body;
|
||||||
|
|
||||||
|
int64_t pos = 0;
|
||||||
|
if (OB_FAIL(base_header.deserialize(static_cast<const char *>(buffer), nbytes, pos))) {
|
||||||
|
TRANS_LOG(WARN, "[Keep Alive] deserialize base header error", K(ret), K(nbytes), K(pos));
|
||||||
|
} else if (OB_FAIL(log_body.deserialize(static_cast<const char *>(buffer), nbytes, pos))) {
|
||||||
|
TRANS_LOG(WARN, "[Keep Alive] deserialize log body error", K(ret), K(nbytes), K(pos));
|
||||||
|
} else {
|
||||||
|
SpinWLockGuard guard(lock_);
|
||||||
|
durable_keep_alive_info_.log_ts_ = ts_ns;
|
||||||
|
durable_keep_alive_info_.lsn_ = lsn;
|
||||||
|
durable_keep_alive_info_.min_start_scn_ = log_body.get_min_start_scn();
|
||||||
|
durable_keep_alive_info_.min_start_status_ = log_body.get_min_start_status();
|
||||||
|
stat_info_.stat_keepalive_info_ = durable_keep_alive_info_;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
TRANS_LOG(DEBUG, "[Keep Alive] replay keep alive log success", K(ret), K(base_header),
|
||||||
|
K(log_body));
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
void ObKeepAliveLSHandler::print_stat_info()
|
void ObKeepAliveLSHandler::print_stat_info()
|
||||||
{
|
{
|
||||||
|
SpinRLockGuard guard(lock_);
|
||||||
TRANS_LOG(INFO, "[Keep Alive Stat] LS Keep Alive Info", "tenant_id", MTL_ID(),
|
TRANS_LOG(INFO, "[Keep Alive Stat] LS Keep Alive Info", "tenant_id", MTL_ID(),
|
||||||
"LS_ID", ls_id_,
|
"LS_ID", ls_id_,
|
||||||
"Not_Master_Cnt", stat_info_.not_master_cnt,
|
"Not_Master_Cnt", stat_info_.not_master_cnt,
|
||||||
"Near_To_GTS_Cnt", stat_info_.near_to_gts_cnt,
|
"Near_To_GTS_Cnt", stat_info_.near_to_gts_cnt,
|
||||||
"Other_Error_Cnt", stat_info_.other_error_cnt,
|
"Other_Error_Cnt", stat_info_.other_error_cnt,
|
||||||
"Submit_Succ_Cnt", stat_info_.submit_succ_cnt,
|
"Submit_Succ_Cnt", stat_info_.submit_succ_cnt,
|
||||||
"last_log_ts", stat_info_.last_log_ts_,
|
"last_log_ts", stat_info_.stat_keepalive_info_.log_ts_,
|
||||||
"last_lsn", stat_info_.last_lsn_,
|
"last_lsn", stat_info_.stat_keepalive_info_.lsn_,
|
||||||
"last_gts", last_gts_);
|
"last_gts", last_gts_,
|
||||||
stat_info_.reset();
|
"min_start_scn", stat_info_.stat_keepalive_info_.min_start_scn_,
|
||||||
|
"min_start_status", stat_info_.stat_keepalive_info_.min_start_status_);
|
||||||
|
stat_info_.clear_cnt();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ObKeepAliveLSHandler::get_min_start_scn(int64_t &min_start_scn,
|
||||||
|
int64_t &keep_alive_scn,
|
||||||
|
MinStartScnStatus &status)
|
||||||
|
{
|
||||||
|
SpinRLockGuard guard(lock_);
|
||||||
|
|
||||||
|
min_start_scn = durable_keep_alive_info_.min_start_scn_;
|
||||||
|
keep_alive_scn = durable_keep_alive_info_.log_ts_;
|
||||||
|
status = durable_keep_alive_info_.min_start_status_;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObKeepAliveLSHandler::check_gts_()
|
bool ObKeepAliveLSHandler::check_gts_()
|
||||||
@ -176,5 +248,31 @@ bool ObKeepAliveLSHandler::check_gts_()
|
|||||||
return need_submit;
|
return need_submit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObKeepAliveLSHandler::serialize_keep_alive_log_(int64_t min_start_scn, MinStartScnStatus status)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
const int64_t replay_hint = ls_id_.hash();
|
||||||
|
logservice::ObLogBaseHeader base_header(ObLogBaseType::KEEP_ALIVE_LOG_BASE_TYPE,
|
||||||
|
ObReplayBarrierType::NO_NEED_BARRIER, replay_hint);
|
||||||
|
ObKeepAliveLogBody log_body(1, min_start_scn, status);
|
||||||
|
|
||||||
|
if (OB_ISNULL(submit_buf_)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
TRANS_LOG(WARN, "[Keep Alive] invalid submit buf", K(ret), KP(submit_buf_), K(submit_buf_len_),
|
||||||
|
K(submit_buf_pos_));
|
||||||
|
} else if (OB_FALSE_IT(submit_buf_pos_ = 0)) {
|
||||||
|
} else if (OB_FAIL(base_header.serialize(submit_buf_, submit_buf_len_, submit_buf_pos_))) {
|
||||||
|
TRANS_LOG(WARN, "[Keep Alive] serialize base header error", K(ret),
|
||||||
|
K(base_header.get_serialize_size()), K(submit_buf_len_), K(submit_buf_pos_));
|
||||||
|
} else if (OB_FAIL(log_body.serialize(submit_buf_, submit_buf_len_, submit_buf_pos_))) {
|
||||||
|
TRANS_LOG(WARN, "[Keep Alive] serialize keep alive log body failed", K(ret),
|
||||||
|
K(log_body.get_serialize_size()), K(submit_buf_len_), K(submit_buf_pos_));
|
||||||
|
}
|
||||||
|
|
||||||
|
TRANS_LOG(DEBUG, "[Keep Alive] serialize keep alive log", K(ret), K(ls_id_), K(log_body));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace transaction
|
} // namespace transaction
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|||||||
@ -29,20 +29,58 @@ class ObLogHandler;
|
|||||||
|
|
||||||
namespace transaction
|
namespace transaction
|
||||||
{
|
{
|
||||||
|
|
||||||
|
enum class MinStartScnStatus
|
||||||
|
{
|
||||||
|
UNKOWN = 0, // collect failed
|
||||||
|
NO_CTX,
|
||||||
|
HAS_CTX,
|
||||||
|
|
||||||
|
MAX
|
||||||
|
};
|
||||||
|
|
||||||
class ObKeepAliveLogBody
|
class ObKeepAliveLogBody
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
OB_UNIS_VERSION(1);
|
OB_UNIS_VERSION(1);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ObKeepAliveLogBody() : compat_bit_(1) {}
|
ObKeepAliveLogBody()
|
||||||
ObKeepAliveLogBody(int64_t compat_bit) : compat_bit_(compat_bit) {}
|
: compat_bit_(1), min_start_scn_(OB_INVALID_TIMESTAMP),
|
||||||
|
min_start_status_(MinStartScnStatus::UNKOWN)
|
||||||
|
{}
|
||||||
|
ObKeepAliveLogBody(int64_t compat_bit, int64_t min_start_scn, MinStartScnStatus min_status)
|
||||||
|
: compat_bit_(compat_bit), min_start_scn_(min_start_scn), min_start_status_(min_status)
|
||||||
|
{}
|
||||||
|
|
||||||
static int64_t get_max_serialize_size();
|
static int64_t get_max_serialize_size();
|
||||||
TO_STRING_KV(K_(compat_bit));
|
int64_t get_min_start_scn() { return min_start_scn_; };
|
||||||
|
MinStartScnStatus get_min_start_status() { return min_start_status_; }
|
||||||
|
|
||||||
|
TO_STRING_KV(K_(compat_bit), K_(min_start_scn), K_(min_start_status));
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int64_t compat_bit_; // not used, only for compatibility
|
int64_t compat_bit_; // not used, only for compatibility
|
||||||
|
int64_t min_start_scn_;
|
||||||
|
MinStartScnStatus min_start_status_;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct KeepAliveLsInfo
|
||||||
|
{
|
||||||
|
int64_t log_ts_;
|
||||||
|
palf::LSN lsn_;
|
||||||
|
int64_t min_start_scn_;
|
||||||
|
MinStartScnStatus min_start_status_;
|
||||||
|
|
||||||
|
void reset()
|
||||||
|
{
|
||||||
|
log_ts_ = OB_INVALID_TIMESTAMP;
|
||||||
|
lsn_.reset();
|
||||||
|
min_start_scn_ = OB_INVALID_TIMESTAMP;
|
||||||
|
min_start_status_ = MinStartScnStatus::UNKOWN;
|
||||||
|
}
|
||||||
|
|
||||||
|
TO_STRING_KV(K(log_ts_), K(lsn_), K(min_start_scn_), K(min_start_status_));
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObLSKeepAliveStatInfo
|
class ObLSKeepAliveStatInfo
|
||||||
@ -56,8 +94,16 @@ public:
|
|||||||
near_to_gts_cnt = 0;
|
near_to_gts_cnt = 0;
|
||||||
other_error_cnt = 0;
|
other_error_cnt = 0;
|
||||||
submit_succ_cnt = 0;
|
submit_succ_cnt = 0;
|
||||||
last_log_ts_ = 0;
|
stat_keepalive_info_.reset();
|
||||||
last_lsn_.reset();
|
}
|
||||||
|
|
||||||
|
void clear_cnt()
|
||||||
|
{
|
||||||
|
cb_busy_cnt = 0;
|
||||||
|
not_master_cnt = 0;
|
||||||
|
near_to_gts_cnt = 0;
|
||||||
|
other_error_cnt = 0;
|
||||||
|
submit_succ_cnt = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t cb_busy_cnt;
|
int64_t cb_busy_cnt;
|
||||||
@ -65,8 +111,7 @@ public:
|
|||||||
int64_t near_to_gts_cnt;
|
int64_t near_to_gts_cnt;
|
||||||
int64_t other_error_cnt;
|
int64_t other_error_cnt;
|
||||||
int64_t submit_succ_cnt;
|
int64_t submit_succ_cnt;
|
||||||
int64_t last_log_ts_;
|
KeepAliveLsInfo stat_keepalive_info_;
|
||||||
palf::LSN last_lsn_;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// none
|
// none
|
||||||
@ -91,22 +136,15 @@ public:
|
|||||||
|
|
||||||
void reset();
|
void reset();
|
||||||
|
|
||||||
int try_submit_log();
|
int try_submit_log(int64_t min_start_scn, MinStartScnStatus status);
|
||||||
void print_stat_info();
|
void print_stat_info();
|
||||||
public:
|
public:
|
||||||
|
|
||||||
bool is_busy() { return ATOMIC_LOAD(&is_busy_); }
|
bool is_busy() { return ATOMIC_LOAD(&is_busy_); }
|
||||||
int on_success() {ATOMIC_STORE(&is_busy_, false); return OB_SUCCESS;}
|
int on_success();
|
||||||
int on_failure() {ATOMIC_STORE(&is_busy_, false); return OB_SUCCESS;}
|
int on_failure();
|
||||||
|
|
||||||
int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const int64_t ts_ns)
|
int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const int64_t ts_ns);
|
||||||
{
|
|
||||||
UNUSED(buffer);
|
|
||||||
UNUSED(nbytes);
|
|
||||||
UNUSED(lsn);
|
|
||||||
UNUSED(ts_ns);
|
|
||||||
return OB_SUCCESS;
|
|
||||||
}
|
|
||||||
void switch_to_follower_forcedly()
|
void switch_to_follower_forcedly()
|
||||||
{
|
{
|
||||||
ATOMIC_STORE(&is_master_, false);
|
ATOMIC_STORE(&is_master_, false);
|
||||||
@ -117,9 +155,13 @@ public:
|
|||||||
int64_t get_rec_log_ts() { return INT64_MAX; }
|
int64_t get_rec_log_ts() { return INT64_MAX; }
|
||||||
int flush(int64_t rec_log_ts) { return OB_SUCCESS;}
|
int flush(int64_t rec_log_ts) { return OB_SUCCESS;}
|
||||||
|
|
||||||
|
void get_min_start_scn(int64_t &min_start_scn, int64_t &keep_alive_scn, MinStartScnStatus &status);
|
||||||
private:
|
private:
|
||||||
bool check_gts_();
|
bool check_gts_();
|
||||||
|
int serialize_keep_alive_log_(int64_t min_start_scn, MinStartScnStatus status);
|
||||||
private :
|
private :
|
||||||
|
SpinRWLock lock_;
|
||||||
|
|
||||||
bool is_busy_;
|
bool is_busy_;
|
||||||
bool is_master_;
|
bool is_master_;
|
||||||
bool is_stopped_;
|
bool is_stopped_;
|
||||||
@ -133,6 +175,9 @@ private :
|
|||||||
|
|
||||||
int64_t last_gts_;
|
int64_t last_gts_;
|
||||||
|
|
||||||
|
KeepAliveLsInfo tmp_keep_alive_info_;
|
||||||
|
KeepAliveLsInfo durable_keep_alive_info_;
|
||||||
|
|
||||||
ObLSKeepAliveStatInfo stat_info_;
|
ObLSKeepAliveStatInfo stat_info_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -998,7 +998,7 @@ int ObLSTxCtxMgr::get_min_undecided_log_ts(int64_t &log_ts)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObLSTxCtxMgr::check_scheduler_status()
|
int ObLSTxCtxMgr::check_scheduler_status(int64_t &min_start_scn, MinStartScnStatus &status)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTimeGuard tg("ObLSTxCtxMgr::check_scheduler_status", 100000);
|
ObTimeGuard tg("ObLSTxCtxMgr::check_scheduler_status", 100000);
|
||||||
@ -1006,6 +1006,9 @@ int ObLSTxCtxMgr::check_scheduler_status()
|
|||||||
IteratePartCtxAskSchedulerStatusFunctor functor;
|
IteratePartCtxAskSchedulerStatusFunctor functor;
|
||||||
if (OB_FAIL(ls_tx_ctx_map_.for_each(functor))) {
|
if (OB_FAIL(ls_tx_ctx_map_.for_each(functor))) {
|
||||||
TRANS_LOG(WARN, "for each transaction context error", KR(ret), "manager", *this);
|
TRANS_LOG(WARN, "for each transaction context error", KR(ret), "manager", *this);
|
||||||
|
} else {
|
||||||
|
min_start_scn = functor.get_min_start_scn();
|
||||||
|
status = functor.get_min_start_status();
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
@ -2162,6 +2165,8 @@ int ObTxCtxMgr::check_scheduler_status(share::ObLSID ls_id)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
|
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
|
||||||
|
int64_t min_start_scn = OB_INVALID_TIMESTAMP;
|
||||||
|
MinStartScnStatus min_status = MinStartScnStatus::UNKOWN;
|
||||||
|
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
TRANS_LOG(WARN, "ObTxCtxMgr not inited");
|
TRANS_LOG(WARN, "ObTxCtxMgr not inited");
|
||||||
@ -2173,7 +2178,7 @@ int ObTxCtxMgr::check_scheduler_status(share::ObLSID ls_id)
|
|||||||
TRANS_LOG(WARN, "get participant transaction context mgr error", K(ls_id));
|
TRANS_LOG(WARN, "get participant transaction context mgr error", K(ls_id));
|
||||||
ret = OB_PARTITION_NOT_EXIST;
|
ret = OB_PARTITION_NOT_EXIST;
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(ls_tx_ctx_mgr->check_scheduler_status())) {
|
if (OB_FAIL(ls_tx_ctx_mgr->check_scheduler_status(min_start_scn, min_status))) {
|
||||||
TRANS_LOG(WARN, "check_scheduler_status failed", KR(ret), K(ls_id));
|
TRANS_LOG(WARN, "check_scheduler_status failed", KR(ret), K(ls_id));
|
||||||
} else {
|
} else {
|
||||||
TRANS_LOG(DEBUG, "check_scheduler_status success", K(ls_id));
|
TRANS_LOG(DEBUG, "check_scheduler_status success", K(ls_id));
|
||||||
|
|||||||
@ -21,6 +21,7 @@
|
|||||||
#include "storage/tx/ob_tx_ls_log_writer.h"
|
#include "storage/tx/ob_tx_ls_log_writer.h"
|
||||||
#include "storage/tx/ob_tx_retain_ctx_mgr.h"
|
#include "storage/tx/ob_tx_retain_ctx_mgr.h"
|
||||||
#include "storage/tablelock/ob_lock_table.h"
|
#include "storage/tablelock/ob_lock_table.h"
|
||||||
|
#include "storage/tx/ob_keep_alive_ls_handler.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -287,7 +288,7 @@ public:
|
|||||||
ObTransID &block_tx_id);
|
ObTransID &block_tx_id);
|
||||||
|
|
||||||
// check schduler status for tx gc
|
// check schduler status for tx gc
|
||||||
int check_scheduler_status();
|
int check_scheduler_status(int64_t &min_start_scn, MinStartScnStatus &status);
|
||||||
|
|
||||||
// Get this ObLSTxCtxMgr's ls_id_
|
// Get this ObLSTxCtxMgr's ls_id_
|
||||||
const share::ObLSID &get_ls_id() const { return ls_id_; }
|
const share::ObLSID &get_ls_id() const { return ls_id_; }
|
||||||
|
|||||||
@ -29,6 +29,7 @@
|
|||||||
#include "storage/tx_table/ob_tx_table_define.h"
|
#include "storage/tx_table/ob_tx_table_define.h"
|
||||||
#include "storage/tx/ob_tx_stat.h"
|
#include "storage/tx/ob_tx_stat.h"
|
||||||
#include "storage/tx/ob_trans_service.h"
|
#include "storage/tx/ob_trans_service.h"
|
||||||
|
#include "storage/tx/ob_keep_alive_ls_handler.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -1093,6 +1094,7 @@ public:
|
|||||||
IteratePartCtxAskSchedulerStatusFunctor()
|
IteratePartCtxAskSchedulerStatusFunctor()
|
||||||
{
|
{
|
||||||
SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/);
|
SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/);
|
||||||
|
min_start_scn_ = INT64_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
~IteratePartCtxAskSchedulerStatusFunctor() { PRINT_FUNC_STAT; }
|
~IteratePartCtxAskSchedulerStatusFunctor() { PRINT_FUNC_STAT; }
|
||||||
@ -1103,14 +1105,43 @@ public:
|
|||||||
if (OB_UNLIKELY(!tx_id.is_valid() || OB_ISNULL(tx_ctx))) {
|
if (OB_UNLIKELY(!tx_id.is_valid() || OB_ISNULL(tx_ctx))) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
TRANS_LOG(WARN, "invalid argument", KR(ret), K(tx_id), "ctx", OB_P(tx_ctx));
|
TRANS_LOG(WARN, "invalid argument", KR(ret), K(tx_id), "ctx", OB_P(tx_ctx));
|
||||||
|
} else {
|
||||||
|
int64_t ctx_start_scn = tx_ctx->get_start_log_ts();
|
||||||
|
if (ctx_start_scn < 0) {
|
||||||
|
ctx_start_scn = INT64_MAX;
|
||||||
|
}
|
||||||
|
if (OB_FALSE_IT(min_start_scn_ = MIN(min_start_scn_, ctx_start_scn))) {
|
||||||
|
// do nothing
|
||||||
} else if (OB_FAIL(tx_ctx->check_scheduler_status())) {
|
} else if (OB_FAIL(tx_ctx->check_scheduler_status())) {
|
||||||
TRANS_LOG(WARN, "check scheduler status error", KR(ret), "ctx", *tx_ctx);
|
TRANS_LOG(WARN, "check scheduler status error", KR(ret), "ctx", *tx_ctx);
|
||||||
} else {
|
} else {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
min_start_scn_ = OB_INVALID_TIMESTAMP;
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t get_min_start_scn() { return min_start_scn_; }
|
||||||
|
|
||||||
|
MinStartScnStatus get_min_start_status()
|
||||||
|
{
|
||||||
|
MinStartScnStatus start_status = MinStartScnStatus::HAS_CTX;
|
||||||
|
|
||||||
|
if (OB_INVALID_TIMESTAMP == min_start_scn_) {
|
||||||
|
start_status = MinStartScnStatus::UNKOWN;
|
||||||
|
} else if (INT64_MAX == min_start_scn_) {
|
||||||
|
start_status = MinStartScnStatus::NO_CTX;
|
||||||
|
}
|
||||||
|
return start_status;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
int64_t min_start_scn_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // transaction
|
} // transaction
|
||||||
|
|||||||
@ -125,6 +125,7 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int iter_ret = OB_SUCCESS;
|
int iter_ret = OB_SUCCESS;
|
||||||
|
int tmp_ret = OB_SUCCESS;
|
||||||
|
|
||||||
ObSharedGuard<ObLSIterator> ls_iter_guard;
|
ObSharedGuard<ObLSIterator> ls_iter_guard;
|
||||||
ObLSIterator *iter_ptr = nullptr;
|
ObLSIterator *iter_ptr = nullptr;
|
||||||
@ -146,16 +147,45 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
|
|||||||
cur_ls_ptr = nullptr;
|
cur_ls_ptr = nullptr;
|
||||||
while (OB_SUCCESS == (iter_ret = iter_ptr->get_next(cur_ls_ptr))) {
|
while (OB_SUCCESS == (iter_ret = iter_ptr->get_next(cur_ls_ptr))) {
|
||||||
|
|
||||||
// keep alive, interval = 100ms
|
int64_t min_start_scn = OB_INVALID_TIMESTAMP;
|
||||||
do_keep_alive_(cur_ls_ptr);
|
MinStartScnStatus status = MinStartScnStatus::UNKOWN;
|
||||||
|
common::ObRole role;
|
||||||
|
int64_t base_proposal_id, proposal_id;
|
||||||
|
|
||||||
|
if (OB_TMP_FAIL(cur_ls_ptr->get_log_handler()->get_role(role, base_proposal_id))) {
|
||||||
|
TRANS_LOG(WARN, "get role failed", K(tmp_ret), K(cur_ls_ptr->get_ls_id()));
|
||||||
|
status = MinStartScnStatus::UNKOWN;
|
||||||
|
} else if (role == common::ObRole::FOLLOWER) {
|
||||||
|
status = MinStartScnStatus::UNKOWN;
|
||||||
|
}
|
||||||
|
|
||||||
// tx gc, interval = 15s
|
// tx gc, interval = 15s
|
||||||
if (can_tx_gc) {
|
if (can_tx_gc) {
|
||||||
// TODO shanyan.g close ctx gc temporarily because of logical bug
|
// TODO shanyan.g close ctx gc temporarily because of logical bug
|
||||||
// https://work.aone.alibaba-inc.com/issue/42892101
|
// https://work.aone.alibaba-inc.com/issue/42892101
|
||||||
do_tx_gc_(cur_ls_ptr);
|
do_tx_gc_(cur_ls_ptr, min_start_scn, status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(MinStartScnStatus::UNKOWN != status) {
|
||||||
|
// do nothing
|
||||||
|
} else if (OB_TMP_FAIL(cur_ls_ptr->get_log_handler()->get_role(role, proposal_id))) {
|
||||||
|
TRANS_LOG(WARN, "get role failed", K(tmp_ret), K(cur_ls_ptr->get_ls_id()));
|
||||||
|
status = MinStartScnStatus::UNKOWN;
|
||||||
|
} else if (role == common::ObRole::FOLLOWER) {
|
||||||
|
status = MinStartScnStatus::UNKOWN;
|
||||||
|
} else if (base_proposal_id != proposal_id) {
|
||||||
|
status = MinStartScnStatus::UNKOWN;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (MinStartScnStatus::UNKOWN == status) {
|
||||||
|
min_start_scn = OB_INVALID_TIMESTAMP;
|
||||||
|
} else if (MinStartScnStatus::NO_CTX == status) {
|
||||||
|
min_start_scn = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// keep alive, interval = 100ms
|
||||||
|
do_keep_alive_(cur_ls_ptr, min_start_scn, status);
|
||||||
|
|
||||||
// TODO shanyan.g
|
// TODO shanyan.g
|
||||||
// 1) We use max(max_commit_ts, gts_cache) as read snapshot,
|
// 1) We use max(max_commit_ts, gts_cache) as read snapshot,
|
||||||
// but now we adopt updating max_commit_ts periodly to avoid getting gts cache cost
|
// but now we adopt updating max_commit_ts periodly to avoid getting gts cache cost
|
||||||
@ -171,11 +201,11 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObTxLoopWorker::do_keep_alive_(ObLS *ls_ptr)
|
void ObTxLoopWorker::do_keep_alive_(ObLS *ls_ptr, int64_t min_start_scn, MinStartScnStatus status)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
if (ls_ptr->get_keep_alive_ls_handler()->try_submit_log()) {
|
if (ls_ptr->get_keep_alive_ls_handler()->try_submit_log(min_start_scn, status)) {
|
||||||
TRANS_LOG(WARN, "[Tx Loop Worker] try submit keep alive log failed", K(ret));
|
TRANS_LOG(WARN, "[Tx Loop Worker] try submit keep alive log failed", K(ret));
|
||||||
} else if (REACH_TIME_INTERVAL(KEEP_ALIVE_PRINT_INFO_INTERVAL)) {
|
} else if (REACH_TIME_INTERVAL(KEEP_ALIVE_PRINT_INFO_INTERVAL)) {
|
||||||
ls_ptr->get_keep_alive_ls_handler()->print_stat_info();
|
ls_ptr->get_keep_alive_ls_handler()->print_stat_info();
|
||||||
@ -186,11 +216,11 @@ void ObTxLoopWorker::do_keep_alive_(ObLS *ls_ptr)
|
|||||||
UNUSED(ret);
|
UNUSED(ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObTxLoopWorker::do_tx_gc_(ObLS *ls_ptr)
|
void ObTxLoopWorker::do_tx_gc_(ObLS *ls_ptr, int64_t &min_start_scn, MinStartScnStatus &status)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
if (OB_FAIL(ls_ptr->get_tx_svr()->check_scheduler_status(ls_ptr->get_ls_id()))) {
|
if (OB_FAIL(ls_ptr->get_tx_svr()->check_scheduler_status(min_start_scn, status))) {
|
||||||
TRANS_LOG(WARN, "[Tx Loop Worker] check tx scheduler failed", K(ret), K(MTL_ID()), K(*ls_ptr));
|
TRANS_LOG(WARN, "[Tx Loop Worker] check tx scheduler failed", K(ret), K(MTL_ID()), K(*ls_ptr));
|
||||||
} else {
|
} else {
|
||||||
TRANS_LOG(INFO, "[Tx Loop Worker] check tx scheduler success", K(MTL_ID()), K(*ls_ptr));
|
TRANS_LOG(INFO, "[Tx Loop Worker] check tx scheduler success", K(MTL_ID()), K(*ls_ptr));
|
||||||
|
|||||||
@ -56,8 +56,8 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
int scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx);
|
int scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx);
|
||||||
void do_keep_alive_(ObLS *ls); // 100ms
|
void do_keep_alive_(ObLS *ls, int64_t min_start_scn, MinStartScnStatus status); // 100ms
|
||||||
void do_tx_gc_(ObLS *ls); // 15s
|
void do_tx_gc_(ObLS *ls, int64_t &min_start_scn, MinStartScnStatus &status); // 15s
|
||||||
void update_max_commit_ts_(ObLS *ls);
|
void update_max_commit_ts_(ObLS *ls);
|
||||||
void do_retain_ctx_gc_(ObLS * ls); // 15s
|
void do_retain_ctx_gc_(ObLS * ls); // 15s
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user