[CP] [OBKV] Fix TTL task holds too much memory when scanning TTL table with few expired records
This commit is contained in:
committed by
ob-robot
parent
6d38ff7ff2
commit
8fe7e16313
@ -643,7 +643,7 @@ ObHTableRowIterator::~ObHTableRowIterator()
|
|||||||
int ObHTableRowIterator::next_cell()
|
int ObHTableRowIterator::next_cell()
|
||||||
{
|
{
|
||||||
ObNewRow *ob_row = NULL;
|
ObNewRow *ob_row = NULL;
|
||||||
int ret = child_op_->get_next_row(ob_row, false);
|
int ret = child_op_->get_next_row(ob_row);
|
||||||
if (OB_SUCCESS == ret) {
|
if (OB_SUCCESS == ret) {
|
||||||
curr_cell_.set_ob_row(ob_row);
|
curr_cell_.set_ob_row(ob_row);
|
||||||
LOG_DEBUG("[yzfdebug] fetch next cell", K_(curr_cell));
|
LOG_DEBUG("[yzfdebug] fetch next cell", K_(curr_cell));
|
||||||
@ -659,7 +659,7 @@ int ObHTableRowIterator::next_cell()
|
|||||||
int ObHTableRowIterator::reverse_next_cell(ObIArray<common::ObNewRow> &same_kq_cells, ObTableQueryResult *&out_result)
|
int ObHTableRowIterator::reverse_next_cell(ObIArray<common::ObNewRow> &same_kq_cells, ObTableQueryResult *&out_result)
|
||||||
{
|
{
|
||||||
ObNewRow *ob_row = NULL;
|
ObNewRow *ob_row = NULL;
|
||||||
int ret = child_op_->get_next_row(ob_row, false);
|
int ret = child_op_->get_next_row(ob_row);
|
||||||
if ((ObQueryFlag::Reverse == scan_order_ && OB_ITER_END == ret) ||
|
if ((ObQueryFlag::Reverse == scan_order_ && OB_ITER_END == ret) ||
|
||||||
(ObQueryFlag::Reverse == scan_order_ && OB_SUCCESS == ret &&
|
(ObQueryFlag::Reverse == scan_order_ && OB_SUCCESS == ret &&
|
||||||
NULL != hfilter_ && hfilter_->filter_all_remaining())) {
|
NULL != hfilter_ && hfilter_->filter_all_remaining())) {
|
||||||
|
|||||||
@ -249,7 +249,7 @@ int ObNormalTableQueryResultIterator::get_aggregate_result(table::ObTableQueryRe
|
|||||||
LOG_WARN("one_result_ should not be null", K(ret));
|
LOG_WARN("one_result_ should not be null", K(ret));
|
||||||
} else {
|
} else {
|
||||||
ObNewRow *row = nullptr;
|
ObNewRow *row = nullptr;
|
||||||
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
|
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row))) {
|
||||||
if (OB_FAIL(agg_calculator_.aggregate(*row))) {
|
if (OB_FAIL(agg_calculator_.aggregate(*row))) {
|
||||||
LOG_WARN("fail to aggregate", K(ret), K(*row));
|
LOG_WARN("fail to aggregate", K(ret), K(*row));
|
||||||
}
|
}
|
||||||
@ -293,7 +293,7 @@ int ObNormalTableQueryResultIterator::get_normal_result(table::ObTableQueryResul
|
|||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
next_result = one_result_;
|
next_result = one_result_;
|
||||||
ObNewRow *row = nullptr;
|
ObNewRow *row = nullptr;
|
||||||
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
|
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row))) {
|
||||||
LOG_DEBUG("[yzfdebug] scan result", "row", *row);
|
LOG_DEBUG("[yzfdebug] scan result", "row", *row);
|
||||||
if (OB_FAIL(one_result_->add_row(*row))) {
|
if (OB_FAIL(one_result_->add_row(*row))) {
|
||||||
if (OB_SIZE_OVERFLOW == ret) {
|
if (OB_SIZE_OVERFLOW == ret) {
|
||||||
@ -392,7 +392,7 @@ int ObTableFilterOperator::get_aggregate_result(table::ObTableQueryResult *&next
|
|||||||
const ObIArray<ObString> &select_columns = one_result_->get_select_columns();
|
const ObIArray<ObString> &select_columns = one_result_->get_select_columns();
|
||||||
const int64_t N = select_columns.count();
|
const int64_t N = select_columns.count();
|
||||||
while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) &&
|
while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) &&
|
||||||
OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
|
OB_SUCC(scan_result_->get_next_row(row))) {
|
||||||
if (N != row->get_count()) {
|
if (N != row->get_count()) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("select column count is not equal to row cells count", K(ret), K(select_columns), K(*row));
|
LOG_WARN("select column count is not equal to row cells count", K(ret), K(select_columns), K(*row));
|
||||||
@ -473,7 +473,7 @@ int ObTableFilterOperator::get_normal_result(table::ObTableQueryResult *&next_re
|
|||||||
const int64_t N = select_columns.count();
|
const int64_t N = select_columns.count();
|
||||||
|
|
||||||
while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) &&
|
while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) &&
|
||||||
OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
|
OB_SUCC(scan_result_->get_next_row(row))) {
|
||||||
if (N != row->get_count()) {
|
if (N != row->get_count()) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("select column count is not equal to row cells count", K(ret), K(select_columns), K(*row));
|
LOG_WARN("select column count is not equal to row cells count", K(ret), K(select_columns), K(*row));
|
||||||
|
|||||||
@ -185,7 +185,7 @@ int ObTableOpWrapper::process_get_with_spec(ObTableCtx &tb_ctx,
|
|||||||
LOG_WARN("fail to create scan executor", K(ret));
|
LOG_WARN("fail to create scan executor", K(ret));
|
||||||
} else if (OB_FAIL(row_iter.open(static_cast<ObTableApiScanExecutor*>(executor)))) {
|
} else if (OB_FAIL(row_iter.open(static_cast<ObTableApiScanExecutor*>(executor)))) {
|
||||||
LOG_WARN("fail to open scan row iterator", K(ret));
|
LOG_WARN("fail to open scan row iterator", K(ret));
|
||||||
} else if (OB_FAIL(row_iter.get_next_row(row))) {
|
} else if (OB_FAIL(row_iter.get_next_row(row, tb_ctx.get_allocator()))) {
|
||||||
if (ret != OB_ITER_END) {
|
if (ret != OB_ITER_END) {
|
||||||
LOG_WARN("fail to get next row", K(ret));
|
LOG_WARN("fail to get next row", K(ret));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -409,7 +409,7 @@ int ObTableQueryAndMutateP::get_old_row(ObTableApiSpec &scan_spec, ObNewRow *&ro
|
|||||||
LOG_WARN("fail to generate executor", K(ret), K(tb_ctx_));
|
LOG_WARN("fail to generate executor", K(ret), K(tb_ctx_));
|
||||||
} else if (OB_FAIL(row_iter.open(static_cast<ObTableApiScanExecutor*>(executor)))) {
|
} else if (OB_FAIL(row_iter.open(static_cast<ObTableApiScanExecutor*>(executor)))) {
|
||||||
LOG_WARN("fail to open scan row iterator", K(ret));
|
LOG_WARN("fail to open scan row iterator", K(ret));
|
||||||
} else if (OB_FAIL(row_iter.get_next_row(row))) {
|
} else if (OB_FAIL(row_iter.get_next_row(row, tb_ctx_.get_allocator()))) {
|
||||||
if (OB_ITER_END != ret) {
|
if (OB_ITER_END != ret) {
|
||||||
LOG_WARN("fail to get next row", K(ret));
|
LOG_WARN("fail to get next row", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -295,17 +295,18 @@ int ObTableApiScanRowIterator::open(ObTableApiScanExecutor *executor)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Memory of row is owned by iterator, and row cannot be used beyond iterator, unless you use deep copy.
|
// Memory of row is owned by iterator, and row cannot be used after iterator close
|
||||||
int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy /* =true */)
|
// or get_next_row next time unless you use deep copy.
|
||||||
|
int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObNewRow *tmp_row = nullptr;
|
ObNewRow *tmp_row = nullptr;
|
||||||
char *row_buf = nullptr;
|
char *row_buf = nullptr;
|
||||||
ObObj *cells = nullptr;
|
ObObj *cells = nullptr;
|
||||||
const ObTableCtx &tb_ctx = scan_executor_->get_table_ctx();
|
const ObTableCtx &tb_ctx = scan_executor_->get_table_ctx();
|
||||||
ObIAllocator &allocator = tb_ctx.get_allocator();
|
|
||||||
const ExprFixedArray &output_exprs = scan_executor_->get_spec().get_ctdef().output_exprs_;
|
const ExprFixedArray &output_exprs = scan_executor_->get_spec().get_ctdef().output_exprs_;
|
||||||
const int64_t cells_cnt = output_exprs.count();
|
const int64_t cells_cnt = output_exprs.count();
|
||||||
|
row_allocator_.reuse();
|
||||||
|
|
||||||
if (OB_ISNULL(scan_executor_)) {
|
if (OB_ISNULL(scan_executor_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
@ -314,10 +315,10 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy
|
|||||||
if (OB_ITER_END != ret) {
|
if (OB_ITER_END != ret) {
|
||||||
LOG_WARN("fail to get next row by scan executor", K(ret));
|
LOG_WARN("fail to get next row by scan executor", K(ret));
|
||||||
}
|
}
|
||||||
} else if (OB_ISNULL(row_buf = static_cast<char*>(allocator.alloc(sizeof(ObNewRow))))) {
|
} else if (OB_ISNULL(row_buf = static_cast<char*>(row_allocator_.alloc(sizeof(ObNewRow))))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("fail to alloc ObNewRow buffer", K(ret));
|
LOG_WARN("fail to alloc ObNewRow buffer", K(ret));
|
||||||
} else if (OB_ISNULL(cells = static_cast<ObObj*>(allocator.alloc(sizeof(ObObj) * cells_cnt)))) {
|
} else if (OB_ISNULL(cells = static_cast<ObObj*>(row_allocator_.alloc(sizeof(ObObj) * cells_cnt)))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("fail to alloc cells buffer", K(ret), K(cells_cnt));
|
LOG_WARN("fail to alloc cells buffer", K(ret), K(cells_cnt));
|
||||||
} else {
|
} else {
|
||||||
@ -339,10 +340,8 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy
|
|||||||
LOG_WARN("fail to eval datum", K(ret));
|
LOG_WARN("fail to eval datum", K(ret));
|
||||||
} else if (OB_FAIL(datum->to_obj(tmp_obj, output_exprs.at(idx)->obj_meta_))) {
|
} else if (OB_FAIL(datum->to_obj(tmp_obj, output_exprs.at(idx)->obj_meta_))) {
|
||||||
LOG_WARN("fail to datum to obj", K(ret), K(output_exprs.at(idx)->obj_meta_), K(i), K(idx));
|
LOG_WARN("fail to datum to obj", K(ret), K(output_exprs.at(idx)->obj_meta_), K(i), K(idx));
|
||||||
} else if (!need_deep_copy) {
|
} else {
|
||||||
cells[i] = tmp_obj;
|
cells[i] = tmp_obj;
|
||||||
} else if (ob_write_obj(allocator, tmp_obj, cells[i])) { // need_deep_copy
|
|
||||||
LOG_WARN("fail to deep copy ObObj", K(ret), K(tmp_obj));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -351,10 +350,8 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy
|
|||||||
LOG_WARN("fail to eval datum", K(ret));
|
LOG_WARN("fail to eval datum", K(ret));
|
||||||
} else if (OB_FAIL(datum->to_obj(tmp_obj, output_exprs.at(i)->obj_meta_))) {
|
} else if (OB_FAIL(datum->to_obj(tmp_obj, output_exprs.at(i)->obj_meta_))) {
|
||||||
LOG_WARN("fail to datum to obj", K(ret), K(output_exprs.at(i)->obj_meta_));
|
LOG_WARN("fail to datum to obj", K(ret), K(output_exprs.at(i)->obj_meta_));
|
||||||
} else if (!need_deep_copy) {
|
} else {
|
||||||
cells[i] = tmp_obj;
|
cells[i] = tmp_obj;
|
||||||
} else if (ob_write_obj(allocator, tmp_obj, cells[i])) { // need_deep_copy
|
|
||||||
LOG_WARN("fail to deep copy ObObj", K(ret), K(tmp_obj));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -367,6 +364,36 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deep copy the new row using given allocator
|
||||||
|
int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, common::ObIAllocator &allocator)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObNewRow *inner_row = nullptr;
|
||||||
|
if (OB_FAIL(get_next_row(inner_row))) {
|
||||||
|
LOG_WARN("fail to get next row", KR(ret));
|
||||||
|
} else if (OB_ISNULL(inner_row)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("new row is null", KR(ret));
|
||||||
|
} else {
|
||||||
|
ObNewRow *tmp_row = nullptr;
|
||||||
|
int64_t buf_size = inner_row->get_deep_copy_size() + sizeof(ObNewRow);
|
||||||
|
char *tmp_row_buf = static_cast<char *>(allocator.alloc(buf_size));
|
||||||
|
if (OB_ISNULL(tmp_row_buf)) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
LOG_WARN("fail to alloc new row", KR(ret));
|
||||||
|
} else {
|
||||||
|
tmp_row = new(tmp_row_buf)ObNewRow();
|
||||||
|
int64_t pos = sizeof(ObNewRow);
|
||||||
|
if (OB_FAIL(tmp_row->deep_copy(*inner_row, tmp_row_buf, buf_size, pos))) {
|
||||||
|
LOG_WARN("fail to deep copy new row", KR(ret));
|
||||||
|
} else {
|
||||||
|
row = tmp_row;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObTableApiScanRowIterator::close()
|
int ObTableApiScanRowIterator::close()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -376,6 +403,8 @@ int ObTableApiScanRowIterator::close()
|
|||||||
LOG_WARN("scan executor is null", K(ret));
|
LOG_WARN("scan executor is null", K(ret));
|
||||||
} else if (OB_FAIL(scan_executor_->close())) {
|
} else if (OB_FAIL(scan_executor_->close())) {
|
||||||
LOG_WARN("fail to close scan executor", K(ret));
|
LOG_WARN("fail to close scan executor", K(ret));
|
||||||
|
} else {
|
||||||
|
row_allocator_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -88,16 +88,19 @@ class ObTableApiScanRowIterator
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObTableApiScanRowIterator()
|
ObTableApiScanRowIterator()
|
||||||
: scan_executor_(nullptr)
|
: scan_executor_(nullptr),
|
||||||
|
row_allocator_("TbScanRowIter", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
virtual ~ObTableApiScanRowIterator() {};
|
virtual ~ObTableApiScanRowIterator() {};
|
||||||
public:
|
public:
|
||||||
virtual int open(ObTableApiScanExecutor *executor);
|
virtual int open(ObTableApiScanExecutor *executor);
|
||||||
virtual int get_next_row(common::ObNewRow *&row, bool need_deep_copy = true);
|
virtual int get_next_row(ObNewRow *&row);
|
||||||
|
virtual int get_next_row(ObNewRow *&row, common::ObIAllocator &allocator);
|
||||||
virtual int close();
|
virtual int close();
|
||||||
private:
|
private:
|
||||||
ObTableApiScanExecutor *scan_executor_;
|
ObTableApiScanExecutor *scan_executor_;
|
||||||
|
common::ObArenaAllocator row_allocator_; // alloc the memory of result row
|
||||||
private:
|
private:
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObTableApiScanRowIterator);
|
DISALLOW_COPY_AND_ASSIGN(ObTableApiScanRowIterator);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -221,7 +221,9 @@ int ObTableTTLDeleteTask::process_one()
|
|||||||
info_.scan_cnt_ += result.get_scan_row();
|
info_.scan_cnt_ += result.get_scan_row();
|
||||||
info_.err_code_ = ret;
|
info_.err_code_ = ret;
|
||||||
info_.row_key_ = result.get_end_rowkey();
|
info_.row_key_ = result.get_end_rowkey();
|
||||||
if (OB_SUCC(ret) && result.get_del_row() < PER_TASK_DEL_ROWS) {
|
if (OB_SUCC(ret)
|
||||||
|
&& result.get_del_row() < PER_TASK_DEL_ROWS
|
||||||
|
&& result.get_end_ts() > ObTimeUtility::current_time()) {
|
||||||
ret = OB_ITER_END; // finsh task
|
ret = OB_ITER_END; // finsh task
|
||||||
info_.err_code_ = ret;
|
info_.err_code_ = ret;
|
||||||
LOG_DEBUG("finish delete", KR(ret), K_(info));
|
LOG_DEBUG("finish delete", KR(ret), K_(info));
|
||||||
@ -420,7 +422,7 @@ int ObTableTTLDag::fill_info_param(compaction::ObIBasicInfoParam *&out_param, Ob
|
|||||||
}
|
}
|
||||||
|
|
||||||
ObTableTTLDeleteRowIterator::ObTableTTLDeleteRowIterator()
|
ObTableTTLDeleteRowIterator::ObTableTTLDeleteRowIterator()
|
||||||
: allocator_(ObMemAttr(MTL_ID(), "TTLDelRowIter")),
|
: hbase_kq_allocator_(ObMemAttr(MTL_ID(), "TTLHbaseCQAlloc")),
|
||||||
is_inited_(false),
|
is_inited_(false),
|
||||||
max_version_(0),
|
max_version_(0),
|
||||||
time_to_live_ms_(0),
|
time_to_live_ms_(0),
|
||||||
@ -435,7 +437,9 @@ ObTableTTLDeleteRowIterator::ObTableTTLDeleteRowIterator()
|
|||||||
is_last_row_ttl_(true),
|
is_last_row_ttl_(true),
|
||||||
is_hbase_table_(false),
|
is_hbase_table_(false),
|
||||||
last_row_(nullptr),
|
last_row_(nullptr),
|
||||||
rowkey_cnt_(0)
|
rowkey_cnt_(0),
|
||||||
|
hbase_new_cq_(false),
|
||||||
|
iter_end_ts_(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -459,6 +463,8 @@ int ObTableTTLDeleteRowIterator::init(const schema::ObTableSchema &table_schema,
|
|||||||
rowkey_cnt_ = table_schema.get_rowkey_column_num();
|
rowkey_cnt_ = table_schema.get_rowkey_column_num();
|
||||||
ObSArray<uint64_t> rowkey_column_ids;
|
ObSArray<uint64_t> rowkey_column_ids;
|
||||||
ObSArray<uint64_t> full_column_ids;
|
ObSArray<uint64_t> full_column_ids;
|
||||||
|
hbase_new_cq_ = is_hbase_table_ ? false : true;
|
||||||
|
iter_end_ts_ = ObTimeUtility::current_time() + ONE_ITER_EXECUTE_MAX_TIME;
|
||||||
if (OB_FAIL(table_schema.get_rowkey_column_ids(rowkey_column_ids))) {
|
if (OB_FAIL(table_schema.get_rowkey_column_ids(rowkey_column_ids))) {
|
||||||
LOG_WARN("fail to get rowkey column ids", KR(ret));
|
LOG_WARN("fail to get rowkey column ids", KR(ret));
|
||||||
} else if (OB_FAIL(table_schema.get_column_ids(full_column_ids))) {
|
} else if (OB_FAIL(table_schema.get_column_ids(full_column_ids))) {
|
||||||
@ -513,7 +519,7 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
|
|||||||
} else {
|
} else {
|
||||||
bool is_expired = false;
|
bool is_expired = false;
|
||||||
while(OB_SUCC(ret) && !is_expired) {
|
while(OB_SUCC(ret) && !is_expired) {
|
||||||
if (OB_FAIL(ObTableApiScanRowIterator::get_next_row(row, false/*need_deep_copy*/))) {
|
if (OB_FAIL(ObTableApiScanRowIterator::get_next_row(row))) {
|
||||||
if (OB_ITER_END != ret) {
|
if (OB_ITER_END != ret) {
|
||||||
LOG_WARN("fail to get next row", K(ret));
|
LOG_WARN("fail to get next row", K(ret));
|
||||||
}
|
}
|
||||||
@ -530,11 +536,12 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
|
|||||||
ObString cell_qualifier = cell.get_qualifier();
|
ObString cell_qualifier = cell.get_qualifier();
|
||||||
int64_t cell_ts = -cell.get_timestamp(); // obhtable timestamp is nagative in ms
|
int64_t cell_ts = -cell.get_timestamp(); // obhtable timestamp is nagative in ms
|
||||||
if ((cell_rowkey != cur_rowkey_) || (cell_qualifier != cur_qualifier_)) {
|
if ((cell_rowkey != cur_rowkey_) || (cell_qualifier != cur_qualifier_)) {
|
||||||
|
hbase_new_cq_ = true;
|
||||||
cur_version_ = 1;
|
cur_version_ = 1;
|
||||||
allocator_.reuse();
|
hbase_kq_allocator_.reuse();
|
||||||
if (OB_FAIL(ob_write_string(allocator_, cell_rowkey, cur_rowkey_))) {
|
if (OB_FAIL(ob_write_string(hbase_kq_allocator_, cell_rowkey, cur_rowkey_))) {
|
||||||
LOG_WARN("fail to copy cell rowkey", KR(ret), K(cell_rowkey));
|
LOG_WARN("fail to copy cell rowkey", KR(ret), K(cell_rowkey));
|
||||||
} else if (OB_FAIL(ob_write_string(allocator_, cell_qualifier, cur_qualifier_))) {
|
} else if (OB_FAIL(ob_write_string(hbase_kq_allocator_, cell_qualifier, cur_qualifier_))) {
|
||||||
LOG_WARN("fail to copy cell qualifier", KR(ret), K(cell_qualifier));
|
LOG_WARN("fail to copy cell qualifier", KR(ret), K(cell_qualifier));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -564,6 +571,9 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (ObTimeUtility::current_time() > iter_end_ts_ && hbase_new_cq_) {
|
||||||
|
ret = OB_ITER_END;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -629,6 +639,7 @@ int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_ro
|
|||||||
result.ttl_del_rows_ = iter_ttl_cnt;
|
result.ttl_del_rows_ = iter_ttl_cnt;
|
||||||
result.max_version_del_rows_ = iter_max_version_cnt;
|
result.max_version_del_rows_ = iter_max_version_cnt;
|
||||||
result.scan_rows_ = ttl_row_iter.scan_cnt_;
|
result.scan_rows_ = ttl_row_iter.scan_cnt_;
|
||||||
|
result.iter_end_ts_ = ttl_row_iter.iter_end_ts_;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -49,7 +49,8 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
common::ObArenaAllocator allocator_;
|
const static int64_t ONE_ITER_EXECUTE_MAX_TIME = 30 * 1000 * 1000; // 30s
|
||||||
|
common::ObArenaAllocator hbase_kq_allocator_;
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
int32_t max_version_;
|
int32_t max_version_;
|
||||||
int64_t time_to_live_ms_; // ttl in millisecond
|
int64_t time_to_live_ms_; // ttl in millisecond
|
||||||
@ -70,6 +71,8 @@ public:
|
|||||||
common::ObSArray<uint64_t> rowkey_cell_ids_;
|
common::ObSArray<uint64_t> rowkey_cell_ids_;
|
||||||
// map new row -> normal column
|
// map new row -> normal column
|
||||||
common::ObSArray<PropertyPair> properties_pairs_;
|
common::ObSArray<PropertyPair> properties_pairs_;
|
||||||
|
bool hbase_new_cq_;
|
||||||
|
int64_t iter_end_ts_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -945,7 +945,8 @@ public:
|
|||||||
: ttl_del_rows_(0),
|
: ttl_del_rows_(0),
|
||||||
max_version_del_rows_(0),
|
max_version_del_rows_(0),
|
||||||
scan_rows_(0),
|
scan_rows_(0),
|
||||||
end_rowkey_()
|
end_rowkey_(),
|
||||||
|
iter_end_ts_(0)
|
||||||
{}
|
{}
|
||||||
~ObTableTTLOperationResult() {}
|
~ObTableTTLOperationResult() {}
|
||||||
uint64_t get_ttl_del_row() { return ttl_del_rows_; }
|
uint64_t get_ttl_del_row() { return ttl_del_rows_; }
|
||||||
@ -953,12 +954,14 @@ public:
|
|||||||
uint64_t get_del_row() { return ttl_del_rows_ + max_version_del_rows_; }
|
uint64_t get_del_row() { return ttl_del_rows_ + max_version_del_rows_; }
|
||||||
uint64_t get_scan_row() { return scan_rows_; }
|
uint64_t get_scan_row() { return scan_rows_; }
|
||||||
common::ObString get_end_rowkey() { return end_rowkey_; }
|
common::ObString get_end_rowkey() { return end_rowkey_; }
|
||||||
TO_STRING_KV(K_(ttl_del_rows), K_(max_version_del_rows), K_(scan_rows), K_(end_rowkey));
|
int64_t get_end_ts() { return iter_end_ts_; }
|
||||||
|
TO_STRING_KV(K_(ttl_del_rows), K_(max_version_del_rows), K_(scan_rows), K_(end_rowkey), K_(iter_end_ts));
|
||||||
public:
|
public:
|
||||||
uint64_t ttl_del_rows_;
|
uint64_t ttl_del_rows_;
|
||||||
uint64_t max_version_del_rows_;
|
uint64_t max_version_del_rows_;
|
||||||
uint64_t scan_rows_;
|
uint64_t scan_rows_;
|
||||||
common::ObString end_rowkey_;
|
common::ObString end_rowkey_;
|
||||||
|
int64_t iter_end_ts_;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ObTableMoveReplicaInfo final
|
struct ObTableMoveReplicaInfo final
|
||||||
|
|||||||
Reference in New Issue
Block a user