[CP] [Fix] OBKV TTL cancel

Co-authored-by: shenyunlong.syl <ylshen0919@gmail.com>
This commit is contained in:
IHEII
2023-12-13 15:13:53 +00:00
committed by ob-robot
parent 11ce06a576
commit cb432ec65e
7 changed files with 233 additions and 270 deletions

View File

@ -216,18 +216,20 @@ int ObTableTTLDeleteTask::process_one()
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
}
info_.max_version_del_cnt_ += result.get_max_version_del_row();
info_.ttl_del_cnt_ += result.get_ttl_del_row();
info_.scan_cnt_ += result.get_scan_row();
info_.err_code_ = ret;
info_.row_key_ = result.get_end_rowkey();
if (OB_SUCC(ret)
&& result.get_del_row() < PER_TASK_DEL_ROWS
&& result.get_end_ts() > ObTimeUtility::current_time()) {
ret = OB_ITER_END; // finsh task
info_.err_code_ = ret;
LOG_DEBUG("finish delete", KR(ret), K_(info));
if (OB_SUCC(ret)) {
info_.max_version_del_cnt_ += result.get_max_version_del_row();
info_.ttl_del_cnt_ += result.get_ttl_del_row();
if (result.get_del_row() < PER_TASK_DEL_ROWS
&& result.get_end_ts() > ObTimeUtility::current_time()) {
ret = OB_ITER_END; // finsh task
info_.err_code_ = ret;
LOG_DEBUG("finish delete", KR(ret), K_(info));
}
}
int64_t cost = ObTimeUtil::current_time() - start_time;
LOG_DEBUG("finish process one", KR(ret), K(cost));
return ret;

View File

@ -126,6 +126,7 @@ void ObTenantTabletTTLMgr::inner_switch_to_follower()
FLOG_INFO("tenant_tablet_ttl_mgr: begin to switch_to_follower", K_(tenant_id), KPC_(ls));
const int64_t start_time_us = ObTimeUtility::current_time();
pause();
ATOMIC_STORE(&need_reuse_for_switch_, true);
const int64_t cost_us = ObTimeUtility::current_time() - start_time_us;
FLOG_INFO("tenant_tablet_ttl_mgr: finish to switch_to_follower", K_(tenant_id), KPC_(ls), K(cost_us));
}
@ -214,7 +215,7 @@ int ObTenantTabletTTLMgr::check_and_handle_event()
common::ObSpinLockGuard guard(lock_);
// after observer restart, need check tenant even when cancel and move state
is_dirty = local_tenant_task_.is_dirty_;
is_finished = local_tenant_task_.is_finished_;
is_finished = local_tenant_task_.state_ == OB_TTL_TASK_FINISH;
need_check = !is_finished && local_tenant_task_.need_check_;
}
@ -247,109 +248,54 @@ void ObTenantTabletTTLMgr::check_ttl_tenant_state()
bool tenant_finish = true;
ObTTLTaskCtx* ctx = nullptr;
for (tablet_task_iter iter = local_tenant_task_.tablet_task_map_.begin();
!tenant_dirty && iter != local_tenant_task_.tablet_task_map_.end(); ++iter) {
ctx = iter->second;
if (OB_ISNULL(ctx)) {
LOG_WARN("fatal err, ttl ctx in map is null", K(local_tenant_task_.tenant_id_));
} else if (ctx->is_dirty_) {
tenant_dirty = true;
} else if (ctx->task_status_ != OB_TTL_TASK_CANCEL &&
ctx->task_status_ != OB_TTL_TASK_FINISH) {
tenant_finish = false;
local_tenant_task_.is_finished_ = false;
}
}
if (OB_SUCC(ret) && !tenant_dirty) {
local_tenant_task_.is_dirty_ = false;
if (tenant_finish) {
// all task already in cancel or runing status
if (local_tenant_task_.state_ == OB_TTL_TASK_CANCEL || local_tenant_task_.state_ == OB_TTL_TASK_RUNNING) {
local_tenant_task_.reuse();
FLOG_INFO("local ls ttl task is finished", K_(local_tenant_task), KPC_(ls));
} else {
if (local_tenant_task_.need_check_) {
// when local tenant task need check(maybe schema changed), this task cannot finish
// we should end this process(don't execute else) and check tenant task first
} else {
for (tablet_task_iter iter = local_tenant_task_.tablet_task_map_.begin();
OB_SUCC(ret) && !tenant_dirty && iter != local_tenant_task_.tablet_task_map_.end(); ++iter) {
ctx = iter->second;
if (OB_ISNULL(ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected tenant ttl state", KR(ret), K(local_tenant_task_.state_));
LOG_WARN("fatal err, ttl ctx in map is null", K(local_tenant_task_.tenant_id_));
} else {
tenant_dirty = tenant_dirty ? tenant_dirty : ctx->is_dirty_;
tenant_finish = tenant_finish ? (ctx->task_status_ != OB_TTL_TASK_CANCEL && ctx->task_status_ != OB_TTL_TASK_FINISH) : tenant_finish;
}
}
if (OB_SUCC(ret) && !tenant_dirty) {
local_tenant_task_.is_dirty_ = false;
if (tenant_finish) {
local_tenant_task_.state_ = OB_TTL_TASK_FINISH;
FLOG_INFO("local ls ttl task is finished", K_(local_tenant_task), KPC_(ls));
}
}
}
LOG_DEBUG("check ttl tenant dirty", K(local_tenant_task_.is_dirty_), K(local_tenant_task_.state_), KR(ret), K_(tenant_id));
}
int ObTenantTabletTTLMgr::check_cmd_state_valid(const common::ObTTLTaskStatus current_state,
const common::ObTTLTaskStatus incoming_state)
{
int ret = OB_SUCCESS;
switch (incoming_state) {
case OB_TTL_TASK_RUNNING: {
if (current_state != OB_TTL_TASK_PENDING && current_state != OB_TTL_TASK_INVALID &&
current_state != OB_TTL_TASK_RUNNING) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("receive rs cmd, but current tenant state is unmatached",
KR(ret), K(current_state), K(incoming_state));
}
break;
}
case OB_TTL_TASK_MOVING: {
if (current_state != OB_TTL_TASK_RUNNING && current_state != OB_TTL_TASK_CANCEL &&
current_state != OB_TTL_TASK_INVALID && current_state != OB_TTL_TASK_MOVING) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("receive a move cmd, current task state is unmatached", K(current_state));
}
break;
}
case OB_TTL_TASK_PENDING: {
if (current_state != OB_TTL_TASK_RUNNING && current_state != OB_TTL_TASK_INVALID &&
current_state != OB_TTL_TASK_PENDING) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("receive rs cmd, but current tenant state is unmatached",
KR(ret), K(current_state), K(incoming_state));
}
break;
}
case OB_TTL_TASK_CANCEL: {
if (current_state != OB_TTL_TASK_PENDING && current_state != OB_TTL_TASK_RUNNING &&
current_state != OB_TTL_TASK_INVALID && current_state != OB_TTL_TASK_CANCEL) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("receive rs cmd, but current tenant state is unmatached",
KR(ret), K(current_state), K(incoming_state));
}
break;
}
default: {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid incoming status", KR(ret), K(incoming_state));
break;
}
}
return ret;
}
void ObTenantTabletTTLMgr::mark_tenant_need_check()
{
int ret = OB_SUCCESS;
if (common::ObTTLUtil::check_can_process_tenant_tasks(tenant_id_)) {
common::ObSpinLockGuard guard(lock_);
local_tenant_task_.need_check_ = true;
if (local_tenant_task_.task_id_ != OB_INVALID_ID) {
local_tenant_task_.need_check_ = true;
if (local_tenant_task_.state_ == OB_TTL_TASK_FINISH) {
// this local tenant task(outdated) should not be finished when we mark tenant need check,
// instead, we should check and decide whether it should be finished in the future
local_tenant_task_.state_ = OB_TTL_TASK_RUNNING;
}
FLOG_INFO("finish mark tenant need check", K(local_tenant_task_));
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unpected task id is found", KR(ret), K(local_tenant_task_));
}
}
LOG_DEBUG("finsh mark tenant need check", KR(ret));
}
void ObTenantTabletTTLMgr::on_schema_changed(uint64_t schema_changed_tenant_id)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ttl manager not init");
} else if (!common::ObTTLUtil::check_can_process_tenant_tasks(schema_changed_tenant_id)) {
//do nothing
} else {
mark_tenant_need_check();
}
}
int ObTenantTabletTTLMgr::report_task_status(ObTTLTaskInfo& task_info, ObTTLTaskParam& task_para,
bool& is_stop, bool need_copy_task/* true*/)
{
@ -390,6 +336,7 @@ int ObTenantTabletTTLMgr::report_task_status(ObTTLTaskInfo& task_info, ObTTLTask
} else if (OB_ITER_END == task_info.err_code_) {
ctx->task_status_ = OB_TTL_TASK_FINISH;
ctx->task_info_.err_code_ = OB_SUCCESS;
FLOG_INFO("task execute finished", KR(ret));
} else if (OB_NOT_MASTER == task_info.err_code_ ||
OB_PARTITION_NOT_EXIST == task_info.err_code_ ||
OB_TABLE_NOT_EXIST == task_info.err_code_ ||
@ -618,7 +565,7 @@ int ObTenantTabletTTLMgr::alloc_tenant_info(uint64_t tenant_id)
int ret = OB_SUCCESS;
local_tenant_task_.tenant_id_ = tenant_id;
local_tenant_task_.ttl_continue_ = false;
local_tenant_task_.is_dirty_ = true;
local_tenant_task_.is_dirty_ = false;
ObMemAttr bucket_attr(tenant_id, "TTLTaskBucket");
ObMemAttr node_attr(tenant_id, "TTLTaskNode");
if(OB_FAIL(local_tenant_task_.tablet_task_map_.create(DEFAULT_TTL_BUCKET_NUM, bucket_attr, node_attr))) {
@ -894,18 +841,73 @@ int ObTenantTabletTTLMgr::try_schedule_prepare_task(ObTabletID& tablet_id)
int ObTenantTabletTTLMgr::sync_all_dirty_task(ObIArray<ObTabletID>& dirty_tasks)
{
int ret = OB_SUCCESS;
bool tenant_state_changed = false;
ObTimeGuard guard("ObTenantTabletTTLMgr::sync_all_dirty_record", TTL_NORMAL_TIME_THRESHOLD);
for (int i = 0; OB_SUCC(ret) && i < dirty_tasks.count(); i++) {
if (OB_FAIL(sync_sys_table(dirty_tasks.at(i)))) {
for (int i = 0; OB_SUCC(ret) && i < dirty_tasks.count() && !tenant_state_changed; i++) {
// tenant_state_changed is true means that tenant status is changed, we should refresh our status first
if (OB_FAIL(sync_sys_table(dirty_tasks.at(i), tenant_state_changed))) {
LOG_WARN("fail to sync sys table", KR(ret));
}
}
return ret;
}
int ObTenantTabletTTLMgr::sync_sys_table(ObTabletID& tablet_id)
int ObTenantTabletTTLMgr::sync_sys_table_op(ObTTLTaskCtx* ctx,
bool force_update,
bool &tenant_state_changed)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
common::ObTTLStatus ttl_record;
bool commit = false;
int tmp_ret = OB_SUCCESS;
bool is_exists = false;
bool is_end_state = false;
if (OB_FAIL(trans.start(get_sql_proxy(), gen_meta_tenant_id(tenant_id_)))) {
LOG_WARN("fail to start transation", KR(ret), K_(tenant_id));
} else if (OB_FAIL(ObTTLUtil::check_task_status_from_sys_table(tenant_id_, trans, ctx->task_info_.task_id_,
ctx->task_info_.table_id_, ctx->task_info_.tablet_id_, is_exists, is_end_state))) {
LOG_WARN("fail to check ttl task exist");
} else if (is_end_state) {
// record in system table is end state, do nothing
FLOG_INFO("Finished / Canceled in sys table, could not sync sys table", K(local_tenant_task_));
} else if (!is_exists) {
if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) {
LOG_WARN("fail to construct sys table record", KR(ret));
} else if (OB_FAIL(ObTTLUtil::insert_ttl_task(tenant_id_, share::OB_ALL_KV_TTL_TASK_TNAME,
trans, ttl_record))) {
LOG_WARN("fail to insert ttl task", KR(ret));
}
} else if (force_update) {
if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) {
LOG_WARN("fail to construct sys table record", KR(ret));
} else if (OB_FAIL(ObTTLUtil::update_ttl_task_all_fields(tenant_id_,
share::OB_ALL_KV_TTL_TASK_TNAME,
trans, ttl_record))) {
LOG_WARN("fail to update ttl task in sys table", KR(ret), K(ttl_record));
}
}
// check tenant ttl status
if (OB_SUCC(ret) && OB_FAIL(ObTTLUtil::check_tenant_state(tenant_id_, trans, local_tenant_task_.state_, local_tenant_task_.task_id_, tenant_state_changed))) {
FLOG_INFO("local tenant task state is different from sys table", KR(ret), K_(tenant_id), K(local_tenant_task_.state_));
}
if (trans.is_started()) {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = ret;
if (OB_FAIL(trans.end(commit))) {
LOG_WARN("faile to end trans", "commit", commit, KR(ret));
}
ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret;
}
return ret;
}
int ObTenantTabletTTLMgr::sync_sys_table(ObTabletID& tablet_id, bool &tenant_state_changed)
{
int ret = OB_SUCCESS;
tenant_state_changed = false;
ObArenaAllocator allocator(lib::ObLabel("TTLStatusRecord"));
ObTTLTaskCtx* ctx = nullptr;
{
@ -961,43 +963,12 @@ int ObTenantTabletTTLMgr::sync_sys_table(ObTabletID& tablet_id)
}
if (OB_SUCC(ret)) {
common::ObTTLStatus ttl_record;
switch (ctx->task_status_) {
case OB_TTL_TASK_PREPARE: {
ObMySQLTransaction trans;
ObTTLStatusFieldArray filters;
common::ObTTLStatusArray ttl_records;
ObTTLStatusFieldArray filter;
bool commit = false;
int tmp_ret = OB_SUCCESS;
bool is_exists = false;
if (OB_FAIL(trans.start(get_sql_proxy(), gen_meta_tenant_id(tenant_id_)))) {
LOG_WARN("fail to start transation", KR(ret), K_(tenant_id));
} else if (OB_FAIL(ObTTLUtil::check_tenant_state(tenant_id_, trans, local_tenant_task_.state_, local_tenant_task_.task_id_))) {
FLOG_INFO("local tenant task state is different from sys table", KR(ret), K_(tenant_id), K(local_tenant_task_.state_));
} else if (OB_FAIL(ObTTLUtil::check_ttl_task_exists(tenant_id_, trans, ctx->task_info_.task_id_,
ctx->task_info_.table_id_, ctx->task_info_.tablet_id_, is_exists))) {
LOG_WARN("fail to check ttl task exist");
} else if (!is_exists) {
if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) {
LOG_WARN("fail to construct sys table record", KR(ret));
} else if (OB_FAIL(ObTTLUtil::insert_ttl_task(tenant_id_, share::OB_ALL_KV_TTL_TASK_TNAME,
trans, ttl_record))) {
LOG_WARN("fail to insert ttl task", KR(ret));
}
}
if (trans.is_started()) {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = ret;
if (OB_FAIL(trans.end(commit))) {
LOG_WARN("faile to end trans", "commit", commit, KR(ret));
}
ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret;
}
// change prepare state to running/pending
if (OB_SUCC(ret) && OB_FAIL(try_schedule_prepare_task(tablet_id))) {
if (OB_FAIL(sync_sys_table_op(ctx, false, tenant_state_changed))) {
LOG_WARN("fail to sync ttl record into sys table", KR(ret));
} else if (OB_FAIL(try_schedule_prepare_task(tablet_id))) {
// change prepare state to running/pending
LOG_WARN("fail to schedule prepare task", KR(ret));
}
break;
@ -1006,44 +977,8 @@ int ObTenantTabletTTLMgr::sync_sys_table(ObTabletID& tablet_id)
case OB_TTL_TASK_RUNNING:
case OB_TTL_TASK_PENDING:
case OB_TTL_TASK_CANCEL: {
ObMySQLTransaction trans;
ObTTLStatusFieldArray filters;
common::ObTTLStatusArray ttl_records;
ObTTLStatusFieldArray filter;
bool commit = false;
int tmp_ret = OB_SUCCESS;
bool is_exists = false;
if (OB_FAIL(trans.start(get_sql_proxy(), gen_meta_tenant_id(tenant_id_)))) {
LOG_WARN("fail to start transation", KR(ret), K_(tenant_id));
} else if (OB_FAIL(ObTTLUtil::check_tenant_state(tenant_id_, trans, local_tenant_task_.state_, local_tenant_task_.task_id_))) {
FLOG_INFO("local tenant task state is different from sys table", KR(ret), K_(tenant_id), K(local_tenant_task_.state_));
} else if (OB_FAIL(ObTTLUtil::check_ttl_task_exists(tenant_id_, trans, ctx->task_info_.task_id_,
ctx->task_info_.table_id_, ctx->task_info_.tablet_id_, is_exists))) {
LOG_WARN("fail to check ttl task exist");
} else if (!is_exists) {
if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) {
LOG_WARN("fail to construct sys table record", KR(ret));
} else if (OB_FAIL(ObTTLUtil::insert_ttl_task(tenant_id_, share::OB_ALL_KV_TTL_TASK_TNAME,
trans, ttl_record))) {
LOG_WARN("fail to insert ttl task", KR(ret));
}
} else {
if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) {
LOG_WARN("fail to construct sys table record", KR(ret));
} else if (OB_FAIL(ObTTLUtil::update_ttl_task_all_fields(tenant_id_,
share::OB_ALL_KV_TTL_TASK_TNAME,
trans, ttl_record))) {
LOG_WARN("fail to update ttl task in sys table", KR(ret), K(ttl_record));
}
}
if (trans.is_started()) {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = ret;
if (OB_FAIL(trans.end(commit))) {
LOG_WARN("faile to end trans", "commit", commit, KR(ret));
}
ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret;
if (OB_FAIL(sync_sys_table_op(ctx, true, tenant_state_changed))) {
LOG_WARN("fail to sync ttl record into sys table", KR(ret));
}
break;
}
@ -1301,60 +1236,50 @@ int ObTenantTabletTTLMgr::refresh_tablet_task(ObTTLTaskCtx &ttl_task, bool refre
// 2. check the status and change local tenant info
int ObTenantTabletTTLMgr::reload_tenant_task()
{
common::ObSpinLockGuard guard(lock_);
if (ATOMIC_BCAS(&need_reuse_for_switch_, true, false)) {
local_tenant_task_.reuse();
FLOG_INFO("resue local tenant task cuz of switch to follower");
}
int ret = OB_SUCCESS;
ObTTLStatus tenant_task;
common::ObSpinLockGuard guard(lock_);
ObTTLTaskStatus expected_state;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (is_paused_) {
// do nothing, not leader
if (!local_tenant_task_.is_finished_) {
local_tenant_task_.reuse();
}
// do nothing
} else if (OB_FAIL(ObTTLUtil::read_tenant_ttl_task(tenant_id_, *sql_proxy_, tenant_task))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
// tenant task may finish before the tablet task
if (!local_tenant_task_.is_finished_) {
local_tenant_task_.reuse();
}
local_tenant_task_.reuse();
} else {
LOG_WARN("fail to read tenant ttl task", KR(ret), K_(tenant_id));
}
} else if (!local_tenant_task_.is_finished_ && local_tenant_task_.task_id_ != tenant_task.task_id_) {
FLOG_INFO("tenant task is finished, but local tenant task is not, maybe schema changed",
KR(ret), K_(local_tenant_task), K(tenant_task.task_id_));
local_tenant_task_.reuse();
} else if (OB_RS_TTL_TASK_MOVE == static_cast<ObTTLTaskStatus>(tenant_task.status_)) {
FLOG_INFO("tenant task is moving now, tablet ttl task should not continue",
KR(ret), K_(local_tenant_task), K(tenant_task.task_id_));
} else if (OB_RS_TTL_TASK_MOVE == static_cast<ObTTLTaskStatus>(tenant_task.status_) ||
OB_RS_TTL_TASK_CANCEL == static_cast<ObTTLTaskStatus>(tenant_task.status_)) {
local_tenant_task_.reuse();
FLOG_INFO("tenant task is finish now, reuse local tenant task", KR(ret), K_(local_tenant_task), K(tenant_task.task_id_));
} else if (OB_FAIL(ObTTLUtil::transform_tenant_state(static_cast<ObTTLTaskStatus>(tenant_task.status_), expected_state))) {
LOG_WARN("fail to transform ttl tenant task status", KR(ret), K(tenant_task.status_));
} else if (OB_FAIL(check_cmd_state_valid(local_tenant_task_.state_, expected_state))) {
LOG_WARN("ttl cmd state machine is wrong", KR(ret), K_(tenant_id), K(tenant_task), K(expected_state));
} else {
if (local_tenant_task_.is_finished_ && local_tenant_task_.task_id_ != tenant_task.task_id_) {
// new ttl request
local_tenant_task_.task_id_ = tenant_task.task_id_;
local_tenant_task_.is_usr_trigger_ = (tenant_task.trigger_type_ == USER_TRIGGER);
local_tenant_task_.state_ = expected_state;
local_tenant_task_.need_check_ = true;
local_tenant_task_.is_dirty_ = true;
local_tenant_task_.is_finished_ = false;
FLOG_INFO("new ttl task", KR(ret), K_(tenant_id), K_(local_tenant_task));
} else if (local_tenant_task_.task_id_ != tenant_task.task_id_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task id is mismatch", KR(ret), K(local_tenant_task_.task_id_), K(tenant_task.task_id_));
} else if (!local_tenant_task_.is_finished_ && local_tenant_task_.state_ != expected_state) {
FLOG_INFO("old ttl task changed", KR(ret), K_(tenant_id), K_(local_tenant_task), K(tenant_task));
// current tenant task status changed
local_tenant_task_.state_ = expected_state;
local_tenant_task_.is_dirty_ = true;
}
}
} else if (local_tenant_task_.task_id_ != tenant_task.task_id_) {
local_tenant_task_.reuse();
local_tenant_task_.task_id_ = tenant_task.task_id_;
local_tenant_task_.is_usr_trigger_ = (tenant_task.trigger_type_ == USER_TRIGGER);
local_tenant_task_.state_ = expected_state;
local_tenant_task_.need_check_ = true;
local_tenant_task_.is_dirty_ = true;
local_tenant_task_.is_reused_ = false;
FLOG_INFO("new ttl task", KR(ret), K_(tenant_id), K_(local_tenant_task));
} else if (OB_TTL_TASK_FINISH == static_cast<ObTTLTaskStatus>(local_tenant_task_.state_)) {
// do nothing
} else if (local_tenant_task_.state_ != expected_state) {
FLOG_INFO("change local tenant task status", KR(ret), K_(tenant_id), K_(local_tenant_task), K(tenant_task));
// current tenant task status changed
local_tenant_task_.state_ = expected_state;
local_tenant_task_.is_dirty_ = true;
} else {/* task status not change, do nothing */}
FLOG_INFO("finish reload tenant task", K(local_tenant_task_), K(tenant_task), K(is_paused_));
return ret;
}
@ -1371,7 +1296,7 @@ int ObTenantTabletTTLMgr::check_schema_version()
ret = OB_EAGAIN;
LOG_INFO("is not a formal_schema_version", KR(ret), K(schema_version));
} else if (local_schema_version_ == OB_INVALID_VERSION || local_schema_version_ < schema_version) {
FLOG_INFO("schema changed, mark tenant need check", KR(ret), K_(local_schema_version), K(schema_version));
FLOG_INFO("schema changed", KR(ret), K_(local_schema_version), K(schema_version));
local_schema_version_ = schema_version;
mark_tenant_need_check();
}
@ -1403,23 +1328,28 @@ int ObTTLTaskCtx::deep_copy_rowkey(const ObString &rowkey)
return ret;
}
// reuse means this task id is finished
void ObTenantTabletTTLMgr::ObTTLTenantInfo::reuse()
{
for (TabletTaskMap::const_iterator iter = tablet_task_map_.begin(); iter != tablet_task_map_.end();
++iter) {
ObTTLTaskCtx *ctx = iter->second;
if (OB_NOT_NULL(ctx)) {
ctx->~ObTTLTaskCtx();
if (OB_UNLIKELY(!is_reused_)) {
for (TabletTaskMap::const_iterator iter = tablet_task_map_.begin(); iter != tablet_task_map_.end();
++iter) {
ObTTLTaskCtx *ctx = iter->second;
if (OB_NOT_NULL(ctx)) {
ctx->~ObTTLTaskCtx();
}
}
tablet_task_map_.reuse();
allocator_.reset();
is_usr_trigger_ = false;
need_check_ = false;
is_dirty_ = false;
ttl_continue_ = true;
state_ = common::ObTTLTaskStatus::OB_TTL_TASK_FINISH;
is_reused_ = true;
task_id_ = OB_INVALID_ID;
FLOG_INFO("reuse tenant info", K(*this));
}
tablet_task_map_.reuse();
allocator_.reset();
is_usr_trigger_ = false;
need_check_ = false;
is_dirty_ = false;
ttl_continue_ = true;
state_ = common::ObTTLTaskStatus::OB_TTL_TASK_INVALID;
is_finished_ = true;
}
} // table

View File

@ -94,7 +94,8 @@ public:
local_schema_version_(OB_INVALID_VERSION),
has_start_(false),
is_paused_(false),
dag_ref_cnt_(0)
dag_ref_cnt_(0),
need_reuse_for_switch_(false)
{
}
@ -174,8 +175,7 @@ private:
cmd_type_(obrpc::ObTTLRequestArg::TTL_INVALID_TYPE),
rsp_time_(OB_INVALID_ID),
state_(common::ObTTLTaskStatus::OB_TTL_TASK_INVALID),
is_droped_(false),
is_finished_(true)
is_reused_(false)
{}
~ObTTLTenantInfo()
{
@ -202,7 +202,7 @@ private:
K_(ttl_continue),
K_(rsp_time),
K_(state),
K_(is_finished));
K_(is_reused));
public:
TabletTaskMap tablet_task_map_;
@ -216,8 +216,7 @@ private:
obrpc::ObTTLRequestArg::TTLRequestType cmd_type_; // deprecated @dazhi
int64_t rsp_time_; // OB_INVALID_ID means no need response
common::ObTTLTaskStatus state_;
bool is_droped_; // tenant is droped
bool is_finished_; // all delete task is finished (or canceled)
bool is_reused_; // all delete task is finished (or canceled)
};
int alloc_tenant_info(uint64_t tenant_id);
@ -238,7 +237,8 @@ private:
ObTabletID& tablet_id,
ObTTLStatusFieldArray& filter);
common::ObMySQLProxy *get_sql_proxy() { return sql_proxy_; }
int sync_sys_table(common::ObTabletID& tablet_id);
int sync_sys_table_op(ObTTLTaskCtx* ctx, bool force_update, bool &tenant_state_changed);
int sync_sys_table(common::ObTabletID& tablet_id, bool &tenant_state_changed);
int construct_sys_table_record(ObTTLTaskCtx* ctx, common::ObTTLStatus& ttl_record);
int try_schedule_task(ObTTLTaskCtx* ctx);
int try_schedule_remaining_tasks(const ObTTLTaskCtx *current_ctx);
@ -280,6 +280,7 @@ private:
bool has_start_;
bool is_paused_;
volatile int64_t dag_ref_cnt_; // ttl dag ref count for current ls
bool need_reuse_for_switch_;
};
} // end namespace table

