fix: fix das memory expand induced by das used memeory calculation error
This commit is contained in:
@ -928,12 +928,14 @@ struct ObDMLRtCtx
|
|||||||
ObDMLRtCtx(ObEvalCtx &eval_ctx, ObExecContext &exec_ctx, ObTableModifyOp &op)
|
ObDMLRtCtx(ObEvalCtx &eval_ctx, ObExecContext &exec_ctx, ObTableModifyOp &op)
|
||||||
: das_ref_(eval_ctx, exec_ctx),
|
: das_ref_(eval_ctx, exec_ctx),
|
||||||
das_task_status_(),
|
das_task_status_(),
|
||||||
op_(op)
|
op_(op),
|
||||||
|
cached_row_size_(0)
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
void reuse()
|
void reuse()
|
||||||
{
|
{
|
||||||
das_ref_.reuse();
|
das_ref_.reuse();
|
||||||
|
cached_row_size_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanup()
|
void cleanup()
|
||||||
@ -950,10 +952,13 @@ struct ObDMLRtCtx
|
|||||||
{ return das_task_status_.need_pick_del_task_first(); }
|
{ return das_task_status_.need_pick_del_task_first(); }
|
||||||
bool need_non_sub_full_task()
|
bool need_non_sub_full_task()
|
||||||
{ return das_task_status_.need_non_sub_full_task(); }
|
{ return das_task_status_.need_non_sub_full_task(); }
|
||||||
|
void add_cached_row_size(const int64_t row_size) { cached_row_size_ += row_size; }
|
||||||
|
int64_t get_cached_row_size() const { return cached_row_size_; }
|
||||||
|
|
||||||
ObDASRef das_ref_;
|
ObDASRef das_ref_;
|
||||||
DasTaskStatus das_task_status_;
|
DasTaskStatus das_task_status_;
|
||||||
ObTableModifyOp &op_;
|
ObTableModifyOp &op_;
|
||||||
|
int64_t cached_row_size_;
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
|
|||||||
@ -1549,16 +1549,18 @@ int ObDMLService::write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef,
|
|||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_FAIL(dml_op->write_row(row, dml_rtctx.get_eval_ctx(), stored_row, buffer_full))) {
|
if (OB_FAIL(dml_op->write_row(row, dml_rtctx.get_eval_ctx(), stored_row, buffer_full))) {
|
||||||
LOG_WARN("insert row to das dml op buffer failed", K(ret), K(ctdef), K(rtdef));
|
LOG_WARN("insert row to das dml op buffer failed", K(ret), K(ctdef), K(rtdef));
|
||||||
|
} else if (OB_NOT_NULL(stored_row)) {
|
||||||
|
dml_rtctx.add_cached_row_size(stored_row->row_size_);
|
||||||
}
|
}
|
||||||
LOG_DEBUG("write row to das op", K(ret), K(buffer_full), "op_type", N,
|
LOG_DEBUG("write row to das op", K(ret), K(buffer_full), "op_type", N,
|
||||||
"table_id", ctdef.table_id_, "index_tid", ctdef.index_tid_,
|
"table_id", ctdef.table_id_, "index_tid", ctdef.index_tid_,
|
||||||
"row", ROWEXPR2STR(dml_rtctx.get_eval_ctx(), row));
|
"row", ROWEXPR2STR(dml_rtctx.get_eval_ctx(), row), "row_size", stored_row->row_size_);
|
||||||
}
|
}
|
||||||
//3. if buffer is full, frozen node, create a new das op to add row
|
//3. if buffer is full, frozen node, create a new das op to add row
|
||||||
if (OB_SUCC(ret) && buffer_full) {
|
if (OB_SUCC(ret) && buffer_full) {
|
||||||
need_retry = true;
|
need_retry = true;
|
||||||
if (REACH_COUNT_INTERVAL(10)) { // print log per 10 times.
|
if (REACH_COUNT_INTERVAL(10)) { // print log per 10 times.
|
||||||
LOG_INFO("DAS write buffer full, ", K(dml_op->get_row_cnt()), K(dml_rtctx.das_ref_.get_das_mem_used()));
|
LOG_INFO("DAS write buffer full, ", K(dml_op->get_row_cnt()), K(dml_rtctx.das_ref_.get_das_mem_used()), K(dml_rtctx.get_cached_row_size()));
|
||||||
}
|
}
|
||||||
dml_rtctx.das_ref_.set_frozen_node();
|
dml_rtctx.das_ref_.set_frozen_node();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1099,9 +1099,9 @@ int ObTableModifyOp::submit_all_dml_task()
|
|||||||
int ObTableModifyOp::discharge_das_write_buffer()
|
int ObTableModifyOp::discharge_das_write_buffer()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (dml_rtctx_.das_ref_.get_das_mem_used() >= das::OB_DAS_MAX_TOTAL_PACKET_SIZE || execute_single_row_) {
|
if (dml_rtctx_.get_cached_row_size() >= das::OB_DAS_MAX_TOTAL_PACKET_SIZE || execute_single_row_) {
|
||||||
LOG_INFO("DASWriteBuffer full or need single row execution, now to write storage",
|
LOG_INFO("DASWriteBuffer full or need single row execution, now to write storage",
|
||||||
"buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used(), K(execute_single_row_));
|
"buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used(), K(execute_single_row_), K(dml_rtctx_.get_cached_row_size()));
|
||||||
ret = submit_all_dml_task();
|
ret = submit_all_dml_task();
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
Reference in New Issue
Block a user