Replace memtable pointer with ObTableHandleV2 and use LSHandle in ObMemtable

This commit is contained in:
obdev 2022-12-30 06:38:13 +00:00 committed by ob-robot
parent d1453a1695
commit 976cfca700
15 changed files with 182 additions and 91 deletions

View File

@ -356,13 +356,14 @@ int ObFreezer::inner_logstream_freeze(ObFuture<int> *result)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
ObTableHandleV2 handle;
if (OB_FAIL(get_ls_data_checkpoint()->ls_freeze(SCN::max_scn()))) {
// move memtables from active_list to frozen_list
TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K(ls_id));
stat_.add_diagnose_info("data_checkpoint freeze failed");
} else if (FALSE_IT(submit_log_for_freeze())) {
} else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/, result))) {
} else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/, result, handle))) {
TRANS_LOG(WARN, "failed to submit ls_freeze task", K(ret), K(ls_id));
stat_.add_diagnose_info("fail to submit ls_freeze_task");
} else {
@ -439,7 +440,7 @@ int ObFreezer::tablet_freeze(const ObTabletID &tablet_id, ObFuture<int> *result)
ObTabletHandle handle;
ObTablet *tablet = nullptr;
ObTabletMemtableMgr *memtable_mgr = nullptr;
memtable::ObIMemtable *imemtable = nullptr;
ObTableHandleV2 frozen_memtable_handle;
SCN freeze_snapshot_version;
FLOG_INFO("[Freezer] tablet_freeze start", K(ret), K(ls_id), K(tablet_id));
stat_.reset();
@ -480,7 +481,7 @@ int ObFreezer::tablet_freeze(const ObTabletID &tablet_id, ObFuture<int> *result)
} else if (FALSE_IT(tablet = handle.get_obj())) {
} else if (OB_ISNULL(memtable_mgr = static_cast<ObTabletMemtableMgr*>(tablet->get_memtable_mgr()))) {
TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(imemtable))) {
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(frozen_memtable_handle))) {
if (ret == OB_ENTRY_NOT_EXIST) {
ret = OB_SUCCESS;
TRANS_LOG(INFO, "[Freezer] no need to freeze since there is no active memtable", K(ret),
@ -491,13 +492,13 @@ int ObFreezer::tablet_freeze(const ObTabletID &tablet_id, ObFuture<int> *result)
stat_.add_diagnose_info("fail to set is_tablet_freeze");
}
} else if (FALSE_IT(submit_log_for_freeze())) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, result, imemtable))) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, result, frozen_memtable_handle))) {
TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to submit tablet_freeze_task");
} else {
TRANS_LOG(INFO, "[Freezer] succeed to start tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
}
if (OB_FAIL(ret) || OB_ISNULL(imemtable)) {
if (OB_FAIL(ret) || !frozen_memtable_handle.is_valid()) {
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
stat_.ret_code_ = ret;
@ -516,7 +517,7 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id)
ObTabletHandle handle;
ObTablet *tablet = nullptr;
ObTabletMemtableMgr *memtable_mgr = nullptr;
memtable::ObIMemtable *imemtable = nullptr;
ObTableHandleV2 frozen_memtable_handle;
SCN freeze_snapshot_version;
FLOG_INFO("[Freezer] force_tablet_freeze start", K(ret), K(ls_id), K(tablet_id));
stat_.reset();
@ -560,17 +561,17 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id)
}
} else if (OB_ISNULL(memtable_mgr = static_cast<ObTabletMemtableMgr*>(tablet->get_memtable_mgr()))) {
TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(imemtable, true))) {
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(frozen_memtable_handle, true))) {
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to set is_tablet_freeze");
} else if (FALSE_IT(submit_log_for_freeze())) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, nullptr, imemtable))) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, nullptr, frozen_memtable_handle))) {
TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to submit freeze_task");
} else {
TRANS_LOG(INFO, "[Freezer] succeed to start force_tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
}
if (OB_FAIL(ret) || OB_ISNULL(imemtable)) {
if (OB_FAIL(ret) || !frozen_memtable_handle.is_valid()) {
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
stat_.ret_code_ = ret;
@ -582,18 +583,21 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id)
return ret;
}
int ObFreezer::tablet_freeze_task(memtable::ObIMemtable *imemtable)
int ObFreezer::tablet_freeze_task(ObTableHandleV2 handle)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
memtable::ObIMemtable *imemtable = nullptr;
memtable::ObMemtable *memtable = nullptr;
if (OB_ISNULL(imemtable)) {
if (!handle.is_valid()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "memtable cannot be null", K(ret), K(ls_id));
} else {
// succeed to set freeze_flag
if (FALSE_IT(memtable = static_cast<memtable::ObMemtable*>(imemtable))) {
if (OB_FAIL(handle.get_memtable(imemtable))) {
LOG_WARN("fail to get memtable", K(ret));
} else if (FALSE_IT(memtable = static_cast<memtable::ObMemtable*>(imemtable))) {
} else if (OB_FAIL(wait_memtable_ready_for_flush_with_ls_lock(memtable))) {
TRANS_LOG(WARN, "[Freezer] fail to wait memtable ready_for_flush", K(ret), K(ls_id));
} else {
@ -655,14 +659,13 @@ int ObFreezer::wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *
return ret;
}
int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *&imemtable)
int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id, ObTableHandleV2 &frozen_memtable_handle)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
ObTabletHandle handle;
ObTablet *tablet = nullptr;
ObTabletMemtableMgr *memtable_mgr = nullptr;
imemtable = nullptr;
SCN freeze_snapshot_version;
FLOG_INFO("[Freezer] tablet_freeze_for_replace_tablet_meta start", K(ret), K(ls_id), K(tablet_id));
stat_.reset();
@ -701,7 +704,7 @@ int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id
} else if (FALSE_IT(tablet = handle.get_obj())) {
} else if (OB_ISNULL(memtable_mgr = static_cast<ObTabletMemtableMgr*>(tablet->get_memtable_mgr()))) {
TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(imemtable))) {
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(frozen_memtable_handle))) {
if (ret == OB_ENTRY_NOT_EXIST) {
ret = OB_SUCCESS;
TRANS_LOG(INFO, "[Freezer] no need to freeze since there is no active memtable", K(ret),
@ -712,7 +715,7 @@ int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id
stat_.add_diagnose_info("fail to set is_tablet_freeze");
}
}
if (OB_FAIL(ret) || OB_ISNULL(imemtable)) {
if (OB_FAIL(ret) || !frozen_memtable_handle.is_valid()) {
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
stat_.ret_code_ = ret;
@ -725,16 +728,20 @@ int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id
}
// must ensure that call tablet_freeze_for_replace_tablet_meta() successfully before calling the func
int ObFreezer::handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *imemtable)
int ObFreezer::handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID &tablet_id, ObTableHandleV2 &handle)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
memtable::ObIMemtable *imemtable = nullptr;
if (OB_ISNULL(imemtable)) {
if (!handle.is_valid()) {
ret = OB_SUCCESS;
FLOG_INFO("[Freezer] no need to tablet_freeze_for_replace_tablet_meta", K(ret), K(ls_id), K(tablet_id));
} else {
if (OB_FAIL(handle_memtable_for_tablet_freeze(imemtable))) {
// succeed to set freeze flag
if (OB_FAIL(handle.get_memtable(imemtable))) {
TRANS_LOG(WARN, "[Freezer] fail to get memtable", K(ret));
} else if (OB_FAIL(handle_memtable_for_tablet_freeze(imemtable))) {
TRANS_LOG(WARN, "[Freezer] fail to handle memtable", K(ret), K(ls_id), K(tablet_id));
} else {
stat_.add_diagnose_info("tablet_freeze_for_replace_tablet_meta success");
@ -818,7 +825,7 @@ int ObFreezer::submit_log_for_freeze()
return ret;
}
int ObFreezer::submit_freeze_task(bool is_ls_freeze, ObFuture<int> *result, memtable::ObIMemtable *imemtable)
int ObFreezer::submit_freeze_task(const bool is_ls_freeze, ObFuture<int> *result, ObTableHandleV2 &handle)
{
int ret = OB_SUCCESS;
ObTenantFreezer *tenant_freezer = nullptr;
@ -836,8 +843,8 @@ int ObFreezer::submit_freeze_task(bool is_ls_freeze, ObFuture<int> *result, memt
ret = tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this]() {
return ls_freeze_task(); });
} else {
ret = tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, imemtable]() {
return tablet_freeze_task(imemtable); });
ret = tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, handle]() {
return tablet_freeze_task(handle); });
}
} else {
if (is_ls_freeze) {
@ -845,7 +852,7 @@ int ObFreezer::submit_freeze_task(bool is_ls_freeze, ObFuture<int> *result, memt
[this]() { return ls_freeze_task(); });
} else {
ret = tenant_freezer->freeze_thread_pool_.commit_task(*result,
[this, imemtable]() { return tablet_freeze_task(imemtable); });
[this, handle]() { return tablet_freeze_task(handle); });
}
}
@ -914,6 +921,7 @@ int ObFreezer::create_memtable_if_no_active_memtable(ObTablet *tablet)
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
ObTabletMemtableMgr *memtable_mgr = nullptr;
ObTableHandleV2 last_frozen_memtable_handle;
memtable::ObMemtable *last_frozen_memtable = nullptr;
const common::ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_;
SCN clog_checkpoint_scn = tablet->get_tablet_meta().clog_checkpoint_scn_;
@ -934,7 +942,17 @@ int ObFreezer::create_memtable_if_no_active_memtable(ObTablet *tablet)
LOG_INFO("[Freezer] no need to create an active memtable", K(ret), K(ls_id), K(tablet_id));
} else { // create a new memtable since there is no active memtable
// get schema_version
if (OB_NOT_NULL(last_frozen_memtable = memtable_mgr->get_last_frozen_memtable())) {
if (OB_FAIL(memtable_mgr->get_last_frozen_memtable(last_frozen_memtable_handle))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("[Freezer] fail to get last frozen memtable", K(ret), K(ls_id), K(tablet_id));
} else {
ret = OB_SUCCESS;
}
} else if (last_frozen_memtable_handle.get_data_memtable(last_frozen_memtable)) {
LOG_WARN("[Freezer] fail to get memtable", K(ret), K(ls_id), K(tablet_id));
}
if (OB_FAIL(ret)) {
} else if (OB_NOT_NULL(last_frozen_memtable)) {
schema_version = last_frozen_memtable->get_max_schema_version();
} else if (OB_FAIL(tablet->get_schema_version_from_storage_schema(schema_version))) {
LOG_WARN("[Freezer] failed to get schema version", K(ret), K(ls_id), K(tablet_id));

View File

@ -45,6 +45,7 @@ class ObLSTxService;
class ObLSTabletService;
class ObTablet;
class ObLSWRSHandler;
class ObTableHandleV2;
namespace checkpoint
{
class ObDataCheckpoint;
@ -198,8 +199,8 @@ public:
int logstream_freeze(ObFuture<int> *result = nullptr);
int tablet_freeze(const ObTabletID &tablet_id, ObFuture<int> *result = nullptr);
int force_tablet_freeze(const ObTabletID &tablet_id);
int tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *&imemtable);
int handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *imemtable);
int tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id, ObTableHandleV2 &handle);
int handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID &tablet_id, ObTableHandleV2 &handle);
/* freeze_flag */
bool is_freeze(uint32_t is_freeze=UINT32_MAX) const;
@ -275,8 +276,8 @@ private:
int inner_logstream_freeze(ObFuture<int> *result);
int submit_log_for_freeze();
int ls_freeze_task();
int tablet_freeze_task(memtable::ObIMemtable *imemtable);
int submit_freeze_task(bool is_ls_freeze, ObFuture<int> *result, memtable::ObIMemtable *imemtable = nullptr);
int tablet_freeze_task(ObTableHandleV2 handle);
int submit_freeze_task(const bool is_ls_freeze, ObFuture<int> *result, ObTableHandleV2 &handle);
void wait_memtable_ready_for_flush(memtable::ObMemtable *memtable);
int wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *memtable);
int handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable);

