patch [FEAT MERGE] support olap async job

Co-authored-by: lmjhh <576788582@qq.com>
This commit is contained in:
obdev 2024-08-27 11:43:42 +00:00 committed by ob-robot
parent b81b5f9d1f
commit ad224ebdb6
35 changed files with 1333 additions and 73 deletions

View File

@ -24,6 +24,7 @@
#include "observer/ob_inner_sql_connection_pool.h"
#include "sql/session/ob_sql_session_info.h"
#include "sql/ob_sql.h"
#include "sql/executor/ob_executor_rpc_processor.h"
namespace oceanbase
{
@ -65,6 +66,7 @@ int ObDBMSSchedJobExecutor::init_session(
ObDBMSSchedJobInfo &job_info)
{
int ret = OB_SUCCESS;
ObPrivSet db_priv_set = OB_PRIV_SET_EMPTY;
ObArenaAllocator *allocator = NULL;
const bool print_info_log = true;
const bool is_sys_tenant = true;
@ -97,19 +99,34 @@ int ObDBMSSchedJobExecutor::init_session(
OX (session.set_database_id(database_id));
OZ (session.set_user(
user_info->get_user_name(), user_info->get_host_name_str(), user_info->get_user_id()));
OX (session.set_priv_user_id(user_info->get_user_id()));
OX (session.set_user_priv_set(user_info->get_priv_set()));
OZ (schema_guard.get_db_priv_set(tenant_id, user_info->get_user_id(), database_name, db_priv_set));
OX (session.set_db_priv_set(db_priv_set));
OX (session.set_shadow(true));
if (OB_SUCC(ret) && job_info.is_date_expression_job_class()) {
// set larger timeout for mview scheduler jobs
const int64_t QUERY_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
const int64_t TRX_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
ObObj query_timeout_obj;
ObObj trx_timeout_obj;
query_timeout_obj.set_int(QUERY_TIMEOUT_US);
trx_timeout_obj.set_int(TRX_TIMEOUT_US);
OZ (session.update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, query_timeout_obj));
OZ (session.update_sys_variable(SYS_VAR_OB_TRX_TIMEOUT, trx_timeout_obj));
if (OB_SUCC(ret)) {
if (job_info.is_date_expression_job_class()) {
// set larger timeout for mview scheduler jobs
const int64_t QUERY_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
const int64_t TRX_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
ObObj query_timeout_obj;
ObObj trx_timeout_obj;
query_timeout_obj.set_int(QUERY_TIMEOUT_US);
trx_timeout_obj.set_int(TRX_TIMEOUT_US);
OZ (session.update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, query_timeout_obj));
OZ (session.update_sys_variable(SYS_VAR_OB_TRX_TIMEOUT, trx_timeout_obj));
} else if (job_info.is_olap_async_job_class()) {
const int64_t QUERY_TIMEOUT_US = ((job_info.get_max_run_duration() - OLAP_ASYNC_JOB_DEVIATION_SECOND) * 1000000L);
const int64_t TRX_TIMEOUT_US = ((job_info.get_max_run_duration() - OLAP_ASYNC_JOB_DEVIATION_SECOND) * 1000000L);
ObObj query_timeout_obj;
ObObj trx_timeout_obj;
query_timeout_obj.set_int(QUERY_TIMEOUT_US);
trx_timeout_obj.set_int(TRX_TIMEOUT_US);
OZ (session.update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, query_timeout_obj));
OZ (session.update_sys_variable(SYS_VAR_OB_TRX_TIMEOUT, trx_timeout_obj));
}
}
return ret;
}
@ -136,6 +153,9 @@ int ObDBMSSchedJobExecutor::init_env(ObDBMSSchedJobInfo &job_info, ObSQLSessionI
job_info.get_tenant_id(), job_info.get_powner(), user_infos));
OV (1 == user_infos.count(), OB_ERR_UNEXPECTED, K(job_info), K(user_infos));
CK (OB_NOT_NULL(user_info = user_infos.at(0)));
} else if (job_info.get_user_id() != OB_INVALID_ID) {
OZ (schema_guard.get_user_info(
job_info.get_tenant_id(), job_info.get_user_id(), user_info));
} else {
ObString user = job_info.get_powner();
if (OB_SUCC(ret)) {
@ -247,6 +267,9 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job(
if (job_info.is_oracle_tenant_) {
OZ (what.append_fmt("BEGIN %.*s; END;",
job_info.get_what().length(), job_info.get_what().ptr()));
} else if (job_info.is_olap_async_job_class()){
OZ (what.append_fmt("%.*s",
job_info.get_what().length(), job_info.get_what().ptr()));
} else {
//mysql mode not support anonymous block
OZ (what.append_fmt("CALL %.*s;",
@ -346,6 +369,7 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job(
}
}
if (OB_SUCC(ret)) {
ObWorkerSessionGuard worker_session_guard(session_info);
OZ (ObDBMSSchedJobExecutor::init_env(job_info, *session_info));
CK (OB_NOT_NULL(pool = static_cast<ObInnerSQLConnectionPool *>(sql_proxy_->get_pool())));
OX (session_info->set_job_info(&job_info));
@ -383,19 +407,37 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job(uint64_t tenant_id, bool is_oracl
OZ (table_operator_.get_dbms_sched_job_info(tenant_id, is_oracle_tenant, job_id, job_name, allocator, job_info));
if (OB_SUCC(ret)) {
OZ (run_dbms_sched_job(tenant_id, job_info));
int tmp_ret = OB_SUCCESS;
ObString errmsg = common::ob_get_tsi_err_msg(ret);
if (errmsg.empty() && ret != OB_SUCCESS) {
errmsg = ObString(strlen(ob_errpkt_strerror(ret, lib::is_oracle_mode())),
ob_errpkt_strerror(ret, lib::is_oracle_mode()));
if (job_info.is_killed()) { //Intercept user cancellation requests before the actual execution of the job
OZ(table_operator_.update_for_kill(job_info));
} else {
OZ (run_dbms_sched_job(tenant_id, job_info));
bool job_is_user_stop = false;
if (OB_ERR_SESSION_INTERRUPTED == ret) { //It may have been the user interrupted, need to check.
int tmp_user_stop_ret = OB_SUCCESS;
bool job_is_killed = false;
if ((tmp_user_stop_ret = table_operator_.get_dbms_sched_job_is_killed(job_info, job_is_killed)) != OB_SUCCESS) {
LOG_WARN("double check get dbms sched job failed", K(tmp_user_stop_ret), K(ret));
} else if (job_is_killed) {
job_is_user_stop = true;
}
}
int tmp_ret = OB_SUCCESS;
if (job_is_user_stop) {
if ((OB_TMP_FAIL(table_operator_.update_for_kill(job_info)))) {
LOG_WARN("update user stop dbms sched job failed", K(tmp_ret), K(ret));
}
} else {
ObString errmsg = common::ob_get_tsi_err_msg(ret);
if (errmsg.empty() && ret != OB_SUCCESS) {
errmsg = ObString(strlen(ob_errpkt_strerror(ret, lib::is_oracle_mode())),
ob_errpkt_strerror(ret, lib::is_oracle_mode()));
}
if ((OB_TMP_FAIL(table_operator_.update_for_end(job_info, ret, errmsg)))) {
LOG_WARN("update dbms sched job failed", K(tmp_ret), K(ret));
}
}
ret = OB_SUCCESS == ret ? tmp_ret : ret;
}
if ((tmp_ret = table_operator_.update_for_end(job_info, ret, errmsg)) != OB_SUCCESS) {
LOG_WARN("update dbms sched job failed", K(tmp_ret), K(ret));
}
ret = OB_SUCCESS == ret ? tmp_ret : ret;
}
return ret;
}

View File

@ -36,6 +36,7 @@ public:
int init_env(ObDBMSSchedJobInfo &job_info, sql::ObSQLSessionInfo &session);
private:
const static int OLAP_ASYNC_JOB_DEVIATION_SECOND = 60;
static int init_session(
sql::ObSQLSessionInfo &session,
share::schema::ObSchemaGetterGuard &schema_guard,

View File

@ -294,7 +294,8 @@ int64_t ObDBMSSchedJobMaster::run_job(ObDBMSSchedJobInfo &job_info, ObDBMSSchedJ
job_key->get_job_id(),
job_key->get_job_name(),
execute_addr,
self_addr_))) {
self_addr_,
job_info.is_olap_async_job_class() ? share::OBCG_OLAP_ASYNC_JOB : 0))) {
LOG_WARN("failed to run dbms sched job", K(ret), K(job_info), KPC(job_key));
if (is_server_down_error(ret)) {
int tmp = OB_SUCCESS;
@ -340,6 +341,9 @@ int ObDBMSSchedJobMaster::scheduler()
ob_usleep(MIN_SCHEDULER_INTERVAL);
} else {
ob_usleep(max(0, delay));
common::ObCurTraceId::TraceId job_trace_id;
job_trace_id.init(GCONF.self_addr_);
ObTraceIdGuard trace_id_guard(job_trace_id);
if (OB_SUCCESS != (tmp_ret = scheduler_task_.wait_vector().remove(scheduler_task_.wait_vector().begin()))) {
LOG_WARN("fail to remove job_id from sorted vector", K(ret));
} else if (OB_SUCCESS != (tmp_ret = scheduler_job(job_key))) {
@ -389,6 +393,15 @@ int ObDBMSSchedJobMaster::scheduler_job(ObDBMSSchedJobKey *job_key)
LOG_WARN("job is timeout, force update for end", K(job_info), K(now));
}
}
} else if (job_info.is_killed()) {
free_job_key(job_key);
job_key = NULL;
int tmp = OB_SUCCESS;
if (OB_SUCCESS != (tmp = table_operator_.update_for_kill(job_info))) {
LOG_WARN("update for stop failed", K(tmp), K(job_info));
} else {
LOG_WARN("update for stop job", K(job_info));
}
} else if (job_info.is_disabled()) {
free_job_key(job_key);
job_key = NULL;
@ -658,6 +671,10 @@ int ObDBMSSchedJobMaster::check_all_tenants()
OZ (table_operator_.purge_run_detail_histroy(tenant_ids.at(i)));
}
*/ // not open
if (OB_FAIL(table_operator_.purge_olap_async_job_run_detail(tenant_ids.at(i)))) {
LOG_WARN("purge olap async job run detail failed", K(ret), K(tenant_ids.at(i)));
ret = OB_SUCCESS; // not affect subsequent operations
}
OZ (check_new_jobs(tenant_ids.at(i), tenant_schema->is_oracle_tenant()));
}
ret = OB_SUCCESS; // one tenant failed should not affect other
@ -670,7 +687,7 @@ int ObDBMSSchedJobMaster::check_all_tenants()
int ObDBMSSchedJobMaster::check_new_jobs(uint64_t tenant_id, bool is_oracle_tenant)
{
int ret = OB_SUCCESS;
ObSEArray<ObDBMSSchedJobInfo, 16> job_infos;
ObSEArray<ObDBMSSchedJobInfo, 12> job_infos;
ObArenaAllocator allocator("DBMSSchedTmp");
OZ (table_operator_.get_dbms_sched_job_infos_in_tenant(tenant_id, is_oracle_tenant, allocator, job_infos));
OZ (register_new_jobs(tenant_id, is_oracle_tenant, job_infos));

View File

@ -33,13 +33,13 @@ OB_SERIALIZE_MEMBER(ObDBMSSchedJobResult, tenant_id_, job_id_, server_addr_, sta
OB_SERIALIZE_MEMBER(ObDBMSSchedStopJobArg, tenant_id_, job_name_, session_id_, rpc_send_time_);
int ObDBMSSchedJobRpcProxy::run_dbms_sched_job(
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, ObString &job_name, ObAddr server_addr, ObAddr master_addr)
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, ObString &job_name, ObAddr server_addr, ObAddr master_addr, int64_t group_id)
{
int ret = OB_SUCCESS;
ObDBMSSchedJobArg arg(tenant_id, job_id, server_addr, master_addr, is_oracle_tenant, job_name);
ObRpcAPDBMSSchedJobCB cb;
CK (arg.is_valid());
OZ (this->to(arg.server_addr_).by(arg.tenant_id_).run_dbms_sched_job(arg, &cb), arg);
OZ (this->to(arg.server_addr_).by(arg.tenant_id_).group_id(group_id).run_dbms_sched_job(arg, &cb), arg);
return ret;
}

View File

@ -173,7 +173,7 @@ public:
public:
int run_dbms_sched_job(
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, common::ObString &job_name,
common::ObAddr server_addr, common::ObAddr master_addr);
common::ObAddr server_addr, common::ObAddr master_addr, int64_t group_id);
int stop_dbms_sched_job(
uint64_t tenant_id, common::ObString &job_name,
common::ObAddr server_addr, uint64_t session_id);

View File

@ -33,6 +33,9 @@
#include "sql/session/ob_sql_session_mgr.h"
#include "share/schema/ob_schema_getter_guard.h"
#include "share/stat/ob_dbms_stats_maintenance_window.h"
#include "observer/dbms_scheduler/ob_dbms_sched_table_operator.h"
#include "storage/ob_common_id_utils.h"
#include "observer/dbms_scheduler/ob_dbms_sched_job_rpc_proxy.h"
namespace oceanbase
{
@ -50,6 +53,8 @@ int ObDBMSSchedJobInfo::deep_copy(ObIAllocator &allocator, const ObDBMSSchedJobI
{
int ret = OB_SUCCESS;
tenant_id_ = other.tenant_id_;
user_id_ = other.user_id_;
database_id_ = other.database_id_;
job_ = other.job_;
last_modify_ = other.last_modify_;
last_date_ = other.last_date_;
@ -82,6 +87,7 @@ int ObDBMSSchedJobInfo::deep_copy(ObIAllocator &allocator, const ObDBMSSchedJobI
OZ (ob_write_string(allocator, other.job_class_, job_class_));
OZ (ob_write_string(allocator, other.program_name_, program_name_));
OZ (ob_write_string(allocator, other.state_, state_));
OZ (ob_write_string(allocator, other.job_action_, job_action_));
return ret;
}
@ -99,6 +105,18 @@ int ObDBMSSchedJobClassInfo::deep_copy(common::ObIAllocator &allocator, const Ob
return ret;
}
int ObDBMSSchedJobUtils::generate_job_id(int64_t tenant_id, int64_t &max_job_id)
{
int ret = OB_SUCCESS;
ObCommonID raw_id;
if (OB_FAIL(storage::ObCommonIDUtils::gen_unique_id_by_rpc(tenant_id, raw_id))) {
LOG_WARN("gen unique id failed", K(ret), K(tenant_id));
} else {
max_job_id = raw_id.id() + ObDBMSSchedTableOperator::JOB_ID_OFFSET;
}
return ret;
}
int ObDBMSSchedJobUtils::disable_dbms_sched_job(
ObISQLClient &sql_client,
const uint64_t tenant_id,
@ -131,6 +149,94 @@ int ObDBMSSchedJobUtils::disable_dbms_sched_job(
return ret;
}
int ObDBMSSchedJobUtils::stop_dbms_sched_job(
common::ObISQLClient &sql_client,
const ObDBMSSchedJobInfo &job_info,
const bool is_delete_after_stop)
{
int ret = OB_SUCCESS;
obrpc::ObDBMSSchedJobRpcProxy *rpc_proxy = GCTX.dbms_sched_job_rpc_proxy_;
uint64_t tenant_id = job_info.tenant_id_;
ObSqlString sql;
CK (OB_NOT_NULL(rpc_proxy));
uint64_t data_version = 0;
if (OB_SUCC(ret)) {
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("fail to get tenant data version", KR(ret), K(data_version));
} else if (data_version < MOCK_DATA_VERSION_4_2_1_5
|| (data_version >= DATA_VERSION_4_2_2_0 && data_version < MOCK_DATA_VERSION_4_2_4_0)
|| (data_version >= DATA_VERSION_4_3_0_0 && data_version < DATA_VERSION_4_3_2_0)) {
ret = OB_NOT_SUPPORTED;
}
}
if (OB_SUCC(ret)) {
dbms_scheduler::ObDBMSSchedJobInfo update_job_info;
update_job_info.tenant_id_ = job_info.tenant_id_;
update_job_info.is_oracle_tenant_ = job_info.is_oracle_tenant_;
update_job_info.job_name_ = job_info.job_name_;
if (is_delete_after_stop) {
update_job_info.state_ = "KILLED";
}
if(OB_FAIL(update_dbms_sched_job_info(sql_client, update_job_info))) {
LOG_WARN("update job info failed", K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(sql.append_fmt("select svr_ip, svr_port, session_id from %s where tenant_id = %lu and job_name = \'%.*s\'",
OB_ALL_VIRTUAL_TENANT_SCHEDULER_RUNNING_JOB_TNAME, tenant_id, job_info.job_name_.length(),job_info.job_name_.ptr()))) {
LOG_WARN("append sql failed", KR(ret));
} else {
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_client.read(result, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_info.job_name_));
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result is null", K(ret), K(sql), K(tenant_id), K(job_info.job_name_));
} else {
bool result_empty = true;
do {
if (OB_FAIL(result.get_result()->next())) {
if (ret == OB_ITER_END) {
//do nothing
} else {
LOG_WARN("fail to get result", K(ret));
}
} else {
result_empty = false;
uint64_t session_id = OB_INVALID_ID;
ObAddr svr;
ObString svr_ip;
int64_t svr_port = OB_INVALID_INDEX;
EXTRACT_VARCHAR_FIELD_MYSQL(*(result.get_result()), "svr_ip", svr_ip);
EXTRACT_UINT_FIELD_MYSQL(*(result.get_result()), "session_id", session_id, uint64_t);
EXTRACT_INT_FIELD_MYSQL(*(result.get_result()), "svr_port", svr_port, int64_t);
if (OB_SUCC(ret)) {
if (!svr.set_ip_addr(svr_ip, svr_port)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("set addr failed", K(svr_ip), K(svr_port));
} else {
LOG_INFO("send rpc", K(tenant_id), K(job_info.job_name_), K(svr), K(session_id));
ObString stop_job_name = ObString(job_info.job_name_);
OZ (rpc_proxy->stop_dbms_sched_job(tenant_id,
stop_job_name,
svr,
session_id));
}
}
}
} while (OB_SUCC(ret));
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
}
}
}
}
LOG_INFO("stop job", K(ret), K(job_info.job_name_), K(tenant_id));
return ret;
}
int ObDBMSSchedJobUtils::remove_dbms_sched_job(
ObISQLClient &sql_client,
const uint64_t tenant_id,
@ -153,6 +259,9 @@ int ObDBMSSchedJobUtils::remove_dbms_sched_job(
int64_t affected_rows = 0;
if (OB_FAIL(exec.exec_delete(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, dml, affected_rows))) {
LOG_WARN("execute delete failed", KR(ret));
} else if (is_zero_row(affected_rows) && !if_exists) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("execute delete failed", KR(ret), K(if_exists));
} else if (!if_exists && !is_double_row(affected_rows)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("affected_rows unexpected to be two", KR(ret), K(affected_rows));
@ -198,35 +307,45 @@ int ObDBMSSchedJobUtils::add_dbms_sched_job(
ObDMLExecHelper exec(sql_client, exec_tenant_id);
int64_t affected_rows = 0;
const int64_t now = ObTimeUtility::current_time();
OZ (dml.add_gmt_create(now));
OZ (dml.add_gmt_modified(now));
OZ (dml.add_pk_column("tenant_id",
ObSchemaUtils::get_extract_tenant_id(job_info.tenant_id_, job_info.tenant_id_)));
OZ (dml.add_pk_column("job", job_id));
OZ (dml.add_column("lowner", ObHexEscapeSqlStr(job_info.lowner_)));
OZ (dml.add_column("powner", ObHexEscapeSqlStr(job_info.powner_)));
OZ (dml.add_column("cowner", ObHexEscapeSqlStr(job_info.cowner_)));
OZ (dml.add_raw_time_column("next_date", job_info.start_date_));
OZ (dml.add_column("total", 0));
OZ (dml.add_column("`interval#`", ObHexEscapeSqlStr(
job_info.repeat_interval_.empty() ? ObString("null") : job_info.repeat_interval_)));
OZ (dml.add_column("flag", job_info.flag_));
OZ (dml.add_column("job_name", ObHexEscapeSqlStr(job_info.job_name_)));
OZ (dml.add_column("job_style", ObHexEscapeSqlStr(job_info.job_style_)));
OZ (dml.add_column("job_type", ObHexEscapeSqlStr(job_info.job_type_)));
OZ (dml.add_column("job_class", ObHexEscapeSqlStr(job_info.job_class_)));
OZ (dml.add_column("job_action", ObHexEscapeSqlStr(job_info.job_action_)));
OZ (dml.add_column("what", ObHexEscapeSqlStr(job_info.job_action_)));
OZ (dml.add_raw_time_column("start_date", job_info.start_date_));
OZ (dml.add_raw_time_column("end_date", job_info.end_date_));
OZ (dml.add_column("repeat_interval", ObHexEscapeSqlStr(job_info.repeat_interval_)));
OZ (dml.add_column("enabled", job_info.enabled_));
OZ (dml.add_column("auto_drop", job_info.auto_drop_));
OZ (dml.add_column("max_run_duration", job_info.max_run_duration_));
OZ (dml.add_column("interval_ts", job_info.interval_ts_));
OZ (dml.add_column("scheduler_flags", job_info.scheduler_flags_));
OZ (dml.add_column("exec_env", job_info.exec_env_));
uint64_t data_version = 0;
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("fail to get tenant data version", KR(ret), K(data_version));
} else {
OZ (dml.add_gmt_create(now));
OZ (dml.add_gmt_modified(now));
OZ (dml.add_pk_column("tenant_id",
ObSchemaUtils::get_extract_tenant_id(job_info.tenant_id_, job_info.tenant_id_)));
if ((MOCK_DATA_VERSION_4_2_4_0 <= data_version && DATA_VERSION_4_3_0_0 > data_version) ||
data_version > DATA_VERSION_4_3_2_0) {
OZ (dml.add_column("user_id", job_info.user_id_));
OZ (dml.add_column("database_id", job_info.database_id_));
}
OZ (dml.add_pk_column("job", job_id));
OZ (dml.add_column("lowner", ObHexEscapeSqlStr(job_info.lowner_)));
OZ (dml.add_column("powner", ObHexEscapeSqlStr(job_info.powner_)));
OZ (dml.add_column("cowner", ObHexEscapeSqlStr(job_info.cowner_)));
OZ (dml.add_raw_time_column("next_date", job_info.start_date_));
OZ (dml.add_column("total", 0));
OZ (dml.add_column("`interval#`", ObHexEscapeSqlStr(
job_info.repeat_interval_.empty() ? ObString("null") : job_info.repeat_interval_)));
OZ (dml.add_column("flag", job_info.flag_));
OZ (dml.add_column("job_name", ObHexEscapeSqlStr(job_info.job_name_)));
OZ (dml.add_column("job_style", ObHexEscapeSqlStr(job_info.job_style_)));
OZ (dml.add_column("job_type", ObHexEscapeSqlStr(job_info.job_type_)));
OZ (dml.add_column("job_class", ObHexEscapeSqlStr(job_info.job_class_)));
OZ (dml.add_column("job_action", ObHexEscapeSqlStr(job_info.job_action_)));
OZ (dml.add_column("what", ObHexEscapeSqlStr(job_info.job_action_)));
OZ (dml.add_raw_time_column("start_date", job_info.start_date_));
OZ (dml.add_raw_time_column("end_date", job_info.end_date_));
OZ (dml.add_column("repeat_interval", ObHexEscapeSqlStr(job_info.repeat_interval_)));
OZ (dml.add_column("enabled", job_info.enabled_));
OZ (dml.add_column("auto_drop", job_info.auto_drop_));
OZ (dml.add_column("max_run_duration", job_info.max_run_duration_));
OZ (dml.add_column("interval_ts", job_info.interval_ts_));
OZ (dml.add_column("scheduler_flags", job_info.scheduler_flags_));
OZ (dml.add_column("exec_env", job_info.exec_env_));
OZ (dml.add_column("comments", ObHexEscapeSqlStr(job_info.comments_)));
}
if (OB_SUCC(ret) && OB_FAIL(exec.exec_insert(
OB_ALL_TENANT_SCHEDULER_JOB_TNAME, dml, affected_rows))) {
@ -239,6 +358,41 @@ int ObDBMSSchedJobUtils::add_dbms_sched_job(
return ret;
}
int ObDBMSSchedJobUtils::update_dbms_sched_job_info(
common::ObISQLClient &sql_client,
const ObDBMSSchedJobInfo &update_job_info)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = update_job_info.tenant_id_;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || update_job_info.job_name_.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(tenant_id), K(update_job_info));
} else {
const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
ObDMLSqlSplicer dml;
if (OB_FAIL(dml.add_pk_column(
"tenant_id", ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id)))
|| OB_FAIL(dml.add_pk_column("job_name", update_job_info.job_name_))) {
LOG_WARN("add column failed", KR(ret));
} else if (!update_job_info.state_.empty() && OB_FAIL(dml.add_column("state", update_job_info.state_))) {
LOG_WARN("add column failed", KR(ret), K(update_job_info.state_));
} else {
ObDMLExecHelper exec(sql_client, exec_tenant_id);
int64_t affected_rows = 0;
if (OB_FAIL(exec.exec_update(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, dml, affected_rows))) {
LOG_WARN("execute update failed", KR(ret));
} else if (is_zero_row(affected_rows)) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("not change", KR(ret), K(affected_rows));
} else if (!is_double_row(affected_rows)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("affected_rows unexpected to be two", KR(ret), K(affected_rows));
}
}
}
return ret;
}
int ObDBMSSchedJobUtils::reserve_user_with_minimun_id(ObIArray<const ObUserInfo *> &user_infos)
{
int ret = OB_SUCCESS;
@ -264,5 +418,86 @@ int ObDBMSSchedJobUtils::reserve_user_with_minimun_id(ObIArray<const ObUserInfo
return ret;
}
int ObDBMSSchedJobUtils::get_dbms_sched_job_info(common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const bool is_oracle_tenant,
const ObString &job_name,
common::ObIAllocator &allocator,
ObDBMSSchedJobInfo &job_info)
{
int ret = OB_SUCCESS;
ObSqlString sql;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || job_name.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(tenant_id), K(job_name));
} else {
const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
if (OB_FAIL(sql.append_fmt("select * from %s where tenant_id = %ld and job_name = \'%.*s\' and job > 0",
OB_ALL_TENANT_SCHEDULER_JOB_TNAME,
ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id),
job_name.length(), job_name.ptr()))) {
LOG_WARN("failed to assign sql", K(ret));
} else {
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql));
} else {
if (res.get_result() != NULL && OB_SUCCESS == (ret = res.get_result()->next())) {
ObDBMSSchedTableOperator table_operator;
OZ (table_operator.extract_info(*(res.get_result()), tenant_id, is_oracle_tenant, allocator, job_info));
}
if (OB_FAIL(ret)) {
if (OB_ITER_END == ret) {
ret = OB_ENTRY_NOT_EXIST;
} else {
LOG_WARN("next failed", K(ret));
}
}
}
}
}
}
return ret;
}
int ObDBMSSchedJobUtils::check_dbms_sched_job_priv(const ObUserInfo *user_info,
const ObDBMSSchedJobInfo &job_info)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(user_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("user info is NULL", KR(ret));
} else if (is_root_user(user_info->get_user_id())) {
// do nothing
} else if (job_info.is_oracle_tenant_) {
//TODO 连雨
} else {
if (job_info.user_id_ != OB_INVALID_ID) { //如果 job 有 user_id 优先使用
if (job_info.user_id_ != user_info->get_user_id()) {
ret = OB_ERR_NO_PRIVILEGE;
LOG_WARN("job user id check failed", KR(ret), K(user_info), K(job_info.user_id_));
}
} else if (0 != job_info.powner_.case_compare(user_info->get_user_name())) { // job 保存的 owner 可能是 root@% or root (旧)
const char *c = job_info.powner_.reverse_find('@');
if (OB_ISNULL(c)) {
ret = OB_ERR_NO_PRIVILEGE;
LOG_WARN("job user id check failed", KR(ret), K(user_info), K(job_info.user_id_));
} else {
ObString user = job_info.powner_;
ObString user_name;
ObString host_name;
user_name = user.split_on(c);
host_name = user;
if (0 != user_name.case_compare(user_info->get_user_name()) || 0 != host_name.case_compare(user_info->get_host_name())) {
ret = OB_ERR_NO_PRIVILEGE;
LOG_WARN("job user id check failed", KR(ret), K(user_info), K(job_info.user_id_));
}
}
}
}
return ret;
}
} // end for namespace dbms_scheduler
} // end for namespace oceanbase

