[CP] [Fix] ttl delete
This commit is contained in:
@ -512,49 +512,56 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
|
|||||||
LOG_DEBUG("finish get next row", KR(ret), K(cur_del_rows_), K(limit_del_rows_));
|
LOG_DEBUG("finish get next row", KR(ret), K(cur_del_rows_), K(limit_del_rows_));
|
||||||
} else {
|
} else {
|
||||||
bool is_expired = false;
|
bool is_expired = false;
|
||||||
while(OB_SUCC(ret) && !is_expired && OB_SUCC(ObTableApiScanRowIterator::get_next_row(row, false/*need_deep_copy*/))) {
|
while(OB_SUCC(ret) && !is_expired) {
|
||||||
last_row_ = row;
|
if (OB_FAIL(ObTableApiScanRowIterator::get_next_row(row, false/*need_deep_copy*/))) {
|
||||||
// NOTE: For hbase table, the row expired if and only if
|
if (OB_ITER_END != ret) {
|
||||||
// 1. The row's version exceed maxversion
|
LOG_WARN("fail to get next row", K(ret));
|
||||||
// 2. The row's expired time(cell_ts + ttl) exceed current time
|
}
|
||||||
if (is_hbase_table_) {
|
last_row_ = nullptr;
|
||||||
scan_cnt_++;
|
} else {
|
||||||
ObHTableCellEntity cell(row);
|
last_row_ = row;
|
||||||
ObString cell_rowkey = cell.get_rowkey();
|
// NOTE: For hbase table, the row expired if and only if
|
||||||
ObString cell_qualifier = cell.get_qualifier();
|
// 1. The row's version exceed maxversion
|
||||||
int64_t cell_ts = -cell.get_timestamp(); // obhtable timestamp is nagative in ms
|
// 2. The row's expired time(cell_ts + ttl) exceed current time
|
||||||
if ((cell_rowkey != cur_rowkey_) || (cell_qualifier != cur_qualifier_)) {
|
if (is_hbase_table_) {
|
||||||
cur_version_ = 1;
|
scan_cnt_++;
|
||||||
allocator_.reuse();
|
ObHTableCellEntity cell(row);
|
||||||
if (OB_FAIL(ob_write_string(allocator_, cell_rowkey, cur_rowkey_))) {
|
ObString cell_rowkey = cell.get_rowkey();
|
||||||
LOG_WARN("fail to copy cell rowkey", KR(ret), K(cell_rowkey));
|
ObString cell_qualifier = cell.get_qualifier();
|
||||||
} else if (OB_FAIL(ob_write_string(allocator_, cell_qualifier, cur_qualifier_))) {
|
int64_t cell_ts = -cell.get_timestamp(); // obhtable timestamp is nagative in ms
|
||||||
LOG_WARN("fail to copy cell qualifier", KR(ret), K(cell_qualifier));
|
if ((cell_rowkey != cur_rowkey_) || (cell_qualifier != cur_qualifier_)) {
|
||||||
|
cur_version_ = 1;
|
||||||
|
allocator_.reuse();
|
||||||
|
if (OB_FAIL(ob_write_string(allocator_, cell_rowkey, cur_rowkey_))) {
|
||||||
|
LOG_WARN("fail to copy cell rowkey", KR(ret), K(cell_rowkey));
|
||||||
|
} else if (OB_FAIL(ob_write_string(allocator_, cell_qualifier, cur_qualifier_))) {
|
||||||
|
LOG_WARN("fail to copy cell qualifier", KR(ret), K(cell_qualifier));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cur_version_++;
|
||||||
|
}
|
||||||
|
if (max_version_ > 0 && cur_version_ > max_version_) {
|
||||||
|
max_version_cnt_++;
|
||||||
|
cur_del_rows_++;
|
||||||
|
is_last_row_ttl_ = false;
|
||||||
|
is_expired = true;
|
||||||
|
} else if (time_to_live_ms_ > 0 && (cell_ts + time_to_live_ms_ < ObHTableUtils::current_time_millis())) {
|
||||||
|
ttl_cnt_++;
|
||||||
|
cur_del_rows_++;
|
||||||
|
is_last_row_ttl_ = true;
|
||||||
|
is_expired = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cur_version_++;
|
// NOTE: For relation table, the row expired if and only if
|
||||||
}
|
// 1. The row's expired time (the result of ttl definition) exceed current time
|
||||||
if (max_version_ > 0 && cur_version_ > max_version_) {
|
scan_cnt_++;
|
||||||
max_version_cnt_++;
|
if (OB_FAIL(ttl_checker_.check_row_expired(*row, is_expired))) {
|
||||||
cur_del_rows_++;
|
LOG_WARN("fail to check row expired", KR(ret));
|
||||||
is_last_row_ttl_ = false;
|
} else if (is_expired) {
|
||||||
is_expired = true;
|
ttl_cnt_++;
|
||||||
} else if (time_to_live_ms_ > 0 && (cell_ts + time_to_live_ms_ < ObHTableUtils::current_time_millis())) {
|
cur_del_rows_++;
|
||||||
ttl_cnt_++;
|
is_last_row_ttl_ = true;
|
||||||
cur_del_rows_++;
|
}
|
||||||
is_last_row_ttl_ = true;
|
|
||||||
is_expired = true;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// NOTE: For relation table, the row expired if and only if
|
|
||||||
// 1. The row's expired time (the result of ttl definition) exceed current time
|
|
||||||
scan_cnt_++;
|
|
||||||
if (OB_FAIL(ttl_checker_.check_row_expired(*row, is_expired))) {
|
|
||||||
LOG_WARN("fail to check row expired", KR(ret));
|
|
||||||
} else if (is_expired) {
|
|
||||||
ttl_cnt_++;
|
|
||||||
cur_del_rows_++;
|
|
||||||
is_last_row_ttl_ = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -658,6 +665,7 @@ int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_ro
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret) && rowkey_.is_valid()) {
|
if (OB_SUCC(ret) && rowkey_.is_valid()) {
|
||||||
|
// if ITER_END in ttl_row_iter, rowkey_ will not be assigned by last_row_ in this round
|
||||||
uint64_t buf_len = rowkey_.get_serialize_size();
|
uint64_t buf_len = rowkey_.get_serialize_size();
|
||||||
char *buf = static_cast<char *>(allocator_.alloc(buf_len));
|
char *buf = static_cast<char *>(allocator_.alloc(buf_len));
|
||||||
int64_t pos = 0;
|
int64_t pos = 0;
|
||||||
|
|||||||
Reference in New Issue
Block a user