support max_connections and max_user_connections

This commit is contained in:
st0
2021-08-19 17:11:52 +08:00
committed by wangzelin.wzl
parent 4a1adf11e8
commit 88a8862b77
52 changed files with 22189 additions and 24112 deletions

View File

@ -1033,6 +1033,7 @@ ob_set_subtarget(ob_sql session
session/ob_sql_session_info.cpp
session/ob_sql_session_mgr.cpp
session/ob_sys_params_mgr.cpp
session/ob_user_resource_mgr.cpp
)
ob_server_add_pchs(sql

View File

@ -101,6 +101,9 @@ int ObSetPasswordExecutor::execute(ObExecContext& ctx, ObSetPasswordStmt& stmt)
arg.x509_issuer_ = x509_issuer;
arg.x509_subject_ = x509_subject;
arg.exec_tenant_id_ = tenant_id;
arg.max_connections_per_hour_ = stmt.get_max_connections_per_hour();
arg.max_user_connections_= stmt.get_max_user_connections();
arg.modify_max_connections_ = stmt.get_modify_max_connections();
if (stmt.get_need_enc()) {
if (OB_FAIL(ObCreateUserExecutor::encrypt_passwd(passwd, arg.passwd_, enc_buf, ENC_BUF_LEN))) {
LOG_WARN("Encrypt passwd failed", K(ret));

View File

@ -166,6 +166,8 @@ int ObCreateUserExecutor::execute(ObExecContext& ctx, ObCreateUserStmt& stmt)
user_info.set_user_id(combine_id(tenant_id, OB_EMPTY_USER_ID));
}
user_info.set_profile_id(stmt.get_profile_id());
user_info.set_max_connections(stmt.get_max_connections_per_hour());
user_info.set_max_user_connections(stmt.get_max_user_connections());
if (OB_FAIL(arg.user_infos_.push_back(user_info))) {
LOG_WARN("Add user info to array error", K(ret));
} else {

View File

@ -1695,6 +1695,11 @@ typedef enum ObItemType {
T_RANGE_PARTITION_LIST,
T_LIST_PARTITION_LIST,
T_USER_RESOURCE_OPTIONS,
T_MAX_CONNECTIONS_PER_HOUR,
T_MAX_USER_CONNECTIONS,
T_MAX_QUERIES_PER_HOUR,
T_MAX_UPDATES_PER_HOUR,
T_MAX // Attention: add a new type before T_MAX
} ObItemType;

View File

@ -417,6 +417,7 @@ END_P SET_VAR DELIMITER
%type <node> opt_match_option
%type <ival> match_action
%type <node> opt_reference_option_list reference_option require_specification tls_option_list tls_option
%type <node> opt_resource_option resource_option_list resource_option
%type <ival> reference_action
%type <node> alter_foreign_key_action
%type <node> optimize_stmt
@ -9371,19 +9372,23 @@ opt_desc_column_option:
*
*****************************************************************************/
create_user_stmt:
CREATE USER opt_if_not_exists user_specification_list
CREATE USER opt_if_not_exists user_specification_list opt_resource_option
{
ParseNode *users_node = NULL;
merge_nodes(users_node, result, T_USERS, $4);
malloc_non_terminal_node($$, result->malloc_pool_, T_CREATE_USER, 3, $3, users_node, NULL);
ParseNode *res_opt_node = NULL;
merge_nodes(res_opt_node, result, T_USER_RESOURCE_OPTIONS, $5);
malloc_non_terminal_node($$, result->malloc_pool_, T_CREATE_USER, 4, $3, users_node, NULL, res_opt_node);
}
| CREATE USER opt_if_not_exists user_specification_list require_specification
| CREATE USER opt_if_not_exists user_specification_list require_specification opt_resource_option
{
ParseNode *users_node = NULL;
merge_nodes(users_node, result, T_USERS, $4);
ParseNode *require_node = NULL;
merge_nodes(require_node, result, T_TLS_OPTIONS, $5);
malloc_non_terminal_node($$, result->malloc_pool_, T_CREATE_USER, 3, $3, users_node, require_node);
ParseNode *res_opt_node = NULL;
merge_nodes(res_opt_node, result, T_USER_RESOURCE_OPTIONS, $6);
malloc_non_terminal_node($$, result->malloc_pool_, T_CREATE_USER, 4, $3, users_node, require_node, res_opt_node);
}
;
@ -9443,6 +9448,52 @@ REQUIRE NONE
}
;
opt_resource_option:
WITH resource_option_list
{
$$ = $2;
}
|
{
$$ = NULL;
}
;
resource_option_list:
resource_option_list resource_option
{
malloc_non_terminal_node($$, result->malloc_pool_, T_LINK_NODE, 2, $1, $2);
}
| resource_option
{
$$ = $1;
}
resource_option:
MAX_CONNECTIONS_PER_HOUR INTNUM
{
malloc_terminal_node($$, result->malloc_pool_, T_MAX_CONNECTIONS_PER_HOUR);
$$->value_ = $2->value_;
}
| MAX_USER_CONNECTIONS INTNUM
{
malloc_terminal_node($$, result->malloc_pool_, T_MAX_USER_CONNECTIONS);
$$->value_ = $2->value_;
}
/*
| MAX_QUERIES_PER_HOUR INTNUM
{
malloc_terminal_node($$, result->malloc_pool_, T_MAX_QUERIES_PER_HOUR);
$$->value_ = $2->value_;
}
| MAX_UPDATES_PER_HOUR INTNUM
{
malloc_terminal_node($$, result->malloc_pool_, T_MAX_UPDATES_PER_HOUR);
$$->value_ = $2->value_;
}
*/
;
tls_option_list:
tls_option
{
@ -9572,6 +9623,12 @@ SET PASSWORD opt_for_user COMP_EQ STRING_VALUE
merge_nodes(require_node, result, T_TLS_OPTIONS, $4);
malloc_non_terminal_node($$, result->malloc_pool_, T_SET_PASSWORD, 4, $3, NULL, NULL, require_node);
}
| ALTER USER user_with_host_name WITH resource_option_list
{
ParseNode *res_opt_node = NULL;
merge_nodes(res_opt_node, result, T_USER_RESOURCE_OPTIONS, $5);
malloc_non_terminal_node($$, result->malloc_pool_, T_SET_PASSWORD, 4, $3, NULL, NULL, res_opt_node);
}
;
opt_for_user:

File diff suppressed because it is too large Load Diff

View File

@ -372,6 +372,7 @@ const char* get_type_name(int type)
case T_FUN_SYS_QUARTER : return "T_FUN_SYS_QUARTER";
case T_FUN_SYS_BIT_LENGTH : return "T_FUN_SYS_BIT_LENGTH";
case T_FUN_SYS_PI : return "T_FUN_SYS_PI";
case T_FUN_SYS_EXPORT_SET : return "T_FUN_SYS_EXPORT_SET";
case T_MYSQL_ONLY_SYS_MAX_OP : return "T_MYSQL_ONLY_SYS_MAX_OP";
case T_FUN_SYS_CONNECT_BY_PATH : return "T_FUN_SYS_CONNECT_BY_PATH";
case T_FUN_SYS_SYSTIMESTAMP : return "T_FUN_SYS_SYSTIMESTAMP";
@ -1553,7 +1554,11 @@ const char* get_type_name(int type)
case T_HASH_PARTITION_LIST : return "T_HASH_PARTITION_LIST";
case T_RANGE_PARTITION_LIST : return "T_RANGE_PARTITION_LIST";
case T_LIST_PARTITION_LIST : return "T_LIST_PARTITION_LIST";
case T_FUN_SYS_EXPORT_SET : return "T_FUN_SYS_EXPORT_SET";
case T_USER_RESOURCE_OPTIONS : return "T_USER_RESOURCE_OPTIONS";
case T_MAX_CONNECTIONS_PER_HOUR : return "T_MAX_CONNECTIONS_PER_HOUR";
case T_MAX_USER_CONNECTIONS : return "T_MAX_USER_CONNECTIONS";
case T_MAX_QUERIES_PER_HOUR : return "T_MAX_QUERIES_PER_HOUR";
case T_MAX_UPDATES_PER_HOUR : return "T_MAX_UPDATES_PER_HOUR";
default:return "Unknown";
}
}

