Fix direct load use dirty phy_plan

This commit is contained in:
obdev
2024-02-10 03:33:26 +00:00
committed by ob-robot
parent 5edca85174
commit a2113b1923
2 changed files with 21 additions and 11 deletions

View File

@ -1720,25 +1720,31 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
ctx_ = &ctx;
load_stmt_ = &load_stmt;
const ObLoadArgument &load_args = load_stmt_->get_load_arguments();
ObSQLSessionInfo *session = nullptr;
int64_t total_line_count = 0;
if (OB_ISNULL(session = ctx.get_my_session()) || OB_ISNULL(ctx.get_stmt_factory()) ||
OB_ISNULL(ctx.get_stmt_factory()->get_query_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ctx is unexpected", KR(ret), K(ctx));
} else if (OB_FAIL(plan_.set_vars(ctx.get_stmt_factory()->get_query_ctx()->variables_))) {
LOG_WARN("fail to set vars", KR(ret));
} else if (OB_FAIL(session->set_cur_phy_plan(&plan_))) {
LOG_WARN("fail to set cur phy plan", KR(ret));
} else if (FALSE_IT(ctx.reference_my_plan(&plan_))) {
} else if (OB_FAIL(ctx.init_phy_op(1))) {
LOG_WARN("fail to init phy op", KR(ret));
}
if (OB_SUCC(ret)) {
ObSQLSessionInfo *session = nullptr;
int64_t query_timeout = 0;
if (OB_ISNULL(session = ctx.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", KR(ret));
} else if (OB_FAIL(load_stmt_->get_hints().get_value(ObLoadDataHint::QUERY_TIMEOUT, query_timeout))) {
if (OB_FAIL(load_stmt_->get_hints().get_value(ObLoadDataHint::QUERY_TIMEOUT, query_timeout))) {
LOG_WARN("fail to get value", K(ret));
} else if (query_timeout < 0) {
ret = OB_TIMEOUT;
LOG_WARN("session is timeout", K(ret));
} else if (0 == query_timeout) {
ObSQLSessionInfo *session = nullptr;
if (OB_ISNULL(session = ctx.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", KR(ret));
} else if (OB_FAIL(session->get_query_timeout(query_timeout))) {
if (OB_FAIL(session->get_query_timeout(query_timeout))) {
LOG_WARN("fail to get query timeout", KR(ret));
} else if (query_timeout <= 0) {
ret = OB_TIMEOUT;
@ -1746,7 +1752,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
}
}
if (OB_SUCC(ret)) {
const int64_t timeout_ts = ctx.get_my_session()->get_query_start_time() + query_timeout;
const int64_t timeout_ts = session->get_query_start_time() + query_timeout;
ctx.get_physical_plan_ctx()->set_timeout_timestamp(timeout_ts);
THIS_WORKER.set_timeout_ts(timeout_ts);
}
@ -1829,6 +1835,9 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
}
direct_loader_.destroy();
if (OB_NOT_NULL(session)) {
session->reset_cur_phy_plan_to_null();
}
return ret;
}