[CP] [OBKV] Fix TTL task holds too much memory when scanning TTL table with few expired records

This commit is contained in:
obdev
2024-02-07 07:52:36 +00:00
committed by ob-robot
parent 3cf04d8c66
commit 09d966dc33
9 changed files with 80 additions and 31 deletions

View File

@ -643,7 +643,7 @@ ObHTableRowIterator::~ObHTableRowIterator()
int ObHTableRowIterator::next_cell()
{
ObNewRow *ob_row = NULL;
int ret = child_op_->get_next_row(ob_row, false);
int ret = child_op_->get_next_row(ob_row);
if (OB_SUCCESS == ret) {
curr_cell_.set_ob_row(ob_row);
LOG_DEBUG("[yzfdebug] fetch next cell", K_(curr_cell));
@ -659,7 +659,7 @@ int ObHTableRowIterator::next_cell()
int ObHTableRowIterator::reverse_next_cell(ObIArray<common::ObNewRow> &same_kq_cells, ObTableQueryResult *&out_result)
{
ObNewRow *ob_row = NULL;
int ret = child_op_->get_next_row(ob_row, false);
int ret = child_op_->get_next_row(ob_row);
if ((ObQueryFlag::Reverse == scan_order_ && OB_ITER_END == ret) ||
(ObQueryFlag::Reverse == scan_order_ && OB_SUCCESS == ret &&
NULL != hfilter_ && hfilter_->filter_all_remaining())) {

View File

@ -249,7 +249,7 @@ int ObNormalTableQueryResultIterator::get_aggregate_result(table::ObTableQueryRe
LOG_WARN("one_result_ should not be null", K(ret));
} else {
ObNewRow *row = nullptr;
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row))) {
if (OB_FAIL(agg_calculator_.aggregate(*row))) {
LOG_WARN("fail to aggregate", K(ret), K(*row));
}
@ -293,7 +293,7 @@ int ObNormalTableQueryResultIterator::get_normal_result(table::ObTableQueryResul
if (OB_SUCC(ret)) {
next_result = one_result_;
ObNewRow *row = nullptr;
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row))) {
LOG_DEBUG("[yzfdebug] scan result", "row", *row);
if (OB_FAIL(one_result_->add_row(*row))) {
if (OB_SIZE_OVERFLOW == ret) {
@ -392,7 +392,7 @@ int ObTableFilterOperator::get_aggregate_result(table::ObTableQueryResult *&next
const ObIArray<ObString> &select_columns = one_result_->get_select_columns();
const int64_t N = select_columns.count();
while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) &&
OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
OB_SUCC(scan_result_->get_next_row(row))) {
if (N != row->get_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("select column count is not equal to row cells count", K(ret), K(select_columns), K(*row));
@ -473,7 +473,7 @@ int ObTableFilterOperator::get_normal_result(table::ObTableQueryResult *&next_re
const int64_t N = select_columns.count();
while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) &&
OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
OB_SUCC(scan_result_->get_next_row(row))) {
if (N != row->get_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("select column count is not equal to row cells count", K(ret), K(select_columns), K(*row));

View File

@ -185,7 +185,7 @@ int ObTableOpWrapper::process_get_with_spec(ObTableCtx &tb_ctx,
LOG_WARN("fail to create scan executor", K(ret));
} else if (OB_FAIL(row_iter.open(static_cast<ObTableApiScanExecutor*>(executor)))) {
LOG_WARN("fail to open scan row iterator", K(ret));
} else if (OB_FAIL(row_iter.get_next_row(row))) {
} else if (OB_FAIL(row_iter.get_next_row(row, tb_ctx.get_allocator()))) {
if (ret != OB_ITER_END) {
LOG_WARN("fail to get next row", K(ret));
}

View File

@ -409,7 +409,7 @@ int ObTableQueryAndMutateP::get_old_row(ObTableApiSpec &scan_spec, ObNewRow *&ro
LOG_WARN("fail to generate executor", K(ret), K(tb_ctx_));
} else if (OB_FAIL(row_iter.open(static_cast<ObTableApiScanExecutor*>(executor)))) {
LOG_WARN("fail to open scan row iterator", K(ret));
} else if (OB_FAIL(row_iter.get_next_row(row))) {
} else if (OB_FAIL(row_iter.get_next_row(row, tb_ctx_.get_allocator()))) {
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next row", K(ret));
} else {

View File

@ -295,17 +295,18 @@ int ObTableApiScanRowIterator::open(ObTableApiScanExecutor *executor)
return ret;
}
// Memory of row is owned by iterator, and row cannot be used beyond iterator, unless you use deep copy.
int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy /* =true */)
// Memory of row is owned by iterator, and row cannot be used after iterator close
// or get_next_row next time unless you use deep copy.
int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row)
{
int ret = OB_SUCCESS;
ObNewRow *tmp_row = nullptr;
char *row_buf = nullptr;
ObObj *cells = nullptr;
const ObTableCtx &tb_ctx = scan_executor_->get_table_ctx();
ObIAllocator &allocator = tb_ctx.get_allocator();
const ExprFixedArray &output_exprs = scan_executor_->get_spec().get_ctdef().output_exprs_;
const int64_t cells_cnt = output_exprs.count();
row_allocator_.reuse();
if (OB_ISNULL(scan_executor_)) {
ret = OB_ERR_UNEXPECTED;
@ -314,10 +315,10 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next row by scan executor", K(ret));
}
} else if (OB_ISNULL(row_buf = static_cast<char*>(allocator.alloc(sizeof(ObNewRow))))) {
} else if (OB_ISNULL(row_buf = static_cast<char*>(row_allocator_.alloc(sizeof(ObNewRow))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc ObNewRow buffer", K(ret));
} else if (OB_ISNULL(cells = static_cast<ObObj*>(allocator.alloc(sizeof(ObObj) * cells_cnt)))) {
} else if (OB_ISNULL(cells = static_cast<ObObj*>(row_allocator_.alloc(sizeof(ObObj) * cells_cnt)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc cells buffer", K(ret), K(cells_cnt));
} else {
@ -339,10 +340,8 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy
LOG_WARN("fail to eval datum", K(ret));
} else if (OB_FAIL(datum->to_obj(tmp_obj, output_exprs.at(idx)->obj_meta_))) {
LOG_WARN("fail to datum to obj", K(ret), K(output_exprs.at(idx)->obj_meta_), K(i), K(idx));
} else if (!need_deep_copy) {
} else {
cells[i] = tmp_obj;
} else if (ob_write_obj(allocator, tmp_obj, cells[i])) { // need_deep_copy
LOG_WARN("fail to deep copy ObObj", K(ret), K(tmp_obj));
}
}
} else {
@ -351,10 +350,8 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy
LOG_WARN("fail to eval datum", K(ret));
} else if (OB_FAIL(datum->to_obj(tmp_obj, output_exprs.at(i)->obj_meta_))) {
LOG_WARN("fail to datum to obj", K(ret), K(output_exprs.at(i)->obj_meta_));
} else if (!need_deep_copy) {
} else {
cells[i] = tmp_obj;
} else if (ob_write_obj(allocator, tmp_obj, cells[i])) { // need_deep_copy
LOG_WARN("fail to deep copy ObObj", K(ret), K(tmp_obj));
}
}
}
@ -367,6 +364,36 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy
return ret;
}
// deep copy the new row using given allocator
int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, common::ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
ObNewRow *inner_row = nullptr;
if (OB_FAIL(get_next_row(inner_row))) {
LOG_WARN("fail to get next row", KR(ret));
} else if (OB_ISNULL(inner_row)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("new row is null", KR(ret));
} else {
ObNewRow *tmp_row = nullptr;
int64_t buf_size = inner_row->get_deep_copy_size() + sizeof(ObNewRow);
char *tmp_row_buf = static_cast<char *>(allocator.alloc(buf_size));
if (OB_ISNULL(tmp_row_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc new row", KR(ret));
} else {
tmp_row = new(tmp_row_buf)ObNewRow();
int64_t pos = sizeof(ObNewRow);
if (OB_FAIL(tmp_row->deep_copy(*inner_row, tmp_row_buf, buf_size, pos))) {
LOG_WARN("fail to deep copy new row", KR(ret));
} else {
row = tmp_row;
}
}
}
return ret;
}
int ObTableApiScanRowIterator::close()
{
int ret = OB_SUCCESS;
@ -376,6 +403,8 @@ int ObTableApiScanRowIterator::close()
LOG_WARN("scan executor is null", K(ret));
} else if (OB_FAIL(scan_executor_->close())) {
LOG_WARN("fail to close scan executor", K(ret));
} else {
row_allocator_.reset();
}
return ret;

View File

@ -88,16 +88,19 @@ class ObTableApiScanRowIterator
{
public:
ObTableApiScanRowIterator()
: scan_executor_(nullptr)
: scan_executor_(nullptr),
row_allocator_("TbScanRowIter", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
{
}
virtual ~ObTableApiScanRowIterator() {};
public:
virtual int open(ObTableApiScanExecutor *executor);
virtual int get_next_row(common::ObNewRow *&row, bool need_deep_copy = true);
virtual int get_next_row(ObNewRow *&row);
virtual int get_next_row(ObNewRow *&row, common::ObIAllocator &allocator);
virtual int close();
private:
ObTableApiScanExecutor *scan_executor_;
common::ObArenaAllocator row_allocator_; // alloc the memory of result row
private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiScanRowIterator);
};

View File

@ -221,7 +221,9 @@ int ObTableTTLDeleteTask::process_one()
info_.scan_cnt_ += result.get_scan_row();
info_.err_code_ = ret;
info_.row_key_ = result.get_end_rowkey();
if (OB_SUCC(ret) && result.get_del_row() < PER_TASK_DEL_ROWS) {
if (OB_SUCC(ret)
&& result.get_del_row() < PER_TASK_DEL_ROWS
&& result.get_end_ts() > ObTimeUtility::current_time()) {
ret = OB_ITER_END; // finsh task
info_.err_code_ = ret;
LOG_DEBUG("finish delete", KR(ret), K_(info));
@ -420,7 +422,7 @@ int ObTableTTLDag::fill_info_param(compaction::ObIBasicInfoParam *&out_param, Ob
}
ObTableTTLDeleteRowIterator::ObTableTTLDeleteRowIterator()
: allocator_(ObMemAttr(MTL_ID(), "TTLDelRowIter")),
: hbase_kq_allocator_(ObMemAttr(MTL_ID(), "TTLHbaseCQAlloc")),
is_inited_(false),
max_version_(0),
time_to_live_ms_(0),
@ -435,7 +437,9 @@ ObTableTTLDeleteRowIterator::ObTableTTLDeleteRowIterator()
is_last_row_ttl_(true),
is_hbase_table_(false),
last_row_(nullptr),
rowkey_cnt_(0)
rowkey_cnt_(0),
hbase_new_cq_(false),
iter_end_ts_(0)
{
}
@ -459,6 +463,8 @@ int ObTableTTLDeleteRowIterator::init(const schema::ObTableSchema &table_schema,
rowkey_cnt_ = table_schema.get_rowkey_column_num();
ObSArray<uint64_t> rowkey_column_ids;
ObSArray<uint64_t> full_column_ids;
hbase_new_cq_ = is_hbase_table_ ? false : true;
iter_end_ts_ = ObTimeUtility::current_time() + ONE_ITER_EXECUTE_MAX_TIME;
if (OB_FAIL(table_schema.get_rowkey_column_ids(rowkey_column_ids))) {
LOG_WARN("fail to get rowkey column ids", KR(ret));
} else if (OB_FAIL(table_schema.get_column_ids(full_column_ids))) {
@ -513,7 +519,7 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
} else {
bool is_expired = false;
while(OB_SUCC(ret) && !is_expired) {
if (OB_FAIL(ObTableApiScanRowIterator::get_next_row(row, false/*need_deep_copy*/))) {
if (OB_FAIL(ObTableApiScanRowIterator::get_next_row(row))) {
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next row", K(ret));
}
@ -530,11 +536,12 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
ObString cell_qualifier = cell.get_qualifier();
int64_t cell_ts = -cell.get_timestamp(); // obhtable timestamp is nagative in ms
if ((cell_rowkey != cur_rowkey_) || (cell_qualifier != cur_qualifier_)) {
hbase_new_cq_ = true;
cur_version_ = 1;
allocator_.reuse();
if (OB_FAIL(ob_write_string(allocator_, cell_rowkey, cur_rowkey_))) {
hbase_kq_allocator_.reuse();
if (OB_FAIL(ob_write_string(hbase_kq_allocator_, cell_rowkey, cur_rowkey_))) {
LOG_WARN("fail to copy cell rowkey", KR(ret), K(cell_rowkey));
} else if (OB_FAIL(ob_write_string(allocator_, cell_qualifier, cur_qualifier_))) {
} else if (OB_FAIL(ob_write_string(hbase_kq_allocator_, cell_qualifier, cur_qualifier_))) {
LOG_WARN("fail to copy cell qualifier", KR(ret), K(cell_qualifier));
}
} else {
@ -564,6 +571,9 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
}
}
}
if (ObTimeUtility::current_time() > iter_end_ts_ && hbase_new_cq_) {
ret = OB_ITER_END;
}
}
}
@ -629,6 +639,7 @@ int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_ro
result.ttl_del_rows_ = iter_ttl_cnt;
result.max_version_del_rows_ = iter_max_version_cnt;
result.scan_rows_ = ttl_row_iter.scan_cnt_;
result.iter_end_ts_ = ttl_row_iter.iter_end_ts_;
}
}

View File

@ -49,7 +49,8 @@ public:
};
public:
common::ObArenaAllocator allocator_;
const static int64_t ONE_ITER_EXECUTE_MAX_TIME = 30 * 1000 * 1000; // 30s
common::ObArenaAllocator hbase_kq_allocator_;
bool is_inited_;
int32_t max_version_;
int64_t time_to_live_ms_; // ttl in millisecond
@ -70,6 +71,8 @@ public:
common::ObSArray<uint64_t> rowkey_cell_ids_;
// map new row -> normal column
common::ObSArray<PropertyPair> properties_pairs_;
bool hbase_new_cq_;
int64_t iter_end_ts_;
};

View File

@ -945,7 +945,8 @@ public:
: ttl_del_rows_(0),
max_version_del_rows_(0),
scan_rows_(0),
end_rowkey_()
end_rowkey_(),
iter_end_ts_(0)
{}
~ObTableTTLOperationResult() {}
uint64_t get_ttl_del_row() { return ttl_del_rows_; }
@ -953,12 +954,14 @@ public:
uint64_t get_del_row() { return ttl_del_rows_ + max_version_del_rows_; }
uint64_t get_scan_row() { return scan_rows_; }
common::ObString get_end_rowkey() { return end_rowkey_; }
TO_STRING_KV(K_(ttl_del_rows), K_(max_version_del_rows), K_(scan_rows), K_(end_rowkey));
int64_t get_end_ts() { return iter_end_ts_; }
TO_STRING_KV(K_(ttl_del_rows), K_(max_version_del_rows), K_(scan_rows), K_(end_rowkey), K_(iter_end_ts));
public:
uint64_t ttl_del_rows_;
uint64_t max_version_del_rows_;
uint64_t scan_rows_;
common::ObString end_rowkey_;
int64_t iter_end_ts_;
};
struct ObTableMoveReplicaInfo final