Implement the transfer query test framework

This commit is contained in:
WenJinyu
2023-08-29 06:10:52 +00:00
committed by ob-robot
parent 13326df663
commit 6dae488704
6 changed files with 240 additions and 2 deletions

View File

@ -1570,6 +1570,10 @@ DEF_BOOL(_enable_system_tenant_memory_limit, OB_CLUSTER_PARAMETER, "True",
"specifies whether allowed to limit the memory of tenant 500", "specifies whether allowed to limit the memory of tenant 500",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
#endif #endif
ERRSIM_DEF_TIME(errsim_transfer_backfill_error_time, OB_TENANT_PARAMETER, "0", "[0s,1h]",
"the duration of the error happened to transfer backfill. "
"Range: [0s, 1h] in duration",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(_stall_threshold_for_dynamic_worker, OB_TENANT_PARAMETER, "3ms", "[0ms,)", DEF_TIME(_stall_threshold_for_dynamic_worker, OB_TENANT_PARAMETER, "3ms", "[0ms,)",
"threshold of dynamic worker works", "threshold of dynamic worker works",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -410,6 +410,7 @@ ERRSIM_POINT_DEF(EN_ERRSIM_ALLOW_TRANSFER_BACKFILL_TX);
int ObTransferWorkerMgr::do_transfer_backfill_tx_(const ObTransferBackfillTXParam &param) int ObTransferWorkerMgr::do_transfer_backfill_tx_(const ObTransferBackfillTXParam &param)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
set_errsim_backfill_point_();
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("transfer worker do not init", K(ret)); LOG_WARN("transfer worker do not init", K(ret));
@ -432,6 +433,12 @@ int ObTransferWorkerMgr::do_transfer_backfill_tx_(const ObTransferBackfillTXPara
ret = OB_EAGAIN; ret = OB_EAGAIN;
LOG_WARN("errsim forbid execute transfer backfill", K(ret), K(addr)); LOG_WARN("errsim forbid execute transfer backfill", K(ret), K(addr));
} }
ObErrsimBackfillPointType point_type(ObErrsimBackfillPointType::TYPE::ERRSIM_START_BACKFILL_BEFORE);
if (OB_SUCC(ret) && errsim_point_info_.is_errsim_point(point_type)) {
ret = OB_EAGAIN;
LOG_WARN("[ERRSIM TRANSFER] errsim start transfer backfill error", K(ret), K(param));
}
#endif #endif
share::ObTenantDagScheduler *scheduler = nullptr; share::ObTenantDagScheduler *scheduler = nullptr;
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
@ -447,11 +454,45 @@ int ObTransferWorkerMgr::do_transfer_backfill_tx_(const ObTransferBackfillTXPara
return ret; return ret;
} }
void ObTransferWorkerMgr::set_errsim_backfill_point_()
{
#ifdef ERRSIM
int ret = OB_SUCCESS;
int64_t point_time = 0;
int64_t current_time = common::ObTimeUtility::current_time();
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
point_time = tenant_config->errsim_transfer_backfill_error_time;
}
if (0 == point_time) {
errsim_point_info_.reset();
} else if (errsim_point_info_.is_valid()
&& (current_time - errsim_point_info_.get_point_start_time()) < point_time) {
FLOG_INFO("wait clear errsim point", K(ret), K(errsim_point_info_), K(point_time), K(current_time));
// do nothing
} else {
errsim_point_info_.reset();
const ObErrsimBackfillPointType::TYPE point_type =
(ObErrsimBackfillPointType::TYPE)ObRandom::rand(ObErrsimBackfillPointType::TYPE::ERRSIM_POINT_NONE, ObErrsimBackfillPointType::TYPE::ERRSIM_MODULE_MAX);
ObErrsimBackfillPointType type(point_type);
if (OB_FAIL(errsim_point_info_.set_point_type(type))) {
LOG_WARN("failed to set point type", K(ret), K(type));
} else if (OB_FAIL(errsim_point_info_.set_point_start_time(current_time))) {
LOG_WARN("failed to set point start time", K(ret), K(current_time));
} else {
FLOG_INFO("succ to set point type", K(ret), K(errsim_point_info_));
}
}
#endif
}
/******************ObTransferBackfillTXCtx*********************/ /******************ObTransferBackfillTXCtx*********************/
ObTransferBackfillTXCtx::ObTransferBackfillTXCtx() ObTransferBackfillTXCtx::ObTransferBackfillTXCtx()
: ObIHADagNetCtx(), : ObIHADagNetCtx(),
tenant_id_(OB_INVALID_TENANT_ID), tenant_id_(OB_INVALID_TENANT_ID),
#ifdef ERRSIM
errsim_point_info_(),
#endif
task_id_(), task_id_(),
src_ls_id_(), src_ls_id_(),
dest_ls_id_(), dest_ls_id_(),
@ -514,6 +555,9 @@ void ObTransferBackfillTXCtx::reuse()
/******************ObTransferBackfillTXParam*********************/ /******************ObTransferBackfillTXParam*********************/
ObTransferBackfillTXParam::ObTransferBackfillTXParam() ObTransferBackfillTXParam::ObTransferBackfillTXParam()
: tenant_id_(OB_INVALID_TENANT_ID), : tenant_id_(OB_INVALID_TENANT_ID),
#ifdef ERRSIM
errsim_point_info_(),
#endif
task_id_(), task_id_(),
src_ls_id_(), src_ls_id_(),
dest_ls_id_(), dest_ls_id_(),
@ -540,6 +584,9 @@ void ObTransferBackfillTXParam::reset()
dest_ls_id_.reset(); dest_ls_id_.reset();
backfill_scn_.reset(); backfill_scn_.reset();
tablet_infos_.reset(); tablet_infos_.reset();
#ifdef ERRSIM
errsim_point_info_.reset();
#endif
} }
/******************ObTransferBackfillTXDagNet*********************/ /******************ObTransferBackfillTXDagNet*********************/
@ -574,6 +621,9 @@ int ObTransferBackfillTXDagNet::init_by_param(const ObIDagInitParam *param)
ctx_.src_ls_id_ = init_param->src_ls_id_; ctx_.src_ls_id_ = init_param->src_ls_id_;
ctx_.dest_ls_id_ = init_param->dest_ls_id_; ctx_.dest_ls_id_ = init_param->dest_ls_id_;
ctx_.backfill_scn_ = init_param->backfill_scn_; ctx_.backfill_scn_ = init_param->backfill_scn_;
#ifdef ERRSIM
ctx_.errsim_point_info_ = init_param->errsim_point_info_;
#endif
is_inited_ = true; is_inited_ = true;
} }
return ret; return ret;
@ -1622,8 +1672,8 @@ int ObTransferReplaceTableTask::transfer_replace_tables_(
param.rebuild_seq_ = ls->get_rebuild_seq(); param.rebuild_seq_ = ls->get_rebuild_seq();
param.is_transfer_replace_ = true; param.is_transfer_replace_ = true;
param.tablet_meta_ = &mig_param; param.tablet_meta_ = &mig_param;
#ifdef ERRSIM #ifdef ERRSIM
param.errsim_point_info_ = ctx_->errsim_point_info_;
SERVER_EVENT_SYNC_ADD("TRANSFER", "TRANSFER_REPLACE_TABLE_WITH_LOG_REPLAY_SKIP_CHECK", SERVER_EVENT_SYNC_ADD("TRANSFER", "TRANSFER_REPLACE_TABLE_WITH_LOG_REPLAY_SKIP_CHECK",
"dest_ls_id", ls->get_ls_id(), "dest_ls_id", ls->get_ls_id(),
"migration_status", migration_status, "migration_status", migration_status,
@ -1690,6 +1740,11 @@ int ObTransferReplaceTableTask::do_replace_logical_tables_(ObLS *ls)
LOG_WARN("failed to transfer replace tables", K(ret), K(tablet_info), KPC(ls), KPC(tablet), KPC(ctx_)); LOG_WARN("failed to transfer replace tables", K(ret), K(tablet_info), KPC(ls), KPC(tablet), KPC(ctx_));
} else { } else {
#ifdef ERRSIM #ifdef ERRSIM
ObErrsimBackfillPointType point_type(ObErrsimBackfillPointType::TYPE::ERRSIM_REPLACE_AFTER);
if (ctx_->errsim_point_info_.is_errsim_point(point_type)) {
ret = OB_EAGAIN;
LOG_WARN("[ERRSIM TRANSFER] errsim transfer replace after", K(ret), K(point_type));
}
SERVER_EVENT_ADD("TRANSFER", "REPLACE_LOGICAL_TABLE", SERVER_EVENT_ADD("TRANSFER", "REPLACE_LOGICAL_TABLE",
"task_id", ctx_->task_id_, "task_id", ctx_->task_id_,
"tenant_id", ctx_->tenant_id_, "tenant_id", ctx_->tenant_id_,
@ -1778,6 +1833,5 @@ int ObTransferReplaceTableTask::build_migration_param_(
return ret; return ret;
} }
} }
} }

