From ca10be3971a53a01291d869a42a74a5e93d5af55 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 24 Feb 2023 14:34:10 +0000 Subject: [PATCH] Fill the packet after result close. --- src/observer/mysql/ob_async_plan_driver.cpp | 5 ---- src/sql/ob_result_set.cpp | 33 +++++++++++++++++---- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/observer/mysql/ob_async_plan_driver.cpp b/src/observer/mysql/ob_async_plan_driver.cpp index a4a068877f..c64c9264a2 100644 --- a/src/observer/mysql/ob_async_plan_driver.cpp +++ b/src/observer/mysql/ob_async_plan_driver.cpp @@ -63,13 +63,8 @@ int ObAsyncPlanDriver::response_result(ObMySQLResultSet &result) } else if (OB_FAIL(result.update_last_insert_id_to_client())) { LOG_WARN("failed to update last insert id after open", K(ret)); } else { - ObSqlEndTransCb &sql_end_cb = session_.get_mysql_end_trans_cb(); - ObEndTransCbPacketParam pkt_param; if (is_prexecute_ && OB_FAIL(sender_.flush_buffer(false))) { LOG_WARN("flush buffer fail before send async ok packet.", K(ret)); - } else if (OB_FAIL(sql_end_cb.set_packet_param( - pkt_param.fill(result, session_, *cur_trace_id)))) { - LOG_ERROR("fail set packet param", K(ret)); } else { result.set_end_trans_async(true); } diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index 6a9c012494..fe622af2cd 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -833,6 +833,13 @@ int ObResultSet::close() if (OB_SUCC(ret)) { ret = ins_ret; } + + if (OB_SUCC(ret)) { + if (!get_exec_context().get_das_ctx().is_partition_hit()) { + my_session_.partition_hit().try_set_bool(false); + } + } + int prev_ret = ret; bool async = false; // for debug purpose if (OB_TRANS_XA_BRANCH_FAIL == ret) { @@ -844,15 +851,31 @@ int ObResultSet::close() my_session_.disassociate_xa(); } } else if (OB_NOT_NULL(physical_plan_)) { + //Because of the async close result we need set the partition_hit flag + //to the call back param, than close the result. + //But the das framwork set the patition_hit after result is closed. + //So we need to set the partition info at here. + if (is_end_trans_async()) { + ObCurTraceId::TraceId *cur_trace_id = NULL; + if (OB_ISNULL(cur_trace_id = ObCurTraceId::get_trace_id())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("current trace id is NULL", K(ret)); + set_end_trans_async(false); + } else { + observer::ObSqlEndTransCb &sql_end_cb = my_session_.get_mysql_end_trans_cb(); + ObEndTransCbPacketParam pkt_param; + int fill_ret = OB_SUCCESS; + fill_ret = sql_end_cb.set_packet_param(pkt_param.fill(*this, my_session_, *cur_trace_id)); + if (OB_SUCCESS != fill_ret) { + LOG_WARN("fail set packet param", K(ret)); + set_end_trans_async(false); + } + } + } ret = auto_end_plan_trans(*physical_plan_, ret, async); } //NG_TRACE_EXT(result_set_close, OB_ID(ret), ret, OB_ID(arg1), prev_ret, //OB_ID(arg2), ins_ret, OB_ID(arg3), errcode_, OB_ID(async), async); - if (OB_SUCC(ret)) { - if (!get_exec_context().get_das_ctx().is_partition_hit()) { - my_session_.partition_hit().try_set_bool(false); - } - } return ret; // 后面所有的操作都通过callback来完成 }