fix: fix the core when has returning expr induced by dml processing order refactor
This commit is contained in:
parent
8dd6b41ead
commit
c95a7ac62c
@ -57,7 +57,8 @@ const int64_t OB_DAS_MAX_PACKET_SIZE = 2 * 1024 * 1024l - 8 * 1024;
|
||||
* it can be considered that the DAS request will send at most one RPC request to each zone.
|
||||
* so OB_DAS_MAX_TOTAL_PACKET_SIZE was defined as:
|
||||
*/
|
||||
const int64_t OB_DAS_MAX_TOTAL_PACKET_SIZE = 3 * OB_DAS_MAX_PACKET_SIZE;
|
||||
const int64_t OB_DAS_MAX_TOTAL_PACKET_SIZE = 2 * OB_DAS_MAX_PACKET_SIZE;
|
||||
const int64_t OB_DAS_MAX_META_TENANT_PACKET_SIZE = 1 * 1024 * 1024l - 8 * 1024;
|
||||
} // namespace das
|
||||
|
||||
enum class ObDasTaskStatus: uint8_t
|
||||
|
@ -137,17 +137,20 @@ OB_INLINE int ObTableDeleteOp::inner_open_with_das()
|
||||
int ObTableDeleteOp::check_need_exec_single_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.del_ctdefs_.count() && !execute_single_row_; ++i) {
|
||||
const ObTableDeleteSpec::DelCtDefArray &ctdefs = MY_SPEC.del_ctdefs_.at(i);
|
||||
const ObDelCtDef &del_ctdef = *ctdefs.at(0);
|
||||
if (has_before_row_trigger(del_ctdef) || has_after_row_trigger(del_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
const ObForeignKeyArgArray &fk_args = del_ctdef.fk_args_;
|
||||
for (int j = 0; OB_SUCC(ret) && j < fk_args.count() && !execute_single_row_; j++) {
|
||||
if (fk_args.at(j).is_self_ref_ && fk_args.at(j).ref_action_ == ACTION_CASCADE) {
|
||||
ret = ObTableModifyOp::check_need_exec_single_row();
|
||||
if (OB_SUCC(ret) && !execute_single_row_) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.del_ctdefs_.count() && !execute_single_row_; ++i) {
|
||||
const ObTableDeleteSpec::DelCtDefArray &ctdefs = MY_SPEC.del_ctdefs_.at(i);
|
||||
const ObDelCtDef &del_ctdef = *ctdefs.at(0);
|
||||
if (has_before_row_trigger(del_ctdef) || has_after_row_trigger(del_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
const ObForeignKeyArgArray &fk_args = del_ctdef.fk_args_;
|
||||
for (int j = 0; OB_SUCC(ret) && j < fk_args.count() && !execute_single_row_; j++) {
|
||||
if (fk_args.at(j).is_self_ref_ && fk_args.at(j).ref_action_ == ACTION_CASCADE) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -104,21 +104,24 @@ int ObTableInsertAllOp::switch_iterator(ObExecContext &ctx)
|
||||
int ObTableInsertAllOp::check_need_exec_single_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++i) {
|
||||
const ObTableInsertSpec::InsCtDefArray &ctdefs = MY_SPEC.ins_ctdefs_.at(i);
|
||||
const ObInsCtDef &ins_ctdef = *(ctdefs.at(0));
|
||||
const uint64_t table_id = ins_ctdef.das_base_ctdef_.index_tid_;
|
||||
const ObForeignKeyArgArray &fk_args = ins_ctdef.fk_args_;
|
||||
if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
for (int j = 0; OB_SUCC(ret) && j < fk_args.count() && !execute_single_row_; j++) {
|
||||
const ObForeignKeyArg &fk_arg = fk_args.at(j);
|
||||
const uint64_t parent_table_id = fk_arg.table_id_;
|
||||
for (int k = 0; k < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++k) {
|
||||
const uint64_t tmp_table_id = MY_SPEC.ins_ctdefs_.at(k).at(0)->das_base_ctdef_.index_tid_;
|
||||
if (parent_table_id == tmp_table_id && k != i) {
|
||||
execute_single_row_ = true;
|
||||
ret = ObTableModifyOp::check_need_exec_single_row();
|
||||
if (OB_SUCC(ret) && !execute_single_row_) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++i) {
|
||||
const ObTableInsertSpec::InsCtDefArray &ctdefs = MY_SPEC.ins_ctdefs_.at(i);
|
||||
const ObInsCtDef &ins_ctdef = *(ctdefs.at(0));
|
||||
const uint64_t table_id = ins_ctdef.das_base_ctdef_.index_tid_;
|
||||
const ObForeignKeyArgArray &fk_args = ins_ctdef.fk_args_;
|
||||
if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
for (int j = 0; OB_SUCC(ret) && j < fk_args.count() && !execute_single_row_; j++) {
|
||||
const ObForeignKeyArg &fk_arg = fk_args.at(j);
|
||||
const uint64_t parent_table_id = fk_arg.table_id_;
|
||||
for (int k = 0; k < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++k) {
|
||||
const uint64_t tmp_table_id = MY_SPEC.ins_ctdefs_.at(k).at(0)->das_base_ctdef_.index_tid_;
|
||||
if (parent_table_id == tmp_table_id && k != i) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -111,11 +111,14 @@ OB_DEF_SERIALIZE_SIZE(ObTableInsertSpec)
|
||||
int ObTableInsertOp::check_need_exec_single_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++i) {
|
||||
const ObTableInsertSpec::InsCtDefArray &ctdefs = MY_SPEC.ins_ctdefs_.at(i);
|
||||
const ObInsCtDef &ins_ctdef = *ctdefs.at(0);
|
||||
if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
ret = ObTableModifyOp::check_need_exec_single_row();
|
||||
if (OB_SUCC(ret) && !execute_single_row_) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++i) {
|
||||
const ObTableInsertSpec::InsCtDefArray &ctdefs = MY_SPEC.ins_ctdefs_.at(i);
|
||||
const ObInsCtDef &ins_ctdef = *ctdefs.at(0);
|
||||
if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -91,17 +91,20 @@ OB_DEF_SERIALIZE_SIZE(ObTableInsertUpSpec)
|
||||
int ObTableInsertUpOp::check_need_exec_single_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObInsertUpCtDef *insert_up_ctdef = MY_SPEC.insert_up_ctdefs_.at(0);
|
||||
const ObInsCtDef *ins_ctdef = insert_up_ctdef->ins_ctdef_;
|
||||
const ObUpdCtDef *upd_ctdef = insert_up_ctdef->upd_ctdef_;
|
||||
if (OB_NOT_NULL(ins_ctdef) && OB_NOT_NULL(upd_ctdef)) {
|
||||
if (has_before_row_trigger(*ins_ctdef) || has_after_row_trigger(*ins_ctdef) ||
|
||||
has_before_row_trigger(*upd_ctdef) || has_after_row_trigger(*upd_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
ret = ObTableModifyOp::check_need_exec_single_row();
|
||||
if (OB_SUCC(ret) && !execute_single_row_) {
|
||||
ObInsertUpCtDef *insert_up_ctdef = MY_SPEC.insert_up_ctdefs_.at(0);
|
||||
const ObInsCtDef *ins_ctdef = insert_up_ctdef->ins_ctdef_;
|
||||
const ObUpdCtDef *upd_ctdef = insert_up_ctdef->upd_ctdef_;
|
||||
if (OB_NOT_NULL(ins_ctdef) && OB_NOT_NULL(upd_ctdef)) {
|
||||
if (has_before_row_trigger(*ins_ctdef) || has_after_row_trigger(*ins_ctdef) ||
|
||||
has_before_row_trigger(*upd_ctdef) || has_after_row_trigger(*upd_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ins_ctdef or upd_ctdef of primary table is nullptr", K(ret));
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ins_ctdef or upd_ctdef of primary table is nullptr", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -156,25 +156,28 @@ ObTableMergeOp::ObTableMergeOp(ObExecContext &ctx, const ObOpSpec &spec, ObOpInp
|
||||
int ObTableMergeOp::check_need_exec_single_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMergeCtDef *merge_ctdef = MY_SPEC.merge_ctdefs_.at(0);
|
||||
if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->ins_ctdef_)) {
|
||||
const ObInsCtDef &ins_ctdef = *merge_ctdef->ins_ctdef_;
|
||||
if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
ret = ObTableModifyOp::check_need_exec_single_row();
|
||||
if (OB_SUCC(ret) && !execute_single_row_) {
|
||||
ObMergeCtDef *merge_ctdef = MY_SPEC.merge_ctdefs_.at(0);
|
||||
if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->ins_ctdef_)) {
|
||||
const ObInsCtDef &ins_ctdef = *merge_ctdef->ins_ctdef_;
|
||||
if (has_before_row_trigger(ins_ctdef) || has_after_row_trigger(ins_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->upd_ctdef_)) {
|
||||
const ObUpdCtDef &upd_ctdef = *merge_ctdef->upd_ctdef_;
|
||||
if (has_before_row_trigger(upd_ctdef) || has_after_row_trigger(upd_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->upd_ctdef_)) {
|
||||
const ObUpdCtDef &upd_ctdef = *merge_ctdef->upd_ctdef_;
|
||||
if (has_before_row_trigger(upd_ctdef) || has_after_row_trigger(upd_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->del_ctdef_)) {
|
||||
const ObDelCtDef &del_ctdef = *merge_ctdef->del_ctdef_;
|
||||
if (has_before_row_trigger(del_ctdef) || has_after_row_trigger(del_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
if (!execute_single_row_ && OB_NOT_NULL(merge_ctdef->del_ctdef_)) {
|
||||
const ObDelCtDef &del_ctdef = *merge_ctdef->del_ctdef_;
|
||||
if (has_before_row_trigger(del_ctdef) || has_after_row_trigger(del_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -758,7 +758,9 @@ int ObTableModifyOp::inner_rescan()
|
||||
|
||||
int ObTableModifyOp::check_need_exec_single_row() {
|
||||
int ret = OB_SUCCESS;
|
||||
execute_single_row_ = false;
|
||||
if (MY_SPEC.is_returning_ && need_foreign_key_checks()) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1099,9 +1101,16 @@ int ObTableModifyOp::submit_all_dml_task()
|
||||
int ObTableModifyOp::discharge_das_write_buffer()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (dml_rtctx_.get_cached_row_size() >= das::OB_DAS_MAX_TOTAL_PACKET_SIZE || execute_single_row_) {
|
||||
LOG_INFO("DASWriteBuffer full or need single row execution, now to write storage",
|
||||
"buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used(), K(execute_single_row_), K(dml_rtctx_.get_cached_row_size()));
|
||||
int64_t simulate_buffer_size = - EVENT_CALL(EventTable::EN_DAS_DML_BUFFER_OVERFLOW);
|
||||
int64_t buffer_size_limit = is_meta_tenant(tenant_id_) ? das::OB_DAS_MAX_META_TENANT_PACKET_SIZE : das::OB_DAS_MAX_TOTAL_PACKET_SIZE;
|
||||
if (OB_UNLIKELY(simulate_buffer_size > 0)) {
|
||||
buffer_size_limit = simulate_buffer_size;
|
||||
}
|
||||
if (dml_rtctx_.get_cached_row_size() >= buffer_size_limit) {
|
||||
LOG_INFO("DASWriteBuffer full, now to write storage",
|
||||
"buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used(), K(dml_rtctx_.get_cached_row_size()));
|
||||
ret = submit_all_dml_task();
|
||||
} else if (execute_single_row_) {
|
||||
ret = submit_all_dml_task();
|
||||
}
|
||||
return ret;
|
||||
|
@ -97,17 +97,20 @@ OB_DEF_SERIALIZE_SIZE(ObTableReplaceSpec)
|
||||
int ObTableReplaceOp::check_need_exec_single_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObReplaceCtDef *replace_ctdef = MY_SPEC.replace_ctdefs_.at(0);
|
||||
const ObInsCtDef *ins_ctdef = replace_ctdef->ins_ctdef_;
|
||||
const ObDelCtDef *del_ctdef = replace_ctdef->del_ctdef_;
|
||||
if (OB_NOT_NULL(ins_ctdef) || OB_NOT_NULL(del_ctdef)) {
|
||||
if (has_before_row_trigger(*ins_ctdef) || has_after_row_trigger(*ins_ctdef)
|
||||
|| has_before_row_trigger(*del_ctdef) || has_after_row_trigger(*del_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
ret = ObTableModifyOp::check_need_exec_single_row();
|
||||
if (OB_SUCC(ret) && !execute_single_row_) {
|
||||
ObReplaceCtDef *replace_ctdef = MY_SPEC.replace_ctdefs_.at(0);
|
||||
const ObInsCtDef *ins_ctdef = replace_ctdef->ins_ctdef_;
|
||||
const ObDelCtDef *del_ctdef = replace_ctdef->del_ctdef_;
|
||||
if (OB_NOT_NULL(ins_ctdef) || OB_NOT_NULL(del_ctdef)) {
|
||||
if (has_before_row_trigger(*ins_ctdef) || has_after_row_trigger(*ins_ctdef)
|
||||
|| has_before_row_trigger(*del_ctdef) || has_after_row_trigger(*del_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ins_ctdef or del_ctdef of primary table is nullptr", K(ret));
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ins_ctdef or del_ctdef of primary table is nullptr", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -116,11 +116,14 @@ ObTableUpdateOp::ObTableUpdateOp(ObExecContext &exec_ctx, const ObOpSpec &spec,
|
||||
int ObTableUpdateOp::check_need_exec_single_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.upd_ctdefs_.count() && !execute_single_row_; ++i) {
|
||||
const ObTableUpdateSpec::UpdCtDefArray &ctdefs = MY_SPEC.upd_ctdefs_.at(i);
|
||||
const ObUpdCtDef &upd_ctdef = *ctdefs.at(0);
|
||||
if (has_before_row_trigger(upd_ctdef) || has_after_row_trigger(upd_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
ret = ObTableModifyOp::check_need_exec_single_row();
|
||||
if (OB_SUCC(ret) && !execute_single_row_) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.upd_ctdefs_.count() && !execute_single_row_; ++i) {
|
||||
const ObTableUpdateSpec::UpdCtDefArray &ctdefs = MY_SPEC.upd_ctdefs_.at(i);
|
||||
const ObUpdCtDef &upd_ctdef = *ctdefs.at(0);
|
||||
if (has_before_row_trigger(upd_ctdef) || has_after_row_trigger(upd_ctdef)) {
|
||||
execute_single_row_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
Loading…
x
Reference in New Issue
Block a user