View File

@ -37,7 +37,8 @@ enum class ObLSGetMod : int
DDL_MOD = 14,
TXSTORAGE_MOD = 15,
LEADER_COORDINATOR_MOD = 16,
TOTAL_MAX_MOD = 17,
DATA_MEMTABLE_MOD = 17,
TOTAL_MAX_MOD = 18,
};
}

View File

@ -3127,6 +3127,7 @@ int ObLSTabletService::build_ha_tablet_new_table_store(
ObMetaDiskAddr disk_addr;
ObFreezer *freezer = nullptr;
memtable::ObIMemtable *imemtable = nullptr;
ObTableHandleV2 memtable_handle;
bool is_tablet_freeze = false;
if (IS_NOT_INIT) {
@ -3163,7 +3164,7 @@ int ObLSTabletService::build_ha_tablet_new_table_store(
if (!tablet_id.is_ls_inner_tablet()) {
if (nullptr != param.tablet_meta_
&& old_tablet->get_clog_checkpoint_scn() < param.tablet_meta_->clog_checkpoint_scn_) {
if (OB_FAIL(freezer->tablet_freeze_for_replace_tablet_meta(tablet_id, imemtable))) {
if (OB_FAIL(freezer->tablet_freeze_for_replace_tablet_meta(tablet_id, memtable_handle))) {
LOG_WARN("failed to freeze tablet", K(ret), K(tablet_id), KPC(old_tablet));
} else {
is_tablet_freeze = true;
@ -3172,7 +3173,10 @@ int ObLSTabletService::build_ha_tablet_new_table_store(
if (OB_FAIL(ret)) {
} else if (!is_tablet_freeze) {
} else if (nullptr != imemtable) {
} else if (!memtable_handle.is_valid()) {
} else if (OB_FAIL(memtable_handle.get_memtable(imemtable))) {
LOG_WARN("failed to get memtable", K(ret), K(tablet_id));
} else {
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(imemtable);
if (OB_FAIL(memtable->resolve_right_boundary_for_migration())) {
LOG_WARN("failed to resolve right boundary", K(ret), K(tablet_id));
@ -3233,7 +3237,7 @@ int ObLSTabletService::build_ha_tablet_new_table_store(
if (is_tablet_freeze) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(freezer->handle_frozen_memtable_for_replace_tablet_meta(tablet_id, imemtable))) {
if (OB_TMP_FAIL(freezer->handle_frozen_memtable_for_replace_tablet_meta(tablet_id, memtable_handle))) {
LOG_WARN("failed to handle_frozen_memtable_for_replace_tablet_meta", K(tmp_ret), K(tablet_id), K(param));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}

View File

@ -93,7 +93,7 @@ ObMemtable::ObMemtable()
: ObIMemtable(),
ObFreezeCheckpoint(),
is_inited_(false),
ls_(nullptr),
ls_handle_(),
freezer_(nullptr),
memtable_mgr_(nullptr),
freeze_clock_(0),
@ -136,7 +136,7 @@ ObMemtable::~ObMemtable()
}
int ObMemtable::init(const ObITable::TableKey &table_key,
ObLS *ls,
ObLSHandle &ls_handle,
storage::ObFreezer *freezer,
storage::ObTabletMemtableMgr *memtable_mgr,
const int64_t schema_version,
@ -150,10 +150,11 @@ int ObMemtable::init(const ObITable::TableKey &table_key,
} else if (!table_key.is_valid() ||
OB_ISNULL(freezer) ||
OB_ISNULL(memtable_mgr) ||
schema_version < 0) {
schema_version < 0 ||
OB_UNLIKELY(!ls_handle.is_valid())) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid param", K(ret), K(table_key), KP(freezer), KP(memtable_mgr),
K(schema_version), K(freeze_clock));
K(schema_version), K(freeze_clock), K(ls_handle));
} else if (FALSE_IT(set_memtable_mgr(memtable_mgr))) {
} else if (FALSE_IT(set_freeze_clock(freeze_clock))) {
} else if (FALSE_IT(set_max_schema_version(schema_version))) {
@ -171,7 +172,7 @@ int ObMemtable::init(const ObITable::TableKey &table_key,
} else if (OB_FAIL(ObITable::init(table_key))) {
TRANS_LOG(WARN, "failed to set_table_key", K(ret), K(table_key));
} else {
ls_ = ls;
ls_handle_ = ls_handle;
ObMemtableStat::get_instance().register_memtable(this);
if (table_key.get_tablet_id().is_sys_tablet()) {
mode_ = lib::Worker::CompatMode::MYSQL;
@ -237,7 +238,7 @@ void ObMemtable::destroy()
time_guard.click();
local_allocator_.destroy();
time_guard.click();
ls_ = nullptr;
ls_handle_.reset();
freezer_ = nullptr;
memtable_mgr_ = nullptr;
freeze_clock_ = 0;
@ -1239,7 +1240,12 @@ int ObMemtable::set_freezer(ObFreezer *handler)
int ObMemtable::get_ls_id(share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
ls_id = freezer_->get_ls_id();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not inited", K(ret));
} else {
ls_id = ls_handle_.get_ls()->get_ls_id();
}
return ret;
}
@ -2593,10 +2599,10 @@ int ObMemtable::get_tx_table_guard(ObTxTableGuard &tx_table_guard)
{
int ret = OB_SUCCESS;
if (NULL == ls_) {
if (OB_UNLIKELY(!ls_handle_.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "ls_ is NULL");
} else if (OB_FAIL(ls_->get_tx_table_guard(tx_table_guard))) {
TRANS_LOG(ERROR, "ls_handle is invalid", K(ret));
} else if (OB_FAIL(ls_handle_.get_ls()->get_tx_table_guard(tx_table_guard))) {
TRANS_LOG(WARN, "Get tx table guard from ls failed.", KR(ret));
}

View File

@ -26,6 +26,7 @@
#include "storage/memtable/ob_multi_source_data.h"
#include "storage/checkpoint/ob_freeze_checkpoint.h"
#include "storage/compaction/ob_medium_compaction_mgr.h"
#include "storage/tx_storage/ob_ls_handle.h" //ObLSHandle
namespace oceanbase
{
@ -167,7 +168,7 @@ public:
virtual ~ObMemtable();
public:
int init(const ObITable::TableKey &table_key,
storage::ObLS *ls,
ObLSHandle &ls_handle,
storage::ObFreezer *freezer,
storage::ObTabletMemtableMgr *memtable_mgr,
const int64_t schema_version,
@ -521,7 +522,7 @@ private:
private:
DISALLOW_COPY_AND_ASSIGN(ObMemtable);
bool is_inited_;
storage::ObLS *ls_;
storage::ObLSHandle ls_handle_;
storage::ObFreezer *freezer_;
storage::ObTabletMemtableMgr *memtable_mgr_;
uint32_t freeze_clock_;

View File

@ -211,10 +211,10 @@ int ObIMemtableMgr::get_multi_source_data_unit(
}
int ObIMemtableMgr::get_memtable_for_multi_source_data_unit(
memtable::ObMemtable *&memtable,
ObTableHandleV2 &handle,
const memtable::MultiSourceDataUnitType type) const
{
UNUSED(memtable);
UNUSED(handle);
UNUSED(type);
int ret = OB_NOT_SUPPORTED;
return ret;

View File

@ -248,7 +248,7 @@ public:
ObIAllocator *allocator = nullptr) const;
virtual int get_memtable_for_multi_source_data_unit(
memtable::ObMemtable *&memtable,
ObTableHandleV2 &handle,
const memtable::MultiSourceDataUnitType type) const;
int release_memtables(const share::SCN &scn);

View File

@ -803,8 +803,9 @@ int ObTablet::save_multi_source_data_unit(
ret = common::OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "ls inner tablet does not support multi source data", K(ret), K(tablet_id));
} else if (is_callback) {
ObTableHandleV2 handle;
memtable::ObMemtable *memtable = nullptr;
if (OB_FAIL(memtable_mgr_->get_memtable_for_multi_source_data_unit(memtable, msd->type()))) {
if (OB_FAIL(memtable_mgr_->get_memtable_for_multi_source_data_unit(handle, msd->type()))) {
if (OB_ENTRY_NOT_EXIST == ret && for_replay) {
TRANS_LOG(INFO, "clog_checkpoint_scn of ls is bigger than the commit_info scn of this multi-trans in replay, failed to get multi source data unit",
K(ret), K(ls_id), K(tablet_id), K(memtable_scn));
@ -814,6 +815,8 @@ int ObTablet::save_multi_source_data_unit(
} else {
TRANS_LOG(WARN, "failed to get multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn));
}
} else if (OB_FAIL(handle.get_data_memtable(memtable))) {
TRANS_LOG(WARN, "fail to get memtable", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn, for_replay, ref_op, is_callback))) {
TRANS_LOG(WARN, "failed to save multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn), K(ref_op));
}
@ -821,9 +824,12 @@ int ObTablet::save_multi_source_data_unit(
// for tx_end(inf_ref), binding_info must be prepared after tablet_state is prepared
else if (memtable::MemtableRefOp::INC_REF == ref_op
&& memtable::MultiSourceDataUnitType::TABLET_BINDING_INFO == msd->type()) {
ObTableHandleV2 handle;
memtable::ObMemtable *memtable = nullptr;
if (OB_FAIL(memtable_mgr_->get_memtable_for_multi_source_data_unit(memtable, memtable::MultiSourceDataUnitType::TABLET_TX_DATA))) {
if (OB_FAIL(memtable_mgr_->get_memtable_for_multi_source_data_unit(handle, memtable::MultiSourceDataUnitType::TABLET_TX_DATA))) {
TRANS_LOG(WARN, "failed to get multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn));
} else if (OB_FAIL(handle.get_data_memtable(memtable))) {
TRANS_LOG(WARN, "[Freezer] fail to get memtable", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn, for_replay, ref_op/*add_ref*/, is_callback/*false*/))) {
TRANS_LOG(WARN, "failed to save multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn), K(ref_op));
}

View File

@ -201,6 +201,7 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
table_key.scn_range_.start_scn_ = clog_checkpoint_scn;
table_key.scn_range_.end_scn_.set_max();
memtable::ObMemtable *memtable = NULL;
ObLSHandle ls_handle;
if (OB_FAIL(t3m_->acquire_memtable(memtable_handle))) {
LOG_WARN("failed to create memtable", K(ret), K(ls_id), K(tablet_id_));
@ -208,8 +209,13 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
} else if (OB_ISNULL(memtable = static_cast<memtable::ObMemtable *>(memtable_handle.get_table()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get memtable", K(ret), K(ls_id), K(tablet_id_), K(memtable_handle));
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::DATA_MEMTABLE_MOD))) {
LOG_WARN("failed to get log stream", K(ret), K(ls_id), K(tablet_id_));
} else if (OB_UNLIKELY(!ls_handle.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, invalid ls handle", K(ret), K(ls_handle), K(ls_id), K(tablet_id_));
} else if (OB_FAIL(memtable->init(table_key,
ls_,
ls_handle,
freezer_,
this,
schema_version,
@ -217,8 +223,20 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
LOG_WARN("failed to init memtable", K(ret), K(ls_id), K(table_key), KP(freezer_), KP(this),
K(schema_version), K(logstream_freeze_clock));
} else {
memtable::ObMemtable *last_frozen_memtable = get_last_frozen_memtable_();
if (OB_NOT_NULL(last_frozen_memtable)) {
ObTableHandleV2 last_frozen_memtable_handle;
memtable::ObMemtable *last_frozen_memtable = nullptr;
if (OB_FAIL(get_last_frozen_memtable_(last_frozen_memtable_handle))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("fail to get last frozen memtable", K(ret));
} else {
ret = OB_SUCCESS;
}
} else if (OB_FAIL(last_frozen_memtable_handle.get_data_memtable(last_frozen_memtable))) {
LOG_WARN("fail to get memtable", K(ret));
}
if (OB_FAIL(ret)) {
} else if (OB_NOT_NULL(last_frozen_memtable)) {
// keep the check order: is_frozen, write_ref_cnt, then unsubmitted_cnt and unsynced_cnt
int64_t write_ref = last_frozen_memtable->get_write_ref();
int64_t unsubmitted_cnt = last_frozen_memtable->get_unsubmitted_cnt();
@ -354,47 +372,60 @@ int ObTabletMemtableMgr::get_active_memtable_(ObTableHandleV2 &handle) const
return ret;
}
ObMemtable *ObTabletMemtableMgr::get_last_frozen_memtable() const
int ObTabletMemtableMgr::get_last_frozen_memtable(ObTableHandleV2 &handle) const
{
int ret = OB_SUCCESS;
MemMgrRLockGuard lock_guard(lock_);
memtable::ObMemtable *memtable = nullptr;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret), K_(is_inited));
} else {
memtable = get_last_frozen_memtable_();
} else if (OB_FAIL(get_last_frozen_memtable_(handle))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("fail to get last frozen memtable", K(ret));
}
}
return memtable;
return ret;
}
ObMemtable *ObTabletMemtableMgr::get_last_frozen_memtable_() const
int ObTabletMemtableMgr::get_last_frozen_memtable_(ObTableHandleV2 &handle) const
{
int ret = OB_SUCCESS;
ObMemtable *memtable = nullptr;
handle.reset();
if (OB_UNLIKELY(get_memtable_count_() == 0)) {
ret = OB_ENTRY_NOT_EXIST;
} else if (memtable_tail_ > memtable_head_) {
for (int64_t i = memtable_tail_ - 1; OB_SUCC(ret) && i >= memtable_head_; --i) {
memtable = get_memtable_(i);
if (OB_ISNULL(memtable)) {
ObTableHandleV2 m_handle;
const ObMemtable *memtable = nullptr;
if (OB_FAIL(get_ith_memtable(i, m_handle))) {
STORAGE_LOG(WARN, "fail to get ith memtable", K(ret), K(i));
} else if (OB_UNLIKELY(!m_handle.is_valid())) {
ret = OB_ERR_SYS;
LOG_ERROR("memtable must not null", K(ret));
LOG_ERROR("memtable handle is invalid", K(ret), K(m_handle));
} else if (OB_FAIL(m_handle.get_data_memtable(memtable))) {
LOG_WARN("fail to get memtable", K(ret), K(m_handle));
} else if (OB_ISNULL(memtable)) {
ret = OB_ERR_SYS;
LOG_ERROR("memtable must not null", K(ret), K(m_handle));
} else if (memtable->is_frozen_memtable()) {
handle = m_handle;
break;
} else {
memtable = nullptr;
}
}
}
return memtable;
if (OB_FAIL(ret)) {
} else if (!handle.is_valid()) {
ret = OB_ENTRY_NOT_EXIST;
}
return ret;
}
int ObTabletMemtableMgr::resolve_left_boundary_for_active_memtable(ObIMemtable *memtable,
int ObTabletMemtableMgr::resolve_left_boundary_for_active_memtable(memtable::ObIMemtable *memtable,
SCN start_scn,
SCN snapshot_scn)
{
@ -423,7 +454,7 @@ int ObTabletMemtableMgr::resolve_left_boundary_for_active_memtable(ObIMemtable *
return ret;
}
int ObTabletMemtableMgr::unset_logging_blocked_for_active_memtable(ObIMemtable *memtable)
int ObTabletMemtableMgr::unset_logging_blocked_for_active_memtable(memtable::ObIMemtable *memtable)
{
ObTableHandleV2 handle;
ObIMemtable *active_memtable = nullptr;
@ -450,10 +481,10 @@ int ObTabletMemtableMgr::unset_logging_blocked_for_active_memtable(ObIMemtable *
return ret;
}
int ObTabletMemtableMgr::set_is_tablet_freeze_for_active_memtable(ObIMemtable *&active_memtable, bool is_force_freeze)
int ObTabletMemtableMgr::set_is_tablet_freeze_for_active_memtable(ObTableHandleV2 &handle, bool is_force_freeze)
{
ObTableHandleV2 handle;
active_memtable = nullptr;
handle.reset();
memtable::ObIMemtable *active_memtable = nullptr;
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
@ -912,12 +943,11 @@ int ObTabletMemtableMgr::get_multi_source_data_unit(
}
int ObTabletMemtableMgr::get_memtable_for_multi_source_data_unit(
memtable::ObMemtable *&memtable,
ObTableHandleV2 &handle,
const memtable::MultiSourceDataUnitType type) const
{
int ret = OB_SUCCESS;
ObTableHandleV2 handle;
memtable = nullptr;
handle.reset();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -930,19 +960,28 @@ int ObTabletMemtableMgr::get_memtable_for_multi_source_data_unit(
LOG_DEBUG("memtable does not exist", K(ret), K(memtable_head_), K(memtable_tail_));
} else {
for (int64_t i = memtable_tail_ - 1; OB_SUCC(ret) && i >= memtable_head_; --i) {
memtable = static_cast<ObMemtable*>(tables_[get_memtable_idx(i)]);
if (memtable->has_multi_source_data_unit(type)) {
ObTableHandleV2 m_handle;
const ObMemtable *memtable = nullptr;
if (OB_FAIL(get_ith_memtable(i, m_handle))) {
STORAGE_LOG(WARN, "fail to get ith memtable", K(ret), K(i));
} else if (OB_UNLIKELY(!m_handle.is_valid())) {
ret = OB_ERR_SYS;
LOG_ERROR("memtable handle is invalid", K(ret), K(m_handle));
} else if (OB_FAIL(m_handle.get_data_memtable(memtable))) {
LOG_WARN("fail to get memtable", K(ret), K(m_handle));
} else if (OB_ISNULL(memtable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("memtable is nullptr", K(ret), K(m_handle));
} else if (memtable->has_multi_source_data_unit(type)) {
handle = m_handle;
break;
} else {
memtable = nullptr;
}
}
}
}
if (OB_FAIL(ret)) {
memtable = nullptr;
} else if (OB_ISNULL(memtable)) {
} else if (!handle.is_valid()) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("failed to get memtable", K(ret), K(type));
}

View File

@ -37,6 +37,8 @@ class ObFreezer;
class ObTabletMemtableMgr : public ObIMemtableMgr
{
public:
friend class memtable::ObMemtable;
public:
typedef common::ObIArray<ObTableHandleV2> ObTableHdlArray;
@ -58,16 +60,14 @@ public:
int64_t get_memtable_count() const;
virtual int get_memtable_for_replay(share::SCN replay_scn,
ObTableHandleV2 &handle) override;
memtable::ObMemtable *get_last_frozen_memtable() const;
memtable::ObMemtable *get_last_frozen_memtable_() const;
int get_last_frozen_memtable(ObTableHandleV2 &handle) const;
virtual int get_boundary_memtable(ObTableHandleV2 &handle) override;
virtual int get_multi_source_data_unit(
memtable::ObIMultiSourceDataUnit *const multi_source_data_unit,
ObIAllocator *allocator = nullptr) const override;
virtual int get_memtable_for_multi_source_data_unit(
memtable::ObMemtable *&memtable,
ObTableHandleV2 &handle,
const memtable::MultiSourceDataUnitType type) const override;
int release_tail_memtable(memtable::ObIMemtable *memtable);
int create_memtable(const share::SCN clog_checkpoint_scn,
const int64_t schema_version,
const bool for_replay);
@ -84,11 +84,7 @@ public:
const bool include_active_memtable = true);
int get_memtables_nolock(ObTableHdlArray &handle);
int get_first_frozen_memtable(ObTableHandleV2 &handle) const;
int resolve_left_boundary_for_active_memtable(memtable::ObIMemtable *memtable,
share::SCN start_scn,
share::SCN snapshot_version);
int unset_logging_blocked_for_active_memtable(memtable::ObIMemtable *memtable);
int set_is_tablet_freeze_for_active_memtable(memtable::ObIMemtable *&memtable,
int set_is_tablet_freeze_for_active_memtable(ObTableHandleV2 &handle,
bool is_force_freeze = false);
ObStorageSchemaRecorder &get_storage_schema_recorder()
@ -134,6 +130,11 @@ private:
int64_t &start_pos);
int get_first_frozen_memtable_(ObTableHandleV2 &handle) const;
void clean_tail_memtable_();
int get_last_frozen_memtable_(ObTableHandleV2 &handle) const;
int resolve_left_boundary_for_active_memtable(memtable::ObIMemtable *memtable,
share::SCN start_scn,
share::SCN snapshot_version);
int unset_logging_blocked_for_active_memtable(memtable::ObIMemtable *memtable);
DISALLOW_COPY_AND_ASSIGN(ObTabletMemtableMgr);

View File

@ -30,6 +30,7 @@
#include "storage/memtable/mvcc/ob_mvcc_row.h"
#include "share/scn.h"
#include "storage/ls/ob_ls.h"
#include "storage/tx_storage/ob_ls_map.h"
namespace oceanbase
{
@ -121,8 +122,10 @@ public:
table_key.scn_range_.end_scn_ = share::SCN::max_scn();
int64_t schema_version = 1;
uint32_t freeze_clock = 0;
ObLSHandle ls_handle;
ls_handle.set_ls(ls_map_, ls_, ObLSGetMod::DATA_MEMTABLE_MOD);
return mt_table.init(table_key, nullptr, &freezer_, &memtable_mgr_, schema_version, freeze_clock);
return mt_table.init(table_key, ls_handle, &freezer_, &memtable_mgr_, schema_version, freeze_clock);
}
int mock_col_desc()
{
@ -190,6 +193,7 @@ public:
ObTableReadInfo read_info_;
ObArenaAllocator allocator_;
MemtableIDMap ctx_map_;
ObLSMap ls_map_;
};
class RunCtxGuard

View File

@ -312,6 +312,8 @@ int TestCompactionPolicy::mock_memtable(
generate_table_key(ObITable::DATA_MEMTABLE, start_scn, end_border, table_key);
ObMemtable *memtable = nullptr;
ObLSHandle ls_handle;
ObLSService *ls_svr = nullptr;
ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr *);
if (OB_FAIL(t3m->acquire_memtable(table_handle))) {
@ -319,7 +321,11 @@ int TestCompactionPolicy::mock_memtable(
} else if (OB_ISNULL(memtable = static_cast<ObMemtable*>(table_handle.get_table()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get memtable", K(ret));
} else if (OB_FAIL(memtable->init(table_key, mt_mgr->ls_, mt_mgr->freezer_, mt_mgr, 0, mt_mgr->freezer_->get_freeze_clock()))) {
} else if (OB_ISNULL(ls_svr = MTL(ObLSService *))) {
ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(ls_svr->get_ls(mt_mgr->ls_->get_ls_id(), ls_handle, ObLSGetMod::DATA_MEMTABLE_MOD))) {
LOG_WARN("failed to get ls handle", K(ret));
} else if (OB_FAIL(memtable->init(table_key, ls_handle, mt_mgr->freezer_, mt_mgr, 0, mt_mgr->freezer_->get_freeze_clock()))) {
LOG_WARN("failed to init memtable", K(ret));
} else if (OB_FAIL(mt_mgr->add_memtable_(table_handle))) {
LOG_WARN("failed to add memtable to mgr", K(ret));

View File

@ -241,7 +241,9 @@ memtable::ObMemtable *ObTxNode::create_memtable_(const int64_t tablet_id) {
table_key.tablet_id_ = tablet_id;
table_key.scn_range_.start_scn_.convert_for_gts(100);
table_key.scn_range_.end_scn_.set_max();
t->init(table_key, &fake_ls_, &fake_freezer_, &fake_memtable_mgr_, 0, 0);
ObLSHandle ls_handle;
ls_handle.set_ls(fake_ls_map_, fake_ls_, ObLSGetMod::DATA_MEMTABLE_MOD);
t->init(table_key, ls_handle, &fake_freezer_, &fake_memtable_mgr_, 0, 0);
return t;
}

View File

@ -26,6 +26,7 @@
#include "storage/tx_storage/ob_tenant_freezer.h"
#include "lib/hash/ob_hashmap.h"
#include "lib/lock/ob_scond.h"
#include "storage/tx_storage/ob_ls_map.h"
#include "../mock_utils/msg_bus.h"
#include "../mock_utils/basic_fake_define.h"
@ -270,6 +271,7 @@ public:
ObTabletMemtableMgr fake_memtable_mgr_;
storage::ObLS mock_ls_; // TODO mock required member on LS
common::hash::ObHashSet<int16_t> drop_msg_type_set_;
ObLSMap fake_ls_map_;
};
} // transaction