fix some bugs && refine some codes
This commit is contained in:
parent
4fc2582e5e
commit
0921188899
@ -2525,7 +2525,7 @@ int ObCODDLUtil::get_column_checksums(
|
||||
LOG_WARN("unexpected column_group", K(ret), K(i));
|
||||
} else if (OB_FAIL(co_sstable->fetch_cg_sstable(i, cg_sstable_wrapper))) {
|
||||
LOG_WARN("fail to get cg sstable", K(ret), K(i));
|
||||
} else if (OB_FAIL(cg_sstable_wrapper.get_sstable(cg_sstable))) {
|
||||
} else if (OB_FAIL(cg_sstable_wrapper.get_loaded_column_store_sstable(cg_sstable))) {
|
||||
LOG_WARN("get sstable failed", K(ret));
|
||||
} else if (OB_UNLIKELY(cg_sstable == nullptr || !cg_sstable->is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
@ -199,7 +199,12 @@ int ObFreezeInfoManager::update_freeze_info(
|
||||
const share::SCN &latest_snapshot_gc_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(freeze_info_.frozen_statuses_.prepare_allocate(freeze_infos.count()))) {
|
||||
const int64_t freeze_info_cnt = freeze_infos.count();
|
||||
|
||||
if (OB_UNLIKELY(freeze_infos.empty())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid arguments", K(ret), K(freeze_infos), K(latest_snapshot_gc_scn));
|
||||
} else if (OB_FAIL(freeze_info_.frozen_statuses_.prepare_allocate(freeze_info_cnt))) {
|
||||
LOG_WARN("failed to prepare allocate mem for new freeze info", KR(ret), K(freeze_infos), K(freeze_info_));
|
||||
} else if (OB_FAIL(freeze_info_.frozen_statuses_.assign(freeze_infos))) {
|
||||
LOG_WARN("fail to assign", KR(ret), K(freeze_infos));
|
||||
@ -211,7 +216,7 @@ int ObFreezeInfoManager::update_freeze_info(
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
freeze_info_.latest_snapshot_gc_scn_ = latest_snapshot_gc_scn;
|
||||
LOG_INFO("inner load succ", K(freeze_info_));
|
||||
LOG_INFO("inner load succ", "latest_freeze_info", freeze_info_.frozen_statuses_.at(freeze_info_cnt - 1), K(freeze_info_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -354,7 +359,7 @@ int ObFreezeInfoManager::get_freeze_info_behind_major_snapshot(
|
||||
LOG_WARN("get invalid arguments", K(ret), K(snapshot_version));
|
||||
} else {
|
||||
bool found = false;
|
||||
for (int64_t i = freeze_info_.frozen_statuses_.count() - 1; OB_SUCC(ret) && i >= 0; --i) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < freeze_info_.frozen_statuses_.count(); ++i) {
|
||||
if (snapshot_version < freeze_info_.frozen_statuses_.at(i).frozen_scn_.get_val_for_tx()) {
|
||||
frozen_status = freeze_info_.frozen_statuses_.at(i);
|
||||
found = true;
|
||||
|
@ -61,7 +61,7 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_WRITE_CKPT, ObDagPrio::DAG_PRIO_COMPACTION_L
|
||||
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MDS_TABLE_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::MDS_TABLE_MERGE_TASK, "MDS_TABLE_MERGE", "COMPACTION",
|
||||
false, 3, {"ls_id", "tablet_id", "flush_scn"})
|
||||
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BATCH_FREEZE_TABLETS, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::BATCH_FREEZE_TABLET_TASK, "BATCH_FREEZE", "COMPACTION",
|
||||
false, 3, {"ls_id", "compaction_scn", "tablet_count"})
|
||||
false, 2, {"ls_id", "tablet_count"})
|
||||
|
||||
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_DDL, ObDagPrio::DAG_PRIO_DDL, ObSysTaskType::DDL_TASK, "DDL_COMPLEMENT", "DDL",
|
||||
true, 7, {"ls_id", "source_tablet_id", "dest_tablet_id", "data_table_id", "target_table_id", "schema_version", "snapshot_version"})
|
||||
|
@ -38,7 +38,7 @@ int ObCGScanner::init(
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("Invalid argument to init ObCGScanner", K(ret), K(wrapper), K(iter_param));
|
||||
} else if (FALSE_IT(table_wrapper_ = wrapper)) {
|
||||
} else if (OB_FAIL(table_wrapper_.get_sstable(sstable_))) {
|
||||
} else if (OB_FAIL(table_wrapper_.get_loaded_column_store_sstable(sstable_))) {
|
||||
LOG_WARN("fail to get sstable", K(ret), K(wrapper));
|
||||
} else if (OB_UNLIKELY(!sstable_->is_normal_cg_sstable())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -78,7 +78,7 @@ int ObCGScanner::switch_context(
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("Invalid argument", K(ret), K(wrapper), K(iter_param));
|
||||
} else if (FALSE_IT(table_wrapper_ = wrapper)) {
|
||||
} else if (OB_FAIL(table_wrapper_.get_sstable(sstable_))) {
|
||||
} else if (OB_FAIL(table_wrapper_.get_loaded_column_store_sstable(sstable_))) {
|
||||
LOG_WARN("fail to get sstable", K(ret), K(wrapper));
|
||||
} else if (OB_UNLIKELY(!sstable_->is_normal_cg_sstable())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
@ -62,7 +62,7 @@ int ObCGGetter::init(
|
||||
1 != idx_key.get_datum_cnt())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("Invalid argument to init ObCGGetter", K(ret), K(wrapper), K(iter_param));
|
||||
} else if (OB_FAIL(wrapper.get_sstable(sstable))) {
|
||||
} else if (OB_FAIL(wrapper.get_loaded_column_store_sstable(sstable))) {
|
||||
LOG_WARN("fail to get sstable", K(ret), K(wrapper));
|
||||
} else {
|
||||
is_same_data_block_ = false;
|
||||
|
@ -249,7 +249,7 @@ int ObCOMerger::alloc_writers(
|
||||
}
|
||||
} else if (OB_FAIL(co_sstable.fetch_cg_sstable(idx, cg_wrapper))) {
|
||||
STORAGE_LOG(WARN, "failed to get cg sstable", K(ret), K(sstable));
|
||||
} else if (OB_FAIL(cg_wrapper.get_sstable(cg_sstable))) {
|
||||
} else if (OB_FAIL(cg_wrapper.get_loaded_column_store_sstable(cg_sstable))) {
|
||||
STORAGE_LOG(WARN, "failed to get sstable from wrapper", K(ret), K(cg_wrapper));
|
||||
} else if (OB_FAIL(cg_wrappers_.push_back(cg_wrapper))) {
|
||||
STORAGE_LOG(WARN, "failed to push cg wrapper", K(ret), K(cg_wrappers_));
|
||||
|
@ -69,7 +69,7 @@ int ObSSTableWrapper::set_sstable(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSSTableWrapper::get_sstable(ObSSTable *&table)
|
||||
int ObSSTableWrapper::get_loaded_column_store_sstable(ObSSTable *&table)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSSTable *meta_sstable = nullptr;
|
||||
@ -78,8 +78,14 @@ int ObSSTableWrapper::get_sstable(ObSSTable *&table)
|
||||
if (OB_UNLIKELY(!is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("wrapper not valid", K(ret), KPC(this));
|
||||
} else if (OB_UNLIKELY(!sstable_->is_column_store_sstable())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("This func can only be used when fetching column store SSTable", K(ret), KPC(sstable_));
|
||||
} else if (sstable_->is_loaded()) {
|
||||
table = sstable_;
|
||||
} else if (OB_UNLIKELY(!meta_handle_.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("meta handle is unexpected not valid", K(ret), KPC(sstable_), K(meta_handle_));
|
||||
} else if (OB_FAIL(meta_handle_.get_sstable(meta_sstable))) {
|
||||
LOG_WARN("failed to get sstable", K(ret), KPC(this));
|
||||
} else if (sstable_->get_key() == meta_sstable->get_key()) {
|
||||
@ -129,7 +135,7 @@ int ObSSTableWrapper::get_merge_row_cnt(const ObTableIterParam &iter_param, int6
|
||||
|
||||
blocksstable::ObSSTable *cur_sstable = nullptr;
|
||||
ObDDLMergeBlockRowIterator ddl_merge_iter;
|
||||
if (OB_FAIL(get_sstable(cur_sstable))) {
|
||||
if (OB_FAIL(get_loaded_column_store_sstable(cur_sstable))) {
|
||||
LOG_WARN("fail to get sstable", K(ret), K(*this));
|
||||
} else if (OB_FAIL((cur_sstable->get_index_tree_root(root_block)))) {
|
||||
LOG_WARN("fail to get index tree root", K(ret), K(root_block), K(*this));
|
||||
|
@ -46,7 +46,8 @@ public:
|
||||
void reset();
|
||||
bool is_valid() const { return sstable_ != nullptr; }
|
||||
int set_sstable(blocksstable::ObSSTable *sstable, ObStorageMetaHandle *meta_handle = nullptr);
|
||||
int get_sstable(blocksstable::ObSSTable *&table);
|
||||
// this interface will return the loaded column store type sstable
|
||||
int get_loaded_column_store_sstable(blocksstable::ObSSTable *&table);
|
||||
int get_merge_row_cnt(const ObTableIterParam &iter_param, int64_t &row_cnt);
|
||||
blocksstable::ObSSTable *get_sstable() const { return sstable_; }
|
||||
const ObStorageMetaHandle &get_meta_handle() const { return meta_handle_; }
|
||||
|
@ -1246,13 +1246,14 @@ int ObMediumCompactionScheduleFunc::batch_check_medium_finish(
|
||||
int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
bool &tablet_need_freeze_flag,
|
||||
ObTabletSchedulePair &schedule_pair,
|
||||
bool &create_dag_flag,
|
||||
const int64_t input_major_snapshot,
|
||||
const bool scheduler_called)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
create_dag_flag = false;
|
||||
|
||||
#ifdef ERRSIM
|
||||
ret = OB_E(EventTable::EN_MEDIUM_CREATE_DAG) ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -1274,6 +1275,8 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
||||
const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot; // broadcast scn
|
||||
ObMediumCompactionInfo::ObCompactionType compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX;
|
||||
int64_t schedule_scn = 0; // medium_snapshot in medium info
|
||||
bool tablet_need_freeze_flag = false;
|
||||
|
||||
if (OB_FAIL(tablet.read_medium_info_list(temp_allocator, medium_list))) {
|
||||
LOG_WARN("failed to load medium info list", K(ret), K(tablet));
|
||||
} else if (OB_FAIL(read_medium_info_from_list(*medium_list, last_major_snapshot, major_frozen_snapshot, compaction_type, schedule_scn))) {
|
||||
@ -1289,6 +1292,13 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
||||
} else if (schedule_scn > 0 && OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, tablet_need_freeze_flag, create_dag_flag))) {
|
||||
LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && tablet_need_freeze_flag) {
|
||||
schedule_pair.tablet_id_ = tablet_id;
|
||||
schedule_pair.schedule_merge_scn_ = schedule_scn;
|
||||
} else {
|
||||
schedule_pair.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ namespace oceanbase
|
||||
{
|
||||
namespace compaction
|
||||
{
|
||||
struct ObTabletSchedulePair;
|
||||
|
||||
class ObMediumCompactionScheduleFunc
|
||||
{
|
||||
@ -47,7 +48,7 @@ public:
|
||||
static int schedule_tablet_medium_merge(
|
||||
ObLS &ls,
|
||||
ObTablet &tablet,
|
||||
bool &tablet_need_freeze_flag,
|
||||
ObTabletSchedulePair &schedule_pair,
|
||||
bool &create_dag_flag,
|
||||
const int64_t major_frozen_scn = 0,
|
||||
const bool scheduler_called = false);
|
||||
|
@ -226,12 +226,12 @@ int ObTabletMiniMergeCtx::try_schedule_meta_merge(
|
||||
|
||||
// try schedule medium merge
|
||||
if (!medium_is_cooling_down) {
|
||||
bool non_used_freeze_flag = false; // no meaning, just for placeholder for refering
|
||||
bool non_used_schedule_dag_flag = false;
|
||||
ObTabletSchedulePair non_used_schedule_pair;
|
||||
if (OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
||||
*get_ls(),
|
||||
*tablet_handle.get_obj(),
|
||||
non_used_freeze_flag,
|
||||
non_used_schedule_pair,
|
||||
non_used_schedule_dag_flag))) {
|
||||
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
|
||||
LOG_WARN_RET(tmp_ret, "failed to schedule tablet medium merge", K(tmp_ret), "param", get_dag_param());
|
||||
|
@ -1193,10 +1193,9 @@ void prepare_allocator(
|
||||
*/
|
||||
ObBatchFreezeTabletsParam::ObBatchFreezeTabletsParam()
|
||||
: ls_id_(),
|
||||
compaction_scn_(),
|
||||
tablet_ids_()
|
||||
tablet_pairs_()
|
||||
{
|
||||
tablet_ids_.set_attr(lib::ObMemAttr(MTL_ID(), "BtFrzTbls", ObCtxIds::MERGE_NORMAL_CTX_ID));
|
||||
tablet_pairs_.set_attr(lib::ObMemAttr(MTL_ID(), "BtFrzTbls", ObCtxIds::MERGE_NORMAL_CTX_ID));
|
||||
}
|
||||
|
||||
int ObBatchFreezeTabletsParam::assign(
|
||||
@ -1206,11 +1205,10 @@ int ObBatchFreezeTabletsParam::assign(
|
||||
|
||||
if (this == &other) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(tablet_ids_.assign(other.tablet_ids_))) {
|
||||
} else if (OB_FAIL(tablet_pairs_.assign(other.tablet_pairs_))) {
|
||||
LOG_WARN("failed to copy tablet ids", K(ret));
|
||||
} else {
|
||||
ls_id_ = other.ls_id_;
|
||||
compaction_scn_ = other.compaction_scn_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1219,7 +1217,6 @@ int64_t ObBatchFreezeTabletsParam::get_hash() const
|
||||
{
|
||||
int64_t hash_val = 0;
|
||||
hash_val = common::murmurhash(&ls_id_, sizeof(ls_id_), hash_val);
|
||||
hash_val = common::murmurhash(&compaction_scn_, sizeof(compaction_scn_), hash_val);
|
||||
return hash_val;
|
||||
}
|
||||
|
||||
@ -1280,8 +1277,6 @@ bool ObBatchFreezeTabletsDag::operator == (const ObIDag &other) const
|
||||
is_same = false;
|
||||
} else if (param_.ls_id_ != static_cast<const ObBatchFreezeTabletsDag &>(other).param_.ls_id_) {
|
||||
is_same = false;
|
||||
} else if (param_.compaction_scn_ != static_cast<const ObBatchFreezeTabletsDag &>(other).param_.compaction_scn_) {
|
||||
is_same = false;
|
||||
}
|
||||
return is_same;
|
||||
}
|
||||
@ -1303,8 +1298,7 @@ int ObBatchFreezeTabletsDag::fill_info_param(
|
||||
allocator,
|
||||
get_type(),
|
||||
param_.ls_id_.id(),
|
||||
param_.compaction_scn_,
|
||||
param_.tablet_ids_.count()))) {
|
||||
param_.tablet_pairs_.count()))) {
|
||||
LOG_WARN("failed to fill info param", K(ret), K(param_));
|
||||
}
|
||||
return ret;
|
||||
@ -1313,8 +1307,8 @@ int ObBatchFreezeTabletsDag::fill_info_param(
|
||||
int ObBatchFreezeTabletsDag::fill_dag_key(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(databuff_printf(buf, buf_len, "ls_id=%ld compaction_scn=%ld freeze_tablet_cnt=%ld",
|
||||
param_.ls_id_.id(), param_.compaction_scn_, param_.tablet_ids_.count()))) {
|
||||
if (OB_FAIL(databuff_printf(buf, buf_len, "ls_id=%ld freeze_tablet_cnt=%ld",
|
||||
param_.ls_id_.id(), param_.tablet_pairs_.count()))) {
|
||||
LOG_WARN("failed to fill dag key", K(ret), K(param_));
|
||||
}
|
||||
return ret;
|
||||
@ -1362,40 +1356,47 @@ int ObBatchFreezeTabletsTask::process()
|
||||
const ObBatchFreezeTabletsParam ¶m = base_dag_->get_param();
|
||||
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls_ptr = nullptr;
|
||||
if (OB_TMP_FAIL(MTL(ObLSService *)->get_ls(param.ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
LOG_WARN_RET(tmp_ret, "failed to get log stream", K(param));
|
||||
ObLS *ls = nullptr;
|
||||
int64_t weak_read_ts = 0;
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls(param.ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(param));
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null ls", K(ret), K(param));
|
||||
} else {
|
||||
ls_ptr = ls_handle.get_ls();
|
||||
weak_read_ts = ls->get_ls_wrs_handler()->get_ls_weak_read_ts().get_val_for_tx();
|
||||
}
|
||||
|
||||
int64_t fail_freeze_cnt = 0;
|
||||
int64_t succ_schedule_cnt = 0;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < param.tablet_ids_.count(); ++i) {
|
||||
const ObTabletID &tablet_id = param.tablet_ids_.at(i);
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < param.tablet_pairs_.count(); ++i) {
|
||||
const ObTabletSchedulePair &cur_pair = param.tablet_pairs_.at(i);
|
||||
ObTabletHandle tablet_handle;
|
||||
ObTablet *tablet = nullptr;
|
||||
|
||||
if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_id, true/*need_rewrite*/, true/*is_sync*/))) {
|
||||
LOG_WARN_RET(tmp_ret, "failed to force freeze tablet", K(param), K(tablet_id));
|
||||
if (OB_UNLIKELY(!cur_pair.is_valid())) {
|
||||
tmp_ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN_RET(tmp_ret, "get invalid tablet pair", K(cur_pair));
|
||||
} else if (cur_pair.schedule_merge_scn_ > weak_read_ts) {
|
||||
// no need to force freeze
|
||||
} else if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(cur_pair.tablet_id_, true/*need_rewrite*/, true/*is_sync*/))) {
|
||||
LOG_WARN_RET(tmp_ret, "failed to force freeze tablet", K(param), K(cur_pair));
|
||||
++fail_freeze_cnt;
|
||||
} else if (OB_ISNULL(ls_ptr)) {
|
||||
// do nothing
|
||||
} else if (OB_TMP_FAIL(ls_ptr->get_tablet_svr()->get_tablet(tablet_id,
|
||||
tablet_handle,
|
||||
0/*timeout_us*/,
|
||||
storage::ObMDSGetTabletMode::READ_ALL_COMMITED))) {
|
||||
LOG_WARN_RET(tmp_ret, "failed to get tablet", K(param), K(tablet_id));
|
||||
} else if (OB_TMP_FAIL(ls->get_tablet_svr()->get_tablet(cur_pair.tablet_id_,
|
||||
tablet_handle,
|
||||
0/*timeout_us*/,
|
||||
storage::ObMDSGetTabletMode::READ_ALL_COMMITED))) {
|
||||
LOG_WARN_RET(tmp_ret, "failed to get tablet", K(param), K(cur_pair));
|
||||
} else if (FALSE_IT(tablet = tablet_handle.get_obj())) {
|
||||
} else if (OB_UNLIKELY(tablet->get_snapshot_version() < param.compaction_scn_)) {
|
||||
} else if (OB_UNLIKELY(tablet->get_snapshot_version() < cur_pair.schedule_merge_scn_)) {
|
||||
// do nothing
|
||||
} else if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(param.ls_id_,
|
||||
*tablet,
|
||||
MEDIUM_MERGE,
|
||||
param.compaction_scn_))) {
|
||||
cur_pair.schedule_merge_scn_))) {
|
||||
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
|
||||
ret = tmp_ret;
|
||||
LOG_WARN_RET(tmp_ret, "failed to schedule medium merge dag", K(param), K(tablet_id));
|
||||
LOG_WARN_RET(tmp_ret, "failed to schedule medium merge dag", K(param), K(cur_pair));
|
||||
}
|
||||
} else {
|
||||
++succ_schedule_cnt;
|
||||
@ -1404,12 +1405,15 @@ int ObBatchFreezeTabletsTask::process()
|
||||
if (OB_FAIL(share::dag_yield())) {
|
||||
LOG_WARN("failed to dag yield", K(ret));
|
||||
}
|
||||
}
|
||||
if (REACH_TENANT_TIME_INTERVAL(5_s)) {
|
||||
weak_read_ts = ls->get_ls_wrs_handler()->get_ls_weak_read_ts().get_val_for_tx();
|
||||
}
|
||||
} // end for
|
||||
|
||||
if (OB_UNLIKELY(fail_freeze_cnt * 2 > param.tablet_ids_.count())) {
|
||||
if (OB_UNLIKELY(fail_freeze_cnt * 2 > param.tablet_pairs_.count())) {
|
||||
ret = OB_PARTIAL_FAILED;
|
||||
}
|
||||
FLOG_INFO("batch freeze tablets finished", KR(ret), K(param), K(fail_freeze_cnt), KP(ls_ptr), K(succ_schedule_cnt));
|
||||
FLOG_INFO("batch freeze tablets finished", KR(ret), K(param), K(fail_freeze_cnt), K(succ_schedule_cnt));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -353,22 +353,44 @@ public:
|
||||
};
|
||||
|
||||
|
||||
struct ObTabletSchedulePair
|
||||
{
|
||||
public:
|
||||
ObTabletSchedulePair()
|
||||
: tablet_id_(),
|
||||
schedule_merge_scn_(0)
|
||||
{ }
|
||||
ObTabletSchedulePair(
|
||||
const common::ObTabletID &tablet_id,
|
||||
const int64_t schedule_merge_scn)
|
||||
: tablet_id_(tablet_id),
|
||||
schedule_merge_scn_(schedule_merge_scn)
|
||||
{ }
|
||||
bool is_valid() const { return tablet_id_.is_valid() && schedule_merge_scn_ > 0; }
|
||||
bool need_force_freeze() const { return schedule_merge_scn_ > 0; }
|
||||
void reset() { tablet_id_.reset(); schedule_merge_scn_ = 0; }
|
||||
TO_STRING_KV(K_(tablet_id), K_(schedule_merge_scn));
|
||||
public:
|
||||
common::ObTabletID tablet_id_;
|
||||
int64_t schedule_merge_scn_;
|
||||
};
|
||||
|
||||
|
||||
struct ObBatchFreezeTabletsParam : public share::ObIDagInitParam
|
||||
{
|
||||
public:
|
||||
ObBatchFreezeTabletsParam();
|
||||
virtual ~ObBatchFreezeTabletsParam() { tablet_ids_.reset(); }
|
||||
virtual bool is_valid() const override { return ls_id_.is_valid() && compaction_scn_ > 0 && tablet_ids_.count() > 0; }
|
||||
virtual ~ObBatchFreezeTabletsParam() { tablet_pairs_.reset(); }
|
||||
virtual bool is_valid() const override { return ls_id_.is_valid() && tablet_pairs_.count() > 0; }
|
||||
int assign(const ObBatchFreezeTabletsParam &other);
|
||||
bool operator == (const ObBatchFreezeTabletsParam &other) const;
|
||||
bool operator != (const ObBatchFreezeTabletsParam &other) const { return !this->operator==(other); }
|
||||
int64_t get_hash() const;
|
||||
VIRTUAL_TO_STRING_KV(K_(ls_id), K_(compaction_scn), "tablet_count", tablet_ids_.count(), K_(tablet_ids));
|
||||
VIRTUAL_TO_STRING_KV(K_(ls_id), "tablet_pair_cnt", tablet_pairs_.count(), K_(tablet_pairs));
|
||||
public:
|
||||
static constexpr int64_t DEFAULT_BATCH_SIZE = 16;
|
||||
share::ObLSID ls_id_;
|
||||
int64_t compaction_scn_;
|
||||
common::ObSEArray<common::ObTabletID, DEFAULT_BATCH_SIZE> tablet_ids_;
|
||||
common::ObSEArray<ObTabletSchedulePair, DEFAULT_BATCH_SIZE> tablet_pairs_;
|
||||
};
|
||||
|
||||
|
||||
@ -388,11 +410,6 @@ public:
|
||||
virtual lib::Worker::CompatMode get_compat_mode() const override { return lib::Worker::CompatMode::MYSQL; }
|
||||
virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; }
|
||||
const ObBatchFreezeTabletsParam &get_param() const { return param_; }
|
||||
virtual bool ignore_warning() override
|
||||
{
|
||||
return OB_PARTIAL_FAILED != dag_ret_;
|
||||
}
|
||||
|
||||
INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(param));
|
||||
private:
|
||||
bool is_inited_;
|
||||
|
@ -1528,7 +1528,7 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
bool is_leader = false;
|
||||
bool could_major_merge = false;
|
||||
const int64_t major_frozen_scn = get_frozen_version();
|
||||
ObSEArray<ObTabletID, MERGE_BACTH_FREEZE_CNT> need_freeze_tablets;
|
||||
ObSEArray<ObTabletSchedulePair, MERGE_BACTH_FREEZE_CNT> need_freeze_tablets;
|
||||
need_freeze_tablets.set_attr(ObMemAttr(MTL_ID(), "MediumBatch"));
|
||||
if (could_major_merge_start()) {
|
||||
could_major_merge = true;
|
||||
@ -1550,15 +1550,16 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
}
|
||||
|
||||
bool enable_adaptive_compaction = get_enable_adaptive_compaction();
|
||||
bool tablet_need_freeze_flag = false;
|
||||
ObTabletSchedulePair schedule_pair;
|
||||
|
||||
while (OB_SUCC(ret)) { // loop all tablet in ls
|
||||
tablet_time_guard.reuse();
|
||||
bool tablet_merge_finish = false;
|
||||
tablet_need_freeze_flag = false;
|
||||
// ATTENTION!!! load weak ts before get tablet
|
||||
const share::SCN &weak_read_ts = ls.get_ls_wrs_handler()->get_ls_weak_read_ts();
|
||||
tablet_could_schedule_medium = false;
|
||||
schedule_pair.reset();
|
||||
|
||||
if (OB_FAIL(medium_ls_tablet_iter_.get_next_tablet(tablet_handle))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
@ -1587,7 +1588,7 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
} else if (OB_TMP_FAIL(schedule_tablet_medium(
|
||||
ls, tablet_handle, major_frozen_scn, weak_read_ts,
|
||||
could_major_merge, tablet_could_schedule_medium, merge_version, enable_adaptive_compaction,
|
||||
is_leader, tablet_merge_finish, tablet_need_freeze_flag, tablet_time_guard))) {
|
||||
is_leader, tablet_merge_finish, schedule_pair, tablet_time_guard))) {
|
||||
LOG_WARN("failed to schedule tablet medium", KR(tmp_ret), K(ls_id), K(tablet_id));
|
||||
}
|
||||
if (tablet_could_schedule_medium
|
||||
@ -1596,8 +1597,8 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
LOG_WARN("failed to clear prohibit schedule medium flag", K(tmp_ret), K(ret), K(ls_id), K(tablet_id));
|
||||
}
|
||||
medium_ls_tablet_iter_.update_merge_finish(tablet_merge_finish);
|
||||
if (tablet_need_freeze_flag) {
|
||||
if (OB_TMP_FAIL(need_freeze_tablets.push_back(tablet_id))) {
|
||||
if (schedule_pair.need_force_freeze()) {
|
||||
if (OB_TMP_FAIL(need_freeze_tablets.push_back(schedule_pair))) {
|
||||
LOG_WARN("failed to push back tablet_id for batch_freeze", KR(tmp_ret), K(ls_id), K(tablet_id));
|
||||
}
|
||||
}
|
||||
@ -1605,7 +1606,8 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
} // end of while
|
||||
|
||||
if (OB_FAIL(ret) || need_freeze_tablets.empty()) {
|
||||
} else if (OB_TMP_FAIL(schedule_batch_freeze_dag(merge_version, ls_id, need_freeze_tablets))) {
|
||||
} else if (OB_TMP_FAIL(schedule_batch_freeze_dag(ls_id,
|
||||
need_freeze_tablets))) {
|
||||
LOG_WARN("failed to schedule batch force freeze tablets dag", K(tmp_ret), K(ls_id),
|
||||
"tablet_count", need_freeze_tablets.count());
|
||||
}
|
||||
@ -1654,7 +1656,7 @@ int ObTenantTabletScheduler::schedule_tablet_medium(
|
||||
const bool enable_adaptive_compaction,
|
||||
bool &is_leader,
|
||||
bool &tablet_merge_finish,
|
||||
bool &tablet_need_freeze_flag,
|
||||
ObTabletSchedulePair &schedule_pair,
|
||||
ObCompactionTimeGuard &time_guard)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1665,6 +1667,7 @@ int ObTenantTabletScheduler::schedule_tablet_medium(
|
||||
bool need_diagnose = false;
|
||||
bool tablet_could_schedule_merge = false;
|
||||
bool create_dag_flag = false;
|
||||
schedule_pair.reset();
|
||||
|
||||
if (tablet_could_schedule_medium
|
||||
&& OB_TMP_FAIL(ObTabletMergeChecker::check_could_merge_for_medium(tablet, tablet_could_schedule_merge))) {
|
||||
@ -1721,7 +1724,7 @@ int ObTenantTabletScheduler::schedule_tablet_medium(
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (could_major_merge) {
|
||||
if (OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
||||
ls, tablet, tablet_need_freeze_flag, create_dag_flag,
|
||||
ls, tablet, schedule_pair, create_dag_flag,
|
||||
major_frozen_scn, true /*scheduler_called*/))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("failed to schedule medium", K(tmp_ret), K(ls_id), K(tablet_id));
|
||||
@ -2049,19 +2052,17 @@ void ObTenantTabletScheduler::report_blocking_medium(
|
||||
}
|
||||
|
||||
int ObTenantTabletScheduler::schedule_batch_freeze_dag(
|
||||
const int64_t merge_version,
|
||||
const share::ObLSID &ls_id,
|
||||
const common::ObIArray<ObTabletID> &tablet_ids)
|
||||
const common::ObIArray<ObTabletSchedulePair> &tablet_pairs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObBatchFreezeTabletsParam param;
|
||||
|
||||
if (OB_UNLIKELY(!ls_id.is_valid() || tablet_ids.empty())) {
|
||||
if (OB_UNLIKELY(!ls_id.is_valid() || tablet_pairs.empty())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid arguments", K(ret), K(ls_id), K(tablet_ids));
|
||||
LOG_WARN("get invalid arguments", K(ret), K(ls_id), K(tablet_pairs));
|
||||
} else if (FALSE_IT(param.ls_id_ = ls_id)) {
|
||||
} else if (FALSE_IT(param.compaction_scn_ = merge_version)) {
|
||||
} else if (OB_FAIL(param.tablet_ids_.assign(tablet_ids))) {
|
||||
} else if (OB_FAIL(param.tablet_pairs_.assign(tablet_pairs))) {
|
||||
LOG_WARN("failed to assign tablet ids", K(ret));
|
||||
} else if (OB_FAIL(MTL(ObTenantDagScheduler *)->create_and_add_dag<ObBatchFreezeTabletsDag>(¶m, true/*is_emergency*/))) {
|
||||
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
|
||||
|
@ -47,6 +47,7 @@ struct ObTabletStatKey;
|
||||
|
||||
namespace compaction
|
||||
{
|
||||
struct ObTabletSchedulePair;
|
||||
|
||||
class ObFastFreezeChecker
|
||||
{
|
||||
@ -261,7 +262,7 @@ private:
|
||||
const bool enable_adaptive_compaction,
|
||||
bool &is_leader,
|
||||
bool &tablet_merge_finish,
|
||||
bool &tablet_need_freeze_flag,
|
||||
ObTabletSchedulePair &schedule_pair,
|
||||
ObCompactionTimeGuard &time_guard);
|
||||
int after_schedule_tenant_medium(
|
||||
const int64_t merge_version,
|
||||
@ -295,9 +296,8 @@ private:
|
||||
const bool &could_major_merge,
|
||||
const share::ObLSID &ls_id);
|
||||
int schedule_batch_freeze_dag(
|
||||
const int64_t merge_version,
|
||||
const share::ObLSID &ls_id,
|
||||
const common::ObIArray<ObTabletID> &tablet_ids);
|
||||
const common::ObIArray<compaction::ObTabletSchedulePair> &tablet_ids);
|
||||
public:
|
||||
static const int64_t INIT_COMPACTION_SCN = 1;
|
||||
typedef common::ObSEArray<ObGetMergeTablesResult, compaction::ObPartitionMergePolicy::OB_MINOR_PARALLEL_INFO_ARRAY_SIZE> MinorParallelResultArray;
|
||||
|
@ -732,7 +732,7 @@ int get_sstables(ObTableStoreIterator &ddl_sstable_iter, const int64_t cg_idx, O
|
||||
// skip
|
||||
} else if (OB_FAIL(cur_co_sstable->fetch_cg_sstable(cg_idx, cg_sstable_wrapper))) {
|
||||
LOG_WARN("get all tables failed", K(ret));
|
||||
} else if (OB_FAIL(cg_sstable_wrapper.get_sstable(cg_sstable))) {
|
||||
} else if (OB_FAIL(cg_sstable_wrapper.get_loaded_column_store_sstable(cg_sstable))) {
|
||||
LOG_WARN("get sstable failed", K(ret));
|
||||
} else if (OB_ISNULL(cg_sstable)) {
|
||||
// skip
|
||||
|
Loading…
x
Reference in New Issue
Block a user