// Copyright 2015-2016 Alibaba Inc. All Rights Reserved. // Author: // LuoFan // Normalizer: // LuoFan #define USING_LOG_PREFIX SQL_QRR #include "share/inner_table/ob_inner_table_schema_constants.h" #include "share/schema/ob_schema_utils.h" #include "lib/oblog/ob_log.h" #include "lib/oblog/ob_log_module.h" #include "lib/string/ob_sql_string.h" #include "lib/mysqlclient/ob_mysql_proxy.h" #include "lib/mysqlclient/ob_mysql_connection.h" #include "lib/mysqlclient/ob_mysql_statement.h" #include "lib/mysqlclient/ob_mysql_connection_pool.h" #include "lib/utility/ob_print_utils.h" #include "lib/mysqlclient/ob_mysql_transaction.h" #include "share/ob_dml_sql_splicer.h" #include "share/ob_max_id_fetcher.h" #include "observer/ob_sql_client_decorator.h" #include "sql/resolver/expr/ob_raw_expr_printer.h" #include "sql/udr/ob_udr_sql_service.h" namespace oceanbase { using namespace share; using namespace share::schema; using namespace common::sqlclient; namespace sql { #define UDR_START_TRANS(tenant_id) \ ObMySQLTransaction trans; \ if (OB_FAIL(ret)) { \ } else if (OB_ISNULL(sql_proxy_)) { \ ret = OB_ERR_UNEXPECTED; \ LOG_WARN("get unexpected null", K(ret), K(sql_proxy_)); \ } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id))) { \ LOG_WARN("fail to start transaction", K(ret)); \ } \ #define UDR_END_TRANS \ if (trans.is_started()) { \ const bool is_commit = (OB_SUCCESS == ret); \ int tmp_ret = trans.end(is_commit); \ if (OB_SUCCESS != tmp_ret) { \ LOG_WARN("end trans failed", K(tmp_ret), K(is_commit)); \ ret = (OB_SUCCESS == ret) ? tmp_ret : ret; \ } \ } \ int ObUDRSqlService::init(ObMySQLProxy *proxy) { int ret = OB_SUCCESS; if (inited_) { ret = OB_INIT_TWICE; LOG_WARN("rewrite rule sql service init twice", K(ret)); } else if (OB_ISNULL(proxy)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get null mysql proxy", K(ret)); } else { sql_proxy_ = proxy; inited_ = true; } return ret; } int ObUDRSqlService::gen_insert_rule_dml(const ObUDRInfo &arg, const uint64_t tenant_id, oceanbase::share::ObDMLSqlSplicer &dml) { int ret = OB_SUCCESS; if (OB_FAIL(dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, arg.tenant_id_))) || OB_FAIL(dml.add_pk_column("rule_name", ObHexEscapeSqlStr(arg.rule_name_))) || OB_FAIL(dml.add_column("rule_id", arg.rule_id_)) || OB_FAIL(dml.add_column("pattern", ObHexEscapeSqlStr(arg.pattern_))) || OB_FAIL(dml.add_column("db_name", ObHexEscapeSqlStr(arg.db_name_))) || OB_FAIL(dml.add_column("replacement", ObHexEscapeSqlStr(arg.replacement_))) || OB_FAIL(dml.add_column("normalized_pattern", ObHexEscapeSqlStr(arg.normalized_pattern_))) || OB_FAIL(dml.add_column("status", static_cast(arg.rule_status_))) || OB_FAIL(dml.add_column("version", arg.rule_version_)) || OB_FAIL(dml.add_uint64_column("pattern_digest", arg.pattern_digest_)) || OB_FAIL(dml.add_column("fixed_param_infos", ObHexEscapeSqlStr(arg.fixed_param_infos_str_.empty() ? ObString("") : arg.fixed_param_infos_str_))) || OB_FAIL(dml.add_column("dynamic_param_infos", ObHexEscapeSqlStr(arg.dynamic_param_infos_str_.empty() ? ObString("") : arg.dynamic_param_infos_str_))) || OB_FAIL(dml.add_column("def_name_ctx_str", ObHexEscapeSqlStr(arg.question_mark_ctx_str_.empty() ? ObString("") : arg.question_mark_ctx_str_)))) { LOG_WARN("add column failed", K(ret), K(arg)); } return ret; } int ObUDRSqlService::gen_modify_rule_status_dml(const ObUDRInfo &arg, const uint64_t tenant_id, oceanbase::share::ObDMLSqlSplicer &dml) { int ret = OB_SUCCESS; if (OB_FAIL(dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, arg.tenant_id_))) || OB_FAIL(dml.add_pk_column("rule_name", ObHexEscapeSqlStr(arg.rule_name_))) || OB_FAIL(dml.add_column("version", arg.rule_version_)) || OB_FAIL(dml.add_column("status", static_cast(arg.rule_status_)))) { LOG_WARN("modify column failed", K(ret), K(arg)); } return ret; } int ObUDRSqlService::fetch_new_rule_version(const uint64_t tenant_id, int64_t &new_rule_version) { int ret = OB_SUCCESS; uint64_t rule_version = OB_INVALID_VERSION; ObMaxIdFetcher id_fetcher(*sql_proxy_); if (OB_FAIL(id_fetcher.fetch_new_max_id(tenant_id, OB_MAX_USED_REWRITE_RULE_VERSION_TYPE, rule_version, OB_INIT_REWRITE_RULE_VERSION))) { LOG_WARN("fetch_new_max_id failed", K(ret), "id_type", OB_MAX_USED_REWRITE_RULE_VERSION_TYPE); } else { new_rule_version = rule_version; } return ret; } int ObUDRSqlService::fetch_max_rule_version(const uint64_t tenant_id, int64_t &max_rule_version) { int ret = OB_SUCCESS; uint64_t rule_version = OB_INVALID_VERSION; ObMaxIdFetcher id_fetcher(*sql_proxy_); if (OB_FAIL(id_fetcher.fetch_max_id(*sql_proxy_, tenant_id, OB_MAX_USED_REWRITE_RULE_VERSION_TYPE, rule_version))) { LOG_WARN("failed to fetch max rule version", K(ret), K(tenant_id)); } else { max_rule_version = rule_version; } return ret; } int ObUDRSqlService::fetch_new_rule_id(const uint64_t tenant_id, int64_t &new_rule_id) { int ret = OB_SUCCESS; uint64_t rule_id = OB_INVALID_ID; ObMaxIdFetcher id_fetcher(*sql_proxy_); if (OB_FAIL(id_fetcher.fetch_new_max_id(tenant_id, OB_MAX_USED_OBJECT_ID_TYPE, rule_id))) { LOG_WARN("fetch_new_max_id failed", K(ret), "id_type", OB_MAX_USED_OBJECT_ID_TYPE); } else { new_rule_id = rule_id; } return ret; } int ObUDRSqlService::insert_rule(ObUDRInfo &arg) { int ret = OB_SUCCESS; ObDMLSqlSplicer dml; int64_t affected_rows = 0; int64_t rule_id = OB_INVALID_ID; int64_t rule_version = OB_INVALID_VERSION; UDR_START_TRANS(arg.tenant_id_); LOG_INFO("insert rule", K(arg)); ObDMLExecHelper exec(trans, arg.tenant_id_); if (OB_FAIL(ret)) { } else if (OB_FAIL(fetch_new_rule_version(arg.tenant_id_, rule_version))) { LOG_WARN("failed to fetch new rule version", K(ret)); } else if (FALSE_IT(arg.rule_version_ = rule_version)) { } else if (OB_FAIL(fetch_new_rule_id(arg.tenant_id_, rule_id))) { LOG_WARN("failed to fetch new rule id", K(ret)); } else if (FALSE_IT(arg.rule_id_ = rule_id)) { } else if (OB_FAIL(gen_insert_rule_dml(arg, arg.tenant_id_, dml))) { LOG_WARN("failed to gen rewrite rule dml", K(ret)); } else if (OB_FAIL(exec.exec_insert(OB_ALL_TENANT_REWRITE_RULES_TNAME, dml, affected_rows))) { LOG_WARN("execute insert failed", K(ret)); } else if (!is_single_row(affected_rows)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("affected_rows unexpected to be one", K(affected_rows), K(ret)); } UDR_END_TRANS; return ret; } int ObUDRSqlService::alter_rule_status(ObUDRInfo &arg) { int ret = OB_SUCCESS; UDR_START_TRANS(arg.tenant_id_); ObDMLSqlSplicer dml; int64_t affected_rows = 0; ObDMLExecHelper exec(trans, arg.tenant_id_); int64_t rule_version = OB_INVALID_VERSION; LOG_INFO("alter rule status", K(arg)); if (OB_FAIL(ret)) { } else if (OB_FAIL(fetch_new_rule_version(arg.tenant_id_, rule_version))) { LOG_WARN("failed to fetch new rule version", K(ret)); } else if (FALSE_IT(arg.rule_version_ = rule_version)) { } else if (OB_FAIL(gen_modify_rule_status_dml(arg, arg.tenant_id_, dml))) { LOG_WARN("failed to gen rewrite rule dml", K(ret)); } else if (OB_FAIL(exec.exec_update(OB_ALL_TENANT_REWRITE_RULES_TNAME, dml, affected_rows))) { LOG_WARN("execute insert failed", K(ret)); } else if (!is_single_row(affected_rows) && !is_zero_row(affected_rows)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("affected_rows unexpected", K(affected_rows), K(ret)); } UDR_END_TRANS; return ret; } int ObUDRSqlService::gen_recyclebin_rule_name(const int64_t rule_version, const int64_t buf_len, char *buf, ObString &recyclebin_rule_name) { int ret = OB_SUCCESS; int64_t pos = 0; if (OB_FAIL(databuff_printf(buf, buf_len, pos, "%s", "__recyclebin_"))) { LOG_WARN("append name to buf error", K(ret)); } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "%ld", rule_version))) { LOG_WARN("append name to buf error", K(ret)); } else { recyclebin_rule_name.assign_ptr(buf, pos); } return ret; } int ObUDRSqlService::remove_rule(ObUDRInfo &arg) { int ret = OB_SUCCESS; UDR_START_TRANS(arg.tenant_id_); ObSqlString sql; ObSqlString rule_name_sql_str; int64_t affected_rows = 0; int64_t rule_version = OB_INVALID_VERSION; ObString recyclebin_rule_name; char buf[OB_MAX_ORIGINAL_NANE_LENGTH] = {0}; const int64_t buf_len = OB_MAX_ORIGINAL_NANE_LENGTH; if (OB_FAIL(ret)) { } else if (OB_FAIL(fetch_new_rule_version(arg.tenant_id_, rule_version))) { LOG_WARN("failed to fetch new rule version", K(ret)); } else if (OB_FAIL(gen_recyclebin_rule_name(rule_version, buf_len, buf, recyclebin_rule_name))) { LOG_WARN("failed to gen recyclebin rule name", K(ret)); } else if (OB_FAIL(sql_append_hex_escape_str(arg.rule_name_, rule_name_sql_str))) { LOG_WARN("failed to process rule name", K(ret)); } else if (sql.assign_fmt("update %s set gmt_modified = now(), rule_name = '%.*s' ,\ version = %ld, status = %ld \ where tenant_id = %lu and rule_name = %.*s", OB_ALL_TENANT_REWRITE_RULES_TNAME, LEN_AND_PTR(recyclebin_rule_name), rule_version, static_cast(ObUDRInfo::DELETE_STATUS), ObSchemaUtils::get_extract_tenant_id(arg.tenant_id_, arg.tenant_id_), LEN_AND_PTR(rule_name_sql_str.string()))) { LOG_WARN("update from __all_tenant_rewrite_rules table failed.", K(ret)); } else if (OB_FAIL(trans.write(arg.tenant_id_, sql.ptr(), affected_rows))) { LOG_WARN("fail to exec sql", K(ret), K(sql)); } else if (!is_single_row(affected_rows) && !is_zero_row(affected_rows)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("affected_rows unexpected", K(affected_rows), K(ret), K(sql)); } UDR_END_TRANS; return ret; } int ObUDRSqlService::clean_up_items_marked_for_deletion(const uint64_t tenant_id) { int ret = OB_SUCCESS; UDR_START_TRANS(tenant_id); ObSqlString sql; int64_t affected_rows = 0; if (OB_FAIL(ret)) { } else if (sql.assign_fmt("delete FROM %s WHERE status = %ld \ AND DATEDIFF(now(), gmt_modified) >= %ld", OB_ALL_TENANT_REWRITE_RULES_TNAME, static_cast(ObUDRInfo::DELETE_STATUS), DELETE_DATE_INTERVAL_THRESHOLD)) { LOG_WARN("delete from __all_tenant_rewrite_rules table failed.", K(ret)); } else if (OB_FAIL(trans.write(tenant_id, sql.ptr(), affected_rows))) { LOG_WARN("fail to exec sql", K(ret)); } else { LOG_INFO("affected rows", K(affected_rows)); } UDR_END_TRANS; return ret; } int ObUDRSqlService::get_need_sync_rule_infos(ObIAllocator& allocator, const uint64_t tenant_id, const int64_t local_rule_version, ObIArray& rule_infos) { int ret = OB_SUCCESS; ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_); SMART_VAR(ObMySQLProxy::MySQLResult, res) { sqlclient::ObMySQLResult *result = NULL; ObSqlString sql; const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id); int64_t affected_rows = 0; const char* sql_fmt = "select rule_id, rule_name, pattern, db_name, replacement, normalized_pattern, " "status, version, pattern_digest, fixed_param_infos, " "dynamic_param_infos, def_name_ctx_str " "from %s where tenant_id = %lu and version > %ld;"; if (OB_ISNULL(sql_proxy_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null", K(ret)); } else if (OB_FAIL(sql.assign_fmt(sql_fmt, OB_ALL_TENANT_REWRITE_RULES_TNAME, ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id), local_rule_version))) { LOG_WARN("failed to assign format sql", K(ret)); } else if (OB_FAIL(sql_client_retry_weak.read(res, exec_tenant_id, sql.ptr()))) { LOG_WARN("execute sql failed", "sql", sql.ptr(), K(ret)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("fail to execute ", "sql", sql.ptr(), K(ret)); } else { while (OB_SUCC(result->next())) { ObUDRInfo rule_info; rule_info.tenant_id_ = tenant_id; EXTRACT_INT_FIELD_MYSQL(*result, "status", rule_info.rule_status_, ObUDRInfo::RuleStatus); EXTRACT_INT_FIELD_MYSQL(*result, "rule_id", rule_info.rule_id_, int64_t); EXTRACT_INT_FIELD_MYSQL(*result, "version", rule_info.rule_version_, int64_t); EXTRACT_UINT_FIELD_MYSQL(*result, "pattern_digest", rule_info.pattern_digest_, uint64_t); EXTRACT_VARCHAR_FIELD_MYSQL(*result, "rule_name", rule_info.rule_name_); EXTRACT_VARCHAR_FIELD_MYSQL(*result, "pattern", rule_info.pattern_); EXTRACT_VARCHAR_FIELD_MYSQL(*result, "db_name", rule_info.db_name_); EXTRACT_VARCHAR_FIELD_MYSQL(*result, "replacement", rule_info.replacement_); EXTRACT_VARCHAR_FIELD_MYSQL(*result, "normalized_pattern", rule_info.normalized_pattern_); EXTRACT_VARCHAR_FIELD_MYSQL(*result, "fixed_param_infos", rule_info.fixed_param_infos_str_); EXTRACT_VARCHAR_FIELD_MYSQL(*result, "dynamic_param_infos", rule_info.dynamic_param_infos_str_); EXTRACT_VARCHAR_FIELD_MYSQL(*result, "def_name_ctx_str", rule_info.question_mark_ctx_str_); if (OB_SUCC(ret)) { if (OB_FAIL(ob_write_string(allocator, rule_info.rule_name_, rule_info.rule_name_))) { LOG_WARN("failed to write string"); } else if (OB_FAIL(ob_write_string(allocator, rule_info.pattern_, rule_info.pattern_))) { LOG_WARN("failed to write string"); } else if (OB_FAIL(ob_write_string(allocator, rule_info.db_name_, rule_info.db_name_))) { LOG_WARN("failed to write string"); } else if (OB_FAIL(ob_write_string(allocator, rule_info.replacement_, rule_info.replacement_))) { LOG_WARN("failed to write string"); } else if (OB_FAIL(ob_write_string(allocator, rule_info.normalized_pattern_, rule_info.normalized_pattern_))) { LOG_WARN("failed to write string"); } else if (OB_FAIL(ob_write_string(allocator, rule_info.fixed_param_infos_str_, rule_info.fixed_param_infos_str_))) { LOG_WARN("failed to write string"); } else if (OB_FAIL(ob_write_string(allocator, rule_info.dynamic_param_infos_str_, rule_info.dynamic_param_infos_str_))) { LOG_WARN("failed to write string"); } else if (OB_FAIL(ob_write_string(allocator, rule_info.question_mark_ctx_str_, rule_info.question_mark_ctx_str_))) { LOG_WARN("failed to write string"); } else if (OB_FAIL(rule_infos.push_back(rule_info))) { LOG_WARN("failed to push back rule info", K(ret)); } } } } if (OB_ITER_END != ret) { LOG_WARN("get next row failed", K(ret)); } else { ret = OB_SUCCESS; } } return ret; } #undef UDR_START_TRANS #undef UDR_END_TRANS } // namespace oceanbase end } // namespace sql end