[FEAT MERGE] [CP] query_interface 422

This commit is contained in:
obdev
2024-04-15 16:23:22 +00:00
committed by ob-robot
parent 6423e587c1
commit dd7737c7ab
28 changed files with 687 additions and 133 deletions

View File

@ -56,6 +56,7 @@ ob_set_subtarget(oblib_rpc obmysql_packet
obmysql/packet/ompk_resheader.cpp
obmysql/packet/ompk_row.cpp
obmysql/packet/ompk_ssl_request.cpp
obmysql/packet/ompk_auth_switch.cpp
)
ob_set_subtarget(oblib_rpc obrpc

View File

@ -36,7 +36,8 @@ enum class ConnectionPhaseEnum
{
CPE_CONNECTED = 0,//server will send handshake pkt
CPE_SSL_CONNECT, //server will do ssl connect
CPE_AUTHED //server will do auth check and send ok pkt
CPE_AUTHED, //server will do auth check and send ok pkt
CPE_AUTH_SWITCH, //server will do auth switch check and send ok pkt
};

View File

@ -68,6 +68,7 @@ int Ob20ProtocolProcessor::do_decode(ObSMConnection& conn, ObICSMemPool& pool, c
const uint32_t sessid = conn.sessid_;
// together with mysql compress header, all treat as packet header
const int64_t header_size = OB20_PROTOCOL_HEADER_LENGTH + OB_MYSQL_COMPRESSED_HEADER_SIZE;
conn.mysql_pkt_context_.is_auth_switch_ = conn.is_in_auth_switch_phase();
// no need duplicated check 'm' valid, ObMySQLHandler::process() has already checked
if ((end - start) >= header_size) {

View File

@ -34,6 +34,7 @@ int ObMysqlCompressProtocolProcessor::do_decode(ObSMConnection& conn, ObICSMemPo
pkt = NULL;
const uint32_t sessid = conn.sessid_;
const int64_t header_size = OB_MYSQL_COMPRESSED_HEADER_SIZE;
conn.mysql_pkt_context_.is_auth_switch_ = conn.is_in_auth_switch_phase();
// no need duplicated check 'm' valid, ObMySQLHandler::process() has already checked
if ((end - start) >= header_size) {
//1. decode length from net buffer

View File

@ -84,6 +84,7 @@ enum ObMySQLCmd
// COM_LOGIN represents client---->hand shake response && observer---> ok or error
COM_HANDSHAKE,
COM_LOGIN,
COM_AUTH_SWITCH_RESPONSE,
COM_STMT_PREXECUTE = PREXECUTE_CMD,
COM_STMT_SEND_PIECE_DATA,
@ -106,6 +107,7 @@ enum class ObMySQLPacketType
PKT_RESHEAD, // 10 -> result header packet
PKT_PREXEC, // 11 -> prepare execute packet;
PKT_FILENAME, // 12 -> send file name to client(load local infile)
PKT_AUTH_SWITCH,// 13 -> auth switch request packet;
PKT_END // 13 -> end of packet type
};

View File

@ -35,6 +35,7 @@ int ObMysqlProtocolProcessor::do_decode(ObSMConnection& conn, ObICSMemPool& pool
const uint32_t sessid = conn.sessid_;
const int64_t header_size = OB_MYSQL_HEADER_LENGTH;
// no need duplicated check 'm' valid, ObMySQLHandler::process() has already checked
conn.mysql_pkt_context_.is_auth_switch_ = conn.is_in_auth_switch_phase();
if ((end - start) >= header_size) {
// 1. decode length from net buffer
// 2. decode seq from net buffer
@ -469,8 +470,12 @@ int ObMysqlProtocolProcessor::process_one_mysql_packet(
}
if (OB_SUCC(ret)) {
uint8_t cmd = 0;
if (context.is_auth_switch_) {
raw_pkt->set_cmd(ObMySQLCmd::COM_AUTH_SWITCH_RESPONSE);
} else {
ObMySQLUtil::get_uint1(payload, cmd);
raw_pkt->set_cmd(static_cast<ObMySQLCmd>(cmd));
}
raw_pkt->set_content(payload, static_cast<uint32_t>(total_data_len));
// no need set seq again
need_decode_more = false;

View File

@ -70,6 +70,7 @@ public:
next_read_step_ = READ_HEADER;
raw_pkt_.reset();
is_multi_pkt_ = false;
is_auth_switch_ = false;
arena_.reset(); //fast free memory
}
@ -92,7 +93,7 @@ public:
TO_STRING_KV(K_(header_buffered_len), K_(payload_buffered_len), K_(payload_buffered_total_len),
K_(last_pkt_seq), K_(payload_len), K_(curr_pkt_seq), K_(payload_buf_alloc_len),
"next_read_step", get_read_step_str(next_read_step_), K_(raw_pkt),
"used", arena_.used(), "total", arena_.total(), K_(is_multi_pkt));
"used", arena_.used(), "total", arena_.total(), K_(is_multi_pkt), K_(is_auth_switch));
public:
char header_buf_[common::OB_MYSQL_HEADER_LENGTH];
@ -108,6 +109,7 @@ public:
ObMySQLRawPacket raw_pkt_;
bool is_multi_pkt_;
common::ObArenaAllocator arena_;
bool is_auth_switch_;
private:
DISALLOW_COPY_AND_ASSIGN(ObMysqlPktContext);

View File

@ -70,7 +70,7 @@ int ObSqlSockProcessor::decode_sql_packet(ObICSMemPool& mem_pool,
LOG_WARN("sql nio enable ssl for server failed", K(ret));
}
break;
} else if (!conn.is_in_authed_phase()) {
} else if (!conn.is_in_authed_phase() && !conn.is_in_auth_switch_phase()) {
ret_pkt = pkt;
sess.set_last_pkt_sz(consume_sz);
} else if (OB_FAIL(processor->do_splice(conn, mem_pool, (void*&)pkt, need_read_more))) {

View File

@ -51,7 +51,7 @@ int ObVirtualCSProtocolProcessor::easy_process(easy_request_t *r, bool &need_rea
ObSMConnection *conn = reinterpret_cast<ObSMConnection*>(r->ms->c->user_data);
ObCSEasyMemPool pool(r->ms->pool);
need_read_more = true;
if (!conn->is_in_authed_phase()) {
if (!conn->is_in_authed_phase() && !conn->is_in_auth_switch_phase()) {
need_read_more = false;
} else if (OB_FAIL(do_splice(*conn, pool, r->ipacket, need_read_more))) {
LOG_ERROR("fail to splice mysql packet", K(ret));

View File

@ -152,9 +152,15 @@ public:
return type;
}
bool is_support_plugin_auth() const {
return (1 == cap_flags_.cap_flags_.OB_CLIENT_PLUGIN_AUTH);
}
inline bool is_in_connected_phase() { return rpc::ConnectionPhaseEnum::CPE_CONNECTED == connection_phase_; }
inline bool is_in_ssl_connect_phase() { return rpc::ConnectionPhaseEnum::CPE_SSL_CONNECT == connection_phase_; }
inline bool is_in_authed_phase() { return rpc::ConnectionPhaseEnum::CPE_AUTHED == connection_phase_; }
inline bool is_in_auth_switch_phase() { return rpc::ConnectionPhaseEnum::CPE_AUTH_SWITCH == connection_phase_; }
inline void set_auth_switch_phase() { connection_phase_ = rpc::ConnectionPhaseEnum::CPE_AUTH_SWITCH; }
inline void set_ssl_connect_phase() { connection_phase_ = rpc::ConnectionPhaseEnum::CPE_SSL_CONNECT; }
inline void set_auth_phase() { connection_phase_ = rpc::ConnectionPhaseEnum::CPE_AUTHED; }
inline void set_connect_phase() { connection_phase_ = rpc::ConnectionPhaseEnum::CPE_CONNECTED; }

View File

@ -0,0 +1,70 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX RPC_OBMYSQL
#include "rpc/obmysql/packet/ompk_auth_switch.h"
#include "rpc/obmysql/ob_mysql_util.h"
using namespace oceanbase::common;
using namespace oceanbase::obmysql;
OMPKAuthSwitch::OMPKAuthSwitch()
: status_(0xfe),
plugin_name_(0),
scramble_()
{}
int OMPKAuthSwitch::serialize(char *buffer, int64_t len, int64_t &pos) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(buffer) || OB_UNLIKELY(len - pos < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KP(buffer), K(len), K(pos), K(ret));
} else if (OB_UNLIKELY(len - pos < static_cast<int64_t>(get_serialize_size()))) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("invalid argument", K(len), K(pos), "need_size", get_serialize_size());
} else {
if (OB_FAIL(ObMySQLUtil::store_int1(buffer, len, status_, pos))) {
LOG_WARN("store fail", KP(buffer), K(len), K(pos), K(ret));
} else if (OB_FAIL(ObMySQLUtil::store_obstr_zt(buffer, len, plugin_name_, pos))) {
LOG_WARN("store fail", KP(buffer), K(len), K(pos), K(ret));
} else if (OB_FAIL(ObMySQLUtil::store_obstr_nzt(buffer, len, scramble_, pos))) {
LOG_WARN("store fail", KP(buffer), K(len), K(pos), K(ret));
}
}
return ret;
}
int64_t OMPKAuthSwitch::get_serialize_size() const
{
int64_t len = 0;
len += 1; // field_count_
len += plugin_name_.length() + 1;
len += scramble_.length() + 1;
return len;
}
int64_t OMPKAuthSwitch::to_string(char *buf, const int64_t buf_len) const
{
int64_t pos = 0;
J_OBJ_START();
J_KV("header", hdr_,
K_(status),
K_(plugin_name),
K_(scramble));
J_OBJ_END();
return pos;
}

View File

@ -0,0 +1,55 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OMPK_AUTH_SWITCH_H_
#define _OMPK_AUTH_SWITCH_H_
#include "lib/string/ob_string.h"
#include "rpc/obmysql/ob_mysql_packet.h"
#include "lib/container/ob_se_array.h"
namespace oceanbase
{
namespace obmysql
{
class OMPKAuthSwitch : public ObMySQLPacket
{
public:
OMPKAuthSwitch();
virtual ~OMPKAuthSwitch() {}
// serialize all data into thread buffer not include packet header
// Attention!! before called serialize or get_serialize_size, must set capability
virtual int serialize(char *buffer, const int64_t length, int64_t &pos) const;
virtual int64_t get_serialize_size() const;
// shadow copy
void set_plugin_name(const common::ObString &plugin_name) { plugin_name_ = plugin_name; }
void set_scramble(const common::ObString &scramble) { scramble_ = scramble; }
inline const common::ObString &get_plugin_name() const { return plugin_name_; }
inline const common::ObString &get_scramble() const { return scramble_; };
inline ObMySQLPacketType get_mysql_packet_type() { return ObMySQLPacketType::PKT_AUTH_SWITCH; }
virtual int64_t to_string(char *buf, const int64_t buf_len) const;
private:
DISALLOW_COPY_AND_ASSIGN(OMPKAuthSwitch);
uint8_t status_; // always 0xfe
common::ObString plugin_name_;
common::ObString scramble_;
};
} // end namespace obmysql
} // end namespace oceanbase
#endif /* _OMPK_AUTH_SWITCH_H_ */

View File

@ -94,10 +94,12 @@ ob_set_subtarget(ob_server mysql
mysql/obmp_stmt_send_long_data.cpp
mysql/obmp_stmt_send_piece_data.cpp
mysql/obmp_utils.cpp
mysql/obmp_auth_response.cpp
mysql/obsm_conn_callback.cpp
mysql/obsm_handler.cpp
mysql/obsm_row.cpp
mysql/obsm_utils.cpp
mysql/obmp_set_option.cpp
)
ob_set_subtarget(ob_server net

View File

@ -0,0 +1,119 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "observer/mysql/obmp_auth_response.h"
#include "observer/mysql/obmp_utils.h"
#include "rpc/obmysql/ob_mysql_packet.h"
#include "rpc/obmysql/obsm_struct.h"
#include "rpc/ob_request.h"
namespace oceanbase
{
using namespace common;
using namespace obmysql;
using namespace rpc;
namespace observer
{
int ObMPAuthResponse::process()
{
int ret = common::OB_SUCCESS;
bool need_disconnect = true;
bool need_response_error = true;
ObSMConnection *conn = NULL;
sql::ObSQLSessionInfo *session = NULL;
const ObMySQLRawPacket &mysql_pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
if (OB_FAIL(packet_sender_.alloc_ezbuf())) {
LOG_WARN("failed to alloc easy buf", K(ret));
} else if (OB_FAIL(packet_sender_.update_last_pkt_pos())) {
LOG_WARN("failed to update last packet pos", K(ret));
} else if (OB_ISNULL(conn = get_conn())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get connection fail", K(conn), K(ret));
} else if (OB_FAIL(get_session(session))) {
LOG_WARN("get session fail", K(ret));
} else if (OB_ISNULL(session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql session info is null", K(ret));
} else if (FALSE_IT(session->set_txn_free_route(mysql_pkt.txn_free_route()))) {
} else if (OB_FAIL(process_extra_info(*session, mysql_pkt, need_response_error))) {
LOG_WARN("fail get process extra info", K(ret));
} else if (FALSE_IT(session->post_sync_session_info())) {
} else if (OB_FAIL(update_transmission_checksum_flag(*session))) {
LOG_WARN("update transmisson checksum flag failed", K(ret));
} else if (OB_FAIL(session->set_login_auth_data(auth_data_))) {
LOG_WARN("failed to set login auth data", K(ret));
} else if (OB_FAIL(load_privilege_info_for_change_user(session))) {
OB_LOG(WARN,"load privilige info failed", K(ret),K(session->get_sessid()));
} else {
conn->set_auth_phase();
ObOKPParam ok_param; // use default values
ok_param.is_on_change_user_ = true;
if (OB_FAIL(send_ok_packet(*session, ok_param))) {
LOG_WARN("fail to send ok pakcet in statistic response", K(ok_param), K(ret));
}
}
if (OB_LIKELY(NULL != session)) {
revert_session(session);
}
if (OB_FAIL(ret) && need_response_error) {
send_error_packet(ret, NULL);
}
if (OB_FAIL(ret) && need_disconnect) {
force_disconnect();
LOG_WARN("disconnect connection", KR(ret));
}
return ret;
}
int ObMPAuthResponse::deserialize()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(req_) || OB_UNLIKELY(ObRequest::OB_MYSQL != req_->get_type())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid request", K(req_));
} else {
ObSQLSessionInfo *session = NULL;
ObMySQLCapabilityFlags capability;
if (OB_FAIL(get_session(session))) {
LOG_WARN("get session fail", K(ret));
} else if (OB_ISNULL(session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("fail to get session info", K(ret), K(session));
} else {
ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());
session->update_last_active_time();
}
if (NULL != session) {
revert_session(session);
}
if (OB_SUCC(ret)) {
obmysql::ObMySQLRawPacket pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
const char *buf = pkt.get_cdata();
const char *pos = pkt.get_cdata();
// not need skip command byte
const int64_t len = pkt.get_clen();
const char *end = buf + len;
if (OB_LIKELY(pos < end)) {
auth_data_.assign_ptr(pos, len);
pos += auth_data_.length();
}
}
}
return ret;
}
} // namespace observer
} // namespace oceanbase

View File

@ -0,0 +1,44 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OBMP_AUTH_RESPONSE_H_
#define _OBMP_AUTH_RESPONSE_H_
#include "observer/mysql/obmp_base.h"
namespace oceanbase
{
namespace observer
{
class ObMPAuthResponse
: public ObMPBase
{
public:
static const obmysql::ObMySQLCmd COM = obmysql::COM_AUTH_SWITCH_RESPONSE;
public:
explicit ObMPAuthResponse(const ObGlobalContext &gctx)
: ObMPBase(gctx),
auth_data_()
{}
int deserialize();
protected:
int process();
common::ObString auth_data_;
}; // end of class ObMPStatistic
} // end of namespace observer
} // end of namespace oceanbase
#endif /* _OBMP_AUTH_RESPONSE_H_ */

View File

@ -690,5 +690,76 @@ int ObMPBase::update_charset_sys_vars(ObSMConnection &conn, ObSQLSessionInfo &se
return ret;
}
int ObMPBase::load_privilege_info_for_change_user(sql::ObSQLSessionInfo *session)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard schema_guard;
ObSMConnection *conn = NULL;
if (OB_ISNULL(session) || OB_ISNULL(gctx_.schema_service_)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN,"invalid argument", K(session), K(gctx_.schema_service_));
} else if (OB_ISNULL(conn = get_conn())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("null conn", K(ret));
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
session->get_effective_tenant_id(), schema_guard))) {
OB_LOG(WARN,"fail get schema guard", K(ret));
} else {
SSL *ssl_st = SQL_REQ_OP.get_sql_ssl_st(req_);
share::schema::ObUserLoginInfo login_info = session->get_login_info();
share::schema::ObSessionPrivInfo session_priv;
// disconnect previous user connection first.
if (OB_FAIL(ret)) {
} else if (OB_FAIL(session->on_user_disconnect())) {
LOG_WARN("user disconnect failed", K(ret));
}
const ObUserInfo *user_info = NULL;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(schema_guard.check_user_access(login_info, session_priv,
ssl_st, user_info))) {
OB_LOG(WARN, "User access denied", K(login_info), K(ret));
} else if (OB_FAIL(session->on_user_connect(session_priv, user_info))) {
OB_LOG(WARN, "user connect failed", K(ret), K(session_priv));
} else {
uint64_t db_id = OB_INVALID_ID;
const ObSysVariableSchema *sys_variable_schema = NULL;
session->set_user(session_priv.user_name_, session_priv.host_name_, session_priv.user_id_);
session->set_user_priv_set(session_priv.user_priv_set_);
session->set_db_priv_set(session_priv.db_priv_set_);
session->set_enable_role_array(session_priv.enable_role_id_array_);
if (OB_FAIL(session->set_tenant(login_info.tenant_name_, session_priv.tenant_id_))) {
OB_LOG(WARN, "fail to set tenant", "tenant name", login_info.tenant_name_, K(ret));
} else if (OB_FAIL(session->set_real_client_ip_and_port(login_info.client_ip_, session->get_client_addr_port()))) {
LOG_WARN("failed to set_real_client_ip", K(ret));
} else if (OB_FAIL(schema_guard.get_sys_variable_schema(session_priv.tenant_id_, sys_variable_schema))) {
LOG_WARN("get sys variable schema failed", K(ret));
} else if (OB_ISNULL(sys_variable_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sys variable schema is null", K(ret));
} else if (OB_FAIL(session->load_all_sys_vars(*sys_variable_schema, true))) {
LOG_WARN("load system variables failed", K(ret));
} else if (OB_FAIL(session->update_database_variables(&schema_guard))) {
OB_LOG(WARN, "failed to update database variables", K(ret));
} else if (!session->get_database_name().empty() &&
OB_FAIL(schema_guard.get_database_id(session->get_effective_tenant_id(),
session->get_database_name(),
db_id))) {
OB_LOG(WARN, "failed to get database id", K(ret));
} else if (OB_FAIL(update_transmission_checksum_flag(*session))) {
LOG_WARN("update transmisson checksum flag failed", K(ret));
} else if (OB_FAIL(update_proxy_sys_vars(*session))) {
LOG_WARN("update_proxy_sys_vars failed", K(ret));
} else if (OB_FAIL(update_charset_sys_vars(*conn, *session))) {
LOG_WARN("fail to update charset sys vars", K(ret));
} else {
session->set_database_id(db_id);
session->reset_user_var();
}
}
}
return ret;
}
} // namespace observer
} // namespace oceanbase

