[FEAT MERGE] Phase one of 3A project

Co-authored-by: rolandqi <qikai456@126.com>
Co-authored-by: yangzhifeng <yangzhifeng83@gmail.com>
This commit is contained in:
AntiTopQuark
2023-08-25 08:44:14 +00:00
committed by ob-robot
parent 56ee77a670
commit e0b1dda713
142 changed files with 15224 additions and 1314 deletions

View File

@ -69,6 +69,19 @@ OB_DEF_SERIALIZE(ObDASRemoteInfo)
OB_UNIS_ENCODE(rtdef->op_type_);
OB_UNIS_ENCODE(*rtdef);
}
OB_UNIS_ENCODE(static_cast<int64_t>(sizeof(sql_id_)));
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(pos + sizeof(sql_id_) > buf_len)) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("serialization of ObDASRemoteInfo has not enough buffer", KR(ret), K(pos), K(buf_len), K(sizeof(sql_id_)));
} else {
MEMCPY(buf + pos, sql_id_, sizeof(sql_id_));
pos += sizeof(sql_id_);
}
OB_UNIS_ENCODE(user_id_);
OB_UNIS_ENCODE(session_id_);
OB_UNIS_ENCODE(plan_id_);
return ret;
}
@ -76,8 +89,9 @@ OB_DEF_DESERIALIZE(ObDASRemoteInfo)
{
int ret = OB_SUCCESS;
bool need_session = false;
int ctdef_cnt = 0;
int rtdef_cnt = 0;
int64_t ctdef_cnt = 0;
int64_t rtdef_cnt = 0;
int64_t sql_id_len = 0;
ObEvalCtx *eval_ctx = nullptr;
ObDASTaskFactory *das_factory =
ObDASAsyncAccessP::get_das_factory() != nullptr
@ -107,6 +121,14 @@ OB_DEF_DESERIALIZE(ObDASRemoteInfo)
typedef ObSQLSessionInfo::ExecCtxSessionRegister MyExecCtxSessionRegister;
des_exec_ctx->get_my_session()->set_is_remote(true);
MyExecCtxSessionRegister ctx_register(*des_exec_ctx->get_my_session(), *des_exec_ctx);
// for remote das, we use thread local ash stat to record ash.
// des_exec_ctx->get_my_session()->set_session_type_with_flag();
// if (OB_FAIL(des_exec_ctx->get_my_session()->set_session_active(
// ObString::make_string("REMOTE/DISTRIBUTE DAS PLAN EXECUTING"),
// obmysql::COM_QUERY))) {
// LOG_WARN("set das remote session active failed", K(ret));
// }
// EVENT_INC(ACTIVE_SESSIONS);
}
}
OZ(exec_ctx_->create_physical_plan_ctx());
@ -141,6 +163,18 @@ OB_DEF_DESERIALIZE(ObDASRemoteInfo)
OX(rtdef->eval_ctx_ = eval_ctx);
OZ(rtdefs_.push_back(rtdef));
}
OB_UNIS_DECODE(sql_id_len);
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(pos + sql_id_len > data_len)) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("deserialization of ObDASRemoteInfo has not enough buffer", KR(ret), K(pos), K(data_len), K(sql_id_len));
} else {
MEMCPY(sql_id_, buf + pos, sql_id_len);
pos += sql_id_len;
}
OB_UNIS_DECODE(user_id_);
OB_UNIS_DECODE(session_id_);
OB_UNIS_DECODE(plan_id_);
return ret;
}
@ -173,6 +207,12 @@ OB_DEF_SERIALIZE_SIZE(ObDASRemoteInfo)
OB_UNIS_ADD_LEN(rtdef->op_type_);
OB_UNIS_ADD_LEN(*rtdef);
}
OB_UNIS_ADD_LEN(sizeof(sql_id_));
len += sizeof(sql_id_);
OB_UNIS_ADD_LEN(user_id_);
OB_UNIS_ADD_LEN(session_id_);
OB_UNIS_ADD_LEN(plan_id_);
return len;
}