View File

@ -31,6 +31,9 @@ public:
void reset(); void reset();
VIRTUAL_TO_STRING_KV(K_(task_id), K_(src_ls_id), K_(dest_ls_id), K_(backfill_scn), K_(tablet_infos)); VIRTUAL_TO_STRING_KV(K_(task_id), K_(src_ls_id), K_(dest_ls_id), K_(backfill_scn), K_(tablet_infos));
uint64_t tenant_id_; uint64_t tenant_id_;
#ifdef ERRSIM
ObErrsimTransferBackfillPoint errsim_point_info_;
#endif
share::ObTaskId task_id_; share::ObTaskId task_id_;
share::ObLSID src_ls_id_; share::ObLSID src_ls_id_;
share::ObLSID dest_ls_id_; share::ObLSID dest_ls_id_;
@ -60,8 +63,12 @@ private:
const ObTabletTransferInfo &transfer_info, const ObTabletTransferInfo &transfer_info,
bool &is_ready, bool &is_ready,
ObTabletHAStatus &ha_status /* source tablet ha status */) const; ObTabletHAStatus &ha_status /* source tablet ha status */) const;
void set_errsim_backfill_point_();
private: private:
bool is_inited_; bool is_inited_;
#ifdef ERRSIM
ObErrsimTransferBackfillPoint errsim_point_info_;
#endif
uint64_t tenant_id_; uint64_t tenant_id_;
share::ObTaskId task_id_; share::ObTaskId task_id_;
ObLS *dest_ls_; ObLS *dest_ls_;
@ -81,6 +88,9 @@ public:
virtual DagNetCtxType get_dag_net_ctx_type() { return ObIHADagNetCtx::TRANSFER_BACKFILL_TX; } virtual DagNetCtxType get_dag_net_ctx_type() { return ObIHADagNetCtx::TRANSFER_BACKFILL_TX; }
public: public:
uint64_t tenant_id_; uint64_t tenant_id_;
#ifdef ERRSIM
ObErrsimTransferBackfillPoint errsim_point_info_;
#endif
share::ObTaskId task_id_; share::ObTaskId task_id_;
share::ObLSID src_ls_id_; share::ObLSID src_ls_id_;
share::ObLSID dest_ls_id_; share::ObLSID dest_ls_id_;

