fix calculate unique index checksum retry

This commit is contained in:
Charles0429
2022-11-18 08:35:34 +00:00
committed by wangzelin.wzl
parent d0cb764457
commit 6c6eaf5dc6
3 changed files with 17 additions and 6 deletions

View File

@ -530,6 +530,7 @@ int ObService::calc_column_checksum_request(const obrpc::ObCalcColumnChecksumReq
} else { } else {
// schedule unique checking task // schedule unique checking task
const uint64_t tenant_id = arg.tenant_id_; const uint64_t tenant_id = arg.tenant_id_;
int saved_ret = OB_SUCCESS;
MTL_SWITCH(tenant_id) { MTL_SWITCH(tenant_id) {
ObGlobalUniqueIndexCallback *callback = NULL; ObGlobalUniqueIndexCallback *callback = NULL;
ObTenantDagScheduler* dag_scheduler = nullptr; ObTenantDagScheduler* dag_scheduler = nullptr;
@ -565,13 +566,16 @@ int ObService::calc_column_checksum_request(const obrpc::ObCalcColumnChecksumReq
} else if (OB_TMP_FAIL(dag->alloc_unique_checking_prepare_task(callback))) { } else if (OB_TMP_FAIL(dag->alloc_unique_checking_prepare_task(callback))) {
STORAGE_LOG(WARN, "fail to alloc unique checking prepare task", KR(tmp_ret)); STORAGE_LOG(WARN, "fail to alloc unique checking prepare task", KR(tmp_ret));
} else if (OB_TMP_FAIL(dag_scheduler->add_dag(dag))) { } else if (OB_TMP_FAIL(dag_scheduler->add_dag(dag))) {
if (OB_EAGAIN != tmp_ret && OB_SIZE_OVERFLOW != tmp_ret) { saved_ret = tmp_ret;
STORAGE_LOG(WARN, "fail to add dag to queue", KR(tmp_ret)); if (OB_EAGAIN == tmp_ret) {
} else { tmp_ret = OB_SUCCESS;
} else if (OB_SIZE_OVERFLOW == tmp_ret) {
tmp_ret = OB_EAGAIN; tmp_ret = OB_EAGAIN;
} else {
STORAGE_LOG(WARN, "fail to add dag to queue", KR(tmp_ret));
} }
} }
if (OB_SUCCESS != tmp_ret && NULL != dag) { if (OB_SUCCESS != saved_ret && NULL != dag) {
dag_scheduler->free_dag(*dag); dag_scheduler->free_dag(*dag);
dag = NULL; dag = NULL;
} }

View File

@ -863,7 +863,7 @@ int ObIndexBuildTask::verify_checksum()
// send column checksum calculation request and wait finish, then verify column checksum // send column checksum calculation request and wait finish, then verify column checksum
if (OB_SUCC(ret) && !state_finished && check_unique_snapshot_ > 0) { if (OB_SUCC(ret) && !state_finished && check_unique_snapshot_ > 0) {
static int64_t checksum_wait_timeout = max(OB_MAX_DDL_SINGLE_REPLICA_BUILD_TIMEOUT / 50, 3600L * 1000L * 1000L); static int64_t checksum_wait_timeout = 10 * 1000 * 1000L; // 10s
bool is_column_checksum_ready = false; bool is_column_checksum_ready = false;
bool dummy_equal = false; bool dummy_equal = false;
if (!wait_column_checksum_ctx_.is_inited() && OB_FAIL(wait_column_checksum_ctx_.init( if (!wait_column_checksum_ctx_.is_inited() && OB_FAIL(wait_column_checksum_ctx_.init(

View File

@ -960,7 +960,14 @@ int ObGlobalUniqueIndexCallback::operator()(const int ret_code)
arg.source_table_id_ = data_table_id_; arg.source_table_id_ = data_table_id_;
arg.schema_version_ = schema_version_; arg.schema_version_ = schema_version_;
arg.task_id_ = task_id_; arg.task_id_ = task_id_;
if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) { #ifdef ERRSIM
if (OB_SUCC(ret)) {
ret = E(EventTable::EN_DDL_REPORT_REPLICA_BUILD_STATUS_FAIL) OB_SUCCESS;
LOG_INFO("report replica build status errsim", K(ret));
}
#endif
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) {
ret = OB_ERR_SYS; ret = OB_ERR_SYS;
STORAGE_LOG(WARN, "innner system error, rootserver rpc proxy or rs mgr must not be NULL", K(ret), K(GCTX)); STORAGE_LOG(WARN, "innner system error, rootserver rpc proxy or rs mgr must not be NULL", K(ret), K(GCTX));
} else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) { } else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) {