[FEAT MERGE]forcelly_release_mds
This commit is contained in:
parent
57232c7578
commit
64721d8fe8
@ -111,8 +111,6 @@ int ObTabletCopyFinishTask::process()
|
||||
LOG_WARN("failed to get tablet status", K(ret), K(tablet_id_));
|
||||
} else if (ObCopyTabletStatus::TABLET_NOT_EXIST == status) {
|
||||
FLOG_INFO("copy tablet from src do not exist, skip copy finish task", K(tablet_id_), K(status));
|
||||
} else if (OB_FAIL(check_log_replay_to_mds_sstable_end_scn_())) {
|
||||
LOG_WARN("failed to check log replay to mds sstable end scn", K(ret), K(tablet_id_));
|
||||
} else if (OB_FAIL(create_new_table_store_with_major_())) {
|
||||
LOG_WARN("failed to create new table store with major", K(ret), K_(tablet_id));
|
||||
} else if (OB_FAIL(create_new_table_store_with_minor_())) {
|
||||
@ -271,18 +269,48 @@ int ObTabletCopyFinishTask::create_new_table_store_with_major_()
|
||||
int ObTabletCopyFinishTask::create_new_table_store_with_minor_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
share::SCN mds_max_end_scn(SCN::min_scn());
|
||||
ObTabletHandle tablet_handle;
|
||||
ObTablet *tablet = nullptr;
|
||||
ObTabletPointer *pointer = nullptr;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("tablet copy finish task do not init", K(ret));
|
||||
} else if (OB_FAIL(ObStorageHATabletBuilderUtil::build_table_with_minor_tables(ls_,
|
||||
tablet_id_,
|
||||
src_tablet_meta_,
|
||||
mds_tables_handle_,
|
||||
minor_tables_handle_,
|
||||
ddl_tables_handle_,
|
||||
restore_action_))) {
|
||||
LOG_WARN("failed to build table with minor tables", K(ret), K(mds_tables_handle_),
|
||||
K(minor_tables_handle_), K(ddl_tables_handle_), K(restore_action_));
|
||||
} else if (OB_FAIL(ls_->ha_get_tablet(tablet_id_, tablet_handle))) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K(tablet_id_));
|
||||
} else if (OB_ISNULL(tablet = tablet_handle.get_obj())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tablet should not be NULL", K(ret), K(tablet_id_));
|
||||
} else if (OB_ISNULL(pointer = tablet->get_pointer_handle().get_resource_ptr())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tablet pointer should not be NULL", K(ret), KPC(tablet));
|
||||
} else if (OB_FAIL(get_mds_sstable_max_end_scn_(mds_max_end_scn))) {
|
||||
LOG_WARN("failed to get mds sstable max end scn", K(ret), K(mds_tables_handle_));
|
||||
} else {
|
||||
TabletMdsLockGuard<LockMode::EXCLUSIVE> guard;
|
||||
pointer->get_mds_truncate_lock_guard(guard);
|
||||
//Release mds node failed must do dag net retry.
|
||||
//Because ls still replay and mds may has residue node which makes data incorrect.
|
||||
//So should make ls offline and online
|
||||
if (OB_FAIL(ObStorageHATabletBuilderUtil::build_table_with_minor_tables(ls_,
|
||||
tablet_id_,
|
||||
src_tablet_meta_,
|
||||
mds_tables_handle_,
|
||||
minor_tables_handle_,
|
||||
ddl_tables_handle_,
|
||||
restore_action_))) {
|
||||
LOG_WARN("failed to build table with minor tables", K(ret), K(mds_tables_handle_),
|
||||
K(minor_tables_handle_), K(ddl_tables_handle_), K(restore_action_));
|
||||
} else if (mds_max_end_scn.is_min()) {
|
||||
//do nothing
|
||||
} else if (OB_FAIL(pointer->release_mds_nodes_redo_scn_below(tablet_id_, mds_max_end_scn))) {
|
||||
LOG_WARN("failed to relase mds node redo scn blow", K(ret), K(tablet_id_), K(mds_max_end_scn));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const bool need_retry = false;
|
||||
if (OB_SUCCESS != (tmp_ret = ha_dag_->get_ha_dag_net_ctx()->set_result(ret, need_retry))) {
|
||||
LOG_ERROR("failed to set ha dag net ctx result", K(tmp_ret), K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -547,30 +575,13 @@ int ObTabletCopyFinishTask::check_major_valid_()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletCopyFinishTask::check_log_replay_to_mds_sstable_end_scn_()
|
||||
int ObTabletCopyFinishTask::get_mds_sstable_max_end_scn_(share::SCN &max_end_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SCN max_end_scn(SCN::min_scn());
|
||||
SCN current_replay_scn;
|
||||
const int64_t total_timeout = 20_min;
|
||||
const int64_t wait_replay_timeout = 10_min;
|
||||
bool is_ls_deleted = false;
|
||||
SCN last_replay_scn;
|
||||
share::SCN readable_scn;
|
||||
const int64_t start_ts = ObTimeUtil::current_time();
|
||||
int64_t current_ts = 0;
|
||||
int64_t last_replay_ts = 0;
|
||||
const int64_t CHECK_CONDITION_INTERVAL = 200_ms;
|
||||
ObTimeoutCtx timeout_ctx;
|
||||
int32_t result = OB_SUCCESS;
|
||||
|
||||
max_end_scn.set_min();
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("tablet copy finish task do not init", K(ret));
|
||||
} else if (tablet_id_.is_ls_inner_tablet()) {
|
||||
//do nothing
|
||||
} else if (OB_FAIL(timeout_ctx.set_timeout(total_timeout))) {
|
||||
LOG_WARN("failed to set timeout ctx", K(ret), K(tablet_id_));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < mds_tables_handle_.get_count(); ++i) {
|
||||
const ObITable *table = mds_tables_handle_.get_table(i);
|
||||
@ -581,70 +592,7 @@ int ObTabletCopyFinishTask::check_log_replay_to_mds_sstable_end_scn_()
|
||||
max_end_scn = SCN::max(table->get_end_scn(), max_end_scn);
|
||||
}
|
||||
}
|
||||
|
||||
while (OB_SUCC(ret)) {
|
||||
if (timeout_ctx.is_timeouted()) {
|
||||
ret = OB_WAIT_REPLAY_TIMEOUT;
|
||||
LOG_WARN("wait log replay to mds sstable end scn already timeout", K(ret));
|
||||
} else if (ha_dag_->get_ha_dag_net_ctx()->is_failed()) {
|
||||
FLOG_INFO("ha dag net is already failed, skip physical copy finish task", K(tablet_id_), KPC(ha_dag_));
|
||||
ret = OB_CANCELED;
|
||||
} else if (ls_->is_stopped()) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
LOG_WARN("ls is not running, stop check log replay scn", K(ret), K(tablet_id_));
|
||||
} else if (OB_FAIL(ObStorageHAUtils::check_ls_deleted(ls_->get_ls_id(), is_ls_deleted))) {
|
||||
LOG_WARN("failed to get ls status from inner table", K(ret));
|
||||
} else if (is_ls_deleted) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_WARN("ls will be removed, no need run migration", K(ret), KPC(ls_), K(is_ls_deleted));
|
||||
} else if (OB_FAIL(ObStorageHAUtils::check_log_status(ls_->get_tenant_id(), ls_->get_ls_id(), result))) {
|
||||
LOG_WARN("failed to check log status", K(ret), KPC(ls_), K(tablet_id_));
|
||||
} else if (OB_SUCCESS != result) {
|
||||
LOG_INFO("can not replay log, it will retry", K(result), KPC(ha_dag_));
|
||||
if (OB_FAIL(ha_dag_->get_ha_dag_net_ctx()->set_result(result/*result*/, true/*need_retry*/, this->get_dag()->get_type()))) {
|
||||
LOG_WARN("failed to set result", K(ret), KPC(ha_dag_));
|
||||
} else {
|
||||
ret = result;
|
||||
LOG_WARN("log sync or replay error, need retry", K(ret), KPC(ha_dag_));
|
||||
}
|
||||
} else if (OB_FAIL(ls_->get_max_decided_scn(current_replay_scn))) {
|
||||
LOG_WARN("failed to get current replay log ts", K(ret), K(tablet_id_));
|
||||
} else if (current_replay_scn >= max_end_scn) {
|
||||
break;
|
||||
} else {
|
||||
current_ts = ObTimeUtility::current_time();
|
||||
if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000)) {
|
||||
LOG_INFO("replay log is not ready, retry next loop", K(tablet_id_),
|
||||
"current_replay_scn", current_replay_scn,
|
||||
"mds max end scn", max_end_scn);
|
||||
}
|
||||
|
||||
if (current_replay_scn == last_replay_scn) {
|
||||
if (current_ts - last_replay_ts > wait_replay_timeout) {
|
||||
ret = OB_WAIT_REPLAY_TIMEOUT;
|
||||
LOG_WARN("failed to check log replay to mds end scn", K(ret), K(tablet_id_), K(current_replay_scn), K(max_end_scn));
|
||||
}
|
||||
} else if (last_replay_scn > current_replay_scn) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("last end log ts should not smaller than current end log ts", K(ret),
|
||||
K(last_replay_scn), K(current_replay_scn));
|
||||
} else {
|
||||
last_replay_scn = current_replay_scn;
|
||||
last_replay_ts = current_ts;
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ob_usleep(CHECK_CONDITION_INTERVAL);
|
||||
if (OB_FAIL(share::dag_yield())) {
|
||||
LOG_WARN("fail to yield dag", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("finish check_log_replay_to_mds_sstable_end_scn_",
|
||||
K(ret), K(tablet_id_), "cost", ObTimeUtil::current_time() - start_ts);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -75,13 +75,13 @@ private:
|
||||
int deal_with_major_sstables_();
|
||||
int check_restore_major_valid_(
|
||||
const ObTablesHandleArray &major_tables_handle);
|
||||
int get_mds_sstable_max_end_scn_(share::SCN &max_escn);
|
||||
int check_log_replay_to_mds_sstable_end_scn_();
|
||||
int classify_major_sstables_(
|
||||
ObTablesHandleArray &shared_major_sstables,
|
||||
ObTablesHandleArray &local_major_sstables);
|
||||
int deal_with_shared_majors_(ObTablesHandleArray &major_tables_handle);
|
||||
|
||||
|
||||
private:
|
||||
bool is_inited_;
|
||||
common::SpinRWLock lock_;
|
||||
|
@ -37,6 +37,9 @@ namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
//errsim def
|
||||
ERRSIM_POINT_DEF(EN_RELEASE_MDS_NODE_FAILED);
|
||||
|
||||
ObTabletPointer::ObTabletPointer()
|
||||
: phy_addr_(),
|
||||
obj_(),
|
||||
@ -606,6 +609,14 @@ int ObTabletPointer::release_mds_nodes_redo_scn_below(const ObTabletID &tablet_i
|
||||
LOG_WARN("fail to release mds nodes in mds table", K(ret));
|
||||
}
|
||||
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = EN_RELEASE_MDS_NODE_FAILED ? : OB_SUCCESS;
|
||||
if (OB_FAIL(ret)) {
|
||||
STORAGE_LOG(ERROR, "fake EN_RELEASE_MDS_NODE_FAILED", K(ret));
|
||||
}
|
||||
}
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -1241,7 +1241,7 @@ void MdsTableImpl<MdsTableType>::on_flush_(const share::SCN &flush_scn, const in
|
||||
if (rec_scn_ == share::SCN::max_scn()) {
|
||||
MDS_LOG_FLUSH(WARN, "maybe meet concurrent reset mds table");
|
||||
} else {
|
||||
MDS_LOG_FLUSH(ERROR, "flush version mismatch!");
|
||||
MDS_LOG_FLUSH(WARN, "flush version mismatch!");
|
||||
}
|
||||
} else {
|
||||
flushing_scn_.reset();
|
||||
@ -1275,7 +1275,7 @@ int MdsTableImpl<MdsTableType>::scan_all_nodes_to_dump(DUMP_OP &&for_each_op,
|
||||
if (OB_SUCC(ret)) {
|
||||
if (for_flush) {
|
||||
if (!flushing_scn_.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
MDS_LOG_FLUSH(WARN, "not in flushing process");
|
||||
} else {
|
||||
flushing_version = flushing_scn_;
|
||||
@ -1403,11 +1403,12 @@ struct ForcelyReleaseNodesRedoScnBelowOp
|
||||
[this, &row](const UserMdsNode<K, V> &mds_node) {
|
||||
UserMdsNode<K, V> &cast_node = const_cast<UserMdsNode<K, V> &>(mds_node);
|
||||
if (mds_node.redo_scn_ <= redo_below_scn_) {
|
||||
// below_scn_ comes from mds sstable mds_ckpt_scn, it must not cross any node's [redo, end) scn range
|
||||
// before support dump uncommitted nodes, all nodes be scanned must satisfy this rule
|
||||
MDS_ASSERT(mds_node.end_scn_ <= redo_below_scn_);
|
||||
if (!cast_node.is_committed_()) {
|
||||
MDS_LOG_GC(INFO, "release uncommitted node");
|
||||
} else {
|
||||
// below_scn_ comes from mds sstable mds_ckpt_scn, it must not cross any node's [redo, end) scn range
|
||||
// before support dump uncommitted nodes, all nodes be scanned must satisfy this rule
|
||||
MDS_ASSERT(mds_node.end_scn_ <= redo_below_scn_);
|
||||
}
|
||||
const_cast<MdsRow<K, V> &>(row).sorted_list_.del((ListNodeBase*)(ListNode<UserMdsNode<K, V>>*)(&cast_node));
|
||||
MdsFactory::destroy(&cast_node);
|
||||
|
@ -59,6 +59,8 @@ int ObMdsTableMergeDag::init_by_param(const share::ObIDagInitParam *param)
|
||||
LOG_WARN("flush scn is invalid", K(ret), KPC(mds_param));
|
||||
} else if (OB_FAIL(ObTabletMergeDag::inner_init(mds_param))) {
|
||||
LOG_WARN("failed to init ObTabletMergeDag", K(ret), KPC(mds_param));
|
||||
} else if (OB_FAIL(fill_compat_mode_())) {
|
||||
LOG_WARN("failed to fill compat mode", K(ret), KPC(mds_param));
|
||||
} else {
|
||||
flush_scn_ = mds_param->flush_scn_;
|
||||
generate_ts_ = mds_param->generate_ts_;
|
||||
@ -70,6 +72,27 @@ int ObMdsTableMergeDag::init_by_param(const share::ObIDagInitParam *param)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMdsTableMergeDag::fill_compat_mode_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// Mds dump should not access mds data to avoid potential dead lock
|
||||
// between mds table lock on ObTabletPointer and other mds component
|
||||
// inner locks. So here use no_lock to get tablet.
|
||||
|
||||
ObLSHandle tmp_ls_handle;
|
||||
ObTabletHandle tmp_tablet_handle;
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, tmp_ls_handle, ObLSGetMod::COMPACT_MODE))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(ls_id_));
|
||||
} else if (OB_FAIL(tmp_ls_handle.get_ls()->get_tablet_svr()->get_tablet(
|
||||
tablet_id_, tmp_tablet_handle, 0/*timeout_us*/, storage::ObMDSGetTabletMode::READ_WITHOUT_CHECK))) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K(ls_id_), K(tablet_id_));
|
||||
} else {
|
||||
compat_mode_ = tmp_tablet_handle.get_obj()->get_tablet_meta().compat_mode_;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMdsTableMergeDag::create_first_task()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -44,6 +44,8 @@ public:
|
||||
K_(flush_scn),
|
||||
KTIME_(generate_ts),
|
||||
K_(mds_construct_sequence));
|
||||
private:
|
||||
int fill_compat_mode_();
|
||||
private:
|
||||
bool is_inited_;
|
||||
share::SCN flush_scn_;
|
||||
|
@ -24,6 +24,10 @@ ObMdsTableMergeDagParam::ObMdsTableMergeDagParam()
|
||||
generate_ts_(0),
|
||||
mds_construct_sequence_(-1)
|
||||
{
|
||||
// Mds dump should not access mds data to avoid potential dead lock
|
||||
// between mds table lock on ObTabletPointer and other mds component
|
||||
// inner locks.
|
||||
skip_get_tablet_ = true;
|
||||
}
|
||||
} // namespace mds
|
||||
} // namespace storage
|
||||
|
Loading…
x
Reference in New Issue
Block a user