View File

@ -24,6 +24,118 @@ using namespace oceanbase::share::schema;
using namespace oceanbase::share; using namespace oceanbase::share;
#ifdef ERRSIM
static const char *OB_ERRSIM_POINT_TYPES[] = {
"POINT_NONE",
"START_BACKFILL_BEFORE",
"REPLACE_SWAP_BEFORE",
"REPLACE_AFTER",
};
void ObErrsimBackfillPointType::reset()
{
type_ = ObErrsimBackfillPointType::ERRSIM_POINT_NONE;
}
bool ObErrsimBackfillPointType::is_valid() const
{
return true;
}
bool ObErrsimBackfillPointType::operator == (const ObErrsimBackfillPointType &other) const
{
bool is_same = true;
if (this == &other) {
// same
} else {
is_same = type_ == other.type_;
}
return is_same;
}
int64_t ObErrsimBackfillPointType::hash() const
{
int64_t hash_value = 0;
hash_value = common::murmurhash(
&type_, sizeof(type_), hash_value);
return hash_value;
}
int ObErrsimBackfillPointType::hash(uint64_t &hash_val) const
{
hash_val = hash();
return OB_SUCCESS;
}
OB_SERIALIZE_MEMBER(ObErrsimBackfillPointType, type_);
ObErrsimTransferBackfillPoint::ObErrsimTransferBackfillPoint()
: point_type_(ObErrsimBackfillPointType::ERRSIM_MODULE_MAX),
point_start_time_(0)
{
}
ObErrsimTransferBackfillPoint::~ObErrsimTransferBackfillPoint()
{
}
bool ObErrsimTransferBackfillPoint::is_valid() const
{
return point_type_.is_valid() && point_start_time_ > 0;
}
int ObErrsimTransferBackfillPoint::set_point_type(const ObErrsimBackfillPointType &point_type)
{
int ret = OB_SUCCESS;
if (!point_type.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("point type is invalid", K(ret), K(point_type));
} else if (is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("The point type is in effect, reset is not allowed", K(ret), K(point_type_), K(point_type));
} else {
point_type_ = point_type;
}
return ret;
}
int ObErrsimTransferBackfillPoint::set_point_start_time(int64_t start_time)
{
int ret = OB_SUCCESS;
if (start_time < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("point type is invalid", K(ret), K(start_time));
} else if (is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("The point type is in effect, reset is not allowed", K(ret), K(point_start_time_), K(start_time));
} else {
point_start_time_ = start_time;
}
return ret;
}
void ObErrsimTransferBackfillPoint::reset()
{
point_type_.type_ = ObErrsimBackfillPointType::ERRSIM_MODULE_MAX;
point_start_time_ = 0;
}
bool ObErrsimTransferBackfillPoint::is_errsim_point(const ObErrsimBackfillPointType &point_type) const
{
bool is_point = false;
if (!is_valid()) {
is_point = false;
} else if (point_type.type_ == point_type_.type_) {
is_point = true;
} else {
is_point = false;
}
return is_point;
}
#endif
OB_SERIALIZE_MEMBER(ObTabletReportStatus, OB_SERIALIZE_MEMBER(ObTabletReportStatus,
merge_snapshot_version_, merge_snapshot_version_,
cur_report_version_, cur_report_version_,
@ -330,6 +442,9 @@ bool ObUpdateTableStoreParam::is_valid() const
ObBatchUpdateTableStoreParam::ObBatchUpdateTableStoreParam() ObBatchUpdateTableStoreParam::ObBatchUpdateTableStoreParam()
: tables_handle_(), : tables_handle_(),
#ifdef ERRSIM
errsim_point_info_(),
#endif
rebuild_seq_(OB_INVALID_VERSION), rebuild_seq_(OB_INVALID_VERSION),
update_logical_minor_sstable_(false), update_logical_minor_sstable_(false),
is_transfer_replace_(false), is_transfer_replace_(false),
@ -377,6 +492,9 @@ int ObBatchUpdateTableStoreParam::assign(
tablet_meta_ = param.tablet_meta_; tablet_meta_ = param.tablet_meta_;
update_ddl_sstable_ = param.update_ddl_sstable_; update_ddl_sstable_ = param.update_ddl_sstable_;
restore_status_ = param.restore_status_; restore_status_ = param.restore_status_;
#ifdef ERRSIM
errsim_point_info_ = param.errsim_point_info_;
#endif
} }
return ret; return ret;
} }

