[CP] [OBKV] Fix TTL status wrong when leader switch and hbase TTL MaxVersions delete
This commit is contained in:
committed by
ob-robot
parent
8c4838638d
commit
043cf555ed
@ -31,6 +31,7 @@
|
|||||||
#include "storage/tx/wrs/ob_weak_read_util.h"
|
#include "storage/tx/wrs/ob_weak_read_util.h"
|
||||||
#include "ob_table_move_response.h"
|
#include "ob_table_move_response.h"
|
||||||
#include "ob_table_connection_mgr.h"
|
#include "ob_table_connection_mgr.h"
|
||||||
|
#include "share/table/ob_table_util.h"
|
||||||
|
|
||||||
using namespace oceanbase::observer;
|
using namespace oceanbase::observer;
|
||||||
using namespace oceanbase::common;
|
using namespace oceanbase::common;
|
||||||
@ -38,8 +39,6 @@ using namespace oceanbase::table;
|
|||||||
using namespace oceanbase::share;
|
using namespace oceanbase::share;
|
||||||
using namespace oceanbase::obrpc;
|
using namespace oceanbase::obrpc;
|
||||||
|
|
||||||
const ObString ObTableApiProcessorBase::OBKV_TRACE_INFO = ObString::make_string("OBKV Operation");
|
|
||||||
|
|
||||||
int ObTableLoginP::process()
|
int ObTableLoginP::process()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -522,7 +521,7 @@ int ObTableApiProcessorBase::end_trans(bool is_rollback, rpc::ObRequest *req, in
|
|||||||
|
|
||||||
int ObTableApiProcessorBase::sync_end_trans(bool is_rollback, int64_t timeout_ts, ObHTableLockHandle *lock_handle /*nullptr*/)
|
int ObTableApiProcessorBase::sync_end_trans(bool is_rollback, int64_t timeout_ts, ObHTableLockHandle *lock_handle /*nullptr*/)
|
||||||
{
|
{
|
||||||
return sync_end_trans_(is_rollback, trans_desc_, timeout_ts, lock_handle, &OBKV_TRACE_INFO);
|
return sync_end_trans_(is_rollback, trans_desc_, timeout_ts, lock_handle, &ObTableUtils::get_kv_normal_trace_info());
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTableApiProcessorBase::sync_end_trans_(bool is_rollback, transaction::ObTxDesc *&trans_desc,
|
int ObTableApiProcessorBase::sync_end_trans_(bool is_rollback, transaction::ObTxDesc *&trans_desc,
|
||||||
@ -581,7 +580,7 @@ int ObTableApiProcessorBase::async_commit_trans(rpc::ObRequest *req, int64_t tim
|
|||||||
callback.set_tx_desc(trans_desc_);
|
callback.set_tx_desc(trans_desc_);
|
||||||
const int64_t stmt_timeout_ts = timeout_ts;
|
const int64_t stmt_timeout_ts = timeout_ts;
|
||||||
// callback won't been called if any error occurred
|
// callback won't been called if any error occurred
|
||||||
if (OB_FAIL(txs->submit_commit_tx(*trans_desc_, stmt_timeout_ts, callback))) {
|
if (OB_FAIL(txs->submit_commit_tx(*trans_desc_, stmt_timeout_ts, callback, &ObTableUtils::get_kv_normal_trace_info()))) {
|
||||||
LOG_WARN("fail end trans when session terminate", K(ret), KPC_(trans_desc), K(stmt_timeout_ts), KP(&callback));
|
LOG_WARN("fail end trans when session terminate", K(ret), KPC_(trans_desc), K(stmt_timeout_ts), KP(&callback));
|
||||||
callback.callback(ret);
|
callback.callback(ret);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -191,7 +191,6 @@ protected:
|
|||||||
bool had_do_response_; // asynchronous transactions return packet in advance
|
bool had_do_response_; // asynchronous transactions return packet in advance
|
||||||
sql::TransState *trans_state_ptr_;
|
sql::TransState *trans_state_ptr_;
|
||||||
transaction::ObTxReadSnapshot tx_snapshot_;
|
transaction::ObTxReadSnapshot tx_snapshot_;
|
||||||
static const ObString OBKV_TRACE_INFO;
|
|
||||||
ObAddr user_client_addr_;
|
ObAddr user_client_addr_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -19,6 +19,7 @@
|
|||||||
#include "observer/table/ob_table_query_common.h"
|
#include "observer/table/ob_table_query_common.h"
|
||||||
#include "observer/table/ob_table_query_and_mutate_processor.h"
|
#include "observer/table/ob_table_query_and_mutate_processor.h"
|
||||||
#include "lib/utility/utility.h"
|
#include "lib/utility/utility.h"
|
||||||
|
#include "share/table/ob_table_util.h"
|
||||||
|
|
||||||
using namespace oceanbase::sql;
|
using namespace oceanbase::sql;
|
||||||
using namespace oceanbase::transaction;
|
using namespace oceanbase::transaction;
|
||||||
@ -28,7 +29,6 @@ using namespace oceanbase::share;
|
|||||||
using namespace oceanbase::table;
|
using namespace oceanbase::table;
|
||||||
using namespace oceanbase::rootserver;
|
using namespace oceanbase::rootserver;
|
||||||
|
|
||||||
const ObString ObTableTTLDeleteTask::TTL_TRACE_INFO = ObString::make_string("TTL Delete");
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ---------------------------------------- ObTableTTLDeleteTask ----------------------------------------
|
* ---------------------------------------- ObTableTTLDeleteTask ----------------------------------------
|
||||||
@ -41,6 +41,7 @@ ObTableTTLDeleteTask::ObTableTTLDeleteTask():
|
|||||||
allocator_(ObMemAttr(MTL_ID(), "TTLDelTaskCtx")),
|
allocator_(ObMemAttr(MTL_ID(), "TTLDelTaskCtx")),
|
||||||
rowkey_(),
|
rowkey_(),
|
||||||
ttl_tablet_mgr_(NULL),
|
ttl_tablet_mgr_(NULL),
|
||||||
|
hbase_cur_version_(0),
|
||||||
rowkey_allocator_(ObMemAttr(MTL_ID(), "TTLDelTaskRKey"))
|
rowkey_allocator_(ObMemAttr(MTL_ID(), "TTLDelTaskRKey"))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -163,7 +164,8 @@ int ObTableTTLDeleteTask::process_one()
|
|||||||
info_.table_id_,
|
info_.table_id_,
|
||||||
param_,
|
param_,
|
||||||
PER_TASK_DEL_ROWS,
|
PER_TASK_DEL_ROWS,
|
||||||
rowkey_);
|
rowkey_,
|
||||||
|
hbase_cur_version_);
|
||||||
SMART_VAR(ObTableCtx, scan_ctx, allocator_) {
|
SMART_VAR(ObTableCtx, scan_ctx, allocator_) {
|
||||||
if (OB_FAIL(init_scan_tb_ctx(scan_ctx, cache_guard))) {
|
if (OB_FAIL(init_scan_tb_ctx(scan_ctx, cache_guard))) {
|
||||||
LOG_WARN("fail to init tb ctx", KR(ret));
|
LOG_WARN("fail to init tb ctx", KR(ret));
|
||||||
@ -210,7 +212,7 @@ int ObTableTTLDeleteTask::process_one()
|
|||||||
if (trans_state.is_start_trans_executed() && trans_state.is_start_trans_success()) {
|
if (trans_state.is_start_trans_executed() && trans_state.is_start_trans_success()) {
|
||||||
int tmp_ret = ret;
|
int tmp_ret = ret;
|
||||||
if (OB_FAIL(ObTableApiProcessorBase::sync_end_trans_(OB_SUCCESS != ret, trans_desc, get_timeout_ts(),
|
if (OB_FAIL(ObTableApiProcessorBase::sync_end_trans_(OB_SUCCESS != ret, trans_desc, get_timeout_ts(),
|
||||||
nullptr, &TTL_TRACE_INFO))) {
|
nullptr, &ObTableUtils::get_kv_ttl_trace_info()))) {
|
||||||
LOG_WARN("fail to end trans", KR(ret));
|
LOG_WARN("fail to end trans", KR(ret));
|
||||||
}
|
}
|
||||||
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
|
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
|
||||||
@ -503,6 +505,7 @@ int ObTableTTLDeleteRowIterator::init(const schema::ObTableSchema &table_schema,
|
|||||||
ObObj *obj_ptr = const_cast<ObObj *>(ttl_operation.start_rowkey_.get_obj_ptr());
|
ObObj *obj_ptr = const_cast<ObObj *>(ttl_operation.start_rowkey_.get_obj_ptr());
|
||||||
cur_rowkey_ = obj_ptr[ObHTableConstants::COL_IDX_K].get_string();
|
cur_rowkey_ = obj_ptr[ObHTableConstants::COL_IDX_K].get_string();
|
||||||
cur_qualifier_ = obj_ptr[ObHTableConstants::COL_IDX_Q].get_string();
|
cur_qualifier_ = obj_ptr[ObHTableConstants::COL_IDX_Q].get_string();
|
||||||
|
cur_version_ = ttl_operation.hbase_cur_version_;
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -672,6 +675,7 @@ int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_ro
|
|||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
rowkey_.assign(rowkey_buf, rowkey_cnt);
|
rowkey_.assign(rowkey_buf, rowkey_cnt);
|
||||||
|
hbase_cur_version_ = ttl_row_iter.cur_version_;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -679,13 +683,36 @@ int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_ro
|
|||||||
|
|
||||||
if (OB_SUCC(ret) && rowkey_.is_valid()) {
|
if (OB_SUCC(ret) && rowkey_.is_valid()) {
|
||||||
// if ITER_END in ttl_row_iter, rowkey_ will not be assigned by last_row_ in this round
|
// if ITER_END in ttl_row_iter, rowkey_ will not be assigned by last_row_ in this round
|
||||||
uint64_t buf_len = rowkey_.get_serialize_size();
|
ObRowkey saved_rowkey = rowkey_;
|
||||||
char *buf = static_cast<char *>(allocator_.alloc(buf_len));
|
if (param_.is_htable_) {
|
||||||
int64_t pos = 0;
|
// for hbase table, only k,q is saved, set t to min, cuz we do not remember version in sys table
|
||||||
if (OB_FAIL(rowkey_.serialize(buf, buf_len, pos))) {
|
const int hbase_rowkey_size = 3;
|
||||||
LOG_WARN("fail to serialize", K(ret), K(buf_len), K(pos), K_(rowkey));
|
ObObj *hbase_rowkey_objs = nullptr;
|
||||||
} else {
|
if (rowkey_.get_obj_cnt() < hbase_rowkey_size) {
|
||||||
result.end_rowkey_.assign_ptr(buf, buf_len);
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KR(ret), K_(rowkey));
|
||||||
|
} else if (OB_ISNULL(hbase_rowkey_objs =
|
||||||
|
static_cast<ObObj*>(allocator_.alloc(sizeof(ObObj) * hbase_rowkey_size)))) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
LOG_WARN("fail to alloc", K(ret), K(hbase_rowkey_size));
|
||||||
|
} else {
|
||||||
|
ObObj *raw_obj_ptr = const_cast<ObObj *>(rowkey_.get_obj_ptr());
|
||||||
|
hbase_rowkey_objs[ObHTableConstants::COL_IDX_K] = raw_obj_ptr[ObHTableConstants::COL_IDX_K];
|
||||||
|
hbase_rowkey_objs[ObHTableConstants::COL_IDX_Q] = raw_obj_ptr[ObHTableConstants::COL_IDX_Q];
|
||||||
|
hbase_rowkey_objs[ObHTableConstants::COL_IDX_T].set_min_value();
|
||||||
|
saved_rowkey.assign(hbase_rowkey_objs, hbase_rowkey_size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
uint64_t buf_len = saved_rowkey.get_serialize_size();
|
||||||
|
char *buf = static_cast<char *>(allocator_.alloc(buf_len));
|
||||||
|
int64_t pos = 0;
|
||||||
|
if (OB_FAIL(saved_rowkey.serialize(buf, buf_len, pos))) {
|
||||||
|
LOG_WARN("fail to serialize", K(ret), K(buf_len), K(pos), K_(rowkey));
|
||||||
|
} else {
|
||||||
|
result.end_rowkey_.assign_ptr(buf, buf_len);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -110,7 +110,6 @@ private:
|
|||||||
static const int64_t RETRY_INTERVAL = 30 * 60 * 1000 * 1000l; // 30min
|
static const int64_t RETRY_INTERVAL = 30 * 60 * 1000 * 1000l; // 30min
|
||||||
static const int64_t PER_TASK_DEL_ROWS = 1024l;
|
static const int64_t PER_TASK_DEL_ROWS = 1024l;
|
||||||
static const int64_t ONE_TASK_TIMEOUT = 1 * 60 * 1000 * 1000l; // 1min
|
static const int64_t ONE_TASK_TIMEOUT = 1 * 60 * 1000 * 1000l; // 1min
|
||||||
static const ObString TTL_TRACE_INFO;
|
|
||||||
private:
|
private:
|
||||||
int process_one();
|
int process_one();
|
||||||
|
|
||||||
@ -124,10 +123,10 @@ private:
|
|||||||
share::ObLSID ls_id_;
|
share::ObLSID ls_id_;
|
||||||
ObTableEntity delete_entity_;
|
ObTableEntity delete_entity_;
|
||||||
table::ObTableApiCredential credential_;
|
table::ObTableApiCredential credential_;
|
||||||
|
uint64_t hbase_cur_version_;
|
||||||
common::ObArenaAllocator rowkey_allocator_;
|
common::ObArenaAllocator rowkey_allocator_;
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObTableTTLDeleteTask);
|
DISALLOW_COPY_AND_ASSIGN(ObTableTTLDeleteTask);
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObTableTTLDag final: public share::ObIDag
|
class ObTableTTLDag final: public share::ObIDag
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -140,15 +139,12 @@ public:
|
|||||||
virtual int fill_dag_key(char *buf, const int64_t buf_len) const override;
|
virtual int fill_dag_key(char *buf, const int64_t buf_len) const override;
|
||||||
virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override;
|
virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override;
|
||||||
virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; }
|
virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; }
|
||||||
|
|
||||||
virtual bool is_ha_dag() const { return false; }
|
virtual bool is_ha_dag() const { return false; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
table::ObTTLTaskParam param_;
|
table::ObTTLTaskParam param_;
|
||||||
table::ObTTLTaskInfo info_;
|
table::ObTTLTaskInfo info_;
|
||||||
lib::Worker::CompatMode compat_mode_;
|
lib::Worker::CompatMode compat_mode_;
|
||||||
|
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObTableTTLDag);
|
DISALLOW_COPY_AND_ASSIGN(ObTableTTLDag);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -90,8 +90,10 @@ int ObTenantTabletTTLMgr::switch_to_leader()
|
|||||||
} else {
|
} else {
|
||||||
has_start_ = true;
|
has_start_ = true;
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
resume();
|
if (OB_SUCC(ret)) {
|
||||||
|
ATOMIC_STORE(&is_leader_, true);
|
||||||
|
ATOMIC_STORE(&need_do_for_switch_, true);
|
||||||
}
|
}
|
||||||
const int64_t cost_us = ObTimeUtility::current_time() - start_time_us;
|
const int64_t cost_us = ObTimeUtility::current_time() - start_time_us;
|
||||||
FLOG_INFO("tenant_tablet_ttl_mgr: finish to switch_to_leader", KR(ret), K_(tenant_id), KPC_(ls), K(cost_us));
|
FLOG_INFO("tenant_tablet_ttl_mgr: finish to switch_to_leader", KR(ret), K_(tenant_id), KPC_(ls), K(cost_us));
|
||||||
@ -125,22 +127,12 @@ void ObTenantTabletTTLMgr::inner_switch_to_follower()
|
|||||||
{
|
{
|
||||||
FLOG_INFO("tenant_tablet_ttl_mgr: begin to switch_to_follower", K_(tenant_id), KPC_(ls));
|
FLOG_INFO("tenant_tablet_ttl_mgr: begin to switch_to_follower", K_(tenant_id), KPC_(ls));
|
||||||
const int64_t start_time_us = ObTimeUtility::current_time();
|
const int64_t start_time_us = ObTimeUtility::current_time();
|
||||||
pause();
|
ATOMIC_STORE(&is_leader_, false);
|
||||||
ATOMIC_STORE(&need_reuse_for_switch_, true);
|
ATOMIC_STORE(&need_do_for_switch_, true);
|
||||||
const int64_t cost_us = ObTimeUtility::current_time() - start_time_us;
|
const int64_t cost_us = ObTimeUtility::current_time() - start_time_us;
|
||||||
FLOG_INFO("tenant_tablet_ttl_mgr: finish to switch_to_follower", K_(tenant_id), KPC_(ls), K(cost_us));
|
FLOG_INFO("tenant_tablet_ttl_mgr: finish to switch_to_follower", K_(tenant_id), KPC_(ls), K(cost_us));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObTenantTabletTTLMgr::resume()
|
|
||||||
{
|
|
||||||
is_paused_ = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ObTenantTabletTTLMgr::pause()
|
|
||||||
{
|
|
||||||
is_paused_ = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTenantTabletTTLMgr::start()
|
int ObTenantTabletTTLMgr::start()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -149,7 +141,7 @@ int ObTenantTabletTTLMgr::start()
|
|||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("tablet ttl mgr not init", KR(ret));
|
LOG_WARN("tablet ttl mgr not init", KR(ret));
|
||||||
} else if (OB_FAIL(TG_START(tg_id_))) {
|
} else if (OB_FAIL(TG_START(tg_id_))) {
|
||||||
LOG_WARN("failed to create ObTenantTabletTTLMgr thread", K(ret), K_(tg_id));
|
LOG_WARN("fail to create ObTenantTabletTTLMgr thread", K(ret), K_(tg_id));
|
||||||
} else if (OB_FAIL(TG_SCHEDULE(tg_id_, periodic_task_, periodic_delay_, true))) {
|
} else if (OB_FAIL(TG_SCHEDULE(tg_id_, periodic_task_, periodic_delay_, true))) {
|
||||||
LOG_WARN("fail to schedule periodic task", KR(ret), K_(tg_id));
|
LOG_WARN("fail to schedule periodic task", KR(ret), K_(tg_id));
|
||||||
} else {
|
} else {
|
||||||
@ -172,9 +164,9 @@ void ObTenantTabletTTLMgr::stop()
|
|||||||
TG_STOP(tg_id_);
|
TG_STOP(tg_id_);
|
||||||
is_timer_start_ = false;
|
is_timer_start_ = false;
|
||||||
common::ObSpinLockGuard guard(lock_);
|
common::ObSpinLockGuard guard(lock_);
|
||||||
// set is_paused_ to true to ensure after stop, not new TTL dag task will be generate,
|
// set is_leader_ to false to ensure after stop, not new TTL dag task will be generate,
|
||||||
// i.e., dag_ref won't increase anymore
|
// i.e., dag_ref won't increase anymore
|
||||||
is_paused_ = true;
|
ATOMIC_STORE(&is_leader_, false);
|
||||||
}
|
}
|
||||||
FLOG_INFO("tenant_tablet_ttl_mgr: finish to stop", K(ret), K_(is_timer_start), K_(tenant_id), KPC_(ls));
|
FLOG_INFO("tenant_tablet_ttl_mgr: finish to stop", K(ret), K_(is_timer_start), K_(tenant_id), KPC_(ls));
|
||||||
}
|
}
|
||||||
@ -206,7 +198,7 @@ int ObTenantTabletTTLMgr::check_and_handle_event()
|
|||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("tablet ttl manager not init", KR(ret));
|
LOG_WARN("tablet ttl manager not init", KR(ret));
|
||||||
} else if (is_paused_) {
|
} else if (!is_leader_) {
|
||||||
// do nothing, not leader
|
// do nothing, not leader
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(check_schema_version())) {
|
if (OB_FAIL(check_schema_version())) {
|
||||||
@ -255,7 +247,10 @@ void ObTenantTabletTTLMgr::check_ttl_tenant_state()
|
|||||||
for (tablet_task_iter iter = local_tenant_task_.tablet_task_map_.begin();
|
for (tablet_task_iter iter = local_tenant_task_.tablet_task_map_.begin();
|
||||||
OB_SUCC(ret) && !tenant_dirty && iter != local_tenant_task_.tablet_task_map_.end(); ++iter) {
|
OB_SUCC(ret) && !tenant_dirty && iter != local_tenant_task_.tablet_task_map_.end(); ++iter) {
|
||||||
ctx = iter->second;
|
ctx = iter->second;
|
||||||
if (OB_ISNULL(ctx)) {
|
if (need_skip_run()) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
FLOG_INFO("skip this run cuz of leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch));
|
||||||
|
} else if (OB_ISNULL(ctx)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("fatal err, ttl ctx in map is null", K(local_tenant_task_.tenant_id_));
|
LOG_WARN("fatal err, ttl ctx in map is null", K(local_tenant_task_.tenant_id_));
|
||||||
} else {
|
} else {
|
||||||
@ -368,16 +363,21 @@ int ObTenantTabletTTLMgr::generate_batch_tablet_task(ObIArray<share::ObTabletTab
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_pairs.count(); i++) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_pairs.count(); i++) {
|
||||||
ObTTLTaskInfo task_info;
|
if (need_skip_run()) {
|
||||||
uint64_t table_id = tablet_pairs.at(i).get_table_id();
|
ret = OB_EAGAIN;
|
||||||
task_info.tablet_id_ = tablet_pairs.at(i).get_tablet_id();
|
FLOG_INFO("skip this run cuz of leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch));
|
||||||
task_info.tenant_id_ = tenant_id_;
|
} else {
|
||||||
task_info.table_id_ = table_id;
|
ObTTLTaskInfo task_info;
|
||||||
ObTTLTaskParam param;
|
uint64_t table_id = tablet_pairs.at(i).get_table_id();
|
||||||
if (OB_FAIL(param_map.get_refactored(table_id, param))) {
|
task_info.tablet_id_ = tablet_pairs.at(i).get_tablet_id();
|
||||||
LOG_WARN("fail to get ttl param", KR(ret), K(table_id));
|
task_info.tenant_id_ = tenant_id_;
|
||||||
} else if (OB_FAIL(generate_one_tablet_task(task_info, param))) {
|
task_info.table_id_ = table_id;
|
||||||
LOG_WARN("fail to generate task", KR(ret), K(task_info), K(param));
|
ObTTLTaskParam param;
|
||||||
|
if (OB_FAIL(param_map.get_refactored(table_id, param))) {
|
||||||
|
LOG_WARN("fail to get ttl param", KR(ret), K(table_id));
|
||||||
|
} else if (OB_FAIL(generate_one_tablet_task(task_info, param))) {
|
||||||
|
LOG_WARN("fail to generate task", KR(ret), K(task_info), K(param));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -534,26 +534,36 @@ int ObTenantTabletTTLMgr::check_and_generate_tablet_tasks()
|
|||||||
|
|
||||||
void OBTTLTimerPeriodicTask::runTimerTask()
|
void OBTTLTimerPeriodicTask::runTimerTask()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
ObCurTraceId::init(GCONF.self_addr_);
|
ObCurTraceId::init(GCONF.self_addr_);
|
||||||
ObTimeGuard guard("OBTTLTimerPeriodicTask::runTimerTask", TTL_TIME_TASKER_THRESHOLD);
|
ObTimeGuard guard("OBTTLTimerPeriodicTask::runTimerTask", TTL_TIME_TASKER_THRESHOLD);
|
||||||
|
tablet_ttl_mgr_.run_task();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ObTenantTabletTTLMgr::run_task()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
if (!ObKVFeatureModeUitl::is_ttl_enable()) {
|
if (!ObKVFeatureModeUitl::is_ttl_enable()) {
|
||||||
// do nothing
|
// do nothing
|
||||||
LOG_DEBUG("ttl is disable");
|
LOG_DEBUG("ttl is disable");
|
||||||
|
} else if (ATOMIC_BCAS(&need_do_for_switch_, true, false)) {
|
||||||
|
// reuse and skip task once
|
||||||
|
common::ObSpinLockGuard guard(lock_); // need lock for reuse tenant task
|
||||||
|
local_tenant_task_.reuse();
|
||||||
|
FLOG_INFO("resue local tenant task cuz of switch leader");
|
||||||
} else if (common::ObTTLUtil::check_can_do_work()) {
|
} else if (common::ObTTLUtil::check_can_do_work()) {
|
||||||
if (OB_FAIL(tablet_ttl_mgr_.check_tenant_memory())) {
|
if (OB_FAIL(check_tenant_memory())) {
|
||||||
LOG_WARN("fail to check all tenant memory", KR(ret));
|
LOG_WARN("fail to check all tenant memory", KR(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
// explicit cover error code
|
// explicit cover error code
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(tablet_ttl_mgr_.reload_tenant_task())) {
|
if (OB_FAIL(reload_tenant_task())) {
|
||||||
LOG_WARN("fail to reload tenant task", KR(ret));
|
LOG_WARN("fail to reload tenant task", KR(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
// explicit cover error code
|
// explicit cover error code
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(tablet_ttl_mgr_.check_and_handle_event())) {
|
if (OB_FAIL(check_and_handle_event())) {
|
||||||
LOG_WARN("fail to scan and handle all tenant event", KR(ret));
|
LOG_WARN("fail to scan and handle all tenant event", KR(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -666,7 +676,10 @@ int ObTenantTabletTTLMgr::handle_all_tablet_event(common::ObSArray<ObTabletID>&
|
|||||||
for (tablet_task_iter iter = local_tenant_task_.tablet_task_map_.begin();
|
for (tablet_task_iter iter = local_tenant_task_.tablet_task_map_.begin();
|
||||||
iter != local_tenant_task_.tablet_task_map_.end(); ++iter) {
|
iter != local_tenant_task_.tablet_task_map_.end(); ++iter) {
|
||||||
ctx = iter->second;
|
ctx = iter->second;
|
||||||
if (OB_ISNULL(ctx)) {
|
if (need_skip_run()) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
FLOG_INFO("skip this run cuz of leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch));
|
||||||
|
} else if (OB_ISNULL(ctx)) {
|
||||||
ret = OB_ERR_NULL_VALUE;
|
ret = OB_ERR_NULL_VALUE;
|
||||||
LOG_WARN("fatal err, ttl ctx in map is null", KR(ret));
|
LOG_WARN("fatal err, ttl ctx in map is null", KR(ret));
|
||||||
} else if (OB_FAIL(handle_one_tablet_event(ctx))) {
|
} else if (OB_FAIL(handle_one_tablet_event(ctx))) {
|
||||||
@ -754,7 +767,7 @@ int ObTenantTabletTTLMgr::check_tenant_memory()
|
|||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("tablet ttl mgr not init", KR(ret));
|
LOG_WARN("tablet ttl mgr not init", KR(ret));
|
||||||
} else if (!is_paused_) {
|
} else if (is_leader_) {
|
||||||
common::ObSpinLockGuard guard(lock_);
|
common::ObSpinLockGuard guard(lock_);
|
||||||
bool last_ttl_continue = local_tenant_task_.ttl_continue_;
|
bool last_ttl_continue = local_tenant_task_.ttl_continue_;
|
||||||
int64_t total_memstore_used = 0;
|
int64_t total_memstore_used = 0;
|
||||||
@ -845,7 +858,10 @@ int ObTenantTabletTTLMgr::sync_all_dirty_task(ObIArray<ObTabletID>& dirty_tasks)
|
|||||||
ObTimeGuard guard("ObTenantTabletTTLMgr::sync_all_dirty_record", TTL_NORMAL_TIME_THRESHOLD);
|
ObTimeGuard guard("ObTenantTabletTTLMgr::sync_all_dirty_record", TTL_NORMAL_TIME_THRESHOLD);
|
||||||
for (int i = 0; OB_SUCC(ret) && i < dirty_tasks.count() && !tenant_state_changed; i++) {
|
for (int i = 0; OB_SUCC(ret) && i < dirty_tasks.count() && !tenant_state_changed; i++) {
|
||||||
// tenant_state_changed is true means that tenant status is changed, we should refresh our status first
|
// tenant_state_changed is true means that tenant status is changed, we should refresh our status first
|
||||||
if (OB_FAIL(sync_sys_table(dirty_tasks.at(i), tenant_state_changed))) {
|
if (need_skip_run()) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
FLOG_INFO("skip this run cuz of leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch));
|
||||||
|
} else if (OB_FAIL(sync_sys_table(dirty_tasks.at(i), tenant_state_changed))) {
|
||||||
LOG_WARN("fail to sync sys table", KR(ret));
|
LOG_WARN("fail to sync sys table", KR(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1113,7 +1129,7 @@ bool ObTenantTabletTTLMgr::can_schedule_tenant(const ObTTLTenantInfo &tenant_inf
|
|||||||
|
|
||||||
bool ObTenantTabletTTLMgr::can_schedule_task(const ObTTLTaskCtx &ttl_task)
|
bool ObTenantTabletTTLMgr::can_schedule_task(const ObTTLTaskCtx &ttl_task)
|
||||||
{
|
{
|
||||||
return !is_paused_ && ttl_task.task_status_ == OB_TTL_TASK_PENDING;
|
return is_leader_ && ttl_task.task_status_ == OB_TTL_TASK_PENDING;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTenantTabletTTLMgr::try_schedule_remaining_tasks(const ObTTLTaskCtx *current_ctx)
|
int ObTenantTabletTTLMgr::try_schedule_remaining_tasks(const ObTTLTaskCtx *current_ctx)
|
||||||
@ -1125,7 +1141,10 @@ int ObTenantTabletTTLMgr::try_schedule_remaining_tasks(const ObTTLTaskCtx *curre
|
|||||||
iter != local_tenant_task_.tablet_task_map_.end()
|
iter != local_tenant_task_.tablet_task_map_.end()
|
||||||
&& OB_SUCC(ret); ++iter) {
|
&& OB_SUCC(ret); ++iter) {
|
||||||
ctx = iter->second;
|
ctx = iter->second;
|
||||||
if (OB_ISNULL(ctx)) {
|
if (need_skip_run()) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
FLOG_INFO("skip this run cuz of leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch));
|
||||||
|
} else if (OB_ISNULL(ctx)) {
|
||||||
ret = OB_ERR_NULL_VALUE;
|
ret = OB_ERR_NULL_VALUE;
|
||||||
LOG_ERROR("fatal err, ttl ctx in map is null", KR(ret), K(local_tenant_task_.tenant_id_));
|
LOG_ERROR("fatal err, ttl ctx in map is null", KR(ret), K(local_tenant_task_.tenant_id_));
|
||||||
} else if (current_ctx == ctx) {
|
} else if (current_ctx == ctx) {
|
||||||
@ -1237,17 +1256,13 @@ int ObTenantTabletTTLMgr::refresh_tablet_task(ObTTLTaskCtx &ttl_task, bool refre
|
|||||||
int ObTenantTabletTTLMgr::reload_tenant_task()
|
int ObTenantTabletTTLMgr::reload_tenant_task()
|
||||||
{
|
{
|
||||||
common::ObSpinLockGuard guard(lock_);
|
common::ObSpinLockGuard guard(lock_);
|
||||||
if (ATOMIC_BCAS(&need_reuse_for_switch_, true, false)) {
|
|
||||||
local_tenant_task_.reuse();
|
|
||||||
FLOG_INFO("resue local tenant task cuz of switch to follower");
|
|
||||||
}
|
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTTLStatus tenant_task;
|
ObTTLStatus tenant_task;
|
||||||
ObTTLTaskStatus expected_state;
|
ObTTLTaskStatus expected_state;
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("not init", KR(ret));
|
LOG_WARN("not init", KR(ret));
|
||||||
} else if (is_paused_) {
|
} else if (!is_leader_) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (OB_FAIL(ObTTLUtil::read_tenant_ttl_task(tenant_id_, *sql_proxy_, tenant_task))) {
|
} else if (OB_FAIL(ObTTLUtil::read_tenant_ttl_task(tenant_id_, *sql_proxy_, tenant_task))) {
|
||||||
if (OB_ITER_END == ret) {
|
if (OB_ITER_END == ret) {
|
||||||
@ -1279,7 +1294,7 @@ int ObTenantTabletTTLMgr::reload_tenant_task()
|
|||||||
local_tenant_task_.state_ = expected_state;
|
local_tenant_task_.state_ = expected_state;
|
||||||
local_tenant_task_.is_dirty_ = true;
|
local_tenant_task_.is_dirty_ = true;
|
||||||
} else {/* task status not change, do nothing */}
|
} else {/* task status not change, do nothing */}
|
||||||
FLOG_INFO("finish reload tenant task", K(local_tenant_task_), K(tenant_task), K(is_paused_));
|
FLOG_INFO("finish reload tenant task", K(local_tenant_task_), K(tenant_task), K_(is_leader));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -93,9 +93,9 @@ public:
|
|||||||
tg_id_(0),
|
tg_id_(0),
|
||||||
local_schema_version_(OB_INVALID_VERSION),
|
local_schema_version_(OB_INVALID_VERSION),
|
||||||
has_start_(false),
|
has_start_(false),
|
||||||
is_paused_(false),
|
is_leader_(true),
|
||||||
dag_ref_cnt_(0),
|
dag_ref_cnt_(0),
|
||||||
need_reuse_for_switch_(false)
|
need_do_for_switch_(true)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -157,6 +157,7 @@ public:
|
|||||||
int64_t get_dag_ref() const { return ATOMIC_LOAD(&dag_ref_cnt_); }
|
int64_t get_dag_ref() const { return ATOMIC_LOAD(&dag_ref_cnt_); }
|
||||||
int safe_to_destroy(bool &is_safe);
|
int safe_to_destroy(bool &is_safe);
|
||||||
int sync_all_dirty_task(common::ObIArray<ObTabletID>& dirty_tasks);
|
int sync_all_dirty_task(common::ObIArray<ObTabletID>& dirty_tasks);
|
||||||
|
void run_task();
|
||||||
private:
|
private:
|
||||||
typedef common::hash::ObHashMap<ObTabletID, ObTTLTaskCtx*> TabletTaskMap;
|
typedef common::hash::ObHashMap<ObTabletID, ObTTLTaskCtx*> TabletTaskMap;
|
||||||
typedef TabletTaskMap::iterator tablet_task_iter;
|
typedef TabletTaskMap::iterator tablet_task_iter;
|
||||||
@ -254,11 +255,10 @@ private:
|
|||||||
void mark_tenant_checked();
|
void mark_tenant_checked();
|
||||||
int refresh_tablet_task(ObTTLTaskCtx &ttl_task, bool refresh_status, bool refresh_retcode = false);
|
int refresh_tablet_task(ObTTLTaskCtx &ttl_task, bool refresh_status, bool refresh_retcode = false);
|
||||||
int check_schema_version();
|
int check_schema_version();
|
||||||
void resume();
|
OB_INLINE bool need_skip_run() { return ATOMIC_LOAD(&need_do_for_switch_); }
|
||||||
void pause();
|
|
||||||
private:
|
private:
|
||||||
static const int64_t DEFAULT_TTL_BUCKET_NUM = 100;
|
static const int64_t DEFAULT_TTL_BUCKET_NUM = 100;
|
||||||
static const int64_t TTL_PERIODIC_DELAY = 5*1000*1000; //5s
|
static const int64_t TTL_PERIODIC_DELAY = 10*1000*1000; //10s
|
||||||
static const int64_t TBALE_GENERATE_BATCH_SIZE = 200;
|
static const int64_t TBALE_GENERATE_BATCH_SIZE = 200;
|
||||||
static const int64_t DEFAULT_TABLE_ARRAY_SIZE = 200;
|
static const int64_t DEFAULT_TABLE_ARRAY_SIZE = 200;
|
||||||
static const int64_t DEFAULT_TABLET_PAIR_SIZE = 1024;
|
static const int64_t DEFAULT_TABLET_PAIR_SIZE = 1024;
|
||||||
@ -278,9 +278,10 @@ private:
|
|||||||
ObArray<share::ObTabletTablePair> tablet_table_pairs_;
|
ObArray<share::ObTabletTablePair> tablet_table_pairs_;
|
||||||
int64_t local_schema_version_;
|
int64_t local_schema_version_;
|
||||||
bool has_start_;
|
bool has_start_;
|
||||||
bool is_paused_;
|
bool is_leader_; // current tenant ttl mgr is in leader ls or not
|
||||||
volatile int64_t dag_ref_cnt_; // ttl dag ref count for current ls
|
volatile int64_t dag_ref_cnt_; // ttl dag ref count for current ls
|
||||||
bool need_reuse_for_switch_;
|
// after leader switch, need wait and reset status
|
||||||
|
bool need_do_for_switch_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end namespace table
|
} // end namespace table
|
||||||
|
|||||||
@ -121,7 +121,7 @@ int ObTTLTaskScheduler::reload_tenant_task()
|
|||||||
LOG_WARN("ttl tenant task mgr not init", KR(ret));
|
LOG_WARN("ttl tenant task mgr not init", KR(ret));
|
||||||
} else if (!ObTTLUtil::check_can_process_tenant_tasks(tenant_id_)) {
|
} else if (!ObTTLUtil::check_can_process_tenant_tasks(tenant_id_)) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (need_reload_) {
|
} else if (ATOMIC_BCAS(&need_reload_, true, false)) {
|
||||||
lib::ObMutexGuard guard(mutex_);
|
lib::ObMutexGuard guard(mutex_);
|
||||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||||
ObTTLStatusField table_id_field;
|
ObTTLStatusField table_id_field;
|
||||||
@ -162,8 +162,11 @@ int ObTTLTaskScheduler::reload_tenant_task()
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
set_need_reload(false);
|
|
||||||
FLOG_INFO("reload tenant task", K_(tenant_task));
|
FLOG_INFO("reload tenant task", K_(tenant_task));
|
||||||
|
} else {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
ATOMIC_STORE(&need_reload_, true);
|
||||||
|
LOG_WARN("fail to reload tenant task", KR(ret), K_(tenant_task));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -401,6 +404,9 @@ int ObTTLTaskScheduler::try_add_periodic_task()
|
|||||||
} else if (!ObTTLUtil::check_can_process_tenant_tasks(tenant_id_)) {
|
} else if (!ObTTLUtil::check_can_process_tenant_tasks(tenant_id_)) {
|
||||||
ret = OB_NOT_SUPPORTED;
|
ret = OB_NOT_SUPPORTED;
|
||||||
LOG_WARN("cann't process ttl task, maybe tenant is restoring", K_(tenant_id), KR(ret));
|
LOG_WARN("cann't process ttl task, maybe tenant is restoring", K_(tenant_id), KR(ret));
|
||||||
|
} else if (need_skip_run()) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
FLOG_INFO("exit timer task once cuz leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch));
|
||||||
} else if (OB_FAIL(in_active_time(is_active_time))) {
|
} else if (OB_FAIL(in_active_time(is_active_time))) {
|
||||||
LOG_WARN("fail to check is in active time", KR(ret));
|
LOG_WARN("fail to check is in active time", KR(ret));
|
||||||
} else if (is_active_time) {
|
} else if (is_active_time) {
|
||||||
@ -437,7 +443,10 @@ int ObTTLTaskScheduler::check_all_tablet_task()
|
|||||||
} else if (need_move) {
|
} else if (need_move) {
|
||||||
{
|
{
|
||||||
lib::ObMutexGuard guard(mutex_);
|
lib::ObMutexGuard guard(mutex_);
|
||||||
if (ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE != tenant_task_.ttl_status_.status_) {
|
if (need_skip_run()) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
FLOG_INFO("exit timer task once cuz leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch));
|
||||||
|
} else if (ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE != tenant_task_.ttl_status_.status_) {
|
||||||
tenant_task_.ttl_status_.status_ = static_cast<uint64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE);
|
tenant_task_.ttl_status_.status_ = static_cast<uint64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE);
|
||||||
ObMySQLTransaction trans;
|
ObMySQLTransaction trans;
|
||||||
if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) {
|
if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) {
|
||||||
@ -536,12 +545,14 @@ void ObTTLTaskScheduler::reset_local_tenant_task()
|
|||||||
|
|
||||||
void ObTTLTaskScheduler::resume()
|
void ObTTLTaskScheduler::resume()
|
||||||
{
|
{
|
||||||
is_paused_ = false;
|
ATOMIC_STORE(&is_leader_, true);
|
||||||
|
ATOMIC_STORE(&need_do_for_switch_, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObTTLTaskScheduler::pause()
|
void ObTTLTaskScheduler::pause()
|
||||||
{
|
{
|
||||||
is_paused_ = true;
|
ATOMIC_STORE(&is_leader_, false);
|
||||||
|
ATOMIC_STORE(&need_do_for_switch_, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTenantTTLManager::init(const uint64_t tenant_id, ObMySQLProxy &sql_proxy)
|
int ObTenantTTLManager::init(const uint64_t tenant_id, ObMySQLProxy &sql_proxy)
|
||||||
@ -608,15 +619,22 @@ void ObTenantTTLManager::destroy()
|
|||||||
|
|
||||||
void ObTTLTaskScheduler::runTimerTask()
|
void ObTTLTaskScheduler::runTimerTask()
|
||||||
{
|
{
|
||||||
|
DEBUG_SYNC(BEFORE_TTL_SCHEDULER_RUN);
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObCurTraceId::init(GCONF.self_addr_);
|
ObCurTraceId::init(GCONF.self_addr_);
|
||||||
|
|
||||||
if (!ObKVFeatureModeUitl::is_ttl_enable()) {
|
if (!ObKVFeatureModeUitl::is_ttl_enable()) {
|
||||||
// do nothing
|
// do nothing
|
||||||
LOG_DEBUG("ttl is disable");
|
LOG_DEBUG("ttl is disable");
|
||||||
} else if (IS_NOT_INIT) {
|
} else if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("ttl task mgr not init", KR(ret));
|
LOG_WARN("ttl task mgr not init", KR(ret));
|
||||||
} else if (is_paused_) {
|
} else if (ATOMIC_BCAS(&need_do_for_switch_, true, false)) {
|
||||||
|
// need skip this round for waiting follower finish executing task
|
||||||
|
if (is_leader_) {
|
||||||
|
FLOG_INFO("need wait for switch leader, skip schedule once", K_(need_do_for_switch));
|
||||||
|
}
|
||||||
|
} else if (!is_leader_) {
|
||||||
// timer paused, do nothing
|
// timer paused, do nothing
|
||||||
} else if (OB_FAIL(reload_tenant_task())) {
|
} else if (OB_FAIL(reload_tenant_task())) {
|
||||||
LOG_WARN("fail to process tenant task", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to process tenant task", KR(ret), K_(tenant_id));
|
||||||
@ -652,7 +670,10 @@ int ObTTLTaskScheduler::move_all_task_to_history_table(bool need_cancel)
|
|||||||
int64_t one_move_rows = TBALET_CHECK_BATCH_SIZE;
|
int64_t one_move_rows = TBALET_CHECK_BATCH_SIZE;
|
||||||
while (OB_SUCC(ret) && one_move_rows == TBALET_CHECK_BATCH_SIZE) {
|
while (OB_SUCC(ret) && one_move_rows == TBALET_CHECK_BATCH_SIZE) {
|
||||||
ObMySQLTransaction trans;
|
ObMySQLTransaction trans;
|
||||||
if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) {
|
if (need_skip_run()) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
FLOG_INFO("exit timer task once cuz leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch));
|
||||||
|
} else if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) {
|
||||||
LOG_WARN("fail start transaction", KR(ret), K_(tenant_id));
|
LOG_WARN("fail start transaction", KR(ret), K_(tenant_id));
|
||||||
} else if (OB_FAIL(ObTTLUtil::move_task_to_history_table(tenant_id_, tenant_task_.ttl_status_.task_id_,
|
} else if (OB_FAIL(ObTTLUtil::move_task_to_history_table(tenant_id_, tenant_task_.ttl_status_.task_id_,
|
||||||
trans, TBALET_CHECK_BATCH_SIZE, one_move_rows,
|
trans, TBALET_CHECK_BATCH_SIZE, one_move_rows,
|
||||||
@ -671,7 +692,10 @@ int ObTTLTaskScheduler::move_all_task_to_history_table(bool need_cancel)
|
|||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
ObMySQLTransaction trans;
|
ObMySQLTransaction trans;
|
||||||
if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) {
|
if (need_skip_run()) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
FLOG_INFO("exit timer task once cuz leader switch", KR(ret), K_(is_leader), K_(need_do_for_switch));
|
||||||
|
} else if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(tenant_id_)))) {
|
||||||
LOG_WARN("fail start transaction", KR(ret), K_(tenant_id));
|
LOG_WARN("fail start transaction", KR(ret), K_(tenant_id));
|
||||||
} else if (OB_FAIL(update_task_status(tenant_task_.ttl_status_.task_id_, OB_TTL_TASK_FINISH, trans))) {
|
} else if (OB_FAIL(update_task_status(tenant_task_.ttl_status_.task_id_, OB_TTL_TASK_FINISH, trans))) {
|
||||||
LOG_WARN("fail to update task status", KR(ret));
|
LOG_WARN("fail to update task status", KR(ret));
|
||||||
@ -695,7 +719,8 @@ void ObTenantTTLManager::resume()
|
|||||||
{
|
{
|
||||||
clear_ttl_history_task_.resume();
|
clear_ttl_history_task_.resume();
|
||||||
task_scheduler_.resume();
|
task_scheduler_.resume();
|
||||||
task_scheduler_.set_need_reload(true);}
|
task_scheduler_.set_need_reload(true);
|
||||||
|
}
|
||||||
|
|
||||||
void ObTenantTTLManager::pause()
|
void ObTenantTTLManager::pause()
|
||||||
{
|
{
|
||||||
@ -718,6 +743,7 @@ int ObTTLTaskScheduler::check_task_need_move(bool &need_move)
|
|||||||
|
|
||||||
int ObTTLTaskScheduler::check_all_tablet_finished(bool &all_finished)
|
int ObTTLTaskScheduler::check_all_tablet_finished(bool &all_finished)
|
||||||
{
|
{
|
||||||
|
DEBUG_SYNC(BEFORE_CHECK_TTL_TASK_FINISH);
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
all_finished = true;
|
all_finished = true;
|
||||||
ObSEArray<uint64_t, DEFAULT_TABLE_ARRAY_SIZE> table_id_array;
|
ObSEArray<uint64_t, DEFAULT_TABLE_ARRAY_SIZE> table_id_array;
|
||||||
|
|||||||
@ -95,7 +95,7 @@ class ObTTLTaskScheduler : public common::ObTimerTask
|
|||||||
public:
|
public:
|
||||||
ObTTLTaskScheduler()
|
ObTTLTaskScheduler()
|
||||||
: del_ten_arr_(), sql_proxy_(nullptr), is_inited_(false), periodic_launched_(false),
|
: del_ten_arr_(), sql_proxy_(nullptr), is_inited_(false), periodic_launched_(false),
|
||||||
need_reload_(true), is_paused_(false)
|
need_reload_(true), is_leader_(true), need_do_for_switch_(true)
|
||||||
{}
|
{}
|
||||||
~ObTTLTaskScheduler() {}
|
~ObTTLTaskScheduler() {}
|
||||||
|
|
||||||
@ -111,7 +111,7 @@ public:
|
|||||||
void runTimerTask() override;
|
void runTimerTask() override;
|
||||||
|
|
||||||
int try_add_periodic_task();
|
int try_add_periodic_task();
|
||||||
void set_need_reload(bool need_reload) { need_reload_ = need_reload; }
|
void set_need_reload(bool need_reload) { ATOMIC_STORE(&need_reload_, need_reload); }
|
||||||
|
|
||||||
void pause();
|
void pause();
|
||||||
void resume();
|
void resume();
|
||||||
@ -149,6 +149,7 @@ private:
|
|||||||
int check_all_tablet_finished(bool &all_finished);
|
int check_all_tablet_finished(bool &all_finished);
|
||||||
int check_tablet_table_finished(common::ObIArray<share::ObTabletTablePair> &pairs, bool &all_finished);
|
int check_tablet_table_finished(common::ObIArray<share::ObTabletTablePair> &pairs, bool &all_finished);
|
||||||
int move_all_task_to_history_table(bool need_cancel);
|
int move_all_task_to_history_table(bool need_cancel);
|
||||||
|
OB_INLINE bool need_skip_run() { return ATOMIC_LOAD(&need_do_for_switch_); }
|
||||||
private:
|
private:
|
||||||
static const int64_t TBALE_CHECK_BATCH_SIZE = 200;
|
static const int64_t TBALE_CHECK_BATCH_SIZE = 200;
|
||||||
static const int64_t TBALET_CHECK_BATCH_SIZE = 1024;
|
static const int64_t TBALET_CHECK_BATCH_SIZE = 1024;
|
||||||
@ -166,8 +167,9 @@ private:
|
|||||||
bool need_reload_;
|
bool need_reload_;
|
||||||
lib::ObMutex mutex_;
|
lib::ObMutex mutex_;
|
||||||
ObArray<share::ObTabletTablePair> tablet_table_pairs_;
|
ObArray<share::ObTabletTablePair> tablet_table_pairs_;
|
||||||
bool is_paused_;
|
bool is_leader_; // current ttl manager in ls leader or not
|
||||||
const int64_t OB_TTL_TASK_RETRY_INTERVAL = 15*1000*1000; // 15s
|
const int64_t OB_TTL_TASK_RETRY_INTERVAL = 15*1000*1000; // 15s
|
||||||
|
bool need_do_for_switch_; // need wait follower finish after switch leader
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObTenantTTLManager
|
class ObTenantTTLManager
|
||||||
|
|||||||
@ -277,6 +277,7 @@ ob_set_subtarget(ob_share common_mixed
|
|||||||
restore/ob_tenant_clone_table_operator.cpp
|
restore/ob_tenant_clone_table_operator.cpp
|
||||||
index_usage/ob_index_usage_info_mgr.cpp
|
index_usage/ob_index_usage_info_mgr.cpp
|
||||||
index_usage/ob_index_usage_report_task.cpp
|
index_usage/ob_index_usage_report_task.cpp
|
||||||
|
table/ob_table_util.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
ob_set_subtarget(ob_share tablet
|
ob_set_subtarget(ob_share tablet
|
||||||
|
|||||||
@ -572,6 +572,8 @@ class ObString;
|
|||||||
ACT(BEFORE_CREATE_CLONE_TENANT_END,)\
|
ACT(BEFORE_CREATE_CLONE_TENANT_END,)\
|
||||||
ACT(BEFORE_CALC_CONSISTENT_SCN,)\
|
ACT(BEFORE_CALC_CONSISTENT_SCN,)\
|
||||||
ACT(REPLAY_SWITCH_TO_FOLLOWER_BEFORE_PUSH_SUBMIT_TASK,)\
|
ACT(REPLAY_SWITCH_TO_FOLLOWER_BEFORE_PUSH_SUBMIT_TASK,)\
|
||||||
|
ACT(BEFORE_CHECK_TTL_TASK_FINISH,)\
|
||||||
|
ACT(BEFORE_TTL_SCHEDULER_RUN,)\
|
||||||
ACT(MAX_DEBUG_SYNC_POINT,)
|
ACT(MAX_DEBUG_SYNC_POINT,)
|
||||||
|
|
||||||
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
||||||
|
|||||||
@ -339,10 +339,10 @@ class ObTableTTLOperation
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObTableTTLOperation(uint64_t tenant_id, uint64_t table_id, const ObTTLTaskParam ¶,
|
ObTableTTLOperation(uint64_t tenant_id, uint64_t table_id, const ObTTLTaskParam ¶,
|
||||||
uint64_t del_row_limit, ObRowkey start_rowkey)
|
uint64_t del_row_limit, ObRowkey start_rowkey, uint64_t hbase_cur_version)
|
||||||
: tenant_id_(tenant_id), table_id_(table_id), max_version_(para.max_version_),
|
: tenant_id_(tenant_id), table_id_(table_id), max_version_(para.max_version_),
|
||||||
time_to_live_(para.ttl_), is_htable_(para.is_htable_), del_row_limit_(del_row_limit),
|
time_to_live_(para.ttl_), is_htable_(para.is_htable_), del_row_limit_(del_row_limit),
|
||||||
start_rowkey_(start_rowkey)
|
start_rowkey_(start_rowkey), hbase_cur_version_(hbase_cur_version)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
~ObTableTTLOperation() {}
|
~ObTableTTLOperation() {}
|
||||||
@ -360,6 +360,7 @@ public:
|
|||||||
bool is_htable_;
|
bool is_htable_;
|
||||||
uint64_t del_row_limit_;
|
uint64_t del_row_limit_;
|
||||||
ObRowkey start_rowkey_;
|
ObRowkey start_rowkey_;
|
||||||
|
uint64_t hbase_cur_version_;
|
||||||
};
|
};
|
||||||
|
|
||||||
/// common result for ObTable
|
/// common result for ObTable
|
||||||
|
|||||||
28
src/share/table/ob_table_util.cpp
Normal file
28
src/share/table/ob_table_util.cpp
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2021 OceanBase
|
||||||
|
* OceanBase CE is licensed under Mulan PubL v2.
|
||||||
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||||
|
* You may obtain a copy of Mulan PubL v2 at:
|
||||||
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||||
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||||
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||||
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||||
|
* See the Mulan PubL v2 for more details.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "ob_table_util.h"
|
||||||
|
|
||||||
|
using namespace oceanbase::common;
|
||||||
|
namespace oceanbase
|
||||||
|
{
|
||||||
|
namespace table
|
||||||
|
{
|
||||||
|
const ObString ObTableUtils::KV_NORMAL_TRACE_INFO = ObString::make_string("OBKV Operation");
|
||||||
|
const ObString ObTableUtils::KV_TTL_TRACE_INFO = ObString::make_string("TTL Delete");
|
||||||
|
|
||||||
|
bool ObTableUtils::is_kv_trace_info(const ObString &trace_info)
|
||||||
|
{
|
||||||
|
return (trace_info.compare(KV_NORMAL_TRACE_INFO) == 0 || trace_info.compare(KV_TTL_TRACE_INFO) == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
39
src/share/table/ob_table_util.h
Normal file
39
src/share/table/ob_table_util.h
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2023 OceanBase
|
||||||
|
* OceanBase CE is licensed under Mulan PubL v2.
|
||||||
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||||
|
* You may obtain a copy of Mulan PubL v2 at:
|
||||||
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||||
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||||
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||||
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||||
|
* See the Mulan PubL v2 for more details.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef OCEANBASE_SHARE_TABLE_OB_TABLE_UTIL_
|
||||||
|
#define OCEANBASE_SHARE_TABLE_OB_TABLE_UTIL_
|
||||||
|
|
||||||
|
#include "lib/string/ob_string.h"
|
||||||
|
|
||||||
|
using namespace oceanbase::common;
|
||||||
|
|
||||||
|
namespace oceanbase
|
||||||
|
{
|
||||||
|
namespace table
|
||||||
|
{
|
||||||
|
|
||||||
|
class ObTableUtils
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
static const ObString &get_kv_normal_trace_info() { return KV_NORMAL_TRACE_INFO; }
|
||||||
|
static const ObString &get_kv_ttl_trace_info() { return KV_TTL_TRACE_INFO; }
|
||||||
|
static bool is_kv_trace_info(const ObString &trace_info);
|
||||||
|
private:
|
||||||
|
static const ObString KV_NORMAL_TRACE_INFO;
|
||||||
|
static const ObString KV_TTL_TRACE_INFO;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace table
|
||||||
|
} // namespace oceanbase
|
||||||
|
|
||||||
|
#endif /* OCEANBASE_SHARE_TABLE_OB_TABLE_UTIL_ */
|
||||||
@ -831,10 +831,10 @@ int ObTableTTLChecker::init(const schema::ObTableSchema &table_schema, bool in_f
|
|||||||
left = left.trim();
|
left = left.trim();
|
||||||
// example: "INTERVAL 40 MINUTE"
|
// example: "INTERVAL 40 MINUTE"
|
||||||
left += strlen("INTERVAL");
|
left += strlen("INTERVAL");
|
||||||
// example: "40 MINUTE"
|
|
||||||
left = left.trim();
|
left = left.trim();
|
||||||
|
// example: "40 MINUTE"
|
||||||
ObString interval_str = left.split_on(' ');
|
ObString interval_str = left.split_on(' ');
|
||||||
ObString time_unit_str = left;
|
ObString time_unit_str = left.trim();
|
||||||
|
|
||||||
ttl_expr.column_name_ = column_str;
|
ttl_expr.column_name_ = column_str;
|
||||||
ttl_expr.interval_ = atol(interval_str.ptr());
|
ttl_expr.interval_ = atol(interval_str.ptr());
|
||||||
@ -858,37 +858,38 @@ int ObTableTTLChecker::init(const schema::ObTableSchema &table_schema, bool in_f
|
|||||||
// 2. get delta second and month
|
// 2. get delta second and month
|
||||||
int64_t nsecond = 0;
|
int64_t nsecond = 0;
|
||||||
int64_t nmonth = 0;
|
int64_t nmonth = 0;
|
||||||
switch (ttl_expr.time_unit_) {
|
if (OB_SUCC(ret)) {
|
||||||
case ObTableTTLTimeUnit::SECOND: {
|
switch (ttl_expr.time_unit_) {
|
||||||
nsecond = ttl_expr.interval_;
|
case ObTableTTLTimeUnit::SECOND: {
|
||||||
break;
|
nsecond = ttl_expr.interval_;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ObTableTTLTimeUnit::MINUTE: {
|
||||||
|
nsecond = ttl_expr.interval_ * 60;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ObTableTTLTimeUnit::HOUR: {
|
||||||
|
nsecond = ttl_expr.interval_ * 60 * 60;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ObTableTTLTimeUnit::DAY: {
|
||||||
|
nsecond = ttl_expr.interval_ * 60 * 60 * 24;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ObTableTTLTimeUnit::MONTH: {
|
||||||
|
nmonth = ttl_expr.interval_;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ObTableTTLTimeUnit::YEAR: {
|
||||||
|
nmonth = ttl_expr.interval_ * 12;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("unexpected time unit", K(ret), K_(ttl_expr.time_unit));
|
||||||
}
|
}
|
||||||
case ObTableTTLTimeUnit::MINUTE: {
|
|
||||||
nsecond = ttl_expr.interval_ * 60;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case ObTableTTLTimeUnit::HOUR: {
|
|
||||||
nsecond = ttl_expr.interval_ * 60 * 60;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case ObTableTTLTimeUnit::DAY: {
|
|
||||||
nsecond = ttl_expr.interval_ * 60 * 60 * 24;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case ObTableTTLTimeUnit::MONTH: {
|
|
||||||
nmonth = ttl_expr.interval_;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case ObTableTTLTimeUnit::YEAR: {
|
|
||||||
nmonth = ttl_expr.interval_ * 12;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("unexpected time unit", K(ret), K_(ttl_expr.time_unit));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
ttl_expr.nsecond_ = nsecond;
|
ttl_expr.nsecond_ = nsecond;
|
||||||
ttl_expr.nmonth_ = nmonth;
|
ttl_expr.nmonth_ = nmonth;
|
||||||
|
|||||||
Reference in New Issue
Block a user