View File

@ -22,6 +22,7 @@
#define DATA_VERSION_SUPPORT_JOB_CLASS(data_version) (data_version >= DATA_VERSION_4_3_2_0)
#define DATA_VERSION_SUPPORT_RUN_DETAIL_V2(data_version) ((MOCK_DATA_VERSION_4_2_4_0 <= data_version && DATA_VERSION_4_3_0_0 > data_version) || DATA_VERSION_4_3_2_0 <= data_version)
#define DATA_VERSION_SUPPORT_RUN_DETAIL_V2_DATABASE_NAME_AND_RUNNING_JOB_JOB_CLASS(data_version) (DATA_VERSION_4_3_2_1 <= data_version)
namespace oceanbase
{
@ -62,6 +63,8 @@ class ObDBMSSchedJobInfo
public:
ObDBMSSchedJobInfo() :
tenant_id_(common::OB_INVALID_ID),
user_id_(common::OB_INVALID_ID),
database_id_(common::OB_INVALID_ID),
job_(common::OB_INVALID_ID),
lowner_(),
powner_(),
@ -104,6 +107,8 @@ public:
is_oracle_tenant_(true) {}
TO_STRING_KV(K(tenant_id_),
K(user_id_),
K(database_id_),
K(job_),
K(job_name_),
K(lowner_),
@ -138,6 +143,8 @@ public:
}
uint64_t get_tenant_id() { return tenant_id_; }
uint64_t get_user_id() { return user_id_; }
uint64_t get_database_id() { return database_id_; }
uint64_t get_job_id() { return job_; }
uint64_t get_job_id_with_tenant() { return common::combine_two_ids(tenant_id_, job_); }
int64_t get_this_date() { return this_date_; }
@ -153,6 +160,7 @@ public:
bool is_broken() { return 0x1 == (flag_ & 0x1); }
bool is_running(){ return this_date_ != 0; }
bool is_disabled() { return 0x0 == (enabled_ & 0x1); }
bool is_killed() { return 0 == state_.case_compare("KILLED"); }
common::ObString &get_what() { return what_; }
common::ObString &get_exec_env() { return exec_env_; }
@ -164,14 +172,19 @@ public:
common::ObString &get_program_name() { return program_name_; }
common::ObString &get_job_name() { return job_name_; }
common::ObString &get_job_class() { return job_class_; }
common::ObString &get_job_action() { return job_action_; }
bool is_oracle_tenant() { return is_oracle_tenant_; }
bool is_date_expression_job_class() const { return !!(scheduler_flags_ & JOB_SCHEDULER_FLAG_DATE_EXPRESSION_JOB_CLASS); }
bool is_mysql_event_job_class() const { return (0 == job_class_.case_compare("MYSQL_EVENT_JOB_CLASS")); }
bool is_olap_async_job_class() const { return (0 == job_class_.case_compare("OLAP_ASYNC_JOB_CLASS")); }
int deep_copy(common::ObIAllocator &allocator, const ObDBMSSchedJobInfo &other);
public:
uint64_t tenant_id_;
uint64_t user_id_;
uint64_t database_id_;
uint64_t job_;
common::ObString lowner_;
common::ObString powner_;
@ -264,18 +277,94 @@ public:
class ObDBMSSchedJobUtils
{
public:
//TO DO DELETE 连雨
static int generate_job_id(int64_t tenant_id, int64_t &max_job_id);
static int disable_dbms_sched_job(common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const common::ObString &job_name,
const bool if_exists = false);
static int remove_dbms_sched_job(common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const common::ObString &job_name,
const bool if_exists = false);
/**
* @brief job
* @param [in] sql_client - JOB insert, trans
* @param [in] tenant_id - id
* @param [in] job_id - id
* @retval OB_SUCCESS execute success
* @retval OB_ERR_UNEXPECTED
* @retval OB_INVALID_ARGUMENT JOB
* @retval OB_ERR_NO_PRIVILEGE JOB
*/
static int create_dbms_sched_job(common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const int64_t job_id,
const ObDBMSSchedJobInfo &job_info);
/**
* @brief job
* @param [in] sql_client
* @param [in] tenant_id - id
* @param [in] job_name - job名
* @param [in] if_exists - true ( JOB )
* @retval OB_SUCCESS execute success
* @retval OB_ERR_UNEXPECTED
* @retval OB_INVALID_ARGUMENT JOB
*/
static int remove_dbms_sched_job(common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const common::ObString &job_name,
const bool if_exists = false);
/**
* @brief / job
* @param [in] sql_client
* @param [in] is_delete_after_stop -
* @param [in] job_info - job
* @retval OB_SUCCESS execute success
* @retval OB_NOT_SUPPORTED
* @retval OB_ERR_UNEXPECTED
* @retval OB_ENTRY_NOT_EXIST JOB
* @retval OB_ERR_NO_PRIVILEGE JOB
*/
static int stop_dbms_sched_job(common::ObISQLClient &sql_client,
const ObDBMSSchedJobInfo &job_info,
const bool is_delete_after_stop);
/**
* @brief JOB
* @param [in] update_job_info - job ( tenant_id, job_name)
* @retval OB_SUCCESS execute success
* @retval OB_ERR_UNEXPECTED
* @retval OB_INVALID_ARGUMENT
* @retval OB_ENTRY_NOT_EXIST JOB /
*/
static int update_dbms_sched_job_info(common::ObISQLClient &sql_client,
const ObDBMSSchedJobInfo &update_job_info);
/**
* @brief JOB
* @param [in] tenant_id - id
* @param [in] is_oracle_tenant - oracle , ,
* @param [in] job_name - job名
* @param [in] allocator - allocator, job_info
* @param [in] job_info - job
* @retval OB_SUCCESS execute success
* @retval OB_ERR_UNEXPECTED
* @retval OB_ENTRY_NOT_EXIST JOB
*/
static int get_dbms_sched_job_info(common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const bool is_oracle_tenant,
const ObString &job_name,
common::ObIAllocator &allocator,
ObDBMSSchedJobInfo &job_info);
/**
* @brief JOB , root job, job)
* @param [in] user_info -
* @param [in] is_oracle_tenant - oracle
* @param [in] job_info - job
* @retval OB_SUCCESS execute success
* @retval OB_ERR_UNEXPECTED
* @retval OB_INVALID_ARGUMENT / user_info NULL
* @retval OB_ERR_NO_PRIVILEGE
*/
static int check_dbms_sched_job_priv(const ObUserInfo *user_info,
const ObDBMSSchedJobInfo &job_info);
//TO DO DELETE 连雨
static int add_dbms_sched_job(common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const int64_t job_id,

View File

@ -174,6 +174,13 @@ int ObDBMSSchedTableOperator::_build_job_log_dml(
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("fail to get tenant data version", KR(ret), K(data_version));
} else if (DATA_VERSION_SUPPORT_RUN_DETAIL_V2(data_version)) {
if (DATA_VERSION_SUPPORT_RUN_DETAIL_V2_DATABASE_NAME_AND_RUNNING_JOB_JOB_CLASS(data_version)) {
OZ (dml.add_column("database_name", ObHexEscapeSqlStr(job_info.cowner_)));
OZ (dml.add_column("owner", job_info.powner_));
OZ (dml.add_column("operation", ObHexEscapeSqlStr(job_info.job_action_)));
OZ (dml.add_column("status", job_info.state_));
OZ (dml.add_time_column("req_start_date", job_info.start_date_));
}
OZ (dml.add_pk_column("job_name", job_info.job_name_));
OZ (dml.splice_insert_sql(OB_ALL_SCHEDULER_JOB_RUN_DETAIL_V2_TNAME, sql));
} else {
@ -317,6 +324,7 @@ int ObDBMSSchedTableOperator::update_for_end(ObDBMSSchedJobInfo &job_info, int e
} else if (job_info.is_date_expression_job_class() && now >= job_info.end_date_ && true == job_info.auto_drop_) {
OZ (_build_job_drop_dml(now, job_info, sql1));
} else if ((now >= job_info.end_date_ || job_info.get_interval_ts() == 0) && (true == job_info.auto_drop_)) {
job_info.state_ = ObString("COMPLETED");
OZ (_build_job_drop_dml(now, job_info, sql1));
} else {
OX (job_info.failures_ = (err == 0) ? 0 : (job_info.failures_ + 1));
@ -357,6 +365,40 @@ int ObDBMSSchedTableOperator::update_for_end(ObDBMSSchedJobInfo &job_info, int e
return ret;
}
int ObDBMSSchedTableOperator::update_for_kill(ObDBMSSchedJobInfo &job_info)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
ObSqlString sql1;
ObSqlString sql2;
int64_t affected_rows = 0;
const int64_t now = ObTimeUtility::current_time();
bool need_record = true;
int64_t tenant_id = job_info.tenant_id_;
CK (OB_NOT_NULL(sql_proxy_));
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
OZ (_check_need_record(job_info, need_record));
OZ (_build_job_drop_dml(now, job_info, sql1));
if (OB_SUCC(ret) && need_record) {
job_info.state_ = ObString("KILLED");
OZ (_build_job_log_dml(now, job_info, OB_ERR_SESSION_INTERRUPTED, "user stop job", sql2));
}
OZ (trans.start(sql_proxy_, tenant_id, true));
OZ (trans.write(tenant_id, sql1.ptr(), affected_rows));
if (OB_SUCC(ret) && need_record) {
OZ (trans.write(tenant_id, sql2.ptr(), affected_rows));
}
if (trans.is_started()) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
}
return ret;
}
int ObDBMSSchedTableOperator::check_job_can_running(int64_t tenant_id, int64_t alive_job_count, bool &can_running)
{
int ret = OB_SUCCESS;
@ -418,6 +460,16 @@ int ObDBMSSchedTableOperator::extract_info(
job_info_local.tenant_id_ = tenant_id;
job_info_local.is_oracle_tenant_ = is_oracle_tenant;
EXTRACT_INT_FIELD_MYSQL(result, "job", job_info_local.job_, uint64_t);
EXTRACT_INT_FIELD_MYSQL(result, "user_id", job_info_local.user_id_, uint64_t);
if (OB_ERR_NULL_VALUE == ret || OB_ERR_COLUMN_NOT_FOUND == ret) {
ret = OB_SUCCESS;
job_info_local.user_id_ = OB_INVALID_ID;
}
EXTRACT_INT_FIELD_MYSQL(result, "database_id", job_info_local.database_id_, uint64_t);
if (OB_ERR_NULL_VALUE == ret || OB_ERR_COLUMN_NOT_FOUND == ret) {
ret = OB_SUCCESS;
job_info_local.database_id_ = OB_INVALID_ID;
}
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "lowner", job_info_local.lowner_);
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "powner", job_info_local.powner_);
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "cowner", job_info_local.cowner_);
@ -479,7 +531,7 @@ do { \
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "job_class", job_info_local.job_class_);
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "program_name", job_info_local.program_name_);
//job_type not used
//job_action not used
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "job_action", job_info_local.job_action_);
//number_of_argument not used
//repeat_interval not used
EXTRACT_BOOL_FIELD_MYSQL_SKIP_RET(result, "enabled", job_info_local.enabled_);
@ -499,6 +551,19 @@ do { \
return ret;
}
int ObDBMSSchedTableOperator::get_dbms_sched_job_is_killed(const ObDBMSSchedJobInfo &job_info, bool &is_killed)
{
int ret = OB_SUCCESS;
is_killed = false;
ObArenaAllocator allocator("SchedStateTmp");
ObDBMSSchedJobInfo update_job_info;
OZ(get_dbms_sched_job_info(job_info.tenant_id_, job_info.is_oracle_tenant_, job_info.job_, job_info.job_name_, allocator, update_job_info));
if (OB_SUCC(ret) && update_job_info.is_killed()) {
is_killed = true;
}
return ret;
}
int ObDBMSSchedTableOperator::get_dbms_sched_job_info(
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name,
ObIAllocator &allocator, ObDBMSSchedJobInfo &job_info)
@ -724,5 +789,31 @@ int ObDBMSSchedTableOperator::purge_run_detail_histroy(uint64_t tenant_id)
return ret;
}
int ObDBMSSchedTableOperator::purge_olap_async_job_run_detail(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObSqlString sql;
int64_t affected_rows = 0;
uint64_t data_version = 0;
int64_t log_history = 30;
CK (OB_NOT_NULL(sql_proxy_));
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("fail to get tenant data version", KR(ret), K(data_version));
} else if (DATA_VERSION_SUPPORT_RUN_DETAIL_V2(data_version)) {
OZ (sql.assign_fmt("delete from %s where job_class=\'OLAP_ASYNC_JOB_CLASS\' and time<DATE_SUB(NOW(), INTERVAL %ld DAY)",
OB_ALL_SCHEDULER_JOB_RUN_DETAIL_V2_TNAME, log_history));
OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
if (affected_rows > 0) {
LOG_INFO("purge olap async job run detail finish", K(ret), K(tenant_id), K(sql), K(affected_rows));
}
}
return ret;
}
} // end for namespace dbms_scheduler
} // end for namespace oceanbase