View File

@ -58,6 +58,48 @@ static const int64_t MERGE_READ_SNAPSHOT_VERSION = share::OB_MAX_SCN_TS_NS - 2;
static const int64_t GET_BATCH_ROWS_READ_SNAPSHOT_VERSION = share::OB_MAX_SCN_TS_NS - 8; static const int64_t GET_BATCH_ROWS_READ_SNAPSHOT_VERSION = share::OB_MAX_SCN_TS_NS - 8;
// static const int64_t GET_SCAN_COST_READ_SNAPSHOT_VERSION = INT64_MAX - 9; // static const int64_t GET_SCAN_COST_READ_SNAPSHOT_VERSION = INT64_MAX - 9;
#ifdef ERRSIM
struct ObErrsimBackfillPointType final
{
OB_UNIS_VERSION(1);
public:
enum TYPE
{
ERRSIM_POINT_NONE = 0,
ERRSIM_START_BACKFILL_BEFORE = 1,
ERRSIM_REPLACE_SWAP_BEFORE = 2,
ERRSIM_REPLACE_AFTER = 3,
ERRSIM_MODULE_MAX
};
ObErrsimBackfillPointType() : type_(ERRSIM_POINT_NONE) {}
explicit ObErrsimBackfillPointType(const ObErrsimBackfillPointType::TYPE &type) : type_(type) {}
~ObErrsimBackfillPointType() = default;
void reset();
bool is_valid() const;
bool operator == (const ObErrsimBackfillPointType &other) const;
int hash(uint64_t &hash_val) const;
int64_t hash() const;
TO_STRING_KV(K_(type));
TYPE type_;
};
class ObErrsimTransferBackfillPoint final
{
public:
ObErrsimTransferBackfillPoint();
virtual ~ObErrsimTransferBackfillPoint();
bool is_valid() const;
void reset();
int set_point_type(const ObErrsimBackfillPointType &point_type);
int set_point_start_time(int64_t start_time);
bool is_errsim_point(const ObErrsimBackfillPointType &point_type) const;
int64_t get_point_start_time() { return point_start_time_; }
TO_STRING_KV(K_(point_type), K_(point_start_time));
private:
ObErrsimBackfillPointType point_type_;
int64_t point_start_time_;
};
#endif
enum ObMigrateStatus enum ObMigrateStatus
{ {
@ -369,6 +411,9 @@ struct ObBatchUpdateTableStoreParam final
K_(start_scn), KP_(tablet_meta), K_(update_ddl_sstable), K_(restore_status)); K_(start_scn), KP_(tablet_meta), K_(update_ddl_sstable), K_(restore_status));
ObTablesHandleArray tables_handle_; ObTablesHandleArray tables_handle_;
#ifdef ERRSIM
ObErrsimTransferBackfillPoint errsim_point_info_;
#endif
int64_t rebuild_seq_; int64_t rebuild_seq_;
bool update_logical_minor_sstable_; bool update_logical_minor_sstable_;
bool is_transfer_replace_; bool is_transfer_replace_;

View File

@ -708,6 +708,13 @@ int ObTablet::init(
ObTablet::free_storage_schema(tmp_arena_allocator, old_storage_schema); ObTablet::free_storage_schema(tmp_arena_allocator, old_storage_schema);
#ifdef ERRSIM
ObErrsimBackfillPointType point_type(ObErrsimBackfillPointType::TYPE::ERRSIM_REPLACE_SWAP_BEFORE);
if (param.errsim_point_info_.is_errsim_point(point_type)) {
ret = OB_EAGAIN;
LOG_WARN("[ERRSIM TRANSFER] errsim transfer swap tablet before", K(ret), K(param));
}
#endif
return ret; return ret;
} }