View File

@ -150,7 +150,7 @@ int ObTTLTaskScheduler::reload_tenant_task()
*sql_proxy_, filters, ttl_task_arr))) {
LOG_WARN("fail to read ttl tasks status", KR(ret));
} else if (ttl_task_arr.empty()) {
// do nothing
tenant_task_.reset();
} else if (ttl_task_arr.size() == 1) {
ObTTLStatus &task = ttl_task_arr.at(0);
tenant_task_.ttl_status_ = task;
@ -422,6 +422,7 @@ int ObTTLTaskScheduler::check_all_tablet_task()
{
int ret = OB_SUCCESS;
bool need_move = true;
bool is_cancel_task = false;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ttl tenant task mgr not init", KR(ret));
@ -429,6 +430,8 @@ int ObTTLTaskScheduler::check_all_tablet_task()
// do nothing
} else if (!ObTTLUtil::check_can_process_tenant_tasks(tenant_id_)) {
// do nothing
} else if (FALSE_IT(is_cancel_task = (tenant_task_.ttl_status_.status_ == ObTTLTaskStatus::OB_RS_TTL_TASK_CANCEL)? true : false)) {
// do nothing
} else if (OB_FAIL(check_task_need_move(need_move))) {
LOG_WARN("fail to check task need move", KR(ret), K_(tenant_id));
} else if (need_move) {
@ -453,7 +456,7 @@ int ObTTLTaskScheduler::check_all_tablet_task()
}
if (OB_SUCC(ret)) {
if (OB_FAIL(move_all_task_to_history_table())) {
if (OB_FAIL(move_all_task_to_history_table(is_cancel_task))) {
LOG_WARN("fail to move all tasks to history table", KR(ret), K_(tenant_id), K(tenant_task_.ttl_status_.table_id_));
} else {
tenant_task_.reset();
@ -643,7 +646,7 @@ int ObTenantTTLManager::handle_user_ttl(const obrpc::ObTTLRequestArg& arg)
return ret;
}
int ObTTLTaskScheduler::move_all_task_to_history_table()
int ObTTLTaskScheduler::move_all_task_to_history_table(bool need_cancel)
{
int ret = OB_SUCCESS;
int64_t one_move_rows = TBALET_CHECK_BATCH_SIZE;
@ -652,7 +655,8 @@ int ObTTLTaskScheduler::move_all_task_to_history_table()
if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) {
LOG_WARN("fail start transaction", KR(ret), K_(tenant_id));
} else if (OB_FAIL(ObTTLUtil::move_task_to_history_table(tenant_id_, tenant_task_.ttl_status_.task_id_,
trans, TBALET_CHECK_BATCH_SIZE, one_move_rows))) {
trans, TBALET_CHECK_BATCH_SIZE, one_move_rows,
need_cancel))) {
LOG_WARN("fail to move task to history table", KR(ret), K_(tenant_id));
}
@ -703,15 +707,16 @@ int ObTTLTaskScheduler::check_task_need_move(bool &need_move)
{
int ret = OB_SUCCESS;
need_move = false;
if (OB_RS_TTL_TASK_MOVE == tenant_task_.ttl_status_.status_) {
if (OB_RS_TTL_TASK_MOVE == tenant_task_.ttl_status_.status_ || OB_RS_TTL_TASK_CANCEL == tenant_task_.ttl_status_.status_) {
// cancel will also need move all tasks into history table now
need_move = true;
} else if (OB_FAIL(check_all_tabelt_finished(need_move))) {
} else if (OB_FAIL(check_all_tablet_finished(need_move))) {
LOG_WARN("fail to check all tablet task finished", KR(ret));
}
return ret;
}
int ObTTLTaskScheduler::check_all_tabelt_finished(bool &all_finished)
int ObTTLTaskScheduler::check_all_tablet_finished(bool &all_finished)
{
int ret = OB_SUCCESS;
all_finished = true;

View File

@ -146,9 +146,9 @@ private:
int check_task_need_move(bool &need_move);
private:
int check_all_tabelt_finished(bool &all_finished);
int check_all_tablet_finished(bool &all_finished);
int check_tablet_table_finished(common::ObIArray<share::ObTabletTablePair> &pairs, bool &all_finished);
int move_all_task_to_history_table();
int move_all_task_to_history_table(bool need_cancel);
private:
static const int64_t TBALE_CHECK_BATCH_SIZE = 200;
static const int64_t TBALET_CHECK_BATCH_SIZE = 1024;

View File

@ -139,10 +139,6 @@ int ObTTLUtil::transform_tenant_state(const common::ObTTLTaskStatus& tenant_stat
status = OB_TTL_TASK_RUNNING;
} else if (tenant_status == OB_RS_TTL_TASK_SUSPEND) {
status = OB_TTL_TASK_PENDING;
} else if (tenant_status == OB_RS_TTL_TASK_CANCEL) {
status = OB_TTL_TASK_CANCEL;
} else if (tenant_status == OB_RS_TTL_TASK_MOVE) {
status = OB_TTL_TASK_MOVING;
} else {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid type", K(tenant_status), K(status));
@ -153,7 +149,8 @@ int ObTTLUtil::transform_tenant_state(const common::ObTTLTaskStatus& tenant_stat
int ObTTLUtil::check_tenant_state(uint64_t tenant_id,
common::ObISQLClient& proxy,
const ObTTLTaskStatus local_state,
const int64_t local_task_id)
const int64_t local_task_id,
bool &tenant_state_changed)
{
int ret = OB_SUCCESS;
@ -174,6 +171,7 @@ int ObTTLUtil::check_tenant_state(uint64_t tenant_id,
LOG_WARN("fail to transform ttl tenant task status", KR(ret), K(tenant_task.status_));
} else if (tenant_state != local_state) {
ret = OB_EAGAIN;
tenant_state_changed = true;
FLOG_INFO("state of tenant task is different from local task state", K(ret), K(tenant_id), K(tenant_task.task_id_ ), K(local_state));
}
@ -594,13 +592,26 @@ bool ObTTLUtil::check_can_process_tenant_tasks(uint64_t tenant_id)
int ObTTLUtil::move_task_to_history_table(uint64_t tenant_id, uint64_t task_id,
common::ObMySQLTransaction& proxy,
int64_t batch_size, int64_t &move_rows)
int64_t batch_size, int64_t &move_rows,
bool need_cancel)
{
int ret = OB_SUCCESS;
ObSqlString sql;
int64_t insert_rows = 0;
int64_t delete_rows = 0;
if (OB_FAIL(sql.assign_fmt("replace into %s select * from %s "
if (!need_cancel &&
OB_FAIL(sql.assign_fmt("replace into %s select * from %s "
" where task_id = %ld and tablet_id != -1 and table_id != -1"
" order by tenant_id, task_id, table_id, tablet_id LIMIT %ld",
share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME,
share::OB_ALL_KV_TTL_TASK_TNAME,
task_id, batch_size))) {
LOG_WARN("sql assign fmt failed", K(ret));
} else if (need_cancel &&
OB_FAIL(sql.assign_fmt("replace into %s select gmt_create, gmt_modified,"
" tenant_id, task_id, table_id, tablet_id, task_start_time,"
" task_update_time, trigger_type, if(status=4, 4, 3) as status,"
" ttl_del_cnt, max_version_del_cnt, scan_cnt, row_key, ret_code from %s"
" where task_id = %ld and tablet_id != -1 and table_id != -1"
" order by tenant_id, task_id, table_id, tablet_id LIMIT %ld",
share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME,
@ -807,6 +818,7 @@ int ObTableTTLChecker::init(const schema::ObTableSchema &table_schema, bool in_f
ObString right = ttl_definition;
bool is_end = false;
int64_t i = 0;
// example: "c + INTERVAL 40 MINUTE"
while (OB_SUCC(ret) && !is_end) {
ObString left = right.split_on(',');
if (left.empty()) {
@ -815,11 +827,13 @@ int ObTableTTLChecker::init(const schema::ObTableSchema &table_schema, bool in_f
}
ObTableTTLExpr ttl_expr;
ObString column_str = left.split_on('+').trim();
// example: " INTERVAL 40 MINUTE"
left = left.trim();
// example: "INTERVAL 40 MINUTE"
left += strlen("INTERVAL");
// example: "40 MINUTE"
left = left.trim();
ObString interval_str = left.split_on(' ');
left.trim();
ObString time_unit_str = left;
ttl_expr.column_name_ = column_str;
@ -838,7 +852,7 @@ int ObTableTTLChecker::init(const schema::ObTableSchema &table_schema, bool in_f
ttl_expr.time_unit_ = ObTableTTLTimeUnit::YEAR;
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("unepxected time unit", K(ret));
LOG_WARN("unepxected time unit", K(ret), K(time_unit_str));
}
// 2. get delta second and month
@ -1141,16 +1155,16 @@ int ObTTLUtil::check_is_ttl_table(const ObTableSchema &table_schema, bool &is_tt
return ret;
}
int ObTTLUtil::check_ttl_task_exists(uint64_t tenant_id, common::ObISQLClient& proxy,
const uint64_t& task_id, const uint64_t& table_id,
ObTabletID& tablet_id, bool &is_exists)
int ObTTLUtil::check_task_status_from_sys_table(uint64_t tenant_id, common::ObISQLClient& proxy,
const uint64_t& task_id, const uint64_t& table_id,
ObTabletID& tablet_id, bool &is_exists, bool &is_end_state)
{
int ret = OB_SUCCESS;
ObSqlString sql;
uint64_t result_cnt = 0;
if (OB_FAIL(sql.assign_fmt("SELECT (SELECT COUNT(*) FROM %s WHERE table_id = %ld"
" AND tablet_id = %ld AND task_id = %ld) + (SELECT COUNT(*) FROM %s WHERE"
" table_id = %ld AND tablet_id = %ld AND task_id = %ld) AS cnt",
ObTTLTaskStatus status = ObTTLTaskStatus::OB_TTL_TASK_INVALID;
if (OB_FAIL(sql.assign_fmt("(SELECT STATUS FROM %s WHERE table_id = %ld"
" AND tablet_id = %ld AND task_id = %ld limit 1) UNION (SELECT STATUS FROM %s WHERE"
" table_id = %ld AND tablet_id = %ld AND task_id = %ld limit 1)",
share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME, table_id, tablet_id.id(), task_id,
share::OB_ALL_KV_TTL_TASK_TNAME, table_id, tablet_id.id(), task_id))) {
LOG_WARN("sql assign fmt failed", K(ret));
@ -1163,20 +1177,27 @@ int ObTTLUtil::check_ttl_task_exists(uint64_t tenant_id, common::ObISQLClient& p
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, query result must not be NULL", K(ret));
} else if (OB_FAIL(result->next())) {
LOG_WARN("fail to get next row", K(ret));
if (OB_ITER_END == ret) {
// not exist, refresh ret
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to get next row", K(ret));
}
} else {
EXTRACT_INT_FIELD_MYSQL(*result, "cnt", result_cnt, uint64_t);
int64_t temp_status = 0;
EXTRACT_INT_FIELD_MYSQL(*result, "STATUS", temp_status, int64_t);
status = EVAL_TASK_PURE_STATUS(temp_status);
if (OB_SUCCESS == result->next()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected ttl task record count", KR(ret), K(tenant_id), K(task_id), K(table_id), K(tablet_id));
}
}
}
}
if (OB_SUCC(ret)) {
if (result_cnt > 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected ttl task record count", KR(ret), K(tenant_id), K(task_id), K(table_id), K(tablet_id));
} else {
is_exists = (result_cnt > 0);
}
is_exists = (status != ObTTLTaskStatus::OB_TTL_TASK_INVALID);
is_end_state = ObTTLUtil::is_ttl_task_status_end_state(status);
}
return ret;

View File

@ -272,7 +272,8 @@ public:
static int check_tenant_state(uint64_t tenant_id,
common::ObISQLClient& proxy,
const ObTTLTaskStatus local_state,
const int64_t local_task_id);
const int64_t local_task_id,
bool &tenant_state_changed);
static int insert_ttl_task(uint64_t tenant_id,
const char* tname,
common::ObISQLClient& proxy,
@ -316,7 +317,8 @@ public:
static int move_task_to_history_table(uint64_t tenant_id, uint64_t task_id,
common::ObMySQLTransaction& proxy,
int64_t batch_size, int64_t &move_rows);
int64_t batch_size, int64_t &move_rows,
bool need_cancel = false);
static int move_tenant_task_to_history_table(uint64_t tenant_id, uint64_t task_id,
common::ObMySQLTransaction& proxy);
@ -332,10 +334,12 @@ public:
static int check_is_ttl_table(const ObTableSchema &table_schema, bool &is_ttl_table);
static int get_tenant_table_ids(const uint64_t tenant_id, common::ObIArray<uint64_t> &table_id_array);
static int check_ttl_task_exists(uint64_t tenant_id, common::ObISQLClient& proxy,
const uint64_t& task_id, const uint64_t& table_id,
ObTabletID& tablet_id, bool &is_exists);
static int check_task_status_from_sys_table(uint64_t tenant_id, common::ObISQLClient& proxy,
const uint64_t& task_id, const uint64_t& table_id,
ObTabletID& tablet_id, bool &is_exists, bool &is_end_state);
static inline bool is_ttl_task_status_end_state(ObTTLTaskStatus status) {
return status == ObTTLTaskStatus::OB_TTL_TASK_CANCEL || status == ObTTLTaskStatus::OB_TTL_TASK_FINISH;
}
const static uint64_t TTL_TENNAT_TASK_TABLET_ID = -1;
const static uint64_t TTL_TENNAT_TASK_TABLE_ID = -1;
private: