[MDS] fix thread deadlock that may happen when writing a KV unit fails
This commit is contained in:
@ -226,10 +226,8 @@ public:
|
|||||||
int for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump(DUMP_OP &&for_each_op, const int64_t mds_construct_sequence, const bool for_flush);
|
int for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump(DUMP_OP &&for_each_op, const int64_t mds_construct_sequence, const bool for_flush);
|
||||||
TO_STRING_KV(KP(this), K_(ls_id), K_(tablet_id), K_(flushing_scn),
|
TO_STRING_KV(KP(this), K_(ls_id), K_(tablet_id), K_(flushing_scn),
|
||||||
K_(rec_scn), K_(last_inner_recycled_scn), K_(total_node_cnt), K_(construct_sequence), K_(debug_info));
|
K_(rec_scn), K_(last_inner_recycled_scn), K_(total_node_cnt), K_(construct_sequence), K_(debug_info));
|
||||||
// template <typename SCAN_OP>
|
|
||||||
// int for_each_scan_node(SCAN_OP &&op);
|
|
||||||
template <typename SCAN_OP>
|
template <typename SCAN_OP>
|
||||||
int for_each_scan_row(SCAN_OP &&op);
|
int for_each_scan_row(FowEachRowAction action_type, SCAN_OP &&op);
|
||||||
MdsTableType &unit_tuple() { return unit_tuple_; }
|
MdsTableType &unit_tuple() { return unit_tuple_; }
|
||||||
private:// helper define
|
private:// helper define
|
||||||
struct ForEachUnitFillVirtualInfoHelper {
|
struct ForEachUnitFillVirtualInfoHelper {
|
||||||
@ -261,11 +259,12 @@ private:// helper define
|
|||||||
};
|
};
|
||||||
template <typename SCAN_OP>
|
template <typename SCAN_OP>
|
||||||
struct ForEachUnitScanRowHelper {
|
struct ForEachUnitScanRowHelper {
|
||||||
ForEachUnitScanRowHelper(SCAN_OP &op) : op_(op) {}
|
ForEachUnitScanRowHelper(FowEachRowAction action, SCAN_OP &op) : op_(op), action_type_(action) {}
|
||||||
template <typename K, typename V>
|
template <typename K, typename V>
|
||||||
int operator()(MdsUnit<K, V> &unit) { return unit.for_each_row(op_); }
|
int operator()(MdsUnit<K, V> &unit) { return unit.for_each_row(action_type_, op_); }
|
||||||
private:
|
private:
|
||||||
SCAN_OP &op_;
|
SCAN_OP &op_;
|
||||||
|
FowEachRowAction action_type_;
|
||||||
};
|
};
|
||||||
template <typename DUMP_OP, ENABLE_IF_LIKE_FUNCTION(DUMP_OP, int(const MdsDumpKV &))>
|
template <typename DUMP_OP, ENABLE_IF_LIKE_FUNCTION(DUMP_OP, int(const MdsDumpKV &))>
|
||||||
int for_each_to_dump_node_(DUMP_OP &&op, share::SCN &flushing_scn, const bool for_flush) {
|
int for_each_to_dump_node_(DUMP_OP &&op, share::SCN &flushing_scn, const bool for_flush) {
|
||||||
|
@ -944,9 +944,9 @@ int MdsTableImpl<MdsTableType>::calculate_flush_scn_and_need_dumped_nodes_cnt_(s
|
|||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
RecalculateFlushScnCauseOnlySuppportDumpCommittedNodeOP op1(do_flush_scn);// recalculate flush scn
|
RecalculateFlushScnCauseOnlySuppportDumpCommittedNodeOP op1(do_flush_scn);// recalculate flush scn
|
||||||
CountUnDumpdedNodesBelowDoFlushScn op2(need_dumped_nodes_cnt, do_flush_scn);// count nodes need dump
|
CountUnDumpdedNodesBelowDoFlushScn op2(need_dumped_nodes_cnt, do_flush_scn);// count nodes need dump
|
||||||
if (MDS_FAIL(for_each_scan_row(op1))) {
|
if (MDS_FAIL(for_each_scan_row(FowEachRowAction::CALCUALTE_FLUSH_SCN, op1))) {
|
||||||
MDS_LOG_FLUSH(WARN, "for each to calculate flush scn failed");
|
MDS_LOG_FLUSH(WARN, "for each to calculate flush scn failed");
|
||||||
} else if (MDS_FAIL(for_each_scan_row(op2))) {
|
} else if (MDS_FAIL(for_each_scan_row(FowEachRowAction::COUNT_NODES_BEFLOW_FLUSH_SCN, op2))) {
|
||||||
MDS_LOG_FLUSH(WARN, "for each to count undumped nodes failed");
|
MDS_LOG_FLUSH(WARN, "for each to count undumped nodes failed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1082,7 +1082,7 @@ void MdsTableImpl<MdsTableType>::on_flush_(const share::SCN &flush_scn, const in
|
|||||||
do {
|
do {
|
||||||
need_retry = false;
|
need_retry = false;
|
||||||
CalculateRecScnOp op(flush_scn);
|
CalculateRecScnOp op(flush_scn);
|
||||||
if (MDS_FAIL(for_each_scan_row(op))) {// lock all rows failed, retry until lock all rows success
|
if (MDS_FAIL(for_each_scan_row(FowEachRowAction::CALCULATE_REC_SCN, op))) {// lock all rows failed, retry until lock all rows success
|
||||||
need_retry = true;
|
need_retry = true;
|
||||||
MDS_LOG_FLUSH(WARN, "fail to do on flush");// record row lock guard may failed, cause lock guard array may meet extended failed cause memory not enough, but retry will make it success
|
MDS_LOG_FLUSH(WARN, "fail to do on flush");// record row lock guard may failed, cause lock guard array may meet extended failed cause memory not enough, but retry will make it success
|
||||||
} else {
|
} else {
|
||||||
@ -1350,9 +1350,9 @@ int MdsTableImpl<MdsTableType>::is_locked_by_others(const Key &key,
|
|||||||
|
|
||||||
template <typename MdsTableType>
|
template <typename MdsTableType>
|
||||||
template <typename SCAN_OP>
|
template <typename SCAN_OP>
|
||||||
int MdsTableImpl<MdsTableType>::for_each_scan_row(SCAN_OP &&op)
|
int MdsTableImpl<MdsTableType>::for_each_scan_row(FowEachRowAction action_type, SCAN_OP &&op)
|
||||||
{// add lock on unit
|
{// add lock on unit
|
||||||
ForEachUnitScanRowHelper<SCAN_OP> for_each_op(op);
|
ForEachUnitScanRowHelper<SCAN_OP> for_each_op(action_type, op);
|
||||||
return unit_tuple_.for_each(for_each_op);
|
return unit_tuple_.for_each(for_each_op);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1415,7 +1415,7 @@ int MdsTableImpl<MdsTableType>::try_recycle(const share::SCN recycle_scn)
|
|||||||
// do nothing
|
// do nothing
|
||||||
} else {
|
} else {
|
||||||
RecycleNodeOp op(do_inner_recycle_scn);
|
RecycleNodeOp op(do_inner_recycle_scn);
|
||||||
if (OB_FAIL(for_each_scan_row(op))) {
|
if (OB_FAIL(for_each_scan_row(FowEachRowAction::RECYCLE, op))) {
|
||||||
MDS_LOG_GC(ERROR, "fail to do recycle");
|
MDS_LOG_GC(ERROR, "fail to do recycle");
|
||||||
} else {
|
} else {
|
||||||
last_inner_recycled_scn_ = do_inner_recycle_scn;
|
last_inner_recycled_scn_ = do_inner_recycle_scn;
|
||||||
@ -1462,7 +1462,7 @@ int MdsTableImpl<MdsTableType>::forcely_reset_mds_table(const char *reason)
|
|||||||
MDS_TG(100_ms);
|
MDS_TG(100_ms);
|
||||||
MdsWLockGuard lg(lock_);
|
MdsWLockGuard lg(lock_);
|
||||||
ForcelyReleaseAllNodeOp op(reason);
|
ForcelyReleaseAllNodeOp op(reason);
|
||||||
if (OB_FAIL(for_each_scan_row(op))) {
|
if (OB_FAIL(for_each_scan_row(FowEachRowAction::RESET, op))) {
|
||||||
MDS_LOG_GC(ERROR, "fail to do reset");
|
MDS_LOG_GC(ERROR, "fail to do reset");
|
||||||
} else {
|
} else {
|
||||||
debug_info_.last_reset_ts_ = ObClockGenerator::getCurrentTime();
|
debug_info_.last_reset_ts_ = ObClockGenerator::getCurrentTime();
|
||||||
|
@ -112,7 +112,7 @@ public:
|
|||||||
template <typename OP>
|
template <typename OP>
|
||||||
int for_each_node_on_row(OP &&op) const;
|
int for_each_node_on_row(OP &&op) const;
|
||||||
template <typename OP>
|
template <typename OP>
|
||||||
int for_each_row(OP &&op);
|
int for_each_row(FowEachRowAction action_type, OP &&op);
|
||||||
void lock() const { lock_.wrlock(); }
|
void lock() const { lock_.wrlock(); }
|
||||||
void unlock() const { lock_.unlock(); }
|
void unlock() const { lock_.unlock(); }
|
||||||
int fill_virtual_info(ObIArray<MdsNodeInfoForVirtualTable> &mds_node_info_array, const int64_t unit_id) const;
|
int fill_virtual_info(ObIArray<MdsNodeInfoForVirtualTable> &mds_node_info_array, const int64_t unit_id) const;
|
||||||
@ -184,7 +184,7 @@ public:
|
|||||||
template <typename OP>
|
template <typename OP>
|
||||||
int for_each_node_on_row(OP &&op) const;
|
int for_each_node_on_row(OP &&op) const;
|
||||||
template <typename OP>
|
template <typename OP>
|
||||||
int for_each_row(OP &&op) const;
|
int for_each_row(FowEachRowAction action_type, OP &&op) const;
|
||||||
void lock() const { lock_.wrlock(); }
|
void lock() const { lock_.wrlock(); }
|
||||||
void unlock() const { lock_.unlock(); }
|
void unlock() const { lock_.unlock(); }
|
||||||
int fill_virtual_info(ObIArray<MdsNodeInfoForVirtualTable> &mds_node_info_array, const int64_t unit_id) const;
|
int fill_virtual_info(ObIArray<MdsNodeInfoForVirtualTable> &mds_node_info_array, const int64_t unit_id) const;
|
||||||
|
@ -176,7 +176,7 @@ int MdsUnit<K, V>::for_each_node_on_row(OP &&op) const
|
|||||||
|
|
||||||
template <typename K, typename V>
|
template <typename K, typename V>
|
||||||
template <typename OP>
|
template <typename OP>
|
||||||
int MdsUnit<K, V>::for_each_row(OP &&op)// node maybe recycled in this function
|
int MdsUnit<K, V>::for_each_row(FowEachRowAction action_type, OP &&op)// node maybe recycled in this function
|
||||||
{
|
{
|
||||||
#define PRINT_WRAPPER KR(ret)
|
#define PRINT_WRAPPER KR(ret)
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -184,18 +184,25 @@ int MdsUnit<K, V>::for_each_row(OP &&op)// node maybe recycled in this function
|
|||||||
MdsWLockGuard lg(lock_);
|
MdsWLockGuard lg(lock_);
|
||||||
CLICK();
|
CLICK();
|
||||||
multi_row_list_.for_each_node_from_head_to_tail_until_true(
|
multi_row_list_.for_each_node_from_head_to_tail_until_true(
|
||||||
[&op, &ret, this](const KvPair<K, Row<K, V>> &kv_row) mutable {
|
[action_type, &op, &ret, this](const KvPair<K, Row<K, V>> &kv_row) mutable {
|
||||||
MDS_TG(1_ms);
|
MDS_TG(1_ms);
|
||||||
const K *p_k = &kv_row.k_;
|
const K *p_k = &kv_row.k_;
|
||||||
const Row<K, V> &row = kv_row.v_;
|
const Row<K, V> &row = kv_row.v_;
|
||||||
if (MDS_FAIL(op(row))) {
|
if (MDS_FAIL(op(row))) {
|
||||||
MDS_LOG_SCAN(WARN, "fail to scan row", KPC(p_k));
|
MDS_LOG_SCAN(WARN, "fail to scan row", KPC(p_k));
|
||||||
}
|
}
|
||||||
|
// CAUTION: not every scan path needs to recycle empty rows; doing so indiscriminately may cause unexpected problems, for example:
|
||||||
|
// the CALCULATE_REC_SCN operation locks rows inside the op, but does not release those locks after the op has finished executing.
|
||||||
|
// (this resolves the out-of-order replay problem: if replay happens concurrently with calculating rec_scn, then without the lock's protection a wrong rec_scn will finally be obtained)
|
||||||
|
// but destroying an mds_row acquires the row's lock during destruction, which will result in a deadlock in the same thread.
|
||||||
|
// so only operations whose logic behaves like GC should recycle empty rows.
|
||||||
|
if (FowEachRowAction::RECYCLE == action_type || FowEachRowAction::RESET == action_type) {
|
||||||
if (row.sorted_list_.empty()) {// if this row is recycled, just delete it
|
if (row.sorted_list_.empty()) {// if this row is recycled, just delete it
|
||||||
KvPair<K, Row<K, V>> *p_kv = &const_cast<KvPair<K, Row<K, V>> &>(kv_row);
|
KvPair<K, Row<K, V>> *p_kv = &const_cast<KvPair<K, Row<K, V>> &>(kv_row);
|
||||||
multi_row_list_.del(p_kv);
|
multi_row_list_.del(p_kv);
|
||||||
MdsFactory::destroy(p_kv);
|
MdsFactory::destroy(p_kv);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return OB_SUCCESS != ret;// keep scanning until meet failure
|
return OB_SUCCESS != ret;// keep scanning until meet failure
|
||||||
});
|
});
|
||||||
return ret;
|
return ret;
|
||||||
@ -760,7 +767,8 @@ int MdsUnit<DummyKey, V>::for_each_node_on_row(OP &&op) const {
|
|||||||
|
|
||||||
template <typename V>
|
template <typename V>
|
||||||
template <typename OP>
|
template <typename OP>
|
||||||
int MdsUnit<DummyKey, V>::for_each_row(OP &&op) const {
|
int MdsUnit<DummyKey, V>::for_each_row(FowEachRowAction action_type, OP &&op) const {
|
||||||
|
UNUSED(action_type);
|
||||||
#define PRINT_WRAPPER KR(ret)
|
#define PRINT_WRAPPER KR(ret)
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
MDS_TG(10_ms);
|
MDS_TG(10_ms);
|
||||||
|
@ -38,6 +38,14 @@ enum class NodePosition {
|
|||||||
POSITION_END,
|
POSITION_END,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum class FowEachRowAction {
|
||||||
|
CALCUALTE_FLUSH_SCN,
|
||||||
|
COUNT_NODES_BEFLOW_FLUSH_SCN,
|
||||||
|
CALCULATE_REC_SCN,
|
||||||
|
RECYCLE,
|
||||||
|
RESET,
|
||||||
|
};
|
||||||
|
|
||||||
inline const char *obj_to_string(NodePosition pos) {
|
inline const char *obj_to_string(NodePosition pos) {
|
||||||
const char *ret = "UNKNOWN";
|
const char *ret = "UNKNOWN";
|
||||||
switch (pos) {
|
switch (pos) {
|
||||||
|
Reference in New Issue
Block a user