View File

@ -133,6 +133,7 @@ protected:
int process_extra_info(sql::ObSQLSessionInfo &session, const obmysql::ObMySQLRawPacket &pkt,
bool &need_response_error);
int process_kill_client_session(sql::ObSQLSessionInfo &session, bool is_connect = false);
int load_privilege_info_for_change_user(sql::ObSQLSessionInfo *session);
protected:
static const int64_t MAX_TRY_STEPS = 5;
static int64_t TRY_EZ_BUF_SIZES[MAX_TRY_STEPS];

View File

@ -28,6 +28,7 @@
#include "sql/parser/ob_parser.h"
#include "sql/parser/ob_parser_utils.h"
#include "rpc/obmysql/obsm_struct.h"
#include "rpc/obmysql/packet/ompk_auth_switch.h"
using namespace oceanbase::common;
@ -38,6 +39,7 @@ namespace oceanbase
{
namespace observer
{
const char *AUTH_PLUGIN_MYSQL_NATIVE_PASSWORD = "mysql_native_password";
int ObMPChangeUser::deserialize()
{
int ret = OB_SUCCESS;
@ -224,7 +226,12 @@ int ObMPChangeUser::process()
OB_LOG(WARN, "fail to rollback trans for change user", K(ret), K(session));
} else {
session->clean_status();
if (OB_FAIL(load_privilege_info(session))) {
if (OB_FAIL(load_login_info(session))) {
OB_LOG(WARN,"load log info failed", K(ret),K(session->get_sessid()));
} else if (get_conn()->is_support_plugin_auth()
&& get_conn()->client_type_ == common::OB_CLIENT_NON_STANDARD) {
// do nothing
} else if (OB_FAIL(load_privilege_info_for_change_user(session))) {
OB_LOG(WARN,"load privilige info failed", K(ret),K(session->get_sessid()));
} else {
if (is_proxy_mod) {
@ -247,11 +254,36 @@ int ObMPChangeUser::process()
//send packet to client
if (OB_SUCC(ret)) {
/*
In order to be compatible with the behavior of mysql change user,
an AuthSwitchRequest request will be sent every time to the external client.
If we're dealing with an older client we can't just send a change plugin
packet to re-initiate the authentication handshake, because the client
won't understand it. The good thing is that we don't need to : the old
client expects us to just check the user credentials here, which we can do
by just reading the cached data that are placed there by change user's
passwd field.
* */
if (get_conn()->is_support_plugin_auth()
&& get_conn()->client_type_ == common::OB_CLIENT_NON_STANDARD) {
// send auth switch request
OMPKAuthSwitch auth_switch;
auth_switch.set_plugin_name(ObString(AUTH_PLUGIN_MYSQL_NATIVE_PASSWORD));
auth_switch.set_scramble(ObString(sizeof(get_conn()->scramble_buf_), get_conn()->scramble_buf_));
if (OB_FAIL(packet_sender_.response_packet(auth_switch, session))) {
RPC_LOG(WARN, "failed to send error packet", K(auth_switch), K(ret));
disconnect();
} else {
get_conn()->set_auth_switch_phase();
}
} else {
ObOKPParam ok_param;
ok_param.is_on_change_user_ = true;
if (OB_FAIL(send_ok_packet(*session, ok_param))) {
OB_LOG(WARN, "response ok packet fail", K(ret));
}
}
} else if (need_response_error) {
if (OB_FAIL(send_error_packet(ret, NULL))) {
OB_LOG(WARN,"response fail packet fail", K(ret));
@ -273,109 +305,6 @@ int ObMPChangeUser::process()
}
return ret;
}
int ObMPChangeUser::load_privilege_info(ObSQLSessionInfo *session)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard schema_guard;
ObSMConnection *conn = NULL;
if (OB_ISNULL(session) || OB_ISNULL(gctx_.schema_service_)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN,"invalid argument", K(session), K(gctx_.schema_service_));
} else if (OB_ISNULL(conn = get_conn())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("null conn", K(ret));
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
session->get_effective_tenant_id(), schema_guard))) {
OB_LOG(WARN,"fail get schema guard", K(ret));
} else {
share::schema::ObUserLoginInfo login_info;
const char *sep_pos = username_.find('@');
if (NULL != sep_pos) {
ObString username(sep_pos - username_.ptr(), username_.ptr());
login_info.user_name_ = username;
login_info.tenant_name_ = username_.after(sep_pos);
if (login_info.tenant_name_ != session->get_tenant_name()) {
ret = OB_OP_NOT_ALLOW;
OB_LOG(WARN, "failed to change user in different tenant", K(ret),
K(login_info.tenant_name_), K(session->get_tenant_name()));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "forbid! change user command in differernt tenant");
}
} else {
login_info.user_name_ = username_;
}
if (OB_SUCC(ret)) {
if (login_info.tenant_name_.empty()) {
login_info.tenant_name_ = session->get_tenant_name();
}
if (!database_.empty()) {
login_info.db_ = database_;
}
login_info.client_ip_ = session->get_client_ip();
OB_LOG(INFO, "com change user", "username", login_info.user_name_,
"tenant name", login_info.tenant_name_);
login_info.scramble_str_.assign_ptr(conn->scramble_buf_, sizeof(conn->scramble_buf_));
login_info.passwd_ = auth_response_;
}
SSL *ssl_st = SQL_REQ_OP.get_sql_ssl_st(req_);
share::schema::ObSessionPrivInfo session_priv;
// disconnect previous user connection first.
if (OB_FAIL(ret)) {
} else if (OB_FAIL(session->on_user_disconnect())) {
LOG_WARN("user disconnect failed", K(ret));
}
const ObUserInfo *user_info = NULL;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(schema_guard.check_user_access(login_info, session_priv,
ssl_st, user_info))) {
OB_LOG(WARN, "User access denied", K(login_info), K(ret));
} else if (OB_FAIL(session->on_user_connect(session_priv, user_info))) {
OB_LOG(WARN, "user connect failed", K(ret), K(session_priv));
} else {
uint64_t db_id = OB_INVALID_ID;
const ObSysVariableSchema *sys_variable_schema = NULL;
session->set_user(session_priv.user_name_, session_priv.host_name_, session_priv.user_id_);
session->set_user_priv_set(session_priv.user_priv_set_);
session->set_db_priv_set(session_priv.db_priv_set_);
session->set_enable_role_array(session_priv.enable_role_id_array_);
if (OB_FAIL(session->set_tenant(login_info.tenant_name_, session_priv.tenant_id_))) {
OB_LOG(WARN, "fail to set tenant", "tenant name", login_info.tenant_name_, K(ret));
} else if (OB_FAIL(session->set_default_database(database_))) {
OB_LOG(WARN, "failed to set default database", K(ret), K(database_));
} else if (OB_FAIL(session->set_real_client_ip_and_port(login_info.client_ip_, session->get_client_addr_port()))) {
LOG_WARN("failed to set_real_client_ip_and_port", K(ret));
} else if (OB_FAIL(schema_guard.get_sys_variable_schema(session_priv.tenant_id_, sys_variable_schema))) {
LOG_WARN("get sys variable schema failed", K(ret));
} else if (OB_ISNULL(sys_variable_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sys variable schema is null", K(ret));
} else if (OB_FAIL(session->load_all_sys_vars(*sys_variable_schema, true))) {
LOG_WARN("load system variables failed", K(ret));
} else if (OB_FAIL(session->update_database_variables(&schema_guard))) {
OB_LOG(WARN, "failed to update database variables", K(ret));
} else if (!database_.empty() && OB_FAIL(schema_guard.get_database_id(session->get_effective_tenant_id(),
session->get_database_name(),
db_id))) {
OB_LOG(WARN, "failed to get database id", K(ret));
} else if (OB_FAIL(update_transmission_checksum_flag(*session))) {
LOG_WARN("update transmisson checksum flag failed", K(ret));
} else if (OB_FAIL(update_proxy_sys_vars(*session))) {
LOG_WARN("update_proxy_sys_vars failed", K(ret));
} else if (OB_FAIL(update_charset_sys_vars(*conn, *session))) {
LOG_WARN("fail to update charset sys vars", K(ret));
} else {
session->set_database_id(db_id);
session->reset_user_var();
}
}
}
return ret;
}
// Attention:in order to get the real type of each user var,
// we should build a standard sql 'SET @var1 = val1,@var2 = val2,......;',
// and then parse the sql
@ -501,5 +430,45 @@ int ObMPChangeUser::handle_user_var(const ObString &var, const ObString &val,
return ret;
}
int ObMPChangeUser::load_login_info(ObSQLSessionInfo *session)
{
int ret = OB_SUCCESS;
share::schema::ObUserLoginInfo login_info;
const char *sep_pos = username_.find('@');
if (NULL != sep_pos) {
ObString username(sep_pos - username_.ptr(), username_.ptr());
login_info.user_name_ = username;
login_info.tenant_name_ = username_.after(sep_pos);
if (login_info.tenant_name_ != session->get_tenant_name()) {
ret = OB_OP_NOT_ALLOW;
OB_LOG(WARN, "failed to change user in different tenant", K(ret),
K(login_info.tenant_name_), K(session->get_tenant_name()));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "forbid! change user command in differernt tenant");
}
} else {
login_info.user_name_ = username_;
}
if (OB_SUCC(ret)) {
if (login_info.tenant_name_.empty()) {
login_info.tenant_name_ = session->get_tenant_name();
}
if (!database_.empty()) {
login_info.db_ = database_;
}
login_info.client_ip_ = session->get_client_ip();
OB_LOG(INFO, "com change user", "username", login_info.user_name_,
"tenant name", login_info.tenant_name_);
const ObSMConnection &conn = *get_conn();
login_info.scramble_str_.assign_ptr(conn.scramble_buf_, sizeof(conn.scramble_buf_));
login_info.passwd_ = auth_response_;
if (OB_FAIL(session->set_login_info(login_info))) {
LOG_WARN("failed to set login_info", K(ret));
} else if (OB_FAIL(session->set_default_database(database_))) {
OB_LOG(WARN, "failed to set default database", K(ret), K(database_));
}
}
return ret;
}
} //namespace observer
} //namespace oceanbase

