[CP] [OBKV] cherry-pick bugfix from 421

This commit is contained in:
obdev
2024-02-09 13:38:33 +00:00
committed by ob-robot
parent 5fe99ad26f
commit 13c335d128
26 changed files with 287 additions and 140 deletions

View File

@ -423,6 +423,7 @@ public:
int get_entity(ObITableEntity *&entity);
ObITableEntity *get_entity() { return entity_; }
int64_t get_affected_rows() const { return affected_rows_; }
int get_return_rows() { return ((entity_ == NULL || entity_->is_empty()) ? 0 : 1); }
void set_entity(ObITableEntity &entity) { entity_ = &entity; }
void set_type(ObTableOperationType::Type op_type) { operation_type_ = op_type; }
@ -842,7 +843,8 @@ public:
uint64_t get_checksum();
TO_STRING_KV(K_(query),
K_(mutations));
K_(mutations),
K_(return_affected_entity));
private:
ObTableQuery query_;
ObTableBatchOperation mutations_;

View File

@ -592,23 +592,13 @@ bool ObTTLUtil::check_can_process_tenant_tasks(uint64_t tenant_id)
int ObTTLUtil::move_task_to_history_table(uint64_t tenant_id, uint64_t task_id,
common::ObMySQLTransaction& proxy,
int64_t batch_size, int64_t &move_rows,
bool need_cancel)
int64_t batch_size, int64_t &move_rows)
{
int ret = OB_SUCCESS;
ObSqlString sql;
int64_t insert_rows = 0;
int64_t delete_rows = 0;
if (!need_cancel &&
OB_FAIL(sql.assign_fmt("replace into %s select * from %s "
" where task_id = %ld and tablet_id != -1 and table_id != -1"
" order by tenant_id, task_id, table_id, tablet_id LIMIT %ld",
share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME,
share::OB_ALL_KV_TTL_TASK_TNAME,
task_id, batch_size))) {
LOG_WARN("sql assign fmt failed", K(ret));
} else if (need_cancel &&
OB_FAIL(sql.assign_fmt("replace into %s select gmt_create, gmt_modified,"
if (OB_FAIL(sql.assign_fmt("replace into %s select gmt_create, gmt_modified,"
" tenant_id, task_id, table_id, tablet_id, task_start_time,"
" task_update_time, trigger_type, if(status=4, 4, 3) as status,"
" ttl_del_cnt, max_version_del_cnt, scan_cnt, row_key, ret_code from %s"
@ -731,9 +721,9 @@ int ObTTLUtil::parse_kv_attributes(const ObString &kv_attributes, int32_t &max_v
json::Value *ttl_val = elem->value_;
if (NULL != ttl_val && ttl_val->get_type() == json::JT_NUMBER) {
if (ttl_val->get_number() <= 0) {
ret = OB_INVALID_ARGUMENT;
ret = OB_TTL_INVALID_HBASE_TTL;
LOG_WARN("time to live should greater than 0", K(ret), K(ttl_val));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "time to live, which should greater than 0");
LOG_USER_ERROR(OB_TTL_INVALID_HBASE_TTL);
} else {
time_to_live = static_cast<int32_t>(ttl_val->get_number());
}
@ -742,9 +732,9 @@ int ObTTLUtil::parse_kv_attributes(const ObString &kv_attributes, int32_t &max_v
json::Value *max_versions_val = elem->value_;
if (NULL != max_versions_val && max_versions_val->get_type() == json::JT_NUMBER) {
if (max_versions_val->get_number() <= 0) {
ret = OB_INVALID_ARGUMENT;
ret = OB_TTL_INVALID_HBASE_MAXVERSIONS;
LOG_WARN("max versions should greater than 0", K(ret), K(max_versions_val));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "max versions, which should greater than 0");
LOG_USER_ERROR(OB_TTL_INVALID_HBASE_MAXVERSIONS);
} else {
max_versions = static_cast<int32_t>(max_versions_val->get_number());
}
@ -771,7 +761,7 @@ int ObTTLUtil::parse_kv_attributes(const ObString &kv_attributes, int32_t &max_v
}
}
} else {
ret = OB_INVALID_ARGUMENT;
ret = OB_NOT_SUPPORTED;
LOG_WARN("not supported kv attribute", K(ret));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "kv attributes with wrong format");
}
@ -1205,5 +1195,46 @@ int ObTTLUtil::check_task_status_from_sys_table(uint64_t tenant_id, common::ObIS
}
bool ObTTLUtil::is_enable_ttl(uint64_t tenant_id)
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
return tenant_config.is_valid() && tenant_config->enable_kv_ttl;
}
const char * ObTTLUtil::get_ttl_tenant_status_cstr(const ObTTLTaskStatus &status)
{
const char *status_cstr = NULL;
switch (status) {
case OB_RS_TTL_TASK_CREATE: {
status_cstr = "RUNNING";
break;
}
case OB_RS_TTL_TASK_SUSPEND: {
status_cstr = "PENDING";
break;
}
case OB_RS_TTL_TASK_CANCEL: {
status_cstr = "CANCELING";
break;
}
case OB_RS_TTL_TASK_MOVE: {
status_cstr = "MOVING";
break;
}
case OB_TTL_TASK_FINISH: { // wait
status_cstr = "FINISHED";
break;
}
default: {
status_cstr = "UNKNOWN";
break;
}
}
return status_cstr;
}
} // end namespace rootserver
} // end namespace oceanbase

View File

@ -263,7 +263,8 @@ public:
obrpc::ObTTLRequestArg::TTLRequestType type_;
};
class ObTTLUtil {
class ObTTLUtil
{
public:
static int parse(const char* str, ObTTLDutyDuration& duration);
static bool current_in_duration(ObTTLDutyDuration& duration);
@ -317,8 +318,7 @@ public:
static int move_task_to_history_table(uint64_t tenant_id, uint64_t task_id,
common::ObMySQLTransaction& proxy,
int64_t batch_size, int64_t &move_rows,
bool need_cancel = false);
int64_t batch_size, int64_t &move_rows);
static int move_tenant_task_to_history_table(uint64_t tenant_id, uint64_t task_id,
common::ObMySQLTransaction& proxy);
@ -340,6 +340,8 @@ public:
static inline bool is_ttl_task_status_end_state(ObTTLTaskStatus status) {
return status == ObTTLTaskStatus::OB_TTL_TASK_CANCEL || status == ObTTLTaskStatus::OB_TTL_TASK_FINISH;
}
static bool is_enable_ttl(uint64_t tenant_id);
static const char *get_ttl_tenant_status_cstr(const ObTTLTaskStatus &status);
const static uint64_t TTL_TENNAT_TASK_TABLET_ID = -1;
const static uint64_t TTL_TENNAT_TASK_TABLE_ID = -1;
private: