diff --git a/src/share/ob_tablet_autoincrement_param.cpp b/src/share/ob_tablet_autoincrement_param.cpp index 9abc8eb7f..1d464e654 100644 --- a/src/share/ob_tablet_autoincrement_param.cpp +++ b/src/share/ob_tablet_autoincrement_param.cpp @@ -200,7 +200,7 @@ bool ObTabletAutoincSeq::is_valid() const return 0 != intervals_count_ && nullptr != intervals_; } -int ObTabletAutoincSeq::get_autoinc_seq_value(uint64_t &autoinc_seq) +int ObTabletAutoincSeq::get_autoinc_seq_value(uint64_t &autoinc_seq) const { int ret = OB_SUCCESS; if (0 == intervals_count_) { diff --git a/src/share/ob_tablet_autoincrement_param.h b/src/share/ob_tablet_autoincrement_param.h index b10e16274..30b265755 100644 --- a/src/share/ob_tablet_autoincrement_param.h +++ b/src/share/ob_tablet_autoincrement_param.h @@ -182,7 +182,7 @@ public: { return memtable::MultiSourceDataUnitType::TABLET_SEQ; } - int get_autoinc_seq_value(uint64_t &autoinc_seq); + int get_autoinc_seq_value(uint64_t &autoinc_seq) const; int set_autoinc_seq_value( common::ObArenaAllocator &allocator, const uint64_t autoinc_seq); diff --git a/src/share/ob_tablet_autoincrement_service.cpp b/src/share/ob_tablet_autoincrement_service.cpp index 815831e87..f9ba114fc 100644 --- a/src/share/ob_tablet_autoincrement_service.cpp +++ b/src/share/ob_tablet_autoincrement_service.cpp @@ -68,6 +68,55 @@ int ObTabletAutoincMgr::set_interval(const ObTabletAutoincParam ¶m, ObTablet return ret; } +int ObTabletAutoincMgr::clear_cache_if_fallback_for_mlog( + const uint64_t current_value) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("tablet autoinc mgr is not inited", K(ret)); + } else { + const int64_t TRY_LOCK_INTERVAL = 1000L; // 1ms + uint64_t cache_value = 0; + + while (true) { + if (OB_SUCCESS != mutex_.trylock()) { + ob_usleep(TRY_LOCK_INTERVAL); + THIS_WORKER.sched_run(); + } else { + break; + } + } + + if (prefetch_node_.is_valid() && curr_node_.is_valid() + && curr_node_.cache_end_ + 1 != prefetch_node_.cache_start_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("curr_node_ and prefetch_node_ is invalid", KPC(this)); + } + + if (OB_FAIL(ret)) { + } else if (prefetch_node_.is_valid()) { + cache_value = prefetch_node_.cache_end_; + } else if (curr_node_.is_valid()) { + cache_value = curr_node_.cache_end_; + } + + if (OB_FAIL(ret)) { + } else if (0 == cache_value) { + LOG_INFO("inc cache is empty, skip check", KPC(this)); + } else if (cache_value + 1 < current_value) { + LOG_INFO("auto inc seq fallback, need clear cache", K(ret), KPC(this), K(current_value)); + curr_node_.reset(); + prefetch_node_.reset(); + next_value_ = 1; + } else if (cache_value + 1 > current_value) { + } + + mutex_.unlock(); + } + return ret; +} int ObTabletAutoincMgr::fetch_interval(const ObTabletAutoincParam ¶m, ObTabletCacheInterval &interval) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { @@ -292,6 +341,92 @@ int ObTabletAutoincrementService::get_autoinc_seq(const uint64_t tenant_id, cons return ret; } +int ObTabletAutoincrementService::get_autoinc_seq_for_mlog( + const uint64_t tenant_id, + const ObLSID &ls_id, + const common::ObTabletID &tablet_id, + uint64_t &autoinc_seq) +{ + int ret = OB_SUCCESS; + const int64_t auto_increment_cache_size = 10000; //TODO(shuangcan): fix me + ObTabletAutoincParam param; + param.tenant_id_ = tenant_id; + ObTabletAutoincMgr *autoinc_mgr = nullptr; + + uint64_t current_value = 0; + ObLSHandle ls_handle; + ObLSService *ls_service = MTL(ObLSService *); + ObLS *ls = nullptr; + ObTabletHandle tablet_handle; + bool is_committed = true; + bool seq_impossible_fallback = false; + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("tablet auto increment service is not inited", K(ret)); + } else if (OB_ISNULL(ls_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected ls_service or log_service", K(ret)); + } else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::DDL_MOD))) { + LOG_WARN("get ls failed", K(ret), K(ls_id)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid ls", K(ret), K(ls_id)); + } else if (OB_FAIL(ls->get_tablet(tablet_id, tablet_handle))) { + LOG_WARN("failed to get tablet", K(ret), K(ls_id)); + } + + if (OB_SUCC(ret)) { + bool need_retry = false; + const int64_t abs_timeout_us = THIS_WORKER.is_timeout_ts_valid() ? + THIS_WORKER.get_timeout_ts() : ObTimeUtility::current_time() + GCONF.rpc_timeout; + do { + need_retry = false; + if (OB_FAIL(tablet_handle.get_obj()->cross_ls_get_latest(ReadAutoIncSeqValueOp(current_value), is_committed))) { + if (OB_EMPTY_RESULT == ret) { + seq_impossible_fallback = true; + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to get auto inc seq", K(ret)); + } + } else if (OB_UNLIKELY(!is_committed)) { + if (ObTimeUtility::current_time() > abs_timeout_us) { + ret = OB_TIMEOUT; + LOG_WARN("get auto inc timeout", K(ret), K(abs_timeout_us)); + } else { + need_retry = true; + usleep(100); + } + } + } while (need_retry); + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(acquire_mgr(tenant_id, tablet_id, auto_increment_cache_size, autoinc_mgr))) { + LOG_WARN("failed to acquire mgr", K(ret)); + } else { + ObTabletCacheInterval interval(tablet_id, 1/*cache size*/); + lib::ObMutex &mutex = init_node_mutexs_[tablet_id.id() % INIT_NODE_MUTEX_NUM]; + lib::ObMutexGuard guard(mutex); + lib::DisableSchedInterGuard sched_guard; + if (OB_ISNULL(autoinc_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("autoinc mgr is unexpected null", K(ret)); + } else if (!seq_impossible_fallback + && OB_FAIL(autoinc_mgr->clear_cache_if_fallback_for_mlog(current_value))) { + LOG_WARN("failed to clear_cache_if_fallback_for_mlog", K(ret)); + } else if (OB_FAIL(autoinc_mgr->fetch_interval(param, interval))) { + LOG_WARN("fail to fetch interval", K(ret), K(param)); + } else if (OB_FAIL(interval.next_value(autoinc_seq))) { + LOG_WARN("fail to get next value", K(ret)); + } + } + if (nullptr != autoinc_mgr) { + release_mgr(autoinc_mgr); + } + return ret; +} + ObTabletAutoincrementService &ObTabletAutoincrementService::get_instance() { static ObTabletAutoincrementService autoinc_service; diff --git a/src/share/ob_tablet_autoincrement_service.h b/src/share/ob_tablet_autoincrement_service.h index 6655f0f5b..f7c064240 100644 --- a/src/share/ob_tablet_autoincrement_service.h +++ b/src/share/ob_tablet_autoincrement_service.h @@ -60,6 +60,8 @@ public: int fetch_interval(const ObTabletAutoincParam ¶m, ObTabletCacheInterval &interval); int fetch_interval_without_cache(const ObTabletAutoincParam ¶m, ObTabletCacheInterval &interval); void destroy() {} + int clear_cache_if_fallback_for_mlog( + const uint64_t current_value); TO_STRING_KV(K_(tablet_id), K_(next_value), @@ -142,6 +144,11 @@ public: int get_tablet_cache_interval(const uint64_t tenant_id, ObTabletCacheInterval &interval); int get_autoinc_seq(const uint64_t tenant_id, const common::ObTabletID &tablet_id, uint64_t &autoinc_seq); + int get_autoinc_seq_for_mlog( + const uint64_t tenant_id, + const ObLSID &ls_id, + const common::ObTabletID &tablet_id, + uint64_t &autoinc_seq); int clear_tablet_autoinc_seq_cache(const uint64_t tenant_id, const common::ObIArray &tablet_ids, const int64_t abs_timeout_us); private: int acquire_mgr(const uint64_t tenant_id, const common::ObTabletID &tablet_id, const int64_t init_cache_size, ObTabletAutoincMgr *&autoinc_mgr); diff --git a/src/sql/das/ob_das_delete_op.cpp b/src/sql/das/ob_das_delete_op.cpp index e45ae1295..66b220fd1 100644 --- a/src/sql/das/ob_das_delete_op.cpp +++ b/src/sql/das/ob_das_delete_op.cpp @@ -60,7 +60,7 @@ int ObDASIndexDMLAdaptor::write_rows(cons } } else if (ctdef.table_param_.get_data_table().is_mlog_table() && !ctdef.is_access_mlog_as_master_table_) { - ObDASMLogDMLIterator mlog_iter(tablet_id, dml_param_, &iter, DAS_OP_TABLE_DELETE); + ObDASMLogDMLIterator mlog_iter(ls_id, tablet_id, dml_param_, &iter, DAS_OP_TABLE_DELETE); if (OB_FAIL(as->insert_rows(ls_id, tablet_id, *tx_desc_, diff --git a/src/sql/das/ob_das_dml_ctx_define.cpp b/src/sql/das/ob_das_dml_ctx_define.cpp index 8afde090e..6946ab4e4 100644 --- a/src/sql/das/ob_das_dml_ctx_define.cpp +++ b/src/sql/das/ob_das_dml_ctx_define.cpp @@ -278,7 +278,8 @@ int ObDASMLogDMLIterator::get_next_row(blocksstable::ObDatumRow *&row) ret = OB_ERR_UNEXPECTED; LOG_WARN("row cannot be null", KR(ret), KP(row)); } else { - if (OB_FAIL(ObDASUtils::generate_mlog_row(tablet_id_, + if (OB_FAIL(ObDASUtils::generate_mlog_row(ls_id_, + tablet_id_, dml_param_, *row, op_type_, diff --git a/src/sql/das/ob_das_dml_ctx_define.h b/src/sql/das/ob_das_dml_ctx_define.h index 1c55642ea..dd93a1847 100644 --- a/src/sql/das/ob_das_dml_ctx_define.h +++ b/src/sql/das/ob_das_dml_ctx_define.h @@ -511,11 +511,13 @@ class ObDASMLogDMLIterator : public blocksstable::ObDatumRowIterator public: // support get next datum row ObDASMLogDMLIterator( + const share::ObLSID &ls_id, const ObTabletID &tablet_id, const storage::ObDMLBaseParam &dml_param, ObDatumRowIterator *iter, ObDASOpType op_type) - : tablet_id_(tablet_id), + : ls_id_(ls_id), + tablet_id_(tablet_id), dml_param_(dml_param), row_iter_(iter), op_type_(op_type), @@ -530,6 +532,7 @@ public: virtual int get_next_row(blocksstable::ObDatumRow *&datum_row) override; private: + const share::ObLSID &ls_id_; const ObTabletID &tablet_id_; const storage::ObDMLBaseParam &dml_param_; ObDatumRowIterator *row_iter_; diff --git a/src/sql/das/ob_das_insert_op.cpp b/src/sql/das/ob_das_insert_op.cpp index f7001bcb6..5ca902be0 100644 --- a/src/sql/das/ob_das_insert_op.cpp +++ b/src/sql/das/ob_das_insert_op.cpp @@ -57,7 +57,7 @@ int ObDASIndexDMLAdaptor::write_rows(cons if (ctdef.table_param_.get_data_table().is_mlog_table() && !ctdef.is_access_mlog_as_master_table_) { - ObDASMLogDMLIterator mlog_iter(tablet_id, dml_param_, &iter, DAS_OP_TABLE_INSERT); + ObDASMLogDMLIterator mlog_iter(ls_id, tablet_id, dml_param_, &iter, DAS_OP_TABLE_INSERT); if (OB_FAIL(as->insert_rows(ls_id, tablet_id, *tx_desc_, @@ -296,7 +296,7 @@ int ObDASInsertOp::insert_row_with_fetch() } else if (OB_FAIL(dml_iter.rewind(index_ins_ctdef))) { LOG_WARN("rewind dml iter failed", K(ret)); } else { - ObDASMLogDMLIterator mlog_iter(index_tablet_id, dml_param, &dml_iter, DAS_OP_TABLE_INSERT); + ObDASMLogDMLIterator mlog_iter(ls_id_, index_tablet_id, dml_param, &dml_iter, DAS_OP_TABLE_INSERT); ObDatumRowIterator *new_iter = nullptr; if (index_ins_ctdef->table_param_.get_data_table().is_mlog_table() && !index_ins_ctdef->is_access_mlog_as_master_table_) { diff --git a/src/sql/das/ob_das_update_op.cpp b/src/sql/das/ob_das_update_op.cpp index 810c26e14..1b96c6d85 100644 --- a/src/sql/das/ob_das_update_op.cpp +++ b/src/sql/das/ob_das_update_op.cpp @@ -245,7 +245,7 @@ int ObDASIndexDMLAdaptor::write_rows(cons } } else if (ctdef.table_param_.get_data_table().is_mlog_table() && !ctdef.is_access_mlog_as_master_table_) { - ObDASMLogDMLIterator mlog_iter(tablet_id, dml_param_, &iter, DAS_OP_TABLE_UPDATE); + ObDASMLogDMLIterator mlog_iter(ls_id, tablet_id, dml_param_, &iter, DAS_OP_TABLE_UPDATE); if (OB_FAIL(as->insert_rows(ls_id, tablet_id, *tx_desc_, diff --git a/src/sql/das/ob_das_utils.cpp b/src/sql/das/ob_das_utils.cpp index 8501c5990..5f084c930 100644 --- a/src/sql/das/ob_das_utils.cpp +++ b/src/sql/das/ob_das_utils.cpp @@ -400,7 +400,8 @@ int ObDASUtils::find_child_das_def(const ObDASBaseCtDef *root_ctdef, return ret; } -int ObDASUtils::generate_mlog_row(const ObTabletID &tablet_id, +int ObDASUtils::generate_mlog_row(const ObLSID &ls_id, + const ObTabletID &tablet_id, const storage::ObDMLBaseParam &dml_param, blocksstable::ObDatumRow &row, ObDASOpType op_type, @@ -420,8 +421,8 @@ int ObDASUtils::generate_mlog_row(const ObTabletID &tablet_id, } else if (row.count_ < 4) { ret = OB_ERR_UNEXPECTED; LOG_WARN("each mlog row should at least contain 4 columns", KR(ret), K(row.count_)); - } else if (OB_FAIL(auto_inc.get_autoinc_seq(tenant_id, tablet_id, autoinc_seq))) { - LOG_WARN("get_autoinc_seq fail", K(ret), K(tenant_id), K(tablet_id)); + } else if (OB_FAIL(auto_inc.get_autoinc_seq_for_mlog(tenant_id, ls_id, tablet_id, autoinc_seq))) { + LOG_WARN("get_autoinc_seq fail", K(ret), K(tenant_id), K(ls_id), K(tablet_id)); } else { // mlog_row = | base_table_rowkey_cols | partition key cols | sequence_col | ... | dmltype_col | old_new_col | int sequence_col = 0; diff --git a/src/sql/das/ob_das_utils.h b/src/sql/das/ob_das_utils.h index 7c76cf4e1..067149b24 100644 --- a/src/sql/das/ob_das_utils.h +++ b/src/sql/das/ob_das_utils.h @@ -91,7 +91,8 @@ public: } return ret; } - static int generate_mlog_row(const common::ObTabletID &tablet_id, + static int generate_mlog_row(const share::ObLSID &ls_id, + const common::ObTabletID &tablet_id, const storage::ObDMLBaseParam &dml_param, blocksstable::ObDatumRow &row, ObDASOpType op_type, diff --git a/src/storage/tablet/ob_i_tablet_mds_interface.h b/src/storage/tablet/ob_i_tablet_mds_interface.h index 425fc483b..8b770deb7 100644 --- a/src/storage/tablet/ob_i_tablet_mds_interface.h +++ b/src/storage/tablet/ob_i_tablet_mds_interface.h @@ -224,6 +224,17 @@ struct ReadAutoIncSeqOp share::ObTabletAutoincSeq &auto_inc_seq_; }; +struct ReadAutoIncSeqValueOp +{ + ReadAutoIncSeqValueOp(uint64_t &auto_inc_seq_value) + : auto_inc_seq_value_(auto_inc_seq_value) {} + int operator()(const share::ObTabletAutoincSeq &data) + { + return data.get_autoinc_seq_value(auto_inc_seq_value_); + } + uint64_t &auto_inc_seq_value_; +}; + struct ReadSplitDataOp { ReadSplitDataOp(ObTabletSplitMdsUserData &split_data) : split_data_(split_data) {}