diff --git a/deps/oblib/src/lib/allocator/ob_allocator.h b/deps/oblib/src/lib/allocator/ob_allocator.h index d566826524..194f266370 100644 --- a/deps/oblib/src/lib/allocator/ob_allocator.h +++ b/deps/oblib/src/lib/allocator/ob_allocator.h @@ -93,6 +93,8 @@ public: alloc_->free(ptr); ptr = NULL; } } + virtual int64_t total() const { return alloc_ != nullptr ? alloc_->total() : 0; } + virtual int64_t used() const { return alloc_ != nullptr ? alloc_->used() : 0; } void set_alloc(ObIAllocator *alloc) { alloc_ = alloc; } ObWrapperAllocator &operator=(const ObWrapperAllocator &that) { diff --git a/deps/oblib/src/lib/ob_name_id_def.h b/deps/oblib/src/lib/ob_name_id_def.h index 0640a0edf1..853f5456af 100644 --- a/deps/oblib/src/lib/ob_name_id_def.h +++ b/deps/oblib/src/lib/ob_name_id_def.h @@ -567,6 +567,15 @@ DEF_NAME(id, "id") DEF_NAME(update_start_check_row, "update_start_check_row") DEF_NAME(update_end_check_row, "update_end_check_row") + DEF_NAME_PAIR(get_das_id, "get_das_id") + DEF_NAME_PAIR(do_local_das_task, "do_local_das_task") + DEF_NAME_PAIR(do_remote_das_task, "do_remote_das_task") + DEF_NAME_PAIR(rescan_das_task, "rescan_das_task") + DEF_NAME_PAIR(das_rpc_process, "das_rpc_process") + DEF_NAME_PAIR(close_das_task, "close_das_task") + DEF_NAME_PAIR(fetch_das_extra_result, "fetch_das_extra_result") + DEF_NAME_PAIR(fetch_das_result_process, "fetch_das_result_process") + DEF_NAME(before_calculate_row, "before_calculate_row") DEF_NAME(end_calculate_row, "end_calculate_row") DEF_NAME(row, "row") @@ -739,6 +748,7 @@ DEF_NAME(id, "id") // pc: plan cache // E: executor DEF_NAME_PAIR(S_table_scan, "storage: table scan") + DEF_NAME_PAIR(S_table_rescan, "storage: table rescan") DEF_NAME_PAIR(S_revert_iter, "storage: revert iterator") DEF_NAME_PAIR(S_delete_rows, "storage: delete rows") DEF_NAME_PAIR(S_delete_row, "storage: delete row") diff --git a/deps/oblib/src/lib/trace/ob_trace_def.h b/deps/oblib/src/lib/trace/ob_trace_def.h index 0fa78b03b5..9de0f04519 100644 --- a/deps/oblib/src/lib/trace/ob_trace_def.h +++ b/deps/oblib/src/lib/trace/ob_trace_def.h @@ -30,7 +30,7 @@ FLT_DEF_SPAN(com_query_process, "com_query process") FLT_DEF_SPAN(resolve, "resolve syntax tree's semantics and generate statement") FLT_DEF_SPAN(rewrite, "transform statement") FLT_DEF_SPAN(optimize, "do cost-base optimization and generate log plan") - FLT_DEF_SPAN(code_generate, "generate physical plan accordding to log plan") + FLT_DEF_SPAN(code_generate, "generate physical plan according to log plan") FLT_DEF_SPAN(sql_execute, "execute physical plan") //TODO shengle code of open interface need refator FLT_DEF_SPAN(open, "open plan") @@ -38,7 +38,7 @@ FLT_DEF_SPAN(com_query_process, "com_query process") FLT_DEF_SPAN(px_schedule, "schedule tasks divided by px") FLT_DEF_SPAN(px_task, "execution of px's schedule") FLT_DEF_SPAN(close, "close plan") - FLT_DEF_SPAN(cmd_execute, "commmand execute") + FLT_DEF_SPAN(cmd_execute, "command execute") FLT_DEF_SPAN(cmd_open, "command open") // **** for sql end **** @@ -48,7 +48,7 @@ FLT_DEF_SPAN(com_query_process, "com_query process") FLT_DEF_SPAN(pc_get_pl_object, "get pl object from plan cache") FLT_DEF_SPAN(pc_add_pl_object, "add pl object to plan cache") FLT_DEF_SPAN(pl_execute, "execute pl object") - FLT_DEF_SPAN(pl_spi_query, "exeute pl spi query") + FLT_DEF_SPAN(pl_spi_query, "execute pl spi query") FLT_DEF_SPAN(pl_spi_prepare, "prepare phase of pl execution") FLT_DEF_SPAN(pl_spi_execute, "execute phase of pl execution") // **** for pl end **** @@ -72,6 +72,16 @@ FLT_DEF_SPAN(com_query_process, "com_query process") FLT_DEF_SPAN(ps_close, "close phase of ps protocol") // for ps end + // for das + FLT_DEF_SPAN(get_das_id, "fetch das task id") + FLT_DEF_SPAN(do_local_das_task, "execute local das task") + FLT_DEF_SPAN(do_remote_das_task, "execute remote das task") + FLT_DEF_SPAN(das_rpc_process, "das task rpc process") + FLT_DEF_SPAN(rescan_das_task, "rescan das task") + FLT_DEF_SPAN(close_das_task, "close das task") + FLT_DEF_SPAN(fetch_das_extra_result, "fetch das extra result") + FLT_DEF_SPAN(fetch_das_result_process, "fetch das result process") + #endif // __HIGH_LEVEL_SPAN #ifdef __MIDDLE_LEVEL_SPAN diff --git a/src/sql/das/ob_das_dml_ctx_define.cpp b/src/sql/das/ob_das_dml_ctx_define.cpp index 695b6a44e7..6b33bea876 100644 --- a/src/sql/das/ob_das_dml_ctx_define.cpp +++ b/src/sql/das/ob_das_dml_ctx_define.cpp @@ -303,7 +303,7 @@ int ObDASWriteBuffer::try_add_row(const ObIArray &exprs, ret = OB_ERR_UNEXPECTED; LOG_WARN("stored row is null", K(ret)); } else { - LOG_DEBUG("add dml_row pay_load here"); + LOG_TRACE("add dml_row pay_load here", KPC(stored_row)); } } diff --git a/src/sql/das/ob_das_extra_data.cpp b/src/sql/das/ob_das_extra_data.cpp index 260c259ca7..392695e780 100644 --- a/src/sql/das/ob_das_extra_data.cpp +++ b/src/sql/das/ob_das_extra_data.cpp @@ -51,6 +51,8 @@ int ObDASExtraData::init(const int64_t task_id, int ObDASExtraData::fetch_result() { int ret = OB_SUCCESS; + NG_TRACE(fetch_das_extra_result_begin); + FLTSpanGuard(fetch_das_extra_result); ObDASDataFetchReq req; int64_t tenant_id = MTL_ID(); int64_t timeout = timeout_ts_ - ObTimeUtility::current_time(); @@ -72,6 +74,7 @@ int ObDASExtraData::fetch_result() LOG_TRACE("das fetch task result", KR(ret), K(req), K(result_)); has_more_ = result_.has_more(); } + NG_TRACE(fetch_das_extra_result_end); return ret; } diff --git a/src/sql/das/ob_das_ref.cpp b/src/sql/das/ob_das_ref.cpp index a362baed10..b72be60163 100644 --- a/src/sql/das/ob_das_ref.cpp +++ b/src/sql/das/ob_das_ref.cpp @@ -245,6 +245,8 @@ int ObDASRef::close_all_task() int ret = OB_SUCCESS; int last_end_ret = OB_SUCCESS; if (has_task()) { + NG_TRACE(close_das_task_begin); + FLTSpanGuard(close_das_task); ObSQLSessionInfo *session = nullptr; DASTaskIter task_iter = begin_task_iter(); @@ -280,6 +282,7 @@ int ObDASRef::close_all_task() if (task_map_.created()) { task_map_.destroy(); } + NG_TRACE(close_das_task_end); } return ret; } diff --git a/src/sql/das/ob_das_rpc_processor.cpp b/src/sql/das/ob_das_rpc_processor.cpp index 3d44f747b6..88378d1626 100644 --- a/src/sql/das/ob_das_rpc_processor.cpp +++ b/src/sql/das/ob_das_rpc_processor.cpp @@ -68,11 +68,14 @@ int ObDASSyncAccessP::before_process() int ObDASSyncAccessP::process() { int ret = OB_SUCCESS; + NG_TRACE(das_rpc_process_begin); + FLTSpanGuard(das_rpc_process); ObDASTaskArg &task = arg_; ObDASTaskResp &task_resp = result_; ObIDASTaskOp *task_op = task.get_task_op(); ObIDASTaskResult *task_result = task_resp.get_op_result(); bool has_more = false; + ObDASOpType task_type = DAS_OP_INVALID; //regardless of the success of the task execution, the fllowing meta info must be set task_result->set_task_id(task_op->get_task_id()); task_resp.set_ctrl_svr(task.get_ctrl_svr()); @@ -87,6 +90,7 @@ int ObDASSyncAccessP::process() } else if (OB_UNLIKELY(has_more) && OB_FAIL(task_op->fill_extra_result())) { LOG_WARN("fill extra result to controller failed", KR(ret)); } else { + task_type = task_op->get_type(); task_resp.set_has_more(has_more); ObWarningBuffer *wb = ob_get_tsi_warning_buffer(); if (wb != nullptr) { @@ -122,14 +126,19 @@ int ObDASSyncAccessP::process() } } LOG_DEBUG("process das sync access task", K(ret), K(task), KPC(task_result), K(has_more)); + NG_TRACE_EXT(das_rpc_process_end, OB_ID(type), task_type); return OB_SUCCESS; } int ObDASSyncAccessP::after_process(int error_code) { int ret = OB_SUCCESS; + const int64_t elapsed_time = common::ObTimeUtility::current_time() - get_receive_timestamp(); if (OB_FAIL(ObDASSyncRpcProcessor::after_process(error_code))) { LOG_WARN("do das sync base rpc process failed", K(ret)); + } else if (elapsed_time >= ObServerConfig::get_instance().trace_log_slow_query_watermark) { + //slow das task, print trace info + FORCE_PRINT_TRACE(THE_TRACE, "[slow das rpc process]"); } //执行相关的错误信息不用传递给RPC框架,RPC框架不处理具体的RPC执行错误信息,始终返回OB_SUCCESS return OB_SUCCESS; @@ -150,6 +159,8 @@ void ObDASSyncAccessP::cleanup() int ObDASSyncFetchP::process() { int ret = OB_SUCCESS; + NG_TRACE(fetch_das_result_process_begin); + FLTSpanGuard(fetch_das_result_process); ObDASDataFetchReq &req = arg_; ObDASDataFetchRes &res = result_; ObDataAccessService *das = NULL; @@ -179,9 +190,26 @@ int ObDASSyncFetchP::process() } else { res.set_has_more(has_more); } + NG_TRACE(fetch_das_result_process_end); return ret; } -int ObDASAsyncEraseP::process() { + +int ObDASSyncFetchP::after_process(int error_code) +{ + int ret = OB_SUCCESS; + const int64_t elapsed_time = common::ObTimeUtility::current_time() - get_receive_timestamp(); + if (OB_FAIL(ObDASSyncFetchResRpcProcessor::after_process(error_code))) { + LOG_WARN("do das sync base rpc process failed", K(ret)); + } else if (elapsed_time >= ObServerConfig::get_instance().trace_log_slow_query_watermark) { + //slow das task, print trace info + FORCE_PRINT_TRACE(THE_TRACE, "[slow das rpc process]"); + } + //执行相关的错误信息不用传递给RPC框架,RPC框架不处理具体的RPC执行错误信息,始终返回OB_SUCCESS + return OB_SUCCESS; +} + +int ObDASAsyncEraseP::process() +{ int ret = OB_SUCCESS; ObDASDataEraseReq &req = arg_; ObDataAccessService *das = NULL; diff --git a/src/sql/das/ob_das_rpc_processor.h b/src/sql/das/ob_das_rpc_processor.h index 1355650b38..b92cea25a5 100644 --- a/src/sql/das/ob_das_rpc_processor.h +++ b/src/sql/das/ob_das_rpc_processor.h @@ -68,7 +68,8 @@ class ObDASSyncFetchP : public ObDASSyncFetchResRpcProcessor public: ObDASSyncFetchP() {} ~ObDASSyncFetchP() {} - int process(); + virtual int process() override; + virtual int after_process(int error_code); private: DISALLOW_COPY_AND_ASSIGN(ObDASSyncFetchP); }; diff --git a/src/sql/das/ob_das_task.h b/src/sql/das/ob_das_task.h index 7945be256e..a63746231f 100644 --- a/src/sql/das/ob_das_task.h +++ b/src/sql/das/ob_das_task.h @@ -142,7 +142,6 @@ public: K_(need_switch_param), KPC_(trans_desc), KPC_(snapshot), - K_(trans_result), K_(tablet_id), K_(ls_id), KPC_(tablet_loc), @@ -158,7 +157,6 @@ public: transaction::ObTxDesc *get_trans_desc() { return trans_desc_; } void set_snapshot(transaction::ObTxReadSnapshot *snapshot) { snapshot_ = snapshot; } transaction::ObTxReadSnapshot *get_snapshot() { return snapshot_; } - transaction::ObTxExecResult &get_trans_result() { return trans_result_; } bool is_local_task() const { return task_started_; } void set_can_part_retry(const bool flag) { can_part_retry_ = flag; } bool can_part_retry() const { return can_part_retry_; } @@ -192,7 +190,6 @@ protected: }; transaction::ObTxDesc *trans_desc_; //trans desc,事务是全局信息,由RPC框架管理,这里不维护其内存 transaction::ObTxReadSnapshot *snapshot_; // Mvcc snapshot - transaction::ObTxExecResult trans_result_; //does not need serialize it common::ObTabletID tablet_id_; share::ObLSID ls_id_; const ObDASTabletLoc *tablet_loc_; //does not need serialize it diff --git a/src/sql/das/ob_data_access_service.cpp b/src/sql/das/ob_data_access_service.cpp index 740dcdc88b..5cb2f1289e 100644 --- a/src/sql/das/ob_data_access_service.cpp +++ b/src/sql/das/ob_data_access_service.cpp @@ -74,9 +74,12 @@ int ObDataAccessService::execute_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_ { int ret = OB_SUCCESS; if (OB_LIKELY(das_ref.is_execute_directly())) { + NG_TRACE(do_local_das_task_begin); + FLTSpanGuard(do_local_das_task); if (OB_FAIL(task_op.start_das_task())) { LOG_WARN("start das task failed", K(ret), K(task_op)); } + NG_TRACE(do_local_das_task_end); } else { ret = execute_dist_das_task(das_ref, task_op); task_op.errcode_ = ret; @@ -97,6 +100,8 @@ int ObDataAccessService::execute_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_ int ObDataAccessService::get_das_task_id(int64_t &das_id) { int ret = OB_SUCCESS; + NG_TRACE(get_das_id_begin); + FLTSpanGuard(get_das_id); const int MAX_RETRY_TIMES = 50; int64_t tmp_das_id = 0; bool force_renew = false; @@ -123,6 +128,7 @@ int ObDataAccessService::get_das_task_id(int64_t &das_id) if (OB_SUCC(ret)) { das_id = tmp_das_id; } + NG_TRACE(get_das_id_end); return ret; } @@ -240,6 +246,8 @@ int ObDataAccessService::end_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op) int ObDataAccessService::rescan_das_task(ObDASRef &das_ref, ObDASScanOp &scan_op) { int ret = OB_SUCCESS; + NG_TRACE(rescan_das_task_begin); + FLTSpanGuard(rescan_das_task); if (scan_op.is_local_task()) { if (OB_FAIL(scan_op.rescan())) { LOG_WARN("rescan das task failed", K(ret)); @@ -257,6 +265,7 @@ int ObDataAccessService::rescan_das_task(ObDASRef &das_ref, ObDASScanOp &scan_op LOG_WARN("failed to retry das task", K(tmp_ret)); } } + NG_TRACE(rescan_das_task_end); return ret; } @@ -266,9 +275,12 @@ int ObDataAccessService::do_local_das_task(ObDASRef &das_ref, ObDASTaskArg &task int ret = OB_SUCCESS; LOG_DEBUG("begin to do local das task", K(task_arg)); ObIDASTaskOp *task_op = task_arg.get_task_op(); + NG_TRACE(do_local_das_task_begin); + FLTSpanGuard(do_local_das_task); if (OB_FAIL(task_op->start_das_task())) { LOG_WARN("start local das task failed", K(ret)); } + NG_TRACE(do_local_das_task_end); return ret; } @@ -276,6 +288,8 @@ int ObDataAccessService::do_remote_das_task(ObDASRef &das_ref, ObDASTaskArg &tas { int ret = OB_SUCCESS; void *resp_buf = nullptr; + NG_TRACE(do_remote_das_task_begin); + FLTSpanGuard(do_remote_das_task); ObSQLSessionInfo *session = das_ref.get_exec_ctx().get_my_session(); ObPhysicalPlanCtx *plan_ctx = das_ref.get_exec_ctx().get_physical_plan_ctx(); int64_t timeout = plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time(); @@ -338,6 +352,7 @@ int ObDataAccessService::do_remote_das_task(ObDASRef &das_ref, ObDASTaskArg &tas } } } + NG_TRACE_EXT(do_remote_das_task_end, Y(ret), OB_ID(addr), task_arg.get_runner_svr()); return ret; } diff --git a/src/sql/engine/dml/ob_dml_service.cpp b/src/sql/engine/dml/ob_dml_service.cpp index ed635c5d7c..b5fd8477a0 100644 --- a/src/sql/engine/dml/ob_dml_service.cpp +++ b/src/sql/engine/dml/ob_dml_service.cpp @@ -1239,8 +1239,6 @@ int ObDMLService::write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef, int64_t simulate_row_cnt = - EVENT_CALL(EventTable::EN_DAS_DML_BUFFER_OVERFLOW); if (OB_UNLIKELY(simulate_row_cnt > 0 && dml_op->get_row_cnt() >= simulate_row_cnt)) { buffer_full = true; - } else if (dml_rtctx.get_das_alloc().used() >= das::OB_DAS_MAX_TOTAL_PACKET_SIZE) { - buffer_full = true; } else if (OB_FAIL(dml_op->write_row(row, dml_rtctx.get_eval_ctx(), buffer_full))) { LOG_WARN("insert row to das dml op buffer failed", K(ret), K(ctdef), K(rtdef)); } @@ -1263,13 +1261,8 @@ int ObDMLService::write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef, if (dml_rtctx.need_pick_del_task_first() && OB_FAIL(dml_rtctx.das_ref_.pick_del_task_to_first())) { LOG_WARN("fail to pick delete das task to first", K(ret)); - } else if (OB_FAIL(dml_rtctx.das_ref_.execute_all_task())) { - LOG_WARN("execute all das task failed", K(ret)); - } else if (OB_FAIL(dml_rtctx.das_ref_.close_all_task())) { - LOG_WARN("close all das task failed", K(ret)); - } else { - //don't release all memory, need to reuse das ctx - dml_rtctx.reuse(); + } else if (OB_FAIL(dml_rtctx.op_.submit_all_dml_task())) { + LOG_WARN("submit all dml task failed", K(ret)); } } } diff --git a/src/sql/engine/dml/ob_table_delete_op.cpp b/src/sql/engine/dml/ob_table_delete_op.cpp index da1751586f..7d08c36283 100644 --- a/src/sql/engine/dml/ob_table_delete_op.cpp +++ b/src/sql/engine/dml/ob_table_delete_op.cpp @@ -125,43 +125,6 @@ int ObTableDeleteOp::inner_open() return ret; } -int ObTableDeleteOp::inner_get_next_row() -{ - int ret = OB_SUCCESS; - if (iter_end_) { - LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_)); - ret = OB_ITER_END; - } else { - while (OB_SUCC(ret)) { - if (OB_FAIL(try_check_status())) { - LOG_WARN("check status failed", K(ret)); - } else if (OB_FAIL(get_next_row_from_child())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } else { - iter_end_ = true; - } - } else if (OB_FAIL(delete_row_to_das())) { - LOG_WARN("delete row to das failed", K(ret)); - } else if (is_error_logging_ && err_log_rt_def_.first_err_ret_ != OB_SUCCESS) { - err_log_rt_def_.reset(); - continue; - } else if (MY_SPEC.is_returning_) { - break; - } - } - if (OB_ITER_END == ret) { - if (!MY_SPEC.del_ctdefs_.at(0).at(0)->has_instead_of_trigger_ && OB_FAIL(del_rows_post_proc())) { - LOG_WARN("do delete rows post process failed", K(ret)); - } else { - //can not overwrite the original error code - ret = OB_ITER_END; - } - } - } - return ret; -} - OB_INLINE int ObTableDeleteOp::inner_open_with_das() { int ret = OB_SUCCESS; @@ -256,6 +219,13 @@ OB_INLINE int ObTableDeleteOp::calc_tablet_loc(const ObDelCtDef &del_ctdef, return ret; } +int ObTableDeleteOp::write_row_to_das_buffer() +{ + int ret = OB_SUCCESS; + ret = delete_row_to_das(); + return ret; +} + OB_INLINE int ObTableDeleteOp::delete_row_to_das() { int ret = OB_SUCCESS; @@ -309,22 +279,19 @@ OB_INLINE int ObTableDeleteOp::delete_row_to_das() return ret; } -OB_INLINE int ObTableDeleteOp::del_rows_post_proc() +int ObTableDeleteOp::write_rows_post_proc(int last_errno) { - int ret = OB_SUCCESS; - ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); - //iterator end, if das ref has task, need flush all task data to partition storage - if (OB_FAIL(submit_all_dml_task())) { - LOG_WARN("submit all dml task failed", K(ret)); - } else { + int ret = last_errno; + if (iter_end_) { + ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); for (int64_t i = 0; OB_SUCC(ret) && i < del_rtdefs_.count(); ++i) { plan_ctx->add_affected_rows(del_rtdefs_.at(i).at(0).das_rtdef_.affected_rows_); LOG_DEBUG("del rows post proc", K(plan_ctx->get_affected_rows()), K(del_rtdefs_.at(i).at(0))); } - } - if (OB_SUCC(ret) && GCONF.enable_defensive_check()) { - if (OB_FAIL(check_delete_affected_row())) { - LOG_WARN("check delete affected row failed", K(ret)); + if (OB_SUCC(ret) && GCONF.enable_defensive_check()) { + if (OB_FAIL(check_delete_affected_row())) { + LOG_WARN("check delete affected row failed", K(ret)); + } } } return ret; @@ -366,20 +333,6 @@ int ObTableDeleteOp::check_delete_affected_row() return ret; } -OB_INLINE int ObTableDeleteOp::get_next_row_from_child() -{ - int ret = OB_SUCCESS; - clear_evaluated_flag(); - if (OB_FAIL(child_->get_next_row())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } - } else { - LOG_TRACE("child output row", "row", ROWEXPR2STR(eval_ctx_, child_->get_spec().output_)); - } - return ret; -} - int ObTableDeleteOp::inner_close() { int ret = OB_SUCCESS; diff --git a/src/sql/engine/dml/ob_table_delete_op.h b/src/sql/engine/dml/ob_table_delete_op.h index e574b76a54..390f05a3d7 100644 --- a/src/sql/engine/dml/ob_table_delete_op.h +++ b/src/sql/engine/dml/ob_table_delete_op.h @@ -80,20 +80,19 @@ public: virtual ~ObTableDeleteOp() {} protected: virtual int inner_open() override; - virtual int inner_get_next_row() override; virtual int inner_rescan() override; virtual int inner_close() override; protected: int inner_open_with_das(); int delete_row_to_das(); - int del_rows_post_proc(); + virtual int write_rows_post_proc(int last_errno); int calc_tablet_loc(const ObDelCtDef &del_ctdef, ObDelRtDef &del_rtdef, ObDASTabletLoc *&tablet_loc); int open_table_for_each(); int close_table_for_each(); - int get_next_row_from_child(); int check_delete_affected_row(); + virtual int write_row_to_das_buffer() override; protected: DelRtDef2DArray del_rtdefs_; //see the comment of DelCtDef2DArray ObErrLogService err_log_service_; diff --git a/src/sql/engine/dml/ob_table_insert_all_op.cpp b/src/sql/engine/dml/ob_table_insert_all_op.cpp index a4e584c788..a69af536b3 100644 --- a/src/sql/engine/dml/ob_table_insert_all_op.cpp +++ b/src/sql/engine/dml/ob_table_insert_all_op.cpp @@ -100,7 +100,7 @@ int ObTableInsertAllOp::switch_iterator(ObExecContext &ctx) return common::OB_ITER_END; } -int ObTableInsertAllOp::insert_all_row_to_das() +int ObTableInsertAllOp::write_row_to_das_buffer() { int ret = OB_SUCCESS; //int64_t savepoint_no = 0; @@ -269,47 +269,6 @@ int ObTableInsertAllOp::inner_rescan() return ObTableInsertOp::inner_rescan(); } -int ObTableInsertAllOp::inner_get_next_row() -{ - int ret = OB_SUCCESS; - if (iter_end_) { - LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_)); - ret = OB_ITER_END; - } else { - while (OB_SUCC(ret)) { - if (OB_FAIL(try_check_status())) { - LOG_WARN("check status failed", K(ret)); - } else if (OB_FAIL(ObTableInsertOp::get_next_row_from_child())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } else { - iter_end_ = true; - } - } else if (OB_FAIL(insert_all_row_to_das())) { - LOG_WARN("insert row to das failed", K(ret)); - } - //erro logging not support, fix it later - // } else if (is_error_logging_ && err_log_rt_def_.first_err_ret_ != OB_SUCCESS) { - // clear_evaluated_flag(); - // err_log_rt_def_.curr_err_log_record_num_++; - // err_log_rt_def_.reset(); - // continue; - // } else if (MY_SPEC.is_returning_) { - // break; - // } - } - - if (OB_ITER_END == ret) { - if (OB_FAIL(ObTableInsertOp::ins_rows_post_proc())) { - LOG_WARN("do insert rows post process failed", K(ret)); - } else { - ret = OB_ITER_END; - } - } - } - return ret; -} - int ObTableInsertAllOp::inner_close() { return ObTableInsertOp::inner_close(); diff --git a/src/sql/engine/dml/ob_table_insert_all_op.h b/src/sql/engine/dml/ob_table_insert_all_op.h index cde6bf8edb..ff3dd5826f 100644 --- a/src/sql/engine/dml/ob_table_insert_all_op.h +++ b/src/sql/engine/dml/ob_table_insert_all_op.h @@ -74,11 +74,10 @@ class ObTableInsertAllOp : public ObTableInsertOp protected: virtual int inner_open() override; - virtual int inner_get_next_row() override; virtual int inner_rescan() override; virtual int inner_close() override; protected: - int insert_all_row_to_das(); + virtual int write_row_to_das_buffer() override; private: int check_match_conditions(const int64_t tbl_idx, const bool have_insert_row, diff --git a/src/sql/engine/dml/ob_table_insert_op.cpp b/src/sql/engine/dml/ob_table_insert_op.cpp index 8a6d28c8b2..5d3de67788 100644 --- a/src/sql/engine/dml/ob_table_insert_op.cpp +++ b/src/sql/engine/dml/ob_table_insert_op.cpp @@ -186,6 +186,13 @@ int ObTableInsertOp::calc_tablet_loc(const ObInsCtDef &ins_ctdef, return ret; } +int ObTableInsertOp::write_row_to_das_buffer() +{ + int ret = OB_SUCCESS; + ret = insert_row_to_das(); + return ret; +} + OB_INLINE int ObTableInsertOp::insert_row_to_das() { int ret = OB_SUCCESS; @@ -272,48 +279,52 @@ OB_INLINE int ObTableInsertOp::insert_row_to_das() if (OB_SUCC(ret)) { plan_ctx->record_last_insert_id_cur_stmt(); } - if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) { - plan_ctx->set_last_insert_id_cur_stmt(0); - } NG_TRACE(insert_end); return ret; } -int ObTableInsertOp::ins_rows_post_proc() +int ObTableInsertOp::write_rows_post_proc(int last_errno) { - int ret = OB_SUCCESS; - ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); - //iterator end, if das ref has task, need flush all task data to partition storage - if (OB_FAIL(submit_all_dml_task())) { - LOG_WARN("execute all insert das task failed", K(ret)); - } - if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) { - plan_ctx->set_last_insert_id_cur_stmt(0); - } - if (OB_SUCC(ret)) { - int64_t changed_rows = 0; - //for multi table - for (int64_t i = 0; i < ins_rtdefs_.count(); ++i) { - changed_rows += ins_rtdefs_.at(i).at(0).das_rtdef_.affected_rows_; + int ret = last_errno; + if (iter_end_) { + ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); + if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) { + plan_ctx->set_last_insert_id_cur_stmt(0); } - plan_ctx->add_row_matched_count(changed_rows); - plan_ctx->add_affected_rows(changed_rows); - // sync last user specified value after iter ends(compatible with MySQL) - if (OB_FAIL(plan_ctx->sync_last_value_local())) { - LOG_WARN("failed to sync last value", K(ret)); + if (OB_SUCC(ret)) { + int64_t changed_rows = 0; + //for multi table + for (int64_t i = 0; i < ins_rtdefs_.count(); ++i) { + changed_rows += ins_rtdefs_.at(i).at(0).das_rtdef_.affected_rows_; + } + plan_ctx->add_row_matched_count(changed_rows); + plan_ctx->add_affected_rows(changed_rows); + // sync last user specified value after iter ends(compatible with MySQL) + if (OB_FAIL(plan_ctx->sync_last_value_local())) { + LOG_WARN("failed to sync last value", K(ret)); + } + } + int sync_ret = OB_SUCCESS; + if (OB_SUCCESS != (sync_ret = plan_ctx->sync_last_value_global())) { + LOG_WARN("failed to sync value globally", K(sync_ret)); + } + NG_TRACE(sync_auto_value); + if (OB_SUCC(ret)) { + ret = sync_ret; + } + if (OB_SUCC(ret) && GCONF.enable_defensive_check() && !is_error_logging_) { + if (OB_FAIL(check_insert_affected_row())) { + LOG_WARN("check index insert consistency failed", K(ret)); + } } } - int sync_ret = OB_SUCCESS; - if (OB_SUCCESS != (sync_ret = plan_ctx->sync_last_value_global())) { - LOG_WARN("failed to sync value globally", K(sync_ret)); - } - NG_TRACE(sync_auto_value); - if (OB_SUCC(ret)) { - ret = sync_ret; - } - if (OB_SUCC(ret) && GCONF.enable_defensive_check() && !is_error_logging_) { - if (OB_FAIL(check_insert_affected_row())) { - LOG_WARN("check index insert consistency failed", K(ret)); + // all error, we must rollback with single execute when batch executed + if (OB_SUCCESS != ret && OB_ITER_END != ret) { + ObMultiStmtItem &multi_stmt_item = ctx_.get_sql_ctx()->multi_stmt_item_; + if (MY_SPEC.ins_ctdefs_.at(0).at(0)->das_ctdef_.is_batch_stmt_ && !multi_stmt_item.is_ins_multi_val_opt()) { + int tmp_ret = ret; + ret = OB_BATCHED_MULTI_STMT_ROLLBACK; + LOG_TRACE("batch exec with some exception, rollback with single execute", K(ret), K(tmp_ret)); } } return ret; @@ -392,68 +403,6 @@ int ObTableInsertOp::inner_rescan() return ret; } -int ObTableInsertOp::get_next_row_from_child() -{ - int ret = OB_SUCCESS; - if (OB_FAIL(child_->get_next_row())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } - } else { - clear_evaluated_flag(); - LOG_TRACE("child output row", "row", ROWEXPR2STR(eval_ctx_, child_->get_spec().output_)); - } - return ret; -} - -int ObTableInsertOp::inner_get_next_row() -{ - int ret = OB_SUCCESS; - if (iter_end_) { - LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_)); - ret = OB_ITER_END; - } else { - while (OB_SUCC(ret)) { - if (OB_FAIL(try_check_status())) { - LOG_WARN("check status failed", K(ret)); - } else if (OB_FAIL(get_next_row_from_child())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } else { - iter_end_ = true; - } - } else if (OB_FAIL(insert_row_to_das())) { - LOG_WARN("insert row to das failed", K(ret)); - } else if (is_error_logging_ && err_log_rt_def_.first_err_ret_ != OB_SUCCESS) { - clear_evaluated_flag(); - err_log_rt_def_.curr_err_log_record_num_++; - err_log_rt_def_.reset(); - continue; - } else if (MY_SPEC.is_returning_) { - break; - } - } - - if (OB_ITER_END == ret) { - if (!MY_SPEC.ins_ctdefs_.at(0).at(0)->has_instead_of_trigger_ && OB_FAIL(ins_rows_post_proc())) { - LOG_WARN("do insert rows post process failed", K(ret)); - } else { - ret = OB_ITER_END; - } - } - // all error, we must rollback with single execute when batch executed - if (OB_SUCCESS != ret && OB_ITER_END != ret) { - ObMultiStmtItem &multi_stmt_item = ctx_.get_sql_ctx()->multi_stmt_item_; - if (MY_SPEC.ins_ctdefs_.at(0).at(0)->das_ctdef_.is_batch_stmt_ && !multi_stmt_item.is_ins_multi_val_opt()) { - int tmp_ret = ret; - ret = OB_BATCHED_MULTI_STMT_ROLLBACK; - LOG_TRACE("batch exec with some exception, rollback with single execute", K(ret), K(tmp_ret)); - } - } - } - return ret; -} - int ObTableInsertOp::inner_close() { NG_TRACE(insert_close); diff --git a/src/sql/engine/dml/ob_table_insert_op.h b/src/sql/engine/dml/ob_table_insert_op.h index c9372828d6..77720fa9f5 100644 --- a/src/sql/engine/dml/ob_table_insert_op.h +++ b/src/sql/engine/dml/ob_table_insert_op.h @@ -77,18 +77,17 @@ public: virtual ~ObTableInsertOp() {}; protected: virtual int inner_open() override; - virtual int inner_get_next_row() override; virtual int inner_rescan() override; virtual int inner_close() override; protected: int inner_open_with_das(); int insert_row_to_das(); - int ins_rows_post_proc(); + virtual int write_row_to_das_buffer() override; + virtual int write_rows_post_proc(int last_errno) override; int calc_tablet_loc(const ObInsCtDef &ins_ctdef, ObInsRtDef &ins_rtdef, ObDASTabletLoc *&tablet_loc); int open_table_for_each(); - int get_next_row_from_child(); int close_table_for_each(); int check_insert_affected_row(); diff --git a/src/sql/engine/dml/ob_table_insert_up_op.cpp b/src/sql/engine/dml/ob_table_insert_up_op.cpp index f9de9f41a6..03fd27a08b 100644 --- a/src/sql/engine/dml/ob_table_insert_up_op.cpp +++ b/src/sql/engine/dml/ob_table_insert_up_op.cpp @@ -850,10 +850,12 @@ int ObTableInsertUpOp::load_batch_insert_up_rows(bool &is_iter_end, int64_t &ins default_row_batch_cnt = 1; } LOG_DEBUG("simulate lookup row batch count", K(simulate_batch_row_cnt), K(default_row_batch_cnt)); - while (OB_SUCC(ret) && ++row_cnt <= default_row_batch_cnt) { + while (OB_SUCC(ret) && ++row_cnt <= default_row_batch_cnt) { if (OB_FAIL(get_next_row_from_child())) { if (OB_ITER_END != ret) { LOG_WARN("fail to load next row from child", K(ret)); + } else { + iter_end_ = true; } } else if (OB_FAIL(try_insert_row())) { LOG_WARN("try insert row to das", K(ret)); diff --git a/src/sql/engine/dml/ob_table_lock_op.cpp b/src/sql/engine/dml/ob_table_lock_op.cpp index b6fbe855f0..1966dbe811 100644 --- a/src/sql/engine/dml/ob_table_lock_op.cpp +++ b/src/sql/engine/dml/ob_table_lock_op.cpp @@ -119,7 +119,8 @@ ObTableLockOp::ObTableLockOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input) : ObTableModifyOp(exec_ctx, spec, input), - savepoint_no_(0) + savepoint_no_(0), + need_return_row_(false) { } @@ -177,108 +178,148 @@ int ObTableLockOp::init_lock_rtdef() int ObTableLockOp::inner_get_next_row() { int ret = OB_SUCCESS; - const ObTableLockSpec &spec = MY_SPEC; - bool need_get_next_row = false; if (iter_end_) { LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_)); ret = OB_ITER_END; - } else if (OB_FAIL(try_check_status())) { - LOG_WARN("check status failed", K(ret)); - } else if (!MY_SPEC.is_skip_locked()) { - if (OB_FAIL(get_next_row_from_child())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } else { - iter_end_ = true; - } - } else if (OB_FAIL(lock_row_to_das())) { - LOG_WARN("lock row to das failed", K(ret)); - } - } else if (MY_SPEC.is_skip_locked()) { - do { - need_get_next_row = false; - if (OB_FAIL(get_next_row_from_child())) { + } else { + need_return_row_ = false; + while (OB_SUCC(ret)) { + if (OB_FAIL(try_check_status())) { + LOG_WARN("check status failed", K(ret)); + } else if (OB_FAIL(get_next_row_from_child())) { if (OB_ITER_END != ret) { LOG_WARN("fail to get next row", K(ret)); } else { iter_end_ = true; + ret = OB_SUCCESS; + break; } } else if (OB_FAIL(lock_row_to_das())) { - LOG_WARN("lock row to das failed", K(ret)); - } else if (OB_FAIL(lock_one_row_post_proc(need_get_next_row))) { - LOG_WARN("fail to execute lock_one_row_post_proc", K(ret)); + LOG_WARN("write row to das failed", K(ret)); + } else if (OB_FAIL(submit_row_by_strategy())) { + LOG_WARN("submit row by strategy failed", K(ret)); + } else if (is_error_logging_ && err_log_rt_def_.first_err_ret_ != OB_SUCCESS) { + clear_evaluated_flag(); + err_log_rt_def_.curr_err_log_record_num_++; + err_log_rt_def_.reset(); + continue; + } else if (need_return_row_) { + //break to output this row + break; } - } while(need_get_next_row); - } + } - if (OB_ITER_END == ret) { - if (OB_FAIL(lock_rows_post_proc(need_get_next_row))) { - LOG_WARN("do lock rows post process failed", K(ret)); - } else { - //can not overwrite the original error code + if (OB_SUCC(ret) && iter_end_ && dml_rtctx_.das_ref_.has_task()) { + //DML operator reach iter end, + //now submit the remaining rows in the DAS Write Buffer to the storage + if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { + LOG_WARN("execute all dml das task failed", K(ret)); + } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { + LOG_WARN("close all das task failed", K(ret)); + } + //to post process the DML info after writing all data to the storage + ret = write_rows_post_proc(ret); + } + if (OB_SUCC(ret) && iter_end_) { ret = OB_ITER_END; } } return ret; +} +int ObTableLockOp::submit_row_by_strategy() +{ + int ret = OB_SUCCESS; + if (!MY_SPEC.is_skip_locked()) { + need_return_row_ = true; + if (OB_FAIL(discharge_das_write_buffer())) { + if (OB_TRY_LOCK_ROW_CONFLICT != ret + && OB_TRANSACTION_SET_VIOLATION != ret + && OB_ERR_EXCLUSIVE_LOCK_CONFLICT != ret) { + LOG_WARN("failed to lock row with das", K(ret)); + } else if (MY_SPEC.is_nowait() && OB_ERR_EXCLUSIVE_LOCK_CONFLICT == ret) { + ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT; + } + } + } else if (OB_FAIL(lock_one_row_post_proc())) { + LOG_WARN("lock one row post proc failed", K(ret)); + } + return ret; } int ObTableLockOp::inner_get_next_batch(const int64_t max_row_cnt) { int ret = OB_SUCCESS; - const ObTableLockSpec &spec = MY_SPEC; - const ObBatchRows * child_brs = nullptr; - bool need_get_next_batch = false; if (iter_end_) { + LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_)); brs_.end_ = true; brs_.size_ = 0; - LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_)); - if (OB_FAIL(lock_rows_post_proc(need_get_next_batch))) { - LOG_WARN("do lock rows post process failed", K(ret)); - } } else { - if (OB_FAIL(get_next_batch_from_child(max_row_cnt, child_brs))) { - // do nothing: log is done in previous call - } else if (OB_FAIL(lock_batch_to_das(child_brs, MY_SPEC.is_skip_locked()))) { - LOG_WARN("lock batch to das failed", K(ret)); - } - - if (OB_SUCC(ret) && child_brs->end_ == true) { - if (!MY_SPEC.is_skip_locked() && - OB_FAIL(lock_rows_post_proc(need_get_next_batch))) { - LOG_WARN("do lock rows post process failed", K(ret)); + need_return_row_ = false; + const ObBatchRows * child_brs = nullptr; + while (OB_SUCC(ret)) { + if (OB_FAIL(try_check_status())) { + LOG_WARN("check status failed", K(ret)); + } else if (OB_FAIL(get_next_batch_from_child(max_row_cnt, child_brs))) { + if (OB_ITER_END == ret) { + iter_end_ = true; + ret = OB_SUCCESS; + break; + } + } else if (child_brs->size_ == 0 && child_brs->end_) { + iter_end_ = true; + brs_.end_ = true; + brs_.size_ = 0; + break; + } else if (OB_FAIL(lock_batch_to_das(child_brs))) { + LOG_WARN("write row to das failed", K(ret)); + } else if (is_error_logging_ && err_log_rt_def_.first_err_ret_ != OB_SUCCESS) { + clear_evaluated_flag(); + err_log_rt_def_.curr_err_log_record_num_++; + err_log_rt_def_.reset(); + continue; + } else if (need_return_row_) { + //break to output this batch + break; } - iter_end_ = true; } } + if (OB_SUCC(ret) && iter_end_ && dml_rtctx_.das_ref_.has_task()) { + //DML operator reach iter end, + //now submit the remaining rows in the DAS Write Buffer to the storage + if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { + LOG_WARN("execute all dml das task failed", K(ret)); + } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { + LOG_WARN("close all das task failed", K(ret)); + } + //to post process the DML info after writing all data to the storage + ret = write_rows_post_proc(ret); + } return ret; } // this func only work for for update skip locked -OB_INLINE int ObTableLockOp::lock_one_row_post_proc(bool &need_get_next_row) +OB_INLINE int ObTableLockOp::lock_one_row_post_proc() { int ret = OB_SUCCESS; - need_get_next_row = false; - if (MY_SPEC.is_multi_table_skip_locked_) { - if (OB_FAIL(ObSqlTransControl::create_anonymous_savepoint(ctx_, savepoint_no_))) { - LOG_WARN("fail to get save point", K(ret)); + if (MY_SPEC.is_multi_table_skip_locked_ && + OB_FAIL(ObSqlTransControl::create_anonymous_savepoint(ctx_, savepoint_no_))) { + LOG_WARN("fail to get save point", K(ret)); + } else if (OB_FAIL(submit_all_dml_task())) { + if (OB_TRY_LOCK_ROW_CONFLICT != ret && + OB_TRANSACTION_SET_VIOLATION != ret && + OB_ERR_EXCLUSIVE_LOCK_CONFLICT != ret) { + LOG_WARN("submit all dml task failed", K(ret)); + } else if (MY_SPEC.is_skip_locked()) { + ret = OB_SUCCESS; } - } - - if (OB_FAIL(ret)) { - - } else if (OB_FAIL(lock_rows_post_proc(need_get_next_row))) { - LOG_WARN("execute lock_rows_post_proc failed", K(ret)); - } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { - LOG_WARN("close all das task failed", K(ret)); } else { - // don't release all memory, need to reuse das ctx - dml_rtctx_.reuse(); + need_return_row_ = true; } // if fail must rollback to save point - if (OB_SUCC(ret) && need_get_next_row && MY_SPEC.is_multi_table_skip_locked_) { + if (OB_SUCC(ret) && !need_return_row_ && MY_SPEC.is_multi_table_skip_locked_) { if (OB_FAIL(ObSqlTransControl::rollback_savepoint(ctx_, savepoint_no_))) { LOG_WARN("fail to rollback save point", K(ret)); } @@ -286,17 +327,15 @@ OB_INLINE int ObTableLockOp::lock_one_row_post_proc(bool &need_get_next_row) return ret; } -OB_INLINE int ObTableLockOp::lock_rows_post_proc(bool &need_get_next_row) +int ObTableLockOp::write_rows_post_proc(int last_errno) { int ret = OB_SUCCESS; - //iterator end, if das ref has task, need flush all task data to partition storage - if (OB_FAIL(submit_all_dml_task())) { + if (OB_FAIL(last_errno)) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret && OB_ERR_EXCLUSIVE_LOCK_CONFLICT != ret) { LOG_WARN("failed to lock row with das", K(ret)); } else if (MY_SPEC.is_skip_locked()) { - need_get_next_row = true; ret = OB_SUCCESS; } else if (MY_SPEC.is_nowait() && OB_ERR_EXCLUSIVE_LOCK_CONFLICT == ret) { ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT; @@ -372,11 +411,9 @@ int ObTableLockOp::lock_row_to_das() return ret; } -int ObTableLockOp::lock_batch_to_das(const ObBatchRows *child_brs, - const bool skip_locked) +int ObTableLockOp::lock_batch_to_das(const ObBatchRows *child_brs) { int ret = OB_SUCCESS; - bool lock_conflict = false; // Note: there are three evalctx involved in das lock: // 1. eval_ctx_, @@ -394,19 +431,11 @@ int ObTableLockOp::lock_batch_to_das(const ObBatchRows *child_brs, operator_evalctx_guard.set_batch_idx(i); if (OB_FAIL(lock_row_to_das())) { LOG_WARN("Failed to lock das row", K(i), K(ret)); - } - if (skip_locked) { - if (OB_FAIL(lock_one_row_post_proc(lock_conflict))) { - LOG_WARN("fail to execute lock_one_row_post_proc", K(ret)); - } else { - // NO need to reset lock_conflict inside loop as it is reset within - // routine "lock_one_row_post_proc" - if (lock_conflict) { - brs_.skip_->set(i); - } - LOG_DEBUG("lock_batch_to_das", K(lock_conflict), K(i), - K(brs_)); - } + } else if (OB_FAIL(submit_row_by_strategy())) { + LOG_WARN("submit row by strategy failed", K(ret)); + } else if (MY_SPEC.is_skip_locked() && !need_return_row_) { + //lock conflict, skip it + brs_.skip_->set(i); } } clear_evaluated_flag(); @@ -414,20 +443,6 @@ int ObTableLockOp::lock_batch_to_das(const ObBatchRows *child_brs, return ret; } -OB_INLINE int ObTableLockOp::get_next_row_from_child() -{ - int ret = OB_SUCCESS; - clear_evaluated_flag(); - if (OB_FAIL(child_->get_next_row())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } - } else { - LOG_TRACE("child output row", "row", ROWEXPR2STR(eval_ctx_, child_->get_spec().output_)); - } - return ret; -} - OB_INLINE int ObTableLockOp::get_next_batch_from_child(const int64_t max_row_cnt, const ObBatchRows *&child_brs) { diff --git a/src/sql/engine/dml/ob_table_lock_op.h b/src/sql/engine/dml/ob_table_lock_op.h index 9d828ee43a..2f9741fad1 100644 --- a/src/sql/engine/dml/ob_table_lock_op.h +++ b/src/sql/engine/dml/ob_table_lock_op.h @@ -113,25 +113,21 @@ public: int init_lock_rtdef(); protected: - int get_curr_part_key(); - int lock_single_part(); - int lock_multi_part(); - int lock_multi_part_skip_locked(); - int prepare_lock_row(); - int get_next_row_from_child(); OB_INLINE int get_next_batch_from_child(const int64_t max_row_cnt, const ObBatchRows *&child_brs); int lock_row_to_das(); - int lock_batch_to_das(const ObBatchRows *child_brs, const bool skip_locked); + int lock_batch_to_das(const ObBatchRows *child_brs); int calc_tablet_loc(const ObLockCtDef &lock_ctdef, ObLockRtDef &lock_rtdef, ObDASTabletLoc *&tablet_loc); - int lock_one_row_post_proc(bool &need_get_next_row); - int lock_rows_post_proc(bool &need_get_next_row); + int lock_one_row_post_proc(); + virtual int write_rows_post_proc(int last_errno) override; + int submit_row_by_strategy(); protected: int64_t savepoint_no_; LockRtDef2DArray lock_rtdefs_; + bool need_return_row_; }; } // end of sql diff --git a/src/sql/engine/dml/ob_table_merge_op.cpp b/src/sql/engine/dml/ob_table_merge_op.cpp index bb1d3c3364..ea42960b9c 100644 --- a/src/sql/engine/dml/ob_table_merge_op.cpp +++ b/src/sql/engine/dml/ob_table_merge_op.cpp @@ -355,40 +355,7 @@ int ObTableMergeOp::inner_close() return (OB_SUCCESS == ret) ? close_ret : ret; } -int ObTableMergeOp::inner_get_next_row() -{ - int ret = OB_SUCCESS; - if (iter_end_) { - LOG_WARN("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_)); - ret = OB_ITER_END; - } else { - while (OB_SUCC(ret)) { - if (OB_FAIL(try_check_status())) { - LOG_WARN("check status failed", K(ret)); - } else if (OB_FAIL(get_next_row_from_child())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } else { - iter_end_ = true; - } - } else if (OB_FAIL(merge_row_to_das())) { - LOG_WARN("merge row to das failed", K(ret)); - } - } - - if (OB_ITER_END == ret) { - if (OB_FAIL(merge_rows_post_proc())) { - LOG_WARN("do insert rows post process failed", K(ret)); - } else { - ret = OB_ITER_END; - } - } - } - - return ret; -} - -int ObTableMergeOp::merge_row_to_das() +int ObTableMergeOp::write_row_to_das_buffer() { int ret = OB_SUCCESS; bool is_match = false; @@ -408,26 +375,18 @@ int ObTableMergeOp::merge_row_to_das() return ret; } -int ObTableMergeOp::merge_rows_post_proc() +int ObTableMergeOp::write_rows_post_proc(int last_errno) { int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); //iterator end, if das ref has task, need flush all task data to partition storage - if (OB_ISNULL(plan_ctx)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("plan_ctx is null", K(ret)); - } else if (OB_FAIL(dml_rtctx_.das_ref_.pick_del_task_to_first())) { - LOG_WARN("pick delete das task to first failed", K(ret)); - } else if (OB_FAIL(submit_all_dml_task())) { - LOG_WARN("submit all dml task failed", K(ret)); - } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { - LOG_WARN("close all das task failed", K(ret)); - } else { - dml_rtctx_.reuse(); - } - - if (OB_SUCC(ret)) { - plan_ctx->add_affected_rows(affected_rows_); + if (OB_SUCC(last_errno) && iter_end_) { + if (OB_ISNULL(plan_ctx)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("plan_ctx is null", K(ret)); + } else { + plan_ctx->add_affected_rows(affected_rows_); + } } return ret; } @@ -623,20 +582,6 @@ int ObTableMergeOp::do_insert() return ret; } -OB_INLINE int ObTableMergeOp::get_next_row_from_child() -{ - int ret = OB_SUCCESS; - clear_evaluated_flag(); - if (OB_FAIL(child_->get_next_row())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } - } else { - LOG_TRACE("child output row", "row", ROWEXPR2STR(eval_ctx_, child_->get_spec().output_)); - } - return ret; -} - int ObTableMergeOp::calc_update_tablet_loc(const ObUpdCtDef &upd_ctdef, ObUpdRtDef &upd_rtdef, ObDASTabletLoc *&old_tablet_loc, diff --git a/src/sql/engine/dml/ob_table_merge_op.h b/src/sql/engine/dml/ob_table_merge_op.h index a097dacb72..adb00994e7 100644 --- a/src/sql/engine/dml/ob_table_merge_op.h +++ b/src/sql/engine/dml/ob_table_merge_op.h @@ -91,11 +91,6 @@ public: void inc_affected_rows() { } void inc_found_rows() { } void inc_changed_rows() { } - - virtual int inner_get_next_row(); - int get_next_row_from_child(); - int merge_row_to_das(); - int merge_rows_post_proc(); int do_update(); int update_row_das(); int delete_row_das(); @@ -119,6 +114,8 @@ protected: int inner_open_with_das(); int open_table_for_each(); int close_table_for_each(); + virtual int write_row_to_das_buffer() override; + virtual int write_rows_post_proc(int last_errno) override; protected: int64_t affected_rows_; diff --git a/src/sql/engine/dml/ob_table_modify_op.cpp b/src/sql/engine/dml/ob_table_modify_op.cpp index 005f99b93d..efc4d74097 100644 --- a/src/sql/engine/dml/ob_table_modify_op.cpp +++ b/src/sql/engine/dml/ob_table_modify_op.cpp @@ -1047,5 +1047,85 @@ int ObTableModifyOp::submit_all_dml_task() return ret; } +//The data to be written by DML will be buffered in the DAS Write Buffer +//When the buffer data exceeds 6M, +//needs to be written to the storage to release the memory. +int ObTableModifyOp::discharge_das_write_buffer() +{ + int ret = OB_SUCCESS; + if (dml_rtctx_.das_ref_.get_das_alloc().used() >= das::OB_DAS_MAX_TOTAL_PACKET_SIZE) { + LOG_INFO("DASWriteBuffer full, now to write storage", + "buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used()); + ret = submit_all_dml_task(); + } + return ret; +} + +int ObTableModifyOp::get_next_row_from_child() +{ + int ret = OB_SUCCESS; + clear_evaluated_flag(); + if (OB_FAIL(child_->get_next_row())) { + if (OB_ITER_END != ret) { + LOG_WARN("fail to get next row", K(ret)); + } + } else { + LOG_TRACE("child output row", "row", ROWEXPR2STR(eval_ctx_, child_->get_spec().output_)); + } + return ret; +} + +int ObTableModifyOp::inner_get_next_row() +{ + int ret = OB_SUCCESS; + if (iter_end_) { + LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_)); + ret = OB_ITER_END; + } else { + while (OB_SUCC(ret)) { + if (OB_FAIL(try_check_status())) { + LOG_WARN("check status failed", K(ret)); + } else if (OB_FAIL(get_next_row_from_child())) { + if (OB_ITER_END != ret) { + LOG_WARN("fail to get next row", K(ret)); + } else { + iter_end_ = true; + ret = OB_SUCCESS; + break; + } + } else if (OB_FAIL(write_row_to_das_buffer())) { + LOG_WARN("write row to das failed", K(ret)); + } else if (OB_FAIL(discharge_das_write_buffer())) { + LOG_WARN("discharge das write buffer failed", K(ret)); + } else if (is_error_logging_ && err_log_rt_def_.first_err_ret_ != OB_SUCCESS) { + clear_evaluated_flag(); + err_log_rt_def_.curr_err_log_record_num_++; + err_log_rt_def_.reset(); + continue; + } else if (MY_SPEC.is_returning_) { + break; + } + } + + if (OB_SUCC(ret) && iter_end_ && dml_rtctx_.das_ref_.has_task()) { + //DML operator reach iter end, + //now submit the remaining rows in the DAS Write Buffer to the storage + if (dml_rtctx_.need_pick_del_task_first() && + OB_FAIL(dml_rtctx_.das_ref_.pick_del_task_to_first())) { + LOG_WARN("pick delete das task to first failed", K(ret)); + } else if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { + LOG_WARN("execute all dml das task failed", K(ret)); + } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { + LOG_WARN("close all das task failed", K(ret)); + } + } + //to post process the DML info after writing all data to the storage or returning one row + ret = write_rows_post_proc(ret); + if (OB_SUCC(ret) && iter_end_) { + ret = OB_ITER_END; + } + } + return ret; +} } // namespace sql } // namespace oceanbase diff --git a/src/sql/engine/dml/ob_table_modify_op.h b/src/sql/engine/dml/ob_table_modify_op.h index 7986b8fda7..fc0514d16d 100644 --- a/src/sql/engine/dml/ob_table_modify_op.h +++ b/src/sql/engine/dml/ob_table_modify_op.h @@ -205,6 +205,7 @@ public: void clear_dml_evaluated_flag(); void clear_dml_evaluated_flag(int64_t parent_cnt, ObExpr **parent_exprs); void clear_dml_evaluated_flag(ObExpr *clear_expr); + int submit_all_dml_task(); protected: OperatorOpenOrder get_operator_open_order() const; virtual int inner_open(); @@ -217,14 +218,24 @@ protected: int calc_single_table_loc(); virtual int inner_rescan() override; + virtual int inner_get_next_row() override; + int get_next_row_from_child(); + //Override this interface to complete the write semantics of the DML operator, + //and write a row to the DAS Write Buffer according to the specific DML behavior + virtual int write_row_to_das_buffer() { return common::OB_NOT_IMPLEMENT; } + //Override this interface to post process the DML info after + //writing all data to the storage or returning one row + //such as: set affected_rows to query context, rewrite some error code + virtual int write_rows_post_proc(int last_errno) + { UNUSED(last_errno); return common::OB_NOT_IMPLEMENT; } - int submit_all_dml_task(); int init_das_dml_ctx(); //to merge array binding cusor info when array binding is executed in batch mode int merge_implict_cursor(int64_t insert_rows, int64_t update_rows, int64_t delete_rows, int64_t found_rows); + int discharge_das_write_buffer(); public: common::ObMySQLProxy *sql_proxy_; observer::ObInnerSQLConnection *inner_conn_; diff --git a/src/sql/engine/dml/ob_table_update_op.cpp b/src/sql/engine/dml/ob_table_update_op.cpp index 272684449d..5835e3aceb 100644 --- a/src/sql/engine/dml/ob_table_update_op.cpp +++ b/src/sql/engine/dml/ob_table_update_op.cpp @@ -170,42 +170,10 @@ int ObTableUpdateOp::inner_switch_iterator() return ret; } -int ObTableUpdateOp::inner_get_next_row() +int ObTableUpdateOp::write_row_to_das_buffer() { int ret = OB_SUCCESS; - if (iter_end_) { - LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_)); - ret = OB_ITER_END; - } else { - while (OB_SUCC(ret)) { - if (OB_FAIL(try_check_status())) { - LOG_WARN("check status failed", K(ret)); - } else if (OB_FAIL(get_next_row_from_child())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } else { - iter_end_ = true; - } - } else if (OB_FAIL(update_row_to_das())) { - LOG_WARN("update row to das failed", K(ret)); - } else if (is_error_logging_ && err_log_rt_def_.first_err_ret_ != OB_SUCCESS) { - clear_evaluated_flag(); - err_log_rt_def_.curr_err_log_record_num_++; - err_log_rt_def_.reset(); - continue; - } else if (MY_SPEC.is_returning_) { - break; - } - } - if (OB_ITER_END == ret) { - if (!MY_SPEC.upd_ctdefs_.at(0).at(0)->has_instead_of_trigger_ && OB_FAIL(upd_rows_post_proc())) { - LOG_WARN("do update rows post process failed", K(ret)); - } else { - //can not overwrite the original error code - ret = OB_ITER_END; - } - } - } + ret = update_row_to_das(); return ret; } @@ -451,60 +419,40 @@ int ObTableUpdateOp::check_update_affected_row() return ret; } -OB_INLINE int ObTableUpdateOp::upd_rows_post_proc() +int ObTableUpdateOp::write_rows_post_proc(int last_errno) { - int ret = OB_SUCCESS; - ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); - ObSQLSessionInfo *session = GET_MY_SESSION(ctx_); - int64_t found_rows = 0; - int64_t changed_rows = 0; - //iterator end, if das ref has task, need flush all task data to partition storage - if (dml_rtctx_.das_ref_.has_task()) { - if (OB_FAIL(dml_rtctx_.das_ref_.pick_del_task_to_first())) { - LOG_WARN("pick delete das task to first failed", K(ret)); - } else if (OB_FAIL(submit_all_dml_task())) { - LOG_WARN("execute all update das task failed", K(ret)); + int ret = last_errno; + if (iter_end_) { + ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); + ObSQLSessionInfo *session = GET_MY_SESSION(ctx_); + int64_t found_rows = 0; + int64_t changed_rows = 0; + for (int64_t i = 0; OB_SUCC(ret) && i < upd_rtdefs_.count(); ++i) { + ObUpdRtDef &upd_rtdef = upd_rtdefs_.at(i).at(0); + found_rows += upd_rtdef.found_rows_; + changed_rows += upd_rtdef.dupd_rtdef_.affected_rows_; + if (upd_rtdef.ddel_rtdef_ != nullptr) { + //update rows across partitions, need to add das delete op's affected rows + changed_rows += upd_rtdef.ddel_rtdef_->affected_rows_; + //insert new row to das after old row has been deleted in storage + //reference to: https://work.aone.alibaba-inc.com/issue/31915604 + } + LOG_DEBUG("update rows post proc", K(ret), K(found_rows), K(changed_rows), K(upd_rtdef)); } - } - for (int64_t i = 0; OB_SUCC(ret) && i < upd_rtdefs_.count(); ++i) { - ObUpdRtDef &upd_rtdef = upd_rtdefs_.at(i).at(0); - found_rows += upd_rtdef.found_rows_; - changed_rows += upd_rtdef.dupd_rtdef_.affected_rows_; - if (upd_rtdef.ddel_rtdef_ != nullptr) { - //update rows across partitions, need to add das delete op's affected rows - changed_rows += upd_rtdef.ddel_rtdef_->affected_rows_; - //insert new row to das after old row has been deleted in storage - //reference to: https://work.aone.alibaba-inc.com/issue/31915604 + if (OB_SUCC(ret)) { + plan_ctx->add_row_matched_count(found_rows); + plan_ctx->add_row_duplicated_count(changed_rows); + plan_ctx->add_affected_rows(session->get_capability().cap_flags_.OB_CLIENT_FOUND_ROWS ? + found_rows : changed_rows); } - LOG_DEBUG("update rows post proc", K(ret), K(found_rows), K(changed_rows), K(upd_rtdef)); - } - if (OB_SUCC(ret)) { - plan_ctx->add_row_matched_count(found_rows); - plan_ctx->add_row_duplicated_count(changed_rows); - plan_ctx->add_affected_rows(session->get_capability().cap_flags_.OB_CLIENT_FOUND_ROWS ? - found_rows : changed_rows); - } - if (OB_SUCC(ret) && GCONF.enable_defensive_check()) { - if (OB_FAIL(check_update_affected_row())) { - LOG_WARN("check index upd consistency failed", K(ret)); + if (OB_SUCC(ret) && GCONF.enable_defensive_check()) { + if (OB_FAIL(check_update_affected_row())) { + LOG_WARN("check index upd consistency failed", K(ret)); + } } } return ret; } - -OB_INLINE int ObTableUpdateOp::get_next_row_from_child() -{ - int ret = OB_SUCCESS; - clear_evaluated_flag(); - if (OB_FAIL(child_->get_next_row())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", K(ret)); - } - } else { - LOG_TRACE("child output row", "row", ROWEXPR2STR(eval_ctx_, child_->get_spec().output_)); - } - return ret; -} } // end namespace sql } // end namespace oceanbase diff --git a/src/sql/engine/dml/ob_table_update_op.h b/src/sql/engine/dml/ob_table_update_op.h index c829757b46..06e94db6ac 100644 --- a/src/sql/engine/dml/ob_table_update_op.h +++ b/src/sql/engine/dml/ob_table_update_op.h @@ -90,7 +90,6 @@ public: int inner_open() override; int inner_rescan() override; int inner_switch_iterator() override; - int inner_get_next_row() override; int inner_close() override; protected: int inner_open_with_das(); @@ -106,8 +105,9 @@ protected: bool check_exist = false); int open_table_for_each(); int close_table_for_each(); - int get_next_row_from_child(); int check_update_affected_row(); + virtual int write_row_to_das_buffer() override; + virtual int write_rows_post_proc(int last_errno) override; protected: UpdRtDef2DArray upd_rtdefs_; //see the comment of UpdCtDef2DArray common::ObArrayWrap ins_rtdefs_; diff --git a/src/sql/engine/expr/ob_expr_to_outfile_row.cpp b/src/sql/engine/expr/ob_expr_to_outfile_row.cpp index 1e1aa2417d..923fe8cf90 100644 --- a/src/sql/engine/expr/ob_expr_to_outfile_row.cpp +++ b/src/sql/engine/expr/ob_expr_to_outfile_row.cpp @@ -19,9 +19,9 @@ #include "sql/session/ob_sql_session_info.h" #include "sql/engine/ob_exec_context.h" -using namespace oceanbase::common; - -namespace oceanbase +using namespace oceanbase::common; + +namespace oceanbase { namespace sql { @@ -114,9 +114,9 @@ int ObExprToOutfileRow::calc_outfile_info(const ObExpr &expr, ObEvalCtx &ctx, ObIAllocator &allocator, ObExprOutFileInfo &out_info) -{ - int ret = OB_SUCCESS; - ObObj objs_array[PARAM_SELECT_ITEM]; +{ + int ret = OB_SUCCESS; + ObObj objs_array[PARAM_SELECT_ITEM]; ObSQLSessionInfo *session = ctx.exec_ctx_.get_my_session(); if (OB_ISNULL(session)) { ret = OB_ERR_UNEXPECTED; @@ -136,9 +136,9 @@ int ObExprToOutfileRow::calc_outfile_info(const ObExpr &expr, } if (OB_SUCC(ret)) { out_info.field_ = objs_array[PARAM_FIELD]; - out_info.line_ = objs_array[PARAM_LINE]; - out_info.enclose_ = objs_array[PARAM_ENCLOSED]; - out_info.escape_ = objs_array[PARAM_ESCAPED]; + out_info.line_ = objs_array[PARAM_LINE]; + out_info.enclose_ = objs_array[PARAM_ENCLOSED]; + out_info.escape_ = objs_array[PARAM_ESCAPED]; out_info.print_params_.cs_type_ = static_cast(objs_array[PARAM_CHARSET].get_int()); } diff --git a/src/storage/ls/ob_ls_tablet_service.cpp b/src/storage/ls/ob_ls_tablet_service.cpp index dfb916c939..e5fd39f68e 100644 --- a/src/storage/ls/ob_ls_tablet_service.cpp +++ b/src/storage/ls/ob_ls_tablet_service.cpp @@ -714,6 +714,7 @@ void ObLSTabletService::report_tablet_to_rs( int ObLSTabletService::table_scan(ObTableScanIterator &iter, ObTableScanParam ¶m, ObNewRowIterator *&result) { int ret = OB_SUCCESS; + NG_TRACE(S_table_scan_begin); ObTabletHandle data_tablet; AllowToReadMgr::AllowToReadInfo read_info; @@ -742,12 +743,14 @@ int ObLSTabletService::table_scan(ObTableScanIterator &iter, ObTableScanParam &p LOG_WARN("ls is not allow to read", K(ret), KPC(ls_)); } } + NG_TRACE(S_table_scan_end); return ret; } int ObLSTabletService::table_rescan(ObTableScanParam ¶m, ObNewRowIterator *&result) { int ret = OB_SUCCESS; + NG_TRACE(S_table_rescan_begin); ObTabletHandle data_tablet; AllowToReadMgr::AllowToReadInfo read_info; @@ -780,6 +783,7 @@ int ObLSTabletService::table_rescan(ObTableScanParam ¶m, ObNewRowIterator *& LOG_WARN("ls is not allow to read", K(ret), KPC(ls_)); } } + NG_TRACE(S_table_rescan_end); return ret; } @@ -2050,6 +2054,7 @@ int ObLSTabletService::insert_rows( int64_t &affected_rows) { int ret = OB_SUCCESS; + NG_TRACE(S_insert_rows_begin); ObTabletHandle tablet_handle; int64_t afct_num = 0; int64_t dup_num = 0; @@ -2134,6 +2139,7 @@ int ObLSTabletService::insert_rows( EVENT_ADD(STORAGE_INSERT_ROW_COUNT, afct_num); } } + NG_TRACE(S_insert_rows_end); return ret; } @@ -2233,6 +2239,7 @@ int ObLSTabletService::update_rows( int64_t &affected_rows) { int ret = OB_SUCCESS; + NG_TRACE(S_update_rows_begin); const ObTabletID &data_tablet_id = ctx.tablet_id_; ObTabletHandle tablet_handle; int64_t afct_num = 0; @@ -2393,6 +2400,7 @@ int ObLSTabletService::update_rows( LOG_WARN("update rows use too much time", K(afct_num), K(got_row_count)); } } + NG_TRACE(S_update_rows_end); return ret; } @@ -2477,6 +2485,7 @@ int ObLSTabletService::delete_rows( int64_t &affected_rows) { int ret = OB_SUCCESS; + NG_TRACE(S_delete_rows_begin); const ObTabletID &data_tablet_id = ctx.tablet_id_; ObTabletHandle tablet_handle; ObRowReshape *row_reshape = nullptr; @@ -2530,6 +2539,7 @@ int ObLSTabletService::delete_rows( EVENT_ADD(STORAGE_DELETE_ROW_COUNT, afct_num); } } + NG_TRACE(S_delete_rows_end); return ret; } @@ -2544,6 +2554,7 @@ int ObLSTabletService::lock_rows( int64_t &affected_rows) { UNUSEDx(lock_flag, is_sfu); + NG_TRACE(S_lock_rows_begin); int ret = OB_SUCCESS; const ObTabletID &data_tablet_id = ctx.tablet_id_; ObTimeGuard timeguard(__func__, 3 * 1000 * 1000); @@ -2613,6 +2624,7 @@ int ObLSTabletService::lock_rows( } } } + NG_TRACE(S_lock_rows_end); return ret; } diff --git a/src/storage/tx_storage/ob_access_service.cpp b/src/storage/tx_storage/ob_access_service.cpp index 87072c9fd7..a5ca345a3d 100644 --- a/src/storage/tx_storage/ob_access_service.cpp +++ b/src/storage/tx_storage/ob_access_service.cpp @@ -1040,6 +1040,7 @@ int ObAccessService::reuse_scan_iter(const bool switch_param, ObNewRowIterator * int ObAccessService::revert_scan_iter(ObNewRowIterator *iter) { int ret = OB_SUCCESS; + NG_TRACE(S_revert_iter_begin); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("access service is not initiated", K(ret)); @@ -1051,7 +1052,7 @@ int ObAccessService::revert_scan_iter(ObNewRowIterator *iter) iter->~ObNewRowIterator(); } iter = nullptr; - NG_TRACE(revert_scan_iter); + NG_TRACE(S_revert_iter_end); return ret; }