fix ls locality concurrent bug && fix ddl slice store sort bug
This commit is contained in:
parent
e273c377e4
commit
c0b2e36c0f
@ -174,6 +174,52 @@ int ObLSColumnReplicaCache::add_cs_replica(const ObLSReplicaUniItem &ls_item)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSColumnReplicaCache::assign(const ObLSColumnReplicaCache &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited", K(ret));
|
||||
} else if (OB_FAIL(other.deep_fetch(ls_id_set_, ls_replica_set_, ls_infos_))) {
|
||||
LOG_WARN("fail to assign", K(ret), K(other));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSColumnReplicaCache::deep_fetch(
|
||||
hash::ObHashSet<ObLSID> &target_ls_id_set,
|
||||
hash::ObHashSet<ObLSReplicaUniItem> &target_ls_replica_set,
|
||||
common::ObIArray<ObLSInfo> &target_ls_infos) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited", K(ret));
|
||||
} else {
|
||||
target_ls_id_set.reuse();
|
||||
target_ls_replica_set.reuse();
|
||||
target_ls_infos.reuse();
|
||||
|
||||
for (hash::ObHashSet<ObLSID>::const_iterator it = ls_id_set_.begin(); OB_SUCC(ret) && it != ls_id_set_.end(); ++it) {
|
||||
if (OB_FAIL(target_ls_id_set.set_refactored(it->first))) {
|
||||
LOG_WARN("fail to add ls id", K(ret));
|
||||
}
|
||||
}
|
||||
for (hash::ObHashSet<ObLSReplicaUniItem>::const_iterator it = ls_replica_set_.begin(); OB_SUCC(ret) && it != ls_replica_set_.end(); ++it) {
|
||||
if (OB_FAIL(target_ls_replica_set.set_refactored(it->first))) {
|
||||
LOG_WARN("fail to add ls replica", K(ret));
|
||||
}
|
||||
}
|
||||
for (int64_t idx = 0; OB_SUCC(ret) && idx < ls_infos_.count(); ++idx) {
|
||||
if (OB_FAIL(target_ls_infos.push_back(ls_infos_.at(idx)))) {
|
||||
LOG_WARN("fail to push back ls info", K(ret), K(idx), K_(ls_infos));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObLSColumnReplicaCache::update(const ObLSID &ls_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -54,6 +54,11 @@ public:
|
||||
int init();
|
||||
void destroy();
|
||||
void reuse();
|
||||
int assign(const ObLSColumnReplicaCache &other);
|
||||
int deep_fetch(
|
||||
hash::ObHashSet<ObLSID> &target_ls_id_set,
|
||||
hash::ObHashSet<ObLSReplicaUniItem> &target_ls_replica_set,
|
||||
common::ObIArray<ObLSInfo> &target_ls_infos) const;
|
||||
int update(const ObLSID &ls_id);
|
||||
int update_with_ls_info(const ObLSInfo &ls_info);
|
||||
int check_is_cs_replica(const ObLSReplicaUniItem &ls_item, bool &is_cs_replica) const;
|
||||
@ -68,8 +73,8 @@ private:
|
||||
const static int64_t BUCKET_NUM_OF_LS_REPLICA_SET = 31;
|
||||
private:
|
||||
bool is_inited_;
|
||||
hash::ObHashSet<ObLSID, hash::NoPthreadDefendMode> ls_id_set_; // record looped ls id
|
||||
hash::ObHashSet<ObLSReplicaUniItem, hash::NoPthreadDefendMode> ls_replica_set_; // cs-prelica ls
|
||||
hash::ObHashSet<ObLSID> ls_id_set_; // record looped ls id
|
||||
hash::ObHashSet<ObLSReplicaUniItem> ls_replica_set_; // cs-prelica ls
|
||||
common::ObSEArray<ObLSInfo, 4> ls_infos_; // used for check member list and learner list
|
||||
};
|
||||
|
||||
|
@ -214,14 +214,24 @@ int ObTenantMediumChecker::check_medium_finish_schedule()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObLSColumnReplicaCache cs_replica_cache; // a copy one from ls_locality_cache_
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTenantMediumChecker is not inited", K(ret));
|
||||
} else if (OB_FAIL(ls_locality_cache_.refresh_ls_locality(false /*force_refresh*/))) {
|
||||
LOG_WARN("failed to refresh ls locality");
|
||||
ADD_COMMON_SUSPECT_INFO(MEDIUM_MERGE, share::ObDiagnoseTabletType::TYPE_MEDIUM_MERGE,
|
||||
SUSPECT_FAILED_TO_REFRESH_LS_LOCALITY, ret);
|
||||
} else if (OB_FAIL(cs_replica_cache.init())) {
|
||||
LOG_WARN("failed to init cs_replica_cache", K(ret));
|
||||
} else {
|
||||
lib::ObMutexGuard guard(lock_);
|
||||
if (OB_FAIL(ls_locality_cache_.refresh_ls_locality(false /*force_refresh*/))) {
|
||||
LOG_WARN("failed to refresh ls locality");
|
||||
ADD_COMMON_SUSPECT_INFO(MEDIUM_MERGE, share::ObDiagnoseTabletType::TYPE_MEDIUM_MERGE,
|
||||
SUSPECT_FAILED_TO_REFRESH_LS_LOCALITY, ret);
|
||||
} else if (OB_FAIL(cs_replica_cache.assign(ls_locality_cache_.get_cs_replica_cache()))) {
|
||||
LOG_WARN("failed to assign cs_replica_cache", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
DEL_SUSPECT_INFO(MEDIUM_MERGE, UNKNOW_LS_ID, UNKNOW_TABLET_ID, ObDiagnoseTabletType::TYPE_MEDIUM_MERGE);
|
||||
TabletLSArray tablet_ls_infos;
|
||||
tablet_ls_infos.set_attr(ObMemAttr(MTL_ID(), "CheckInfos"));
|
||||
@ -260,7 +270,7 @@ int ObTenantMediumChecker::check_medium_finish_schedule()
|
||||
int64_t cost_ts = ObTimeUtility::fast_current_time();
|
||||
ObBatchFinishCheckStat stat;
|
||||
while (start_idx < end_idx) {
|
||||
if (OB_TMP_FAIL(check_medium_finish(tablet_ls_infos, start_idx, end_idx, batch_tablet_ls_infos, finish_tablet_ls_infos, stat))) {
|
||||
if (OB_TMP_FAIL(check_medium_finish(tablet_ls_infos, start_idx, end_idx, batch_tablet_ls_infos, finish_tablet_ls_infos, stat, cs_replica_cache))) {
|
||||
LOG_WARN("failed to check medium finish", K(tmp_ret));
|
||||
}
|
||||
start_idx = end_idx;
|
||||
@ -283,7 +293,8 @@ int ObTenantMediumChecker::check_medium_finish(
|
||||
int64_t end_idx,
|
||||
ObIArray<ObTabletCheckInfo> &check_tablet_ls_infos,
|
||||
ObIArray<ObTabletCheckInfo> &finish_tablet_ls_infos,
|
||||
ObBatchFinishCheckStat &stat)
|
||||
ObBatchFinishCheckStat &stat,
|
||||
const share::ObLSColumnReplicaCache &ls_cs_replica_cache)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -313,7 +324,7 @@ int ObTenantMediumChecker::check_medium_finish(
|
||||
ObCompactionScheduleTimeGuard time_guard;
|
||||
stat.filter_cnt_ += (end_idx - start_idx - check_tablet_ls_infos.count());
|
||||
if (FAILEDx(ObMediumCompactionScheduleFunc::batch_check_medium_finish(
|
||||
ls_info_map_, finish_tablet_ls_infos, check_tablet_ls_infos, time_guard, ls_locality_cache_.get_cs_replica_cache()))) {
|
||||
ls_info_map_, finish_tablet_ls_infos, check_tablet_ls_infos, time_guard, ls_cs_replica_cache))) {
|
||||
LOG_WARN("failed to batch check medium finish", K(ret), K(tablet_ls_infos.count()), K(check_tablet_ls_infos.count()),
|
||||
K(tablet_ls_infos), K(check_tablet_ls_infos));
|
||||
stat.fail_cnt_ += check_tablet_ls_infos.count();
|
||||
|
@ -95,7 +95,8 @@ public:
|
||||
int64_t end_idx,
|
||||
ObIArray<ObTabletCheckInfo> &check_tablet_ls_infos,
|
||||
ObIArray<ObTabletCheckInfo> &finish_tablet_ls_infos,
|
||||
ObBatchFinishCheckStat &stat);
|
||||
ObBatchFinishCheckStat &stat,
|
||||
const share::ObLSColumnReplicaCache &ls_cs_replica_cache);
|
||||
int add_tablet_ls(const ObTabletID &tablet_id, const share::ObLSID &ls_id, const int64_t medium_scn);
|
||||
bool locality_cache_empty();
|
||||
TO_STRING_KV(K_(is_inited), K_(ls_locality_cache));
|
||||
|
@ -1801,28 +1801,32 @@ int ObTabletDirectLoadMgr::notify_all()
|
||||
struct CSSliceEndkeyCompareFunctor
|
||||
{
|
||||
public:
|
||||
CSSliceEndkeyCompareFunctor(const ObStorageDatumUtils &datum_utils) : datum_utils_(datum_utils), ret_code_(OB_SUCCESS) {}
|
||||
CSSliceEndkeyCompareFunctor(const ObStorageDatumUtils &datum_utils, int &ret_code) : datum_utils_(datum_utils), ret_code_(ret_code) {}
|
||||
bool operator ()(const ObDirectLoadSliceWriter *left, const ObDirectLoadSliceWriter *right)
|
||||
{
|
||||
bool bret = false;
|
||||
int ret = ret_code_;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(left) || OB_ISNULL(right) || !left->need_column_store()
|
||||
|| !right->need_column_store() || left->get_writer_type() != right->get_writer_type()) {
|
||||
} else if (OB_ISNULL(left) || OB_ISNULL(right)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), KPC(left), KPC(right));
|
||||
} else if (!left->is_empty() && !right->is_empty()) {
|
||||
const ObChunkSliceStore *left_slice_store = left->is_cs_replica_write()
|
||||
? static_cast<const ObMultiSliceStore *>(left->get_slice_store())->get_column_slice_store()
|
||||
: static_cast<const ObChunkSliceStore *>(left->get_slice_store());
|
||||
const ObChunkSliceStore *right_slice_store = right->is_cs_replica_write()
|
||||
? static_cast<const ObMultiSliceStore *>(right->get_slice_store())->get_column_slice_store()
|
||||
: static_cast<const ObChunkSliceStore *>(right->get_slice_store());
|
||||
int cmp_ret = 0;
|
||||
if (OB_FAIL(left_slice_store->endkey_.compare(right_slice_store->endkey_, datum_utils_, cmp_ret))) {
|
||||
LOG_WARN("endkey compare failed", K(ret));
|
||||
if (!left->need_column_store() || !right->need_column_store() || left->get_writer_type() != right->get_writer_type()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid cs slice writer", K(ret), KPC(left), KPC(right));
|
||||
} else {
|
||||
bret = cmp_ret < 0;
|
||||
const ObChunkSliceStore *left_slice_store = left->is_cs_replica_write()
|
||||
? static_cast<const ObMultiSliceStore *>(left->get_slice_store())->get_column_slice_store()
|
||||
: static_cast<const ObChunkSliceStore *>(left->get_slice_store());
|
||||
const ObChunkSliceStore *right_slice_store = right->is_cs_replica_write()
|
||||
? static_cast<const ObMultiSliceStore *>(right->get_slice_store())->get_column_slice_store()
|
||||
: static_cast<const ObChunkSliceStore *>(right->get_slice_store());
|
||||
int cmp_ret = 0;
|
||||
if (OB_FAIL(left_slice_store->endkey_.compare(right_slice_store->endkey_, datum_utils_, cmp_ret))) {
|
||||
LOG_WARN("endkey compare failed", K(ret));
|
||||
} else {
|
||||
bret = cmp_ret < 0;
|
||||
}
|
||||
}
|
||||
} else if (left->is_empty() && right->is_empty()) {
|
||||
// both empty, compare pointer
|
||||
@ -1836,7 +1840,7 @@ public:
|
||||
}
|
||||
public:
|
||||
const ObStorageDatumUtils &datum_utils_;
|
||||
int ret_code_;
|
||||
int &ret_code_; // is not use reference, the ret_code_ will lose when use ob_sort
|
||||
};
|
||||
|
||||
int ObTabletDirectLoadMgr::calc_range(const int64_t thread_cnt)
|
||||
@ -1874,9 +1878,8 @@ int ObTabletDirectLoadMgr::calc_range(const int64_t thread_cnt)
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
CSSliceEndkeyCompareFunctor cmp(tablet_handle.get_obj()->get_rowkey_read_info().get_datum_utils());
|
||||
CSSliceEndkeyCompareFunctor cmp(tablet_handle.get_obj()->get_rowkey_read_info().get_datum_utils(), ret);
|
||||
lib::ob_sort(sorted_slices.begin(), sorted_slices.end(), cmp);
|
||||
ret = cmp.ret_code_;
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("sort slice failed", K(ret), K(sorted_slices));
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user