[FEAT MERGE]4_2_sql_feature

Co-authored-by: yinyj17 <yinyijun92@gmail.com>
Co-authored-by: xianyu-w <707512433@qq.com>
Co-authored-by: jingtaoye35 <1255153887@qq.com>
This commit is contained in:
zzg19950727
2023-04-28 11:12:11 +00:00
committed by ob-robot
parent 3bf92459f1
commit 3cada22bdc
161 changed files with 16883 additions and 4730 deletions

View File

@ -309,11 +309,18 @@ int ObDirectReceiveOp::setup_next_scanner()
int ObDirectReceiveOp::get_next_row_from_cur_scanner()
{
int ret = OB_SUCCESS;
if (OB_FAIL(scanner_iter_.get_next_row(MY_SPEC.output_, eval_ctx_))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail get next row", K(ret));
} else {}
const ObChunkDatumStore::StoredRow *tmp_sr = NULL;
if (OB_FAIL(scanner_iter_.get_next_row(tmp_sr))) {
if (OB_ITER_END != ret) {
LOG_WARN("get next stored row failed", K(ret));
}
} else if (OB_ISNULL(tmp_sr) || (tmp_sr->cnt_ != MY_SPEC.output_.count())) {
ret = OB_ERR_UNEXPECTED;
} else {
for (uint32_t i = 0; i < tmp_sr->cnt_; ++i) {
MY_SPEC.output_.at(i)->locate_expr_datum(eval_ctx_) = tmp_sr->cells()[i];
MY_SPEC.output_.at(i)->set_evaluated_projected(eval_ctx_);
}
LOG_DEBUG("direct receive next row", "row", ROWEXPR2STR(eval_ctx_, MY_SPEC.output_));
}
return ret;

View File

@ -549,74 +549,65 @@ bool ObRemoteBaseExecuteP<T>::query_can_retry_in_remote(int &last_err,
template<typename T>
void ObRemoteBaseExecuteP<T>::record_sql_audit_and_plan_stat(
const ObPhysicalPlan *plan,
ObSQLSessionInfo *session,
ObExecRecord exec_record,
ObExecTimestamp exec_timestamp,
ObWaitEventDesc &max_wait_desc,
ObWaitEventStat &total_wait_desc)
ObSQLSessionInfo *session)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(session)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(session));
} else {
exec_timestamp.exec_type_ = RpcProcessor;
ObAuditRecordData &audit_record = session->get_raw_audit_record();
audit_record.try_cnt_++;
audit_record.seq_ = 0; //don't use now
audit_record.status_ =
(OB_SUCCESS == ret || common::OB_ITER_END == ret) ? obmysql::REQUEST_SUCC : ret;
session->get_cur_sql_id(audit_record.sql_id_, OB_MAX_SQL_ID_LENGTH + 1);
audit_record.db_id_ = session->get_database_id();
audit_record.execution_id_ = session->get_current_execution_id();
audit_record.client_addr_ = session->get_client_addr();
audit_record.user_client_addr_ = session->get_user_client_addr();
audit_record.user_group_ = THIS_WORKER.get_group_id();
audit_record.affected_rows_ = 0;
audit_record.return_rows_ = 0;
const bool enable_sql_audit =
GCONF.enable_sql_audit && session->get_local_ob_enable_sql_audit();
if (enable_sql_audit) {
ObAuditRecordData &audit_record = session->get_raw_audit_record();
audit_record.try_cnt_++;
audit_record.seq_ = 0; //don't use now
audit_record.status_ =
(OB_SUCCESS == ret || common::OB_ITER_END == ret) ? obmysql::REQUEST_SUCC : ret;
session->get_cur_sql_id(audit_record.sql_id_, OB_MAX_SQL_ID_LENGTH + 1);
audit_record.db_id_ = session->get_database_id();
audit_record.execution_id_ = session->get_current_execution_id();
audit_record.client_addr_ = session->get_client_addr();
audit_record.user_client_addr_ = session->get_user_client_addr();
audit_record.user_group_ = THIS_WORKER.get_group_id();
audit_record.affected_rows_ = 0;
audit_record.return_rows_ = 0;
exec_record.max_wait_event_ = max_wait_desc;
exec_record.wait_time_end_ = total_wait_desc.time_waited_;
exec_record.wait_count_end_ = total_wait_desc.total_waits_;
audit_record.plan_id_ = plan != nullptr ? plan->get_plan_id() : 0;
audit_record.plan_type_ = plan != nullptr ? plan->get_plan_type() : OB_PHY_PLAN_UNINITIALIZED;
audit_record.is_executor_rpc_ = true;
audit_record.is_inner_sql_ = session->is_inner();
audit_record.is_hit_plan_cache_ = true;
audit_record.is_multi_stmt_ = false;
audit_record.is_perf_event_closed_ = !lib::is_diagnose_info_enabled();
audit_record.plan_id_ = plan != nullptr ? plan->get_plan_id() : 0;
audit_record.plan_type_ = plan != nullptr ? plan->get_plan_type() : OB_PHY_PLAN_UNINITIALIZED;
audit_record.is_executor_rpc_ = true;
audit_record.is_inner_sql_ = session->is_inner();
audit_record.is_hit_plan_cache_ = true;
audit_record.is_multi_stmt_ = false;
audit_record.exec_timestamp_ = exec_timestamp;
audit_record.exec_record_ = exec_record;
//更新阶段累加时间
audit_record.update_stage_stat();
//统计plan相关的信息
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(exec_ctx_);
ObIArray<ObTableRowCount> *table_row_count_list = NULL;
if (NULL != plan_ctx) {
audit_record.consistency_level_ = plan_ctx->get_consistency_level();
audit_record.table_scan_stat_ = plan_ctx->get_table_scan_stat();
table_row_count_list = &(plan_ctx->get_table_row_count_list());
}
if (NULL != plan) {
ObPhysicalPlan *mutable_plan = const_cast<ObPhysicalPlan*>(plan);
if (!exec_ctx_.get_sql_ctx()->self_add_plan_ && exec_ctx_.get_sql_ctx()->plan_cache_hit_) {
mutable_plan->update_plan_stat(audit_record,
false, // false mean not first update plan stat
exec_ctx_.get_is_evolution(),
table_row_count_list);
} else if (exec_ctx_.get_sql_ctx()->self_add_plan_ && !exec_ctx_.get_sql_ctx()->plan_cache_hit_) {
mutable_plan->update_plan_stat(audit_record,
true,
exec_ctx_.get_is_evolution(),
table_row_count_list);
//统计plan相关的信息
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(exec_ctx_);
ObIArray<ObTableRowCount> *table_row_count_list = NULL;
if (NULL != plan_ctx) {
audit_record.consistency_level_ = plan_ctx->get_consistency_level();
audit_record.table_scan_stat_ = plan_ctx->get_table_scan_stat();
table_row_count_list = &(plan_ctx->get_table_row_count_list());
}
if (NULL != plan) {
ObPhysicalPlan *mutable_plan = const_cast<ObPhysicalPlan*>(plan);
if (!exec_ctx_.get_sql_ctx()->self_add_plan_ && exec_ctx_.get_sql_ctx()->plan_cache_hit_) {
mutable_plan->update_plan_stat(audit_record,
false, // false mean not first update plan stat
exec_ctx_.get_is_evolution(),
table_row_count_list);
} else if (exec_ctx_.get_sql_ctx()->self_add_plan_ && !exec_ctx_.get_sql_ctx()->plan_cache_hit_) {
mutable_plan->update_plan_stat(audit_record,
true,
exec_ctx_.get_is_evolution(),
table_row_count_list);
}
}
}
ObSQLUtils::handle_audit_record(false, EXECUTE_REMOTE, *session);
}
}
template<typename T>
@ -625,9 +616,9 @@ int ObRemoteBaseExecuteP<T>::execute_with_sql(ObRemoteTask &task)
int ret = OB_SUCCESS;
NG_TRACE(exec_remote_plan_begin);
ObExecRecord exec_record;
int64_t local_tenant_schema_version = -1;
ObExecTimestamp exec_timestamp;
exec_timestamp.exec_type_ = RpcProcessor;
int64_t local_tenant_schema_version = -1;
ObSQLSessionInfo *session = NULL;
const bool enable_perf_event = lib::is_diagnose_info_enabled();
bool enable_sql_audit = GCONF.enable_sql_audit;
@ -667,7 +658,7 @@ int ObRemoteBaseExecuteP<T>::execute_with_sql(ObRemoteTask &task)
{
ObMaxWaitGuard max_wait_guard(enable_perf_event ? &max_wait_desc : NULL, di);
ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di);
if (enable_sql_audit) {
if (enable_perf_event) {
exec_record.record_start();
}
if (OB_FAIL(gctx_.sql_engine_->handle_remote_query(plan_ctx->get_remote_sql_info(),
@ -681,19 +672,17 @@ int ObRemoteBaseExecuteP<T>::execute_with_sql(ObRemoteTask &task)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("plan is null", K(ret));
}
if (enable_perf_event) {
//监控项统计开始
exec_start_timestamp_ = ObTimeUtility::current_time();
exec_ctx_.set_plan_start_time(exec_start_timestamp_);
}
//监控项统计开始
exec_start_timestamp_ = ObTimeUtility::current_time();
exec_ctx_.set_plan_start_time(exec_start_timestamp_);
ObAuditRecordData &audit_record = session->get_raw_audit_record();
if (OB_SUCC(ret)) {
NG_TRACE_EXT(execute_task, OB_ID(task), task, OB_ID(stmt_type), plan->get_stmt_type());
ObWorkerSessionGuard worker_session_guard(session);
ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());
session->get_raw_audit_record().request_memory_used_ = 0;
observer::ObProcessMallocCallback pmcb(0,
session->get_raw_audit_record().request_memory_used_);
audit_record.request_memory_used_ = 0;
observer::ObProcessMallocCallback pmcb(0, audit_record.request_memory_used_);
lib::ObMallocCallbackGuard guard(pmcb);
session->set_peer_addr(task.get_ctrl_server());
NG_TRACE_EXT(execute_task, OB_ID(task), task, OB_ID(stmt_type), plan->get_stmt_type());
@ -739,18 +728,25 @@ int ObRemoteBaseExecuteP<T>::execute_with_sql(ObRemoteTask &task)
}
}
}
//监控项统计结束
exec_end_timestamp_ = ObTimeUtility::current_time();
// some statistics must be recorded for plan stat, even though sql audit disabled
record_exec_timestamp(true, exec_timestamp);
audit_record.exec_timestamp_ = exec_timestamp;
audit_record.exec_timestamp_.update_stage_time();
if (enable_perf_event) {
//监控项统计结束
exec_end_timestamp_ = ObTimeUtility::current_time();
if (enable_sql_audit) {
exec_record.record_end();
}
record_exec_timestamp(true, exec_timestamp);
exec_record.record_end();
exec_record.max_wait_event_ = max_wait_desc;
exec_record.wait_time_end_ = total_wait_desc.time_waited_;
exec_record.wait_count_end_ = total_wait_desc.total_waits_;
audit_record.exec_record_ = exec_record;
audit_record.update_event_stage_state();
}
//此处代码要放在scanner.set_err_code(ret)代码前,避免ret被都写成了OB_SUCCESS
record_sql_audit_and_plan_stat(plan, session, exec_record, exec_timestamp,
max_wait_desc, total_wait_desc);
record_sql_audit_and_plan_stat(plan, session);
}
}
if (OB_NOT_NULL(plan)) {
@ -925,7 +921,7 @@ int ObRpcRemoteExecuteP::process()
{
ObMaxWaitGuard max_wait_guard(enable_perf_event ? &max_wait_desc : NULL, di);
ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di);
if (enable_sql_audit) {
if (enable_perf_event) {
exec_record.record_start(di);
}
if (OB_FAIL(ret)) {
@ -936,19 +932,17 @@ int ObRpcRemoteExecuteP::process()
LOG_ERROR("op_spec is NULL");
ret = OB_ERR_UNEXPECTED;
}
if (enable_perf_event) {
//监控项统计开始
exec_start_timestamp_ = ObTimeUtility::current_time();
exec_ctx->set_plan_start_time(exec_start_timestamp_);
}
//监控项统计开始
exec_start_timestamp_ = ObTimeUtility::current_time();
exec_ctx->set_plan_start_time(exec_start_timestamp_);
ObAuditRecordData &audit_record = session->get_raw_audit_record();
if (OB_SUCC(ret)) {
NG_TRACE_EXT(execute_task, OB_ID(task), task, OB_ID(stmt_type), task.get_des_phy_plan().get_stmt_type());
ObWorkerSessionGuard worker_session_guard(session);
ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());
session->get_raw_audit_record().request_memory_used_ = 0;
observer::ObProcessMallocCallback pmcb(0,
session->get_raw_audit_record().request_memory_used_);
audit_record.request_memory_used_ = 0;
observer::ObProcessMallocCallback pmcb(0, audit_record.request_memory_used_);
lib::ObMallocCallbackGuard guard(pmcb);
NG_TRACE_EXT(execute_task, OB_ID(task), task,
OB_ID(stmt_type), task.get_des_phy_plan().get_stmt_type());
@ -979,21 +973,25 @@ int ObRpcRemoteExecuteP::process()
ret = OB_ERR_REMOTE_SCHEMA_NOT_FULL;
}
}
if (enable_perf_event) {
//监控项统计结束
exec_end_timestamp_ = ObTimeUtility::current_time();
if (enable_sql_audit) {
exec_record.record_end(di);
}
}
//监控项统计结束
exec_end_timestamp_ = ObTimeUtility::current_time();
// some statistics must be recorded for plan stat, even though sql audit disabled
record_exec_timestamp(true, exec_timestamp);
audit_record.exec_timestamp_ = exec_timestamp;
audit_record.exec_timestamp_.update_stage_time();
if (enable_perf_event) {
exec_record.record_end();
exec_record.max_wait_event_ = max_wait_desc;
exec_record.wait_time_end_ = total_wait_desc.time_waited_;
exec_record.wait_count_end_ = total_wait_desc.total_waits_;
audit_record.exec_record_ = exec_record;
audit_record.update_event_stage_state();
}
//此处代码要放在scanner.set_err_code(ret)代码前,避免ret被都写成了OB_SUCCESS
if (enable_sql_audit) {
record_sql_audit_and_plan_stat(
&phy_plan_, session, exec_record, exec_timestamp,
max_wait_desc, total_wait_desc);
}
record_sql_audit_and_plan_stat(&phy_plan_, session);
}
}

View File

@ -66,11 +66,7 @@ protected:
{ ObExecStatUtils::record_exec_timestamp(*this, is_first, exec_timestamp); }
void record_sql_audit_and_plan_stat(
const ObPhysicalPlan *plan,
ObSQLSessionInfo *session,
ObExecRecord exec_record,
ObExecTimestamp exec_timestamp,
ObWaitEventDesc &max_wait_desc,
ObWaitEventStat &total_wait_desc);
ObSQLSessionInfo *session);
int base_before_response(common::ObScanner &scanner);
int base_after_process();
void base_cleanup();