Supplementary cancellation of ObTransferBackfillTXDagNet task during log stream GC

This commit is contained in:
WenJinyu
2023-07-03 02:53:55 +00:00
committed by ob-robot
parent b4a247aab3
commit 356df53066
6 changed files with 112 additions and 5 deletions

View File

@ -54,9 +54,6 @@ int ObTransferWorkerMgr::init(ObLS *dest_ls)
ret = OB_INVALID_ARGUMENT;
LOG_WARN("ls is nullptr", K(ret));
} else {
share::ObTaskId task_id;
task_id.init(GCONF.self_addr_);
task_id_ = task_id;
tenant_id_ = MTL_ID();
dest_ls_ = dest_ls;
is_inited_ = true;
@ -64,9 +61,14 @@ int ObTransferWorkerMgr::init(ObLS *dest_ls)
return ret;
}
void ObTransferWorkerMgr::update_task_id_()
void ObTransferWorkerMgr::reset_task_id()
{
task_id_.reset();
}
void ObTransferWorkerMgr::update_task_id_()
{
reset_task_id();
task_id_.init(GCONF.self_addr_);
}
int ObTransferWorkerMgr::get_need_backfill_tx_tablets_(ObTransferBackfillTXParam &param)
@ -295,7 +297,7 @@ int ObTransferWorkerMgr::process()
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("transfer work not init", K(ret));
} else if (OB_FAIL(check_task_exist_(task_id_, is_exist))) {
} else if (task_id_.is_valid() && OB_FAIL(check_task_exist_(task_id_, is_exist))) {
LOG_WARN("failed to check task exist", K(ret), "ls_id", dest_ls_->get_ls_id(), K(*this));
} else if (is_exist) {
// only one transfer backfill tx task is allowed to execute at a time
@ -346,6 +348,42 @@ int ObTransferWorkerMgr::check_task_exist_(
return ret;
}
int ObTransferWorkerMgr::cancel_dag_net()
{
int ret = OB_SUCCESS;
ObTenantDagScheduler *scheduler = nullptr;
bool is_exist = false;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("transfer worker do not init", K(ret));
} else if (task_id_.is_invalid()) {
// do nothing
} else if (OB_FAIL(check_task_exist_(task_id_, is_exist))) {
LOG_WARN("fail to check task exist", K(ret), K_(task_id));
} else if (is_exist) {
if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("failed to get ObTenantDagScheduler from MTL", K(ret), KPC(this));
} else if (OB_FAIL(scheduler->cancel_dag_net(task_id_))) {
LOG_WARN("failed to cancel dag net", K(ret), K(this));
}
if (OB_FAIL(ret)) {
} else {
int64_t start_ts = ObTimeUtil::current_time();
do {
if (OB_FAIL(check_task_exist_(task_id_, is_exist))) {
LOG_WARN("fail to check task exist", K(ret), K_(task_id));
} else if (is_exist && REACH_TIME_INTERVAL(60 * 1000 * 1000)) {
ret = OB_EAGAIN;
LOG_WARN("cancel dag task cost too much time", K(ret), K_(task_id),
"cost_time", ObTimeUtil::current_time() - start_ts);
}
} while (is_exist && OB_SUCC(ret));
}
}
return ret;
}
ERRSIM_POINT_DEF(EN_ERRSIM_ALLOW_TRANSFER_BACKFILL_TX);
int ObTransferWorkerMgr::do_transfer_backfill_tx_(const ObTransferBackfillTXParam &param)
@ -692,6 +730,21 @@ int ObTransferBackfillTXDagNet::clear_dag_net_ctx()
return ret;
}
int ObTransferBackfillTXDagNet::deal_with_cancel()
{
int ret = OB_SUCCESS;
const int32_t result = OB_CANCELED;
const bool need_retry = false;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer backfill tx dag net do not init", K(ret));
} else if (OB_FAIL(ctx_.set_result(result, need_retry))) {
LOG_WARN("failed to set result", K(ret), KPC(this));
}
return ret;
}
/******************ObBaseTransferBackfillTXDag*********************/
ObBaseTransferBackfillTXDag::ObBaseTransferBackfillTXDag(const share::ObDagType::ObDagTypeEnum &dag_type)
: ObStorageHADag(dag_type)

View File

@ -45,6 +45,8 @@ public:
~ObTransferWorkerMgr();
int init(ObLS *dest_ls);
int process();
int cancel_dag_net();
void reset_task_id();
TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(task_id), KP_(dest_ls));
private:
int check_task_exist_(const share::ObTaskId &task_id, bool &is_exist);
@ -108,6 +110,7 @@ public:
virtual int fill_comment(char *buf, const int64_t buf_len) const override;
virtual int fill_dag_net_key(char *buf, const int64_t buf_len) const override;
virtual int clear_dag_net_ctx();
virtual int deal_with_cancel() override;
ObTransferBackfillTXCtx *get_ctx() { return &ctx_; }
const share::ObLSID &get_ls_id() const { return ctx_.src_ls_id_; }