View File

@ -67,7 +67,8 @@ public:
int update_for_rollback(ObDBMSSchedJobInfo &job_info);
int update_for_timeout(ObDBMSSchedJobInfo &job_info);
int update_for_end(ObDBMSSchedJobInfo &job_info, int err, const common::ObString &errmsg);
int update_for_kill(ObDBMSSchedJobInfo &job_info);
int get_dbms_sched_job_is_killed(const ObDBMSSchedJobInfo &job_info, bool &is_killed);
int get_dbms_sched_job_info(
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name,
common::ObIAllocator &allocator, ObDBMSSchedJobInfo &job_info);
@ -90,6 +91,7 @@ public:
int purge_run_detail_histroy(uint64_t tenant_id);
int purge_olap_async_job_run_detail(uint64_t tenant_id);
private:
DISALLOW_COPY_AND_ASSIGN(ObDBMSSchedTableOperator);

View File

@ -445,7 +445,7 @@ void ObResourceGroup::check_worker_count()
{
int ret = OB_SUCCESS;
if (OB_SUCC(workers_lock_.trylock())) {
if (is_user_group(group_id_)
if ((is_user_group(group_id_) || is_job_group(group_id_))
&& nesting_worker_cnt_ < (MAX_REQUEST_LEVEL - GROUP_MULTI_LEVEL_THRESHOLD)) {
for (int level = GROUP_MULTI_LEVEL_THRESHOLD + nesting_worker_cnt_; OB_SUCC(ret) && level < MAX_REQUEST_LEVEL; level++) {
if (OB_SUCC(acquire_level_worker(level))) {
@ -1390,7 +1390,7 @@ int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
if (req_level < 0) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected level", K(req_level), K(id_), K(group_id));
} else if (is_user_group(group_id) && req_level >= GROUP_MULTI_LEVEL_THRESHOLD) {
} else if ((is_user_group(group_id) || is_job_group(group_id)) && req_level >= GROUP_MULTI_LEVEL_THRESHOLD) {
group->recv_level_rpc_cnt_.atomic_inc(req_level);
if (OB_FAIL(group->multi_level_queue_.push(req, req_level, 0))) {
LOG_WARN("push request to queue fail", K(req_level), K(id_), K(group_id));

View File

@ -282,6 +282,7 @@ public:
WList &get_workers() { return workers_; }
lib::ObMutex &get_workers_lock() { return workers_lock_; }
share::ObCgroupCtrl *get_cgroup_ctrl() { return cgroup_ctrl_; }
bool is_job_group(int64_t group_id) { return share::OBCG_OLAP_ASYNC_JOB == group_id; }
int init();
void update_queue_size();
@ -542,6 +543,7 @@ private:
int construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantModuleInitCtx *&ctx);
int recv_group_request(rpc::ObRequest &req, int64_t group_id);
bool is_job_group(int64_t group_id) { return share::OBCG_OLAP_ASYNC_JOB == group_id; }
protected:
mutable common::TCRWLock meta_lock_;

View File

@ -151,6 +151,15 @@ int ObAllVirtualTenantSchedulerRunningJob::FillScanner::operator()(
cur_row_->cells_[cell_idx].set_null();
break;
}
case JOB_CLASS: {
if (OB_NOT_NULL(sess_info->get_job_info())) {
cur_row_->cells_[cell_idx].set_varchar(sess_info->get_job_info()->get_job_class());
cur_row_->cells_[cell_idx].set_collation_type(default_collation);
} else {
cur_row_->cells_[cell_idx].set_null();
}
break;
}
case JOB_STYLE: {
cur_row_->cells_[cell_idx].set_null();
break;

View File

@ -56,7 +56,8 @@ private:
DESTINATION_OWNER,
DESTINATION,
CREDENTIAL_OWNER,
CREDENTIAL_NAME
CREDENTIAL_NAME,
JOB_CLASS
};
class FillScanner
{

View File

@ -177,6 +177,9 @@ static const NonReservedKeyword Mysql_pl_none_reserved_keywords[] =
{"pragma", PRAGMA},
{"interface", INTERFACE},
{"c", C},
{"submit", SUBMIT},
{"job", JOB},
{"cancel", CANCEL},
};
const NonReservedKeyword *mysql_pl_non_reserved_keyword_lookup(const char *word)

View File

@ -97,6 +97,7 @@ TABLE { return TABLE; }
UPDATE { return UPDATE; }
JSON { return JSON; }
REPLACE { return REPLACE; }
LOAD { return LOAD; }
TRIGGER {
ObParseCtx *parse_ctx = (ObParseCtx *)yyextra;
parse_ctx->is_for_trigger_ = 1;

View File

@ -210,7 +210,7 @@ void obpl_mysql_wrap_get_user_var_into_subquery(ObParseCtx *parse_ctx, ParseNode
TINYINT SMALLINT MEDIUMINT INTEGER BIGINT FLOAT DOUBLE PRECISION DEC DECIMAL NUMERIC
CHARACTER VARCHAR BINARY VARBINARY UNSIGNED
ZEROFILL COLLATE SET BLOB TINYTEXT MEDIUMTEXT LONGTEXT TINYBLOB
MEDIUMBLOB LONGBLOB VARYING
MEDIUMBLOB LONGBLOB VARYING LOAD
/* reserved key words only used in ob, in mysql these keywords are non reserved*/
CHARSET COMMIT ROLLBACK DO UNTIL
//-----------------------------reserved keyword end-------------------------------------------------
@ -223,7 +223,7 @@ void obpl_mysql_wrap_get_user_var_into_subquery(ObParseCtx *parse_ctx, ParseNode
DATA DEFINER END_KEY EXTEND FOLLOWS FOUND FUNCTION HANDLER INTERFACE INVOKER JSON LANGUAGE
MESSAGE_TEXT MYSQL_ERRNO NATIONAL NEXT NO OF OPEN PACKAGE PRAGMA PRECEDES RECORD RETURNS ROW ROWTYPE
SCHEMA_NAME SECURITY SUBCLASS_ORIGIN TABLE_NAME USER TYPE VALUE DATETIME TIMESTAMP TIME DATE YEAR
TEXT NCHAR NVARCHAR BOOL BOOLEAN ENUM BIT FIXED SIGNED ROLE
TEXT NCHAR NVARCHAR BOOL BOOLEAN ENUM BIT FIXED SIGNED ROLE SUBMIT CANCEL JOB
//-----------------------------non_reserved keyword end---------------------------------------------
%right END_KEY
%left ELSE IF ELSEIF
@ -276,6 +276,7 @@ void obpl_mysql_wrap_get_user_var_into_subquery(ObParseCtx *parse_ctx, ParseNode
%type <node> create_trigger_stmt drop_trigger_stmt plsql_trigger_source
%type <node> trigger_definition trigger_event trigger_body pl_obj_access_ref
%type <ival> trigger_time
%type <node> submit_job_stmt cancel_job_stmt
/*SQL data type*/
%type <node> scalar_data_type opt_charset collation opt_collation charset_name collation_name
%type <node> number_literal literal charset_key opt_float_precision opt_number_precision opt_binary
@ -359,6 +360,8 @@ outer_stmt:
| create_package_stmt { $$ = $1; }
| create_package_body_stmt { $$ = $1; }
| drop_package_stmt { $$ = $1; }
| submit_job_stmt { $$ = $1; }
| cancel_job_stmt { $$ = $1; }
| sql_stmt { $$ = $1; }
| call_sp_stmt { $$ = $1; }
| do_sp_stmt { $$ = $1; }
@ -395,6 +398,7 @@ sql_stmt_prefix:
| INSERT { $$ = NULL; }
| DELETE { $$ = NULL; }
| UPDATE { $$ = NULL; }
| LOAD { $$ = NULL; }
;
sql_stmt:
@ -734,6 +738,7 @@ unreserved_keyword:
| BODY %prec LOWER_PARENS
| C
| CATALOG_NAME
| CANCEL
| CLASS_ORIGIN
| CLOSE
| COLUMN_NAME
@ -754,6 +759,7 @@ unreserved_keyword:
| HANDLER
| INTERFACE
| INVOKER
| JOB
| JSON
| LANGUAGE
| MESSAGE_TEXT
@ -773,6 +779,7 @@ unreserved_keyword:
| SCHEMA_NAME
| SECURITY
| SUBCLASS_ORIGIN
| SUBMIT
| TABLE_NAME
| TYPE
| VALUE
@ -2734,6 +2741,30 @@ scond_info_item_name:
| MYSQL_ERRNO { $$ = DIAG_MYSQL_ERRNO; }
;
/*****************************************************************************
*
* OLAP ASYNC JOB grammar
*
*****************************************************************************/
submit_job_stmt:
SUBMIT JOB sql_stmt
{
malloc_terminal_node($$, parse_ctx->mem_pool_, T_OLAP_ASYNC_JOB_SUBMIT);
const char *stmt_str = parse_ctx->stmt_str_ + @3.first_column;
int32_t str_len = @3.last_column - @3.first_column + 1;
$$->str_value_ = parse_strndup(stmt_str, str_len, parse_ctx->mem_pool_);
check_ptr($$->str_value_);
$$->str_len_ = str_len;
}
;
cancel_job_stmt:
CANCEL JOB STRING
{
malloc_non_terminal_node($$, parse_ctx->mem_pool_, T_OLAP_ASYNC_JOB_CANCEL, 1, $3);
}
;
%%
/**
* parser function

View File

@ -12079,6 +12079,21 @@ int ObInnerTableSchema::all_virtual_tenant_scheduler_running_job_schema(ObTableS
true, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("job_class", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObVarcharType, //column_type
CS_TYPE_INVALID, //column_collation_type
128, //column_length
-1, //column_precision
-1, //column_scale
true, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
table_schema.get_part_option().set_part_num(1);
table_schema.set_part_level(PARTITION_LEVEL_ONE);

View File

@ -14370,6 +14370,7 @@ def_table_schema(
('destination', 'varchar:128', 'true'),
('credential_owner', 'varchar:30', 'true'),
('credential_name', 'varchar:30', 'true'),
('job_class', 'varchar:128', 'true'),
],
partition_columns = ['svr_ip', 'svr_port'],
vtable_route_policy = 'distributed',

View File

@ -272,6 +272,7 @@ ob_set_subtarget(ob_sql engine_cmd
engine/cmd/ob_table_direct_insert_service.cpp
engine/cmd/ob_tenant_snapshot_executor.cpp
engine/cmd/ob_mock_executor.cpp
engine/cmd/ob_olap_async_job_executor.cpp
)
ob_set_subtarget(ob_sql engine_dml
@ -1119,6 +1120,8 @@ ob_set_subtarget(ob_sql resolver_cmd
resolver/cmd/ob_alter_system_resolver.cpp
resolver/cmd/ob_tenant_snapshot_resolver.cpp
resolver/cmd/ob_tenant_clone_resolver.cpp
resolver/cmd/ob_olap_async_job_resolver.cpp
resolver/cmd/ob_olap_async_job_stmt.cpp
)
ob_set_subtarget(ob_sql resolver_dcl

View File

@ -0,0 +1,66 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/cmd/ob_olap_async_job_executor.h"
#include <chrono>
#include <ctime>
#include <iomanip>
#include "lib/mysqlclient/ob_mysql_transaction.h"
#include "lib/string/ob_sql_string.h"
#include "sql/resolver/cmd/ob_olap_async_job_stmt.h"
#include "sql/engine/ob_exec_context.h"
#include "observer/dbms_scheduler/ob_dbms_sched_table_operator.h"
namespace oceanbase
{
using namespace common;
using namespace obrpc;
using namespace share::schema;
namespace sql
{
int ObOLAPAsyncCancelJobExecutor::execute(ObExecContext &ctx, ObOLAPAsyncCancelJobStmt &stmt)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = stmt.get_tenant_id();
uint64_t user_id = stmt.get_user_id();
const ObUserInfo *user_info = nullptr;
ObArenaAllocator allocator("ASYNC_JOB_TMP");
dbms_scheduler::ObDBMSSchedJobInfo job_info;
schema::ObSchemaGetterGuard schema_guard;
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", K(ret), K(tenant_id));
} else if(OB_FAIL(schema_guard.get_user_info(tenant_id, user_id, user_info))) {
LOG_WARN("fail to get user id", KR(ret), K(tenant_id), K(user_id));
} else if (OB_ISNULL(user_info)) {
ret = OB_USER_NOT_EXIST;
LOG_WARN("user not exist", KR(ret), K(tenant_id), K(user_id));
} else if (OB_FAIL(dbms_scheduler::ObDBMSSchedJobUtils::get_dbms_sched_job_info(
*GCTX.sql_proxy_,
tenant_id,
false, // is_oracle_tenant
stmt.get_job_name(),
allocator,
job_info))) {
LOG_WARN("get job info failed", KR(ret), K(tenant_id), K(stmt.get_job_name()));
} else if (!job_info.is_olap_async_job_class()) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("cancel not olap async job", KR(ret), K(tenant_id), K(job_info));
} else if (OB_FAIL(dbms_scheduler::ObDBMSSchedJobUtils::check_dbms_sched_job_priv(user_info, job_info))) {
LOG_WARN("check user priv failed", KR(ret), K(tenant_id), K(job_info));
} else if (OB_FAIL(dbms_scheduler::ObDBMSSchedJobUtils::stop_dbms_sched_job(*GCTX.sql_proxy_, job_info, true /* delete after stop */))) {
LOG_WARN("failed to stop dbms scheduler job", KR(ret));
}
return ret;
}
}
}

View File

@ -0,0 +1,37 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_SQL_ENGINE_CMD_OLAP_ASYNC_JOB_CMD_EXECUTOR_
#define OCEANBASE_SQL_ENGINE_CMD_OLAP_ASYNC_JOB_CMD_EXECUTOR_
#include "lib/string/ob_string.h"
#include "lib/container/ob_array_serialization.h"
#include "share/schema/ob_schema_struct.h"
namespace oceanbase
{
namespace sql
{
class ObExecContext;
class ObOLAPAsyncCancelJobStmt;
class ObOLAPAsyncCancelJobExecutor
{
public:
ObOLAPAsyncCancelJobExecutor() {}
virtual ~ObOLAPAsyncCancelJobExecutor() {}
int execute(ObExecContext &ctx, ObOLAPAsyncCancelJobStmt &stmt);
private:
DISALLOW_COPY_AND_ASSIGN(ObOLAPAsyncCancelJobExecutor);
};
}
}
#endif //OCEANBASE_SQL_ENGINE_CMD_OLAP_ASYNC_JOB_CMD_EXECUTOR_

View File

@ -152,6 +152,8 @@
#include "sql/engine/cmd/ob_tenant_snapshot_executor.h"
#include "sql/resolver/cmd/ob_tenant_clone_stmt.h"
#include "sql/engine/cmd/ob_clone_executor.h"
#include "sql/resolver/cmd/ob_olap_async_job_stmt.h"
#include "sql/engine/cmd/ob_olap_async_job_executor.h"
#ifdef OB_BUILD_TDE_SECURITY
#include "sql/resolver/ddl/ob_create_keystore_stmt.h"
#include "sql/resolver/ddl/ob_alter_keystore_stmt.h"
@ -1084,6 +1086,10 @@ int ObCmdExecutor::execute(ObExecContext &ctx, ObICmd &cmd)
DEFINE_EXECUTE_CMD(ObTransferPartitionStmt, ObTransferPartitionExecutor);
break;
}
case stmt::T_OLAP_ASYNC_JOB_CANCEL: {
DEFINE_EXECUTE_CMD(ObOLAPAsyncCancelJobStmt, ObOLAPAsyncCancelJobExecutor);
break;
}
case stmt::T_CS_DISKMAINTAIN:
case stmt::T_TABLET_CMD:
case stmt::T_SWITCH_ROOTSERVER:

View File

@ -61,7 +61,9 @@ bool ObParser::is_pl_stmt(const ObString &stmt, bool *is_create_func, bool *is_c
case S_BEGIN:
case S_DROP:
case S_ALTER:
case S_UPDATE: {
case S_UPDATE:
case S_SUBMIT:
case S_CANCEL: {
if (ISSPACE(*p)) {
p++;
} else {
@ -381,6 +383,9 @@ ObParser::State ObParser::transform_normal(ObString &normal)
ELSIF(6, S_SIGNAL, "signal")
ELSIF(8, S_RESIGNAL, "resignal")
ELSIF(5, S_FORCE, "force")
ELSIF(6, S_SUBMIT, "submit")
ELSIF(6, S_CANCEL, "cancel")
ELSIF(3, S_JOB, "job")
ELSE()
if (S_INVALID == state
@ -429,7 +434,9 @@ ObParser::State ObParser::transform_normal(
case S_BEGIN:
case S_DROP:
case S_ALTER:
case S_UPDATE: {
case S_UPDATE:
case S_SUBMIT:
case S_CANCEL: {
state = token;
} break;
case S_INVALID:
@ -497,6 +504,15 @@ ObParser::State ObParser::transform_normal(
is_not_pl = true;
}
} break;
case S_SUBMIT:
case S_CANCEL: {
State token = transform_normal(normal);
if (S_JOB == token) {
is_pl = true;
} else {
is_not_pl = true;
}
} break;
default: {
is_not_pl = true;
LOG_WARN_RET(common::OB_ERR_UNEXPECTED, "unexpected state", K(state));

View File

@ -153,6 +153,9 @@ enum State {
S_VALUES,
S_TABLE,
S_INTO,
S_SUBMIT,
S_CANCEL,
S_JOB,
// add new states above me
S_MAX
};

View File

@ -14440,6 +14440,14 @@ SHOW opt_extended_or_full TABLES opt_from_or_in_database_clause opt_show_conditi
(void)($4);
malloc_non_terminal_node($$, result->malloc_pool_, T_SHOW_OPEN_TABLES, 1, $5);
}
| SHOW JOB STATUS
{
malloc_terminal_node($$, result->malloc_pool_, T_SHOW_OLAP_ASYNC_JOB_STATUS);
}
| SHOW JOB STATUS WHERE JOB COMP_EQ STRING_VALUE
{
malloc_non_terminal_node($$, result->malloc_pool_, T_SHOW_OLAP_ASYNC_JOB_STATUS, 1, $7);
}
| CHECK TABLE table_list check_table_options
{
(void) ($4);

View File

@ -28,14 +28,16 @@ const char *ObValuesTableCompression::lower_[ObParser::S_MAX] = {
"", "", "", "", "", "", "", "", "", "", /* 0 ~9 */
"", "", "", "update", "", "", "", "", "", "", /* 10 ~19 */
"", "", "", "", "", "", "", "", "", "", /* 20 ~29 */
"", "", "", "select", "insert", "delete", "values", "table", "into" /* 30 ~38 */
"", "", "", "select", "insert", "delete", "values", "table", "into", "", /* 30 ~39 */
"", "" /* 40 ~41 */
};
const char *ObValuesTableCompression::upper_[ObParser::S_MAX] = {
"", "", "", "", "", "", "", "", "", "", /* 0 ~9 */
"", "", "", "UPDATE", "", "", "", "", "", "", /* 10 ~19 */
"", "", "", "", "", "", "", "", "", "", /* 20 ~29 */
"", "", "", "SELECT", "INSERT", "DELETE", "VALUES", "TABLE", "INTO" /* 30 ~38 */
"", "", "", "SELECT", "INSERT", "DELETE", "VALUES", "TABLE", "INTO", "", /* 30 ~39 */
"", "" /* 40 ~41 */
};
#define ISSPACE(c) ((c) == ' ' || (c) == '\n' || (c) == '\r' || (c) == '\t' || (c) == '\f' || (c) == '\v')

View File

@ -0,0 +1,305 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_RESV
#include "sql/resolver/cmd/ob_olap_async_job_resolver.h"
#include "sql/parser/ob_parser.h"
#include "sql/resolver/ob_resolver_utils.h"
#include <inttypes.h>
namespace oceanbase
{
using namespace common;
using namespace share;
namespace sql
{
ObOLAPAsyncJobResolver::ObOLAPAsyncJobResolver(ObResolverParams &params)
: ObSelectResolver(params)
{
}
int ObOLAPAsyncJobResolver::resolve(const ParseNode &parse_tree)
{
int ret = OB_SUCCESS;
ObItemType stmt_type = parse_tree.type_;
switch (stmt_type) {
case T_OLAP_ASYNC_JOB_SUBMIT: {
ObOLAPAsyncSubmitJobStmt job_stmt;
OZ (resolve_submit_job_stmt(parse_tree, job_stmt));
OZ (execute_submit_job(job_stmt));
OZ (init_select_stmt(job_stmt));
break;
}
case T_OLAP_ASYNC_JOB_CANCEL: {
ObOLAPAsyncCancelJobStmt *stmt = create_stmt<ObOLAPAsyncCancelJobStmt>();
OV (OB_NOT_NULL(stmt), OB_ALLOCATE_MEMORY_FAILED);
OZ (resolve_cancel_job_stmt(parse_tree, stmt));
break;
}
default:
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid stmt type", K(ret), K(stmt_type));
}
return ret;
}
int ObOLAPAsyncJobResolver::resolve_submit_job_stmt(const ParseNode &parse_tree, ObOLAPAsyncSubmitJobStmt &stmt)
{
int ret = OB_SUCCESS;
int64_t session_query_time_out_ts = 0;
if (OB_JOB_SQL_MAX_LENGTH - 1 < parse_tree.str_len_) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("sql too long", K(ret));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "sql length");
} else if (OB_INVALID_ID == session_info_->get_database_id()) {
ret = OB_ERR_NO_DB_SELECTED;
LOG_WARN("not select database", KR(ret));
} else if (OB_FAIL(session_info_->get_query_timeout(session_query_time_out_ts))){
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get session query timeout failed", KR(ret));
} else {
const int definer_buf_size = OB_MAX_USER_NAME_LENGTH + OB_MAX_HOST_NAME_LENGTH + 2; // @ + \0
char *definer_buf = static_cast<char*>(allocator_->alloc(definer_buf_size));
if (OB_ISNULL(definer_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else {
memset(definer_buf, 0, definer_buf_size);
snprintf(definer_buf, definer_buf_size, "%.*s@%.*s", session_info_->get_user_name().length(), session_info_->get_user_name().ptr(),
session_info_->get_host_name().length(), session_info_->get_host_name().ptr());
stmt.set_job_definer(definer_buf);
}
if (OB_SUCC(ret)) {
const uint64_t tenant_id = params_.session_info_->get_effective_tenant_id();
stmt.set_tenant_id(tenant_id);
stmt.set_user_id(session_info_->get_user_id());
stmt.set_job_database(session_info_->get_database_name());
stmt.set_database_id(session_info_->get_database_id());
stmt.set_query_time_out_second(session_query_time_out_ts / 1000 / 1000);
}
if (OB_SUCC(ret)) {
char *sql_buf = static_cast<char*>(allocator_->alloc(OB_JOB_SQL_MAX_LENGTH));
if (OB_ISNULL(sql_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else {
memset(sql_buf, 0, OB_JOB_SQL_MAX_LENGTH);
snprintf(sql_buf, OB_JOB_SQL_MAX_LENGTH, "%.*s;", (int)parse_tree.str_len_, parse_tree.str_value_);
stmt.set_job_action(sql_buf);
}
}
if (OB_SUCC(ret)) {
char *job_exec_buf = static_cast<char*>(allocator_->alloc(OB_MAX_PROC_ENV_LENGTH));
if (OB_ISNULL(job_exec_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else {
memset(job_exec_buf, 0, OB_MAX_PROC_ENV_LENGTH);
int64_t pos = 0;
if (OB_FAIL(ObExecEnv::gen_exec_env(*session_info_, job_exec_buf, OB_MAX_PROC_ENV_LENGTH, pos))){
ret = OB_ERR_UNEXPECTED;
LOG_WARN("generate exec env failed", K(ret), K(session_info_));
} else {
stmt.set_exec_env((ObString(pos, job_exec_buf)));
}
}
}
if (OB_SUCC(ret)) {
int64_t job_id = OB_INVALID_ID;
if (OB_FAIL(dbms_scheduler::ObDBMSSchedJobUtils::generate_job_id(stmt.get_tenant_id(), job_id))) {
LOG_WARN("generate_job_id failed", KR(ret), K(stmt.get_tenant_id()));
} else {
stmt.set_job_id(job_id);
char *job_name_buf = static_cast<char*>(allocator_->alloc(OB_JOB_NAME_MAX_LENGTH));
if (OB_ISNULL(job_name_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else {
memset(job_name_buf, 0, OB_JOB_NAME_MAX_LENGTH);
snprintf(job_name_buf, OB_JOB_NAME_MAX_LENGTH, "%"PRIu64"%"PRIu64,stmt.get_database_id(), stmt.get_job_id());
stmt.set_job_name(job_name_buf);
}
}
}
}
return ret;
}
int ObOLAPAsyncJobResolver::resolve_cancel_job_stmt(const ParseNode &parse_tree, ObOLAPAsyncCancelJobStmt *stmt)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = params_.session_info_->get_effective_tenant_id();
stmt->set_tenant_id(tenant_id);
stmt->set_user_id(session_info_->get_user_id());
if (parse_tree.num_child_ != 1) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid job name", K(ret));
} else {
const ParseNode *job_name_node = parse_tree.children_[0];
if (OB_ISNULL(job_name_node)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid job name", K(ret));
} else {
char *job_name_buf = static_cast<char*>(allocator_->alloc(OB_JOB_NAME_MAX_LENGTH));
if (OB_ISNULL(job_name_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else {
memset(job_name_buf, 0, OB_JOB_NAME_MAX_LENGTH);
snprintf(job_name_buf, OB_JOB_NAME_MAX_LENGTH, "%.*s", (int)job_name_node->str_len_, job_name_node->str_value_);
stmt->set_job_name(job_name_buf);
}
}
}
return ret;
}
#ifdef ERRSIM
ERRSIM_POINT_DEF(ERRSIM_SUBMIT_ERR_JOB_NAME);
ERRSIM_POINT_DEF(ERRSIM_SUBMIT_ERR_JOB_START_TIME);
#endif
int ObOLAPAsyncJobResolver::execute_submit_job(ObOLAPAsyncSubmitJobStmt &stmt)
{
int ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID == stmt.get_tenant_id()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant id", KR(ret), K(stmt.get_tenant_id()));
} else {
int64_t start_date_us = ObTimeUtility::current_time();
int64_t end_date_us = 64060560000000000; // 4000-01-01
HEAP_VAR(dbms_scheduler::ObDBMSSchedJobInfo, job_info) {
job_info.tenant_id_ = stmt.get_tenant_id();
job_info.user_id_ = stmt.get_user_id();
job_info.database_id_ = stmt.get_database_id();
job_info.job_ = stmt.get_job_id();
job_info.job_name_ = stmt.get_job_name();
job_info.job_action_ = stmt.get_job_action();
job_info.lowner_ = stmt.get_job_definer();
job_info.powner_ = stmt.get_job_definer();
job_info.cowner_ = stmt.get_job_database();
job_info.job_style_ = ObString("regular");
job_info.job_type_ = ObString("PLSQL_BLOCK");
job_info.job_class_ = ObString("OLAP_ASYNC_JOB_CLASS");
job_info.what_ = stmt.get_job_action();
job_info.start_date_ = start_date_us;
job_info.end_date_ = end_date_us;
job_info.interval_ = job_info.repeat_interval_;
job_info.repeat_interval_ = job_info.repeat_interval_;
job_info.enabled_ = true;
job_info.auto_drop_ = true;
job_info.max_run_duration_ = stmt.get_query_time_out_second() + 60;
job_info.interval_ts_ = 0;
job_info.exec_env_ = stmt.get_exec_env();
job_info.comments_ = ObString("olap async job");
#ifdef ERRSIM
if (OB_SUCCESS != ERRSIM_SUBMIT_ERR_JOB_NAME) { //注入一个错误的JOB NAME
job_info.job_name_ = "ERRSIM_JOB";
}
if (OB_SUCCESS != ERRSIM_SUBMIT_ERR_JOB_START_TIME) { //注入一个错误的开始时间
job_info.start_date_ = 64060560000000000;
}
#endif
ObMySQLTransaction trans;
if (OB_FAIL(trans.start(GCTX.sql_proxy_, stmt.get_tenant_id()))) {
LOG_WARN("failed to start trans", KR(ret), K(stmt.get_tenant_id()));
} else if (OB_FAIL(dbms_scheduler::ObDBMSSchedJobUtils::create_dbms_sched_job(
trans, stmt.get_tenant_id(), stmt.get_job_id(), job_info))) {
LOG_WARN("failed to create dbms scheduler job", KR(ret));
}
if (trans.is_started()) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
}
if (ret == OB_ERR_PRIMARY_KEY_DUPLICATE) {
LOG_WARN("job is exist", KR(ret));
}
}
}
return ret;
}
int ObOLAPAsyncJobResolver::init_select_stmt(ObOLAPAsyncSubmitJobStmt &stmt)
{
int ret = OB_SUCCESS;
ObSelectStmt *select_stmt = create_stmt<ObSelectStmt>();
OV (OB_NOT_NULL(select_stmt), OB_ALLOCATE_MEMORY_FAILED);
if (OB_LIKELY(OB_SUCC(ret))) {
ObSqlString select_sql;
if (OB_FAIL(select_sql.assign_fmt(
"SELECT '%.*s' as job_id",stmt.get_job_name().length(), stmt.get_job_name().ptr()))) {
LOG_WARN("assign sql string failed", KR(ret), K(stmt.get_job_name()));
} else if (OB_FAIL(parse_and_resolve_select_sql(select_sql.string()))) {
LOG_WARN("fail to parse and resolve select sql", K(ret), K(select_sql));
} else if (OB_UNLIKELY(stmt::T_SELECT != stmt_->get_stmt_type())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected stmt type", K(stmt_->get_stmt_type()));
} else {
select_stmt->set_select_type(NOT_AFFECT_FOUND_ROWS);
if(OB_FAIL(select_stmt->formalize_stmt(session_info_))) {
LOG_WARN("pull select stmt all expr relation ids failed", K(ret));
}
}
}
return ret;
}
int ObOLAPAsyncJobResolver::parse_and_resolve_select_sql(const ObString &select_sql)
{
int ret = OB_SUCCESS;
// 1. parse and resolve view defination
if (OB_ISNULL(session_info_) || OB_ISNULL(params_.allocator_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("data member is not init", K(ret), K(session_info_), K(params_.allocator_));
} else {
ParseResult select_result;
ObParser parser(*params_.allocator_, session_info_->get_sql_mode());
if (OB_FAIL(parser.parse(select_sql, select_result))) {
LOG_WARN("parse select sql failed", K(select_sql), K(ret));
} else {
// use alias to make all columns number continued
if (OB_ISNULL(select_result.result_tree_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result tree is NULL", K(ret));
} else if (OB_UNLIKELY(select_result.result_tree_->num_child_ != 1
|| NULL == select_result.result_tree_->children_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result tree is invalid",
K(ret), K(select_result.result_tree_->num_child_), K(select_result.result_tree_->children_));
} else if (OB_UNLIKELY(NULL == select_result.result_tree_->children_[0])) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result tree is invalid", K(ret), "child ptr", select_result.result_tree_->children_[0]);
} else {
ParseNode *select_stmt_node = select_result.result_tree_->children_[0];
if (OB_FAIL(ObSelectResolver::resolve(*select_stmt_node))) {
LOG_WARN("resolve select in view definition failed", K(ret), K(select_stmt_node));
}
}
}
}
return ret;
}
} //namespace sql
} //namespace oceanbase

View File

@ -0,0 +1,42 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_OLAP_ASYNC_JOB_RESOLVER_H
#define _OB_OLAP_ASYNC_JOB_RESOLVER_H 1
#include "sql/resolver/dml/ob_select_resolver.h"
#include "sql/resolver/cmd/ob_olap_async_job_stmt.h"
namespace oceanbase
{
namespace sql
{
class ObOLAPAsyncJobResolver: public ObSelectResolver
{
public:
explicit ObOLAPAsyncJobResolver(ObResolverParams &params);
virtual ~ObOLAPAsyncJobResolver() = default;
virtual int resolve(const ParseNode &parse_tree);
private:
static const int OB_JOB_NAME_MAX_LENGTH = 128;
static const int OB_JOB_SQL_MAX_LENGTH = 4096;
int resolve_submit_job_stmt(const ParseNode &parse_tree, ObOLAPAsyncSubmitJobStmt &stmt);
int resolve_cancel_job_stmt(const ParseNode &parse_tree, ObOLAPAsyncCancelJobStmt *stmt);
int execute_submit_job(ObOLAPAsyncSubmitJobStmt &stmt);
int init_select_stmt(ObOLAPAsyncSubmitJobStmt &stmt);
int parse_and_resolve_select_sql(const common::ObString &select_sql);
// disallow copy
DISALLOW_COPY_AND_ASSIGN(ObOLAPAsyncJobResolver);
};
}//namespace sql
}//namespace oceanbase
#endif // _OB_OLAP_ASYNC_JOB_RESOLVER_H

View File

@ -0,0 +1,66 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_RESV
#include "sql/resolver/cmd/ob_olap_async_job_stmt.h"
#include "share/ob_define.h"
#include "lib/string/ob_string.h"
#include "lib/string/ob_strings.h"
#include "lib/utility/ob_print_utils.h"
using namespace oceanbase;
using namespace oceanbase::common;
using namespace oceanbase::sql;
ObOLAPAsyncSubmitJobStmt::ObOLAPAsyncSubmitJobStmt(ObIAllocator *name_pool)
: ObCMDStmt(name_pool, stmt::T_OLAP_ASYNC_JOB_SUBMIT)
{
}
ObOLAPAsyncSubmitJobStmt::ObOLAPAsyncSubmitJobStmt()
: ObCMDStmt(NULL, stmt::T_OLAP_ASYNC_JOB_SUBMIT)
{
}
int64_t ObOLAPAsyncSubmitJobStmt::to_string(char *buf, const int64_t buf_len) const
{
int64_t pos = 0;
if (NULL != buf) {
J_OBJ_START();
J_KV(N_STMT_TYPE, ((int)stmt_type_));
J_OBJ_END();
}
return pos;
}
ObOLAPAsyncCancelJobStmt::ObOLAPAsyncCancelJobStmt(ObIAllocator *name_pool)
: ObCMDStmt(name_pool, stmt::T_OLAP_ASYNC_JOB_CANCEL)
{
}
ObOLAPAsyncCancelJobStmt::ObOLAPAsyncCancelJobStmt()
: ObCMDStmt(NULL, stmt::T_OLAP_ASYNC_JOB_CANCEL)
{
}
int64_t ObOLAPAsyncCancelJobStmt::to_string(char *buf, const int64_t buf_len) const
{
int64_t pos = 0;
if (NULL != buf) {
J_OBJ_START();
J_KV(N_STMT_TYPE, ((int)stmt_type_));
J_OBJ_END();
}
return pos;
}

View File

@ -0,0 +1,94 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_SQL_RESOLVER_CMD_OB_OLAP_ASYNC_JOB_STMT_
#define OCEANBASE_SQL_RESOLVER_CMD_OB_OLAP_ASYNC_JOB_STMT_
#include "sql/resolver/cmd/ob_cmd_stmt.h"
#include "lib/string/ob_strings.h"
#include "share/ob_define.h"
namespace oceanbase
{
namespace sql
{
class ObOLAPAsyncSubmitJobStmt: public ObCMDStmt
{
public:
explicit ObOLAPAsyncSubmitJobStmt(common::ObIAllocator *name_pool);
ObOLAPAsyncSubmitJobStmt();
virtual ~ObOLAPAsyncSubmitJobStmt() = default;
inline void set_tenant_id(const uint64_t id) { tenant_id_ = id; }
inline void set_database_id(const uint64_t id) { database_id_ = id; }
inline void set_user_id(const uint64_t id) { user_id_ = id; }
inline void set_job_id(const int64_t id) { job_id_ = id; }
inline void set_query_time_out_second(const uint64_t time) { query_time_out_second_ = time; }
inline void set_job_name(const common::ObString &job_name) { job_name_ = job_name; }
inline void set_job_definer(const common::ObString &job_definer) { job_definer_ = job_definer; }
inline void set_job_database(const common::ObString &job_database) { job_database_ = job_database; }
inline void set_job_action(const common::ObString &job_action) { job_action_ = job_action; }
inline void set_exec_env(const common::ObString &exec_env) { exec_env_ = exec_env; }
inline uint64_t get_tenant_id() const { return tenant_id_; }
inline uint64_t get_database_id() const { return database_id_; }
inline uint64_t get_user_id() const { return user_id_; }
inline int64_t get_job_id() const { return job_id_; }
inline uint64_t get_query_time_out_second() const { return query_time_out_second_; }
inline const common::ObString &get_job_name() const { return job_name_; }
inline const common::ObString &get_job_definer() const { return job_definer_; }
inline const common::ObString &get_job_action() const { return job_action_; }
inline const common::ObString &get_exec_env() const { return exec_env_; }
inline const common::ObString &get_job_database() const { return job_database_; }
DECLARE_VIRTUAL_TO_STRING;
private:
// data members
uint64_t tenant_id_;
uint64_t database_id_;
uint64_t user_id_;
int64_t job_id_;
uint64_t query_time_out_second_;
common::ObString job_name_;
common::ObString job_definer_;
common::ObString job_database_;
common::ObString job_action_;
common::ObString exec_env_;
private:
DISALLOW_COPY_AND_ASSIGN(ObOLAPAsyncSubmitJobStmt);
};
class ObOLAPAsyncCancelJobStmt: public ObCMDStmt
{
public:
explicit ObOLAPAsyncCancelJobStmt(common::ObIAllocator *name_pool);
ObOLAPAsyncCancelJobStmt();
virtual ~ObOLAPAsyncCancelJobStmt() = default;
inline void set_tenant_id(const uint64_t id) { tenant_id_ = id; }
inline void set_user_id(const uint64_t id) { user_id_ = id; }
inline void set_job_name(const common::ObString &job_name) { job_name_ = job_name; }
inline uint64_t get_tenant_id() const { return tenant_id_; }
inline uint64_t get_user_id() const { return user_id_; }
inline const common::ObString &get_job_name() const { return job_name_; }
DECLARE_VIRTUAL_TO_STRING;
private:
// data members
uint64_t tenant_id_;
uint64_t user_id_;
common::ObString job_name_;
private:
DISALLOW_COPY_AND_ASSIGN(ObOLAPAsyncCancelJobStmt);
};
} // end namespace sql
} // end namespace oceanbase
#endif //OCEANBASE_SQL_RESOLVER_CMD_OB_OLAP_ASYNC_JOB_STMT_

View File

@ -161,7 +161,8 @@ int ObShowResolver::resolve(const ParseNode &parse_tree)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_ENGINE)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_OPEN_TABLES)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_CREATE_USER)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_CHECK_TABLE)) {
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_CHECK_TABLE)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_OLAP_ASYNC_JOB_STATUS)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected parse tree type", K(ret), K(parse_tree.type_));
} else {
@ -1729,6 +1730,56 @@ int ObShowResolver::resolve(const ParseNode &parse_tree)
}
break;
}
case T_SHOW_OLAP_ASYNC_JOB_STATUS: {
[&] {
const int WHERE_JOB_NAME_LENGTH = 128 + 20;
const int LIMIT_LENGTH = 40;
char where_job_name[WHERE_JOB_NAME_LENGTH] = {};
char limit_count [LIMIT_LENGTH] = {};
uint64_t min_version = OB_INVALID_VERSION;
if (OB_FAIL(GET_MIN_DATA_VERSION(real_tenant_id, min_version))) {
LOG_WARN("get min data_version failed", K(ret), K(real_tenant_id));
} else if (min_version < DATA_VERSION_4_3_2_1) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support show async job", K(ret), K(real_tenant_id));
} else if (parse_tree.num_child_ == 1 && OB_NOT_NULL(parse_tree.children_)) {
snprintf(where_job_name, WHERE_JOB_NAME_LENGTH, " AND JOB_NAME = '%.*s' ", (int)parse_tree.children_[0]->str_len_, parse_tree.children_[0]->str_value_);
snprintf(limit_count, LIMIT_LENGTH, "order by update_time desc limit 1");
} else if (parse_tree.num_child_ == 0) {
//nothing to do
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parse tree is wrong", K(ret), K(parse_tree.num_child_),
K(parse_tree.children_));
}
if (OB_SUCC(ret)) {
const int WHERE_USR_NAME_LENGTH = OB_MAX_USER_NAME_LENGTH + OB_MAX_HOST_NAME_LENGTH + 20;
char where_user_from_jobs[WHERE_USR_NAME_LENGTH] = {};
char where_user_from_logs[WHERE_USR_NAME_LENGTH] = {};
if (0 != session_info_->get_user_name().case_compare("root")) {
snprintf(where_user_from_jobs, WHERE_USR_NAME_LENGTH, " AND T.POWNER = '%.*s@%.*s' ", session_info_->get_user_name().length(), session_info_->get_user_name().ptr(),
session_info_->get_host_name().length(), session_info_->get_host_name().ptr());
snprintf(where_user_from_logs, WHERE_USR_NAME_LENGTH, " AND R.OWNER = '%.*s@%.*s' ", session_info_->get_user_name().length(), session_info_->get_user_name().ptr(),
session_info_->get_host_name().length(), session_info_->get_host_name().ptr());
}
show_resv_ctx.stmt_type_ = stmt::T_SHOW_OLAP_ASYNC_JOB_STATUS;
GEN_SQL_STEP_1(ObShowSqlSet::SHOW_OLAP_ASYNC_JOB_STATUS);
GEN_SQL_STEP_2(ObShowSqlSet::SHOW_OLAP_ASYNC_JOB_STATUS,
OB_SYS_DATABASE_NAME,
OB_ALL_SCHEDULER_JOB_RUN_DETAIL_V2_TNAME,
where_job_name,
where_user_from_logs,
OB_SYS_DATABASE_NAME,
OB_ALL_TENANT_SCHEDULER_JOB_TNAME,
sql_tenant_id,
where_job_name,
where_user_from_jobs,
limit_count);
}
}();
break;
}
default:
/* won't be here */
ret = OB_NOT_IMPLEMENT;
@ -3725,6 +3776,13 @@ DEFINE_SHOW_CLAUSE_SET(SHOW_SEQUENCES_LIKE,
"SELECT sequence_name FROM %s.%s WHERE database_id = %ld ORDER BY sequence_name COLLATE utf8mb4_bin ASC",
NULL,
"sequence_name");
DEFINE_SHOW_CLAUSE_SET(SHOW_OLAP_ASYNC_JOB_STATUS,
NULL,
"(SELECT R.job_name AS 'job_id', R.database_name AS 'schema_name', CASE WHEN R.status = 'COMPLETED' AND R.message = 'SUCCESS' THEN 'FINISH' WHEN R.status = 'COMPLETED' AND R.message <> 'SUCCESS' THEN 'FAILED' WHEN R.status = 'KILLED' THEN 'CANCELLED' ELSE R.status END as 'status', R.message as 'fail_msg', R.req_start_date as 'create_time', R.time as 'update_time', R.operation AS 'definition' FROM %s.%s R WHERE R.JOB_CLASS = 'OLAP_ASYNC_JOB_CLASS' %s %s\
UNION ALL\
SELECT T.job_name AS 'job_id', T.cowner AS 'schema_name', CASE WHEN T.state IS NULL THEN 'SUBMITTED' WHEN T.state = 'SCHEDULED' THEN 'RUNNING' WHEN T.state = 'KILLED' THEN 'CANCELLED' ELSE T.state END as 'status', NULL as fail_msg, T.start_date as 'create_time', T.gmt_modified as 'update_time', T.job_action as 'definition' FROM %s.%s T WHERE T.JOB_CLASS = 'OLAP_ASYNC_JOB_CLASS' AND T.TENANT_ID = %d AND T.JOB > 0 %s %s) %s",
NULL,
NULL);
DEFINE_SHOW_CLAUSE_SET(SHOW_CREATE_USER,
NULL,
"SELECT \"%.*s\" AS `CREATE USER for %.*s@%.*s` FROM DUAL",

View File

@ -209,6 +209,7 @@ struct ObShowResolver::ObShowSqlSet
DECLARE_SHOW_CLAUSE_SET(SHOW_SEQUENCES_LIKE);
DECLARE_SHOW_CLAUSE_SET(SHOW_ENGINE);
DECLARE_SHOW_CLAUSE_SET(SHOW_OPEN_TABLES);
DECLARE_SHOW_CLAUSE_SET(SHOW_OLAP_ASYNC_JOB_STATUS);
DECLARE_SHOW_CLAUSE_SET(SHOW_CREATE_USER);
};// ObShowSqlSet

View File

@ -137,6 +137,7 @@
#include "sql/resolver/ddl/ob_drop_context_resolver.h"
#include "sql/resolver/cmd/ob_tenant_snapshot_resolver.h"
#include "sql/resolver/cmd/ob_tenant_clone_resolver.h"
#include "sql/resolver/cmd/ob_olap_async_job_resolver.h"
#ifdef OB_BUILD_TDE_SECURITY
#include "sql/resolver/ddl/ob_create_tablespace_resolver.h"
#include "sql/resolver/ddl/ob_alter_tablespace_resolver.h"
@ -761,6 +762,7 @@ int ObResolver::resolve(IsPrepared if_prepared, const ParseNode &parse_tree, ObS
case T_SHOW_ENGINE:
case T_SHOW_OPEN_TABLES:
case T_SHOW_SEQUENCES:
case T_SHOW_OLAP_ASYNC_JOB_STATUS:
case T_SHOW_CHECK_TABLE:
case T_SHOW_CREATE_USER: {
REGISTER_STMT_RESOLVER(Show);
@ -1272,6 +1274,14 @@ int ObResolver::resolve(IsPrepared if_prepared, const ParseNode &parse_tree, ObS
REGISTER_STMT_RESOLVER(Mock);
break;
}
case T_OLAP_ASYNC_JOB_SUBMIT: {
REGISTER_STMT_RESOLVER(OLAPAsyncJob);
break;
}
case T_OLAP_ASYNC_JOB_CANCEL: {
REGISTER_STMT_RESOLVER(OLAPAsyncJob);
break;
}
default: {
ret = OB_NOT_SUPPORTED;
const char *type_name = get_type_name(parse_tree.type_);

View File

@ -9236,6 +9236,7 @@ destination_owner varchar(128) YES NULL
destination varchar(128) YES NULL
credential_owner varchar(30) YES NULL
credential_name varchar(30) YES NULL
job_class varchar(128) YES NULL
select /*+QUERY_TIMEOUT(60000000)*/ IF(count(*) >= 0, 1, 0) from oceanbase.__all_virtual_tenant_scheduler_running_job;
IF(count(*) >= 0, 1, 0)
1

View File

@ -171,6 +171,8 @@ Reserved Keyword: LONGBLOB
Test PL procedure name Sql:create procedure LONGBLOB () select 1;
Reserved Keyword: VARYING
Test PL procedure name Sql:create procedure VARYING () select 1;
Reserved Keyword: LOAD
Test PL procedure name Sql:create procedure LOAD () select 1;
Reserved Keyword: CHARSET
Test PL procedure name Sql:create procedure CHARSET () select 1;
Reserved Keyword: COMMIT
@ -181,7 +183,7 @@ Reserved Keyword: DO
Test PL procedure name Sql:create procedure DO () select 1;
Reserved Keyword: UNTIL
Test PL procedure name Sql:create procedure UNTIL () select 1;
************** Total Count of Reserved Keyword:91 ***************
************** Total Count of Reserved Keyword:92 ***************
************** End Test Reserved Keyword ***************
************** Begin Test Non-Reserved Keyword ***************
************** End Test Non-Reserved Keyword ***************