serialize all change status in DupTabletLogBody
Co-authored-by: Minionyh <minions.yh@gmail.com>
This commit is contained in:
parent
eb5068370a
commit
a4bf57407c
@ -368,6 +368,7 @@ TEST_F(GET_ZONE_TEST_CLASS_NAME(2), dup_table_trx_read_committed)
|
||||
std::string tmp_event_val;
|
||||
ASSERT_EQ(OB_SUCCESS, wait_event_finish("INSERT_TRX_COMMIT", tmp_event_val, 30 * 1000 * 1000));
|
||||
check_dup_table_insert_readable(this, 1, true /*expected_follower_read*/);
|
||||
ASSERT_EQ(OB_SUCCESS, finish_event("ZONE2_READ_INSERT_COMMITTED", ""));
|
||||
}
|
||||
|
||||
TEST_F(GET_ZONE_TEST_CLASS_NAME(3), dup_table_trx_read_committed)
|
||||
@ -377,11 +378,19 @@ TEST_F(GET_ZONE_TEST_CLASS_NAME(3), dup_table_trx_read_committed)
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
wait_event_finish("INSERT_TRX_COMMIT", tmp_event_val, 30 * 1000 * 1000, 100));
|
||||
check_dup_table_insert_readable(this, 1, true /*expected_follower_read*/);
|
||||
ASSERT_EQ(OB_SUCCESS, finish_event("ZONE3_READ_INSERT_COMMITTED", ""));
|
||||
}
|
||||
|
||||
TEST_F(GET_ZONE_TEST_CLASS_NAME(1), remove_dup_table)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
std::string tmp_event_val;
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
wait_event_finish("ZONE2_READ_INSERT_COMMITTED", tmp_event_val, 30 * 1000 * 1000));
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
wait_event_finish("ZONE3_READ_INSERT_COMMITTED", tmp_event_val, 30 * 1000 * 1000));
|
||||
|
||||
WRITE_SQL_BY_CONN(static_test_conn_, "drop table Persons");
|
||||
GET_LS(static_basic_arg_.tenant_id_, static_basic_arg_.ls_id_num_, ls_handle);
|
||||
|
||||
|
@ -378,10 +378,10 @@ bool ObRedoLogGenerator::check_dup_tablet_(const ObITransCallback *callback_ptr)
|
||||
if (MutatorType::MUTATOR_ROW == callback_ptr->get_mutator_type()) {
|
||||
const ObMvccRowCallback *row_iter = static_cast<const ObMvccRowCallback *>(callback_ptr);
|
||||
const ObTabletID &target_tablet = row_iter->get_tablet_id();
|
||||
if (OB_TMP_FAIL(mem_ctx_->get_trans_ctx()->merge_tablet_modify_record_(target_tablet))) {
|
||||
TRANS_LOG_RET(WARN, tmp_ret, "merge tablet modify record failed", K(tmp_ret),
|
||||
K(target_tablet), KPC(row_iter));
|
||||
}
|
||||
// if (OB_TMP_FAIL(mem_ctx_->get_trans_ctx()->merge_tablet_modify_record_(target_tablet))) {
|
||||
// TRANS_LOG_RET(WARN, tmp_ret, "merge tablet modify record failed", K(tmp_ret),
|
||||
// K(target_tablet), KPC(row_iter));
|
||||
// }
|
||||
// check dup table
|
||||
}
|
||||
|
||||
|
@ -563,7 +563,7 @@ private:
|
||||
* Dup_Table Tablets
|
||||
*******************************************************/
|
||||
|
||||
class DupTabletCommonHeader
|
||||
class DupTabletSetCommonHeader
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
|
||||
@ -600,14 +600,14 @@ public:
|
||||
K(tablet_set_type_),
|
||||
K(sp_op_type_));
|
||||
|
||||
DupTabletCommonHeader(const uint64_t id) : unique_id_(id)
|
||||
DupTabletSetCommonHeader(const uint64_t id) : unique_id_(id)
|
||||
{
|
||||
// set_free();
|
||||
// set_invalid_sp_op_type();
|
||||
reuse();
|
||||
}
|
||||
DupTabletCommonHeader() { reset(); }
|
||||
~DupTabletCommonHeader() { reset(); }
|
||||
DupTabletSetCommonHeader() { reset(); }
|
||||
~DupTabletSetCommonHeader() { reset(); }
|
||||
|
||||
bool is_valid() const {
|
||||
return unique_id_is_valid() && tablet_set_type_is_valid();
|
||||
@ -669,7 +669,7 @@ public:
|
||||
}
|
||||
// bool contain_special_op(uint64_t special_op) const { return get_special_op_() == special_op; }
|
||||
bool no_specail_op() const { return INVALID_SPECIAL_OP == sp_op_type_; }
|
||||
void copy_tablet_set_type(const DupTabletCommonHeader &src_common_header)
|
||||
void copy_tablet_set_type(const DupTabletSetCommonHeader &src_common_header)
|
||||
{
|
||||
set_unique_id_(src_common_header.get_unique_id());
|
||||
set_special_op_(src_common_header.get_special_op());
|
||||
@ -712,7 +712,7 @@ private:
|
||||
int64_t sp_op_type_;
|
||||
};
|
||||
|
||||
typedef common::ObSEArray<DupTabletCommonHeader, 3> DupTabletSetIDArray;
|
||||
typedef common::ObSEArray<DupTabletSetCommonHeader, 3> DupTabletSetIDArray;
|
||||
|
||||
/*******************************************************
|
||||
* Dup_Table Checkpoint
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,4 +1,4 @@
|
||||
//Copyrigh (c) 2021 OceanBase
|
||||
// Copyrigh (c) 2021 OceanBase
|
||||
// OceanBase is licensed under Mulan PubL v2.
|
||||
// You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
// You may obtain a copy of Mulan PubL v2 at:
|
||||
@ -11,14 +11,14 @@
|
||||
#ifndef OCEANBASE_DUP_TABLE_TABLETS_H
|
||||
#define OCEANBASE_DUP_TABLE_TABLETS_H
|
||||
|
||||
#include "lib/list/ob_dlist.h"
|
||||
#include "lib/queue/ob_fixed_queue.h"
|
||||
#include "common/ob_tablet_id.h"
|
||||
#include "lib/lock/ob_spin_rwlock.h"
|
||||
#include "lib/hash/ob_hashmap.h"
|
||||
#include "lib/hash/ob_hashset.h"
|
||||
#include "ob_dup_table_ts_sync.h"
|
||||
#include "lib/list/ob_dlist.h"
|
||||
#include "lib/lock/ob_spin_rwlock.h"
|
||||
#include "lib/queue/ob_fixed_queue.h"
|
||||
#include "ob_dup_table_base.h"
|
||||
#include "ob_dup_table_ts_sync.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -35,6 +35,21 @@ class ObDupTableLSHandler;
|
||||
//****** ObLSDupTabletsMgr
|
||||
//**********************************************************************
|
||||
|
||||
struct DupTabletInfo
|
||||
{
|
||||
int64_t update_dup_schema_ts_;
|
||||
|
||||
void reset() { update_dup_schema_ts_ = 0; }
|
||||
|
||||
DupTabletInfo() { reset(); }
|
||||
|
||||
TO_STRING_KV(K(update_dup_schema_ts_));
|
||||
};
|
||||
|
||||
typedef common::hash::
|
||||
ObHashMap<common::ObTabletID, DupTabletInfo, common::hash::NoPthreadDefendMode>
|
||||
DupTabletIdMap;
|
||||
|
||||
enum class DupTabletSetChangeFlag
|
||||
{
|
||||
UNKNOWN = -1,
|
||||
@ -81,14 +96,11 @@ static const char *get_dup_tablet_flag_str(const DupTabletSetChangeFlag &flag)
|
||||
return flag_str;
|
||||
}
|
||||
|
||||
struct DupTabletSetChangeStatus
|
||||
class DupTabletSetChangeStatus
|
||||
{
|
||||
DupTabletSetChangeFlag flag_;
|
||||
share::SCN tablet_change_scn_;
|
||||
share::SCN need_confirm_scn_;
|
||||
share::SCN readable_version_;
|
||||
int64_t trx_ref_;
|
||||
OB_UNIS_VERSION(1);
|
||||
|
||||
public:
|
||||
void init()
|
||||
{
|
||||
reset();
|
||||
@ -105,6 +117,12 @@ struct DupTabletSetChangeStatus
|
||||
}
|
||||
|
||||
DupTabletSetChangeStatus() { reset(); }
|
||||
~DupTabletSetChangeStatus() { reset(); }
|
||||
|
||||
const share::SCN &get_readable_version() { return readable_version_; }
|
||||
const share::SCN &get_tablet_change_scn() { return tablet_change_scn_; }
|
||||
const share::SCN &get_need_confirm_scn() { return need_confirm_scn_; }
|
||||
const DupTabletSetChangeFlag &get_tablet_set_change_falg() { return flag_; }
|
||||
|
||||
bool is_valid() const { return flag_ != DupTabletSetChangeFlag::UNKNOWN; }
|
||||
bool need_log() const
|
||||
@ -131,7 +149,7 @@ struct DupTabletSetChangeStatus
|
||||
bool can_be_confirmed_anytime() const
|
||||
{
|
||||
return (trx_ref_ == 0 && readable_version_ >= need_confirm_scn_
|
||||
&& flag_ == DupTabletSetChangeFlag::CONFIRMING)
|
||||
&& flag_ == DupTabletSetChangeFlag::CONFIRMING)
|
||||
|| flag_ == DupTabletSetChangeFlag::CONFIRMED;
|
||||
}
|
||||
bool has_confirmed() const { return DupTabletSetChangeFlag::CONFIRMED == flag_; }
|
||||
@ -236,60 +254,64 @@ struct DupTabletSetChangeStatus
|
||||
K(need_confirm_scn_),
|
||||
K(readable_version_),
|
||||
K(trx_ref_));
|
||||
};
|
||||
|
||||
struct DupTabletInfo
|
||||
{
|
||||
int64_t update_dup_schema_ts_;
|
||||
|
||||
void reset() { update_dup_schema_ts_ = 0; }
|
||||
|
||||
DupTabletInfo() { reset(); }
|
||||
|
||||
TO_STRING_KV(K(update_dup_schema_ts_));
|
||||
};
|
||||
|
||||
typedef common::hash::
|
||||
ObHashMap<common::ObTabletID, DupTabletInfo, common::hash::NoPthreadDefendMode>
|
||||
DupTabletIdMap;
|
||||
|
||||
// class DupTabletHashMap : public DupTabletIdMap
|
||||
// {
|
||||
// public:
|
||||
// NEED_SERIALIZE_AND_DESERIALIZE;
|
||||
// TO_STRING_KV(K(common_header_), K(size()));
|
||||
// const DupTabletCommonHeader &get_common_header() { return common_header_; }
|
||||
//
|
||||
// int create(int64_t unique_id, int64_t bucket_num, const lib::ObLabel &bucket_label);
|
||||
// void destroy()
|
||||
// {
|
||||
// DupTabletIdMap::destroy();
|
||||
// common_header_.reset();
|
||||
// }
|
||||
//
|
||||
// private:
|
||||
// DupTabletCommonHeader common_header_;
|
||||
// };
|
||||
|
||||
class DupTabletChangeMap : public common::ObDLinkBase<DupTabletChangeMap>, public DupTabletIdMap
|
||||
{
|
||||
public:
|
||||
NEED_SERIALIZE_AND_DESERIALIZE;
|
||||
DupTabletChangeMap(const uint64_t set_id) : change_status_(), common_header_(set_id) { reuse(); }
|
||||
DupTabletSetChangeFlag flag_;
|
||||
share::SCN tablet_change_scn_;
|
||||
share::SCN need_confirm_scn_;
|
||||
share::SCN readable_version_;
|
||||
int64_t trx_ref_;
|
||||
};
|
||||
|
||||
struct DupTabletSetAttribute
|
||||
{
|
||||
DupTabletSetCommonHeader common_header_;
|
||||
DupTabletSetChangeStatus change_status_;
|
||||
|
||||
void reuse()
|
||||
{
|
||||
|
||||
change_status_.init();
|
||||
change_status_.set_temporary();
|
||||
// common_header_.clean_sp_op();
|
||||
// common_header_.set_free();
|
||||
common_header_.reuse();
|
||||
}
|
||||
|
||||
void reset()
|
||||
{
|
||||
common_header_.reset();
|
||||
change_status_.reset();
|
||||
}
|
||||
DupTabletSetAttribute(const uint64_t uid) : common_header_(uid), change_status_() { reuse(); }
|
||||
DupTabletSetAttribute() { reset(); }
|
||||
|
||||
TO_STRING_KV(K(common_header_), K(change_status_));
|
||||
|
||||
OB_UNIS_VERSION(1);
|
||||
};
|
||||
|
||||
class DupTabletChangeMap : public common::ObDLinkBase<DupTabletChangeMap>, public DupTabletIdMap
|
||||
{
|
||||
public:
|
||||
NEED_SERIALIZE_AND_DESERIALIZE;
|
||||
static int deserialize_set_attribute(const char *buf,
|
||||
int64_t data_len,
|
||||
int64_t pos,
|
||||
DupTabletSetAttribute &deser_attr);
|
||||
|
||||
DupTabletChangeMap(const uint64_t set_id) : dup_set_attr_(set_id) { reuse(); }
|
||||
|
||||
void reuse()
|
||||
{
|
||||
dup_set_attr_.reuse();
|
||||
DupTabletIdMap::clear();
|
||||
}
|
||||
|
||||
void destroy()
|
||||
{
|
||||
reset();
|
||||
dup_set_attr_.reset();
|
||||
common::ObDLinkBase<DupTabletChangeMap>::reset();
|
||||
DupTabletIdMap::destroy();
|
||||
}
|
||||
|
||||
@ -298,28 +320,23 @@ public:
|
||||
DupTabletSetChangeStatus *get_change_status()
|
||||
{
|
||||
DupTabletSetChangeStatus *change_status_ptr = nullptr;
|
||||
if (common_header_.is_readable_set()) {
|
||||
if (dup_set_attr_.common_header_.is_readable_set()) {
|
||||
change_status_ptr = nullptr;
|
||||
} else {
|
||||
change_status_ptr = &change_status_;
|
||||
change_status_ptr = &dup_set_attr_.change_status_;
|
||||
}
|
||||
return change_status_ptr;
|
||||
}
|
||||
DupTabletCommonHeader &get_common_header() { return common_header_; }
|
||||
DupTabletSetCommonHeader &get_common_header() { return dup_set_attr_.common_header_; }
|
||||
bool need_reserve(const share::SCN &scn) const
|
||||
{
|
||||
return change_status_.need_reserve(scn);
|
||||
return dup_set_attr_.change_status_.need_reserve(scn);
|
||||
}
|
||||
|
||||
TO_STRING_KV(K(change_status_),
|
||||
K(common_header_),
|
||||
K(DupTabletIdMap::size()),
|
||||
K(DupTabletIdMap::created()));
|
||||
TO_STRING_KV(K(dup_set_attr_), K(DupTabletIdMap::size()), K(DupTabletIdMap::created()));
|
||||
|
||||
private:
|
||||
DupTabletSetChangeStatus change_status_;
|
||||
DupTabletCommonHeader common_header_;
|
||||
// DupTabletIdMap tablet_id_map_;
|
||||
DupTabletSetAttribute dup_set_attr_;
|
||||
};
|
||||
|
||||
class TabletsSerCallBack : public IHashSerCallBack
|
||||
@ -333,10 +350,7 @@ public:
|
||||
class TabletsDeSerCallBack : public IHashDeSerCallBack
|
||||
{
|
||||
public:
|
||||
TabletsDeSerCallBack(const char *buf,
|
||||
int64_t buf_len,
|
||||
int64_t pos,
|
||||
int64_t deser_time)
|
||||
TabletsDeSerCallBack(const char *buf, int64_t buf_len, int64_t pos, int64_t deser_time)
|
||||
: IHashDeSerCallBack(buf, buf_len, pos), deser_time_(deser_time)
|
||||
{}
|
||||
int operator()(DupTabletChangeMap &dup_tablet_map);
|
||||
@ -351,141 +365,27 @@ public:
|
||||
int64_t operator()(const common::hash::HashMapPair<common::ObTabletID, DupTabletInfo> &hash_pair);
|
||||
};
|
||||
|
||||
|
||||
class DupTabletChangeLogTail
|
||||
class DupTabletLogBody
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
|
||||
public:
|
||||
DupTabletChangeLogTail() {}
|
||||
DupTabletChangeLogTail(const share::SCN &readable_version, bool confirm_all)
|
||||
: readable_version_(readable_version), has_confirmed_(confirm_all)
|
||||
{}
|
||||
bool is_valid() const;
|
||||
void reset()
|
||||
{
|
||||
readable_version_.set_min();
|
||||
has_confirmed_ = false;
|
||||
}
|
||||
|
||||
share::SCN readable_version_;
|
||||
bool has_confirmed_;
|
||||
|
||||
TO_STRING_KV(K(readable_version_), K(has_confirmed_));
|
||||
};
|
||||
|
||||
typedef ObSEArray<DupTabletCommonHeader, 10> OpObjectArr;
|
||||
class DupTabletSpecialOpArg
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
static int deserialize_dup_attribute(const char *buf,
|
||||
const int64_t data_len,
|
||||
int64_t pos,
|
||||
DupTabletSetAttribute &deser_attr);
|
||||
|
||||
public:
|
||||
DupTabletSpecialOpArg() {}
|
||||
DupTabletLogBody(DupTabletChangeMap &hash_map) : hash_map_(hash_map) {}
|
||||
|
||||
void reset() { op_objects_.reset(); }
|
||||
bool is_valid() { return !op_objects_.empty(); }
|
||||
void reset() {}
|
||||
|
||||
TO_STRING_KV(K(op_objects_));
|
||||
public:
|
||||
OpObjectArr op_objects_;
|
||||
};
|
||||
const DupTabletChangeMap &get_cur_map_ref() { return hash_map_; }
|
||||
|
||||
typedef common::hash::ObHashMap<uint64_t, DupTabletSpecialOpArg, common::hash::NoPthreadDefendMode>
|
||||
SpecialOpArgMap;
|
||||
|
||||
class DupTabletCommonLogBody
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
|
||||
public:
|
||||
DupTabletCommonLogBody(DupTabletChangeMap &hash_map) : tablet_id_map_(hash_map) {}
|
||||
|
||||
TO_STRING_KV(K(tablet_id_map_));
|
||||
|
||||
protected:
|
||||
DupTabletChangeMap &tablet_id_map_;
|
||||
};
|
||||
|
||||
class DupTabletChangeLogBody : public DupTabletCommonLogBody
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
|
||||
public:
|
||||
DupTabletChangeLogBody(DupTabletChangeMap &hash_map) : DupTabletCommonLogBody(hash_map)
|
||||
{
|
||||
change_tail_.readable_version_ =
|
||||
DupTabletCommonLogBody::tablet_id_map_.get_change_status()->readable_version_;
|
||||
change_tail_.has_confirmed_ = tablet_id_map_.get_change_status()->has_confirmed();
|
||||
}
|
||||
|
||||
const DupTabletChangeLogTail &get_change_tail() { return change_tail_; }
|
||||
|
||||
INHERIT_TO_STRING_KV("common_log_body", DupTabletCommonLogBody, K(change_tail_));
|
||||
TO_STRING_KV(K(hash_map_));
|
||||
|
||||
private:
|
||||
DupTabletChangeLogTail change_tail_;
|
||||
};
|
||||
|
||||
class DupTabletSpecialOpLogBody : public DupTabletChangeLogBody
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
|
||||
public:
|
||||
DupTabletSpecialOpLogBody(DupTabletChangeMap &hash_map, DupTabletSpecialOpArg &op_arg)
|
||||
: DupTabletChangeLogBody(hash_map), sp_op_arg_(op_arg)
|
||||
{}
|
||||
|
||||
INHERIT_TO_STRING_KV("change_log_body", DupTabletChangeLogBody, K(sp_op_arg_));
|
||||
|
||||
private:
|
||||
DupTabletSpecialOpArg &sp_op_arg_;
|
||||
};
|
||||
|
||||
class DupTabletLog
|
||||
{
|
||||
public:
|
||||
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const;
|
||||
int deserialize_common_header(const char *buf, const int64_t data_len, int64_t &pos);
|
||||
int deserialize_content(const char *buf, const int64_t data_len, int64_t &pos);
|
||||
|
||||
int64_t get_serialize_size();
|
||||
|
||||
public:
|
||||
DupTabletLog(DupTabletChangeMap *hash_map) : hash_map_(hash_map)
|
||||
{
|
||||
common_header_ = hash_map_->get_common_header();
|
||||
}
|
||||
|
||||
DupTabletLog(DupTabletChangeLogTail change_tail,
|
||||
DupTabletChangeMap *hash_map,
|
||||
DupTabletSpecialOpArg *sp_op_arg = nullptr)
|
||||
: hash_map_(hash_map), change_tail_(change_tail), special_op_arg_(sp_op_arg)
|
||||
{
|
||||
common_header_ = hash_map_->get_common_header();
|
||||
};
|
||||
|
||||
DupTabletLog() { reset(); };
|
||||
|
||||
void reset()
|
||||
{
|
||||
common_header_.reset();
|
||||
hash_map_ = nullptr;
|
||||
change_tail_.reset();
|
||||
special_op_arg_ = nullptr;
|
||||
}
|
||||
|
||||
int set_hash_map_ptr(DupTabletChangeMap *hash_map_ptr,
|
||||
DupTabletSpecialOpArg *special_op_arg_ = nullptr);
|
||||
const DupTabletCommonHeader &get_common_header();
|
||||
const DupTabletChangeLogTail &get_change_tail();
|
||||
|
||||
TO_STRING_KV(K(common_header_), K(change_tail_), K(hash_map_));
|
||||
|
||||
private:
|
||||
DupTabletCommonHeader common_header_;
|
||||
DupTabletChangeMap *hash_map_;
|
||||
DupTabletChangeLogTail change_tail_;
|
||||
DupTabletSpecialOpArg *special_op_arg_;
|
||||
DupTabletChangeMap &hash_map_;
|
||||
};
|
||||
|
||||
// ***********************************************************************************************
|
||||
@ -570,7 +470,8 @@ private:
|
||||
// ***********************************************************************************************
|
||||
// * If move a tablet from old to readable without confirm
|
||||
// * Problem:
|
||||
// * 1. Leader (A) tablet1(readable->old); submit lease log(log_ts = n); tablet1(old->new->readable); switch_to_follower
|
||||
// * 1. Leader (A) tablet1(readable->old); submit lease log(log_ts = n);
|
||||
// tablet1(old->new->readable); switch_to_follower
|
||||
// * Follower (B) replay log n=>tablet1(readable->old); switch_to_leader
|
||||
// * 2. Follower(A) tablet1(readable),replay_ts = n
|
||||
// * Leaser(B) tablet1(old); confirm A replay_ts > n ; tablet1(old->delete)
|
||||
@ -591,6 +492,7 @@ public:
|
||||
bool is_master() { return ATOMIC_LOAD(&is_master_); }
|
||||
|
||||
const static int64_t MAX_CONFIRMING_TABLET_COUNT;
|
||||
|
||||
public:
|
||||
int check_readable(const common::ObTabletID &tablet_id,
|
||||
bool &readable,
|
||||
@ -646,10 +548,8 @@ public:
|
||||
// K(need_confirm_new_queue_.get_size()),
|
||||
// K(old_tablets_),
|
||||
// K(readable_tablets_));
|
||||
int get_tablets_stat(ObDupLSTabletsStatIterator &collect_iter,
|
||||
const share::ObLSID &ls_id);
|
||||
int get_tablet_set_stat(ObDupLSTabletSetStatIterator &collect_iter,
|
||||
const share::ObLSID &ls_id);
|
||||
int get_tablets_stat(ObDupLSTabletsStatIterator &collect_iter, const share::ObLSID &ls_id);
|
||||
int get_tablet_set_stat(ObDupLSTabletSetStatIterator &collect_iter, const share::ObLSID &ls_id);
|
||||
|
||||
private:
|
||||
class GcDiscardedDupTabletHandler
|
||||
@ -657,7 +557,7 @@ private:
|
||||
public:
|
||||
GcDiscardedDupTabletHandler(int64_t update_ts,
|
||||
int64_t gc_time_interval,
|
||||
const DupTabletCommonHeader &common_header,
|
||||
const DupTabletSetCommonHeader &common_header,
|
||||
DupTabletChangeMap &old_tablets)
|
||||
: gc_ts_(update_ts), gc_time_interval_(gc_time_interval), gc_tablet_cnt_(0),
|
||||
ret_(OB_SUCCESS), src_common_header_(common_header), old_tablets_(old_tablets)
|
||||
@ -671,7 +571,7 @@ private:
|
||||
int64_t gc_time_interval_;
|
||||
int64_t gc_tablet_cnt_;
|
||||
int ret_;
|
||||
DupTabletCommonHeader src_common_header_;
|
||||
DupTabletSetCommonHeader src_common_header_;
|
||||
DupTabletChangeMap &old_tablets_;
|
||||
};
|
||||
|
||||
@ -726,9 +626,9 @@ private:
|
||||
const TabletSetAttr attr,
|
||||
// const int64_t tablet_gc_window,
|
||||
ObDupLSTabletsStatIterator &collect_iter)
|
||||
: collect_ts_(collect_ts),ls_id_(ls_id), tenant_id_(tenant_id), addr_(addr),
|
||||
: collect_ts_(collect_ts), ls_id_(ls_id), tenant_id_(tenant_id), addr_(addr),
|
||||
is_master_(is_master), tablet_set_id_(tablet_set_id), attr_(attr),
|
||||
//tablet_gc_window_(tablet_gc_window)
|
||||
// tablet_gc_window_(tablet_gc_window)
|
||||
collect_iter_(collect_iter)
|
||||
{}
|
||||
int operator()(const common::hash::HashMapPair<common::ObTabletID, DupTabletInfo> &hash_pair);
|
||||
@ -759,7 +659,7 @@ private:
|
||||
int get_free_tablet_set(DupTabletChangeMap *&free_set, const uint64_t target_id = 0);
|
||||
|
||||
// If get a free tablet set, need set tablet set type and push into queue
|
||||
int get_target_tablet_set_(const DupTabletCommonHeader &target_common_header,
|
||||
int get_target_tablet_set_(const DupTabletSetCommonHeader &target_common_header,
|
||||
DupTabletChangeMap *&target_set,
|
||||
const bool construct_target_set = false,
|
||||
const bool need_changing_new_set = false);
|
||||
@ -767,8 +667,8 @@ private:
|
||||
int check_and_recycle_empty_readable_set(DupTabletChangeMap *need_free_set, bool &need_remove);
|
||||
int return_tablet_set(DupTabletChangeMap *need_free_set);
|
||||
|
||||
int clean_readable_tablets_(const share::SCN & min_reserve_tablet_scn);
|
||||
int clean_durable_confirming_tablets_(const share::SCN & min_reserve_tablet_scn);
|
||||
int clean_readable_tablets_(const share::SCN &min_reserve_tablet_scn);
|
||||
int clean_durable_confirming_tablets_(const share::SCN &min_reserve_tablet_scn);
|
||||
int clean_unlog_tablets_();
|
||||
int construct_empty_block_confirm_task_(const int64_t trx_ref);
|
||||
int search_special_op_(uint64_t special_op_type);
|
||||
@ -790,7 +690,7 @@ private:
|
||||
//
|
||||
static int64_t GC_DUP_TABLETS_TIME_INTERVAL; // 5 min
|
||||
static int64_t GC_DUP_TABLETS_FAILED_TIMEOUT; // 25 min
|
||||
const static int64_t GC_TIMEOUT; // 1s
|
||||
const static int64_t GC_TIMEOUT; // 1s
|
||||
|
||||
const static int64_t RESERVED_FREE_SET_COUNT;
|
||||
const static int64_t MAX_FREE_SET_COUNT;
|
||||
@ -822,8 +722,6 @@ private:
|
||||
common::ObDList<DupTabletChangeMap> readable_tablets_list_;
|
||||
DupTabletChangeMap *removing_old_set_;
|
||||
|
||||
SpecialOpArgMap op_arg_map_;
|
||||
|
||||
// gc_dup_table
|
||||
int64_t last_gc_succ_time_;
|
||||
|
||||
@ -845,6 +743,7 @@ public:
|
||||
ObTabletIDArray &get_array() { return array_; }
|
||||
const ObTabletIDArray &get_array() const { return array_; }
|
||||
void set_ls_id(const share::ObLSID &ls_id) { ls_id_ = ls_id; }
|
||||
|
||||
private:
|
||||
share::ObLSID ls_id_;
|
||||
ObTabletIDArray array_;
|
||||
@ -854,16 +753,19 @@ class ObTenantDupTabletSchemaHelper
|
||||
{
|
||||
public:
|
||||
typedef common::hash::ObHashSet<common::ObTabletID, hash::NoPthreadDefendMode> TabletIDSet;
|
||||
|
||||
public:
|
||||
ObTenantDupTabletSchemaHelper() {}
|
||||
ObTenantDupTabletSchemaHelper() {}
|
||||
|
||||
public:
|
||||
int refresh_and_get_tablet_set(TabletIDSet &tenant_dup_tablet_set);
|
||||
|
||||
private:
|
||||
int get_all_dup_tablet_set_(TabletIDSet & tablets_set);
|
||||
int get_all_dup_tablet_set_(TabletIDSet &tablets_set);
|
||||
|
||||
private:
|
||||
};
|
||||
|
||||
|
||||
} // namespace transaction
|
||||
|
||||
} // namespace oceanbase
|
||||
|
@ -91,7 +91,7 @@ int ObDupTabletScanTask::refresh_dup_tablet_schema_(
|
||||
if (OB_FAIL(ls_status_op.get_duplicate_ls_status_info(MTL_ID(), *GCTX.sql_proxy_,
|
||||
dup_ls_status_info))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
DUP_TABLE_LOG(WARN, "no duplicate ls", K(dup_ls_status_info));
|
||||
DUP_TABLE_LOG(DEBUG, "no duplicate ls", K(dup_ls_status_info));
|
||||
} else {
|
||||
DUP_TABLE_LOG(WARN, "get duplicate ls status info failed", K(ret), K(dup_ls_status_info));
|
||||
}
|
||||
|
@ -1018,7 +1018,6 @@ void ObTxExecInfo::reset()
|
||||
prepare_log_info_arr_.reset();
|
||||
xid_.reset();
|
||||
need_checksum_ = true;
|
||||
tablet_modify_record_.reset();
|
||||
is_sub2pc_ = false;
|
||||
}
|
||||
|
||||
@ -1058,7 +1057,6 @@ OB_SERIALIZE_MEMBER(ObTxExecInfo,
|
||||
prepare_log_info_arr_,
|
||||
xid_,
|
||||
need_checksum_,
|
||||
tablet_modify_record_,
|
||||
is_sub2pc_);
|
||||
|
||||
bool ObMulSourceDataNotifyArg::is_redo_submitted() const { return redo_submitted_; }
|
||||
|
@ -1670,7 +1670,6 @@ public:
|
||||
// for xa
|
||||
ObXATransID xid_;
|
||||
bool need_checksum_;
|
||||
common::ObSEArray<common::ObTabletID, MAX_TABLET_MODIFY_RECORD_COUNT> tablet_modify_record_;
|
||||
bool is_sub2pc_;
|
||||
};
|
||||
|
||||
|
@ -6314,8 +6314,6 @@ int ObPartTransCtx::search_unsubmitted_dup_table_redo_()
|
||||
if (ls_tx_ctx_mgr_->get_ls_log_adapter()->has_dup_tablet()) {
|
||||
if (OB_FAIL(submit_log_impl_(ObTxLogType::TX_COMMIT_INFO_LOG))) {
|
||||
TRANS_LOG(WARN, "submit commit info log failed", K(ret), KPC(this));
|
||||
// } else if (OB_FAIL(check_tablet_modify_record_())) {
|
||||
// TRANS_LOG(WARN, "check the modify tablet failed", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -6438,16 +6436,20 @@ int ObPartTransCtx::check_dup_trx_with_submitting_all_redo(ObTxLogBlock &log_blo
|
||||
// c. return OB_EAGAIN
|
||||
if (ls_tx_ctx_mgr_->get_ls_log_adapter()->has_dup_tablet()) {
|
||||
if (!sub_state_.is_info_log_submitted() && get_downstream_state() < ObTxState::REDO_COMPLETE) {
|
||||
|
||||
ret = submit_pending_log_block_(log_block, helper);
|
||||
|
||||
TRANS_LOG(INFO, "submit all redo log for dup table check", K(ret),
|
||||
K(exec_info_.tablet_modify_record_.count()), KPC(this));
|
||||
if (OB_FAIL(ret)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(check_tablet_modify_record_())) {
|
||||
TRANS_LOG(WARN, "check the modify tablet failed", K(ret));
|
||||
if (exec_info_.redo_lsns_.count() > 0 || exec_info_.prev_record_lsn_.is_valid()
|
||||
|| !helper.callbacks_.is_empty()) {
|
||||
set_dup_table_tx_();
|
||||
}
|
||||
|
||||
// ret = submit_pending_log_block_(log_block, helper);
|
||||
//
|
||||
// TRANS_LOG(INFO, "submit all redo log for dup table check", K(ret),
|
||||
// K(exec_info_.tablet_modify_record_.count()), KPC(this));
|
||||
// if (OB_FAIL(ret)) {
|
||||
// // do nothing
|
||||
// } else if (OB_FAIL(check_tablet_modify_record_())) {
|
||||
// TRANS_LOG(WARN, "check the modify tablet failed", K(ret));
|
||||
// }
|
||||
}
|
||||
|
||||
} else {
|
||||
@ -6485,65 +6487,65 @@ int ObPartTransCtx::dup_table_tx_pre_commit_()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::merge_tablet_modify_record_(const common::ObTabletID &tablet_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// int ObPartTransCtx::merge_tablet_modify_record_(const common::ObTabletID &tablet_id)
|
||||
// {
|
||||
// int ret = OB_SUCCESS;
|
||||
//
|
||||
// if (exec_info_.tablet_modify_record_.count() >= MAX_TABLET_MODIFY_RECORD_COUNT) {
|
||||
// // do nothing
|
||||
// } else {
|
||||
// bool is_contain = false;
|
||||
// for (int i = 0; i < exec_info_.tablet_modify_record_.count(); i++) {
|
||||
// if (exec_info_.tablet_modify_record_[i] == tablet_id) {
|
||||
// is_contain = true;
|
||||
// }
|
||||
// }
|
||||
// if (!is_contain && OB_FAIL(exec_info_.tablet_modify_record_.push_back(tablet_id))) {
|
||||
// TRANS_LOG(WARN, "push back tablet id failed", K(ret), K(tablet_id),
|
||||
// K(exec_info_.tablet_modify_record_));
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return ret;
|
||||
// }
|
||||
|
||||
if (exec_info_.tablet_modify_record_.count() >= MAX_TABLET_MODIFY_RECORD_COUNT) {
|
||||
// do nothing
|
||||
} else {
|
||||
bool is_contain = false;
|
||||
for (int i = 0; i < exec_info_.tablet_modify_record_.count(); i++) {
|
||||
if (exec_info_.tablet_modify_record_[i] == tablet_id) {
|
||||
is_contain = true;
|
||||
}
|
||||
}
|
||||
if (!is_contain && OB_FAIL(exec_info_.tablet_modify_record_.push_back(tablet_id))) {
|
||||
TRANS_LOG(WARN, "push back tablet id failed", K(ret), K(tablet_id),
|
||||
K(exec_info_.tablet_modify_record_));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::check_tablet_modify_record_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (!exec_info_.is_dup_tx_) {
|
||||
bool has_dup_tablet = false;
|
||||
if (!ls_tx_ctx_mgr_->get_ls_log_adapter()->has_dup_tablet()) {
|
||||
has_dup_tablet = false;
|
||||
TRANS_LOG(INFO, "no dup tablet in this ls", K(has_dup_tablet), K(trans_id_), K(ls_id_));
|
||||
} else if (exec_info_.tablet_modify_record_.count() >= MAX_TABLET_MODIFY_RECORD_COUNT) {
|
||||
has_dup_tablet = true;
|
||||
TRANS_LOG(INFO, "too much tablet, consider it as a dup trx", K(ret), K(has_dup_tablet),
|
||||
K(exec_info_.tablet_modify_record_), KPC(this));
|
||||
} else {
|
||||
has_dup_tablet = false;
|
||||
for (int i = 0; i < exec_info_.tablet_modify_record_.count(); i++) {
|
||||
|
||||
if (OB_FAIL(ls_tx_ctx_mgr_->get_ls_log_adapter()->check_dup_tablet_in_redo(
|
||||
exec_info_.tablet_modify_record_[i], has_dup_tablet, share::SCN::min_scn(),
|
||||
share::SCN::max_scn()))) {
|
||||
TRANS_LOG(WARN, "check dup tablet failed", K(ret), K(trans_id_), K(ls_id_),
|
||||
K(exec_info_.tablet_modify_record_[i]));
|
||||
} else if (has_dup_tablet) {
|
||||
TRANS_LOG(INFO, "modify a dup tablet, consider it as a dup trx", K(ret),
|
||||
K(has_dup_tablet), K(exec_info_.tablet_modify_record_[i]),
|
||||
K(exec_info_.tablet_modify_record_.count()), KPC(this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (has_dup_tablet) {
|
||||
set_dup_table_tx_();
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
// int ObPartTransCtx::check_tablet_modify_record_()
|
||||
// {
|
||||
// int ret = OB_SUCCESS;
|
||||
//
|
||||
// if (!exec_info_.is_dup_tx_) {
|
||||
// bool has_dup_tablet = false;
|
||||
// if (!ls_tx_ctx_mgr_->get_ls_log_adapter()->has_dup_tablet()) {
|
||||
// has_dup_tablet = false;
|
||||
// TRANS_LOG(INFO, "no dup tablet in this ls", K(has_dup_tablet), K(trans_id_), K(ls_id_));
|
||||
// } else if (exec_info_.tablet_modify_record_.count() >= MAX_TABLET_MODIFY_RECORD_COUNT) {
|
||||
// has_dup_tablet = true;
|
||||
// TRANS_LOG(INFO, "too much tablet, consider it as a dup trx", K(ret), K(has_dup_tablet),
|
||||
// K(exec_info_.tablet_modify_record_), KPC(this));
|
||||
// } else {
|
||||
// has_dup_tablet = false;
|
||||
// for (int i = 0; i < exec_info_.tablet_modify_record_.count(); i++) {
|
||||
//
|
||||
// if (OB_FAIL(ls_tx_ctx_mgr_->get_ls_log_adapter()->check_dup_tablet_in_redo(
|
||||
// exec_info_.tablet_modify_record_[i], has_dup_tablet, share::SCN::min_scn(),
|
||||
// share::SCN::max_scn()))) {
|
||||
// TRANS_LOG(WARN, "check dup tablet failed", K(ret), K(trans_id_), K(ls_id_),
|
||||
// K(exec_info_.tablet_modify_record_[i]));
|
||||
// } else if (has_dup_tablet) {
|
||||
// TRANS_LOG(INFO, "modify a dup tablet, consider it as a dup trx", K(ret),
|
||||
// K(has_dup_tablet), K(exec_info_.tablet_modify_record_[i]),
|
||||
// K(exec_info_.tablet_modify_record_.count()), KPC(this));
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// if (has_dup_tablet) {
|
||||
// set_dup_table_tx_();
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return ret;
|
||||
// }
|
||||
|
||||
int ObPartTransCtx::clear_dup_table_redo_sync_result_()
|
||||
{
|
||||
@ -6599,24 +6601,24 @@ int ObPartTransCtx::retry_dup_trx_before_prepare(const share::SCN &before_prepar
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::merge_tablet_modify_record(const common::ObTabletID &tablet_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
CtxLockGuard guard(lock_);
|
||||
|
||||
if (is_exiting_) {
|
||||
// ret = OB_TRANS_CTX_NOT_EXIST;
|
||||
TRANS_LOG(WARN, "merge tablet modify record into a exiting part_ctx", K(ret), K(tablet_id),
|
||||
KPC(this));
|
||||
} else if (!is_follower_()) {
|
||||
ret = OB_NOT_FOLLOWER;
|
||||
TRANS_LOG(WARN, "can not invoke on leader", K(ret), K(tablet_id), KPC(this));
|
||||
} else if (OB_FAIL(merge_tablet_modify_record_(tablet_id))) {
|
||||
TRANS_LOG(WARN, "merge tablet modify record failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
// int ObPartTransCtx::merge_tablet_modify_record(const common::ObTabletID &tablet_id)
|
||||
// {
|
||||
// int ret = OB_SUCCESS;
|
||||
//
|
||||
// CtxLockGuard guard(lock_);
|
||||
//
|
||||
// if (is_exiting_) {
|
||||
// // ret = OB_TRANS_CTX_NOT_EXIST;
|
||||
// TRANS_LOG(WARN, "merge tablet modify record into a exiting part_ctx", K(ret), K(tablet_id),
|
||||
// KPC(this));
|
||||
// } else if (!is_follower_()) {
|
||||
// ret = OB_NOT_FOLLOWER;
|
||||
// TRANS_LOG(WARN, "can not invoke on leader", K(ret), K(tablet_id), KPC(this));
|
||||
// } else if (OB_FAIL(merge_tablet_modify_record_(tablet_id))) {
|
||||
// TRANS_LOG(WARN, "merge tablet modify record failed", K(ret));
|
||||
// }
|
||||
// return ret;
|
||||
// }
|
||||
|
||||
int ObPartTransCtx::sub_prepare(const ObLSArray &parts,
|
||||
const MonotonicTs &commit_time,
|
||||
|
@ -294,7 +294,7 @@ public:
|
||||
// newly added for 4.0
|
||||
|
||||
int retry_dup_trx_before_prepare(const share::SCN &before_prepare_version);
|
||||
int merge_tablet_modify_record(const common::ObTabletID &tablet_id);
|
||||
// int merge_tablet_modify_record(const common::ObTabletID &tablet_id);
|
||||
int set_scheduler(const common::ObAddr &scheduler);
|
||||
const common::ObAddr &get_scheduler() const;
|
||||
int on_success(ObTxLogCb *log_cb);
|
||||
@ -475,8 +475,8 @@ private:
|
||||
memtable::ObRedoLogSubmitHelper &helper);
|
||||
bool is_dup_table_redo_sync_completed_();
|
||||
int dup_table_tx_pre_commit_();
|
||||
int merge_tablet_modify_record_(const common::ObTabletID &tablet_id);
|
||||
int check_tablet_modify_record_();
|
||||
// int merge_tablet_modify_record_(const common::ObTabletID &tablet_id);
|
||||
// int check_tablet_modify_record_();
|
||||
void set_dup_table_tx_()
|
||||
{
|
||||
exec_info_.is_dup_tx_ = true;
|
||||
|
@ -1368,7 +1368,7 @@ int ObTxLogBlock::update_next_log_pos_()
|
||||
if (OB_FAIL(serialization::decode(replay_buf_, len_, tmp_pos, version))) {
|
||||
TRANS_LOG(WARN, "deserialize UNIS_VERSION error", K(ret), K(*this), K(tmp_pos), K(version));
|
||||
} else if (OB_FAIL(serialization::decode(replay_buf_, len_, tmp_pos, body_size))) {
|
||||
TRANS_LOG(WARN, "deserialize body_size error", K(ret), K(*this), K(tmp_pos), K(version));
|
||||
TRANS_LOG(WARN, "deserialize body_size error", K(ret), K(*this), K(tmp_pos), K(body_size));
|
||||
} else if (tmp_pos + body_size > len_) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
TRANS_LOG(WARN, "has not enough space for deserializing tx_log_body", K(body_size),
|
||||
|
@ -557,8 +557,6 @@ int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo)
|
||||
}
|
||||
} else if (FALSE_IT(row_head = mmi_ptr_->get_row_head())) {
|
||||
// do nothing
|
||||
} else if (OB_NOT_NULL(ctx_) && OB_FAIL(ctx_->merge_tablet_modify_record(row_head.tablet_id_))) {
|
||||
TRANS_LOG(WARN, "record tablet_id in redo failed", K(ret), K(row_head));
|
||||
} else if (OB_FAIL(replay_one_row_in_memtable_(row_head, mmi_ptr_))) {
|
||||
if (OB_MINOR_FREEZE_NOT_ALLOW == ret) {
|
||||
if (TC_REACH_TIME_INTERVAL(1000 * 1000)) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user