[FEAT MERGE] [OBKV] support TTL
Co-authored-by: IHEII <602280108@qq.com>
This commit is contained in:
committed by
ob-robot
parent
464ba63de9
commit
b60e4f8d97
626
src/observer/table/ttl/ob_table_ttl_task.cpp
Normal file
626
src/observer/table/ttl/ob_table_ttl_task.cpp
Normal file
@ -0,0 +1,626 @@
|
||||
/**
|
||||
* Copyright (c) 2023 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#define USING_LOG_PREFIX SERVER
|
||||
#include "ob_table_ttl_task.h"
|
||||
#include "share/table/ob_table_ttl_common.h"
|
||||
#include "share/table/ob_table.h"
|
||||
#include "observer/table/ob_table_op_wrapper.h"
|
||||
#include "share/scheduler/ob_dag_warning_history_mgr.h"
|
||||
#include "observer/table/ob_table_query_common.h"
|
||||
#include "observer/table/ob_table_query_and_mutate_processor.h"
|
||||
#include "lib/utility/utility.h"
|
||||
|
||||
using namespace oceanbase::sql;
|
||||
using namespace oceanbase::transaction;
|
||||
using namespace oceanbase::observer;
|
||||
using namespace oceanbase::storage;
|
||||
using namespace oceanbase::share;
|
||||
using namespace oceanbase::table;
|
||||
using namespace oceanbase::rootserver;
|
||||
|
||||
|
||||
/**
|
||||
* ---------------------------------------- ObTableTTLDeleteTask ----------------------------------------
|
||||
*/
|
||||
ObTableTTLDeleteTask::ObTableTTLDeleteTask():
|
||||
ObITask(ObITaskType::TASK_TYPE_TTL_DELETE),
|
||||
is_inited_(false),
|
||||
param_(),
|
||||
info_(NULL),
|
||||
allocator_(ObMemAttr(MTL_ID(), "TTLDeleteTask")),
|
||||
rowkey_(),
|
||||
ttl_tablet_mgr_(NULL),
|
||||
default_entity_factory_("TTLEntityFac")
|
||||
{
|
||||
}
|
||||
|
||||
ObTableTTLDeleteTask::~ObTableTTLDeleteTask()
|
||||
{
|
||||
}
|
||||
|
||||
int ObTableTTLDeleteTask::init(ObTenantTabletTTLMgr *ttl_tablet_mgr,
|
||||
const ObTTLTaskParam &ttl_para,
|
||||
ObTTLTaskInfo &ttl_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("init twice", KR(ret));
|
||||
} else if (OB_ISNULL(ttl_tablet_mgr) || ttl_info.table_id_ == OB_INVALID_ID || !ttl_info.ls_id_.is_valid() || !ttl_info.is_valid() || ttl_para.tenant_id_ == OB_INVALID_ID) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(ttl_tablet_mgr), K(ttl_info), K(ttl_para));
|
||||
} else {
|
||||
if (ttl_info.row_key_.empty()) {
|
||||
rowkey_.reset();
|
||||
} else {
|
||||
int64_t pos = 0;
|
||||
if (OB_FAIL(rowkey_.deserialize(allocator_, ttl_info.row_key_.ptr(), ttl_info.row_key_.length(), pos))) {
|
||||
LOG_WARN("fail to deserialize rowkey", KR(ret), K(ttl_info.row_key_));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(init_credential(ttl_para))) {
|
||||
LOG_WARN("fail to init credential", KR(ret));
|
||||
} else if (OB_FAIL(create_session_pool(ttl_para.tenant_id_))) {
|
||||
LOG_WARN("fail to update session pool");
|
||||
} else {
|
||||
param_ = ttl_para;
|
||||
info_ = &ttl_info;
|
||||
ttl_tablet_mgr_ = ttl_tablet_mgr;
|
||||
is_inited_ = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableTTLDeleteTask::create_session_pool(int64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(GCTX.table_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table service is null", KR(ret));
|
||||
} else if (OB_FAIL(GCTX.table_service_->get_sess_mgr().create_pool_if_not_exists(tenant_id))) {
|
||||
LOG_WARN("fait to get session pool", K(ret), K(tenant_id));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObTableTTLDeleteTask::init_credential(const ObTTLTaskParam &ttl_param)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObUserInfo *user_info = nullptr;
|
||||
int64_t tenant_id = ttl_param.tenant_id_;
|
||||
int64_t user_id = ttl_param.user_id_;
|
||||
int64_t database_id = ttl_param.database_id_;
|
||||
schema::ObSchemaGetterGuard schema_guard;
|
||||
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) {
|
||||
LOG_WARN("fail to get schema guard", K(ret), K(tenant_id));
|
||||
} else if(OB_FAIL(schema_guard.get_user_info(tenant_id, user_id, user_info))) {
|
||||
LOG_WARN("fail to get user id", KR(ret), K(tenant_id), K(user_id));
|
||||
} else if (OB_ISNULL(user_info)) {
|
||||
ret = OB_USER_NOT_EXIST;
|
||||
LOG_WARN("user not exist", KR(ret), K(tenant_id), K(user_id));
|
||||
} else {
|
||||
credential_.cluster_id_ = GCONF.cluster_id;
|
||||
credential_.tenant_id_ = tenant_id;
|
||||
credential_.user_id_ = user_id;
|
||||
credential_.database_id_ = database_id;
|
||||
credential_.expire_ts_ = 0;
|
||||
credential_.hash(credential_.hash_val_, user_info->get_passwd_str().hash());
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObTableTTLDeleteTask::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else {
|
||||
lib::ContextParam param;
|
||||
param.set_mem_attr(MTL_ID(), "TTLDeleteTask", ObCtxIds::DEFAULT_CTX_ID)
|
||||
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
|
||||
CREATE_WITH_TEMP_CONTEXT(param) {
|
||||
bool need_stop = false;
|
||||
while(!need_stop) {
|
||||
if (OB_FAIL(process_one())) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("fail to process one", KR(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ttl_tablet_mgr_->report_task_status(const_cast<ObTTLTaskInfo&>(*info_), param_, need_stop))) {
|
||||
LOG_WARN("fail to report ttl task status", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableTTLDeleteTask::process_one()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t start_time = ObTimeUtil::current_time();
|
||||
ObTxDesc *trans_desc = nullptr;
|
||||
ObTxReadSnapshot tx_snapshot;
|
||||
TransState trans_state;
|
||||
ObTableTTLOperationResult result;
|
||||
ObTableApiSpec *scan_spec = nullptr;
|
||||
observer::ObReqTimeGuard req_timeinfo_guard; // 引用cache资源必须加ObReqTimeGuard
|
||||
ObTableApiCacheGuard cache_guard;
|
||||
ObTableTTLOperation ttl_operation(info_->tenant_id_,
|
||||
info_->table_id_,
|
||||
param_,
|
||||
PER_TASK_DEL_ROWS,
|
||||
rowkey_);
|
||||
SMART_VAR(ObTableCtx, tb_ctx, allocator_) {
|
||||
if (OB_FAIL(init_scan_tb_ctx(tb_ctx, cache_guard))) {
|
||||
LOG_WARN("fail to init tb ctx", KR(ret));
|
||||
} else if (OB_FAIL(ObTableApiProcessorBase::start_trans_(
|
||||
false,
|
||||
trans_desc,
|
||||
tx_snapshot,
|
||||
ObTableConsistencyLevel::STRONG,
|
||||
&trans_state,
|
||||
tb_ctx.get_table_id(),
|
||||
tb_ctx.get_ls_id(),
|
||||
get_timeout_ts()))) {
|
||||
LOG_WARN("fail to start trans", KR(ret));
|
||||
} else if (OB_FAIL(tb_ctx.init_trans(trans_desc, tx_snapshot))) {
|
||||
LOG_WARN("fail to init trans", KR(ret));
|
||||
} else if (OB_FAIL(cache_guard.get_spec<TABLE_API_EXEC_SCAN>(&tb_ctx, scan_spec))) {
|
||||
LOG_WARN("fail to get scan spec from cache", KR(ret));
|
||||
} else {
|
||||
ObTableTTLDeleteRowIterator row_iter;
|
||||
ObTableApiExecutor *executor = nullptr;
|
||||
if (OB_FAIL(scan_spec->create_executor(tb_ctx, executor))) {
|
||||
LOG_WARN("fail to generate executor", KR(ret), K(tb_ctx));
|
||||
} else if (OB_FAIL(row_iter.init(*tb_ctx.get_table_schema(), ttl_operation))){
|
||||
LOG_WARN("fail to init ttl row iterator", KR(ret));
|
||||
} else if (OB_FAIL(row_iter.open(static_cast<ObTableApiScanExecutor*>(executor)))) {
|
||||
LOG_WARN("fail to open scan row iterator", KR(ret));
|
||||
} else if (OB_FAIL(execute_ttl_delete(row_iter, result, trans_desc, tx_snapshot))) {
|
||||
LOG_WARN("fail to execute ttl table", KR(ret));
|
||||
}
|
||||
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = row_iter.close())) {
|
||||
LOG_WARN("fail to close row iter", K(tmp_ret));
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
}
|
||||
|
||||
if (OB_NOT_NULL(scan_spec)) {
|
||||
scan_spec->destroy_executor(executor);
|
||||
tb_ctx.set_expr_info(nullptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (trans_state.is_start_trans_executed() && trans_state.is_start_trans_success()) {
|
||||
int tmp_ret = ret;
|
||||
if (OB_FAIL(ObTableApiProcessorBase::sync_end_trans_(OB_SUCCESS != ret, trans_desc, get_timeout_ts()))) {
|
||||
LOG_WARN("fail to end trans", KR(ret));
|
||||
}
|
||||
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
|
||||
}
|
||||
|
||||
info_->max_version_del_cnt_ += result.get_max_version_del_row();
|
||||
info_->ttl_del_cnt_ += result.get_ttl_del_row();
|
||||
info_->scan_cnt_ += result.get_scan_row();
|
||||
info_->err_code_ = ret;
|
||||
info_->row_key_ = result.get_end_rowkey();
|
||||
if (OB_SUCC(ret) && result.get_del_row() < PER_TASK_DEL_ROWS) {
|
||||
ret = OB_ITER_END; // finsh task
|
||||
info_->err_code_ = ret;
|
||||
LOG_DEBUG("finish delete", KR(ret), KPC_(info));
|
||||
}
|
||||
int64_t cost = ObTimeUtil::current_time() - start_time;
|
||||
LOG_DEBUG("finish process one", KR(ret), K(cost));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableTTLDeleteTask::init_scan_tb_ctx(ObTableCtx &tb_ctx, ObTableApiCacheGuard &cache_guard)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObExprFrameInfo *expr_frame_info = nullptr;
|
||||
tb_ctx.set_scan(true);
|
||||
tb_ctx.set_operation_type(ObTableOperationType::DEL);
|
||||
if (tb_ctx.is_init()) {
|
||||
LOG_INFO("tb ctx has been inited", K(tb_ctx));
|
||||
} else if (OB_FAIL(tb_ctx.init_common(credential_,
|
||||
get_tablet_id(),
|
||||
get_table_id(),
|
||||
get_timeout_ts()))) {
|
||||
LOG_WARN("fail to init table ctx common part", KR(ret), "table_id", get_table_id(),
|
||||
"tablet_id", get_tablet_id(), "timeout_ts", get_timeout_ts());
|
||||
} else if (OB_FAIL(tb_ctx.init_ttl_delete(get_start_rowkey()))) {
|
||||
LOG_WARN("fail to init delete ctx", KR(ret), K(tb_ctx));
|
||||
} else if (OB_FAIL(cache_guard.init(&tb_ctx))) {
|
||||
LOG_WARN("fail to init cache guard", K(ret));
|
||||
} else if (OB_FAIL(cache_guard.get_expr_info(&tb_ctx, expr_frame_info))) {
|
||||
LOG_WARN("fail to get expr frame info from cache", K(ret));
|
||||
} else if (OB_FAIL(ObTableExprCgService::alloc_exprs_memory(tb_ctx, *expr_frame_info))) {
|
||||
LOG_WARN("fail to alloc expr memory", K(ret));
|
||||
} else if (OB_FAIL(tb_ctx.init_exec_ctx())) {
|
||||
LOG_WARN("fail to init exec ctx", KR(ret), K(tb_ctx));
|
||||
} else {
|
||||
tb_ctx.set_init_flag(true);
|
||||
tb_ctx.set_expr_info(expr_frame_info);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableTTLDeleteTask::process_ttl_delete(const ObITableEntity &new_entity,
|
||||
int64_t &affected_rows,
|
||||
transaction::ObTxDesc *trans_desc,
|
||||
transaction::ObTxReadSnapshot &snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SMART_VAR(ObTableCtx, tb_ctx, allocator_) {
|
||||
ObTableApiSpec *spec = nullptr;
|
||||
ObTableApiExecutor *executor = nullptr;
|
||||
ObTableOperationResult op_result;
|
||||
if (OB_FAIL(init_tb_ctx(new_entity, tb_ctx))) {
|
||||
LOG_WARN("fail to init table ctx", K(ret), K(new_entity));
|
||||
} else if (OB_FAIL(tb_ctx.init_trans(trans_desc, snapshot))) {
|
||||
LOG_WARN("fail to init trans", K(ret), K(tb_ctx));
|
||||
} else if (OB_FAIL(ObTableOpWrapper::process_op<TABLE_API_EXEC_DELETE>(tb_ctx, op_result))) {
|
||||
LOG_WARN("fail to process insert op", K(ret));
|
||||
} else {
|
||||
affected_rows = op_result.get_affected_rows();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableTTLDeleteTask::init_tb_ctx(const ObITableEntity &entity,
|
||||
ObTableCtx &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ctx.set_entity(&entity);
|
||||
ctx.set_entity_type(ObTableEntityType::ET_KV);
|
||||
ctx.set_operation_type(ObTableOperationType::DEL);
|
||||
ctx.set_batch_operation(NULL);
|
||||
|
||||
if (!ctx.is_init()) {
|
||||
if (OB_FAIL(ctx.init_common(credential_,
|
||||
get_tablet_id(),
|
||||
get_table_id(),
|
||||
get_timeout_ts()))) {
|
||||
LOG_WARN("fail to init table ctx common part", K(ret), "table_id", get_table_id(),
|
||||
"tablet_id", get_tablet_id(), "timeout_ts", get_timeout_ts());
|
||||
} else if (OB_FAIL(ctx.init_delete())) {
|
||||
LOG_WARN("fail to init delete ctx", K(ret), K(ctx));
|
||||
} else if (OB_FAIL(ctx.init_exec_ctx())) {
|
||||
LOG_WARN("fail to init exec ctx", K(ret), K(ctx));
|
||||
} else {
|
||||
ctx.set_is_ttl_table(false);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* ---------------------------------------- ObTableTTLDag ----------------------------------------
|
||||
*/
|
||||
ObTableTTLDag::ObTableTTLDag()
|
||||
: ObIDag(ObDagType::DAG_TYPE_TTL),
|
||||
is_inited_(false), param_(), info_(),
|
||||
compat_mode_(lib::Worker::CompatMode::INVALID)
|
||||
{
|
||||
}
|
||||
|
||||
ObTableTTLDag::~ObTableTTLDag()
|
||||
{
|
||||
}
|
||||
|
||||
int ObTableTTLDag::init(const ObTTLTaskParam ¶m, ObTTLTaskInfo &info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("ObTableTTLDag has already been inited", KR(ret));
|
||||
} else if (OB_UNLIKELY(!param.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid task para", KR(ret), K(info));
|
||||
} else if (OB_UNLIKELY(!info.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid task info", KR(ret), K(param));
|
||||
} else if (OB_FAIL(ObCompatModeGetter::get_table_compat_mode(info.tenant_id_, info.table_id_, compat_mode_))) {
|
||||
LOG_WARN("fail to get compat mode", KR(ret), K(info.tenant_id_), K(info.table_id_));
|
||||
} else {
|
||||
param_ = param;
|
||||
is_inited_ = true;
|
||||
info_ = info;
|
||||
consumer_group_id_ = info_.consumer_group_id_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObTableTTLDag::operator==(const ObIDag& other) const
|
||||
{
|
||||
bool is_equal = false;
|
||||
if (OB_UNLIKELY(this == &other)) {
|
||||
is_equal = true;
|
||||
} else if (get_type() == other.get_type()) {
|
||||
const ObTableTTLDag &dag = static_cast<const ObTableTTLDag &>(other);
|
||||
if (OB_UNLIKELY(!param_.is_valid() || !dag.param_.is_valid() ||
|
||||
!dag.info_.is_valid() || !dag.info_.is_valid())) {
|
||||
LOG_ERROR_RET(OB_ERR_SYS, "invalid argument", K_(param), K_(info), K(dag.param_), K(dag.info_));
|
||||
} else {
|
||||
is_equal = (info_.tenant_id_ == dag.info_.tenant_id_) &&
|
||||
(info_.task_id_ == dag.info_.task_id_) &&
|
||||
(info_.table_id_ == dag.info_.table_id_) &&
|
||||
(info_.tablet_id_ == dag.info_.tablet_id_) &&
|
||||
(param_.ttl_ == dag.param_.ttl_) &&
|
||||
(param_.max_version_ == dag.param_.max_version_);
|
||||
}
|
||||
}
|
||||
return is_equal;
|
||||
}
|
||||
|
||||
int64_t ObTableTTLDag::hash() const
|
||||
{
|
||||
int64_t hash_val = 0;
|
||||
if (OB_UNLIKELY(!is_inited_ || !param_.is_valid() || !info_.is_valid())) {
|
||||
LOG_ERROR_RET(OB_ERR_SYS, "invalid argument", K(is_inited_), K(param_));
|
||||
} else {
|
||||
hash_val = common::murmurhash(&info_.tenant_id_, sizeof(info_.tenant_id_), hash_val);
|
||||
hash_val = common::murmurhash(&info_.task_id_, sizeof(info_.task_id_), hash_val);
|
||||
hash_val = common::murmurhash(&info_.table_id_, sizeof(info_.table_id_), hash_val);
|
||||
hash_val += info_.tablet_id_.hash();
|
||||
hash_val = common::murmurhash(¶m_.ttl_, sizeof(param_.ttl_), hash_val);
|
||||
hash_val = common::murmurhash(¶m_.max_version_, sizeof(param_.max_version_), hash_val);
|
||||
}
|
||||
return hash_val;
|
||||
}
|
||||
|
||||
int ObTableTTLDag::fill_dag_key(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableTTLDag has not been initialized", KR(ret));
|
||||
} else if (OB_FAIL(databuff_printf(buf, buf_len, "ttl task: ls_id = %ld, tenant_id = %ld, table_id = %ld, "
|
||||
"tablet_id=%ld, max_version=%d, time_to_live=%d",
|
||||
info_.ls_id_.id(),
|
||||
info_.tenant_id_,
|
||||
info_.table_id_,
|
||||
info_.tablet_id_.id(),
|
||||
param_.max_version_,
|
||||
param_.ttl_))) {
|
||||
LOG_WARN("fail to fill comment", KR(ret), K(param_), K(info_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableTTLDag::fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(IS_NOT_INIT)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableTTLDag has not been initialized", KR(ret));
|
||||
} else if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, allocator, get_type(),
|
||||
static_cast<int64_t>(info_.tenant_id_),
|
||||
static_cast<int64_t>(info_.ls_id_.id()),
|
||||
static_cast<int64_t>(info_.table_id_),
|
||||
static_cast<int64_t>(info_.tablet_id_.id())))) {
|
||||
LOG_WARN("fail to fill info param", KR(ret), K_(info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObTableTTLDeleteRowIterator::ObTableTTLDeleteRowIterator()
|
||||
: is_inited_(false), max_version_(0), time_to_live_ms_(0), limit_del_rows_(-1), cur_del_rows_(0),
|
||||
cur_version_(0), cur_rowkey_(), cur_qualifier_(), max_version_cnt_(0), ttl_cnt_(0), scan_cnt_(0),
|
||||
is_last_row_ttl_(true), is_hbase_table_(false), last_row_(nullptr), rowkey_cnt_(0)
|
||||
{
|
||||
}
|
||||
|
||||
int ObTableTTLDeleteRowIterator::init(const schema::ObTableSchema &table_schema,
|
||||
const ObTableTTLOperation &ttl_operation)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("the table api ttl delete row iterator has been inited, ", KR(ret));
|
||||
} else if (OB_UNLIKELY(!ttl_operation.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid ttl operation", KR(ret), K(ttl_operation));
|
||||
} else if (OB_FAIL(ttl_checker_.init(table_schema, true))) {
|
||||
LOG_WARN("fail to init ttl checker", KR(ret));
|
||||
} else {
|
||||
time_to_live_ms_ = ttl_operation.time_to_live_ * 1000l;
|
||||
max_version_ = ttl_operation.max_version_;
|
||||
limit_del_rows_ = ttl_operation.del_row_limit_;
|
||||
is_hbase_table_ = ttl_operation.is_htable_;
|
||||
rowkey_cnt_ = table_schema.get_rowkey_column_num();
|
||||
ObSArray<uint64_t> rowkey_column_ids;
|
||||
ObSArray<uint64_t> full_column_ids;
|
||||
if (OB_FAIL(table_schema.get_rowkey_column_ids(rowkey_column_ids))) {
|
||||
LOG_WARN("fail to get rowkey column ids", KR(ret));
|
||||
} else if (OB_FAIL(table_schema.get_column_ids(full_column_ids))) {
|
||||
LOG_WARN("fail to get full column ids", KR(ret));
|
||||
} else {
|
||||
for (int64_t i = 0, idx = -1; OB_SUCC(ret) && i < rowkey_column_ids.count(); i++) {
|
||||
if (has_exist_in_array(full_column_ids, rowkey_column_ids[i], &idx)) {
|
||||
if (OB_FAIL(rowkey_cell_ids_.push_back(idx))) {
|
||||
LOG_WARN("fail to add rowkey cell idx", K(ret), K(i), K(rowkey_column_ids));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && is_hbase_table_ && ttl_operation.start_rowkey_.is_valid()) {
|
||||
if (ttl_operation.start_rowkey_.get_obj_cnt() != 3 || OB_ISNULL(ttl_operation.start_rowkey_.get_obj_ptr())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(ttl_operation.start_rowkey_));
|
||||
} else {
|
||||
ObObj *obj_ptr = const_cast<ObObj *>(ttl_operation.start_rowkey_.get_obj_ptr());
|
||||
cur_rowkey_ = obj_ptr[ObHTableConstants::COL_IDX_K].get_string();
|
||||
cur_qualifier_ = obj_ptr[ObHTableConstants::COL_IDX_Q].get_string();
|
||||
is_inited_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// get next expired row to delete
|
||||
int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
row = nullptr;
|
||||
if (cur_del_rows_ >= limit_del_rows_) {
|
||||
ret = OB_ITER_END;
|
||||
LOG_DEBUG("finish get next row", KR(ret), K(cur_del_rows_), K(limit_del_rows_));
|
||||
} else {
|
||||
bool is_expired = false;
|
||||
while(OB_SUCC(ret) && !is_expired && OB_SUCC(ObTableApiScanRowIterator::get_next_row(row))) {
|
||||
last_row_ = row;
|
||||
// NOTE: For hbase table, the row expired if and only if
|
||||
// 1. The row's version exceed maxversion
|
||||
// 2. The row's expired time(cell_ts + ttl) exceed current time
|
||||
if (is_hbase_table_) {
|
||||
scan_cnt_++;
|
||||
ObHTableCellEntity cell(row);
|
||||
ObString cell_rowkey = cell.get_rowkey();
|
||||
ObString cell_qualifier = cell.get_qualifier();
|
||||
int64_t cell_ts = -cell.get_timestamp(); // obhtable timestamp is nagative in ms
|
||||
if ((cell_rowkey != cur_rowkey_) || (cell_qualifier != cur_qualifier_)) {
|
||||
cur_version_ = 1;
|
||||
cur_rowkey_ = cell_rowkey;
|
||||
cur_qualifier_ = cell_qualifier;
|
||||
} else {
|
||||
cur_version_++;
|
||||
}
|
||||
if (max_version_ > 0 && cur_version_ > max_version_) {
|
||||
max_version_cnt_++;
|
||||
cur_del_rows_++;
|
||||
is_last_row_ttl_ = false;
|
||||
is_expired = true;
|
||||
} else if (time_to_live_ms_ > 0 && (cell_ts + time_to_live_ms_ < ObHTableUtils::current_time_millis())) {
|
||||
ttl_cnt_++;
|
||||
cur_del_rows_++;
|
||||
is_last_row_ttl_ = true;
|
||||
is_expired = true;
|
||||
}
|
||||
} else {
|
||||
// NOTE: For relation table, the row expired if and only if
|
||||
// 1. The row's expired time (the result of ttl definition) exceed current time
|
||||
scan_cnt_++;
|
||||
if (OB_FAIL(ttl_checker_.check_row_expired(*row, is_expired))) {
|
||||
LOG_WARN("fail to check row expired", KR(ret));
|
||||
} else if (is_expired) {
|
||||
ttl_cnt_++;
|
||||
cur_del_rows_++;
|
||||
is_last_row_ttl_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
if (ret != OB_ITER_END) {
|
||||
LOG_WARN("fail to get next row", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_row_iter,
|
||||
ObTableTTLOperationResult &result,
|
||||
transaction::ObTxDesc *trans_desc,
|
||||
transaction::ObTxReadSnapshot &snapshot)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t affected_rows = 0;
|
||||
while (OB_SUCC(ret)) {
|
||||
ObNewRow *row = nullptr;
|
||||
ObITableEntity *new_entity = nullptr;
|
||||
if (OB_FAIL(ttl_row_iter.get_next_row(row))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("fail to get next row", K(ret));
|
||||
}
|
||||
} else if (OB_ISNULL(new_entity = default_entity_factory_.alloc())) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc entity", K(ret));
|
||||
} else {
|
||||
int64_t rowkey_cnt = ttl_row_iter.rowkey_cell_ids_.count();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_cnt; i++) {
|
||||
if (OB_FAIL(new_entity->add_rowkey_value(row->get_cell(ttl_row_iter.rowkey_cell_ids_[i])))) {
|
||||
LOG_WARN("fail to add rowkey value", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
int64_t tmp_affect_rows = 0;
|
||||
if (OB_FAIL(process_ttl_delete(*new_entity, tmp_affect_rows, trans_desc, snapshot))) {
|
||||
LOG_WARN("fail to execute table delete", K(ret));
|
||||
} else {
|
||||
affected_rows += tmp_affect_rows;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
uint64_t iter_ttl_cnt = ttl_row_iter.ttl_cnt_;
|
||||
uint64_t iter_max_version_cnt = ttl_row_iter.max_version_cnt_;
|
||||
uint64_t iter_return_cnt = iter_ttl_cnt + iter_max_version_cnt;
|
||||
if (OB_UNLIKELY(affected_rows != iter_return_cnt)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("unexpected affect rows", K(ret), K(affected_rows), K(iter_return_cnt));
|
||||
} else {
|
||||
result.ttl_del_rows_ = iter_ttl_cnt;
|
||||
result.max_version_del_rows_ = iter_max_version_cnt;
|
||||
result.scan_rows_ = ttl_row_iter.scan_cnt_;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
ObRowkey row_key;
|
||||
if (OB_NOT_NULL(ttl_row_iter.last_row_)) {
|
||||
if (ttl_row_iter.last_row_->get_count() < ttl_row_iter.get_rowkey_column_cnt()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected rowkey column count", K(ttl_row_iter.last_row_->get_count()), K(ttl_row_iter.get_rowkey_column_cnt()));
|
||||
} else {
|
||||
row_key.assign(ttl_row_iter.last_row_->cells_, ttl_row_iter.get_rowkey_column_cnt());
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && row_key.is_valid()) {
|
||||
uint64_t buf_len = row_key.get_serialize_size();
|
||||
char *buf = static_cast<char *>(allocator_.alloc(buf_len));
|
||||
int64_t pos = 0;
|
||||
if (OB_FAIL(row_key.serialize(buf, buf_len, pos))) {
|
||||
LOG_WARN("fail to serialize", K(ret), K(buf_len), K(pos), K(row_key));
|
||||
} else {
|
||||
result.end_rowkey_.assign_ptr(buf, buf_len);
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("execute ttl delete", K(ret), K(result));
|
||||
return ret;
|
||||
}
|
||||
Reference in New Issue
Block a user