when writing to mlog, ensure that the auto-increment seq on the same rowkey dose not rolled back

This commit is contained in:
obdev 2024-10-25 10:13:36 +00:00 committed by ob-robot
parent 6731c49670
commit 01a7822a25
12 changed files with 171 additions and 12 deletions

View File

@ -200,7 +200,7 @@ bool ObTabletAutoincSeq::is_valid() const
return 0 != intervals_count_ && nullptr != intervals_;
}
int ObTabletAutoincSeq::get_autoinc_seq_value(uint64_t &autoinc_seq)
int ObTabletAutoincSeq::get_autoinc_seq_value(uint64_t &autoinc_seq) const
{
int ret = OB_SUCCESS;
if (0 == intervals_count_) {

View File

@ -182,7 +182,7 @@ public:
{
return memtable::MultiSourceDataUnitType::TABLET_SEQ;
}
int get_autoinc_seq_value(uint64_t &autoinc_seq);
int get_autoinc_seq_value(uint64_t &autoinc_seq) const;
int set_autoinc_seq_value(
common::ObArenaAllocator &allocator,
const uint64_t autoinc_seq);

View File

@ -68,6 +68,55 @@ int ObTabletAutoincMgr::set_interval(const ObTabletAutoincParam &param, ObTablet
return ret;
}
int ObTabletAutoincMgr::clear_cache_if_fallback_for_mlog(
const uint64_t current_value)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("tablet autoinc mgr is not inited", K(ret));
} else {
const int64_t TRY_LOCK_INTERVAL = 1000L; // 1ms
uint64_t cache_value = 0;
while (true) {
if (OB_SUCCESS != mutex_.trylock()) {
ob_usleep<common::ObWaitEventIds::STORAGE_AUTOINC_FETCH_CONFLICT_SLEEP>(TRY_LOCK_INTERVAL);
THIS_WORKER.sched_run();
} else {
break;
}
}
if (prefetch_node_.is_valid() && curr_node_.is_valid()
&& curr_node_.cache_end_ + 1 != prefetch_node_.cache_start_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("curr_node_ and prefetch_node_ is invalid", KPC(this));
}
if (OB_FAIL(ret)) {
} else if (prefetch_node_.is_valid()) {
cache_value = prefetch_node_.cache_end_;
} else if (curr_node_.is_valid()) {
cache_value = curr_node_.cache_end_;
}
if (OB_FAIL(ret)) {
} else if (0 == cache_value) {
LOG_INFO("inc cache is empty, skip check", KPC(this));
} else if (cache_value + 1 < current_value) {
LOG_INFO("auto inc seq fallback, need clear cache", K(ret), KPC(this), K(current_value));
curr_node_.reset();
prefetch_node_.reset();
next_value_ = 1;
} else if (cache_value + 1 > current_value) {
}
mutex_.unlock();
}
return ret;
}
int ObTabletAutoincMgr::fetch_interval(const ObTabletAutoincParam &param, ObTabletCacheInterval &interval) {
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
@ -292,6 +341,92 @@ int ObTabletAutoincrementService::get_autoinc_seq(const uint64_t tenant_id, cons
return ret;
}
int ObTabletAutoincrementService::get_autoinc_seq_for_mlog(
const uint64_t tenant_id,
const ObLSID &ls_id,
const common::ObTabletID &tablet_id,
uint64_t &autoinc_seq)
{
int ret = OB_SUCCESS;
const int64_t auto_increment_cache_size = 10000; //TODO(shuangcan): fix me
ObTabletAutoincParam param;
param.tenant_id_ = tenant_id;
ObTabletAutoincMgr *autoinc_mgr = nullptr;
uint64_t current_value = 0;
ObLSHandle ls_handle;
ObLSService *ls_service = MTL(ObLSService *);
ObLS *ls = nullptr;
ObTabletHandle tablet_handle;
bool is_committed = true;
bool seq_impossible_fallback = false;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("tablet auto increment service is not inited", K(ret));
} else if (OB_ISNULL(ls_service)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected ls_service or log_service", K(ret));
} else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::DDL_MOD))) {
LOG_WARN("get ls failed", K(ret), K(ls_id));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid ls", K(ret), K(ls_id));
} else if (OB_FAIL(ls->get_tablet(tablet_id, tablet_handle))) {
LOG_WARN("failed to get tablet", K(ret), K(ls_id));
}
if (OB_SUCC(ret)) {
bool need_retry = false;
const int64_t abs_timeout_us = THIS_WORKER.is_timeout_ts_valid() ?
THIS_WORKER.get_timeout_ts() : ObTimeUtility::current_time() + GCONF.rpc_timeout;
do {
need_retry = false;
if (OB_FAIL(tablet_handle.get_obj()->cross_ls_get_latest<ObTabletAutoincSeq>(ReadAutoIncSeqValueOp(current_value), is_committed))) {
if (OB_EMPTY_RESULT == ret) {
seq_impossible_fallback = true;
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get auto inc seq", K(ret));
}
} else if (OB_UNLIKELY(!is_committed)) {
if (ObTimeUtility::current_time() > abs_timeout_us) {
ret = OB_TIMEOUT;
LOG_WARN("get auto inc timeout", K(ret), K(abs_timeout_us));
} else {
need_retry = true;
usleep(100);
}
}
} while (need_retry);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(acquire_mgr(tenant_id, tablet_id, auto_increment_cache_size, autoinc_mgr))) {
LOG_WARN("failed to acquire mgr", K(ret));
} else {
ObTabletCacheInterval interval(tablet_id, 1/*cache size*/);
lib::ObMutex &mutex = init_node_mutexs_[tablet_id.id() % INIT_NODE_MUTEX_NUM];
lib::ObMutexGuard guard(mutex);
lib::DisableSchedInterGuard sched_guard;
if (OB_ISNULL(autoinc_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("autoinc mgr is unexpected null", K(ret));
} else if (!seq_impossible_fallback
&& OB_FAIL(autoinc_mgr->clear_cache_if_fallback_for_mlog(current_value))) {
LOG_WARN("failed to clear_cache_if_fallback_for_mlog", K(ret));
} else if (OB_FAIL(autoinc_mgr->fetch_interval(param, interval))) {
LOG_WARN("fail to fetch interval", K(ret), K(param));
} else if (OB_FAIL(interval.next_value(autoinc_seq))) {
LOG_WARN("fail to get next value", K(ret));
}
}
if (nullptr != autoinc_mgr) {
release_mgr(autoinc_mgr);
}
return ret;
}
ObTabletAutoincrementService &ObTabletAutoincrementService::get_instance()
{
static ObTabletAutoincrementService autoinc_service;

View File

@ -60,6 +60,8 @@ public:
int fetch_interval(const ObTabletAutoincParam &param, ObTabletCacheInterval &interval);
int fetch_interval_without_cache(const ObTabletAutoincParam &param, ObTabletCacheInterval &interval);
void destroy() {}
int clear_cache_if_fallback_for_mlog(
const uint64_t current_value);
TO_STRING_KV(K_(tablet_id),
K_(next_value),
@ -142,6 +144,11 @@ public:
int get_tablet_cache_interval(const uint64_t tenant_id,
ObTabletCacheInterval &interval);
int get_autoinc_seq(const uint64_t tenant_id, const common::ObTabletID &tablet_id, uint64_t &autoinc_seq);
int get_autoinc_seq_for_mlog(
const uint64_t tenant_id,
const ObLSID &ls_id,
const common::ObTabletID &tablet_id,
uint64_t &autoinc_seq);
int clear_tablet_autoinc_seq_cache(const uint64_t tenant_id, const common::ObIArray<common::ObTabletID> &tablet_ids, const int64_t abs_timeout_us);
private:
int acquire_mgr(const uint64_t tenant_id, const common::ObTabletID &tablet_id, const int64_t init_cache_size, ObTabletAutoincMgr *&autoinc_mgr);

View File

@ -60,7 +60,7 @@ int ObDASIndexDMLAdaptor<DAS_OP_TABLE_DELETE, ObDASDMLIterator>::write_rows(cons
}
} else if (ctdef.table_param_.get_data_table().is_mlog_table()
&& !ctdef.is_access_mlog_as_master_table_) {
ObDASMLogDMLIterator mlog_iter(tablet_id, dml_param_, &iter, DAS_OP_TABLE_DELETE);
ObDASMLogDMLIterator mlog_iter(ls_id, tablet_id, dml_param_, &iter, DAS_OP_TABLE_DELETE);
if (OB_FAIL(as->insert_rows(ls_id,
tablet_id,
*tx_desc_,

View File

@ -278,7 +278,8 @@ int ObDASMLogDMLIterator::get_next_row(blocksstable::ObDatumRow *&row)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row cannot be null", KR(ret), KP(row));
} else {
if (OB_FAIL(ObDASUtils::generate_mlog_row(tablet_id_,
if (OB_FAIL(ObDASUtils::generate_mlog_row(ls_id_,
tablet_id_,
dml_param_,
*row,
op_type_,

View File

@ -511,11 +511,13 @@ class ObDASMLogDMLIterator : public blocksstable::ObDatumRowIterator
public:
// support get next datum row
ObDASMLogDMLIterator(
const share::ObLSID &ls_id,
const ObTabletID &tablet_id,
const storage::ObDMLBaseParam &dml_param,
ObDatumRowIterator *iter,
ObDASOpType op_type)
: tablet_id_(tablet_id),
: ls_id_(ls_id),
tablet_id_(tablet_id),
dml_param_(dml_param),
row_iter_(iter),
op_type_(op_type),
@ -530,6 +532,7 @@ public:
virtual int get_next_row(blocksstable::ObDatumRow *&datum_row) override;
private:
const share::ObLSID &ls_id_;
const ObTabletID &tablet_id_;
const storage::ObDMLBaseParam &dml_param_;
ObDatumRowIterator *row_iter_;

View File

@ -57,7 +57,7 @@ int ObDASIndexDMLAdaptor<DAS_OP_TABLE_INSERT, ObDASDMLIterator>::write_rows(cons
if (ctdef.table_param_.get_data_table().is_mlog_table()
&& !ctdef.is_access_mlog_as_master_table_) {
ObDASMLogDMLIterator mlog_iter(tablet_id, dml_param_, &iter, DAS_OP_TABLE_INSERT);
ObDASMLogDMLIterator mlog_iter(ls_id, tablet_id, dml_param_, &iter, DAS_OP_TABLE_INSERT);
if (OB_FAIL(as->insert_rows(ls_id,
tablet_id,
*tx_desc_,
@ -296,7 +296,7 @@ int ObDASInsertOp::insert_row_with_fetch()
} else if (OB_FAIL(dml_iter.rewind(index_ins_ctdef))) {
LOG_WARN("rewind dml iter failed", K(ret));
} else {
ObDASMLogDMLIterator mlog_iter(index_tablet_id, dml_param, &dml_iter, DAS_OP_TABLE_INSERT);
ObDASMLogDMLIterator mlog_iter(ls_id_, index_tablet_id, dml_param, &dml_iter, DAS_OP_TABLE_INSERT);
ObDatumRowIterator *new_iter = nullptr;
if (index_ins_ctdef->table_param_.get_data_table().is_mlog_table()
&& !index_ins_ctdef->is_access_mlog_as_master_table_) {

View File

@ -245,7 +245,7 @@ int ObDASIndexDMLAdaptor<DAS_OP_TABLE_UPDATE, ObDASUpdIterator>::write_rows(cons
}
} else if (ctdef.table_param_.get_data_table().is_mlog_table()
&& !ctdef.is_access_mlog_as_master_table_) {
ObDASMLogDMLIterator mlog_iter(tablet_id, dml_param_, &iter, DAS_OP_TABLE_UPDATE);
ObDASMLogDMLIterator mlog_iter(ls_id, tablet_id, dml_param_, &iter, DAS_OP_TABLE_UPDATE);
if (OB_FAIL(as->insert_rows(ls_id,
tablet_id,
*tx_desc_,

View File

@ -400,7 +400,8 @@ int ObDASUtils::find_child_das_def(const ObDASBaseCtDef *root_ctdef,
return ret;
}
int ObDASUtils::generate_mlog_row(const ObTabletID &tablet_id,
int ObDASUtils::generate_mlog_row(const ObLSID &ls_id,
const ObTabletID &tablet_id,
const storage::ObDMLBaseParam &dml_param,
blocksstable::ObDatumRow &row,
ObDASOpType op_type,
@ -420,8 +421,8 @@ int ObDASUtils::generate_mlog_row(const ObTabletID &tablet_id,
} else if (row.count_ < 4) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("each mlog row should at least contain 4 columns", KR(ret), K(row.count_));
} else if (OB_FAIL(auto_inc.get_autoinc_seq(tenant_id, tablet_id, autoinc_seq))) {
LOG_WARN("get_autoinc_seq fail", K(ret), K(tenant_id), K(tablet_id));
} else if (OB_FAIL(auto_inc.get_autoinc_seq_for_mlog(tenant_id, ls_id, tablet_id, autoinc_seq))) {
LOG_WARN("get_autoinc_seq fail", K(ret), K(tenant_id), K(ls_id), K(tablet_id));
} else {
// mlog_row = | base_table_rowkey_cols | partition key cols | sequence_col | ... | dmltype_col | old_new_col |
int sequence_col = 0;

View File

@ -91,7 +91,8 @@ public:
}
return ret;
}
static int generate_mlog_row(const common::ObTabletID &tablet_id,
static int generate_mlog_row(const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id,
const storage::ObDMLBaseParam &dml_param,
blocksstable::ObDatumRow &row,
ObDASOpType op_type,

View File

@ -224,6 +224,17 @@ struct ReadAutoIncSeqOp
share::ObTabletAutoincSeq &auto_inc_seq_;
};
struct ReadAutoIncSeqValueOp
{
ReadAutoIncSeqValueOp(uint64_t &auto_inc_seq_value)
: auto_inc_seq_value_(auto_inc_seq_value) {}
int operator()(const share::ObTabletAutoincSeq &data)
{
return data.get_autoinc_seq_value(auto_inc_seq_value_);
}
uint64_t &auto_inc_seq_value_;
};
struct ReadSplitDataOp
{
ReadSplitDataOp(ObTabletSplitMdsUserData &split_data) : split_data_(split_data) {}