diff --git a/src/observer/table/ob_table_rpc_processor.cpp b/src/observer/table/ob_table_rpc_processor.cpp index 4c9a0605cd..f88904a39a 100644 --- a/src/observer/table/ob_table_rpc_processor.cpp +++ b/src/observer/table/ob_table_rpc_processor.cpp @@ -31,6 +31,7 @@ #include "storage/tx/wrs/ob_weak_read_util.h" #include "ob_table_move_response.h" #include "ob_table_connection_mgr.h" +#include "share/table/ob_table_util.h" using namespace oceanbase::observer; using namespace oceanbase::common; @@ -38,8 +39,6 @@ using namespace oceanbase::table; using namespace oceanbase::share; using namespace oceanbase::obrpc; -const ObString ObTableApiProcessorBase::OBKV_TRACE_INFO = ObString::make_string("OBKV Operation"); - int ObTableLoginP::process() { int ret = OB_SUCCESS; @@ -522,7 +521,7 @@ int ObTableApiProcessorBase::end_trans(bool is_rollback, rpc::ObRequest *req, in int ObTableApiProcessorBase::sync_end_trans(bool is_rollback, int64_t timeout_ts, ObHTableLockHandle *lock_handle /*nullptr*/) { - return sync_end_trans_(is_rollback, trans_desc_, timeout_ts, lock_handle, &OBKV_TRACE_INFO); + return sync_end_trans_(is_rollback, trans_desc_, timeout_ts, lock_handle, &ObTableUtils::get_kv_normal_trace_info()); } int ObTableApiProcessorBase::sync_end_trans_(bool is_rollback, transaction::ObTxDesc *&trans_desc, @@ -581,7 +580,7 @@ int ObTableApiProcessorBase::async_commit_trans(rpc::ObRequest *req, int64_t tim callback.set_tx_desc(trans_desc_); const int64_t stmt_timeout_ts = timeout_ts; // callback won't been called if any error occurred - if (OB_FAIL(txs->submit_commit_tx(*trans_desc_, stmt_timeout_ts, callback))) { + if (OB_FAIL(txs->submit_commit_tx(*trans_desc_, stmt_timeout_ts, callback, &ObTableUtils::get_kv_normal_trace_info()))) { LOG_WARN("fail end trans when session terminate", K(ret), KPC_(trans_desc), K(stmt_timeout_ts), KP(&callback)); callback.callback(ret); } diff --git a/src/observer/table/ob_table_rpc_processor.h b/src/observer/table/ob_table_rpc_processor.h index 09ba2b9144..17731e9bd5 100644 --- a/src/observer/table/ob_table_rpc_processor.h +++ b/src/observer/table/ob_table_rpc_processor.h @@ -191,7 +191,6 @@ protected: bool had_do_response_; // asynchronous transactions return packet in advance sql::TransState *trans_state_ptr_; transaction::ObTxReadSnapshot tx_snapshot_; - static const ObString OBKV_TRACE_INFO; ObAddr user_client_addr_; }; diff --git a/src/observer/table/ttl/ob_table_ttl_task.cpp b/src/observer/table/ttl/ob_table_ttl_task.cpp index cae509df3b..6bff0ab988 100644 --- a/src/observer/table/ttl/ob_table_ttl_task.cpp +++ b/src/observer/table/ttl/ob_table_ttl_task.cpp @@ -19,6 +19,7 @@ #include "observer/table/ob_table_query_common.h" #include "observer/table/ob_table_query_and_mutate_processor.h" #include "lib/utility/utility.h" +#include "share/table/ob_table_util.h" using namespace oceanbase::sql; using namespace oceanbase::transaction; @@ -28,7 +29,6 @@ using namespace oceanbase::share; using namespace oceanbase::table; using namespace oceanbase::rootserver; -const ObString ObTableTTLDeleteTask::TTL_TRACE_INFO = ObString::make_string("TTL Delete"); /** * ---------------------------------------- ObTableTTLDeleteTask ---------------------------------------- @@ -41,6 +41,7 @@ ObTableTTLDeleteTask::ObTableTTLDeleteTask(): allocator_(ObMemAttr(MTL_ID(), "TTLDelTaskCtx")), rowkey_(), ttl_tablet_mgr_(NULL), + hbase_cur_version_(0), rowkey_allocator_(ObMemAttr(MTL_ID(), "TTLDelTaskRKey")) { } @@ -163,7 +164,8 @@ int ObTableTTLDeleteTask::process_one() info_.table_id_, param_, PER_TASK_DEL_ROWS, - rowkey_); + rowkey_, + hbase_cur_version_); SMART_VAR(ObTableCtx, scan_ctx, allocator_) { if (OB_FAIL(init_scan_tb_ctx(scan_ctx, cache_guard))) { LOG_WARN("fail to init tb ctx", KR(ret)); @@ -210,7 +212,7 @@ int ObTableTTLDeleteTask::process_one() if (trans_state.is_start_trans_executed() && trans_state.is_start_trans_success()) { int tmp_ret = ret; if (OB_FAIL(ObTableApiProcessorBase::sync_end_trans_(OB_SUCCESS != ret, trans_desc, get_timeout_ts(), - nullptr, &TTL_TRACE_INFO))) { + nullptr, &ObTableUtils::get_kv_ttl_trace_info()))) { LOG_WARN("fail to end trans", KR(ret)); } ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret; @@ -503,6 +505,7 @@ int ObTableTTLDeleteRowIterator::init(const schema::ObTableSchema &table_schema, ObObj *obj_ptr = const_cast(ttl_operation.start_rowkey_.get_obj_ptr()); cur_rowkey_ = obj_ptr[ObHTableConstants::COL_IDX_K].get_string(); cur_qualifier_ = obj_ptr[ObHTableConstants::COL_IDX_Q].get_string(); + cur_version_ = ttl_operation.hbase_cur_version_; is_inited_ = true; } } @@ -672,6 +675,7 @@ int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_ro if (OB_SUCC(ret)) { rowkey_.assign(rowkey_buf, rowkey_cnt); + hbase_cur_version_ = ttl_row_iter.cur_version_; } } } @@ -679,13 +683,36 @@ int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_ro if (OB_SUCC(ret) && rowkey_.is_valid()) { // if ITER_END in ttl_row_iter, rowkey_ will not be assigned by last_row_ in this round - uint64_t buf_len = rowkey_.get_serialize_size(); - char *buf = static_cast(allocator_.alloc(buf_len)); - int64_t pos = 0; - if (OB_FAIL(rowkey_.serialize(buf, buf_len, pos))) { - LOG_WARN("fail to serialize", K(ret), K(buf_len), K(pos), K_(rowkey)); - } else { - result.end_rowkey_.assign_ptr(buf, buf_len); + ObRowkey saved_rowkey = rowkey_; + if (param_.is_htable_) { + // for hbase table, only k,q is saved, set t to min, cuz we do not remember version in sys table + const int hbase_rowkey_size = 3; + ObObj *hbase_rowkey_objs = nullptr; + if (rowkey_.get_obj_cnt() < hbase_rowkey_size) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K_(rowkey)); + } else if (OB_ISNULL(hbase_rowkey_objs = + static_cast(allocator_.alloc(sizeof(ObObj) * hbase_rowkey_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc", K(ret), K(hbase_rowkey_size)); + } else { + ObObj *raw_obj_ptr = const_cast(rowkey_.get_obj_ptr()); + hbase_rowkey_objs[ObHTableConstants::COL_IDX_K] = raw_obj_ptr[ObHTableConstants::COL_IDX_K]; + hbase_rowkey_objs[ObHTableConstants::COL_IDX_Q] = raw_obj_ptr[ObHTableConstants::COL_IDX_Q]; + hbase_rowkey_objs[ObHTableConstants::COL_IDX_T].set_min_value(); + saved_rowkey.assign(hbase_rowkey_objs, hbase_rowkey_size); + } + } + + if (OB_SUCC(ret)) { + uint64_t buf_len = saved_rowkey.get_serialize_size(); + char *buf = static_cast(allocator_.alloc(buf_len)); + int64_t pos = 0; + if (OB_FAIL(saved_rowkey.serialize(buf, buf_len, pos))) { + LOG_WARN("fail to serialize", K(ret), K(buf_len), K(pos), K_(rowkey)); + } else { + result.end_rowkey_.assign_ptr(buf, buf_len); + } } } } diff --git a/src/observer/table/ttl/ob_table_ttl_task.h b/src/observer/table/ttl/ob_table_ttl_task.h index bdd92a517d..a77e4919dc 100644 --- a/src/observer/table/ttl/ob_table_ttl_task.h +++ b/src/observer/table/ttl/ob_table_ttl_task.h @@ -110,7 +110,6 @@ private: static const int64_t RETRY_INTERVAL = 30 * 60 * 1000 * 1000l; // 30min static const int64_t PER_TASK_DEL_ROWS = 1024l; static const int64_t ONE_TASK_TIMEOUT = 1 * 60 * 1000 * 1000l; // 1min - static const ObString TTL_TRACE_INFO; private: int process_one(); @@ -124,10 +123,10 @@ private: share::ObLSID ls_id_; ObTableEntity delete_entity_; table::ObTableApiCredential credential_; + uint64_t hbase_cur_version_; common::ObArenaAllocator rowkey_allocator_; DISALLOW_COPY_AND_ASSIGN(ObTableTTLDeleteTask); }; - class ObTableTTLDag final: public share::ObIDag { public: @@ -140,15 +139,12 @@ public: virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override; virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } - virtual bool is_ha_dag() const { return false; } - private: bool is_inited_; table::ObTTLTaskParam param_; table::ObTTLTaskInfo info_; lib::Worker::CompatMode compat_mode_; - DISALLOW_COPY_AND_ASSIGN(ObTableTTLDag); }; diff --git a/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.cpp b/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.cpp index d2cbc9efa4..a8f59c4a2a 100644 --- a/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.cpp +++ b/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.cpp @@ -90,8 +90,10 @@ int ObTenantTabletTTLMgr::switch_to_leader() } else { has_start_ = true; } - } else { - resume(); + } + if (OB_SUCC(ret)) { + ATOMIC_STORE(&is_leader_, true); + ATOMIC_STORE(&need_do_for_switch_, true); } const int64_t cost_us = ObTimeUtility::current_time() - start_time_us; FLOG_INFO("tenant_tablet_ttl_mgr: finish to switch_to_leader", KR(ret), K_(tenant_id), KPC_(ls), K(cost_us)); @@ -125,22 +127,12 @@ void ObTenantTabletTTLMgr::inner_switch_to_follower() { FLOG_INFO("tenant_tablet_ttl_mgr: begin to switch_to_follower", K_(tenant_id), KPC_(ls)); const int64_t start_time_us = ObTimeUtility::current_time(); - pause(); - ATOMIC_STORE(&need_reuse_for_switch_, true); + ATOMIC_STORE(&is_leader_, false); + ATOMIC_STORE(&need_do_for_switch_, true); const int64_t cost_us = ObTimeUtility::current_time() - start_time_us; FLOG_INFO("tenant_tablet_ttl_mgr: finish to switch_to_follower", K_(tenant_id), KPC_(ls), K(cost_us)); } -void ObTenantTabletTTLMgr::resume() -{ - is_paused_ = false; -} - -void ObTenantTabletTTLMgr::pause() -{ - is_paused_ = true; -} - int ObTenantTabletTTLMgr::start() { int ret = OB_SUCCESS; @@ -149,7 +141,7 @@ int ObTenantTabletTTLMgr::start() ret = OB_NOT_INIT; LOG_WARN("tablet ttl mgr not init", KR(ret)); } else if (OB_FAIL(TG_START(tg_id_))) { - LOG_WARN("failed to create ObTenantTabletTTLMgr thread", K(ret), K_(tg_id)); + LOG_WARN("fail to create ObTenantTabletTTLMgr thread", K(ret), K_(tg_id)); } else if (OB_FAIL(TG_SCHEDULE(tg_id_, periodic_task_, periodic_delay_, true))) { LOG_WARN("fail to schedule periodic task", KR(ret), K_(tg_id)); } else { @@ -172,9 +164,9 @@ void ObTenantTabletTTLMgr::stop() TG_STOP(tg_id_); is_timer_start_ = false; common::ObSpinLockGuard guard(lock_); - // set is_paused_ to true to ensure after stop, not new TTL dag task will be generate, + // set is_leader_ to false to ensure after stop, not new TTL dag task will be generate, // i.e., dag_ref won't increase anymore - is_paused_ = true; + ATOMIC_STORE(&is_leader_, false); } FLOG_INFO("tenant_tablet_ttl_mgr: finish to stop", K(ret), K_(is_timer_start), K_(tenant_id), KPC_(ls)); } @@ -206,7 +198,7 @@ int ObTenantTabletTTLMgr::check_and_handle_event() if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("tablet ttl manager not init", KR(ret)); - } else if (is_paused_) { + } else if (!is_leader_) { // do nothing, not leader } else { if (OB_FAIL(check_schema_version())) { @@ -255,7 +247,10 @@ void ObTenantTabletTTLMgr::check_ttl_tenant_state() for (tablet_task_iter iter = local_tenant_task_.tablet_task_map_.begin(); OB_SUCC(ret) && !tenant_dirty && iter != local_tenant_task_.tablet_task_map_.end(); ++iter) { ctx = iter->second; - if (OB_ISNULL(ctx)) { + if (need_skip_run()) { + ret = OB_EAGAIN; + FLOG_INFO("skip this run cuz of leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch)); + } else if (OB_ISNULL(ctx)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("fatal err, ttl ctx in map is null", K(local_tenant_task_.tenant_id_)); } else { @@ -368,16 +363,21 @@ int ObTenantTabletTTLMgr::generate_batch_tablet_task(ObIArray& for (tablet_task_iter iter = local_tenant_task_.tablet_task_map_.begin(); iter != local_tenant_task_.tablet_task_map_.end(); ++iter) { ctx = iter->second; - if (OB_ISNULL(ctx)) { + if (need_skip_run()) { + ret = OB_EAGAIN; + FLOG_INFO("skip this run cuz of leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch)); + } else if (OB_ISNULL(ctx)) { ret = OB_ERR_NULL_VALUE; LOG_WARN("fatal err, ttl ctx in map is null", KR(ret)); } else if (OB_FAIL(handle_one_tablet_event(ctx))) { @@ -754,7 +767,7 @@ int ObTenantTabletTTLMgr::check_tenant_memory() if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("tablet ttl mgr not init", KR(ret)); - } else if (!is_paused_) { + } else if (is_leader_) { common::ObSpinLockGuard guard(lock_); bool last_ttl_continue = local_tenant_task_.ttl_continue_; int64_t total_memstore_used = 0; @@ -845,7 +858,10 @@ int ObTenantTabletTTLMgr::sync_all_dirty_task(ObIArray& dirty_tasks) ObTimeGuard guard("ObTenantTabletTTLMgr::sync_all_dirty_record", TTL_NORMAL_TIME_THRESHOLD); for (int i = 0; OB_SUCC(ret) && i < dirty_tasks.count() && !tenant_state_changed; i++) { // tenant_state_changed is true means that tenant status is changed, we should refresh our status first - if (OB_FAIL(sync_sys_table(dirty_tasks.at(i), tenant_state_changed))) { + if (need_skip_run()) { + ret = OB_EAGAIN; + FLOG_INFO("skip this run cuz of leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch)); + } else if (OB_FAIL(sync_sys_table(dirty_tasks.at(i), tenant_state_changed))) { LOG_WARN("fail to sync sys table", KR(ret)); } } @@ -1113,7 +1129,7 @@ bool ObTenantTabletTTLMgr::can_schedule_tenant(const ObTTLTenantInfo &tenant_inf bool ObTenantTabletTTLMgr::can_schedule_task(const ObTTLTaskCtx &ttl_task) { - return !is_paused_ && ttl_task.task_status_ == OB_TTL_TASK_PENDING; + return is_leader_ && ttl_task.task_status_ == OB_TTL_TASK_PENDING; } int ObTenantTabletTTLMgr::try_schedule_remaining_tasks(const ObTTLTaskCtx *current_ctx) @@ -1125,7 +1141,10 @@ int ObTenantTabletTTLMgr::try_schedule_remaining_tasks(const ObTTLTaskCtx *curre iter != local_tenant_task_.tablet_task_map_.end() && OB_SUCC(ret); ++iter) { ctx = iter->second; - if (OB_ISNULL(ctx)) { + if (need_skip_run()) { + ret = OB_EAGAIN; + FLOG_INFO("skip this run cuz of leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch)); + } else if (OB_ISNULL(ctx)) { ret = OB_ERR_NULL_VALUE; LOG_ERROR("fatal err, ttl ctx in map is null", KR(ret), K(local_tenant_task_.tenant_id_)); } else if (current_ctx == ctx) { @@ -1237,17 +1256,13 @@ int ObTenantTabletTTLMgr::refresh_tablet_task(ObTTLTaskCtx &ttl_task, bool refre int ObTenantTabletTTLMgr::reload_tenant_task() { common::ObSpinLockGuard guard(lock_); - if (ATOMIC_BCAS(&need_reuse_for_switch_, true, false)) { - local_tenant_task_.reuse(); - FLOG_INFO("resue local tenant task cuz of switch to follower"); - } int ret = OB_SUCCESS; ObTTLStatus tenant_task; ObTTLTaskStatus expected_state; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret)); - } else if (is_paused_) { + } else if (!is_leader_) { // do nothing } else if (OB_FAIL(ObTTLUtil::read_tenant_ttl_task(tenant_id_, *sql_proxy_, tenant_task))) { if (OB_ITER_END == ret) { @@ -1279,7 +1294,7 @@ int ObTenantTabletTTLMgr::reload_tenant_task() local_tenant_task_.state_ = expected_state; local_tenant_task_.is_dirty_ = true; } else {/* task status not change, do nothing */} - FLOG_INFO("finish reload tenant task", K(local_tenant_task_), K(tenant_task), K(is_paused_)); + FLOG_INFO("finish reload tenant task", K(local_tenant_task_), K(tenant_task), K_(is_leader)); return ret; } diff --git a/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.h b/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.h index 051b927268..92378e0804 100644 --- a/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.h +++ b/src/observer/table/ttl/ob_tenant_tablet_ttl_mgr.h @@ -93,9 +93,9 @@ public: tg_id_(0), local_schema_version_(OB_INVALID_VERSION), has_start_(false), - is_paused_(false), + is_leader_(true), dag_ref_cnt_(0), - need_reuse_for_switch_(false) + need_do_for_switch_(true) { } @@ -157,6 +157,7 @@ public: int64_t get_dag_ref() const { return ATOMIC_LOAD(&dag_ref_cnt_); } int safe_to_destroy(bool &is_safe); int sync_all_dirty_task(common::ObIArray& dirty_tasks); + void run_task(); private: typedef common::hash::ObHashMap TabletTaskMap; typedef TabletTaskMap::iterator tablet_task_iter; @@ -254,11 +255,10 @@ private: void mark_tenant_checked(); int refresh_tablet_task(ObTTLTaskCtx &ttl_task, bool refresh_status, bool refresh_retcode = false); int check_schema_version(); - void resume(); - void pause(); + OB_INLINE bool need_skip_run() { return ATOMIC_LOAD(&need_do_for_switch_); } private: static const int64_t DEFAULT_TTL_BUCKET_NUM = 100; - static const int64_t TTL_PERIODIC_DELAY = 5*1000*1000; //5s + static const int64_t TTL_PERIODIC_DELAY = 10*1000*1000; //10s static const int64_t TBALE_GENERATE_BATCH_SIZE = 200; static const int64_t DEFAULT_TABLE_ARRAY_SIZE = 200; static const int64_t DEFAULT_TABLET_PAIR_SIZE = 1024; @@ -278,9 +278,10 @@ private: ObArray tablet_table_pairs_; int64_t local_schema_version_; bool has_start_; - bool is_paused_; + bool is_leader_; // current tenant ttl mgr is in leader ls or not volatile int64_t dag_ref_cnt_; // ttl dag ref count for current ls - bool need_reuse_for_switch_; + // after leader switch, need wait and reset status + bool need_do_for_switch_; }; } // end namespace table diff --git a/src/observer/table/ttl/ob_tenant_ttl_manager.cpp b/src/observer/table/ttl/ob_tenant_ttl_manager.cpp index 542be32387..18c440d598 100644 --- a/src/observer/table/ttl/ob_tenant_ttl_manager.cpp +++ b/src/observer/table/ttl/ob_tenant_ttl_manager.cpp @@ -121,7 +121,7 @@ int ObTTLTaskScheduler::reload_tenant_task() LOG_WARN("ttl tenant task mgr not init", KR(ret)); } else if (!ObTTLUtil::check_can_process_tenant_tasks(tenant_id_)) { // do nothing - } else if (need_reload_) { + } else if (ATOMIC_BCAS(&need_reload_, true, false)) { lib::ObMutexGuard guard(mutex_); SMART_VAR(ObMySQLProxy::MySQLResult, res) { ObTTLStatusField table_id_field; @@ -162,8 +162,11 @@ int ObTTLTaskScheduler::reload_tenant_task() } if (OB_SUCC(ret)) { - set_need_reload(false); FLOG_INFO("reload tenant task", K_(tenant_task)); + } else { + ret = OB_EAGAIN; + ATOMIC_STORE(&need_reload_, true); + LOG_WARN("fail to reload tenant task", KR(ret), K_(tenant_task)); } } return ret; @@ -401,6 +404,9 @@ int ObTTLTaskScheduler::try_add_periodic_task() } else if (!ObTTLUtil::check_can_process_tenant_tasks(tenant_id_)) { ret = OB_NOT_SUPPORTED; LOG_WARN("cann't process ttl task, maybe tenant is restoring", K_(tenant_id), KR(ret)); + } else if (need_skip_run()) { + ret = OB_EAGAIN; + FLOG_INFO("exit timer task once cuz leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch)); } else if (OB_FAIL(in_active_time(is_active_time))) { LOG_WARN("fail to check is in active time", KR(ret)); } else if (is_active_time) { @@ -437,7 +443,10 @@ int ObTTLTaskScheduler::check_all_tablet_task() } else if (need_move) { { lib::ObMutexGuard guard(mutex_); - if (ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE != tenant_task_.ttl_status_.status_) { + if (need_skip_run()) { + ret = OB_EAGAIN; + FLOG_INFO("exit timer task once cuz leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch)); + } else if (ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE != tenant_task_.ttl_status_.status_) { tenant_task_.ttl_status_.status_ = static_cast(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE); ObMySQLTransaction trans; if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) { @@ -536,12 +545,14 @@ void ObTTLTaskScheduler::reset_local_tenant_task() void ObTTLTaskScheduler::resume() { - is_paused_ = false; + ATOMIC_STORE(&is_leader_, true); + ATOMIC_STORE(&need_do_for_switch_, true); } void ObTTLTaskScheduler::pause() { - is_paused_ = true; + ATOMIC_STORE(&is_leader_, false); + ATOMIC_STORE(&need_do_for_switch_, true); } int ObTenantTTLManager::init(const uint64_t tenant_id, ObMySQLProxy &sql_proxy) @@ -608,15 +619,22 @@ void ObTenantTTLManager::destroy() void ObTTLTaskScheduler::runTimerTask() { + DEBUG_SYNC(BEFORE_TTL_SCHEDULER_RUN); int ret = OB_SUCCESS; ObCurTraceId::init(GCONF.self_addr_); + if (!ObKVFeatureModeUitl::is_ttl_enable()) { // do nothing LOG_DEBUG("ttl is disable"); } else if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ttl task mgr not init", KR(ret)); - } else if (is_paused_) { + } else if (ATOMIC_BCAS(&need_do_for_switch_, true, false)) { + // need skip this round for waiting follower finish executing task + if (is_leader_) { + FLOG_INFO("need wait for switch leader, skip schedule once", K_(need_do_for_switch)); + } + } else if (!is_leader_) { // timer paused, do nothing } else if (OB_FAIL(reload_tenant_task())) { LOG_WARN("fail to process tenant task", KR(ret), K_(tenant_id)); @@ -652,7 +670,10 @@ int ObTTLTaskScheduler::move_all_task_to_history_table(bool need_cancel) int64_t one_move_rows = TBALET_CHECK_BATCH_SIZE; while (OB_SUCC(ret) && one_move_rows == TBALET_CHECK_BATCH_SIZE) { ObMySQLTransaction trans; - if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) { + if (need_skip_run()) { + ret = OB_EAGAIN; + FLOG_INFO("exit timer task once cuz leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch)); + } else if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) { LOG_WARN("fail start transaction", KR(ret), K_(tenant_id)); } else if (OB_FAIL(ObTTLUtil::move_task_to_history_table(tenant_id_, tenant_task_.ttl_status_.task_id_, trans, TBALET_CHECK_BATCH_SIZE, one_move_rows, @@ -671,7 +692,10 @@ int ObTTLTaskScheduler::move_all_task_to_history_table(bool need_cancel) if (OB_SUCC(ret)) { ObMySQLTransaction trans; - if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) { + if (need_skip_run()) { + ret = OB_EAGAIN; + FLOG_INFO("exit timer task once cuz leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch)); + } else if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) { LOG_WARN("fail start transaction", KR(ret), K_(tenant_id)); } else if (OB_FAIL(update_task_status(tenant_task_.ttl_status_.task_id_, OB_TTL_TASK_FINISH, trans))) { LOG_WARN("fail to update task status", KR(ret)); @@ -695,7 +719,8 @@ void ObTenantTTLManager::resume() { clear_ttl_history_task_.resume(); task_scheduler_.resume(); - task_scheduler_.set_need_reload(true);} + task_scheduler_.set_need_reload(true); +} void ObTenantTTLManager::pause() { @@ -718,6 +743,7 @@ int ObTTLTaskScheduler::check_task_need_move(bool &need_move) int ObTTLTaskScheduler::check_all_tablet_finished(bool &all_finished) { + DEBUG_SYNC(BEFORE_CHECK_TTL_TASK_FINISH); int ret = OB_SUCCESS; all_finished = true; ObSEArray table_id_array; diff --git a/src/observer/table/ttl/ob_tenant_ttl_manager.h b/src/observer/table/ttl/ob_tenant_ttl_manager.h index f095b602a9..77c0691bf6 100644 --- a/src/observer/table/ttl/ob_tenant_ttl_manager.h +++ b/src/observer/table/ttl/ob_tenant_ttl_manager.h @@ -95,7 +95,7 @@ class ObTTLTaskScheduler : public common::ObTimerTask public: ObTTLTaskScheduler() : del_ten_arr_(), sql_proxy_(nullptr), is_inited_(false), periodic_launched_(false), - need_reload_(true), is_paused_(false) + need_reload_(true), is_leader_(true), need_do_for_switch_(true) {} ~ObTTLTaskScheduler() {} @@ -111,7 +111,7 @@ public: void runTimerTask() override; int try_add_periodic_task(); - void set_need_reload(bool need_reload) { need_reload_ = need_reload; } + void set_need_reload(bool need_reload) { ATOMIC_STORE(&need_reload_, need_reload); } void pause(); void resume(); @@ -149,6 +149,7 @@ private: int check_all_tablet_finished(bool &all_finished); int check_tablet_table_finished(common::ObIArray &pairs, bool &all_finished); int move_all_task_to_history_table(bool need_cancel); + OB_INLINE bool need_skip_run() { return ATOMIC_LOAD(&need_do_for_switch_); } private: static const int64_t TBALE_CHECK_BATCH_SIZE = 200; static const int64_t TBALET_CHECK_BATCH_SIZE = 1024; @@ -166,8 +167,9 @@ private: bool need_reload_; lib::ObMutex mutex_; ObArray tablet_table_pairs_; - bool is_paused_; + bool is_leader_; // current ttl manager in ls leader or not const int64_t OB_TTL_TASK_RETRY_INTERVAL = 15*1000*1000; // 15s + bool need_do_for_switch_; // need wait follower finish after switch leader }; class ObTenantTTLManager diff --git a/src/share/CMakeLists.txt b/src/share/CMakeLists.txt index 8f0026ea0a..6d4490a2d4 100644 --- a/src/share/CMakeLists.txt +++ b/src/share/CMakeLists.txt @@ -277,6 +277,7 @@ ob_set_subtarget(ob_share common_mixed restore/ob_tenant_clone_table_operator.cpp index_usage/ob_index_usage_info_mgr.cpp index_usage/ob_index_usage_report_task.cpp + table/ob_table_util.cpp ) ob_set_subtarget(ob_share tablet diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index a20ffbf5b2..e23430b649 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -572,6 +572,8 @@ class ObString; ACT(BEFORE_CREATE_CLONE_TENANT_END,)\ ACT(BEFORE_CALC_CONSISTENT_SCN,)\ ACT(REPLAY_SWITCH_TO_FOLLOWER_BEFORE_PUSH_SUBMIT_TASK,)\ + ACT(BEFORE_CHECK_TTL_TASK_FINISH,)\ + ACT(BEFORE_TTL_SCHEDULER_RUN,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/table/ob_table.h b/src/share/table/ob_table.h index 917375a5e2..5226354250 100644 --- a/src/share/table/ob_table.h +++ b/src/share/table/ob_table.h @@ -339,10 +339,10 @@ class ObTableTTLOperation { public: ObTableTTLOperation(uint64_t tenant_id, uint64_t table_id, const ObTTLTaskParam ¶, - uint64_t del_row_limit, ObRowkey start_rowkey) + uint64_t del_row_limit, ObRowkey start_rowkey, uint64_t hbase_cur_version) : tenant_id_(tenant_id), table_id_(table_id), max_version_(para.max_version_), time_to_live_(para.ttl_), is_htable_(para.is_htable_), del_row_limit_(del_row_limit), - start_rowkey_(start_rowkey) + start_rowkey_(start_rowkey), hbase_cur_version_(hbase_cur_version) {} ~ObTableTTLOperation() {} @@ -360,6 +360,7 @@ public: bool is_htable_; uint64_t del_row_limit_; ObRowkey start_rowkey_; + uint64_t hbase_cur_version_; }; /// common result for ObTable diff --git a/src/share/table/ob_table_util.cpp b/src/share/table/ob_table_util.cpp new file mode 100644 index 0000000000..b0a12846f1 --- /dev/null +++ b/src/share/table/ob_table_util.cpp @@ -0,0 +1,28 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "ob_table_util.h" + +using namespace oceanbase::common; +namespace oceanbase +{ +namespace table +{ + const ObString ObTableUtils::KV_NORMAL_TRACE_INFO = ObString::make_string("OBKV Operation"); + const ObString ObTableUtils::KV_TTL_TRACE_INFO = ObString::make_string("TTL Delete"); + + bool ObTableUtils::is_kv_trace_info(const ObString &trace_info) + { + return (trace_info.compare(KV_NORMAL_TRACE_INFO) == 0 || trace_info.compare(KV_TTL_TRACE_INFO) == 0); + } +} +} diff --git a/src/share/table/ob_table_util.h b/src/share/table/ob_table_util.h new file mode 100644 index 0000000000..c4a4ee23cb --- /dev/null +++ b/src/share/table/ob_table_util.h @@ -0,0 +1,39 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_SHARE_TABLE_OB_TABLE_UTIL_ +#define OCEANBASE_SHARE_TABLE_OB_TABLE_UTIL_ + +#include "lib/string/ob_string.h" + +using namespace oceanbase::common; + +namespace oceanbase +{ +namespace table +{ + +class ObTableUtils +{ +public: + static const ObString &get_kv_normal_trace_info() { return KV_NORMAL_TRACE_INFO; } + static const ObString &get_kv_ttl_trace_info() { return KV_TTL_TRACE_INFO; } + static bool is_kv_trace_info(const ObString &trace_info); +private: + static const ObString KV_NORMAL_TRACE_INFO; + static const ObString KV_TTL_TRACE_INFO; +}; + +} // namespace table +} // namespace oceanbase + +#endif /* OCEANBASE_SHARE_TABLE_OB_TABLE_UTIL_ */ \ No newline at end of file diff --git a/src/share/table/ob_ttl_util.cpp b/src/share/table/ob_ttl_util.cpp index c0abac2208..7ad6d1ad89 100644 --- a/src/share/table/ob_ttl_util.cpp +++ b/src/share/table/ob_ttl_util.cpp @@ -831,10 +831,10 @@ int ObTableTTLChecker::init(const schema::ObTableSchema &table_schema, bool in_f left = left.trim(); // example: "INTERVAL 40 MINUTE" left += strlen("INTERVAL"); - // example: "40 MINUTE" left = left.trim(); + // example: "40 MINUTE" ObString interval_str = left.split_on(' '); - ObString time_unit_str = left; + ObString time_unit_str = left.trim(); ttl_expr.column_name_ = column_str; ttl_expr.interval_ = atol(interval_str.ptr()); @@ -858,37 +858,38 @@ int ObTableTTLChecker::init(const schema::ObTableSchema &table_schema, bool in_f // 2. get delta second and month int64_t nsecond = 0; int64_t nmonth = 0; - switch (ttl_expr.time_unit_) { - case ObTableTTLTimeUnit::SECOND: { - nsecond = ttl_expr.interval_; - break; + if (OB_SUCC(ret)) { + switch (ttl_expr.time_unit_) { + case ObTableTTLTimeUnit::SECOND: { + nsecond = ttl_expr.interval_; + break; + } + case ObTableTTLTimeUnit::MINUTE: { + nsecond = ttl_expr.interval_ * 60; + break; + } + case ObTableTTLTimeUnit::HOUR: { + nsecond = ttl_expr.interval_ * 60 * 60; + break; + } + case ObTableTTLTimeUnit::DAY: { + nsecond = ttl_expr.interval_ * 60 * 60 * 24; + break; + } + case ObTableTTLTimeUnit::MONTH: { + nmonth = ttl_expr.interval_; + break; + } + case ObTableTTLTimeUnit::YEAR: { + nmonth = ttl_expr.interval_ * 12; + break; + } + default: + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected time unit", K(ret), K_(ttl_expr.time_unit)); } - case ObTableTTLTimeUnit::MINUTE: { - nsecond = ttl_expr.interval_ * 60; - break; - } - case ObTableTTLTimeUnit::HOUR: { - nsecond = ttl_expr.interval_ * 60 * 60; - break; - } - case ObTableTTLTimeUnit::DAY: { - nsecond = ttl_expr.interval_ * 60 * 60 * 24; - break; - } - case ObTableTTLTimeUnit::MONTH: { - nmonth = ttl_expr.interval_; - break; - } - case ObTableTTLTimeUnit::YEAR: { - nmonth = ttl_expr.interval_ * 12; - break; - } - default: - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected time unit", K(ret), K_(ttl_expr.time_unit)); } - if (OB_SUCC(ret)) { ttl_expr.nsecond_ = nsecond; ttl_expr.nmonth_ = nmonth;