[OBKV] Fix TTL issue 51960431 & 51996497
This commit is contained in:
@ -30,6 +30,7 @@ using namespace oceanbase::sql;
|
||||
|
||||
ObTableBatchExecuteP::ObTableBatchExecuteP(const ObGlobalContext &gctx)
|
||||
:ObTableRpcProcessor(gctx),
|
||||
default_entity_factory_("TableBatchEntFac", MTL_ID()),
|
||||
allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||
tb_ctx_(allocator_),
|
||||
need_rollback_trans_(false),
|
||||
|
@ -68,7 +68,8 @@ class ObTableBatchExecuteEndTransCb: public ObTableAPITransCb
|
||||
{
|
||||
public:
|
||||
ObTableBatchExecuteEndTransCb(rpc::ObRequest *req, ObTableOperationType::Type table_operation_type)
|
||||
:response_sender_(req, result_),
|
||||
: entity_factory_("TableBatchCbEntFac", MTL_ID()),
|
||||
response_sender_(req, result_),
|
||||
table_operation_type_(table_operation_type)
|
||||
{
|
||||
}
|
||||
|
@ -55,6 +55,7 @@ ObTableApiExecuteP::ObTableApiExecuteP(const ObGlobalContext &gctx)
|
||||
:ObTableRpcProcessor(gctx),
|
||||
allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||
tb_ctx_(allocator_),
|
||||
default_entity_factory_("TableExecuteEncFac", MTL_ID()),
|
||||
need_rollback_trans_(false),
|
||||
query_timeout_ts_(0)
|
||||
{
|
||||
|
@ -35,6 +35,7 @@ ObTableQueryAndMutateP::ObTableQueryAndMutateP(const ObGlobalContext &gctx)
|
||||
:ObTableRpcProcessor(gctx),
|
||||
allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||
tb_ctx_(allocator_),
|
||||
default_entity_factory_("QueryAndMutateEntFac", MTL_ID()),
|
||||
end_in_advance_(false)
|
||||
{
|
||||
}
|
||||
|
@ -37,10 +37,10 @@ ObTableTTLDeleteTask::ObTableTTLDeleteTask():
|
||||
is_inited_(false),
|
||||
param_(),
|
||||
info_(NULL),
|
||||
allocator_(ObMemAttr(MTL_ID(), "TTLDeleteTask")),
|
||||
allocator_(ObMemAttr(MTL_ID(), "TTLDelTaskCtx")),
|
||||
rowkey_(),
|
||||
ttl_tablet_mgr_(NULL),
|
||||
default_entity_factory_("TTLEntityFac")
|
||||
rowkey_allocator_(ObMemAttr(MTL_ID(), "TTLDelTaskRKey"))
|
||||
{
|
||||
}
|
||||
|
||||
@ -133,12 +133,12 @@ int ObTableTTLDeleteTask::process()
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else {
|
||||
lib::ContextParam param;
|
||||
param.set_mem_attr(MTL_ID(), "TTLDeleteTask", ObCtxIds::DEFAULT_CTX_ID)
|
||||
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
|
||||
CREATE_WITH_TEMP_CONTEXT(param) {
|
||||
bool need_stop = false;
|
||||
while(!need_stop) {
|
||||
bool need_stop = false;
|
||||
while(!need_stop) {
|
||||
lib::ContextParam param;
|
||||
param.set_mem_attr(MTL_ID(), "TTLDeleteMemCtx", ObCtxIds::DEFAULT_CTX_ID)
|
||||
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
|
||||
CREATE_WITH_TEMP_CONTEXT(param) {
|
||||
if (OB_FAIL(process_one())) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("fail to process one", KR(ret));
|
||||
@ -147,6 +147,7 @@ int ObTableTTLDeleteTask::process()
|
||||
if (OB_FAIL(ttl_tablet_mgr_->report_task_status(const_cast<ObTTLTaskInfo&>(*info_), param_, need_stop))) {
|
||||
LOG_WARN("fail to report ttl task status", KR(ret));
|
||||
}
|
||||
allocator_.reuse();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -169,8 +170,8 @@ int ObTableTTLDeleteTask::process_one()
|
||||
param_,
|
||||
PER_TASK_DEL_ROWS,
|
||||
rowkey_);
|
||||
SMART_VAR(ObTableCtx, tb_ctx, allocator_) {
|
||||
if (OB_FAIL(init_scan_tb_ctx(tb_ctx, cache_guard))) {
|
||||
SMART_VAR(ObTableCtx, scan_ctx, allocator_) {
|
||||
if (OB_FAIL(init_scan_tb_ctx(scan_ctx, cache_guard))) {
|
||||
LOG_WARN("fail to init tb ctx", KR(ret));
|
||||
} else if (OB_FAIL(ObTableApiProcessorBase::start_trans_(
|
||||
false,
|
||||
@ -178,20 +179,20 @@ int ObTableTTLDeleteTask::process_one()
|
||||
tx_snapshot,
|
||||
ObTableConsistencyLevel::STRONG,
|
||||
&trans_state,
|
||||
tb_ctx.get_table_id(),
|
||||
tb_ctx.get_ls_id(),
|
||||
scan_ctx.get_table_id(),
|
||||
scan_ctx.get_ls_id(),
|
||||
get_timeout_ts()))) {
|
||||
LOG_WARN("fail to start trans", KR(ret));
|
||||
} else if (OB_FAIL(tb_ctx.init_trans(trans_desc, tx_snapshot))) {
|
||||
} else if (OB_FAIL(scan_ctx.init_trans(trans_desc, tx_snapshot))) {
|
||||
LOG_WARN("fail to init trans", KR(ret));
|
||||
} else if (OB_FAIL(cache_guard.get_spec<TABLE_API_EXEC_SCAN>(&tb_ctx, scan_spec))) {
|
||||
} else if (OB_FAIL(cache_guard.get_spec<TABLE_API_EXEC_SCAN>(&scan_ctx, scan_spec))) {
|
||||
LOG_WARN("fail to get scan spec from cache", KR(ret));
|
||||
} else {
|
||||
ObTableTTLDeleteRowIterator row_iter;
|
||||
ObTableApiExecutor *executor = nullptr;
|
||||
if (OB_FAIL(scan_spec->create_executor(tb_ctx, executor))) {
|
||||
LOG_WARN("fail to generate executor", KR(ret), K(tb_ctx));
|
||||
} else if (OB_FAIL(row_iter.init(*tb_ctx.get_table_schema(), ttl_operation))){
|
||||
if (OB_FAIL(scan_spec->create_executor(scan_ctx, executor))) {
|
||||
LOG_WARN("fail to generate executor", KR(ret), K(scan_ctx));
|
||||
} else if (OB_FAIL(row_iter.init(*scan_ctx.get_table_schema(), ttl_operation))){
|
||||
LOG_WARN("fail to init ttl row iterator", KR(ret));
|
||||
} else if (OB_FAIL(row_iter.open(static_cast<ObTableApiScanExecutor*>(executor)))) {
|
||||
LOG_WARN("fail to open scan row iterator", KR(ret));
|
||||
@ -207,7 +208,7 @@ int ObTableTTLDeleteTask::process_one()
|
||||
|
||||
if (OB_NOT_NULL(scan_spec)) {
|
||||
scan_spec->destroy_executor(executor);
|
||||
tb_ctx.set_expr_info(nullptr);
|
||||
scan_ctx.set_expr_info(nullptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -240,7 +241,6 @@ int ObTableTTLDeleteTask::init_scan_tb_ctx(ObTableCtx &tb_ctx, ObTableApiCacheGu
|
||||
int ret = OB_SUCCESS;
|
||||
ObExprFrameInfo *expr_frame_info = nullptr;
|
||||
tb_ctx.set_scan(true);
|
||||
tb_ctx.set_operation_type(ObTableOperationType::DEL);
|
||||
if (tb_ctx.is_init()) {
|
||||
LOG_INFO("tb ctx has been inited", K(tb_ctx));
|
||||
} else if (OB_FAIL(tb_ctx.init_common(credential_,
|
||||
@ -273,16 +273,16 @@ int ObTableTTLDeleteTask::process_ttl_delete(const ObITableEntity &new_entity,
|
||||
transaction::ObTxReadSnapshot &snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SMART_VAR(ObTableCtx, tb_ctx, allocator_) {
|
||||
ObTableApiSpec *spec = nullptr;
|
||||
ObTableApiExecutor *executor = nullptr;
|
||||
ObTableOperationResult op_result;
|
||||
if (OB_FAIL(init_tb_ctx(new_entity, tb_ctx))) {
|
||||
ObTableApiSpec *spec = nullptr;
|
||||
ObTableApiExecutor *executor = nullptr;
|
||||
ObTableOperationResult op_result;
|
||||
SMART_VAR(ObTableCtx, delete_ctx, allocator_) {
|
||||
if (OB_FAIL(init_tb_ctx(new_entity, delete_ctx))) {
|
||||
LOG_WARN("fail to init table ctx", K(ret), K(new_entity));
|
||||
} else if (OB_FAIL(tb_ctx.init_trans(trans_desc, snapshot))) {
|
||||
LOG_WARN("fail to init trans", K(ret), K(tb_ctx));
|
||||
} else if (OB_FAIL(ObTableOpWrapper::process_op<TABLE_API_EXEC_DELETE>(tb_ctx, op_result))) {
|
||||
LOG_WARN("fail to process insert op", K(ret));
|
||||
} else if (OB_FAIL(delete_ctx.init_trans(trans_desc, snapshot))) {
|
||||
LOG_WARN("fail to init trans", K(ret), K(delete_ctx));
|
||||
} else if (OB_FAIL(ObTableOpWrapper::process_op<TABLE_API_EXEC_DELETE>(delete_ctx, op_result))) {
|
||||
LOG_WARN("fail to process delete op", K(ret));
|
||||
} else {
|
||||
affected_rows = op_result.get_affected_rows();
|
||||
}
|
||||
@ -555,31 +555,28 @@ int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_ro
|
||||
int64_t affected_rows = 0;
|
||||
while (OB_SUCC(ret)) {
|
||||
ObNewRow *row = nullptr;
|
||||
ObITableEntity *new_entity = nullptr;
|
||||
if (OB_FAIL(ttl_row_iter.get_next_row(row))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("fail to get next row", K(ret));
|
||||
}
|
||||
} else if (OB_ISNULL(new_entity = default_entity_factory_.alloc())) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc entity", K(ret));
|
||||
} else {
|
||||
int64_t rowkey_cnt = ttl_row_iter.rowkey_cell_ids_.count();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_cnt; i++) {
|
||||
if (OB_FAIL(new_entity->add_rowkey_value(row->get_cell(ttl_row_iter.rowkey_cell_ids_[i])))) {
|
||||
if (OB_FAIL(delete_entity_.add_rowkey_value(row->get_cell(ttl_row_iter.rowkey_cell_ids_[i])))) {
|
||||
LOG_WARN("fail to add rowkey value", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
int64_t tmp_affect_rows = 0;
|
||||
if (OB_FAIL(process_ttl_delete(*new_entity, tmp_affect_rows, trans_desc, snapshot))) {
|
||||
if (OB_FAIL(process_ttl_delete(delete_entity_, tmp_affect_rows, trans_desc, snapshot))) {
|
||||
LOG_WARN("fail to execute table delete", K(ret));
|
||||
} else {
|
||||
affected_rows += tmp_affect_rows;
|
||||
}
|
||||
}
|
||||
}
|
||||
delete_entity_.reset();
|
||||
}
|
||||
|
||||
if (OB_ITER_END == ret) {
|
||||
@ -606,7 +603,11 @@ int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_ro
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected rowkey column count", K(ttl_row_iter.last_row_->get_count()), K(ttl_row_iter.get_rowkey_column_cnt()));
|
||||
} else {
|
||||
rowkey_allocator_.reuse();
|
||||
row_key.assign(ttl_row_iter.last_row_->cells_, ttl_row_iter.get_rowkey_column_cnt());
|
||||
if (OB_FAIL(row_key.deep_copy(rowkey_, rowkey_allocator_))) {
|
||||
LOG_WARN("fail to deep copy rowkey", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,7 +74,6 @@ public:
|
||||
int64_t &affected_rows,
|
||||
transaction::ObTxDesc *trans_desc,
|
||||
transaction::ObTxReadSnapshot &snapshot);
|
||||
table::ObTableEntityFactory<table::ObTableEntity> &get_entity_factory() { return default_entity_factory_; }
|
||||
common::ObIAllocator &get_allocator() { return allocator_; }
|
||||
int init_credential(const table::ObTTLTaskParam &ttl_param);
|
||||
|
||||
@ -108,8 +107,9 @@ private:
|
||||
common::ObRowkey rowkey_;
|
||||
table::ObTenantTabletTTLMgr *ttl_tablet_mgr_;
|
||||
share::ObLSID ls_id_;
|
||||
table::ObTableEntityFactory<table::ObTableEntity> default_entity_factory_;
|
||||
ObTableEntity delete_entity_;
|
||||
table::ObTableApiCredential credential_;
|
||||
common::ObArenaAllocator rowkey_allocator_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTableTTLDeleteTask);
|
||||
};
|
||||
|
||||
|
@ -406,6 +406,7 @@ int ObTenantTabletTTLMgr::report_task_status(ObTTLTaskInfo& task_info, ObTTLTask
|
||||
task_para = ctx->ttl_para_;
|
||||
is_stop = false;
|
||||
} else {
|
||||
ctx->task_status_ = OB_TTL_TASK_PENDING;
|
||||
LOG_INFO("pending current task", K(local_tenant_task_.state_), K(local_tenant_task_.ttl_continue_));
|
||||
}
|
||||
} else if (OB_ITER_END == task_info.err_code_) {
|
||||
@ -426,9 +427,12 @@ int ObTenantTabletTTLMgr::report_task_status(ObTTLTaskInfo& task_info, ObTTLTask
|
||||
}
|
||||
}
|
||||
|
||||
//schedule task
|
||||
if (is_stop && OB_FAIL(try_schedule_remaining_tasks(ctx))) {
|
||||
LOG_WARN("fail to try schedule task", KR(ret));
|
||||
// schedule remaining tasks
|
||||
if (is_stop) {
|
||||
LOG_INFO("stop current task", K(ret), KPC(ctx), K_(local_tenant_task));
|
||||
if (OB_FAIL(try_schedule_remaining_tasks(ctx))) {
|
||||
LOG_WARN("fail to try schedule task", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -144,8 +144,8 @@ template <typename T>
|
||||
class ObTableEntityFactory: public ObITableEntityFactory
|
||||
{
|
||||
public:
|
||||
ObTableEntityFactory(const char *label = common::ObModIds::TABLE_PROC)
|
||||
:alloc_(label)
|
||||
ObTableEntityFactory(const char *label = common::ObModIds::TABLE_PROC, uint64_t tenant_id = OB_SERVER_TENANT_ID)
|
||||
:alloc_(label, OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id)
|
||||
{}
|
||||
virtual ~ObTableEntityFactory();
|
||||
virtual ObITableEntity *alloc() override;
|
||||
|
Reference in New Issue
Block a user