[BUG] fix invalid smaller tx data min start scn during transfer

This commit is contained in:
Handora
2024-02-09 13:03:28 +00:00
committed by ob-robot
parent e8cef94585
commit 5f6233b80c
10 changed files with 603 additions and 17 deletions

View File

@ -46,7 +46,8 @@ void ObLSTransferStatus::reset()
move_tx_scn_.reset();
}
void ObLSTransferStatus::reset_prepare_op() {
void ObLSTransferStatus::reset_prepare_op()
{
transfer_prepare_op_ = false;
transfer_prepare_scn_.reset();
if (is_finished()) {
@ -54,7 +55,9 @@ void ObLSTransferStatus::reset_prepare_op() {
transfer_task_id_ = 0;
}
}
void ObLSTransferStatus::reset_move_tx_op() {
void ObLSTransferStatus::reset_move_tx_op()
{
move_tx_op_ = false;
move_tx_scn_.reset();
if (is_finished()) {
@ -147,6 +150,7 @@ int ObLSTransferStatus::update_status_inner_(const transaction::ObTransID tx_id,
if (!transfer_tx_id_.is_valid() || transfer_tx_id_ == tx_id) {
if (NotifyType::ON_COMMIT == op_type || NotifyType::ON_ABORT == op_type) {
if (ObTxDataSourceType::TRANSFER_DEST_PREPARE == mds_type) {
enable_upper_trans_calculation_(op_scn);
reset_prepare_op();
} else if (ObTxDataSourceType::TRANSFER_MOVE_TX_CTX == mds_type) {
reset_move_tx_op();
@ -157,6 +161,7 @@ int ObLSTransferStatus::update_status_inner_(const transaction::ObTransID tx_id,
if (ObTxDataSourceType::TRANSFER_DEST_PREPARE == mds_type) {
transfer_prepare_op_ = true;
transfer_prepare_scn_ = op_scn;
disable_upper_trans_calculation_();
} else if (ObTxDataSourceType::TRANSFER_MOVE_TX_CTX == mds_type) {
move_tx_op_ = true;
move_tx_scn_ = op_scn;
@ -184,12 +189,14 @@ int ObLSTransferStatus::replay_status_inner_(const transaction::ObTransID tx_id,
if (ObTxDataSourceType::TRANSFER_DEST_PREPARE == mds_type) {
if (!transfer_prepare_scn_.is_valid() || transfer_prepare_scn_ < op_scn) {
if (NotifyType::ON_COMMIT == op_type || NotifyType::ON_ABORT == op_type) {
enable_upper_trans_calculation_(op_scn);
reset_prepare_op();
} else {
transfer_tx_id_ = tx_id;
transfer_task_id_ = task_id;
transfer_prepare_op_ = true;
transfer_prepare_scn_ = op_scn;
disable_upper_trans_calculation_();
}
}
} else if (ObTxDataSourceType::TRANSFER_MOVE_TX_CTX == mds_type) {
@ -223,6 +230,50 @@ int ObLSTransferStatus::get_transfer_prepare_status(
return ret;
}
int ObLSTransferStatus::enable_upper_trans_calculation_(const share::SCN op_scn)
{
int ret = OB_SUCCESS;
ObTxTableGuard guard;
ObTxDataTable *tx_data_table = nullptr;
if (OB_FAIL(ls_->get_tx_table_guard(guard))) {
TRANS_LOG(WARN, "failed to get tx table", K(ret));
} else if (OB_UNLIKELY(!guard.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "tx table guard is invalid", K(ret), K(guard));
} else if (OB_ISNULL(tx_data_table =
guard.get_tx_table()->get_tx_data_table())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "tx data table in tx table is nullptr.", K(ret));
} else {
tx_data_table->enable_upper_trans_calculation(op_scn);
TRANS_LOG(INFO, "enable upper trans calculation", KPC(ls_), K(guard), KPC(this));
}
return ret;
}
int ObLSTransferStatus::disable_upper_trans_calculation_()
{
int ret = OB_SUCCESS;
ObTxTableGuard guard;
ObTxDataTable *tx_data_table = nullptr;
if (OB_FAIL(ls_->get_tx_table_guard(guard))) {
TRANS_LOG(WARN, "failed to get tx table", K(ret));
} else if (OB_UNLIKELY(!guard.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "tx table guard is invalid", K(ret), K(guard));
} else if (OB_ISNULL(tx_data_table =
guard.get_tx_table()->get_tx_data_table())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "tx data table in tx table is nullptr.", K(ret));
} else {
tx_data_table->disable_upper_trans_calculation();
TRANS_LOG(INFO, "disable upper trans calculation", KPC(ls_), K(guard), KPC(this));
}
return ret;
}
}
}

View File

@ -55,6 +55,8 @@ private:
const share::SCN op_scn,
const transaction::NotifyType op_type,
const transaction::ObTxDataSourceType mds_type);
int enable_upper_trans_calculation_(const share::SCN op_scn);
int disable_upper_trans_calculation_();
private:
bool is_inited_;
ObLS *ls_;

View File

@ -46,11 +46,15 @@ public:
public:
ObKeepAliveLogBody()
: compat_bit_(1), min_start_scn_(),
min_start_status_(MinStartScnStatus::UNKOWN)
: compat_bit_(1), min_start_scn_(),
min_start_status_(MinStartScnStatus::UNKOWN)
{}
ObKeepAliveLogBody(int64_t compat_bit, const share::SCN &min_start_scn, MinStartScnStatus min_status)
: compat_bit_(compat_bit), min_start_scn_(min_start_scn), min_start_status_(min_status)
ObKeepAliveLogBody(int64_t compat_bit,
const share::SCN &min_start_scn,
MinStartScnStatus min_status)
: compat_bit_(compat_bit),
min_start_scn_(min_start_scn),
min_start_status_(min_status)
{}
static int64_t get_max_serialize_size();
@ -168,8 +172,11 @@ public:
share::SCN get_rec_scn() { return share::SCN::max_scn(); }
int flush(share::SCN &rec_scn) { return OB_SUCCESS;}
void get_min_start_scn(share::SCN &min_start_scn, share::SCN &keep_alive_scn, MinStartScnStatus &status);
void get_min_start_scn(share::SCN &min_start_scn,
share::SCN &keep_alive_scn,
MinStartScnStatus &status);
void set_sys_ls_end_scn(const share::SCN &sys_ls_end_scn) { sys_ls_end_scn_.inc_update(sys_ls_end_scn);}
private:
bool check_gts_();
int serialize_keep_alive_log_(const share::SCN &min_start_scn, MinStartScnStatus status);

View File

@ -1481,7 +1481,6 @@ int ObPartTransCtx::check_rs_scheduler_is_alive_(bool &is_alive)
int64_t trace_time = 0;
int64_t cur_time = ObTimeUtility::current_time();
share::ObAliveServerTracer *server_tracer = NULL;
is_alive = true;
if (OB_ISNULL(trans_service_)) {
ret = OB_ERR_UNEXPECTED;
@ -9198,6 +9197,9 @@ int ObPartTransCtx::collect_tx_ctx(const ObLSID dest_ls_id,
return ret;
}
// NB: This function can report a retryable error because the outer while loop
// will ignore the error and continuously retry until it succeeds within the
// callback function.
int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
const ObTxCtxMoveArg &arg,
const bool is_new_created)
@ -9231,7 +9233,10 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
ret = OB_OP_NOT_ALLOW;
TRANS_LOG(WARN, "tx ctx has end", KR(ret), KPC(this));
}
} else if (epoch_ != arg.epoch_ && exec_info_.next_log_entry_no_ == 0 && get_redo_log_no_() == 0 && busy_cbs_.is_empty()) {
} else if (epoch_ != arg.epoch_ // ctx created by itself
&& exec_info_.next_log_entry_no_ == 0 // no log submitted
&& get_redo_log_no_() == 0 // no log submitted
&& busy_cbs_.is_empty()) { // no log submitting
// promise tx log before move log
if (exec_info_.state_ == ObTxState::INIT) {
// promise redo log before move log
@ -9282,9 +9287,22 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
if (arg.last_seq_no_ > last_scn_) {
last_scn_.atomic_store(arg.last_seq_no_);
}
if (!ctx_tx_data_.get_start_log_ts().is_valid() || arg.tx_start_scn_ < ctx_tx_data_.get_start_log_ts()) {
// TODO fix start_scn back
// start scn in dest ctx is not valid while start scn in previous dest
// ctx(has been released) or src ctx is valid, so we need change it
if ((!ctx_tx_data_.get_start_log_ts().is_valid() &&
(arg.tx_start_scn_.is_valid()))
||
// start scn in dest ctx is valid and start scn in src ctx is smaller
// than it,, so we need change it
(ctx_tx_data_.get_start_log_ts().is_valid() &&
arg.tx_start_scn_.is_valid() &&
arg.tx_start_scn_ < ctx_tx_data_.get_start_log_ts())) {
if (!ctx_tx_data_.is_read_only()) {
// for merging txn where the start_scn is smaller or refers to a
// previously existing txn, we need to replace it with the smallest
// start_scn to ensure the proper recycling mechanism is in place.
// Otherwise the upper trans version will be calculated incorrectly
ctx_tx_data_.set_start_log_ts(arg.tx_start_scn_);
}
}
@ -9326,7 +9344,9 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
// log sequence move_tx --> transfer_in --> commit
// so when recycle tx_data on dest_ls, we can see transfer in tablet, not to recycle tx_data which end_scn > transfer_scn
if (OB_SUCC(ret) && exec_info_.state_ >= ObTxState::COMMIT) {
if (OB_FAIL(update_tx_data_end_scn_(move_tx_param.op_scn_, move_tx_param.transfer_scn_))) {
if (OB_FAIL(update_tx_data_start_and_end_scn_(arg.tx_start_scn_,
move_tx_param.op_scn_,
move_tx_param.transfer_scn_))) {
TRANS_LOG(WARN, "update tx data failed", KR(ret), KPC(this));
}
}
@ -9422,7 +9442,9 @@ bool ObPartTransCtx::is_exec_complete_without_lock(ObLSID ls_id,
return is_complete;
}
int ObPartTransCtx::update_tx_data_end_scn_(const SCN end_scn, const SCN transfer_scn)
int ObPartTransCtx::update_tx_data_start_and_end_scn_(const SCN start_scn,
const SCN end_scn,
const SCN transfer_scn)
{
int ret = OB_SUCCESS;
ObTxTable *tx_table = NULL;
@ -9438,6 +9460,14 @@ int ObPartTransCtx::update_tx_data_end_scn_(const SCN end_scn, const SCN transfe
TRANS_LOG(WARN, "copy tx data failed", KR(ret), KPC(this));
} else {
ObTxData *tx_data = tmp_tx_data_guard.tx_data();
if (start_scn.is_valid()) {
share::SCN current_start_scn = get_start_log_ts();
if (current_start_scn.is_valid()) {
tx_data->start_scn_.atomic_store(MIN(start_scn, current_start_scn));
} else {
tx_data->start_scn_.atomic_store(start_scn);
}
}
tx_data->end_scn_.atomic_store(end_scn);
if (OB_FAIL(tx_table->insert(tx_data))) {
TRANS_LOG(WARN, "insert tx data failed", KR(ret), KPC(this));

View File

@ -776,7 +776,9 @@ public:
bool is_exec_complete_without_lock(ObLSID ls_id, int64_t epoch, int64_t transfer_epoch);
private:
int transfer_op_log_cb_(share::SCN op_scn, NotifyType op_type);
int update_tx_data_end_scn_(const share::SCN end_scn, const share::SCN transfer_scn);
int update_tx_data_start_and_end_scn_(const share::SCN start_scn,
const share::SCN end_scn,
const share::SCN transfer_scn);
protected:
virtual int post_msg_(const share::ObLSID&receiver, ObTxMsg &msg);

View File

@ -187,6 +187,21 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
status = MinStartScnStatus::UNKOWN;
}
// During the transfer, we should not update min_start_scn, otherwise we
// will ignore the ctx that has been transferred in. So we check whether
// transfer is going on there.
//
// TODO(handora.qc): while after we have checked the transfer and later
// submitted the log, the transfer may also happens during these two
// operations. So we need double check it in the log application/replay.
if(MinStartScnStatus::UNKOWN == status) {
// do nothing
} else if (cur_ls_ptr->get_transfer_status().get_transfer_prepare_enable()) {
TRANS_LOG(INFO, "ignore min start scn during transfer prepare enabled",
K(cur_ls_ptr->get_transfer_status()), K(status), K(min_start_scn));
status = MinStartScnStatus::UNKOWN;
}
if (MinStartScnStatus::UNKOWN == status) {
min_start_scn.reset();
} else if (MinStartScnStatus::NO_CTX == status) {

View File

@ -39,7 +39,7 @@ using namespace oceanbase::share;
namespace storage
{
int64_t ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL = 30 * 1000 * 1000; // 30 seconds
int64_t ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL = 15 * 1000 * 1000; // 15 seconds
int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table)
{
@ -71,6 +71,8 @@ int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table)
memtable_mgr_ = static_cast<ObTxDataMemtableMgr *>(memtable_mgr_handle.get_memtable_mgr());
tx_ctx_table_ = tx_ctx_table;
tablet_id_ = LS_TX_DATA_TABLET;
calc_upper_trans_is_disabled_ = false;
latest_transfer_scn_.reset();
is_inited_ = true;
FLOG_INFO("tx data table init success", K(sizeof(ObTxData)), K(sizeof(ObTxDataLinkNode)), KPC(this));
@ -182,6 +184,8 @@ void ObTxDataTable::reset()
calc_upper_info_.reset();
calc_upper_trans_version_cache_.reset();
memtables_cache_.reuse();
calc_upper_trans_is_disabled_ = false;
latest_transfer_scn_.reset();
is_started_ = false;
is_inited_ = false;
}
@ -205,7 +209,7 @@ int ObTxDataTable::offline()
STORAGE_LOG(WARN, "clean memtables cache failed", KR(ret), KPC(this));
} else {
is_started_ = false;
calc_upper_info_.reset();
disable_upper_trans_calculation();
calc_upper_trans_version_cache_.reset();
}
return ret;
@ -230,6 +234,8 @@ int ObTxDataTable::online()
} else {
// load tx data table succeed
is_started_ = true;
calc_upper_trans_is_disabled_ = false;
latest_transfer_scn_.reset();
}
return ret;
@ -609,6 +615,7 @@ int ObTxDataTable::check_need_update_memtables_cache_(bool &need_update)
// cache already up to date, skip update
need_update = false;
}
return ret;
}
@ -821,6 +828,8 @@ int ObTxDataTable::get_upper_trans_version_before_given_scn(const SCN sstable_en
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "The tx data table is not inited.", KR(ret));
} else if (ATOMIC_LOAD(&calc_upper_trans_is_disabled_)) {
skip_calc = true;
} else if (true == (skip_calc = skip_this_sstable_end_scn_(sstable_end_scn))) {
// there is a start_scn of running transactions is smaller than the sstable_end_scn
} else {
@ -1024,11 +1033,14 @@ int ObTxDataTable::check_min_start_in_ctx_(const SCN &sstable_end_scn,
{
SpinRLockGuard lock_guard(calc_upper_info_.lock_);
if (calc_upper_info_.min_start_scn_in_ctx_ <= sstable_end_scn ||
(latest_transfer_scn_.is_valid() &&
calc_upper_info_.keep_alive_scn_ < latest_transfer_scn_) ||
calc_upper_info_.keep_alive_scn_ >= max_decided_scn) {
need_skip = true;
}
if (cur_ts - calc_upper_info_.update_ts_ > 30_s && max_decided_scn > calc_upper_info_.keep_alive_scn_) {
if (cur_ts - calc_upper_info_.update_ts_ > ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL &&
max_decided_scn > calc_upper_info_.keep_alive_scn_) {
need_update_info = true;
}
}
@ -1298,6 +1310,27 @@ int ObTxDataTable::get_start_tx_scn(SCN &start_tx_scn)
return ret;
}
void ObTxDataTable::disable_upper_trans_calculation()
{
ATOMIC_STORE(&calc_upper_trans_is_disabled_, true);
calc_upper_trans_version_cache_.reset();
SpinWLockGuard lock_guard(calc_upper_info_.lock_);
calc_upper_info_.reset();
}
void ObTxDataTable::enable_upper_trans_calculation(const share::SCN latest_transfer_scn)
{
calc_upper_trans_version_cache_.reset();
if (latest_transfer_scn_.is_valid()) {
latest_transfer_scn_ = SCN::max(latest_transfer_scn, latest_transfer_scn_);
} else {
latest_transfer_scn_ = latest_transfer_scn;
}
SpinWLockGuard lock_guard(calc_upper_info_.lock_);
calc_upper_info_.reset();
ATOMIC_STORE(&calc_upper_trans_is_disabled_, false);
}
int ObTxDataTable::dump_single_tx_data_2_text(const int64_t tx_id_int, FILE *fd)
{
int ret = OB_SUCCESS;

View File

@ -122,6 +122,8 @@ public: // ObTxDataTable
ObTxDataTable()
: is_inited_(false),
is_started_(false),
calc_upper_trans_is_disabled_(false),
latest_transfer_scn_(),
ls_id_(),
tablet_id_(0),
arena_allocator_(),
@ -244,6 +246,8 @@ public: // getter and setter
TxDataReadSchema &get_read_schema() { return read_schema_; };
share::ObLSID get_ls_id();
void disable_upper_trans_calculation();
void enable_upper_trans_calculation(const share::SCN latest_transfer_scn);
private:
virtual ObTxDataMemtableMgr *get_memtable_mgr_() { return memtable_mgr_; }
@ -326,6 +330,8 @@ private:
static const int64_t LS_TX_DATA_SCHEMA_COLUMN_CNT = 5;
bool is_inited_;
bool is_started_;
bool calc_upper_trans_is_disabled_;
share::SCN latest_transfer_scn_;
share::ObLSID ls_id_;
ObTabletID tablet_id_;
// Allocator to allocate ObTxData and ObUndoStatus