View File

@ -33,10 +33,10 @@ int ObCreateUserResolver::resolve(const ParseNode& parse_tree)
int ret = OB_SUCCESS;
ObCreateUserStmt* create_user_stmt = NULL;
if (OB_UNLIKELY(share::is_oracle_mode() && 5 != parse_tree.num_child_) ||
OB_UNLIKELY(share::is_mysql_mode() && 3 != parse_tree.num_child_) ||
OB_UNLIKELY(share::is_mysql_mode() && 4 != parse_tree.num_child_) ||
OB_UNLIKELY(T_CREATE_USER != parse_tree.type_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("expect 3 child in mysql mode and 5 child in oracle mode, create user type",
LOG_WARN("expect 4 child in mysql mode and 5 child in oracle mode, create user type",
"actual_num",
parse_tree.num_child_,
"type",
@ -53,6 +53,7 @@ int ObCreateUserResolver::resolve(const ParseNode& parse_tree)
ParseNode* if_not_exist = const_cast<ParseNode*>(parse_tree.children_[0]);
ParseNode* users = const_cast<ParseNode*>(parse_tree.children_[1]);
ParseNode* require_info = const_cast<ParseNode*>(parse_tree.children_[2]);
ParseNode* resource_options = !share::is_oracle_mode() ? const_cast<ParseNode*>(parse_tree.children_[3]) : NULL;
ParseNode* profile = share::is_oracle_mode() ? const_cast<ParseNode*>(parse_tree.children_[3]) : NULL;
ParseNode* primary_zone = share::is_oracle_mode() ? const_cast<ParseNode*>(parse_tree.children_[4]) : NULL;
ParseNode* ssl_infos = NULL;
@ -248,6 +249,32 @@ int ObCreateUserResolver::resolve(const ParseNode& parse_tree)
K(ret));
}
}
if (OB_SUCC(ret) && NULL != resource_options) {
if (T_USER_RESOURCE_OPTIONS != resource_options->type_
|| OB_ISNULL(resource_options->children_)) {
ret = common::OB_INVALID_ARGUMENT;
LOG_WARN(
"invalid resource options argument", K(ret), K(resource_options->type_), K(resource_options->children_));
} else {
for (int64_t i = 0; i < resource_options->num_child_; i++) {
ParseNode* res_option = resource_options->children_[i];
if (OB_ISNULL(res_option)) {
ret = common::OB_INVALID_ARGUMENT;
LOG_WARN("null res option", K(ret), K(i));
} else if (T_MAX_CONNECTIONS_PER_HOUR == res_option->type_) {
uint64_t max_connections_per_hour = static_cast<uint64_t>(res_option->value_);
max_connections_per_hour = max_connections_per_hour > MAX_CONNECTIONS ? MAX_CONNECTIONS
: max_connections_per_hour;
create_user_stmt->set_max_connections_per_hour(max_connections_per_hour);
} else if (T_MAX_USER_CONNECTIONS == res_option->type_) {
uint64_t max_user_connections = static_cast<uint64_t>(res_option->value_);
max_user_connections = max_user_connections > MAX_CONNECTIONS ? MAX_CONNECTIONS
: max_user_connections;
create_user_stmt->set_max_user_connections(max_user_connections);
}
}
}
}
if (OB_SUCC(ret) && ObSchemaChecker::is_ora_priv_check()) {
OZ(schema_checker_->check_ora_ddl_priv(session_info_->get_effective_tenant_id(),

View File

@ -24,6 +24,7 @@ public:
virtual int resolve(const ParseNode& parse_tree);
private:
const static uint64_t MAX_CONNECTIONS = 4294967295;
// disallow copy
DISALLOW_COPY_AND_ASSIGN(ObCreateUserResolver);
};

View File

@ -27,11 +27,14 @@ ObCreateUserStmt::ObCreateUserStmt(ObIAllocator* name_pool)
tenant_id_(OB_INVALID_ID),
users_(),
if_not_exist_(false),
profile_id_(OB_INVALID_ID)
profile_id_(OB_INVALID_ID),
max_connections_per_hour_(0),
max_user_connections_(0)
{}
ObCreateUserStmt::ObCreateUserStmt()
: ObDDLStmt(NULL, stmt::T_CREATE_USER), tenant_id_(OB_INVALID_ID), users_(), if_not_exist_(false)
: ObDDLStmt(NULL, stmt::T_CREATE_USER), tenant_id_(OB_INVALID_ID), users_(),
if_not_exist_(false), max_connections_per_hour_(0), max_user_connections_(0)
{}
ObCreateUserStmt::~ObCreateUserStmt()

View File

@ -74,6 +74,22 @@ public:
create_user_arg_.primary_zone_ = primary_zone;
return OB_SUCCESS;
}
uint64_t get_max_connections_per_hour()
{
return max_connections_per_hour_;
}
void set_max_connections_per_hour(uint64_t val)
{
max_connections_per_hour_ = val;
}
uint64_t get_max_user_connections()
{
return max_user_connections_;
}
void set_max_user_connections(uint64_t val)
{
max_user_connections_ = val;
}
DECLARE_VIRTUAL_TO_STRING;
private:
@ -86,6 +102,8 @@ private:
bool if_not_exist_;
uint64_t profile_id_; // only used in oracle mode
obrpc::ObCreateUserArg create_user_arg_;
uint64_t max_connections_per_hour_;
uint64_t max_user_connections_;
private:
DISALLOW_COPY_AND_ASSIGN(ObCreateUserStmt);

View File

@ -102,84 +102,35 @@ int ObSetPasswordResolver::resolve(const ParseNode& parse_tree)
ObSSLType ssl_type = ObSSLType::SSL_TYPE_NOT_SPECIFIED;
ObString infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_MAX)] = {};
if ((NULL == node->children_[1]) && (NULL == node->children_[2])) {
// alter user require ssl_info
ParseNode* require_info = const_cast<ParseNode*>(node->children_[3]);
ParseNode* ssl_infos = NULL;
if (OB_ISNULL(require_info) || OB_UNLIKELY(T_TLS_OPTIONS != require_info->type_) ||
OB_UNLIKELY(require_info->num_child_ != 1) || OB_ISNULL(ssl_infos = require_info->children_[0])) {
const ParseNode *child_node = node->children_[3];
if (OB_ISNULL(child_node)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Create user ParseNode error", K(ret), K(ssl_infos->type_));
LOG_WARN("alter user ParseNode error", K(ret));
} else if (T_TLS_OPTIONS == child_node->type_) {
if (OB_FAIL(resolve_require_node(*child_node, user_name, host_name, ssl_type, infos))) {
LOG_WARN("resolve require node failed", K(ret));
}
} else if (T_USER_RESOURCE_OPTIONS == child_node->type_) {
if (OB_FAIL(resolve_resource_option_node(*child_node, user_name, host_name, ssl_type, infos))) {
LOG_WARN("resolve resource option node failed", K(ret));
}
} else {
ssl_type = static_cast<ObSSLType>(
static_cast<int32_t>(ObSSLType::SSL_TYPE_NONE) + (ssl_infos->type_ - T_TLS_NONE));
if (ObSSLType::SSL_TYPE_SPECIFIED == ssl_type) {
ParseNode* specified_ssl_infos = NULL;
if (OB_UNLIKELY(ssl_infos->num_child_ <= 0) || OB_ISNULL(specified_ssl_infos = ssl_infos->children_[0])) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Create user ParseNode error", K(ret), K(ssl_infos->num_child_), KP(specified_ssl_infos));
} else {
bool check_repeat[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_MAX)] = {};
for (int i = 0; i < specified_ssl_infos->num_child_ && OB_SUCC(ret); ++i) {
ParseNode* ssl_info = specified_ssl_infos->children_[i];
if (OB_ISNULL(ssl_info)) {
ret = OB_ERR_PARSE_SQL;
LOG_WARN("The child of parseNode should not be NULL", K(ret), K(i));
} else if (OB_UNLIKELY(ssl_info->num_child_ != 1)) {
ret = OB_ERR_PARSE_SQL;
LOG_WARN("The num_child_is error", K(ret), K(i), K(ssl_info->num_child_));
} else if (OB_UNLIKELY(check_repeat[ssl_info->type_ - T_TLS_CIPHER])) {
ret = OB_ERR_DUP_ARGUMENT;
LOG_WARN("Option used twice in statement", K(ret), K(ssl_info->type_));
LOG_USER_ERROR(OB_ERR_DUP_ARGUMENT,
get_ssl_spec_type_str(static_cast<ObSSLSpecifiedType>(ssl_info->type_ - T_TLS_CIPHER)));
} else {
check_repeat[ssl_info->type_ - T_TLS_CIPHER] = true;
infos[ssl_info->type_ - T_TLS_CIPHER].assign_ptr(
ssl_info->children_[0]->str_value_, ssl_info->children_[0]->str_len_);
}
}
}
}
}
if (OB_SUCC(ret)) {
ObString password;
set_pwd_stmt->set_need_enc(false);
if (OB_FAIL(set_pwd_stmt->set_user_password(user_name, host_name, password))) {
LOG_WARN("Failed to set UserPasswordStmt");
} else if (OB_FAIL(set_pwd_stmt->add_ssl_info(get_ssl_type_string(ssl_type),
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_CIPHER)],
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_ISSUER)],
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_SUBJECT)]))) {
LOG_WARN("Failed to add_ssl_info",
K(ssl_type),
"CIPHER",
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_CIPHER)],
"ISSUER",
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_ISSUER)],
"SUBJECT",
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_SUBJECT)],
K(ret));
}
ret = OB_INVALID_ARGUMENT;
LOG_WARN("alter user ParseNode error", K(ret), K(child_node->type_));
}
} else if (OB_ISNULL(node->children_[1]) || OB_ISNULL(node->children_[2])) {
ret = OB_ERR_PARSE_SQL;
LOG_WARN("The child 1 or child 2 should not be NULL",
K(ret),
"child 1",
node->children_[1],
"child 2",
node->children_[2]);
K(ret), "child 1", node->children_[1], "child 2", node->children_[2]);
} else {
ObString password(static_cast<int32_t>(node->children_[1]->str_len_), node->children_[1]->str_value_);
ObString password(static_cast<int32_t>(node->children_[1]->str_len_),
node->children_[1]->str_value_);
if (!share::is_oracle_mode() && OB_FAIL(check_password_strength(password, user_name))) {
LOG_WARN("fail to check password strength", K(ret));
} else if (share::is_oracle_mode() &&
OB_FAIL(resolve_oracle_password_strength(user_name, host_name, password))) {
} else if (share::is_oracle_mode() && OB_FAIL(
resolve_oracle_password_strength(user_name, host_name, password))) {
LOG_WARN("fail to check password strength", K(ret));
} else if (0 != password.length()) { // set password
} else if (0 != password.length()) {//set password
bool need_enc = (1 == node->children_[2]->value_) ? true : false;
if (!need_enc && (!is_valid_mysql41_passwd(password))) {
ret = OB_ERR_PASSWORD_FORMAT;
@ -188,37 +139,145 @@ int ObSetPasswordResolver::resolve(const ParseNode& parse_tree)
set_pwd_stmt->set_need_enc(need_enc);
}
} else {
set_pwd_stmt->set_need_enc(false); // clear password
set_pwd_stmt->set_need_enc(false); //clear password
}
if (OB_SUCC(ret)) {
if (OB_FAIL(set_pwd_stmt->set_user_password(user_name, host_name, password))) {
LOG_WARN("Failed to set UserPasswordStmt");
} else if (OB_FAIL(set_pwd_stmt->add_ssl_info(get_ssl_type_string(ssl_type),
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_ISSUER)],
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_CIPHER)],
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_SUBJECT)]))) {
LOG_WARN("Failed to add_ssl_info",
K(ssl_type),
"ISSUER",
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_ISSUER)],
"CIPHER",
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_CIPHER)],
"SUBJECT",
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_SUBJECT)],
K(ret));
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_ISSUER)],
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_CIPHER)],
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_SUBJECT)]))) {
LOG_WARN("Failed to add_ssl_info", K(ssl_type),
"ISSUER", infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_ISSUER)],
"CIPHER", infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_CIPHER)],
"SUBJECT", infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_SUBJECT)], K(ret));
}
}
}
}
}
if (OB_SUCC(ret) && ObSchemaChecker::is_ora_priv_check() && set_pwd_stmt->get_for_current_user() == false) {
OZ(schema_checker_->check_ora_ddl_priv(session_info_->get_effective_tenant_id(),
session_info_->get_priv_user_id(),
ObString(""),
stmt::T_SET_PASSWORD,
session_info_->get_enable_role_array()),
session_info_->get_effective_tenant_id(),
session_info_->get_user_id());
if (OB_SUCC(ret) && ObSchemaChecker::is_ora_priv_check()
&& set_pwd_stmt->get_for_current_user() == false) {
OZ (schema_checker_->check_ora_ddl_priv(
session_info_->get_effective_tenant_id(),
session_info_->get_priv_user_id(),
ObString(""),
stmt::T_SET_PASSWORD,
session_info_->get_enable_role_array()),
session_info_->get_effective_tenant_id(), session_info_->get_user_id());
}
}
return ret;
}
int ObSetPasswordResolver::resolve_require_node(const ParseNode &require_info,
const ObString &user_name, const ObString &host_name, ObSSLType &ssl_type, ObString *infos)
{
int ret = OB_SUCCESS;
//alter user require ssl_info
ParseNode *ssl_infos = NULL;
ObSetPasswordStmt *set_pwd_stmt = static_cast<ObSetPasswordStmt *>(stmt_);
if (OB_UNLIKELY(T_TLS_OPTIONS != require_info.type_)
|| OB_UNLIKELY(require_info.num_child_ != 1)
|| OB_ISNULL(ssl_infos = require_info.children_[0])
|| OB_ISNULL(set_pwd_stmt)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Create user ParseNode error", K(ret), K(ssl_infos), K(set_pwd_stmt));
} else {
ssl_type = static_cast<ObSSLType>(static_cast<int32_t>(ObSSLType::SSL_TYPE_NONE) + (ssl_infos->type_ - T_TLS_NONE));
if (ObSSLType::SSL_TYPE_SPECIFIED == ssl_type) {
ParseNode *specified_ssl_infos = NULL;
if (OB_UNLIKELY(ssl_infos->num_child_ <= 0)
|| OB_ISNULL(specified_ssl_infos = ssl_infos->children_[0])) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Create user ParseNode error", K(ret), K(ssl_infos->num_child_), KP(specified_ssl_infos));
} else {
bool check_repeat[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_MAX)] = {};
for (int i = 0; i < specified_ssl_infos->num_child_ && OB_SUCC(ret); ++i) {
ParseNode *ssl_info = specified_ssl_infos->children_[i];
if (OB_ISNULL(ssl_info)) {
ret = OB_ERR_PARSE_SQL;
LOG_WARN("The child of parseNode should not be NULL", K(ret), K(i));
} else if (OB_UNLIKELY(ssl_info->num_child_ != 1)) {
ret = OB_ERR_PARSE_SQL;
LOG_WARN("The num_child_is error", K(ret), K(i), K(ssl_info->num_child_));
} else if (OB_UNLIKELY(check_repeat[ssl_info->type_ - T_TLS_CIPHER])) {
ret = OB_ERR_DUP_ARGUMENT;
LOG_WARN("Option used twice in statement", K(ret), K(ssl_info->type_));
LOG_USER_ERROR(OB_ERR_DUP_ARGUMENT, get_ssl_spec_type_str(static_cast<ObSSLSpecifiedType>(ssl_info->type_ - T_TLS_CIPHER)));
} else {
check_repeat[ssl_info->type_ - T_TLS_CIPHER] = true;
infos[ssl_info->type_ - T_TLS_CIPHER].assign_ptr(ssl_info->children_[0]->str_value_, ssl_info->children_[0]->str_len_);
}
}
}
}
}
if (OB_SUCC(ret)) {
ObString password;
set_pwd_stmt->set_need_enc(false);
if (OB_FAIL(set_pwd_stmt->set_user_password(user_name, host_name, password))) {
LOG_WARN("Failed to set UserPasswordStmt");
} else if (OB_FAIL(set_pwd_stmt->add_ssl_info(get_ssl_type_string(ssl_type),
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_CIPHER)],
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_ISSUER)],
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_SUBJECT)]))) {
LOG_WARN("Failed to add_ssl_info", K(ssl_type),
"CIPHER", infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_CIPHER)],
"ISSUER", infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_ISSUER)],
"SUBJECT", infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_SUBJECT)], K(ret));
}
}
return ret;
}
int ObSetPasswordResolver::resolve_resource_option_node(const ParseNode &resource_options,
const ObString &user_name, const ObString &host_name, ObSSLType &ssl_type, ObString *infos)
{
int ret = OB_SUCCESS;
ObSetPasswordStmt *set_pwd_stmt = static_cast<ObSetPasswordStmt *>(stmt_);
if (OB_ISNULL(set_pwd_stmt) || T_USER_RESOURCE_OPTIONS != resource_options.type_
|| OB_ISNULL(resource_options.children_)) {
ret = common::OB_INVALID_ARGUMENT;
LOG_WARN("invalid resource options argument", K(ret), K(set_pwd_stmt),
K(resource_options.type_), K(resource_options.children_));
} else {
for (int64_t i = 0; i < resource_options.num_child_; i++) {
ParseNode *res_option = resource_options.children_[i];
if (OB_ISNULL(res_option)) {
ret = common::OB_INVALID_ARGUMENT;
LOG_WARN("null res option", K(ret), K(i));
} else if (T_MAX_CONNECTIONS_PER_HOUR == res_option->type_) {
uint64_t max_connections_per_hour = static_cast<uint64_t>(res_option->value_);
max_connections_per_hour = max_connections_per_hour > MAX_CONNECTIONS ? MAX_CONNECTIONS
: max_connections_per_hour;
set_pwd_stmt->set_max_connections_per_hour(max_connections_per_hour);
} else if (T_MAX_USER_CONNECTIONS == res_option->type_) {
uint64_t max_user_connections = static_cast<uint64_t>(res_option->value_);
max_user_connections = max_user_connections > MAX_CONNECTIONS ? MAX_CONNECTIONS
: max_user_connections;
set_pwd_stmt->set_max_user_connections(max_user_connections);
}
}
}
if (OB_SUCC(ret)) {
set_pwd_stmt->set_modify_max_connections(true);
ObString password;
set_pwd_stmt->set_need_enc(false);
if (OB_FAIL(set_pwd_stmt->set_user_password(user_name, host_name, password))) {
LOG_WARN("Failed to set UserPasswordStmt");
} else if (OB_FAIL(set_pwd_stmt->add_ssl_info(get_ssl_type_string(ssl_type),
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_CIPHER)],
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_ISSUER)],
infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_SUBJECT)]))) {
LOG_WARN("Failed to add_ssl_info", K(ssl_type),
"CIPHER", infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_CIPHER)],
"ISSUER", infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_ISSUER)],
"SUBJECT", infos[static_cast<int32_t>(ObSSLSpecifiedType::SSL_SPEC_TYPE_SUBJECT)], K(ret));
}
}
return ret;