View File

@ -47,7 +47,7 @@ public:
protected:
int process();
int deserialize();
int load_privilege_info(sql::ObSQLSessionInfo *session);
int load_login_info(sql::ObSQLSessionInfo *session);
private:
static int decode_string_kv(const char* attrs_end, const char *&pos, obmysql::ObStringKV &kv);

View File

@ -0,0 +1,131 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "observer/mysql/obmp_set_option.h"
#include "observer/mysql/obmp_utils.h"
#include "observer/mysql/obmp_base.h"
#include "rpc/obmysql/ob_mysql_packet.h"
#include "rpc/obmysql/obsm_struct.h"
#include "sql/ob_sql_utils.h"
#include "rpc/obmysql/packet/ompk_eof.h"
#include "rpc/obmysql/ob_mysql_util.h"
using namespace oceanbase::common;
using namespace oceanbase::obmysql;
using namespace oceanbase::rpc;
namespace oceanbase
{
namespace observer
{
int ObMPSetOption::deserialize()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(req_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid packet", K(ret), K_(req));
} else if (OB_UNLIKELY(req_->get_type() != ObRequest::OB_MYSQL)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid packet", K(ret), K_(req), K(req_->get_type()));
} else {
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
char *buf = const_cast<char *>(pkt.get_cdata());
ObMySQLUtil::get_uint2(buf, set_opt_);
}
return ret;
}
int ObMPSetOption::process()
{
LOG_TRACE("set option", K_(set_opt));
int ret = common::OB_SUCCESS;
bool need_disconnect = true;
ObSQLSessionInfo *session = NULL;
bool need_response_error = true;
ObSMConnection *conn = NULL;
const ObMySQLRawPacket &mysql_pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
if (OB_FAIL(packet_sender_.alloc_ezbuf())) {
LOG_WARN("failed to alloc easy buf", K(ret));
} else if (OB_FAIL(packet_sender_.update_last_pkt_pos())) {
LOG_WARN("failed to update last packet pos", K(ret));
} else if (OB_FAIL(get_session(session))) {
LOG_WARN("get session fail", K(ret));
} else if (OB_ISNULL(session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null pointer");
} else if (OB_ISNULL(conn = get_conn())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get connection fail", K(conn), K(ret));
} else {
session->set_txn_free_route(mysql_pkt.txn_free_route());
if (OB_FAIL(process_extra_info(*session, mysql_pkt, need_response_error))) {
LOG_WARN("fail get process extra info", K(ret));
} else {
session->post_sync_session_info();
}
}
if (OB_SUCC(ret)) {
bool is_changed = false;
obmysql::ObMySQLCapabilityFlags flag = session->get_capability();
if (1 == flag.cap_flags_.OB_CLIENT_MULTI_STATEMENTS
&& set_opt_ == MysqlSetOptEnum::MYSQL_OPTION_MULTI_STATEMENTS_OFF) {
flag.cap_flags_.OB_CLIENT_MULTI_STATEMENTS = 0;
is_changed = true;
} else if (0 == flag.cap_flags_.OB_CLIENT_MULTI_STATEMENTS
&& set_opt_ == MysqlSetOptEnum::MYSQL_OPTION_MULTI_STATEMENTS_ON) {
flag.cap_flags_.OB_CLIENT_MULTI_STATEMENTS = 1;
is_changed = true;
} else {
// do nothing
}
if (!is_changed) {
// do nothing
} else {
session->set_capability(flag);
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_LIKELY(NULL != session)) {
ObOKPParam ok_param; // use default values
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(send_ok_packet(*session, ok_param))) {
LOG_WARN("fail to send ok pakcet in statistic response", K(ok_param), K(ret));
} else if (OB_FAIL(revert_session(session))) {
LOG_ERROR("failed to revert session", K(ret));
} else {
// do nothing
}
}
if (OB_FAIL(ret)) {
if (need_disconnect && is_conn_valid()) {
force_disconnect();
LOG_WARN("disconnect connection when process query", K(ret));
} else if (OB_FAIL(send_error_packet(ret, NULL))) { // 覆盖ret, 无需继续抛出
LOG_WARN("failed to send error packet", K(ret));
}
}
return ret;
}
} // namespace observer
} // namespace oceanbase

