[CP] [OBKV] Fix TTL issue 52872768
This commit is contained in:

committed by
ob-robot

parent
c82d98fede
commit
b4720dcc91
@ -453,6 +453,7 @@ int ObTenantTabletTTLMgr::generate_one_tablet_task(ObTTLTaskInfo& task_info, con
|
||||
} else {
|
||||
char *ctx_buf = static_cast<char *>(local_tenant_task_.allocator_.alloc(sizeof(ObTTLTaskCtx)));
|
||||
if (OB_ISNULL(ctx_buf)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc ttl task ctx", KR(ret));
|
||||
} else {
|
||||
ctx = new(ctx_buf)ObTTLTaskCtx();
|
||||
@ -472,12 +473,17 @@ int ObTenantTabletTTLMgr::generate_one_tablet_task(ObTTLTaskInfo& task_info, con
|
||||
} else {
|
||||
if (OB_FAIL(local_tenant_task_.tablet_task_map_.set_refactored(task_info.tablet_id_, ctx))) {
|
||||
LOG_WARN("fail to insert ttl task ctx into map", KR(ret), K(task_info.tablet_id_));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret) && OB_NOT_NULL(ctx)) {
|
||||
ctx->~ObTTLTaskCtx();
|
||||
local_tenant_task_.allocator_.free(ctx);
|
||||
ctx = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG("finish generate one partition task", KR(ret), K(task_info.tablet_id_), K(param));
|
||||
return ret;
|
||||
}
|
||||
@ -647,7 +653,7 @@ int ObTenantTabletTTLMgr::deep_copy_task(ObTTLTaskCtx* ctx, ObTTLTaskInfo& task_
|
||||
if (OB_ISNULL(ctx)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("the ctx is null", KR(ret));
|
||||
} else if (OB_FAIL(ob_write_string(allocator_, task_info.row_key_, ctx->task_info_.row_key_)) ) {
|
||||
} else if (OB_FAIL(ctx->deep_copy_rowkey(task_info.row_key_)) ) {
|
||||
LOG_WARN("fail to deep copy rowkey", KR(ret), K(task_info.row_key_));
|
||||
} else {
|
||||
ctx->task_info_.ttl_del_cnt_ = task_info.ttl_del_cnt_;
|
||||
@ -1159,13 +1165,8 @@ int ObTenantTabletTTLMgr::from_ttl_record(ObTabletID& tablet_id, common::ObTTLSt
|
||||
ctx->task_status_ = static_cast<ObTTLTaskStatus>(record.status_);
|
||||
}
|
||||
if (!record.row_key_.empty()) {
|
||||
char *rowkey_buf = static_cast<char *>(local_tenant_task_.allocator_.alloc(record.row_key_.length()));
|
||||
if (OB_ISNULL(rowkey_buf)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc memory", KR(ret));
|
||||
} else {
|
||||
MEMCPY(rowkey_buf, record.row_key_.ptr(), record.row_key_.length());
|
||||
ctx->task_info_.row_key_.assign(rowkey_buf, record.row_key_.length());
|
||||
if (OB_FAIL(ctx->deep_copy_rowkey(record.row_key_))) {
|
||||
LOG_WARN("fail to deep copy rowkey", KR(ret), K(record.row_key_));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1314,8 +1315,11 @@ int ObTenantTabletTTLMgr::reload_tenant_task()
|
||||
// do nothing, not leader
|
||||
} else if (OB_FAIL(ObTTLUtil::read_tenant_ttl_task(tenant_id_, *sql_proxy_, tenant_task))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
// do nothing
|
||||
ret = OB_SUCCESS;
|
||||
// tenant task may finish before the tablet task
|
||||
if (!local_tenant_task_.is_finished_) {
|
||||
local_tenant_task_.reuse();
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("fail to read tenant ttl task", KR(ret), K_(tenant_id));
|
||||
}
|
||||
@ -1388,5 +1392,35 @@ int ObTenantTabletTTLMgr::safe_to_destroy(bool &is_safe)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTTLTaskCtx::deep_copy_rowkey(const ObString &rowkey)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
task_info_.row_key_.reset();
|
||||
rowkey_cp_allcoator_.reuse();
|
||||
if (OB_FAIL(ob_write_string(rowkey_cp_allcoator_, rowkey, task_info_.row_key_))) {
|
||||
LOG_WARN("fail to deep copy rowkey", KR(ret), K(rowkey));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTenantTabletTTLMgr::ObTTLTenantInfo::reuse()
|
||||
{
|
||||
for (TabletTaskMap::const_iterator iter = tablet_task_map_.begin(); iter != tablet_task_map_.end();
|
||||
++iter) {
|
||||
ObTTLTaskCtx *ctx = iter->second;
|
||||
if (OB_NOT_NULL(ctx)) {
|
||||
ctx->~ObTTLTaskCtx();
|
||||
}
|
||||
}
|
||||
tablet_task_map_.reuse();
|
||||
allocator_.reset();
|
||||
is_usr_trigger_ = false;
|
||||
need_check_ = false;
|
||||
is_dirty_ = false;
|
||||
ttl_continue_ = true;
|
||||
state_ = common::ObTTLTaskStatus::OB_TTL_TASK_INVALID;
|
||||
is_finished_ = true;
|
||||
}
|
||||
|
||||
} // table
|
||||
} // oceanbase
|
||||
|
@ -26,7 +26,8 @@ namespace table
|
||||
struct ObTTLTaskCtx
|
||||
{
|
||||
public :
|
||||
ObTTLTaskCtx() : task_info_(),
|
||||
ObTTLTaskCtx() : rowkey_cp_allcoator_(ObMemAttr(MTL_ID(), "TTLTaskCtx")),
|
||||
task_info_(),
|
||||
task_status_(common::ObTTLTaskStatus::OB_TTL_TASK_INVALID),
|
||||
ttl_para_(),
|
||||
task_start_time_(OB_INVALID_ID),
|
||||
@ -39,10 +40,13 @@ public :
|
||||
return task_info_.is_valid() && ttl_para_.is_valid();
|
||||
}
|
||||
|
||||
int deep_copy_rowkey(const ObString &rowkey);
|
||||
|
||||
TO_STRING_KV(K_(task_info), K_(task_status), K_(ttl_para), K_(task_start_time),
|
||||
K_(last_modify_time), K_(failure_times), K_(is_dirty), K_(need_refresh));
|
||||
|
||||
public:
|
||||
common::ObArenaAllocator rowkey_cp_allcoator_; // for rowkey copy in ObTTLTaskInfo
|
||||
ObTTLTaskInfo task_info_;
|
||||
common::ObTTLTaskStatus task_status_;
|
||||
table::ObTTLTaskParam ttl_para_;
|
||||
@ -78,8 +82,7 @@ class ObTenantTabletTTLMgr : public logservice::ObIReplaySubHandler,
|
||||
public:
|
||||
friend ObTTLTaskCtx;
|
||||
ObTenantTabletTTLMgr()
|
||||
: allocator_(ObMemAttr(MTL_ID(), "TenantTTLMgr")),
|
||||
tenant_id_(common::OB_INVALID_TENANT_ID),
|
||||
: tenant_id_(common::OB_INVALID_TENANT_ID),
|
||||
schema_service_(NULL),
|
||||
sql_proxy_(NULL),
|
||||
is_inited_(false),
|
||||
@ -180,21 +183,17 @@ private:
|
||||
}
|
||||
void destory()
|
||||
{
|
||||
for (TabletTaskMap::const_iterator iter = tablet_task_map_.begin(); iter != tablet_task_map_.end();
|
||||
++iter) {
|
||||
ObTTLTaskCtx *ctx = iter->second;
|
||||
if (OB_NOT_NULL(ctx)) {
|
||||
ctx->~ObTTLTaskCtx();
|
||||
}
|
||||
}
|
||||
tablet_task_map_.destroy();
|
||||
allocator_.reset();
|
||||
}
|
||||
void reuse()
|
||||
{
|
||||
tablet_task_map_.reuse();
|
||||
allocator_.reuse();
|
||||
is_usr_trigger_ = false;
|
||||
need_check_ = false;
|
||||
is_dirty_ = false;
|
||||
ttl_continue_ = true;
|
||||
state_ = common::ObTTLTaskStatus::OB_TTL_TASK_INVALID;
|
||||
is_finished_ = true;
|
||||
}
|
||||
|
||||
void reuse();
|
||||
TO_STRING_KV(K_(tenant_id),
|
||||
K_(task_id),
|
||||
K_(is_usr_trigger),
|
||||
@ -265,7 +264,6 @@ private:
|
||||
static const int64_t DEFAULT_TABLET_PAIR_SIZE = 1024;
|
||||
static const int64_t DEFAULT_PARAM_BUCKET_SIZE = 200;
|
||||
static const int64_t TTL_NORMAL_TIME_THRESHOLD = 3*1000*1000; // 3s
|
||||
common::ObArenaAllocator allocator_;
|
||||
uint64_t tenant_id_;
|
||||
ObTTLTenantInfo local_tenant_task_;
|
||||
share::schema::ObMultiVersionSchemaService *schema_service_;
|
||||
|
Reference in New Issue
Block a user