View File

@ -28,10 +28,14 @@ public:
static bool is_valid_mysql41_passwd(const common::ObString& str);
private:
int resolve_oracle_password_strength(
common::ObString& user_name, common::ObString& hostname, common::ObString& password);
int resolve_require_node(const ParseNode &require_info, const common::ObString &user_name,
const common::ObString &host_name, share::schema::ObSSLType &ssl_type, ObString *infos);
int resolve_resource_option_node(const ParseNode &resource_options, const common::ObString &user_name,
const common::ObString &host_name, share::schema::ObSSLType &ssl_type, ObString *infos);
int resolve_oracle_password_strength(common::ObString &user_name,
common::ObString &hostname, common::ObString &password);
private:
const static uint64_t MAX_CONNECTIONS = 4294967295;
// disallow copy
DISALLOW_COPY_AND_ASSIGN(ObSetPasswordResolver);
};

View File

@ -16,11 +16,13 @@ using namespace oceanbase::common;
using namespace oceanbase::sql;
ObSetPasswordStmt::ObSetPasswordStmt(ObIAllocator* name_pool)
: ObDDLStmt(name_pool, stmt::T_SET_PASSWORD), tenant_id_(false), need_enc_(false), for_current_user_(false)
: ObDDLStmt(name_pool, stmt::T_SET_PASSWORD), tenant_id_(false), need_enc_(false), for_current_user_(false),
modify_max_connections_(false), max_connections_per_hour_(OB_INVALID_ID), max_user_connections_(OB_INVALID_ID)
{}
ObSetPasswordStmt::ObSetPasswordStmt()
: ObDDLStmt(NULL, stmt::T_SET_PASSWORD), tenant_id_(false), need_enc_(false), for_current_user_(false)
: ObDDLStmt(NULL, stmt::T_SET_PASSWORD), tenant_id_(false), need_enc_(false), for_current_user_(false),
modify_max_connections_(false), max_connections_per_hour_(OB_INVALID_ID), max_user_connections_(OB_INVALID_ID)
{}
ObSetPasswordStmt::~ObSetPasswordStmt()

