fix ttl wrong unit of time to life and change active_memstore_used to total_memstore_used
This commit is contained in:
@ -46,12 +46,12 @@ int ObHColumnDescriptor::from_string(const common::ObString &str)
|
|||||||
if (elem->name_.case_compare("TimeToLive") == 0) {
|
if (elem->name_.case_compare("TimeToLive") == 0) {
|
||||||
json::Value *ttl_val = elem->value_;
|
json::Value *ttl_val = elem->value_;
|
||||||
if (NULL != ttl_val && ttl_val->get_type() == json::JT_NUMBER) {
|
if (NULL != ttl_val && ttl_val->get_type() == json::JT_NUMBER) {
|
||||||
time_to_live_ = static_cast<int64_t>(ttl_val->get_number());
|
time_to_live_ = static_cast<int32_t>(ttl_val->get_number());
|
||||||
}
|
}
|
||||||
} else if (elem->name_.case_compare("MaxVersions") == 0) {
|
} else if (elem->name_.case_compare("MaxVersions") == 0) {
|
||||||
json::Value *max_version_val = elem->value_;
|
json::Value *max_version_val = elem->value_;
|
||||||
if (NULL != max_version_val && max_version_val->get_type() == json::JT_NUMBER) {
|
if (NULL != max_version_val && max_version_val->get_type() == json::JT_NUMBER) {
|
||||||
max_version_ = static_cast<int64_t>(max_version_val->get_number());
|
max_version_ = static_cast<int32_t>(max_version_val->get_number());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // end foreach
|
} // end foreach
|
||||||
|
|||||||
@ -2175,9 +2175,7 @@ int ObTableService::execute_ttl_delete(ObTableServiceTTLCtx &ctx, const ObTableT
|
|||||||
query.get_offset(), ctx.scan_param_))) {
|
query.get_offset(), ctx.scan_param_))) {
|
||||||
LOG_WARN("failed to fill param", K(ret));
|
LOG_WARN("failed to fill param", K(ret));
|
||||||
} else if (OB_FAIL(part_service_->table_scan(ctx.scan_param_, ctx.scan_result_))) {
|
} else if (OB_FAIL(part_service_->table_scan(ctx.scan_param_, ctx.scan_result_))) {
|
||||||
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
|
|
||||||
LOG_WARN("fail to scan table", K(ret));
|
LOG_WARN("fail to scan table", K(ret));
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
ttl_row_iter.set_scan_result(ctx.scan_result_);
|
ttl_row_iter.set_scan_result(ctx.scan_result_);
|
||||||
int64_t affected_rows = 0;
|
int64_t affected_rows = 0;
|
||||||
@ -2196,9 +2194,7 @@ int ObTableService::execute_ttl_delete(ObTableServiceTTLCtx &ctx, const ObTableT
|
|||||||
ctx.scan_param_.column_ids_,
|
ctx.scan_param_.column_ids_,
|
||||||
&ttl_row_iter,
|
&ttl_row_iter,
|
||||||
affected_rows))) {
|
affected_rows))) {
|
||||||
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
|
|
||||||
LOG_WARN("failed to delete", K(ret), K(table_id));
|
LOG_WARN("failed to delete", K(ret), K(table_id));
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
LOG_DEBUG("delete rows", K(ret), K(affected_rows));
|
LOG_DEBUG("delete rows", K(ret), K(affected_rows));
|
||||||
}
|
}
|
||||||
@ -2287,7 +2283,7 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
|
|||||||
ObHTableCellEntity cell(row);
|
ObHTableCellEntity cell(row);
|
||||||
ObString cell_rowkey = cell.get_rowkey();
|
ObString cell_rowkey = cell.get_rowkey();
|
||||||
ObString cell_qualifier = cell.get_qualifier();
|
ObString cell_qualifier = cell.get_qualifier();
|
||||||
int64_t cell_ts = -cell.get_timestamp(); // obhtable timestamp is nagative
|
int64_t cell_ts = -cell.get_timestamp(); // obhtable timestamp is nagative in ms
|
||||||
if ((cell_rowkey != cur_rowkey_) || (cell_qualifier != cur_qualifier_)) {
|
if ((cell_rowkey != cur_rowkey_) || (cell_qualifier != cur_qualifier_)) {
|
||||||
cur_version_ = 1;
|
cur_version_ = 1;
|
||||||
cur_rowkey_ = cell_rowkey;
|
cur_rowkey_ = cell_rowkey;
|
||||||
@ -2300,7 +2296,7 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
|
|||||||
cur_del_rows_++;
|
cur_del_rows_++;
|
||||||
is_last_row_ttl_ = false;
|
is_last_row_ttl_ = false;
|
||||||
break;
|
break;
|
||||||
} else if (time_to_live_ > 0 && (cell_ts + time_to_live_ < ObTimeUtility::current_time())) {
|
} else if (time_to_live_ms_ > 0 && (cell_ts + time_to_live_ms_ < ObHTableUtils::current_time_millis())) {
|
||||||
ttl_cnt_++;
|
ttl_cnt_++;
|
||||||
cur_del_rows_++;
|
cur_del_rows_++;
|
||||||
is_last_row_ttl_ = true;
|
is_last_row_ttl_ = true;
|
||||||
@ -2333,7 +2329,7 @@ void ObTableTTLDeleteRowIterator::reset()
|
|||||||
{
|
{
|
||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
scan_result_ = NULL;
|
scan_result_ = NULL;
|
||||||
time_to_live_ = 0;
|
time_to_live_ms_ = 0;
|
||||||
max_version_ = 0;
|
max_version_ = 0;
|
||||||
limit_del_rows_ = 0;
|
limit_del_rows_ = 0;
|
||||||
cur_del_rows_ = 0;
|
cur_del_rows_ = 0;
|
||||||
@ -2352,7 +2348,7 @@ int ObTableTTLDeleteRowIterator::init(const ObTableTTLOperation &ttl_operation)
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid ttl operation", K(ret), K(ttl_operation));
|
LOG_WARN("invalid ttl operation", K(ret), K(ttl_operation));
|
||||||
} else {
|
} else {
|
||||||
time_to_live_ = ttl_operation.time_to_live_;
|
time_to_live_ms_ = ttl_operation.time_to_live_ * 1000l;
|
||||||
max_version_ = ttl_operation.max_version_;
|
max_version_ = ttl_operation.max_version_;
|
||||||
limit_del_rows_ = ttl_operation.del_row_limit_;
|
limit_del_rows_ = ttl_operation.del_row_limit_;
|
||||||
cur_rowkey_ = ttl_operation.start_rowkey_;
|
cur_rowkey_ = ttl_operation.start_rowkey_;
|
||||||
|
|||||||
@ -166,7 +166,7 @@ class ObTableTTLDeleteRowIterator : public common::ObNewRowIterator
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObTableTTLDeleteRowIterator():
|
ObTableTTLDeleteRowIterator():
|
||||||
is_inited_(false), time_to_live_(0), max_version_(0),
|
is_inited_(false), max_version_(0), time_to_live_ms_(0),
|
||||||
limit_del_rows_(-1), cur_del_rows_(0), cur_version_(0), cur_rowkey_(), cur_qualifier_(),
|
limit_del_rows_(-1), cur_del_rows_(0), cur_version_(0), cur_rowkey_(), cur_qualifier_(),
|
||||||
max_version_cnt_(0), ttl_cnt_(0), scan_cnt_(0), is_last_row_ttl_(true) {}
|
max_version_cnt_(0), ttl_cnt_(0), scan_cnt_(0), is_last_row_ttl_(true) {}
|
||||||
~ObTableTTLDeleteRowIterator() {}
|
~ObTableTTLDeleteRowIterator() {}
|
||||||
@ -177,8 +177,8 @@ public:
|
|||||||
public:
|
public:
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
common::ObNewRowIterator *scan_result_;
|
common::ObNewRowIterator *scan_result_;
|
||||||
uint64_t time_to_live_;
|
int32_t max_version_;
|
||||||
uint64_t max_version_;
|
int64_t time_to_live_ms_; // ttl in millisecond
|
||||||
uint64_t limit_del_rows_; // maximum delete row
|
uint64_t limit_del_rows_; // maximum delete row
|
||||||
uint64_t cur_del_rows_; // current delete row
|
uint64_t cur_del_rows_; // current delete row
|
||||||
uint64_t cur_version_;
|
uint64_t cur_version_;
|
||||||
|
|||||||
@ -853,11 +853,16 @@ int ObTTLManager::check_all_tenant_mem()
|
|||||||
memstore_limit,
|
memstore_limit,
|
||||||
freeze_cnt)) ) {
|
freeze_cnt)) ) {
|
||||||
LOG_WARN("fail to get tenant memstore info for tenant ", K(ret), K(tenant_id));
|
LOG_WARN("fail to get tenant memstore info for tenant ", K(ret), K(tenant_id));
|
||||||
} else if (active_memstore_used > minor_freeze_trigger) {
|
} else if (total_memstore_used > minor_freeze_trigger) {
|
||||||
|
if (tenant_info->ttl_continue_) {
|
||||||
tenant_info->ttl_continue_ = false;
|
tenant_info->ttl_continue_ = false;
|
||||||
LOG_INFO("will stop all the running task", K(tenant_id), K(active_memstore_used), K(minor_freeze_trigger));
|
LOG_INFO("will stop all the running ttl task", K(tenant_id), K(total_memstore_used), K(minor_freeze_trigger));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if (!tenant_info->ttl_continue_) {
|
||||||
tenant_info->ttl_continue_ = true;
|
tenant_info->ttl_continue_ = true;
|
||||||
|
LOG_INFO("continue to execute all the ttl task", K(tenant_id), K(total_memstore_used), K(minor_freeze_trigger));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -101,7 +101,7 @@ private:
|
|||||||
is_usr_trigger_(false),
|
is_usr_trigger_(false),
|
||||||
need_check_(false),
|
need_check_(false),
|
||||||
is_dirty_(false),
|
is_dirty_(false),
|
||||||
ttl_continue_(false),
|
ttl_continue_(true),
|
||||||
cmd_type_(obrpc::ObTTLRequestArg::TTL_INVALID_TYPE),
|
cmd_type_(obrpc::ObTTLRequestArg::TTL_INVALID_TYPE),
|
||||||
rsp_time_(OB_INVALID_ID),
|
rsp_time_(OB_INVALID_ID),
|
||||||
state_(common::ObTTLTaskStatus::OB_TTL_TASK_INVALID),
|
state_(common::ObTTLTaskStatus::OB_TTL_TASK_INVALID),
|
||||||
|
|||||||
@ -303,7 +303,7 @@ int ObTableTTLDeleteTask::process_one()
|
|||||||
if (OB_FAIL(start_trans())) {
|
if (OB_FAIL(start_trans())) {
|
||||||
LOG_WARN("fail to start tx", K(ret));
|
LOG_WARN("fail to start tx", K(ret));
|
||||||
} else if (OB_FAIL(GCTX.table_service_->execute_ttl_delete(ctx, op, result))) {
|
} else if (OB_FAIL(GCTX.table_service_->execute_ttl_delete(ctx, op, result))) {
|
||||||
LOG_WARN("fail to execute ttl delete", K(ret));
|
LOG_WARN("fail to execute ttl delete, need rollback", K(ret));
|
||||||
} else {/* do nothing */}
|
} else {/* do nothing */}
|
||||||
|
|
||||||
ctx.reset_ttl_ctx(GCTX.par_ser_);
|
ctx.reset_ttl_ctx(GCTX.par_ser_);
|
||||||
|
|||||||
@ -329,8 +329,8 @@ private:
|
|||||||
class ObTableTTLOperation
|
class ObTableTTLOperation
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObTableTTLOperation(uint64_t tenant_id, uint64_t table_id, uint64_t max_version,
|
ObTableTTLOperation(uint64_t tenant_id, uint64_t table_id, int32_t max_version,
|
||||||
int64_t time_to_live, uint64_t del_row_limit, ObString start_rowkey,
|
int32_t time_to_live, uint64_t del_row_limit, ObString start_rowkey,
|
||||||
ObString start_qualifier):
|
ObString start_qualifier):
|
||||||
tenant_id_(tenant_id), table_id_(table_id), max_version_(max_version),
|
tenant_id_(tenant_id), table_id_(table_id), max_version_(max_version),
|
||||||
time_to_live_(time_to_live), del_row_limit_(del_row_limit),
|
time_to_live_(time_to_live), del_row_limit_(del_row_limit),
|
||||||
@ -346,8 +346,8 @@ public:
|
|||||||
public:
|
public:
|
||||||
uint64_t tenant_id_;
|
uint64_t tenant_id_;
|
||||||
uint64_t table_id_;
|
uint64_t table_id_;
|
||||||
uint64_t max_version_;
|
int32_t max_version_;
|
||||||
int64_t time_to_live_;
|
int32_t time_to_live_;
|
||||||
uint64_t del_row_limit_;
|
uint64_t del_row_limit_;
|
||||||
ObString start_rowkey_;
|
ObString start_rowkey_;
|
||||||
ObString start_qualifier_;
|
ObString start_qualifier_;
|
||||||
|
|||||||
Reference in New Issue
Block a user