Fix DASScanOp retry memory expansion.
This commit is contained in:
@ -165,7 +165,8 @@ ObDASScanOp::ObDASScanOp(ObIAllocator &op_alloc)
|
||||
scan_ctdef_(nullptr),
|
||||
scan_rtdef_(nullptr),
|
||||
result_(nullptr),
|
||||
remain_row_cnt_(0)
|
||||
remain_row_cnt_(0),
|
||||
retry_alloc_(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
@ -173,6 +174,11 @@ ObDASScanOp::~ObDASScanOp()
|
||||
{
|
||||
scan_param_.destroy();
|
||||
trans_info_array_.destroy();
|
||||
|
||||
if (retry_alloc_ != nullptr) {
|
||||
retry_alloc_->reset();
|
||||
retry_alloc_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
int ObDASScanOp::swizzling_remote_task(ObDASRemoteInfo *remote_info)
|
||||
@ -217,9 +223,14 @@ int ObDASScanOp::init_scan_param()
|
||||
scan_param_.timeout_ = scan_rtdef_->timeout_ts_;
|
||||
scan_param_.scan_flag_ = scan_rtdef_->scan_flag_;
|
||||
scan_param_.reserved_cell_count_ = scan_ctdef_->access_column_ids_.count();
|
||||
if (in_part_retry_ && retry_alloc_ != nullptr) {
|
||||
scan_param_.allocator_ = retry_alloc_;
|
||||
scan_param_.scan_allocator_ = retry_alloc_;
|
||||
} else {
|
||||
scan_param_.allocator_ = &scan_rtdef_->stmt_allocator_;
|
||||
scan_param_.sql_mode_ = scan_rtdef_->sql_mode_;
|
||||
scan_param_.scan_allocator_ = &scan_rtdef_->scan_allocator_;
|
||||
}
|
||||
scan_param_.sql_mode_ = scan_rtdef_->sql_mode_;
|
||||
scan_param_.frozen_version_ = scan_rtdef_->frozen_version_;
|
||||
scan_param_.force_refresh_lc_ = scan_rtdef_->force_refresh_lc_;
|
||||
scan_param_.output_exprs_ = &(scan_ctdef_->pd_expr_spec_.access_exprs_);
|
||||
@ -294,6 +305,12 @@ int ObDASScanOp::open_op()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObITabletScan &tsc_service = get_tsc_service();
|
||||
//Retry may be called many times.
|
||||
//Only for DASScanOp now, we add a retry alloc to avoid
|
||||
//memory expansion.
|
||||
if (in_part_retry_) {
|
||||
init_retry_alloc();
|
||||
}
|
||||
reset_access_datums_ptr();
|
||||
if (OB_FAIL(init_scan_param())) {
|
||||
LOG_WARN("init scan param failed", K(ret));
|
||||
@ -352,6 +369,11 @@ int ObDASScanOp::release_op()
|
||||
scan_param_.partition_guard_ = nullptr;
|
||||
scan_param_.destroy_schema_guard();
|
||||
|
||||
if (retry_alloc_ != nullptr) {
|
||||
retry_alloc_->reset();
|
||||
retry_alloc_ = nullptr;
|
||||
}
|
||||
|
||||
int simulate_error = EVENT_CALL(EventTable::EN_DAS_SIMULATE_LOOKUPOP_INIT_ERROR);
|
||||
if (OB_UNLIKELY(OB_SUCCESS != simulate_error)) {
|
||||
if (result_ != nullptr) {
|
||||
@ -647,6 +669,10 @@ int ObDASScanOp::rescan()
|
||||
int ObDASScanOp::reuse_iter()
|
||||
{
|
||||
int &ret = errcode_;
|
||||
//May be retry change to retry alloc.
|
||||
//Change back.
|
||||
scan_param_.scan_allocator_ = &scan_rtdef_->scan_allocator_;
|
||||
|
||||
ObITabletScan &tsc_service = get_tsc_service();
|
||||
ObLocalIndexLookupOp *lookup_op = get_lookup_op();
|
||||
const ObTabletID &storage_tablet_id = scan_param_.tablet_id_;
|
||||
|
||||
@ -224,6 +224,16 @@ protected:
|
||||
public:
|
||||
ObSEArray<ObDatum *, 4> trans_info_array_;
|
||||
protected:
|
||||
void init_retry_alloc()
|
||||
{
|
||||
if (nullptr == retry_alloc_) {
|
||||
ObMemAttr attr;
|
||||
attr.tenant_id_ = MTL_ID();
|
||||
attr.label_ = "RetryDASCtx";
|
||||
retry_alloc_ = new(&retry_alloc_buf_) common::ObArenaAllocator();
|
||||
retry_alloc_->set_attr(attr);
|
||||
}
|
||||
}
|
||||
//对于DASScanOp,本质上是对PartitionService的table_scan()接口的封装,
|
||||
//参数为scan_param,结果为result iterator
|
||||
storage::ObTableScanParam scan_param_;
|
||||
@ -232,6 +242,11 @@ protected:
|
||||
common::ObNewRowIterator *result_;
|
||||
//Indicates the number of remaining rows currently that need to be sent through DTL
|
||||
int64_t remain_row_cnt_;
|
||||
|
||||
common::ObArenaAllocator *retry_alloc_;
|
||||
union {
|
||||
common::ObArenaAllocator retry_alloc_buf_;
|
||||
};
|
||||
};
|
||||
|
||||
class ObDASScanResult : public ObIDASTaskResult, public common::ObNewRowIterator
|
||||
|
||||
Reference in New Issue
Block a user