[FEAT MERGE] implement SQL level resource management based on pattern match

This commit is contained in:
obdev
2023-01-04 12:39:02 +00:00
committed by ob-robot
parent cca9f7c2d2
commit 7c991b5da5
58 changed files with 2613 additions and 168 deletions

View File

@ -208,7 +208,13 @@ int ObVariableSetExecutor::execute(ObExecContext &ctx, ObVariableSetStmt &stmt)
value_obj))) {
LOG_WARN("fail to process auto increment hook", K(ret));
} else {}
}
} else if (ObSetVar::SET_SCOPE_GLOBAL == node.set_scope_
&& node.variable_name_ == OB_SV_RESOURCE_MANAGER_PLAN) {
if (OB_FAIL(update_resource_mapping_rule_version(*sql_proxy,
session->get_effective_tenant_id()))) {
LOG_WARN("fail to update resource mapping rule version", K(ret));
}
} else {}
if (OB_FAIL(ret)) {
} else if (ObSetVar::SET_SCOPE_SESSION == node.set_scope_) {
@ -1006,6 +1012,48 @@ int ObVariableSetExecutor::process_last_insert_id_hook(ObPhysicalPlanCtx *plan_c
return ret;
}
int ObVariableSetExecutor::update_resource_mapping_rule_version(ObMySQLProxy &sql_proxy,
uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObSqlString sql;
const char *tname = OB_ALL_SYS_STAT_TNAME;
if (OB_FAIL(sql.assign_fmt("REPLACE INTO %s (", tname))) {
STORAGE_LOG(WARN, "append table name failed, ", K(ret));
} else {
ObSqlString values;
SQL_COL_APPEND_VALUE(sql, values, ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id), "tenant_id", "%lu");
SQL_COL_APPEND_CSTR_VALUE(sql, values, "", "zone");
SQL_COL_APPEND_CSTR_VALUE(sql, values, "ob_current_resource_mapping_version", "name");
SQL_COL_APPEND_VALUE(sql, values, 5, "data_type", "%d");
// need use microsecond in case insert multiple rules in one second concurrently.
// It means mapping rule is updated but version keeps the same.
SQL_COL_APPEND_VALUE(sql, values, "cast(unix_timestamp(now(6)) * 1000000 as signed)", "value", "%s");
SQL_COL_APPEND_CSTR_VALUE(sql, values, "version of resource mapping rule", "info");
if (OB_SUCC(ret)) {
int64_t affected_rows = 0;
if (OB_FAIL(sql.append_fmt(") VALUES (%.*s)",
static_cast<int32_t>(values.length()),
values.ptr()))) {
LOG_WARN("append sql failed, ", K(ret));
} else if (OB_FAIL(sql_proxy.write(tenant_id,
sql.ptr(),
affected_rows))) {
LOG_WARN("fail to execute sql", K(sql), K(ret));
} else {
if (is_single_row(affected_rows) || is_double_row(affected_rows)) {
// insert or replace
LOG_TRACE("update resource mapping version successfully", K(sql.string()));
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected value. expect 1 or 2 row affected", K(affected_rows), K(sql), K(ret));
}
}
}
}
return ret;
}
int ObVariableSetExecutor::switch_to_session_variable(const ObExprCtx &expr_ctx,
const ObObj &value,
ObSessionVariable &sess_var)