[FEAT MERGE]Query Interface Enhancement

Co-authored-by: chinaxing <chen.yack@gmail.com>
This commit is contained in:
obdev
2023-04-28 02:41:45 +00:00
committed by ob-robot
parent d48cc591bd
commit 23b365a0a7
74 changed files with 2611 additions and 330 deletions

View File

@ -0,0 +1,528 @@
// Copyright 2010-2016 Alibaba Inc. All Rights Reserved.
// Author:
// yaojing
// this file defines implementation of session info verification
#define USING_LOG_PREFIX SERVER
#include "sql/session/ob_sess_info_verify.h"
#include "share/ob_define.h"
#include "share/system_variable/ob_system_variable_factory.h"
#include "lib/trace/ob_trace_event.h"
#include "lib/utility/ob_tracepoint.h"
#include "observer/ob_sql_client_decorator.h"
#include "observer/ob_server_struct.h"
#include "share/ob_all_server_tracer.h"
namespace oceanbase
{
namespace sql
{
int SessionInfoVerifacation::set_verify_info_sess_id(const uint32_t sess_id) {
int ret = OB_SUCCESS;
sess_id_ = sess_id;
return ret;
}
int SessionInfoVerifacation::set_verify_info_proxy_sess_id(const uint64_t proxy_sess_id) {
int ret = OB_SUCCESS;
proxy_sess_id_ = proxy_sess_id;
return ret;
}
int SessionInfoVerifacation::set_verify_info_addr(const ObAddr addr) {
int ret = OB_SUCCESS;
addr_ = addr;
return ret;
}
// extra_info.sess_info_veri_ to session
int ObSessInfoVerify::sync_sess_info_veri(sql::ObSQLSessionInfo &sess,
const common::ObString &sess_info_veri,
SessionInfoVerifacation &sess_info_verification)
{
int ret = OB_SUCCESS;
const char *buf = sess_info_veri.ptr();
const char *data = sess_info_veri.ptr();
const int64_t len = sess_info_veri.length();
const char *end = buf + len;
int64_t pos = 0;
LOG_TRACE("start sync proxy sess info verification", K(sess.get_is_in_retry()),
K(sess.get_sessid()), KP(data), K(len), KPHEX(data, len));
// decode sess_info
if (NULL != sess_info_veri.ptr() && !sess.get_is_in_retry()) {
while (OB_SUCC(ret) && pos < len) {
int16_t extra_id = 0;
int32_t info_len = 0;
char *sess_buf = NULL;
LOG_TRACE("sync field sess_inf", K(sess.get_sessid()),
KP(data), K(pos), K(len), KPHEX(data+pos, len-pos));
if (OB_FAIL(ObProtoTransUtil::resolve_type_and_len(buf, len, pos, extra_id, info_len))) {
LOG_WARN("failed to resolve type and len", K(ret), K(len), K(pos));
} else if (extra_id < 0 || info_len <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid session sync info verification extra id", K(ret), K(extra_id));
// for old version compatible
// if new version has a val that old version doesn't has, just ignore.
} else if (extra_id >= sql::SESS_INFO_VERI_MAX_TYPE) {
pos += info_len;
} else if (OB_FAIL(ObSessInfoVerify::deserialize_sess_info_veri_id(sess,
static_cast<SessionInfoVerificationId>(extra_id),
info_len, buf, len, pos,
sess_info_verification))) {
LOG_WARN("failed to resolve value", K(extra_id), KP(buf), K(len), K(pos), K(info_len));
} else {
LOG_TRACE("success to resolve value", K(extra_id), K(len), K(pos), K(info_len));
}
}
LOG_TRACE("success to get sess info verification requied by proxy",
K(sess_info_verification), K(sess.get_sessid()),
K(sess.get_proxy_sessid()));
}
return ret;
}
// rpc to last server to fetch verify session info
int ObSessInfoVerify::verify_session_info(sql::ObSQLSessionInfo &sess,
SessionInfoVerifacation &sess_info_verification)
{
int ret = OB_SUCCESS;
ObAddr addr;
obrpc::ObSessInfoVerifyArg arg;
obrpc::ObSessionInfoVeriRes result;
// current session verify info
ObString current_verify_info;
if (GET_MIN_CLUSTER_VERSION() == CLUSTER_CURRENT_VERSION) {
if (OB_ISNULL(GCTX.srv_rpc_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("fail to get srv_rpc_proxy", K(ret), K(GCTX.srv_rpc_proxy_));
} else if (OB_FAIL(ObSessInfoVerify::sql_port_to_rpc_port(sess,
sess_info_verification))) {
LOG_WARN("fail to rpc port", K(ret));
// For compatibility, no error
ret = OB_SUCCESS;
} else if (FALSE_IT(addr = sess_info_verification.get_verify_info_addr())) {
} else if (FALSE_IT(arg.set_sess_id(sess_info_verification.get_verify_info_sess_id()))) {
LOG_WARN("fail to set session id", K(ret));
} else if (FALSE_IT(arg.set_proxy_sess_id(
sess_info_verification.get_verify_info_proxy_sess_id()))) {
LOG_WARN("fail to set proxy session id", K(ret));
} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(addr).by(MTL_ID()).
session_info_verification(arg, result))) {
// rpc fail not self-verification.
LOG_TRACE("fail to rpc", K(ret));
ret = OB_SUCCESS;
} else {
// self-verification
ObString value_buffer;
char *ptr = nullptr;
common::ObArenaAllocator allocator(common::ObModIds::OB_SQL_SESSION,
OB_MALLOC_NORMAL_BLOCK_SIZE,
sess.get_effective_tenant_id());
if (OB_ISNULL(ptr = static_cast<char *> (allocator.alloc(result.verify_info_buf_.length
())))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc mem for client identifier", K(ret));
} else {
value_buffer.assign_buffer(ptr, result.verify_info_buf_.length());
value_buffer.write(result.verify_info_buf_.ptr(), result.verify_info_buf_.length());
}
LOG_TRACE("need verify", K(&result), K(result.need_verify_), K(result.verify_info_buf_));
if (OB_FAIL(ret)) {
} else if (result.need_verify_) {
// verification error injection.
ObSessInfoVerify::veri_err_injection(sess);
if (OB_FAIL(ObSessInfoVerify::fetch_verify_session_info(sess, current_verify_info,
allocator))) {
LOG_WARN("fail to serialize session verify info, no need verify", K(ret));
ret = OB_SUCCESS;
} else if (OB_FAIL(ObSessInfoVerify::compare_verify_session_info(sess,
current_verify_info, value_buffer))) {
LOG_ERROR("session info self-verification failed", K(ret), K(sess.get_sessid()),
K(sess.get_proxy_sessid()), K(sess_info_verification));
} else {
LOG_TRACE("session info self-verification success", K(ret));
}
} else {
LOG_TRACE("session info no need self-verification", K(ret));
}
LOG_TRACE("verify end", K(sess.get_sessid()),
K(sess.get_proxy_sessid()), K(sess_info_verification));
}
} else {
LOG_TRACE("verify version not consistent, no need self-verification", K(sess.get_sessid()),
K(sess.get_proxy_sessid()), K(GET_MIN_CLUSTER_VERSION()), K(CLUSTER_CURRENT_VERSION));
}
return ret;
}
int ObSessInfoVerify::compare_verify_session_info(sql::ObSQLSessionInfo &sess,
common::ObString &result1, common::ObString &result2)
{
int ret = OB_SUCCESS;
const char *buf1 = result1.ptr();
const char *data1 = result1.ptr();
const int64_t len1 = result1.length();
const char *end1 = buf1 + len1;
const char *buf2 = result2.ptr();
const char *data2 = result2.ptr();
const int64_t len2 = result2.length();
const char *end2 = buf2 + len2;
int64_t pos1 = 0;
int64_t pos2 = 0;
ObSessInfoEncoder* encoder = NULL;
LOG_TRACE("compare verify session info start", KP(data1), K(len1), KPHEX(data1, len1),
KP(data2), K(len2), KPHEX(data2, len2));
// decode sess_info
if (NULL != result1.ptr() && NULL != result2.ptr()) {
while (OB_SUCC(ret) && pos1 < len1 && pos2 < len2) {
int16_t info_type1 = 0;
int32_t info_len1 = 0;
int16_t info_type2 = 0;
int32_t info_len2 = 0;
int64_t temp_pos1 = 0;
int64_t temp_pos2 = 0;
if (OB_FAIL(ObProtoTransUtil::resolve_type_and_len(
buf1, len1, pos1, info_type1, info_len1))) {
LOG_WARN("failed to resolve type and len", K(ret), K(len1), K(pos1));
} else if (OB_FAIL(ObProtoTransUtil::resolve_type_and_len(
buf2, len2, pos2, info_type2, info_len2))) {
LOG_WARN("failed to resolve type and len", K(ret), K(len1), K(pos1));
} else if (info_type1 < 0 || info_len1 <= 0 || info_type2 < 0 || info_len2 <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid session sync info encoder", K(ret), K(info_type1), K(info_type2),
K(info_len1), K(info_len2));
// for old version compatible
// if new version has a val that old version doesn't has, just ignore.
} else if (info_type1 >= SESSION_SYNC_MAX_TYPE) {
pos1 += info_len1;
} else if (info_type2 >= SESSION_SYNC_MAX_TYPE) {
pos2 += info_len2;
} else if (FALSE_IT(temp_pos1 = pos1)) {
} else if (FALSE_IT(temp_pos2 = pos2)) {
} else if (info_type1 != info_type2) {
LOG_WARN("info type is not consistent", K(ret), K(info_type1), K(info_type2));
} else if (OB_FAIL(sess.get_sess_encoder(SessionSyncInfoType(info_type1), encoder))) {
LOG_WARN("failed to get session encoder", K(ret));
} else if (OB_FAIL(encoder->compare_sess_info(buf1 + pos1, info_len1,
buf2 + pos2, info_len2))) {
LOG_ERROR("fail to compare session info", K(ret),
K(sess.get_sessid()),
K(sess.get_proxy_sessid()),
"info_type", info_type1);
int temp_ret = ret;
if (OB_FAIL(encoder->display_sess_info(sess, buf1 + pos1, info_len1,
buf2 + pos2, info_len2))) {
LOG_WARN("fail to display session info", K(ret));
} else {
ret = temp_ret;
}
} else {
LOG_TRACE("session info type and len is consistent", K(ret), K(info_type1));
}
if (OB_FAIL(ret)) {
} else {
pos1 += info_len1;
pos2 += info_len2;
}
}
}
return ret;
}
// fetch session info veri info
int ObSessInfoVerify::fetch_verify_session_info(sql::ObSQLSessionInfo &sess,
common::ObString &result,
common::ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
ObSessInfoEncoder* encoder = NULL;
char *buf = NULL;
int64_t size = 0;
int32_t sess_size[SESSION_SYNC_MAX_TYPE];
for (int64_t i=0; OB_SUCC(ret) && i < SESSION_SYNC_MAX_TYPE; i++) {
oceanbase::sql::SessionSyncInfoType info_type = (oceanbase::sql::SessionSyncInfoType)(i);
sess_size[i] = 0;
if (OB_FAIL(sess.get_sess_encoder(info_type, encoder))) {
LOG_WARN("failed to get session encoder", K(ret));
} else {
sess_size[i] = encoder->get_fetch_sess_info_size(sess);
LOG_TRACE("sess info size", K(sess_size[i]), K(info_type));
size += ObProtoTransUtil::get_serialize_size(sess_size[i]);
}
}
int64_t pos = 0;
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_UNLIKELY(size < 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Invalid buffer length", K(ret), K(size));
} else if (NULL == (buf = static_cast<char *>(allocator.alloc(size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc mem", K(size), K(ret));
} else {
// assamble session info as follows:
// type(2 byte) | len(4 byte) | session_info_val | ....
for (int64_t i=0; OB_SUCC(ret) && i < SESSION_SYNC_MAX_TYPE; i++) {
oceanbase::sql::SessionSyncInfoType encoder_type = (oceanbase::sql::SessionSyncInfoType)(i);
if (OB_FAIL(sess.get_sess_encoder(encoder_type, encoder))) {
LOG_WARN("failed to get session encoder", K(ret));
} else {
int16_t info_type = (int16_t)i;
int32_t info_len = sess_size[i];
if (info_len < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid session info length", K(info_len), K(info_type), K(ret));
} else if (info_len == 0) {
// invalid info len do nothing and skip it.
} else if (OB_FAIL(ObProtoTransUtil::store_type_and_len(
buf, size, pos, info_type, info_len))) {
LOG_WARN("failed to set type and len", K(info_type), K(info_len), K(ret));
} else if (OB_FAIL(encoder->fetch_sess_info(sess, buf, size, pos))) {
LOG_WARN("failed to serialize", K(sess), K(ret), K(size), K(pos));
} else {
LOG_TRACE("success to fetch", K(sess_size[i]), K(info_type));
}
}
}
}
result.assign_ptr(buf, size);
LOG_TRACE("fetch serialize buf", KPHEX(buf + pos -size, size), K(pos), K(size), K(result));
return ret;
}
// verification error injection.
void ObSessInfoVerify::veri_err_injection(sql::ObSQLSessionInfo &sess)
{
int64_t code = 0;
code = OB_E(EventTable::EN_SESS_INFO_VERI_SYS_VAR_ERROR) OB_SUCCESS;
if (code < 0) {
share::ObBasicSysVar *sys_var = NULL;
sys_var = sess.get_sys_var(100);
sys_var->set_value(ObObj(2000000000));
}
code = OB_E(EventTable::EN_SESS_INFO_VERI_APP_INFO_ERROR) OB_SUCCESS;
if (code < 0) {
sess.set_action_name("err_name");
}
code = OB_E(EventTable::EN_SESS_INFO_VERI_APP_CTX_ERROR) OB_SUCCESS;
if (code < 0) {
ObContextsMap &map = sess.get_contexts_map();
for (auto it = map.begin(); it != map.end(); ++it) {
auto it2 = it->second->context_map_->begin();
it2->second->value_ = "err_value";
break;
}
}
code = OB_E(EventTable::EN_SESS_INFO_VERI_CLIENT_ID_ERROR) OB_SUCCESS;
if (code < 0) {
sess.set_client_identifier("err_name");
}
code = OB_E(EventTable::EN_SESS_INFO_VERI_CONTROL_INFO_ERROR) OB_SUCCESS;
if (code < 0) {
FLTControlInfo temp = sess.get_control_info();
temp.level_ = 100;
sess.set_flt_control_info_no_sync(temp);
}
}
// resolve session info verification from proxy.
int ObSessInfoVerify::deserialize_sess_info_veri_id(sql::ObSQLSessionInfo &sess,
SessionInfoVerificationId extra_id,
const int64_t v_len,
const char *buf, const int64_t len, int64_t &pos,
SessionInfoVerifacation &sess_info_verification)
{
int ret = OB_SUCCESS;
switch(extra_id) {
case SESS_INFO_VERI_ADDR: {
char* ptr = NULL;
ObAddr addr;
char ip_buf[MAX_IP_ADDR_LENGTH] = "";
if (OB_FAIL(ObProtoTransUtil::get_str(buf, len, pos, v_len, ptr))) {
OB_LOG(WARN,"failed to resolve veri level", K(ret));
} else if (FALSE_IT(memcpy(ip_buf, ptr, v_len))) {
} else if (FALSE_IT(ip_buf[v_len] = '\0')) {
} else if (OB_FAIL(addr.parse_from_cstring(ip_buf))) {
OB_LOG(WARN,"failed to parse from cstring", K(ret));
} else if (OB_FAIL(sess_info_verification.set_verify_info_addr(addr))) {
OB_LOG(WARN,"failed to set verify info addr", K(ret));
}
break;
}
case SESS_INFO_VERI_SESS_ID: {
int32_t v = 0;
if (OB_FAIL(ObProtoTransUtil::get_int4(buf, len, pos, v_len, v))) {
OB_LOG(WARN,"failed to resolve veri level", K(ret));
} else if (OB_FAIL(sess_info_verification.set_verify_info_sess_id(static_cast<uint32_t>(v)))) {
OB_LOG(WARN,"failed to set verify info session id", K(ret));
}
break;
}
case SESS_INFO_VERI_PROXY_SESS_ID: {
int64_t v = 0;
if (OB_FAIL(ObProtoTransUtil::get_int8(buf, len, pos, v_len, v))) {
OB_LOG(WARN,"failed to resolve veri level", K(ret));
} else if(OB_FAIL(sess_info_verification.set_verify_info_proxy_sess_id(static_cast<uint64_t>(v)))) {
OB_LOG(WARN,"failed to set verify info proxy session id", K(ret));
}
break;
}
default: {
// For compatibility, no error
OB_LOG(WARN,"not support extra info id", K(extra_id));
break;
}
}
return ret;
}
// get rpc port by sql port
int ObSessInfoVerify::sql_port_to_rpc_port(sql::ObSQLSessionInfo &sess,
SessionInfoVerifacation &sess_info_verification)
{
int ret = OB_SUCCESS;
int64_t rpc_port = 0;
bool exist = false;
LOG_TRACE("sql port", K(sess_info_verification.get_verify_info_addr()));
if (OB_FAIL(share::ObAllServerTracer::get_instance().get_server_rpc_port(
sess_info_verification.get_verify_info_addr(),
sess_info_verification.get_verify_info_addr().get_port(), rpc_port, exist))) {
LOG_WARN("fail to get rpc port", K(ret));
} else if (!exist) {
// not find rpc port by local cache, need send inner sql to find.
MTL_SWITCH(OB_SYS_TENANT_ID) {
ObMySQLTransaction trans;
bool with_snap_shot = true;
ObMySQLProxy *mysql_proxy = GCTX.sql_proxy_;
uint64_t proxy_sessid = sess.get_proxy_sessid();
if (OB_ISNULL(mysql_proxy)) {
ret = OB_NOT_INIT;
SERVER_LOG(WARN, "mysql proxy is null", K(ret));
} else if (OB_FAIL(trans.start(mysql_proxy, OB_SYS_TENANT_ID, with_snap_shot))) {
SERVER_LOG(WARN, "failed to start transaction", K(ret));
} else {
int sql_len = 0;
const static int MAX_IP_BUFFER_LEN = 32;
char ip_buf[MAX_IP_BUFFER_LEN];
ip_buf[0] = '\0';
if (sess_info_verification.get_verify_info_addr().ip_to_string(ip_buf,
MAX_IP_BUFFER_LEN)) {
SMART_VAR(char[OB_MAX_SQL_LENGTH], sql) {
const uint64_t exec_tenant_id = OB_SYS_TENANT_ID;
const char *table_name = share::OB_ALL_SERVER_TNAME;
sql_len = snprintf(sql, OB_MAX_SQL_LENGTH,
"SELECT svr_port "
"FROM %s WHERE inner_port = %d and svr_ip = '%s'",
table_name,
sess_info_verification.get_verify_info_addr().get_port(),
ip_buf);
LOG_TRACE("send inner sql to get rpc port", K(sess.get_proxy_sessid()),
K(sess_info_verification.get_verify_info_addr().get_port()));
if (sql_len >= OB_MAX_SQL_LENGTH || sql_len <= 0) {
ret = OB_SIZE_OVERFLOW;
SERVER_LOG(WARN, "failed to format sql. size not enough");
} else {
{ // make sure %res destructed before execute other sql in the same transaction
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
common::sqlclient::ObMySQLResult *result = NULL;
ObISQLClient *sql_client = &trans;
uint64_t table_id = share::OB_ALL_SERVER_TID;
ObSQLClientRetryWeak sql_client_retry_weak(sql_client,
OB_SYS_TENANT_ID,
table_id);
// retrive data from client
if (OB_FAIL(sql_client_retry_weak.read(res, OB_SYS_TENANT_ID, sql))) {
SERVER_LOG(WARN, "failed to read data", K(ret));
} else if (NULL == (result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "failed to get result", K(ret));
} else {
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
if (OB_FAIL(result->get_int(0l, rpc_port))) {
LOG_WARN("fail to get varchar.", K(ret));
} else {
sess_info_verification.get_verify_info_addr().set_port(rpc_port);
}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
}
}
}
}
}
LOG_TRACE("get rpc port by inner sql", K(sess_info_verification.get_verify_info_addr()),
K(ret), K(sql), K(rpc_port));
}
}
}
} else {
sess_info_verification.get_verify_info_addr().set_port(rpc_port);
LOG_TRACE("not use inner sql to find rpc port", K(rpc_port));
}
return ret;
}
// use for display sys var error message.
int ObSessInfoVerify::create_tmp_sys_var(sql::ObSQLSessionInfo &sess,
share::ObSysVarClassType sys_var_id, share::ObBasicSysVar *&sys_var,
common::ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
share::ObBasicSysVar *sys_var_ptr = NULL;
if (OB_ISNULL(sys_var_ptr)) {
share::ObSysVarFactory::create_sys_var(allocator,
sys_var_id, sys_var_ptr);
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(sys_var_ptr)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ret is OB_SUCCESS, but sys_var_ptr is NULL", K(ret), K(sys_var_id));
} else {
sys_var = sys_var_ptr;
}
}
if (OB_FAIL(ret) && sys_var_ptr != nullptr) {
sys_var_ptr->~ObBasicSysVar();
sys_var_ptr = NULL;
}
return ret;
}
// this function use for get another session id when 1:1 server list test
bool GetAnotherSessID::operator()(ObSQLSessionMgr::Key key,
ObSQLSessionInfo *sess_info)
{
int ret = OB_SUCCESS;
UNUSED(key);
LOG_TRACE("current session info", K(sess_info->get_proxy_sessid()),
K(sess_info->get_sessid()), K(sess_id_), K(proxy_sess_id_));
if (OB_ISNULL(sess_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session info is NULL", KR(ret));
} else if (sess_info->get_proxy_sessid() == proxy_sess_id_ &&
sess_info->get_sessid() != sess_id_) {
sess_id_ = sess_info->get_sessid();
LOG_TRACE("find another session id", K(sess_id_));
} else {
LOG_INFO("not find another session id", K(sess_id_));
}
return OB_SUCCESS == ret;
}
} // end of namespace sql
} // end of namespace oceanbase