View File

@ -0,0 +1,51 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OBMP_SET_OPTION_H_
#define _OBMP_SET_OPTION_H_
#include "observer/mysql/obmp_base.h"
namespace oceanbase
{
enum MysqlSetOptEnum {
MYSQL_OPTION_INVALID = -1,
MYSQL_OPTION_MULTI_STATEMENTS_ON = 0,
MYSQL_OPTION_MULTI_STATEMENTS_OFF = 1
};
namespace observer
{
class ObMPSetOption
: public ObMPBase
{
public:
static const obmysql::ObMySQLCmd COM = obmysql::COM_SET_OPTION;
public:
explicit ObMPSetOption(const ObGlobalContext &gctx)
: ObMPBase(gctx),
set_opt_(MysqlSetOptEnum::MYSQL_OPTION_INVALID)
{}
virtual ~ObMPSetOption() {}
int deserialize();
protected:
int process();
uint16_t set_opt_;
}; // end of class ObMPStatistic
} // end of namespace observer
} // end of namespace oceanbase
#endif /* _OBMP_SET_OPTION_H_ */

View File

@ -55,6 +55,8 @@
#include "observer/mysql/obmp_stmt_send_long_data.h"
#include "observer/mysql/obmp_stmt_reset.h"
#include "observer/mysql/obmp_reset_connection.h"
#include "observer/mysql/obmp_auth_response.h"
#include "observer/mysql/obmp_set_option.h"
#include "observer/table/ob_table_rpc_processor.h"
#include "observer/table/ob_table_execute_processor.h"
@ -226,6 +228,8 @@ int ObSrvMySQLXlator::translate(rpc::ObRequest &req, ObReqProcessor *&processor)
MYSQL_PROCESSOR(ObMPStmtGetPieceData, gctx_);
MYSQL_PROCESSOR(ObMPStmtSendLongData, gctx_);
MYSQL_PROCESSOR(ObMPResetConnection, gctx_);
MYSQL_PROCESSOR(ObMPAuthResponse, gctx_);
MYSQL_PROCESSOR(ObMPSetOption, gctx_);
// ps stmt close request may not response packet.
// Howerver, in get processor phase, it may report
// error due to lack of memory and this response error packet.

