diff --git a/src/observer/table/ttl/ob_table_ttl_task.cpp b/src/observer/table/ttl/ob_table_ttl_task.cpp index f4a486ecd0..cae509df3b 100644 --- a/src/observer/table/ttl/ob_table_ttl_task.cpp +++ b/src/observer/table/ttl/ob_table_ttl_task.cpp @@ -216,18 +216,20 @@ int ObTableTTLDeleteTask::process_one() ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret; } - info_.max_version_del_cnt_ += result.get_max_version_del_row(); - info_.ttl_del_cnt_ += result.get_ttl_del_row(); info_.scan_cnt_ += result.get_scan_row(); info_.err_code_ = ret; info_.row_key_ = result.get_end_rowkey(); - if (OB_SUCC(ret) - && result.get_del_row() < PER_TASK_DEL_ROWS - && result.get_end_ts() > ObTimeUtility::current_time()) { - ret = OB_ITER_END; // finsh task - info_.err_code_ = ret; - LOG_DEBUG("finish delete", KR(ret), K_(info)); + if (OB_SUCC(ret)) { + info_.max_version_del_cnt_ += result.get_max_version_del_row(); + info_.ttl_del_cnt_ += result.get_ttl_del_row(); + if (result.get_del_row() < PER_TASK_DEL_ROWS + && result.get_end_ts() > ObTimeUtility::current_time()) { + ret = OB_ITER_END; // finsh task + info_.err_code_ = ret; + LOG_DEBUG("finish delete", KR(ret), K_(info)); + } } + int64_t cost = ObTimeUtil::current_time() - start_time; LOG_DEBUG("finish process one", KR(ret), K(cost)); return ret; diff --git a/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.cpp b/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.cpp index 961c14b0a5..d2cbc9efa4 100644 --- a/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.cpp +++ b/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.cpp @@ -126,6 +126,7 @@ void ObTenantTabletTTLMgr::inner_switch_to_follower() FLOG_INFO("tenant_tablet_ttl_mgr: begin to switch_to_follower", K_(tenant_id), KPC_(ls)); const int64_t start_time_us = ObTimeUtility::current_time(); pause(); + ATOMIC_STORE(&need_reuse_for_switch_, true); const int64_t cost_us = ObTimeUtility::current_time() - start_time_us; FLOG_INFO("tenant_tablet_ttl_mgr: finish to switch_to_follower", K_(tenant_id), KPC_(ls), K(cost_us)); } @@ -214,7 +215,7 @@ int ObTenantTabletTTLMgr::check_and_handle_event() common::ObSpinLockGuard guard(lock_); // after observer restart, need check tenant even when cancel and move state is_dirty = local_tenant_task_.is_dirty_; - is_finished = local_tenant_task_.is_finished_; + is_finished = local_tenant_task_.state_ == OB_TTL_TASK_FINISH; need_check = !is_finished && local_tenant_task_.need_check_; } @@ -247,109 +248,54 @@ void ObTenantTabletTTLMgr::check_ttl_tenant_state() bool tenant_finish = true; ObTTLTaskCtx* ctx = nullptr; - for (tablet_task_iter iter = local_tenant_task_.tablet_task_map_.begin(); - !tenant_dirty && iter != local_tenant_task_.tablet_task_map_.end(); ++iter) { - ctx = iter->second; - if (OB_ISNULL(ctx)) { - LOG_WARN("fatal err, ttl ctx in map is null", K(local_tenant_task_.tenant_id_)); - } else if (ctx->is_dirty_) { - tenant_dirty = true; - } else if (ctx->task_status_ != OB_TTL_TASK_CANCEL && - ctx->task_status_ != OB_TTL_TASK_FINISH) { - tenant_finish = false; - local_tenant_task_.is_finished_ = false; - } - } - if (OB_SUCC(ret) && !tenant_dirty) { - local_tenant_task_.is_dirty_ = false; - if (tenant_finish) { - // all task already in cancel or runing status - if (local_tenant_task_.state_ == OB_TTL_TASK_CANCEL || local_tenant_task_.state_ == OB_TTL_TASK_RUNNING) { - local_tenant_task_.reuse(); - FLOG_INFO("local ls ttl task is finished", K_(local_tenant_task), KPC_(ls)); - } else { + if (local_tenant_task_.need_check_) { + // when local tenant task need check(maybe schema changed), this task cannot finish + // we should end this process(don't execute else) and check tenant task first + } else { + for (tablet_task_iter iter = local_tenant_task_.tablet_task_map_.begin(); + OB_SUCC(ret) && !tenant_dirty && iter != local_tenant_task_.tablet_task_map_.end(); ++iter) { + ctx = iter->second; + if (OB_ISNULL(ctx)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected tenant ttl state", KR(ret), K(local_tenant_task_.state_)); + LOG_WARN("fatal err, ttl ctx in map is null", K(local_tenant_task_.tenant_id_)); + } else { + tenant_dirty = tenant_dirty ? tenant_dirty : ctx->is_dirty_; + tenant_finish = tenant_finish ? (ctx->task_status_ != OB_TTL_TASK_CANCEL && ctx->task_status_ != OB_TTL_TASK_FINISH) : tenant_finish; + } + } + if (OB_SUCC(ret) && !tenant_dirty) { + local_tenant_task_.is_dirty_ = false; + if (tenant_finish) { + local_tenant_task_.state_ = OB_TTL_TASK_FINISH; + FLOG_INFO("local ls ttl task is finished", K_(local_tenant_task), KPC_(ls)); } } } - LOG_DEBUG("check ttl tenant dirty", K(local_tenant_task_.is_dirty_), K(local_tenant_task_.state_), KR(ret), K_(tenant_id)); } -int ObTenantTabletTTLMgr::check_cmd_state_valid(const common::ObTTLTaskStatus current_state, - const common::ObTTLTaskStatus incoming_state) -{ - int ret = OB_SUCCESS; - switch (incoming_state) { - case OB_TTL_TASK_RUNNING: { - if (current_state != OB_TTL_TASK_PENDING && current_state != OB_TTL_TASK_INVALID && - current_state != OB_TTL_TASK_RUNNING) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("receive rs cmd, but current tenant state is unmatached", - KR(ret), K(current_state), K(incoming_state)); - } - break; - } - case OB_TTL_TASK_MOVING: { - if (current_state != OB_TTL_TASK_RUNNING && current_state != OB_TTL_TASK_CANCEL && - current_state != OB_TTL_TASK_INVALID && current_state != OB_TTL_TASK_MOVING) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("receive a move cmd, current task state is unmatached", K(current_state)); - } - break; - } - case OB_TTL_TASK_PENDING: { - if (current_state != OB_TTL_TASK_RUNNING && current_state != OB_TTL_TASK_INVALID && - current_state != OB_TTL_TASK_PENDING) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("receive rs cmd, but current tenant state is unmatached", - KR(ret), K(current_state), K(incoming_state)); - } - break; - } - case OB_TTL_TASK_CANCEL: { - if (current_state != OB_TTL_TASK_PENDING && current_state != OB_TTL_TASK_RUNNING && - current_state != OB_TTL_TASK_INVALID && current_state != OB_TTL_TASK_CANCEL) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("receive rs cmd, but current tenant state is unmatached", - KR(ret), K(current_state), K(incoming_state)); - } - break; - } - default: { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid incoming status", KR(ret), K(incoming_state)); - break; - } - } - return ret; -} - void ObTenantTabletTTLMgr::mark_tenant_need_check() { int ret = OB_SUCCESS; if (common::ObTTLUtil::check_can_process_tenant_tasks(tenant_id_)) { common::ObSpinLockGuard guard(lock_); - local_tenant_task_.need_check_ = true; + if (local_tenant_task_.task_id_ != OB_INVALID_ID) { + local_tenant_task_.need_check_ = true; + if (local_tenant_task_.state_ == OB_TTL_TASK_FINISH) { + // this local tenant task(outdated) should not be finished when we mark tenant need check, + // instead, we should check and decide whether it should be finished in the future + local_tenant_task_.state_ = OB_TTL_TASK_RUNNING; + } + FLOG_INFO("finish mark tenant need check", K(local_tenant_task_)); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unpected task id is found", KR(ret), K(local_tenant_task_)); + } } LOG_DEBUG("finsh mark tenant need check", KR(ret)); } -void ObTenantTabletTTLMgr::on_schema_changed(uint64_t schema_changed_tenant_id) -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ttl manager not init"); - } else if (!common::ObTTLUtil::check_can_process_tenant_tasks(schema_changed_tenant_id)) { - //do nothing - } else { - mark_tenant_need_check(); - } -} - int ObTenantTabletTTLMgr::report_task_status(ObTTLTaskInfo& task_info, ObTTLTaskParam& task_para, bool& is_stop, bool need_copy_task/* true*/) { @@ -390,6 +336,7 @@ int ObTenantTabletTTLMgr::report_task_status(ObTTLTaskInfo& task_info, ObTTLTask } else if (OB_ITER_END == task_info.err_code_) { ctx->task_status_ = OB_TTL_TASK_FINISH; ctx->task_info_.err_code_ = OB_SUCCESS; + FLOG_INFO("task execute finished", KR(ret)); } else if (OB_NOT_MASTER == task_info.err_code_ || OB_PARTITION_NOT_EXIST == task_info.err_code_ || OB_TABLE_NOT_EXIST == task_info.err_code_ || @@ -618,7 +565,7 @@ int ObTenantTabletTTLMgr::alloc_tenant_info(uint64_t tenant_id) int ret = OB_SUCCESS; local_tenant_task_.tenant_id_ = tenant_id; local_tenant_task_.ttl_continue_ = false; - local_tenant_task_.is_dirty_ = true; + local_tenant_task_.is_dirty_ = false; ObMemAttr bucket_attr(tenant_id, "TTLTaskBucket"); ObMemAttr node_attr(tenant_id, "TTLTaskNode"); if(OB_FAIL(local_tenant_task_.tablet_task_map_.create(DEFAULT_TTL_BUCKET_NUM, bucket_attr, node_attr))) { @@ -894,18 +841,73 @@ int ObTenantTabletTTLMgr::try_schedule_prepare_task(ObTabletID& tablet_id) int ObTenantTabletTTLMgr::sync_all_dirty_task(ObIArray& dirty_tasks) { int ret = OB_SUCCESS; + bool tenant_state_changed = false; ObTimeGuard guard("ObTenantTabletTTLMgr::sync_all_dirty_record", TTL_NORMAL_TIME_THRESHOLD); - for (int i = 0; OB_SUCC(ret) && i < dirty_tasks.count(); i++) { - if (OB_FAIL(sync_sys_table(dirty_tasks.at(i)))) { + for (int i = 0; OB_SUCC(ret) && i < dirty_tasks.count() && !tenant_state_changed; i++) { + // tenant_state_changed is true means that tenant status is changed, we should refresh our status first + if (OB_FAIL(sync_sys_table(dirty_tasks.at(i), tenant_state_changed))) { LOG_WARN("fail to sync sys table", KR(ret)); } } return ret; } -int ObTenantTabletTTLMgr::sync_sys_table(ObTabletID& tablet_id) +int ObTenantTabletTTLMgr::sync_sys_table_op(ObTTLTaskCtx* ctx, + bool force_update, + bool &tenant_state_changed) { int ret = OB_SUCCESS; + ObMySQLTransaction trans; + common::ObTTLStatus ttl_record; + bool commit = false; + int tmp_ret = OB_SUCCESS; + bool is_exists = false; + bool is_end_state = false; + if (OB_FAIL(trans.start(get_sql_proxy(), gen_meta_tenant_id(tenant_id_)))) { + LOG_WARN("fail to start transation", KR(ret), K_(tenant_id)); + } else if (OB_FAIL(ObTTLUtil::check_task_status_from_sys_table(tenant_id_, trans, ctx->task_info_.task_id_, + ctx->task_info_.table_id_, ctx->task_info_.tablet_id_, is_exists, is_end_state))) { + LOG_WARN("fail to check ttl task exist"); + } else if (is_end_state) { + // record in system table is end state, do nothing + FLOG_INFO("Finished / Canceled in sys table, could not sync sys table", K(local_tenant_task_)); + } else if (!is_exists) { + if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) { + LOG_WARN("fail to construct sys table record", KR(ret)); + } else if (OB_FAIL(ObTTLUtil::insert_ttl_task(tenant_id_, share::OB_ALL_KV_TTL_TASK_TNAME, + trans, ttl_record))) { + LOG_WARN("fail to insert ttl task", KR(ret)); + } + } else if (force_update) { + if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) { + LOG_WARN("fail to construct sys table record", KR(ret)); + } else if (OB_FAIL(ObTTLUtil::update_ttl_task_all_fields(tenant_id_, + share::OB_ALL_KV_TTL_TASK_TNAME, + trans, ttl_record))) { + LOG_WARN("fail to update ttl task in sys table", KR(ret), K(ttl_record)); + } + } + + // check tenant ttl status + if (OB_SUCC(ret) && OB_FAIL(ObTTLUtil::check_tenant_state(tenant_id_, trans, local_tenant_task_.state_, local_tenant_task_.task_id_, tenant_state_changed))) { + FLOG_INFO("local tenant task state is different from sys table", KR(ret), K_(tenant_id), K(local_tenant_task_.state_)); + } + + if (trans.is_started()) { + bool commit = (OB_SUCCESS == ret); + int tmp_ret = ret; + if (OB_FAIL(trans.end(commit))) { + LOG_WARN("faile to end trans", "commit", commit, KR(ret)); + } + ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret; + } + return ret; +} + +int ObTenantTabletTTLMgr::sync_sys_table(ObTabletID& tablet_id, bool &tenant_state_changed) +{ + int ret = OB_SUCCESS; + tenant_state_changed = false; ObArenaAllocator allocator(lib::ObLabel("TTLStatusRecord")); ObTTLTaskCtx* ctx = nullptr; { @@ -961,43 +963,12 @@ int ObTenantTabletTTLMgr::sync_sys_table(ObTabletID& tablet_id) } if (OB_SUCC(ret)) { - common::ObTTLStatus ttl_record; switch (ctx->task_status_) { case OB_TTL_TASK_PREPARE: { - ObMySQLTransaction trans; - ObTTLStatusFieldArray filters; - common::ObTTLStatusArray ttl_records; - ObTTLStatusFieldArray filter; - bool commit = false; - int tmp_ret = OB_SUCCESS; - bool is_exists = false; - if (OB_FAIL(trans.start(get_sql_proxy(), gen_meta_tenant_id(tenant_id_)))) { - LOG_WARN("fail to start transation", KR(ret), K_(tenant_id)); - } else if (OB_FAIL(ObTTLUtil::check_tenant_state(tenant_id_, trans, local_tenant_task_.state_, local_tenant_task_.task_id_))) { - FLOG_INFO("local tenant task state is different from sys table", KR(ret), K_(tenant_id), K(local_tenant_task_.state_)); - } else if (OB_FAIL(ObTTLUtil::check_ttl_task_exists(tenant_id_, trans, ctx->task_info_.task_id_, - ctx->task_info_.table_id_, ctx->task_info_.tablet_id_, is_exists))) { - LOG_WARN("fail to check ttl task exist"); - } else if (!is_exists) { - if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) { - LOG_WARN("fail to construct sys table record", KR(ret)); - } else if (OB_FAIL(ObTTLUtil::insert_ttl_task(tenant_id_, share::OB_ALL_KV_TTL_TASK_TNAME, - trans, ttl_record))) { - LOG_WARN("fail to insert ttl task", KR(ret)); - } - } - - if (trans.is_started()) { - bool commit = (OB_SUCCESS == ret); - int tmp_ret = ret; - if (OB_FAIL(trans.end(commit))) { - LOG_WARN("faile to end trans", "commit", commit, KR(ret)); - } - ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret; - } - - // change prepare state to running/pending - if (OB_SUCC(ret) && OB_FAIL(try_schedule_prepare_task(tablet_id))) { + if (OB_FAIL(sync_sys_table_op(ctx, false, tenant_state_changed))) { + LOG_WARN("fail to sync ttl record into sys table", KR(ret)); + } else if (OB_FAIL(try_schedule_prepare_task(tablet_id))) { + // change prepare state to running/pending LOG_WARN("fail to schedule prepare task", KR(ret)); } break; @@ -1006,44 +977,8 @@ int ObTenantTabletTTLMgr::sync_sys_table(ObTabletID& tablet_id) case OB_TTL_TASK_RUNNING: case OB_TTL_TASK_PENDING: case OB_TTL_TASK_CANCEL: { - ObMySQLTransaction trans; - ObTTLStatusFieldArray filters; - common::ObTTLStatusArray ttl_records; - ObTTLStatusFieldArray filter; - bool commit = false; - int tmp_ret = OB_SUCCESS; - bool is_exists = false; - if (OB_FAIL(trans.start(get_sql_proxy(), gen_meta_tenant_id(tenant_id_)))) { - LOG_WARN("fail to start transation", KR(ret), K_(tenant_id)); - } else if (OB_FAIL(ObTTLUtil::check_tenant_state(tenant_id_, trans, local_tenant_task_.state_, local_tenant_task_.task_id_))) { - FLOG_INFO("local tenant task state is different from sys table", KR(ret), K_(tenant_id), K(local_tenant_task_.state_)); - } else if (OB_FAIL(ObTTLUtil::check_ttl_task_exists(tenant_id_, trans, ctx->task_info_.task_id_, - ctx->task_info_.table_id_, ctx->task_info_.tablet_id_, is_exists))) { - LOG_WARN("fail to check ttl task exist"); - } else if (!is_exists) { - if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) { - LOG_WARN("fail to construct sys table record", KR(ret)); - } else if (OB_FAIL(ObTTLUtil::insert_ttl_task(tenant_id_, share::OB_ALL_KV_TTL_TASK_TNAME, - trans, ttl_record))) { - LOG_WARN("fail to insert ttl task", KR(ret)); - } - } else { - if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) { - LOG_WARN("fail to construct sys table record", KR(ret)); - } else if (OB_FAIL(ObTTLUtil::update_ttl_task_all_fields(tenant_id_, - share::OB_ALL_KV_TTL_TASK_TNAME, - trans, ttl_record))) { - LOG_WARN("fail to update ttl task in sys table", KR(ret), K(ttl_record)); - } - } - - if (trans.is_started()) { - bool commit = (OB_SUCCESS == ret); - int tmp_ret = ret; - if (OB_FAIL(trans.end(commit))) { - LOG_WARN("faile to end trans", "commit", commit, KR(ret)); - } - ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret; + if (OB_FAIL(sync_sys_table_op(ctx, true, tenant_state_changed))) { + LOG_WARN("fail to sync ttl record into sys table", KR(ret)); } break; } @@ -1301,60 +1236,50 @@ int ObTenantTabletTTLMgr::refresh_tablet_task(ObTTLTaskCtx &ttl_task, bool refre // 2. check the status and change local tenant info int ObTenantTabletTTLMgr::reload_tenant_task() { + common::ObSpinLockGuard guard(lock_); + if (ATOMIC_BCAS(&need_reuse_for_switch_, true, false)) { + local_tenant_task_.reuse(); + FLOG_INFO("resue local tenant task cuz of switch to follower"); + } int ret = OB_SUCCESS; ObTTLStatus tenant_task; - common::ObSpinLockGuard guard(lock_); ObTTLTaskStatus expected_state; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret)); } else if (is_paused_) { - // do nothing, not leader - if (!local_tenant_task_.is_finished_) { - local_tenant_task_.reuse(); - } + // do nothing } else if (OB_FAIL(ObTTLUtil::read_tenant_ttl_task(tenant_id_, *sql_proxy_, tenant_task))) { if (OB_ITER_END == ret) { ret = OB_SUCCESS; - // tenant task may finish before the tablet task - if (!local_tenant_task_.is_finished_) { - local_tenant_task_.reuse(); - } + local_tenant_task_.reuse(); } else { LOG_WARN("fail to read tenant ttl task", KR(ret), K_(tenant_id)); } - } else if (!local_tenant_task_.is_finished_ && local_tenant_task_.task_id_ != tenant_task.task_id_) { - FLOG_INFO("tenant task is finished, but local tenant task is not, maybe schema changed", - KR(ret), K_(local_tenant_task), K(tenant_task.task_id_)); - local_tenant_task_.reuse(); - } else if (OB_RS_TTL_TASK_MOVE == static_cast(tenant_task.status_)) { - FLOG_INFO("tenant task is moving now, tablet ttl task should not continue", - KR(ret), K_(local_tenant_task), K(tenant_task.task_id_)); + } else if (OB_RS_TTL_TASK_MOVE == static_cast(tenant_task.status_) || + OB_RS_TTL_TASK_CANCEL == static_cast(tenant_task.status_)) { local_tenant_task_.reuse(); + FLOG_INFO("tenant task is finish now, reuse local tenant task", KR(ret), K_(local_tenant_task), K(tenant_task.task_id_)); } else if (OB_FAIL(ObTTLUtil::transform_tenant_state(static_cast(tenant_task.status_), expected_state))) { LOG_WARN("fail to transform ttl tenant task status", KR(ret), K(tenant_task.status_)); - } else if (OB_FAIL(check_cmd_state_valid(local_tenant_task_.state_, expected_state))) { - LOG_WARN("ttl cmd state machine is wrong", KR(ret), K_(tenant_id), K(tenant_task), K(expected_state)); - } else { - if (local_tenant_task_.is_finished_ && local_tenant_task_.task_id_ != tenant_task.task_id_) { - // new ttl request - local_tenant_task_.task_id_ = tenant_task.task_id_; - local_tenant_task_.is_usr_trigger_ = (tenant_task.trigger_type_ == USER_TRIGGER); - local_tenant_task_.state_ = expected_state; - local_tenant_task_.need_check_ = true; - local_tenant_task_.is_dirty_ = true; - local_tenant_task_.is_finished_ = false; - FLOG_INFO("new ttl task", KR(ret), K_(tenant_id), K_(local_tenant_task)); - } else if (local_tenant_task_.task_id_ != tenant_task.task_id_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("task id is mismatch", KR(ret), K(local_tenant_task_.task_id_), K(tenant_task.task_id_)); - } else if (!local_tenant_task_.is_finished_ && local_tenant_task_.state_ != expected_state) { - FLOG_INFO("old ttl task changed", KR(ret), K_(tenant_id), K_(local_tenant_task), K(tenant_task)); - // current tenant task status changed - local_tenant_task_.state_ = expected_state; - local_tenant_task_.is_dirty_ = true; - } - } + } else if (local_tenant_task_.task_id_ != tenant_task.task_id_) { + local_tenant_task_.reuse(); + local_tenant_task_.task_id_ = tenant_task.task_id_; + local_tenant_task_.is_usr_trigger_ = (tenant_task.trigger_type_ == USER_TRIGGER); + local_tenant_task_.state_ = expected_state; + local_tenant_task_.need_check_ = true; + local_tenant_task_.is_dirty_ = true; + local_tenant_task_.is_reused_ = false; + FLOG_INFO("new ttl task", KR(ret), K_(tenant_id), K_(local_tenant_task)); + } else if (OB_TTL_TASK_FINISH == static_cast(local_tenant_task_.state_)) { + // do nothing + } else if (local_tenant_task_.state_ != expected_state) { + FLOG_INFO("change local tenant task status", KR(ret), K_(tenant_id), K_(local_tenant_task), K(tenant_task)); + // current tenant task status changed + local_tenant_task_.state_ = expected_state; + local_tenant_task_.is_dirty_ = true; + } else {/* task status not change, do nothing */} + FLOG_INFO("finish reload tenant task", K(local_tenant_task_), K(tenant_task), K(is_paused_)); return ret; } @@ -1371,7 +1296,7 @@ int ObTenantTabletTTLMgr::check_schema_version() ret = OB_EAGAIN; LOG_INFO("is not a formal_schema_version", KR(ret), K(schema_version)); } else if (local_schema_version_ == OB_INVALID_VERSION || local_schema_version_ < schema_version) { - FLOG_INFO("schema changed, mark tenant need check", KR(ret), K_(local_schema_version), K(schema_version)); + FLOG_INFO("schema changed", KR(ret), K_(local_schema_version), K(schema_version)); local_schema_version_ = schema_version; mark_tenant_need_check(); } @@ -1403,23 +1328,28 @@ int ObTTLTaskCtx::deep_copy_rowkey(const ObString &rowkey) return ret; } +// reuse means this task id is finished void ObTenantTabletTTLMgr::ObTTLTenantInfo::reuse() { - for (TabletTaskMap::const_iterator iter = tablet_task_map_.begin(); iter != tablet_task_map_.end(); - ++iter) { - ObTTLTaskCtx *ctx = iter->second; - if (OB_NOT_NULL(ctx)) { - ctx->~ObTTLTaskCtx(); + if (OB_UNLIKELY(!is_reused_)) { + for (TabletTaskMap::const_iterator iter = tablet_task_map_.begin(); iter != tablet_task_map_.end(); + ++iter) { + ObTTLTaskCtx *ctx = iter->second; + if (OB_NOT_NULL(ctx)) { + ctx->~ObTTLTaskCtx(); + } } + tablet_task_map_.reuse(); + allocator_.reset(); + is_usr_trigger_ = false; + need_check_ = false; + is_dirty_ = false; + ttl_continue_ = true; + state_ = common::ObTTLTaskStatus::OB_TTL_TASK_FINISH; + is_reused_ = true; + task_id_ = OB_INVALID_ID; + FLOG_INFO("reuse tenant info", K(*this)); } - tablet_task_map_.reuse(); - allocator_.reset(); - is_usr_trigger_ = false; - need_check_ = false; - is_dirty_ = false; - ttl_continue_ = true; - state_ = common::ObTTLTaskStatus::OB_TTL_TASK_INVALID; - is_finished_ = true; } } // table diff --git a/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.h b/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.h index 8e913821e4..051b927268 100644 --- a/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.h +++ b/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.h @@ -94,7 +94,8 @@ public: local_schema_version_(OB_INVALID_VERSION), has_start_(false), is_paused_(false), - dag_ref_cnt_(0) + dag_ref_cnt_(0), + need_reuse_for_switch_(false) { } @@ -174,8 +175,7 @@ private: cmd_type_(obrpc::ObTTLRequestArg::TTL_INVALID_TYPE), rsp_time_(OB_INVALID_ID), state_(common::ObTTLTaskStatus::OB_TTL_TASK_INVALID), - is_droped_(false), - is_finished_(true) + is_reused_(false) {} ~ObTTLTenantInfo() { @@ -202,7 +202,7 @@ private: K_(ttl_continue), K_(rsp_time), K_(state), - K_(is_finished)); + K_(is_reused)); public: TabletTaskMap tablet_task_map_; @@ -216,8 +216,7 @@ private: obrpc::ObTTLRequestArg::TTLRequestType cmd_type_; // deprecated @dazhi int64_t rsp_time_; // OB_INVALID_ID means no need response common::ObTTLTaskStatus state_; - bool is_droped_; // tenant is droped - bool is_finished_; // all delete task is finished (or canceled) + bool is_reused_; // all delete task is finished (or canceled) }; int alloc_tenant_info(uint64_t tenant_id); @@ -238,7 +237,8 @@ private: ObTabletID& tablet_id, ObTTLStatusFieldArray& filter); common::ObMySQLProxy *get_sql_proxy() { return sql_proxy_; } - int sync_sys_table(common::ObTabletID& tablet_id); + int sync_sys_table_op(ObTTLTaskCtx* ctx, bool force_update, bool &tenant_state_changed); + int sync_sys_table(common::ObTabletID& tablet_id, bool &tenant_state_changed); int construct_sys_table_record(ObTTLTaskCtx* ctx, common::ObTTLStatus& ttl_record); int try_schedule_task(ObTTLTaskCtx* ctx); int try_schedule_remaining_tasks(const ObTTLTaskCtx *current_ctx); @@ -280,6 +280,7 @@ private: bool has_start_; bool is_paused_; volatile int64_t dag_ref_cnt_; // ttl dag ref count for current ls + bool need_reuse_for_switch_; }; } // end namespace table diff --git a/src/observer/table/ttl/ob_tenant_ttl_manager.cpp b/src/observer/table/ttl/ob_tenant_ttl_manager.cpp index e8534d8fd6..542be32387 100644 --- a/src/observer/table/ttl/ob_tenant_ttl_manager.cpp +++ b/src/observer/table/ttl/ob_tenant_ttl_manager.cpp @@ -150,7 +150,7 @@ int ObTTLTaskScheduler::reload_tenant_task() *sql_proxy_, filters, ttl_task_arr))) { LOG_WARN("fail to read ttl tasks status", KR(ret)); } else if (ttl_task_arr.empty()) { - // do nothing + tenant_task_.reset(); } else if (ttl_task_arr.size() == 1) { ObTTLStatus &task = ttl_task_arr.at(0); tenant_task_.ttl_status_ = task; @@ -422,6 +422,7 @@ int ObTTLTaskScheduler::check_all_tablet_task() { int ret = OB_SUCCESS; bool need_move = true; + bool is_cancel_task = false; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ttl tenant task mgr not init", KR(ret)); @@ -429,6 +430,8 @@ int ObTTLTaskScheduler::check_all_tablet_task() // do nothing } else if (!ObTTLUtil::check_can_process_tenant_tasks(tenant_id_)) { // do nothing + } else if (FALSE_IT(is_cancel_task = (tenant_task_.ttl_status_.status_ == ObTTLTaskStatus::OB_RS_TTL_TASK_CANCEL)? true : false)) { + // do nothing } else if (OB_FAIL(check_task_need_move(need_move))) { LOG_WARN("fail to check task need move", KR(ret), K_(tenant_id)); } else if (need_move) { @@ -453,7 +456,7 @@ int ObTTLTaskScheduler::check_all_tablet_task() } if (OB_SUCC(ret)) { - if (OB_FAIL(move_all_task_to_history_table())) { + if (OB_FAIL(move_all_task_to_history_table(is_cancel_task))) { LOG_WARN("fail to move all tasks to history table", KR(ret), K_(tenant_id), K(tenant_task_.ttl_status_.table_id_)); } else { tenant_task_.reset(); @@ -643,7 +646,7 @@ int ObTenantTTLManager::handle_user_ttl(const obrpc::ObTTLRequestArg& arg) return ret; } -int ObTTLTaskScheduler::move_all_task_to_history_table() +int ObTTLTaskScheduler::move_all_task_to_history_table(bool need_cancel) { int ret = OB_SUCCESS; int64_t one_move_rows = TBALET_CHECK_BATCH_SIZE; @@ -652,7 +655,8 @@ int ObTTLTaskScheduler::move_all_task_to_history_table() if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) { LOG_WARN("fail start transaction", KR(ret), K_(tenant_id)); } else if (OB_FAIL(ObTTLUtil::move_task_to_history_table(tenant_id_, tenant_task_.ttl_status_.task_id_, - trans, TBALET_CHECK_BATCH_SIZE, one_move_rows))) { + trans, TBALET_CHECK_BATCH_SIZE, one_move_rows, + need_cancel))) { LOG_WARN("fail to move task to history table", KR(ret), K_(tenant_id)); } @@ -703,15 +707,16 @@ int ObTTLTaskScheduler::check_task_need_move(bool &need_move) { int ret = OB_SUCCESS; need_move = false; - if (OB_RS_TTL_TASK_MOVE == tenant_task_.ttl_status_.status_) { + if (OB_RS_TTL_TASK_MOVE == tenant_task_.ttl_status_.status_ || OB_RS_TTL_TASK_CANCEL == tenant_task_.ttl_status_.status_) { + // cancel will also need move all tasks into history table now need_move = true; - } else if (OB_FAIL(check_all_tabelt_finished(need_move))) { + } else if (OB_FAIL(check_all_tablet_finished(need_move))) { LOG_WARN("fail to check all tablet task finished", KR(ret)); } return ret; } -int ObTTLTaskScheduler::check_all_tabelt_finished(bool &all_finished) +int ObTTLTaskScheduler::check_all_tablet_finished(bool &all_finished) { int ret = OB_SUCCESS; all_finished = true; diff --git a/src/observer/table/ttl/ob_tenant_ttl_manager.h b/src/observer/table/ttl/ob_tenant_ttl_manager.h index 6c78f49121..f095b602a9 100644 --- a/src/observer/table/ttl/ob_tenant_ttl_manager.h +++ b/src/observer/table/ttl/ob_tenant_ttl_manager.h @@ -146,9 +146,9 @@ private: int check_task_need_move(bool &need_move); private: - int check_all_tabelt_finished(bool &all_finished); + int check_all_tablet_finished(bool &all_finished); int check_tablet_table_finished(common::ObIArray &pairs, bool &all_finished); - int move_all_task_to_history_table(); + int move_all_task_to_history_table(bool need_cancel); private: static const int64_t TBALE_CHECK_BATCH_SIZE = 200; static const int64_t TBALET_CHECK_BATCH_SIZE = 1024; diff --git a/src/share/table/ob_ttl_util.cpp b/src/share/table/ob_ttl_util.cpp index 8071fac226..c0abac2208 100644 --- a/src/share/table/ob_ttl_util.cpp +++ b/src/share/table/ob_ttl_util.cpp @@ -139,10 +139,6 @@ int ObTTLUtil::transform_tenant_state(const common::ObTTLTaskStatus& tenant_stat status = OB_TTL_TASK_RUNNING; } else if (tenant_status == OB_RS_TTL_TASK_SUSPEND) { status = OB_TTL_TASK_PENDING; - } else if (tenant_status == OB_RS_TTL_TASK_CANCEL) { - status = OB_TTL_TASK_CANCEL; - } else if (tenant_status == OB_RS_TTL_TASK_MOVE) { - status = OB_TTL_TASK_MOVING; } else { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid type", K(tenant_status), K(status)); @@ -153,7 +149,8 @@ int ObTTLUtil::transform_tenant_state(const common::ObTTLTaskStatus& tenant_stat int ObTTLUtil::check_tenant_state(uint64_t tenant_id, common::ObISQLClient& proxy, const ObTTLTaskStatus local_state, - const int64_t local_task_id) + const int64_t local_task_id, + bool &tenant_state_changed) { int ret = OB_SUCCESS; @@ -174,6 +171,7 @@ int ObTTLUtil::check_tenant_state(uint64_t tenant_id, LOG_WARN("fail to transform ttl tenant task status", KR(ret), K(tenant_task.status_)); } else if (tenant_state != local_state) { ret = OB_EAGAIN; + tenant_state_changed = true; FLOG_INFO("state of tenant task is different from local task state", K(ret), K(tenant_id), K(tenant_task.task_id_ ), K(local_state)); } @@ -594,13 +592,26 @@ bool ObTTLUtil::check_can_process_tenant_tasks(uint64_t tenant_id) int ObTTLUtil::move_task_to_history_table(uint64_t tenant_id, uint64_t task_id, common::ObMySQLTransaction& proxy, - int64_t batch_size, int64_t &move_rows) + int64_t batch_size, int64_t &move_rows, + bool need_cancel) { int ret = OB_SUCCESS; ObSqlString sql; int64_t insert_rows = 0; int64_t delete_rows = 0; - if (OB_FAIL(sql.assign_fmt("replace into %s select * from %s " + if (!need_cancel && + OB_FAIL(sql.assign_fmt("replace into %s select * from %s " + " where task_id = %ld and tablet_id != -1 and table_id != -1" + " order by tenant_id, task_id, table_id, tablet_id LIMIT %ld", + share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME, + share::OB_ALL_KV_TTL_TASK_TNAME, + task_id, batch_size))) { + LOG_WARN("sql assign fmt failed", K(ret)); + } else if (need_cancel && + OB_FAIL(sql.assign_fmt("replace into %s select gmt_create, gmt_modified," + " tenant_id, task_id, table_id, tablet_id, task_start_time," + " task_update_time, trigger_type, if(status=4, 4, 3) as status," + " ttl_del_cnt, max_version_del_cnt, scan_cnt, row_key, ret_code from %s" " where task_id = %ld and tablet_id != -1 and table_id != -1" " order by tenant_id, task_id, table_id, tablet_id LIMIT %ld", share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME, @@ -807,6 +818,7 @@ int ObTableTTLChecker::init(const schema::ObTableSchema &table_schema, bool in_f ObString right = ttl_definition; bool is_end = false; int64_t i = 0; + // example: "c + INTERVAL 40 MINUTE" while (OB_SUCC(ret) && !is_end) { ObString left = right.split_on(','); if (left.empty()) { @@ -815,11 +827,13 @@ int ObTableTTLChecker::init(const schema::ObTableSchema &table_schema, bool in_f } ObTableTTLExpr ttl_expr; ObString column_str = left.split_on('+').trim(); + // example: " INTERVAL 40 MINUTE" left = left.trim(); + // example: "INTERVAL 40 MINUTE" left += strlen("INTERVAL"); + // example: "40 MINUTE" left = left.trim(); ObString interval_str = left.split_on(' '); - left.trim(); ObString time_unit_str = left; ttl_expr.column_name_ = column_str; @@ -838,7 +852,7 @@ int ObTableTTLChecker::init(const schema::ObTableSchema &table_schema, bool in_f ttl_expr.time_unit_ = ObTableTTLTimeUnit::YEAR; } else { ret = OB_NOT_SUPPORTED; - LOG_WARN("unepxected time unit", K(ret)); + LOG_WARN("unepxected time unit", K(ret), K(time_unit_str)); } // 2. get delta second and month @@ -1141,16 +1155,16 @@ int ObTTLUtil::check_is_ttl_table(const ObTableSchema &table_schema, bool &is_tt return ret; } -int ObTTLUtil::check_ttl_task_exists(uint64_t tenant_id, common::ObISQLClient& proxy, - const uint64_t& task_id, const uint64_t& table_id, - ObTabletID& tablet_id, bool &is_exists) +int ObTTLUtil::check_task_status_from_sys_table(uint64_t tenant_id, common::ObISQLClient& proxy, + const uint64_t& task_id, const uint64_t& table_id, + ObTabletID& tablet_id, bool &is_exists, bool &is_end_state) { int ret = OB_SUCCESS; ObSqlString sql; - uint64_t result_cnt = 0; - if (OB_FAIL(sql.assign_fmt("SELECT (SELECT COUNT(*) FROM %s WHERE table_id = %ld" - " AND tablet_id = %ld AND task_id = %ld) + (SELECT COUNT(*) FROM %s WHERE" - " table_id = %ld AND tablet_id = %ld AND task_id = %ld) AS cnt", + ObTTLTaskStatus status = ObTTLTaskStatus::OB_TTL_TASK_INVALID; + if (OB_FAIL(sql.assign_fmt("(SELECT STATUS FROM %s WHERE table_id = %ld" + " AND tablet_id = %ld AND task_id = %ld limit 1) UNION (SELECT STATUS FROM %s WHERE" + " table_id = %ld AND tablet_id = %ld AND task_id = %ld limit 1)", share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME, table_id, tablet_id.id(), task_id, share::OB_ALL_KV_TTL_TASK_TNAME, table_id, tablet_id.id(), task_id))) { LOG_WARN("sql assign fmt failed", K(ret)); @@ -1163,20 +1177,27 @@ int ObTTLUtil::check_ttl_task_exists(uint64_t tenant_id, common::ObISQLClient& p ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, query result must not be NULL", K(ret)); } else if (OB_FAIL(result->next())) { - LOG_WARN("fail to get next row", K(ret)); + if (OB_ITER_END == ret) { + // not exist, refresh ret + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to get next row", K(ret)); + } } else { - EXTRACT_INT_FIELD_MYSQL(*result, "cnt", result_cnt, uint64_t); + int64_t temp_status = 0; + EXTRACT_INT_FIELD_MYSQL(*result, "STATUS", temp_status, int64_t); + status = EVAL_TASK_PURE_STATUS(temp_status); + if (OB_SUCCESS == result->next()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected ttl task record count", KR(ret), K(tenant_id), K(task_id), K(table_id), K(tablet_id)); + } } } } if (OB_SUCC(ret)) { - if (result_cnt > 1) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected ttl task record count", KR(ret), K(tenant_id), K(task_id), K(table_id), K(tablet_id)); - } else { - is_exists = (result_cnt > 0); - } + is_exists = (status != ObTTLTaskStatus::OB_TTL_TASK_INVALID); + is_end_state = ObTTLUtil::is_ttl_task_status_end_state(status); } return ret; diff --git a/src/share/table/ob_ttl_util.h b/src/share/table/ob_ttl_util.h index 188007e404..43e7ca95e8 100644 --- a/src/share/table/ob_ttl_util.h +++ b/src/share/table/ob_ttl_util.h @@ -272,7 +272,8 @@ public: static int check_tenant_state(uint64_t tenant_id, common::ObISQLClient& proxy, const ObTTLTaskStatus local_state, - const int64_t local_task_id); + const int64_t local_task_id, + bool &tenant_state_changed); static int insert_ttl_task(uint64_t tenant_id, const char* tname, common::ObISQLClient& proxy, @@ -316,7 +317,8 @@ public: static int move_task_to_history_table(uint64_t tenant_id, uint64_t task_id, common::ObMySQLTransaction& proxy, - int64_t batch_size, int64_t &move_rows); + int64_t batch_size, int64_t &move_rows, + bool need_cancel = false); static int move_tenant_task_to_history_table(uint64_t tenant_id, uint64_t task_id, common::ObMySQLTransaction& proxy); @@ -332,10 +334,12 @@ public: static int check_is_ttl_table(const ObTableSchema &table_schema, bool &is_ttl_table); static int get_tenant_table_ids(const uint64_t tenant_id, common::ObIArray &table_id_array); - static int check_ttl_task_exists(uint64_t tenant_id, common::ObISQLClient& proxy, - const uint64_t& task_id, const uint64_t& table_id, - ObTabletID& tablet_id, bool &is_exists); - + static int check_task_status_from_sys_table(uint64_t tenant_id, common::ObISQLClient& proxy, + const uint64_t& task_id, const uint64_t& table_id, + ObTabletID& tablet_id, bool &is_exists, bool &is_end_state); + static inline bool is_ttl_task_status_end_state(ObTTLTaskStatus status) { + return status == ObTTLTaskStatus::OB_TTL_TASK_CANCEL || status == ObTTLTaskStatus::OB_TTL_TASK_FINISH; + } const static uint64_t TTL_TENNAT_TASK_TABLET_ID = -1; const static uint64_t TTL_TENNAT_TASK_TABLE_ID = -1; private: