Fix migration tablet meta merge freeze memtable and ddl commit log concurrency bug
This commit is contained in:
@ -1117,7 +1117,8 @@ ObTabletCopyFinishTask::ObTabletCopyFinishTask()
|
|||||||
ls_(nullptr),
|
ls_(nullptr),
|
||||||
reporter_(nullptr),
|
reporter_(nullptr),
|
||||||
ha_dag_(nullptr),
|
ha_dag_(nullptr),
|
||||||
mixed_tables_handle_(),
|
minor_tables_handle_(),
|
||||||
|
ddl_tables_handle_(),
|
||||||
major_tables_handle_(),
|
major_tables_handle_(),
|
||||||
restore_action_(ObTabletRestoreAction::MAX),
|
restore_action_(ObTabletRestoreAction::MAX),
|
||||||
src_tablet_meta_(nullptr)
|
src_tablet_meta_(nullptr)
|
||||||
@ -1166,10 +1167,12 @@ int ObTabletCopyFinishTask::process()
|
|||||||
LOG_WARN("tablet copy finish task do not init", K(ret));
|
LOG_WARN("tablet copy finish task do not init", K(ret));
|
||||||
} else if (ha_dag_->get_ha_dag_net_ctx()->is_failed()) {
|
} else if (ha_dag_->get_ha_dag_net_ctx()->is_failed()) {
|
||||||
FLOG_INFO("ha dag net is already failed, skip physical copy finish task", K(tablet_id_), KPC(ha_dag_));
|
FLOG_INFO("ha dag net is already failed, skip physical copy finish task", K(tablet_id_), KPC(ha_dag_));
|
||||||
} else if (OB_FAIL(create_new_table_store_only_major_())) {
|
} else if (OB_FAIL(create_new_table_store_with_major_())) {
|
||||||
LOG_WARN("failed to create new table store only major", K(ret), K_(tablet_id));
|
LOG_WARN("failed to create new table store with major", K(ret), K_(tablet_id));
|
||||||
} else if (OB_FAIL(create_new_table_store_none_major_())) {
|
} else if (OB_FAIL(create_new_table_store_with_ddl_())) {
|
||||||
LOG_WARN("failed to create new table store none major", K(ret), K_(tablet_id));
|
LOG_WARN("failed to create new table store with ddl", K(ret), K_(tablet_id));
|
||||||
|
} else if (OB_FAIL(create_new_table_store_with_minor_())) {
|
||||||
|
LOG_WARN("failed to create new table store with minor", K(ret), K_(tablet_id));
|
||||||
} else if (OB_FAIL(update_tablet_data_status_())) {
|
} else if (OB_FAIL(update_tablet_data_status_())) {
|
||||||
LOG_WARN("failed to update tablet data status", K(ret), K(tablet_id_));
|
LOG_WARN("failed to update tablet data status", K(ret), K(tablet_id_));
|
||||||
}
|
}
|
||||||
@ -1186,26 +1189,18 @@ int ObTabletCopyFinishTask::process()
|
|||||||
int ObTabletCopyFinishTask::add_sstable(ObTableHandleV2 &table_handle)
|
int ObTabletCopyFinishTask::add_sstable(ObTableHandleV2 &table_handle)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
ObTablesHandleArray *tables_handle_ptr = nullptr;
|
||||||
common::SpinWLockGuard guard(lock_);
|
common::SpinWLockGuard guard(lock_);
|
||||||
bool is_major_sstable = false;
|
|
||||||
if (!is_inited_) {
|
if (!is_inited_) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("tablet copy finish task do not init", K(ret));
|
LOG_WARN("tablet copy finish task do not init", K(ret));
|
||||||
} else if (!table_handle.is_valid()) {
|
} else if (!table_handle.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("add sstable get invalid argument", K(ret), K(table_handle));
|
LOG_WARN("add sstable get invalid argument", K(ret), K(table_handle));
|
||||||
} else if (OB_FAIL(check_is_major_sstable_(table_handle, is_major_sstable))) {
|
} else if (OB_FAIL(get_tables_handle_ptr_(table_handle.get_table()->get_key(), tables_handle_ptr))) {
|
||||||
LOG_WARN("failed to check is major sstable", K(ret), K(table_handle));
|
LOG_WARN("failed to get tables handle ptr", K(ret), K(table_handle));
|
||||||
} else {
|
} else if (OB_FAIL(tables_handle_ptr->add_table(table_handle))) {
|
||||||
if (is_major_sstable) {
|
LOG_WARN("failed to add table", K(ret), K(table_handle));
|
||||||
if (OB_FAIL(major_tables_handle_.add_table(table_handle))) {
|
|
||||||
LOG_WARN("failed to add table", K(ret), K(table_handle), K(tablet_id_));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (OB_FAIL(mixed_tables_handle_.add_table(table_handle))) {
|
|
||||||
LOG_WARN("failed to add table", K(ret), K(table_handle), K(tablet_id_));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -1217,6 +1212,7 @@ int ObTabletCopyFinishTask::get_sstable(
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTenantMetaMemMgr *meta_mem_mgr = nullptr;
|
ObTenantMetaMemMgr *meta_mem_mgr = nullptr;
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
ObTablesHandleArray *tables_handle_ptr = nullptr;
|
||||||
common::SpinRLockGuard guard(lock_);
|
common::SpinRLockGuard guard(lock_);
|
||||||
|
|
||||||
if (!is_inited_) {
|
if (!is_inited_) {
|
||||||
@ -1228,10 +1224,9 @@ int ObTabletCopyFinishTask::get_sstable(
|
|||||||
} else if (OB_ISNULL(meta_mem_mgr = MTL(ObTenantMetaMemMgr *))) {
|
} else if (OB_ISNULL(meta_mem_mgr = MTL(ObTenantMetaMemMgr *))) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("failed to get meta mem mgr from MTL", K(ret));
|
LOG_WARN("failed to get meta mem mgr from MTL", K(ret));
|
||||||
|
} else if (OB_FAIL(get_tables_handle_ptr_(table_key, tables_handle_ptr))) {
|
||||||
|
LOG_WARN("failed to get tables handle ptr", K(ret), K(table_key));
|
||||||
} else {
|
} else {
|
||||||
ObTablesHandleArray *tables_handle_ptr = table_key.is_major_sstable()
|
|
||||||
? &major_tables_handle_
|
|
||||||
: &mixed_tables_handle_;
|
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < tables_handle_ptr->get_count() && !found; ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < tables_handle_ptr->get_count() && !found; ++i) {
|
||||||
ObITable *table = tables_handle_ptr->get_table(i);
|
ObITable *table = tables_handle_ptr->get_table(i);
|
||||||
if (OB_ISNULL(table)) {
|
if (OB_ISNULL(table)) {
|
||||||
@ -1248,28 +1243,16 @@ int ObTabletCopyFinishTask::get_sstable(
|
|||||||
|
|
||||||
if (OB_SUCC(ret) && !found) {
|
if (OB_SUCC(ret) && !found) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("can not get sstable, unexected", K(ret), K(table_key), K(major_tables_handle_), K(mixed_tables_handle_));
|
LOG_WARN("can not get sstable, unexected", K(ret), K(table_key), K(major_tables_handle_),
|
||||||
|
K(minor_tables_handle_), K(ddl_tables_handle_));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTabletCopyFinishTask::check_is_major_sstable_(
|
int ObTabletCopyFinishTask::inner_update_tablet_table_store_with_minor_(
|
||||||
ObTableHandleV2 &table_handle,
|
const bool &need_tablet_meta_merge,
|
||||||
bool &is_major)
|
ObTablesHandleArray *tables_handle_ptr)
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
is_major = false;
|
|
||||||
if (!table_handle.is_valid()) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
|
||||||
LOG_WARN("get invalid argument", K(ret), K(table_handle));
|
|
||||||
} else {
|
|
||||||
is_major = table_handle.get_table()->is_major_sstable();
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTabletCopyFinishTask::create_new_table_store_none_major_()
|
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObBatchUpdateTableStoreParam update_table_store_param;
|
ObBatchUpdateTableStoreParam update_table_store_param;
|
||||||
@ -1286,18 +1269,17 @@ int ObTabletCopyFinishTask::create_new_table_store_none_major_()
|
|||||||
} else if (OB_ISNULL(tablet = tablet_handle.get_obj())) {
|
} else if (OB_ISNULL(tablet = tablet_handle.get_obj())) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("tablet should not be NULL", K(ret), K(tablet_id_));
|
LOG_WARN("tablet should not be NULL", K(ret), K(tablet_id_));
|
||||||
} else if (OB_FAIL(check_need_merge_tablet_meta_(tablet, need_merge))) {
|
} else if (need_tablet_meta_merge && OB_FAIL(check_need_merge_tablet_meta_(tablet, need_merge))) {
|
||||||
LOG_WARN("failedto check remote logical sstable exist", K(ret), KPC(tablet));
|
LOG_WARN("failedto check remote logical sstable exist", K(ret), KPC(tablet));
|
||||||
} else {
|
} else {
|
||||||
update_table_store_param.multi_version_start_ = 0;
|
update_table_store_param.multi_version_start_ = 0;
|
||||||
update_table_store_param.need_report_ = true;
|
|
||||||
update_table_store_param.tablet_meta_ = need_merge ? src_tablet_meta_ : nullptr;
|
update_table_store_param.tablet_meta_ = need_merge ? src_tablet_meta_ : nullptr;
|
||||||
update_table_store_param.rebuild_seq_ = ls_->get_rebuild_seq();
|
update_table_store_param.rebuild_seq_ = ls_->get_rebuild_seq();
|
||||||
|
|
||||||
if (OB_FAIL(update_table_store_param.tables_handle_.assign(mixed_tables_handle_))) {
|
if (OB_FAIL(update_table_store_param.tables_handle_.assign(*tables_handle_ptr))) {
|
||||||
LOG_WARN("failed to assign tables handle", K(ret), K(mixed_tables_handle_));
|
LOG_WARN("failed to assign tables handle", K(ret), KPC(tables_handle_ptr));
|
||||||
} else if (update_table_store_param.tables_handle_.empty()) {
|
} else if (update_table_store_param.tables_handle_.empty()) {
|
||||||
LOG_INFO("tablet do not has sstable", K(ret), K(tablet_id_), K(mixed_tables_handle_), KPC(tablet));
|
LOG_INFO("tablet do not has sstable", K(ret), K(tablet_id_), KPC(tables_handle_ptr), KPC(tablet));
|
||||||
} else if (OB_FAIL(ls_->build_ha_tablet_new_table_store(tablet_id_, update_table_store_param))) {
|
} else if (OB_FAIL(ls_->build_ha_tablet_new_table_store(tablet_id_, update_table_store_param))) {
|
||||||
LOG_WARN("failed to build ha tablet new table store", K(ret), K(tablet_id_), KPC_(src_tablet_meta), K(update_table_store_param));
|
LOG_WARN("failed to build ha tablet new table store", K(ret), K(tablet_id_), KPC_(src_tablet_meta), K(update_table_store_param));
|
||||||
}
|
}
|
||||||
@ -1311,7 +1293,36 @@ int ObTabletCopyFinishTask::create_new_table_store_none_major_()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTabletCopyFinishTask::create_new_table_store_only_major_()
|
int ObTabletCopyFinishTask::create_new_table_store_with_ddl_()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObTablesHandleArray *tables_handle_ptr = &ddl_tables_handle_;
|
||||||
|
const bool need_tablet_meta_merge = false;
|
||||||
|
|
||||||
|
if (!is_inited_) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
LOG_WARN("tablet copy finish task do not init", K(ret));
|
||||||
|
} else if (OB_FAIL(inner_update_tablet_table_store_with_minor_(need_tablet_meta_merge, tables_handle_ptr))) {
|
||||||
|
LOG_WARN("failed to update tablet table store with minor", K(ret));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObTabletCopyFinishTask::create_new_table_store_with_minor_()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObTablesHandleArray *tables_handle_ptr = &minor_tables_handle_;
|
||||||
|
const bool need_tablet_meta_merge = true;
|
||||||
|
if (!is_inited_) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
LOG_WARN("tablet copy finish task do not init", K(ret));
|
||||||
|
} else if (OB_FAIL(inner_update_tablet_table_store_with_minor_(need_tablet_meta_merge, tables_handle_ptr))) {
|
||||||
|
LOG_WARN("failed to update tablet table store with minor", K(ret));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObTabletCopyFinishTask::create_new_table_store_with_major_()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTabletHandle tablet_handle;
|
ObTabletHandle tablet_handle;
|
||||||
@ -1539,6 +1550,30 @@ int ObTabletCopyFinishTask::check_remote_logical_sstable_exist_(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObTabletCopyFinishTask::get_tables_handle_ptr_(
|
||||||
|
const ObITable::TableKey &table_key,
|
||||||
|
ObTablesHandleArray *&tables_handle_ptr)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
tables_handle_ptr = nullptr;
|
||||||
|
if (!is_inited_) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
LOG_WARN("tablet copy finish task do not init", K(ret));
|
||||||
|
} else if (!table_key.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("get tables handle ptr get invalid argument", K(ret), K(table_key));
|
||||||
|
} else if (table_key.is_major_sstable()) {
|
||||||
|
tables_handle_ptr = &major_tables_handle_;
|
||||||
|
} else if (table_key.is_minor_sstable()) {
|
||||||
|
tables_handle_ptr = &minor_tables_handle_;
|
||||||
|
} else if (table_key.is_ddl_sstable()) {
|
||||||
|
tables_handle_ptr = &ddl_tables_handle_;
|
||||||
|
} else {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("get tables handle ptr get unexpected table key", K(ret), K(table_key));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -223,11 +223,12 @@ public:
|
|||||||
const ObITable::TableKey &table_key,
|
const ObITable::TableKey &table_key,
|
||||||
ObTableHandleV2 &table_handle);
|
ObTableHandleV2 &table_handle);
|
||||||
private:
|
private:
|
||||||
int check_is_major_sstable_(
|
int create_new_table_store_with_major_();
|
||||||
ObTableHandleV2 &tablet_handle,
|
int create_new_table_store_with_ddl_();
|
||||||
bool &is_major);
|
int create_new_table_store_with_minor_();
|
||||||
int create_new_table_store_none_major_();
|
int inner_update_tablet_table_store_with_minor_(
|
||||||
int create_new_table_store_only_major_();
|
const bool &need_tablet_meta_merge,
|
||||||
|
ObTablesHandleArray *tables_handle_ptr);
|
||||||
int inner_update_tablet_table_store_with_major_(
|
int inner_update_tablet_table_store_with_major_(
|
||||||
const common::ObTabletID &tablet_id,
|
const common::ObTabletID &tablet_id,
|
||||||
ObITable *table);
|
ObITable *table);
|
||||||
@ -238,6 +239,9 @@ private:
|
|||||||
int check_remote_logical_sstable_exist_(
|
int check_remote_logical_sstable_exist_(
|
||||||
ObTablet *tablet,
|
ObTablet *tablet,
|
||||||
bool &is_exist);
|
bool &is_exist);
|
||||||
|
int get_tables_handle_ptr_(
|
||||||
|
const ObITable::TableKey &table_key,
|
||||||
|
ObTablesHandleArray *&table_handle_ptr);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
@ -246,7 +250,8 @@ private:
|
|||||||
ObLS *ls_;
|
ObLS *ls_;
|
||||||
observer::ObIMetaReport *reporter_;
|
observer::ObIMetaReport *reporter_;
|
||||||
ObStorageHADag *ha_dag_;
|
ObStorageHADag *ha_dag_;
|
||||||
ObTablesHandleArray mixed_tables_handle_;
|
ObTablesHandleArray minor_tables_handle_;
|
||||||
|
ObTablesHandleArray ddl_tables_handle_;
|
||||||
ObTablesHandleArray major_tables_handle_;
|
ObTablesHandleArray major_tables_handle_;
|
||||||
ObTabletRestoreAction::ACTION restore_action_;
|
ObTabletRestoreAction::ACTION restore_action_;
|
||||||
const ObMigrationTabletParam *src_tablet_meta_;
|
const ObMigrationTabletParam *src_tablet_meta_;
|
||||||
|
|||||||
@ -311,7 +311,6 @@ ObBatchUpdateTableStoreParam::ObBatchUpdateTableStoreParam()
|
|||||||
: tables_handle_(),
|
: tables_handle_(),
|
||||||
snapshot_version_(0),
|
snapshot_version_(0),
|
||||||
multi_version_start_(0),
|
multi_version_start_(0),
|
||||||
need_report_(false),
|
|
||||||
rebuild_seq_(OB_INVALID_VERSION),
|
rebuild_seq_(OB_INVALID_VERSION),
|
||||||
update_logical_minor_sstable_(false),
|
update_logical_minor_sstable_(false),
|
||||||
start_scn_(SCN::min_scn()),
|
start_scn_(SCN::min_scn()),
|
||||||
@ -323,7 +322,6 @@ void ObBatchUpdateTableStoreParam::reset()
|
|||||||
{
|
{
|
||||||
tables_handle_.reset();
|
tables_handle_.reset();
|
||||||
multi_version_start_ = 0;
|
multi_version_start_ = 0;
|
||||||
need_report_ = false;
|
|
||||||
rebuild_seq_ = OB_INVALID_VERSION;
|
rebuild_seq_ = OB_INVALID_VERSION;
|
||||||
update_logical_minor_sstable_ = false;
|
update_logical_minor_sstable_ = false;
|
||||||
start_scn_.set_min();
|
start_scn_.set_min();
|
||||||
@ -350,7 +348,6 @@ int ObBatchUpdateTableStoreParam::assign(
|
|||||||
LOG_WARN("failed to assign tables handle", K(ret), K(param));
|
LOG_WARN("failed to assign tables handle", K(ret), K(param));
|
||||||
} else {
|
} else {
|
||||||
multi_version_start_ = param.multi_version_start_;
|
multi_version_start_ = param.multi_version_start_;
|
||||||
need_report_ = param.need_report_;
|
|
||||||
rebuild_seq_ = param.rebuild_seq_;
|
rebuild_seq_ = param.rebuild_seq_;
|
||||||
update_logical_minor_sstable_ = param.update_logical_minor_sstable_;
|
update_logical_minor_sstable_ = param.update_logical_minor_sstable_;
|
||||||
start_scn_ = param.start_scn_;
|
start_scn_ = param.start_scn_;
|
||||||
|
|||||||
@ -355,13 +355,12 @@ struct ObBatchUpdateTableStoreParam final
|
|||||||
int assign(const ObBatchUpdateTableStoreParam ¶m);
|
int assign(const ObBatchUpdateTableStoreParam ¶m);
|
||||||
int get_max_clog_checkpoint_scn(share::SCN &clog_checkpoint_scn) const;
|
int get_max_clog_checkpoint_scn(share::SCN &clog_checkpoint_scn) const;
|
||||||
|
|
||||||
TO_STRING_KV(K_(tables_handle), K_(snapshot_version), K_(multi_version_start), K_(need_report),
|
TO_STRING_KV(K_(tables_handle), K_(snapshot_version), K_(multi_version_start),
|
||||||
K_(rebuild_seq), K_(update_logical_minor_sstable), K_(start_scn), KP_(tablet_meta));
|
K_(rebuild_seq), K_(update_logical_minor_sstable), K_(start_scn), KP_(tablet_meta));
|
||||||
|
|
||||||
ObTablesHandleArray tables_handle_;
|
ObTablesHandleArray tables_handle_;
|
||||||
int64_t snapshot_version_;
|
int64_t snapshot_version_;
|
||||||
int64_t multi_version_start_;
|
int64_t multi_version_start_;
|
||||||
bool need_report_;
|
|
||||||
int64_t rebuild_seq_;
|
int64_t rebuild_seq_;
|
||||||
bool update_logical_minor_sstable_;
|
bool update_logical_minor_sstable_;
|
||||||
share::SCN start_scn_;
|
share::SCN start_scn_;
|
||||||
|
|||||||
Reference in New Issue
Block a user