View File

@ -108,19 +108,12 @@ int ObOutlineExecutor::generate_outline_info1(ObExecContext &ctx,
has_questionmark_in_outline_sql))) {
LOG_WARN("fail to get outline key", "outline_sql", outline_sql, K(ret));
} else if (FALSE_IT(max_concurrent = query_hint->get_global_hint().max_concurrent_)) {
} else if (OB_UNLIKELY(has_questionmark_in_outline_sql && max_concurrent < 0)) {
} else if (OB_UNLIKELY(has_questionmark_in_outline_sql && query_hint->has_hint_exclude_concurrent())) {
ret = OB_INVALID_OUTLINE;
LOG_USER_ERROR(OB_INVALID_OUTLINE, "sql text should have no ? when there is no concurrent limit");
LOG_WARN("outline should have no ? when there is no concurrent limit",
K(outline_sql), K(ret));
} else if (OB_UNLIKELY(max_concurrent > ObGlobalHint::UNSET_MAX_CONCURRENT
&& query_hint->has_hint_exclude_concurrent())) {
ret = OB_INVALID_OUTLINE;
LOG_USER_ERROR(OB_INVALID_OUTLINE, "outline and sql concurrent limit can not be mixed");
LOG_WARN("outline and sql concurrent limit can not be mixed",
"outline_sql_text", outline_info.get_sql_text_str(), K(ret));
} else if (ObGlobalHint::UNSET_MAX_CONCURRENT == max_concurrent
&& OB_FAIL(get_outline(ctx, outline_stmt, outline))) {
} else if (OB_FAIL(get_outline(ctx, outline_stmt, outline))) {
LOG_WARN("fail to get outline", K(ret));
} else {
//to check whether ok

View File

@ -73,14 +73,7 @@ int ObCreateOutlineResolver::resolve_hint(const ParseNode *node, ObCreateOutline
continue;
}
if (hint_node->type_ == T_MAX_CONCURRENT) {
if (node->num_child_ > 1) {
ret = OB_INVALID_OUTLINE;
LOG_USER_ERROR(OB_INVALID_OUTLINE, "outline and sql concurrent limit can not be mixed");
LOG_WARN("outline and sql concurrent limit can not be mixed");
} else if (1 != hint_node->num_child_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("max concurrent node should have 1 child", K(ret));
} else if (OB_ISNULL(hint_node->children_[0])) {
if (OB_ISNULL(hint_node->children_[0])) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("child of max concurrent node should not be NULL", K(ret));
} else if (hint_node->children_[0]->value_ >= 0) {

View File

@ -329,8 +329,14 @@ bool ObGlobalHint::has_hint_exclude_concurrent() const
|| ObParamOption::NOT_SPECIFIED != param_option_
|| !alloc_op_hints_.empty()
|| !dops_.empty()
|| false != disable_transform_
|| false != disable_cost_based_transform_
|| false != enable_append_
|| !opt_params_.empty()
|| !ob_ddl_schema_versions_.empty()
|| has_gather_opt_stat_hint()
|| false != has_dbms_stats_hint_
|| -1 != dynamic_sampling_
|| flashback_read_tx_uncommitted_;
}

View File

@ -14,6 +14,7 @@
#define OCEANBASE_SQL_OB_SESS_INFO_VERI_H_
#include "share/ob_define.h"
#include "share/system_variable/ob_sys_var_class_type.h"
#include "lib/string/ob_string.h"
#include "lib/atomic/ob_atomic.h"
#include "rpc/obrpc/ob_rpc_proxy.h"
@ -32,6 +33,10 @@ namespace obrpc
{
class ObSrvRpcProxy;
}
namespace share {
class ObBasicSysVar;
enum ObSysVarClassType;
}
namespace sql
{

View File

@ -2919,7 +2919,7 @@ int ObSQLSessionInfo::ps_use_stream_result_set(bool &use_stream) {
return ret;
}
ObPieceCache* ObSQLSessionInfo::get_piece_cache(bool need_init) {
::oceanbase::observer::ObPieceCache* ObSQLSessionInfo::get_piece_cache(bool need_init) {
if (NULL == piece_cache_ && need_init) {
void *buf = get_session_allocator().alloc(sizeof(ObPieceCache));
if (NULL != buf) {
@ -2936,6 +2936,23 @@ ObPieceCache* ObSQLSessionInfo::get_piece_cache(bool need_init) {
return piece_cache_;
}
int ObSQLSessionInfo::set_login_info(const share::schema::ObUserLoginInfo &login_info)
{
int ret = OB_SUCCESS;
OZ (ob_write_string(get_session_allocator(), login_info.tenant_name_, login_info_.tenant_name_));
OZ (ob_write_string(get_session_allocator(), login_info.user_name_, login_info_.user_name_));
OZ (ob_write_string(get_session_allocator(), login_info.client_ip_, login_info_.client_ip_));
OZ (ob_write_string(get_session_allocator(), login_info.passwd_, login_info_.passwd_));
OZ (ob_write_string(get_session_allocator(), login_info.db_, login_info_.db_));
OZ (ob_write_string(get_session_allocator(), login_info.scramble_str_, login_info_.scramble_str_));
return ret;
}
int ObSQLSessionInfo::set_login_auth_data(const ObString &auth_data) {
int ret = OB_SUCCESS;
OZ (ob_write_string(get_session_allocator(), auth_data, login_info_.passwd_));
return ret;
}

View File

@ -1333,6 +1333,9 @@ public:
// piece
observer::ObPieceCache *get_piece_cache(bool need_init = false);
share::schema::ObUserLoginInfo get_login_info () { return login_info_; }
int set_login_info(const share::schema::ObUserLoginInfo &login_info);
int set_login_auth_data(const ObString &auth_data);
void set_load_data_exec_session(bool v) { is_load_data_exec_session_ = v; }
bool is_load_data_exec_session() const { return is_load_data_exec_session_; }
inline ObSqlString &get_pl_exact_err_msg() { return pl_exact_err_msg_; }
@ -1596,6 +1599,7 @@ private:
common::ObSEArray<ObSequenceSchema*, 2> dblink_sequence_schemas_;
bool client_non_standard_;
bool is_session_sync_support_; // session_sync_support flag.
share::schema::ObUserLoginInfo login_info_;
};
inline bool ObSQLSessionInfo::is_terminate(int &ret) const