View File

@ -61,6 +61,30 @@ public:
{
return set_password_arg_;
}
void set_modify_max_connections(bool value)
{
modify_max_connections_ = value;
}
bool get_modify_max_connections()
{
return modify_max_connections_;
}
void set_max_connections_per_hour(uint64_t value)
{
max_connections_per_hour_ = value;
}
uint64_t get_max_connections_per_hour()
{
return max_connections_per_hour_;
}
void set_max_user_connections(uint64_t value)
{
max_user_connections_ = value;
}
uint64_t get_max_user_connections()
{
return max_user_connections_;
}
DECLARE_VIRTUAL_TO_STRING;
private:
@ -72,7 +96,9 @@ private:
bool need_enc_;
bool for_current_user_;
obrpc::ObSetPasswdArg set_password_arg_;
bool modify_max_connections_;
uint64_t max_connections_per_hour_;
uint64_t max_user_connections_;
private:
DISALLOW_COPY_AND_ASSIGN(ObSetPasswordStmt);
};

View File

@ -143,7 +143,8 @@ ObSQLSessionInfo::ObSQLSessionInfo()
prelock_(false),
proxy_version_(0),
min_proxy_version_ps_(0),
is_ignore_stmt_(false)
is_ignore_stmt_(false),
got_conn_res_(false)
{}
ObSQLSessionInfo::~ObSQLSessionInfo()
@ -1269,3 +1270,60 @@ int ObSQLSessionInfo::ps_use_stream_result_set(bool& use_stream)
}
return ret;
}
int ObSQLSessionInfo::on_user_connect(schema::ObSessionPrivInfo& priv_info, const ObUserInfo* user_info)
{
int ret = OB_SUCCESS;
ObConnectResourceMgr* conn_res_mgr = GCTX.conn_res_mgr_;
if (get_is_deserialized()) {
// do nothing
} else if (OB_ISNULL(conn_res_mgr) || OB_ISNULL(user_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("connect resource mgr or user info is null", K(ret), K(conn_res_mgr));
} else {
const ObPrivSet& priv = priv_info.user_priv_set_;
const ObString& user_name = priv_info.user_name_;
const uint64_t tenant_id = priv_info.tenant_id_;
const uint64_t user_id = priv_info.user_id_;
uint64_t max_connections_per_hour = user_info->get_max_connections();
uint64_t max_user_connections = user_info->get_max_user_connections();
uint64_t max_tenant_connections = 0;
if (OB_FAIL(get_sys_variable(SYS_VAR_MAX_CONNECTIONS, max_tenant_connections))) {
LOG_WARN("get system variable SYS_VAR_MAX_CONNECTIONS failed", K(ret));
} else if (0 == max_user_connections) {
if (OB_FAIL(get_sys_variable(SYS_VAR_MAX_USER_CONNECTIONS, max_user_connections))) {
LOG_WARN("get system variable SYS_VAR_MAX_USER_CONNECTIONS failed", K(ret));
}
} else {
ObObj val;
val.set_uint64(max_user_connections);
if (OB_FAIL(update_sys_variable(SYS_VAR_MAX_USER_CONNECTIONS, val))) {
LOG_WARN("set system variable SYS_VAR_MAX_USER_CONNECTIONS failed", K(ret), K(val));
}
}
if (OB_SUCC(ret) && OB_FAIL(conn_res_mgr->on_user_connect(tenant_id,
user_id,
priv,
user_name,
max_connections_per_hour,
max_user_connections,
max_tenant_connections,
*this))) {
LOG_WARN("create user connection failed", K(ret));
}
}
return ret;
}
int ObSQLSessionInfo::on_user_disconnect()
{
int ret = OB_SUCCESS;
ObConnectResourceMgr* conn_res_mgr = GCTX.conn_res_mgr_;
if (OB_ISNULL(conn_res_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("connect resource mgr is null", K(ret));
} else if (OB_FAIL(conn_res_mgr->on_user_disconnect(*this))) {
LOG_WARN("user disconnect failed", K(ret));
}
return ret;
}

View File

@ -645,6 +645,17 @@ public:
return is_ignore_stmt_;
}
void set_got_conn_res(bool v)
{
got_conn_res_ = v;
}
bool has_got_conn_res() const
{
return got_conn_res_;
}
int on_user_connect(share::schema::ObSessionPrivInfo& priv_info, const share::schema::ObUserInfo* user_info);
int on_user_disconnect();
private:
int close_all_ps_stmt();
@ -724,6 +735,11 @@ private:
// return different stmt id for same sql if proxy version is higher than min_proxy_version_ps_.
uint64_t min_proxy_version_ps_;
bool is_ignore_stmt_; // for static engine.
// Record whether this session has got connection resource, which means it increased connections count.
// It's used for on_user_disconnect.
// No matter whether apply for resource successfully, a session will call on_user_disconnect when disconnect.
// While only session got connection resource can release connection resource and decrease connections count.
bool got_conn_res_;
};
inline ObIExtraStatusCheck::Guard::Guard(ObSQLSessionInfo& session, ObIExtraStatusCheck& checker)