View File

@ -1732,6 +1732,44 @@ int ObTransferHandler::record_server_event_(const int32_t result, const int64_t
return ret;
}
int ObTransferHandler::safe_to_destroy(bool &is_safe)
{
int ret = OB_SUCCESS;
is_safe = false;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls transfer handler do not init", K(ret));
} else {
if (OB_FAIL(transfer_worker_mgr_.cancel_dag_net())) {
LOG_WARN("failed to cancel dag net", K(ret), KPC(ls_));
} else {
is_safe = true;
}
}
return ret;
}
int ObTransferHandler::offline()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls transfer handler do not init", K(ret));
} else {
int retry_cnt = 0;
do {
if (OB_FAIL(transfer_worker_mgr_.cancel_dag_net())) {
LOG_WARN("failed to cancel dag net", K(ret), KPC(ls_));
}
} while (retry_cnt ++ < 3/*max retry cnt*/ && OB_EAGAIN == ret);
}
return ret;
}
void ObTransferHandler::online()
{
transfer_worker_mgr_.reset_task_id();
}
}
}

View File

@ -65,6 +65,9 @@ public:
const share::SCN &scn) override final;
virtual share::SCN get_rec_scn() override final { return share::SCN::max_scn(); }
virtual int flush(share::SCN &scn) override final;
int safe_to_destroy(bool &is_safe);
int offline();
void online();
private:
int get_transfer_task_(share::ObTransferTaskInfo &task_info);

View File

@ -170,6 +170,8 @@ int ObTransferService::get_ls_id_array_()
} else if (OB_ISNULL(ls)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls should not be NULL", K(ret), KP(ls));
} else if (!(ls->get_log_handler()->is_replay_enabled())) {
LOG_INFO("log handler not enable replay, should not schduler transfer hander", "ls_id", ls->get_ls_id());
} else if (OB_FAIL(ls_id_array_.push_back(ls->get_ls_id()))) {
LOG_WARN("failed to push ls id into array", K(ret), KPC(ls));
}

View File

@ -642,6 +642,7 @@ bool ObLS::safe_to_destroy()
bool is_data_check_point_safe = false;
bool is_dup_table_handler_safe = false;
bool is_log_handler_safe = false;
bool is_transfer_handler_safe = false;
if (OB_FAIL(ls_tablet_svr_.safe_to_destroy(is_tablet_service_safe))) {
LOG_WARN("ls tablet service check safe to destroy failed", K(ret), KPC(this));
@ -658,6 +659,9 @@ bool ObLS::safe_to_destroy()
} else if (OB_FAIL(log_handler_.safe_to_destroy(is_log_handler_safe))) {
LOG_WARN("log_handler_ check safe to destroy failed", K(ret), KPC(this));
} else if (!is_log_handler_safe) {
} else if (OB_FAIL(transfer_handler_.safe_to_destroy(is_transfer_handler_safe))) {
LOG_WARN("transfer_handler_ check safe to destroy failed", K(ret), KPC(this));
} else if (!is_transfer_handler_safe) {
} else {
if (1 == ref_mgr_.get_total_ref_cnt()) { // only has one ref at the safe destroy task
is_safe = true;
@ -672,6 +676,7 @@ bool ObLS::safe_to_destroy()
K(is_tablet_service_safe), K(is_data_check_point_safe),
K(is_dup_table_handler_safe),
K(is_ls_restore_handler_safe), K(is_log_handler_safe),
K(is_transfer_handler_safe),
"ls_ref", ref_mgr_.get_total_ref_cnt(),
K(ret), KP(this), KPC(this));
ref_mgr_.print();
@ -876,6 +881,8 @@ int ObLS::offline_()
LOG_WARN("checkpoint executor offline failed", K(ret), K(ls_meta_));
} else if (OB_FAIL(ls_restore_handler_.offline())) {
LOG_WARN("failed to offline ls restore handler", K(ret));
} else if (OB_FAIL(transfer_handler_.offline())) {
LOG_WARN("transfer_handler failed", K(ret), K(ls_meta_));
} else if (OB_FAIL(log_handler_.offline())) {
LOG_WARN("failed to offline log", K(ret));
// TODO: delete it if apply sequence
@ -1006,6 +1013,7 @@ int ObLS::online()
LOG_WARN("weak read handler online failed", K(ret), K(ls_meta_));
} else if (OB_FAIL(online_compaction_())) {
LOG_WARN("compaction online failed", K(ret), K(ls_meta_));
} else if (FALSE_IT(transfer_handler_.online())) {
} else if (OB_FAIL(ls_restore_handler_.online())) {
LOG_WARN("ls restore handler online failed", K(ret));
} else if (FALSE_IT(checkpoint_executor_.online())) {