diff --git a/src/sql/das/ob_das_define.h b/src/sql/das/ob_das_define.h index de5730e16..f0864f328 100644 --- a/src/sql/das/ob_das_define.h +++ b/src/sql/das/ob_das_define.h @@ -57,7 +57,8 @@ const int64_t OB_DAS_MAX_PACKET_SIZE = 2 * 1024 * 1024l - 8 * 1024; * it can be considered that the DAS request will send at most one RPC request to each zone. * so OB_DAS_MAX_TOTAL_PACKET_SIZE was defined as: */ -const int64_t OB_DAS_MAX_TOTAL_PACKET_SIZE = 3 * OB_DAS_MAX_PACKET_SIZE; +const int64_t OB_DAS_MAX_TOTAL_PACKET_SIZE = 2 * OB_DAS_MAX_PACKET_SIZE; +const int64_t OB_DAS_MAX_META_TENANT_PACKET_SIZE = 1 * 1024 * 1024l - 8 * 1024; } // namespace das enum class ObDasTaskStatus: uint8_t diff --git a/src/sql/engine/dml/ob_table_delete_op.cpp b/src/sql/engine/dml/ob_table_delete_op.cpp index 952602e97..6630ccbc3 100644 --- a/src/sql/engine/dml/ob_table_delete_op.cpp +++ b/src/sql/engine/dml/ob_table_delete_op.cpp @@ -137,17 +137,20 @@ OB_INLINE int ObTableDeleteOp::inner_open_with_das() int ObTableDeleteOp::check_need_exec_single_row() { int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.del_ctdefs_.count() && !execute_single_row_; ++i) { - const ObTableDeleteSpec::DelCtDefArray &ctdefs = MY_SPEC.del_ctdefs_.at(i); - const ObDelCtDef &del_ctdef = *ctdefs.at(0); - if (has_before_row_trigger(del_ctdef) || has_after_row_trigger(del_ctdef)) { - execute_single_row_ = true; - } - const ObForeignKeyArgArray &fk_args = del_ctdef.fk_args_; - for (int j = 0; OB_SUCC(ret) && j < fk_args.count() && !execute_single_row_; j++) { - if (fk_args.at(j).is_self_ref_ && fk_args.at(j).ref_action_ == ACTION_CASCADE) { + ret = ObTableModifyOp::check_need_exec_single_row(); + if (OB_SUCC(ret) && !execute_single_row_) { + for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.del_ctdefs_.count() && !execute_single_row_; ++i) { + const ObTableDeleteSpec::DelCtDefArray &ctdefs = MY_SPEC.del_ctdefs_.at(i); + const ObDelCtDef &del_ctdef = *ctdefs.at(0); + if (has_before_row_trigger(del_ctdef) || has_after_row_trigger(del_ctdef)) { execute_single_row_ = true; } + const ObForeignKeyArgArray &fk_args = del_ctdef.fk_args_; + for (int j = 0; OB_SUCC(ret) && j < fk_args.count() && !execute_single_row_; j++) { + if (fk_args.at(j).is_self_ref_ && fk_args.at(j).ref_action_ == ACTION_CASCADE) { + execute_single_row_ = true; + } + } } } return ret; diff --git a/src/sql/engine/dml/ob_table_insert_all_op.cpp b/src/sql/engine/dml/ob_table_insert_all_op.cpp index c17c0eeb7..646039c95 100644 --- a/src/sql/engine/dml/ob_table_insert_all_op.cpp +++ b/src/sql/engine/dml/ob_table_insert_all_op.cpp @@ -104,21 +104,24 @@ int ObTableInsertAllOp::switch_iterator(ObExecContext &ctx) int ObTableInsertAllOp::check_need_exec_single_row() { int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++i) { - const ObTableInsertSpec::InsCtDefArray &ctdefs = MY_SPEC.ins_ctdefs_.at(i); - const ObInsCtDef &ins_ctdef = *(ctdefs.at(0)); - const uint64_t table_id = ins_ctdef.das_base_ctdef_.index_tid_; - const ObForeignKeyArgArray &fk_args = ins_ctdef.fk_args_; - if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) { - execute_single_row_ = true; - } - for (int j = 0; OB_SUCC(ret) && j < fk_args.count() && !execute_single_row_; j++) { - const ObForeignKeyArg &fk_arg = fk_args.at(j); - const uint64_t parent_table_id = fk_arg.table_id_; - for (int k = 0; k < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++k) { - const uint64_t tmp_table_id = MY_SPEC.ins_ctdefs_.at(k).at(0)->das_base_ctdef_.index_tid_; - if (parent_table_id == tmp_table_id && k != i) { - execute_single_row_ = true; + ret = ObTableModifyOp::check_need_exec_single_row(); + if (OB_SUCC(ret) && !execute_single_row_) { + for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++i) { + const ObTableInsertSpec::InsCtDefArray &ctdefs = MY_SPEC.ins_ctdefs_.at(i); + const ObInsCtDef &ins_ctdef = *(ctdefs.at(0)); + const uint64_t table_id = ins_ctdef.das_base_ctdef_.index_tid_; + const ObForeignKeyArgArray &fk_args = ins_ctdef.fk_args_; + if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) { + execute_single_row_ = true; + } + for (int j = 0; OB_SUCC(ret) && j < fk_args.count() && !execute_single_row_; j++) { + const ObForeignKeyArg &fk_arg = fk_args.at(j); + const uint64_t parent_table_id = fk_arg.table_id_; + for (int k = 0; k < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++k) { + const uint64_t tmp_table_id = MY_SPEC.ins_ctdefs_.at(k).at(0)->das_base_ctdef_.index_tid_; + if (parent_table_id == tmp_table_id && k != i) { + execute_single_row_ = true; + } } } } diff --git a/src/sql/engine/dml/ob_table_insert_op.cpp b/src/sql/engine/dml/ob_table_insert_op.cpp index 95e640ade..366ca7914 100644 --- a/src/sql/engine/dml/ob_table_insert_op.cpp +++ b/src/sql/engine/dml/ob_table_insert_op.cpp @@ -111,11 +111,14 @@ OB_DEF_SERIALIZE_SIZE(ObTableInsertSpec) int ObTableInsertOp::check_need_exec_single_row() { int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++i) { - const ObTableInsertSpec::InsCtDefArray &ctdefs = MY_SPEC.ins_ctdefs_.at(i); - const ObInsCtDef &ins_ctdef = *ctdefs.at(0); - if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) { - execute_single_row_ = true; + ret = ObTableModifyOp::check_need_exec_single_row(); + if (OB_SUCC(ret) && !execute_single_row_) { + for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++i) { + const ObTableInsertSpec::InsCtDefArray &ctdefs = MY_SPEC.ins_ctdefs_.at(i); + const ObInsCtDef &ins_ctdef = *ctdefs.at(0); + if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) { + execute_single_row_ = true; + } } } return ret; diff --git a/src/sql/engine/dml/ob_table_insert_up_op.cpp b/src/sql/engine/dml/ob_table_insert_up_op.cpp index e3141bebd..dd5bc1825 100644 --- a/src/sql/engine/dml/ob_table_insert_up_op.cpp +++ b/src/sql/engine/dml/ob_table_insert_up_op.cpp @@ -91,17 +91,20 @@ OB_DEF_SERIALIZE_SIZE(ObTableInsertUpSpec) int ObTableInsertUpOp::check_need_exec_single_row() { int ret = OB_SUCCESS; - ObInsertUpCtDef *insert_up_ctdef = MY_SPEC.insert_up_ctdefs_.at(0); - const ObInsCtDef *ins_ctdef = insert_up_ctdef->ins_ctdef_; - const ObUpdCtDef *upd_ctdef = insert_up_ctdef->upd_ctdef_; - if (OB_NOT_NULL(ins_ctdef) && OB_NOT_NULL(upd_ctdef)) { - if (has_before_row_trigger(*ins_ctdef) || has_after_row_trigger(*ins_ctdef) || - has_before_row_trigger(*upd_ctdef) || has_after_row_trigger(*upd_ctdef)) { - execute_single_row_ = true; + ret = ObTableModifyOp::check_need_exec_single_row(); + if (OB_SUCC(ret) && !execute_single_row_) { + ObInsertUpCtDef *insert_up_ctdef = MY_SPEC.insert_up_ctdefs_.at(0); + const ObInsCtDef *ins_ctdef = insert_up_ctdef->ins_ctdef_; + const ObUpdCtDef *upd_ctdef = insert_up_ctdef->upd_ctdef_; + if (OB_NOT_NULL(ins_ctdef) && OB_NOT_NULL(upd_ctdef)) { + if (has_before_row_trigger(*ins_ctdef) || has_after_row_trigger(*ins_ctdef) || + has_before_row_trigger(*upd_ctdef) || has_after_row_trigger(*upd_ctdef)) { + execute_single_row_ = true; + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ins_ctdef or upd_ctdef of primary table is nullptr", K(ret)); } - } else { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ins_ctdef or upd_ctdef of primary table is nullptr", K(ret)); } return ret; } diff --git a/src/sql/engine/dml/ob_table_merge_op.cpp b/src/sql/engine/dml/ob_table_merge_op.cpp index a88152392..b87a92471 100644 --- a/src/sql/engine/dml/ob_table_merge_op.cpp +++ b/src/sql/engine/dml/ob_table_merge_op.cpp @@ -156,25 +156,28 @@ ObTableMergeOp::ObTableMergeOp(ObExecContext &ctx, const ObOpSpec &spec, ObOpInp int ObTableMergeOp::check_need_exec_single_row() { int ret = OB_SUCCESS; - ObMergeCtDef *merge_ctdef = MY_SPEC.merge_ctdefs_.at(0); - if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->ins_ctdef_)) { - const ObInsCtDef &ins_ctdef = *merge_ctdef->ins_ctdef_; - if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) { - execute_single_row_ = true; + ret = ObTableModifyOp::check_need_exec_single_row(); + if (OB_SUCC(ret) && !execute_single_row_) { + ObMergeCtDef *merge_ctdef = MY_SPEC.merge_ctdefs_.at(0); + if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->ins_ctdef_)) { + const ObInsCtDef &ins_ctdef = *merge_ctdef->ins_ctdef_; + if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) { + execute_single_row_ = true; + } } - } - if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->upd_ctdef_)) { - const ObUpdCtDef &upd_ctdef = *merge_ctdef->upd_ctdef_; - if (has_before_row_trigger(upd_ctdef) || has_after_row_trigger(upd_ctdef)) { - execute_single_row_ = true; + if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->upd_ctdef_)) { + const ObUpdCtDef &upd_ctdef = *merge_ctdef->upd_ctdef_; + if (has_before_row_trigger(upd_ctdef) || has_after_row_trigger(upd_ctdef)) { + execute_single_row_ = true; + } } - } - if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->del_ctdef_)) { - const ObDelCtDef &del_ctdef = *merge_ctdef->del_ctdef_; - if (has_before_row_trigger(del_ctdef) || has_after_row_trigger(del_ctdef)) { - execute_single_row_ = true; + if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->del_ctdef_)) { + const ObDelCtDef &del_ctdef = *merge_ctdef->del_ctdef_; + if (has_before_row_trigger(del_ctdef) || has_after_row_trigger(del_ctdef)) { + execute_single_row_ = true; + } } } return ret; diff --git a/src/sql/engine/dml/ob_table_modify_op.cpp b/src/sql/engine/dml/ob_table_modify_op.cpp index 8caf6c048..6908e2ca4 100644 --- a/src/sql/engine/dml/ob_table_modify_op.cpp +++ b/src/sql/engine/dml/ob_table_modify_op.cpp @@ -758,7 +758,9 @@ int ObTableModifyOp::inner_rescan() int ObTableModifyOp::check_need_exec_single_row() { int ret = OB_SUCCESS; - execute_single_row_ = false; + if (MY_SPEC.is_returning_ && need_foreign_key_checks()) { + execute_single_row_ = true; + } return ret; } @@ -1099,9 +1101,16 @@ int ObTableModifyOp::submit_all_dml_task() int ObTableModifyOp::discharge_das_write_buffer() { int ret = OB_SUCCESS; - if (dml_rtctx_.get_cached_row_size() >= das::OB_DAS_MAX_TOTAL_PACKET_SIZE || execute_single_row_) { - LOG_INFO("DASWriteBuffer full or need single row execution, now to write storage", - "buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used(), K(execute_single_row_), K(dml_rtctx_.get_cached_row_size())); + int64_t simulate_buffer_size = - EVENT_CALL(EventTable::EN_DAS_DML_BUFFER_OVERFLOW); + int64_t buffer_size_limit = is_meta_tenant(tenant_id_) ? das::OB_DAS_MAX_META_TENANT_PACKET_SIZE : das::OB_DAS_MAX_TOTAL_PACKET_SIZE; + if (OB_UNLIKELY(simulate_buffer_size > 0)) { + buffer_size_limit = simulate_buffer_size; + } + if (dml_rtctx_.get_cached_row_size() >= buffer_size_limit) { + LOG_INFO("DASWriteBuffer full, now to write storage", + "buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used(), K(dml_rtctx_.get_cached_row_size())); + ret = submit_all_dml_task(); + } else if (execute_single_row_) { ret = submit_all_dml_task(); } return ret; diff --git a/src/sql/engine/dml/ob_table_replace_op.cpp b/src/sql/engine/dml/ob_table_replace_op.cpp index 0c7916c63..8a617c91d 100644 --- a/src/sql/engine/dml/ob_table_replace_op.cpp +++ b/src/sql/engine/dml/ob_table_replace_op.cpp @@ -97,17 +97,20 @@ OB_DEF_SERIALIZE_SIZE(ObTableReplaceSpec) int ObTableReplaceOp::check_need_exec_single_row() { int ret = OB_SUCCESS; - ObReplaceCtDef *replace_ctdef = MY_SPEC.replace_ctdefs_.at(0); - const ObInsCtDef *ins_ctdef = replace_ctdef->ins_ctdef_; - const ObDelCtDef *del_ctdef = replace_ctdef->del_ctdef_; - if (OB_NOT_NULL(ins_ctdef) || OB_NOT_NULL(del_ctdef)) { - if (has_before_row_trigger(*ins_ctdef) || has_after_row_trigger(*ins_ctdef) - || has_before_row_trigger(*del_ctdef) || has_after_row_trigger(*del_ctdef)) { - execute_single_row_ = true; + ret = ObTableModifyOp::check_need_exec_single_row(); + if (OB_SUCC(ret) && !execute_single_row_) { + ObReplaceCtDef *replace_ctdef = MY_SPEC.replace_ctdefs_.at(0); + const ObInsCtDef *ins_ctdef = replace_ctdef->ins_ctdef_; + const ObDelCtDef *del_ctdef = replace_ctdef->del_ctdef_; + if (OB_NOT_NULL(ins_ctdef) || OB_NOT_NULL(del_ctdef)) { + if (has_before_row_trigger(*ins_ctdef) || has_after_row_trigger(*ins_ctdef) + || has_before_row_trigger(*del_ctdef) || has_after_row_trigger(*del_ctdef)) { + execute_single_row_ = true; + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ins_ctdef or del_ctdef of primary table is nullptr", K(ret)); } - } else { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ins_ctdef or del_ctdef of primary table is nullptr", K(ret)); } return ret; } diff --git a/src/sql/engine/dml/ob_table_update_op.cpp b/src/sql/engine/dml/ob_table_update_op.cpp index d4323a768..a700d46a6 100644 --- a/src/sql/engine/dml/ob_table_update_op.cpp +++ b/src/sql/engine/dml/ob_table_update_op.cpp @@ -116,11 +116,14 @@ ObTableUpdateOp::ObTableUpdateOp(ObExecContext &exec_ctx, const ObOpSpec &spec, int ObTableUpdateOp::check_need_exec_single_row() { int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.upd_ctdefs_.count() && !execute_single_row_; ++i) { - const ObTableUpdateSpec::UpdCtDefArray &ctdefs = MY_SPEC.upd_ctdefs_.at(i); - const ObUpdCtDef &upd_ctdef = *ctdefs.at(0); - if (has_before_row_trigger(upd_ctdef) || has_after_row_trigger(upd_ctdef)) { - execute_single_row_ = true; + ret = ObTableModifyOp::check_need_exec_single_row(); + if (OB_SUCC(ret) && !execute_single_row_) { + for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.upd_ctdefs_.count() && !execute_single_row_; ++i) { + const ObTableUpdateSpec::UpdCtDefArray &ctdefs = MY_SPEC.upd_ctdefs_.at(i); + const ObUpdCtDef &upd_ctdef = *ctdefs.at(0); + if (has_before_row_trigger(upd_ctdef) || has_after_row_trigger(upd_ctdef)) { + execute_single_row_ = true; + } } } return ret;