View File

@ -23,6 +23,7 @@
#include "observer/mysql/obsm_handler.h"
#include "observer/mysql/obsm_struct.h"
#include "observer/ob_server_struct.h"
#include "sql/session/ob_user_resource_mgr.h"
using namespace oceanbase::common;
using namespace oceanbase::sql;
@ -421,6 +422,10 @@ int ObSQLSessionMgr::free_session(const ObFreeSessionCtx& ctx)
ObSQLSessionInfo* sess_info = NULL;
sessinfo_map_.get(Key(version, sessid), sess_info);
if (NULL != sess_info) {
if (sess_info->has_got_conn_res()
&& OB_UNLIKELY(OB_SUCCESS != sess_info->on_user_disconnect())) {
LOG_ERROR("user disconnect failed", K(ret), K(sess_info->get_user_id()));
}
sessinfo_map_.revert(sess_info);
}
if (OB_FAIL(sessinfo_map_.del(Key(version, sessid)))) {

View File

@ -0,0 +1,376 @@
// Copyright 2021 Alibaba Inc. All Rights Reserved.
// Author:
// shanting <dachuan.sdc@antgroup.com>
#define USING_LOG_PREFIX SQL_SESSION
#include "sql/session/ob_user_resource_mgr.h"
#include "lib/objectpool/ob_concurrency_objpool.h"
#include "lib/oblog/ob_log.h"
#include "lib/oblog/ob_log_module.h"
#include "lib/thread/thread_mgr.h"
#include "share/ob_get_compat_mode.h"
#include "ob_sql_session_info.h"
#include "share/ob_thread_mgr.h"
using namespace oceanbase::common;
using namespace oceanbase::sql;
using namespace oceanbase::share::schema;
namespace oceanbase {
namespace sql {
ObConnectResource* ObConnectResAlloc::alloc_value()
{
return op_alloc(ObConnectResource);
}
void ObConnectResAlloc::free_value(ObConnectResource* tz_info)
{
op_free(tz_info);
tz_info = NULL;
}
ObConnectResHashNode* ObConnectResAlloc::alloc_node(ObConnectResource* value)
{
UNUSED(value);
return op_alloc(ObConnectResHashNode);
}
void ObConnectResAlloc::free_node(ObConnectResHashNode* node)
{
if (NULL != node) {
op_free(node);
node = NULL;
}
}
ObConnectResourceMgr::ObConnectResourceMgr()
: inited_(false), user_res_map_(), tenant_res_map_(), schema_service_(nullptr), cleanup_task_(*this)
{}
ObConnectResourceMgr::~ObConnectResourceMgr()
{}
int ObConnectResourceMgr::init(ObMultiVersionSchemaService& schema_service)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_FAIL(user_res_map_.init("UserResCtrl"))) {
LOG_WARN("fail to init user resource map", K(ret));
} else {
schema_service_ = &schema_service;
inited_ = true;
const int64_t delay = 3600000000;
const bool repeat = false;
if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::ServerGTimer, cleanup_task_, delay, repeat))) {
LOG_WARN("schedual connect resource mgr failed", K(ret));
}
}
return ret;
}
int ObConnectResourceMgr::apply_for_tenant_conn_resource(
const uint64_t tenant_id, const ObPrivSet& priv, const uint64_t max_tenant_connections)
{
int ret = OB_SUCCESS;
ObConnectResource* tenant_res = NULL;
ObTenantUserKey tenant_key(tenant_id);
bool has_insert = false;
if (OB_FAIL(tenant_res_map_.get(tenant_key, tenant_res))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
// not exist, alloc and insert
if (OB_ISNULL(tenant_res = op_alloc(ObConnectResource))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate tenant resource failed", K(ret));
} else {
tenant_res->cur_connections_ = 1;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tenant_res_map_.insert_and_get(tenant_key, tenant_res))) {
op_free(tenant_res);
// tenant resouce already exist because of concurrent insert, just get it.
if (OB_FAIL(tenant_res_map_.get(tenant_key, tenant_res))) {
// may happen with very very little probability: insert failed and then tenant is dropped
// and value in the map is deleted by periodly task.
LOG_WARN("tenant not exists", K(ret));
}
} else {
has_insert = true;
tenant_res_map_.revert(tenant_res);
}
} else {
LOG_WARN("get tenant resource failed", K(ret));
}
}
if (OB_SUCC(ret) && !has_insert) {
if (OB_ISNULL(tenant_res)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant resource is null", K(ret));
} else {
// check and update cur_connections.
ObLatchWGuard wr_guard(tenant_res->rwlock_, ObLatchIds::DEFAULT_MUTEX);
if (tenant_res->cur_connections_ < max_tenant_connections ||
(max_tenant_connections == tenant_res->cur_connections_ && OB_PRIV_HAS_ANY(priv, OB_PRIV_SUPER))) {
// only user with super privilege is permitted to connect when reach max tenant connections.
tenant_res->cur_connections_++;
} else {
ret = OB_ERR_CON_COUNT_ERROR;
;
LOG_WARN("too many connections", K(ret), K(tenant_res->cur_connections_), K(max_tenant_connections));
}
tenant_res_map_.revert(tenant_res);
}
}
return ret;
}
void ObConnectResourceMgr::release_tenant_conn_resource(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObTenantUserKey tenant_key(tenant_id);
ObConnectResource* tenant_res = NULL;
bool has_insert = false;
if (OB_FAIL(tenant_res_map_.get(tenant_key, tenant_res))) {
LOG_ERROR("get tenant res map failed", K(ret));
} else {
ObLatchWGuard wr_guard(tenant_res->rwlock_, ObLatchIds::DEFAULT_MUTEX);
if (OB_UNLIKELY(0 == tenant_res->cur_connections_)) {
LOG_ERROR("tenant current connections is zero when release resource", K(tenant_id));
} else {
tenant_res->cur_connections_--;
}
}
}
// get user resource from hash map, insert if not exist.
int ObConnectResourceMgr::get_or_insert_user_resource(const uint64_t user_id, const uint64_t max_user_connections,
const uint64_t max_connections_per_hour, ObConnectResource*& user_res, bool& has_insert)
{
int ret = OB_SUCCESS;
user_res = NULL;
ObTenantUserKey user_key(user_id);
has_insert = false;
if (OB_FAIL(user_res_map_.get(user_key, user_res))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
// not exist, alloc and insert
if (OB_ISNULL(user_res = op_alloc(ObConnectResource))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate user resource failed", K(ret));
} else {
user_res->cur_connections_ = 0 == max_user_connections ? 0 : 1;
user_res->history_connections_ = 0 == max_connections_per_hour ? 0 : 1;
user_res->start_time_ = 0 == max_connections_per_hour ? 0 : ObTimeUtility::current_time();
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(user_res_map_.insert_and_get(user_key, user_res))) {
// user resouce already exist because of concurrent insert, just get it.
if (OB_FAIL(user_res_map_.get(user_key, user_res))) {
// may happen with very very little probability: insert failed and then user is dropped
// and value in the map is deleted by periodly task.
LOG_WARN("user not exists", K(ret));
}
} else {
has_insert = true;
user_res_map_.revert(user_res);
}
} else {
LOG_WARN("get user resource failed", K(ret));
}
}
return ret;
}
int ObConnectResourceMgr::increase_user_connections_count(const uint64_t max_user_connections,
const uint64_t max_connections_per_hour, const ObString& user_name, ObConnectResource* user_res)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(user_res)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("user resource is null", K(ret));
} else {
const static int64_t usec_per_hour = static_cast<int64_t>(1000000) * 3600;
// check and update cur_connections and connections in one hour.
ObLatchWGuard wr_guard(user_res->rwlock_, ObLatchIds::DEFAULT_MUTEX);
if (0 != max_connections_per_hour) {
int64_t cur_time = ObTimeUtility::current_time();
if (cur_time - user_res->start_time_ > usec_per_hour) {
user_res->start_time_ = cur_time;
user_res->history_connections_ = 0;
} else if (user_res->history_connections_ >= max_connections_per_hour) {
ret = OB_ERR_USER_EXCEED_RESOURCE;
LOG_WARN("user exceed max connections per hour", K(ret), KPC(user_res));
LOG_USER_ERROR(OB_ERR_USER_EXCEED_RESOURCE,
user_name.length(),
user_name.ptr(),
"max_connections_per_hour",
user_res->history_connections_);
}
}
if (OB_SUCC(ret) && 0 != max_user_connections) {
if (user_res->cur_connections_ >= max_user_connections) {
ret = OB_ERR_USER_EXCEED_RESOURCE;
LOG_WARN("user exceed max user connections", K(ret), KPC(user_res));
LOG_USER_ERROR(OB_ERR_USER_EXCEED_RESOURCE,
user_name.length(),
user_name.ptr(),
"max_user_connections",
user_res->cur_connections_);
}
}
if (OB_SUCC(ret)) {
user_res->history_connections_ += 0 == max_connections_per_hour ? 0 : 1;
user_res->cur_connections_ += 0 == max_user_connections ? 0 : 1;
}
}
if (OB_SUCC(ret)) {
user_res_map_.revert(user_res);
}
return ret;
}
// max_connections:max connections per hour.
// max_user_connections: max concurrent connections.
// 0 means no limit.
int ObConnectResourceMgr::on_user_connect(const uint64_t tenant_id, const uint64_t user_id, const ObPrivSet& priv,
const ObString& user_name, const uint64_t max_connections_per_hour, const uint64_t max_user_connections,
const uint64_t max_tenant_connections, ObSQLSessionInfo& session)
{
int ret = OB_SUCCESS;
share::ObWorker::CompatMode compat_mode = share::ObWorker::CompatMode::ORACLE;
if (OB_UNLIKELY(session.has_got_conn_res())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session trying to connect already occupy connection resource",
K(tenant_id),
K(user_id),
K(session.get_sessid()));
} else if (OB_FAIL(share::ObCompatModeGetter::get_tenant_mode(tenant_id, compat_mode))) {
LOG_WARN("get tenant mode failed", K(ret), K(tenant_id));
} else if (compat_mode != share::ObWorker::CompatMode::MYSQL) {
} else if (OB_FAIL(apply_for_tenant_conn_resource(tenant_id, priv, max_tenant_connections))) {
LOG_WARN("reach teannt max connections", K(ret));
} else if (0 == max_connections_per_hour && 0 == max_user_connections) {
session.set_got_conn_res(true);
} else {
// only increase cur_connections_ if max_user_connections is not zero
// only record history_connections_ if max_connections_per_hour is not zero.
ObConnectResource* user_res = NULL;
bool has_insert = false;
if (OB_FAIL(get_or_insert_user_resource(
user_id, max_user_connections, max_connections_per_hour, user_res, has_insert))) {
LOG_WARN("get or insert user resource failed", K(ret));
} else if (!has_insert) {
// if user resource already exists in the hash map, increase its connections count.
if (OB_FAIL(
increase_user_connections_count(max_user_connections, max_connections_per_hour, user_name, user_res))) {
LOG_WARN("increase user connection count failed", K(ret));
}
}
if (OB_FAIL(ret)) {
release_tenant_conn_resource(tenant_id);
} else {
session.set_got_conn_res(true);
}
}
return ret;
}
// Whether need decrease cur_connections_, that's a question.
// It depends on whether cur_connections_ is increased when create the connection.
// Since max_user_connections is only allowed to be modified globally, which means it remains
// unchanged from connection to disconnection, we can use it decide whether decrease cur_connections_.
int ObConnectResourceMgr::on_user_disconnect(ObSQLSessionInfo& session)
{
int ret = OB_SUCCESS;
share::ObWorker::CompatMode compat_mode;
uint64_t max_user_connections = 0;
uint64_t user_id = session.get_user_id();
uint64_t tenant_id = session.get_login_tenant_id();
if (OB_UNLIKELY(!session.has_got_conn_res())) {
// do nothing
} else if (OB_FAIL(share::ObCompatModeGetter::get_tenant_mode(tenant_id, compat_mode))) {
LOG_ERROR("get tenant mode failed", K(ret), K(tenant_id));
} else if (compat_mode != share::ObWorker::CompatMode::MYSQL) {
} else if (OB_FAIL(session.get_sys_variable(share::SYS_VAR_MAX_USER_CONNECTIONS, max_user_connections))) {
LOG_ERROR("get system variable SYS_VAR_MAX_USER_CONNECTIONS failed", K(ret));
} else {
release_tenant_conn_resource(tenant_id);
if (0 != max_user_connections) {
ObConnectResource* user_res = NULL;
ObTenantUserKey user_key(user_id);
if (OB_FAIL(user_res_map_.get(user_key, user_res))) {
// maybe already dropped.
ret = OB_SUCCESS;
} else if (OB_ISNULL(user_res)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("user resource is null", K(ret));
} else {
ObLatchWGuard wr_guard(user_res->rwlock_, ObLatchIds::DEFAULT_MUTEX);
if (OB_UNLIKELY(0 == user_res->cur_connections_)) {
LOG_ERROR("current connections is zero when disconnect", K(user_id));
} else {
user_res->cur_connections_--;
}
user_res_map_.revert(user_res);
}
}
session.set_got_conn_res(false);
}
return ret;
}
bool ObConnectResourceMgr::CleanUpConnResourceFunc::operator()(ObTenantUserKey key, ObConnectResource* conn_res)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(conn_res)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("user res is NULL", K(ret), K(conn_res));
} else if (is_user_) {
const ObUserInfo* user_info = NULL;
if (OB_FAIL(schema_guard_.get_user_info(key.id_, user_info))) {
LOG_ERROR("get user info failed", K(ret), K(key.id_));
} else if (OB_ISNULL(user_info)) {
conn_res_map_.del(key);
}
} else {
const ObTenantSchema* tenant_schema = NULL;
if (OB_FAIL(schema_guard_.get_tenant_info(key.id_, tenant_schema))) {
LOG_ERROR("get tenant info failed", K(ret), K(key.id_));
} else if (OB_ISNULL(tenant_schema)) {
conn_res_map_.del(key);
}
}
return OB_SUCCESS == ret;
}
// task for cleanup periodly. Remove dropped tenant and user from tenant_res_map_ and user_res_map_.
void ObConnectResourceMgr::ConnResourceCleanUpTask::runTimerTask()
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard sys_schema_guard;
if (OB_ISNULL(conn_res_mgr_.schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("schema service is null", K(ret));
} else if (OB_FAIL(conn_res_mgr_.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, sys_schema_guard))) {
LOG_WARN("get sys tenant schema guard failed", K(ret));
} else {
CleanUpConnResourceFunc user_func(sys_schema_guard, conn_res_mgr_.user_res_map_, true);
CleanUpConnResourceFunc tenant_func(sys_schema_guard, conn_res_mgr_.user_res_map_, false);
if (OB_FAIL(conn_res_mgr_.user_res_map_.for_each(user_func))) {
LOG_WARN("cleanup dropped user failed", K(ret));
} else if (OB_FAIL(conn_res_mgr_.tenant_res_map_.for_each(tenant_func))) {
LOG_WARN("cleanup dropped tenant failed", K(ret));
}
}
const int64_t delay = SLEEP_USECONDS;
const bool repeat = false;
if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::ServerGTimer, *this, delay, repeat))) {
LOG_ERROR("schedule connect resource cleanup task failed", K(ret));
}
}
} // namespace sql
} // namespace oceanbase

View File

@ -0,0 +1,145 @@
// Copyright 2021 Alibaba Inc. All Rights Reserved.
// Author:
// shanting <dachuan.sdc@antgroup.com>
#ifndef OCEANBASE_SQL_USER_RESOURCE_MGR_H_
#define OCEANBASE_SQL_USER_RESOURCE_MGR_H_
#include "lib/hash/ob_link_hashmap.h"
#include "share/schema/ob_schema_struct.h"
#include "lib/task/ob_timer.h"
#include "share/schema/ob_schema_getter_guard.h"
#include "share/schema/ob_multi_version_schema_service.h"
namespace oceanbase {
namespace sql {
class ObSQLSessionInfo;
class ObTenantUserKey {
public:
ObTenantUserKey() : id_(0)
{}
ObTenantUserKey(const int64_t id_) : id_(id_)
{}
uint64_t hash() const
{
return common::murmurhash(&id_, sizeof(id_), 0);
};
int compare(const ObTenantUserKey& r)
{
int cmp = 0;
if (id_ < r.id_) {
cmp = -1;
} else if (id_ == r.id_) {
cmp = 0;
} else {
cmp = 1;
}
return cmp;
}
TO_STRING_KV(K_(id));
public:
uint64_t id_;
};
typedef common::LinkHashNode<ObTenantUserKey> ObConnectResHashNode;
typedef common::LinkHashValue<ObTenantUserKey> ObConnectResHashValue;
class ObConnectResource : public ObConnectResHashValue {
public:
ObConnectResource() : rwlock_(), cur_connections_(0), history_connections_(0), start_time_(0)
{}
ObConnectResource(uint64_t cur_connections, uint64_t history_connections, int64_t cur_time)
: rwlock_(), cur_connections_(cur_connections), history_connections_(history_connections), start_time_(cur_time)
{}
virtual ~ObConnectResource()
{}
TO_STRING_KV(K_(cur_connections), K_(history_connections), K_(start_time));
common::ObLatch rwlock_;
uint64_t cur_connections_;
uint64_t history_connections_;
// From start_time_ to now, count of connections is history_connections_.
// According to MySQL, we don't have to record time of all connections in the previous hour.
// At start_time_ + 1 hour, history_connections_ is reset to zero.
// For example, max_connections_per_hour is 3,
// we create one connection at 1:00, 1:10 and 1:20, then we can't create a connection until 2:00.
// At 2:00, we can create 3 connections, so we only have to record start_time_ = 1:00 and
// number of connections from this time, and don't have to record 1:10 or 1:20.
int64_t start_time_;
// TODO: count of update and query in one hour.
};
class ObConnectResAlloc {
public:
ObConnectResAlloc()
{}
~ObConnectResAlloc()
{}
ObConnectResource* alloc_value();
void free_value(ObConnectResource* conn_res);
ObConnectResHashNode* alloc_node(ObConnectResource* value);
void free_node(ObConnectResHashNode* node);
};
typedef common::ObLinkHashMap<ObTenantUserKey, ObConnectResource, ObConnectResAlloc> ObConnResMap;
class ObConnectResourceMgr {
public:
ObConnectResourceMgr();
virtual ~ObConnectResourceMgr();
int init(share::schema::ObMultiVersionSchemaService& schema_service);
// ask for tenant connection resource.
int apply_for_tenant_conn_resource(
const uint64_t tenant_id, const ObPrivSet& priv, const uint64_t max_tenant_connections);
void release_tenant_conn_resource(const uint64_t tenant_id);
int get_or_insert_user_resource(const uint64_t user_id, const uint64_t max_user_connections,
const uint64_t max_connections_per_hour, ObConnectResource*& user_res, bool& has_insert);
int increase_user_connections_count(const uint64_t max_user_connections, const uint64_t max_connections_per_hour,
const ObString& user_name, ObConnectResource* user_res);
int on_user_connect(const uint64_t tenant_id, const uint64_t user_id, const ObPrivSet& priv,
const ObString& user_name, const uint64_t max_connections_per_hour, const uint64_t max_user_connections,
const uint64_t max_global_connections, ObSQLSessionInfo& session);
int on_user_disconnect(ObSQLSessionInfo& session);
private:
class CleanUpConnResourceFunc {
public:
CleanUpConnResourceFunc(
share::schema::ObSchemaGetterGuard& schema_guard, ObConnResMap& conn_res_map, const bool is_user)
: schema_guard_(schema_guard), conn_res_map_(conn_res_map), is_user_(is_user)
{}
bool operator()(ObTenantUserKey key, ObConnectResource* user_res);
private:
share::schema::ObSchemaGetterGuard& schema_guard_;
ObConnResMap& conn_res_map_;
const bool is_user_;
};
class ConnResourceCleanUpTask : public common::ObTimerTask {
public:
ConnResourceCleanUpTask(ObConnectResourceMgr& conn_res_mgr) : conn_res_mgr_(conn_res_mgr)
{}
int init(ObConnectResourceMgr* tz_mgr);
virtual ~ConnResourceCleanUpTask()
{}
void runTimerTask(void) override;
ObConnectResourceMgr& conn_res_mgr_;
const uint64_t SLEEP_USECONDS = 3600000000; // one hour
};
friend class ConnResourceCleanUpTask;
friend class CleanUpUserResourceFunc;
private:
bool inited_;
ObConnResMap user_res_map_;
ObConnResMap tenant_res_map_;
share::schema::ObMultiVersionSchemaService* schema_service_;
ConnResourceCleanUpTask cleanup_task_;
DISALLOW_COPY_AND_ASSIGN(ObConnectResourceMgr);
};
} // end namespace sql
} // end namespace oceanbase
#endif /* OCEANBASE_SQL_USER_RESOURCE_MGR_H_ */