diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.cpp b/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.cpp index 884380e06d..e02af6c131 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.cpp +++ b/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.cpp @@ -74,120 +74,149 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job( } else { CK (job_info.valid()); CK ((job_info.get_what().length() != 0) || (job_info.get_program_name().length() != 0)); - if (job_info.get_what().length() != 0) { // action - if (job_info.is_oracle_tenant_) { - OZ (what.append_fmt("BEGIN %.*s; END;", - job_info.get_what().length(), job_info.get_what().ptr())); - } else { - //mysql mode not support anonymous block - OZ (what.append_fmt("CALL %.*s;", - job_info.get_what().length(), job_info.get_what().ptr())); - } - } else { // program - ObSqlString sql; - ObString program_action; - uint64_t number_of_argument = 0; - OZ (sql.assign_fmt("select program_action, number_of_argument from %s where program_name = \'%.*s\'", - OB_ALL_TENANT_SCHEDULER_PROGRAM_TNAME, - job_info.get_program_name().length(), - job_info.get_program_name().ptr())); - SMART_VAR(ObMySQLProxy::MySQLResult, result) { - if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { - LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_info.get_program_name().ptr()), K(job_info.get_job_name().ptr())); - } else if (OB_NOT_NULL(result.get_result())) { - if (OB_SUCCESS == (ret = result.get_result()->next())) { - EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "program_action", program_action); - EXTRACT_INT_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "number_of_argument", number_of_argument, uint64_t); - if (OB_SUCC(ret) && (result.get_result()->next()) != OB_ITER_END) { - LOG_ERROR("got more than one row for dbms sched program!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); - ret = OB_ERR_UNEXPECTED; - } - } else if (OB_ITER_END == ret) { - LOG_INFO("program not exists, may delete alreay!", K(ret), K(tenant_id), K(job_info.get_program_name().ptr()), K(job_info.get_program_name().ptr())); - ret = OB_SUCCESS; + if (OB_SUCC(ret)) { + if (job_info.get_what().length() != 0) { // action + if (job_info.is_oracle_tenant_) { + OZ (what.append_fmt("BEGIN %.*s; END;", + job_info.get_what().length(), job_info.get_what().ptr())); + } else { + //mysql mode not support anonymous block + OZ (what.append_fmt("CALL %.*s;", + job_info.get_what().length(), job_info.get_what().ptr())); + } + } else { // program + ObSqlString sql; + ObString program_action; + uint64_t number_of_argument = 0; + OZ (sql.assign_fmt("select program_action, number_of_argument from %s where program_name = \'%.*s\'", + OB_ALL_TENANT_SCHEDULER_PROGRAM_TNAME, + job_info.get_program_name().length(), + job_info.get_program_name().ptr())); + SMART_VAR(ObMySQLProxy::MySQLResult, result) { + if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { + LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_info.get_program_name().ptr()), K(job_info.get_job_name().ptr())); + } else if (OB_ISNULL(result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get result", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); } else { - LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); + if (OB_SUCCESS == (ret = result.get_result()->next())) { + EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "program_action", program_action); + EXTRACT_INT_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "number_of_argument", number_of_argument, uint64_t); + if (OB_SUCC(ret)) { + int tmp_ret = result.get_result()->next(); + if (OB_SUCCESS == tmp_ret) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("got more than one row for dbms sched program!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); + } else if (tmp_ret != OB_ITER_END) { + ret = tmp_ret; + LOG_WARN("got next row for dbms sched program failed", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); + } + } + } else if (OB_ITER_END == ret) { + LOG_INFO("program not exists, may delete alreay!", K(ret), K(tenant_id), K(job_info.get_program_name().ptr()), K(job_info.get_program_name().ptr())); + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); + } } } - } - OZ (what.append_fmt("BEGIN %.*s(", - program_action.length(), program_action.ptr())); - if (OB_SUCC(ret) && (0 != number_of_argument)) { - ObString argument_value; - for (int i = 1; OB_SUCC(ret) && i <= number_of_argument; i++) { - argument_value.reset(); - OZ (sql.assign_fmt("select default_value from %s where program_name = \'%.*s\' and job_name = \'%.*s\' and argument_position = %d and is_for_default = 0", - OB_ALL_TENANT_SCHEDULER_PROGRAM_ARGUMENT_TNAME, - job_info.get_program_name().length(), - job_info.get_program_name().ptr(), - job_info.get_job_name().length(), - job_info.get_job_name().ptr(), - i)); - SMART_VAR(ObMySQLProxy::MySQLResult, result) { - if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { - LOG_WARN("execute query failed", K(ret), K(sql), K(result.get_result()), K(tenant_id), K(job_info.get_job_name().ptr())); - } else if (OB_NOT_NULL(result.get_result())) { - if (OB_SUCCESS == (ret = result.get_result()->next())) { - EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "default_value", argument_value); - if (OB_SUCC(ret) && (result.get_result()->next()) != OB_ITER_END) { - LOG_ERROR("got more than one row for argument!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); - ret = OB_ERR_UNEXPECTED; - } - } else if (OB_ITER_END == ret) { - LOG_INFO("job argument not exists, use default"); - ret = OB_SUCCESS; - OZ (sql.assign_fmt("select default_value from %s where program_name = \'%.*s\' and job_name = \'%s\' and argument_position = %d and is_for_default = 1", - OB_ALL_TENANT_SCHEDULER_PROGRAM_ARGUMENT_TNAME, - job_info.get_program_name().length(), - job_info.get_program_name().ptr(), - "default", - i)); - SMART_VAR(ObMySQLProxy::MySQLResult, tmp_result) { - if (OB_FAIL(sql_proxy_->read(tmp_result, tenant_id, sql.ptr()))) { - LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_info.get_job_name().ptr())); - } else if (OB_NOT_NULL(tmp_result.get_result())) { - if (OB_SUCCESS == (ret = tmp_result.get_result()->next())) { - EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(tmp_result.get_result()), "default_value", argument_value); - if (OB_SUCC(ret) && (tmp_result.get_result()->next()) != OB_ITER_END) { - LOG_ERROR("got more than one row for argument!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); - ret = OB_ERR_UNEXPECTED; - } - } else if (OB_ITER_END == ret) { - LOG_ERROR("program default argument not exists", K(sql.ptr()), K(job_info.get_program_name().ptr())); - } else { - LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr())); + OZ (what.append_fmt("BEGIN %.*s(", + program_action.length(), program_action.ptr())); + if (OB_SUCC(ret) && (0 != number_of_argument)) { + ObString argument_value; + for (int i = 1; OB_SUCC(ret) && i <= number_of_argument; i++) { + argument_value.reset(); + OZ (sql.assign_fmt("select default_value from %s where program_name = \'%.*s\' and job_name = \'%.*s\' and argument_position = %d and is_for_default = 0", + OB_ALL_TENANT_SCHEDULER_PROGRAM_ARGUMENT_TNAME, + job_info.get_program_name().length(), + job_info.get_program_name().ptr(), + job_info.get_job_name().length(), + job_info.get_job_name().ptr(), + i)); + SMART_VAR(ObMySQLProxy::MySQLResult, result) { + if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { + LOG_WARN("execute query failed", K(ret), K(sql), K(result.get_result()), K(tenant_id), K(job_info.get_job_name().ptr())); + } else if (OB_ISNULL(result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get result", K(ret), K(tenant_id), K(job_info.get_job_name().ptr())); + } else { + if (OB_SUCCESS == (ret = result.get_result()->next())) { + EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "default_value", argument_value); + if (OB_SUCC(ret)) { + int tmp_ret = result.get_result()->next(); + if (OB_SUCCESS == tmp_ret) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("got more than one row for argument!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); + } else if (tmp_ret != OB_ITER_END) { + ret = tmp_ret; + LOG_WARN("got next row for argument failed", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); } } + } else if (OB_ITER_END == ret) { + LOG_INFO("job argument not exists, use default"); + ret = OB_SUCCESS; + OZ (sql.assign_fmt("select default_value from %s where program_name = \'%.*s\' and job_name = \'%s\' and argument_position = %d and is_for_default = 1", + OB_ALL_TENANT_SCHEDULER_PROGRAM_ARGUMENT_TNAME, + job_info.get_program_name().length(), + job_info.get_program_name().ptr(), + "default", + i)); + SMART_VAR(ObMySQLProxy::MySQLResult, tmp_result) { + if (OB_FAIL(sql_proxy_->read(tmp_result, tenant_id, sql.ptr()))) { + LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_info.get_job_name().ptr())); + } else if (OB_ISNULL(tmp_result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get result", K(ret), K(tenant_id), K(job_info.get_job_name().ptr())); + } else { + if (OB_SUCCESS == (ret = tmp_result.get_result()->next())) { + EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(tmp_result.get_result()), "default_value", argument_value); + if (OB_SUCC(ret)) { + int tmp_ret = tmp_result.get_result()->next(); + if (OB_SUCCESS == tmp_ret) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("got more than one row for argument!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); + } else if (tmp_ret != OB_ITER_END) { + ret = tmp_ret; + LOG_WARN("got next row for argument failed", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr())); + } + } + } else if (OB_ITER_END == ret) { + LOG_ERROR("program default argument not exists", K(sql.ptr()), K(job_info.get_program_name().ptr())); + } else { + LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr())); + } + } + } + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr())); } - ret = OB_SUCCESS; - } else { - LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr())); } } + if (i == 1) { + OZ (what.append_fmt("\'%.*s\'", argument_value.length(), argument_value.ptr())); + } else { + OZ (what.append_fmt(",\'%.*s\'", argument_value.length(), argument_value.ptr())); + } } - if (i == 1) { - OZ (what.append_fmt("\'%.*s\'", argument_value.length(), argument_value.ptr())); - } else { - OZ (what.append_fmt(",\'%.*s\'", argument_value.length(), argument_value.ptr())); - } + OZ (what.append_fmt("); END;")); + } else { + LOG_ERROR("number_of_argument not exist or not right", K(ret), K(number_of_argument)); } - OZ (what.append_fmt("); END;")); - } else { - LOG_ERROR("number_of_argument not exist or not right", K(ret), K(number_of_argument)); } - } - if (job_info.is_oracle_tenant_) { - ObOracleSqlProxy oracle_proxy(*(static_cast(sql_proxy_))); - CK (OB_NOT_NULL(pool = static_cast(oracle_proxy.get_pool()))); - OZ (ObDBMSSchedJobUtils::init_env(job_info, *session_info)); - OZ (pool->acquire_spi_conn(session_info, conn)); - OZ (conn->execute_write(tenant_id, what.string().ptr(), affected_rows)); - if (OB_NOT_NULL(conn)) { - sql_proxy_->close(conn, ret); + if (job_info.is_oracle_tenant_) { + ObOracleSqlProxy oracle_proxy(*(static_cast(sql_proxy_))); + CK (OB_NOT_NULL(pool = static_cast(oracle_proxy.get_pool()))); + OZ (ObDBMSSchedJobUtils::init_env(job_info, *session_info)); + OZ (pool->acquire_spi_conn(session_info, conn)); + OZ (conn->execute_write(tenant_id, what.string().ptr(), affected_rows)); + if (OB_NOT_NULL(conn)) { + sql_proxy_->close(conn, ret); + } + } else {//mysql mode need use mysql proxy + OZ (ObDBMSSchedJobUtils::init_env(job_info, *session_info)); + OZ (sql_proxy_->write(tenant_id, what.string().ptr(), affected_rows)); } - } else {//mysql mode need use mysql proxy - OZ (ObDBMSSchedJobUtils::init_env(job_info, *session_info)); - OZ (sql_proxy_->write(tenant_id, what.string().ptr(), affected_rows)); } } if (NULL != session_info) { @@ -211,7 +240,6 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job(uint64_t tenant_id, bool is_oracl OZ (table_operator_.get_dbms_sched_job_info(tenant_id, is_oracle_tenant, job_id, job_name, allocator, job_info)); if (OB_SUCC(ret)) { - OZ (table_operator_.update_for_start(tenant_id, job_info)); OZ (run_dbms_sched_job(tenant_id, job_info)); diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_job_master.cpp b/src/observer/dbms_scheduler/ob_dbms_sched_job_master.cpp index b6d22d6bcb..702b2e2a7d 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_job_master.cpp +++ b/src/observer/dbms_scheduler/ob_dbms_sched_job_master.cpp @@ -29,6 +29,8 @@ #include "observer/ob_server_struct.h" #include "rootserver/ob_root_service.h" +#define TO_TS(second) (1000000L * second) + namespace oceanbase { using namespace common; @@ -48,26 +50,20 @@ int ObDBMSSchedJobTask::init() if (inited_) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret), K(inited_)); - } else if (OB_FAIL(timer_.init())) { - LOG_WARN("fail to init timer", K(ret)); } else { inited_ = true; } return ret; } -int ObDBMSSchedJobTask::start(dbms_job::ObDBMSJobQueue *ready_queue) +int ObDBMSSchedJobTask::start(ObVSliceAlloc *allocator) { int ret = OB_SUCCESS; if (!inited_) { ret = OB_NOT_INIT; LOG_WARN("dbms sched job task not inited", K(ret), K(inited_)); - } else if (OB_ISNULL(ready_queue)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("NULL ptr", K(ret), K(ready_queue)); } - ready_queue_ = ready_queue; - OZ (timer_.start()); + allocator_ = allocator; return ret; } @@ -78,12 +74,15 @@ int ObDBMSSchedJobTask::stop() ret = OB_NOT_INIT; LOG_WARN("dbms sched job task not inited", K(ret), K(inited_)); } else { - timer_.cancel(*this); - timer_.stop(); - timer_.wait(); + if (allocator_ != NULL) { + for (WaitVectorIterator iter = wait_vector_.begin(); + OB_SUCC(ret) && iter != wait_vector_.end(); ++iter) { + ObDBMSSchedJobKey *job_key = *iter; + allocator_->free(job_key); + } + } wait_vector_.clear(); - job_key_ = NULL; - ready_queue_ = NULL; + allocator_ = NULL; } return ret; } @@ -94,50 +93,10 @@ int ObDBMSSchedJobTask::destroy() if (!inited_) { ret = OB_NOT_INIT; LOG_WARN("scheduler task not inited", K(ret), K(inited_)); - } else { - timer_.destroy(); } return ret; } -void ObDBMSSchedJobTask::runTimerTask() -{ - int ret = OB_SUCCESS; - ObSpinLockGuard guard(lock_); - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("dbms sched job task not init", K(ret), K(inited_)); - } else if (OB_ISNULL(job_key_) - || OB_ISNULL(ready_queue_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("null ptr", K(ret), K(job_key_), K(ready_queue_)); - } else if (OB_FAIL(ready_queue_->push(job_key_, 0))) { - LOG_WARN("fail to push ready job to queue", K(ret), K(*job_key_)); - } else { - job_key_ = NULL; - if (wait_vector_.count() > 0) { - job_key_ = wait_vector_[0]; - if (OB_FAIL(wait_vector_.remove(wait_vector_.begin()))) { - job_key_ = NULL; - LOG_WARN("fail to remove job_id from sorted vector", K(ret)); - } else if (OB_ISNULL(job_key_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("NULL ptr", K(ret), K(job_key_)); - } else if (OB_FAIL(timer_.schedule(*this, job_key_->get_adjust_delay()))) { - LOG_WARN("fail to schedule task", K(ret), K(*job_key_)); - } - } - } - LOG_DEBUG("JobKEYS INFO HEADER ==== ", KPC(job_key_), K(wait_vector_.count())); - int i = 0; - for (WaitVectorIterator iter = wait_vector_.begin(); - OB_SUCC(ret) && iter != wait_vector_.end(); ++iter, ++i) { - ObDBMSSchedJobKey *job = *iter; - LOG_DEBUG("JobKEYS INFO ELEMENT ====", K(i), KPC(job)); - } - return; -} - int ObDBMSSchedJobTask::scheduler(ObDBMSSchedJobKey *job_key) { int ret = OB_SUCCESS; @@ -146,12 +105,10 @@ int ObDBMSSchedJobTask::scheduler(ObDBMSSchedJobKey *job_key) LOG_WARN("dbms sched job task not init", K(ret)); } else if (OB_ISNULL(job_key)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("NULL ptr for job id", K(ret), KPC(job_key)); + LOG_WARN("NULL ptr for job id", K(ret)); } else if (!job_key->is_valid()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("job id is invalid", K(ret), KPC(job_key)); - } else if (0 == job_key->get_delay()) { - OZ (immediately(job_key), KPC(job_key)); } else { OZ (add_new_job(job_key), KPC(job_key)); } @@ -169,40 +126,19 @@ int ObDBMSSchedJobTask::add_new_job(ObDBMSSchedJobKey *new_job_key) ret = OB_ERR_UNEXPECTED; LOG_WARN("NULL ptr", K(ret), KPC(new_job_key)); } else { - ObSpinLockGuard guard(lock_); - if (OB_ISNULL(job_key_)) { - job_key_ = new_job_key; - OZ (timer_.schedule(*this, job_key_->get_delay())); - } else if (new_job_key->get_execute_at() >= job_key_->get_execute_at()) { - WaitVectorIterator iter; - ObDBMSSchedJobKey *replace_job_key = NULL; - OZ (wait_vector_.replace(new_job_key, iter, compare_job_key, equal_job_key, replace_job_key)); - } else { - WaitVectorIterator iter; - OX (timer_.cancel(*this)); - OZ (wait_vector_.insert(job_key_, iter, compare_job_key)); - OX (job_key_ = new_job_key); - OZ (timer_.schedule(*this, job_key_->get_delay())); - } + WaitVectorIterator iter; + ObDBMSSchedJobKey *replace_job_key = NULL; + OZ (wait_vector_.replace(new_job_key, iter, compare_job_key, equal_job_key, replace_job_key)); } - return ret; -} - -int ObDBMSSchedJobTask::immediately(ObDBMSSchedJobKey *job_key) -{ - int ret = OB_SUCCESS; - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("dbms sched job not init", K(ret), K(inited_)); - } else if (OB_ISNULL(job_key) || OB_ISNULL(ready_queue_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("NULL ptr", K(ret), K(job_key), K(ready_queue_)); - } else { - ObSpinLockGuard guard(lock_); - if (OB_FAIL(ready_queue_->push(job_key, 0))) { - LOG_WARN("fail to push ready job to queue", K(ret), K(*job_key)); - } + /* + LOG_DEBUG("JobKEYS INFO HEADER ==== ", KPC(new_job_key), K(wait_vector_.count())); + int i = 0; + for (WaitVectorIterator iter = wait_vector_.begin(); + OB_SUCC(ret) && iter != wait_vector_.end(); ++iter, ++i) { + ObDBMSSchedJobKey *job = *iter; + LOG_DEBUG("JobKEYS INFO ELEMENT ====", K(i), KPC(job)); } + */ return ret; } @@ -258,8 +194,6 @@ int ObDBMSSchedJobMaster::init(ObUnitManager *unit_mgr, ) { ret = OB_ERR_UNEXPECTED; LOG_WARN("null ptr", K(ret), K(unit_mgr), K(sql_client), K(schema_service)); - } else if (FALSE_IT(ready_queue_.set_limit(MAX_READY_JOBS_CAPACITY))) { - // do-nothing } else if (OB_FAIL(scheduler_task_.init())) { LOG_WARN("fail to init ready queue", K(ret)); } else if (OB_FAIL(scheduler_thread_.init(1, 1))) { @@ -293,7 +227,7 @@ int ObDBMSSchedJobMaster::start() // alreay running , do nothing ... } else if (OB_FAIL(scheduler_thread_.push(static_cast(this)))) { LOG_WARN("fail to start scheduler thread", K(ret)); - } else if (OB_FAIL(scheduler_task_.start(&ready_queue_))) { + } else if (OB_FAIL(scheduler_task_.start(&allocator_))) { LOG_WARN("fail to start ready queue", K(ret)); } LOG_INFO("dbms sched job master started", K(ret)); @@ -308,13 +242,52 @@ int ObDBMSSchedJobMaster::stop() sleep(1); } scheduler_task_.stop(); - ready_queue_.clear(); alive_jobs_.clear(); stoped_ = false; LOG_INFO("dbms sched job master stoped", K(ret)); return ret; } +int64_t ObDBMSSchedJobMaster::calc_next_date(ObDBMSSchedJobInfo &job_info) +{ + int64_t next_date = 0; + const int64_t now = ObTimeUtility::current_time(); + if (job_info.get_interval_ts() == 0) { + next_date = 64060560000000000; + } else { + int64_t N = (now - job_info.get_start_date()) / job_info.get_interval_ts(); + next_date = job_info.get_start_date() + (N + 1) * job_info.get_interval_ts(); + } + return next_date; +} + +int64_t ObDBMSSchedJobMaster::run_job(ObDBMSSchedJobInfo &job_info, ObDBMSSchedJobKey *job_key, int64_t next_date) +{ + int ret = OB_SUCCESS; + ObAddr execute_addr; + if (OB_FAIL((get_execute_addr(job_info, execute_addr)))) { + LOG_WARN("failed to get execute addr, retry soon", K(ret), K(job_info)); + } else if (ObTimeUtility::current_time() > job_info.get_end_date()) { + LOG_INFO("job reach end date, not running", K(job_info)); + } else if (OB_FAIL(table_operator_.update_for_start(job_info.get_tenant_id(), job_info, next_date))) { + LOG_WARN("failed to update for start", K(ret), K(job_info), KPC(job_key)); + } else if (OB_FAIL(job_rpc_proxy_->run_dbms_sched_job(job_key->get_tenant_id(), + job_key->is_oracle_tenant(), + job_key->get_job_id(), + job_key->get_job_name(), + execute_addr, + self_addr_))) { + LOG_WARN("failed to run dbms sched job", K(ret), K(job_info), KPC(job_key)); + if (is_server_down_error(ret)) { + int tmp = OB_SUCCESS; + if (OB_SUCCESS != (tmp = table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "send job rpc failed"))) { + LOG_WARN("update for end failed for send rpc failed job", K(tmp), K(job_info), KPC(job_key)); + } + } + } + return ret; +} + int ObDBMSSchedJobMaster::scheduler() { int ret = OB_SUCCESS; @@ -331,30 +304,31 @@ int ObDBMSSchedJobMaster::scheduler() lib::set_thread_name("DBMS_SCHEDULER"); while (OB_SUCC(ret) && !stoped_) { ObLink* ptr = NULL; - int64_t timeout = MIN_SCHEDULER_INTERVAL; ObDBMSSchedJobKey *job_key = NULL; - if (REACH_TIME_INTERVAL(MIN_SCHEDULER_INTERVAL)) { - int tmp_ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + if (REACH_TIME_INTERVAL(CHECK_NEW_INTERVAL)) { if (OB_SUCCESS != (tmp_ret = check_all_tenants())) { LOG_WARN("fail to check all tenants", K(tmp_ret)); } } - if (OB_FAIL(ready_queue_.pop(ptr, timeout))) { - if (OB_ENTRY_NOT_EXIST == ret) { - LOG_INFO("dbms sched job master wait timeout, no entry", K(ret)); - ret = OB_SUCCESS; - } else { - LOG_ERROR("fail to pop dbms sched job ready queue", K(ret), K(timeout)); - } - } else if (OB_ISNULL(job_key = static_cast(ptr)) || !job_key->is_valid()) { + if (scheduler_task_.wait_vector().count() == 0) { + ob_usleep(MIN_SCHEDULER_INTERVAL); + } else if (OB_ISNULL(job_key = scheduler_task_.wait_vector()[0]) || !job_key->is_valid()) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("unexpected error, invalid job key found in ready queue!", K(ret), KPC(job_key)); } else { - int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = scheduler_job(job_key))) { - LOG_WARN("fail to scheduler single dbms sched job", K(ret), K(tmp_ret), KPC(job_key)); + int64_t delay = job_key->get_execute_at() - ObTimeUtility::current_time(); + if (delay > MIN_SCHEDULER_INTERVAL) { + ob_usleep(MIN_SCHEDULER_INTERVAL); } else { - LOG_INFO("success to scheduler single dbms sched job", K(ret), K(tmp_ret), KPC(job_key)); + ob_usleep(max(0, delay)); + if (OB_SUCCESS != (tmp_ret = scheduler_task_.wait_vector().remove(scheduler_task_.wait_vector().begin()))) { + LOG_WARN("fail to remove job_id from sorted vector", K(ret)); + } else if (OB_SUCCESS != (tmp_ret = scheduler_job(job_key))) { + LOG_WARN("fail to scheduler single dbms sched job", K(ret), K(tmp_ret)); + } else { + LOG_INFO("success to scheduler single dbms sched job", K(ret), K(tmp_ret)); + } } } } @@ -375,7 +349,7 @@ int ObDBMSSchedJobMaster::scheduler_job(ObDBMSSchedJobKey *job_key) CK (OB_LIKELY(inited_)); CK (OB_NOT_NULL(job_key)); CK (OB_LIKELY(job_key->is_valid())); - + JobIdByTenant job_id_by_tenant(job_key->get_tenant_id(), job_key->get_job_id()); if (OB_FAIL(ret)) { LOG_WARN("fail to scheduler job", K(ret), KPC(job_key)); } else { @@ -383,78 +357,118 @@ int ObDBMSSchedJobMaster::scheduler_job(ObDBMSSchedJobKey *job_key) OZ (table_operator_.get_dbms_sched_job_info( job_key->get_tenant_id(), job_key->is_oracle_tenant(), job_key->get_job_id(), job_key->get_job_name(), allocator, job_info)); - if (OB_FAIL(ret) || !job_info.valid()) { - int tmp = alive_jobs_.erase_refactored(job_key->get_job_id_with_tenant()); - if (tmp != OB_SUCCESS) { - LOG_ERROR("failed delete invalid job from hash set", K(tmp), K(ret), K(job_info), KPC(job_key)); + const int64_t now = ObTimeUtility::current_time(); + int64_t next_check_date = now + MIN_SCHEDULER_INTERVAL; + if (OB_FAIL(ret) || !job_info.valid() || job_info.is_disabled() || job_info.is_broken()) { + free_job_key(job_key); + job_key = NULL; + } else if (job_info.is_running()) { + LOG_INFO("job is running now, retry later", K(job_info)); + if (now > job_info.get_this_date() + TO_TS(job_info.get_max_run_duration())) { + if (OB_FAIL(table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "check job timeout"))) { + LOG_WARN("update for end failed for timeout job", K(ret)); + } else { + LOG_WARN("job is timeout, force update for end", K(job_info), K(now)); + } + } + } else if (now > job_info.get_end_date()) { + int tmp = OB_SUCCESS; + if (OB_SUCCESS != (tmp = table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "check expired job"))) { + LOG_WARN("update for end failed for auto drop job", K(tmp), K(job_info)); } else { - LOG_INFO("delete invalid job from hash set", K(tmp), K(ret), K(job_info), KPC(job_key)); + LOG_WARN("update for end for expired job", K(job_info), K(now)); } - allocator_.free(job_key); // sql proxy error - } else{ - bool ignore_nextdate = false; - if (!job_key->is_check() && !job_info.is_running() && !job_info.is_broken() && !job_info.is_disabled()) { - bool can_running = false; - OZ (table_operator_.check_job_can_running(job_info.get_tenant_id(), can_running)); - if (OB_SUCC(ret) && can_running) { - OZ (get_execute_addr(job_info, execute_addr)); - OZ (table_operator_.update_for_start( - job_info.get_tenant_id(), job_info)); - OZ (job_rpc_proxy_->run_dbms_sched_job( - job_key->get_tenant_id(), job_key->is_oracle_tenant(), job_key->get_job_id(), job_key->get_job_name(), execute_addr, self_addr_)); + free_job_key(job_key); + job_key = NULL; + } else if (now < job_info.get_next_date()) { + next_check_date = min(job_info.get_next_date(), now + CHECK_NEW_INTERVAL); + } else { + bool can_running = false; + if (OB_FAIL(table_operator_.check_job_can_running(job_info.get_tenant_id(), alive_jobs_.size(), can_running))) { + LOG_WARN("failed to check job can running, retry later", K(ret)); + } else if (!can_running) { + LOG_INFO("job concurrency reach limit, retry later", K(ret), K(job_info), K(can_running)); + } else if (now > job_info.get_next_date() + TO_TS(job_info.get_max_run_duration())) { + LOG_WARN("job maybe missed, ignore it", K(now), K(job_info)); + int64_t new_next_date = calc_next_date(job_info); + int tmp = OB_SUCCESS; + if (OB_SUCCESS != (tmp = table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "check job missed"))) { + LOG_WARN("update for end failed for missed job", K(tmp)); + } else if (OB_SUCCESS != (tmp = table_operator_.update_next_date(job_info.get_tenant_id(), job_info, new_next_date))){ + LOG_WARN("update next date failed", K(tmp), K(job_info)); } else { - LOG_INFO("avoid duplicate job", K(ret), K(job_info), K(can_running)); + next_check_date = min(new_next_date, now + CHECK_NEW_INTERVAL); } - ignore_nextdate = true; - } - int tmp_ret = OB_SUCCESS; - // always add job to queue. we need this to check job status changes. - if (OB_SUCCESS != (tmp_ret = register_job(job_info, job_key, ignore_nextdate))) { - LOG_WARN("failed to register job to job queue", K(tmp_ret), K(job_info)); - int tmp = alive_jobs_.erase_refactored(job_info.get_job_id_with_tenant()); - if (tmp != OB_SUCCESS) { - LOG_ERROR("failed delete invalid job from hash set", K(tmp), K(job_info)); + } else { + int64_t new_next_date = calc_next_date(job_info); + if (OB_FAIL(run_job(job_info, job_key, new_next_date))) { + LOG_WARN("failed to run job", K(ret), K(job_info), KPC(job_key)); } else { - LOG_WARN("delete register failed job from hash set", K(job_info)); + next_check_date = min(new_next_date, now + CHECK_NEW_INTERVAL); } } } + int tmp = OB_SUCCESS; + if (OB_NOT_NULL(job_key) && OB_SUCCESS != (tmp = register_job(job_key, next_check_date))) { + LOG_WARN("failed to register job", K(tmp), K(job_info)); + free_job_key(job_key); + job_key = NULL; + } } return ret; } int ObDBMSSchedJobMaster::destroy() { - ready_queue_.destroy(); scheduler_task_.destroy(); scheduler_thread_.destroy(); - allocator_.clear(); + allocator_.destroy(); return OB_SUCCESS; } int ObDBMSSchedJobMaster::alloc_job_key( ObDBMSSchedJobKey *&job_key, - uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const ObString &job_name, - uint64_t execute_at, uint64_t delay, - bool check_job) + uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const ObString &job_name) { int ret = OB_SUCCESS; - ObSpinLockGuard guard(lock_); void *ptr = NULL; job_key = NULL; if (OB_ISNULL(ptr = allocator_.alloc(sizeof(ObDBMSSchedJobKey)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", K(ret), K(ptr)); } else if (OB_ISNULL(job_key = - new(ptr)ObDBMSSchedJobKey(tenant_id, is_oracle_tenant, job_id, job_name, - execute_at, delay, - check_job))) { + new(ptr)ObDBMSSchedJobKey(tenant_id, is_oracle_tenant, job_id, job_name))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("fail to init scheduler job id", K(ret), K(tenant_id)); + } else { + JobIdByTenant job_id_by_tenant; + job_id_by_tenant.set_tenant_id(tenant_id); + job_id_by_tenant.set_job_id(job_id); + if (OB_FAIL(alive_jobs_.set_refactored(job_id_by_tenant))) { + LOG_WARN("faile to add job to alive_jobs", K(ret), K(tenant_id), K(job_id)); + allocator_.free(job_key); + job_key = NULL; + } } return ret; } +void ObDBMSSchedJobMaster::free_job_key(ObDBMSSchedJobKey *&job_key) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(job_key)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("job_key is null", K(ret)); + } else { + JobIdByTenant job_id_by_tenant; + job_id_by_tenant.set_tenant_id(job_key->get_tenant_id()); + job_id_by_tenant.set_job_id(job_key->get_job_id()); + OZ (alive_jobs_.erase_refactored(job_id_by_tenant)); + allocator_.free(job_key); + job_key = NULL; + } +} + int ObDBMSSchedJobMaster::get_execute_addr(ObDBMSSchedJobInfo &job_info, ObAddr &execute_addr) { int ret = OB_SUCCESS; @@ -520,7 +534,7 @@ int ObDBMSSchedJobMaster::server_random_pick(int64_t tenant_id, ObString &pick_z bool is_alive = false; bool is_active = false; bool on_server = false; - int64_t pos = rand_.get(0,255) % total_server.count(); + int64_t pos = rand_.get(0,65536) % total_server.count(); int64_t cnt = 0; do { pos = (pos + 1) % total_server.count(); @@ -566,7 +580,27 @@ int ObDBMSSchedJobMaster::check_all_tenants() const ObTenantSchema *tenant_schema = NULL; OZ (schema_guard.get_tenant_info(tenant_ids.at(i), tenant_schema)); CK (OB_NOT_NULL(tenant_schema)); - if (OB_SUCC(ret)) { + int64_t tenant_id = tenant_ids.at(i); + bool is_tenant_standby = false; + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) { + LOG_WARN("check is standby tenant failed", K(ret), K(tenant_id)); + } else if (is_tenant_standby) { + LOG_INFO("tenant is standby, not check new jobs, and remove exist jobs", K(tenant_id)); + for (ObDBMSSchedJobTask::WaitVectorIterator iter = scheduler_task_.wait_vector().begin(); + OB_SUCC(ret) && iter != scheduler_task_.wait_vector().end(); ++iter) { + ObDBMSSchedJobKey *job_key = *iter; + if (OB_NOT_NULL(job_key) && tenant_id == job_key->get_tenant_id()) { + if (OB_FAIL(scheduler_task_.wait_vector().remove(iter))) { + LOG_WARN("failed to remove job key from wait vector", K(ret), KPC(job_key)); + } else { + LOG_INFO("remove job key", KPC(job_key)); + iter--; + free_job_key(job_key); + } + } + } + } else { uint64_t data_version = 0; if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_ids.at(i), data_version))) { LOG_WARN("fail to get tenant data version", KR(ret), K(data_version)); @@ -599,84 +633,54 @@ int ObDBMSSchedJobMaster::register_new_jobs(uint64_t tenant_id, bool is_oracle_t { int ret = OB_SUCCESS; ObDBMSSchedJobInfo job_info; + JobIdByTenant job_id_by_tenant; for (int64_t i = 0; OB_SUCC(ret) && i < job_infos.count(); i++) { job_info = job_infos.at(i); - if (job_info.valid()) { - int tmp = alive_jobs_.exist_refactored(job_info.get_job_id_with_tenant()); + if (job_info.valid() && !job_info.is_disabled() && !job_info.is_broken()) { + job_id_by_tenant.set_tenant_id(job_info.get_tenant_id()); + job_id_by_tenant.set_job_id(job_info.get_job_id()); + int tmp = alive_jobs_.exist_refactored(job_id_by_tenant); if (OB_HASH_EXIST == tmp) { // do nothing ... - } else if (OB_HASH_NOT_EXIST) { - OZ (register_job(job_info)); - OZ (alive_jobs_.set_refactored(job_info.get_job_id_with_tenant())); + LOG_DEBUG("job exist", K(alive_jobs_), K(job_id_by_tenant)); + } else if (OB_HASH_NOT_EXIST == tmp) { + ObDBMSSchedJobKey *job_key = NULL; + if (OB_FAIL(alloc_job_key( + job_key, + job_info.get_tenant_id(), + job_info.is_oracle_tenant(), + job_info.get_job_id(), + job_info.get_job_name()))) { + LOG_WARN("failed to alloc job key", K(ret), K(job_info)); + } else if (OB_FAIL(register_job(job_key, ObTimeUtility::current_time()))) { + LOG_WARN("failed to register job", K(ret), K(job_info)); + free_job_key(job_key); + job_key = NULL; + } LOG_INFO("register new job", K(ret), K(tenant_id), K(job_info)); } else { - LOG_ERROR("dbms sched job master check job exist failed", K(ret), K(job_info)); + LOG_ERROR("dbms sched job master check job exist failed", K(tmp), K(job_info)); } } } return ret; } -int ObDBMSSchedJobMaster::register_job( - ObDBMSSchedJobInfo &job_info, ObDBMSSchedJobKey *job_key, bool ignore_nextdate) +int ObDBMSSchedJobMaster::register_job(ObDBMSSchedJobKey *job_key, int64_t next_date) { int ret = OB_SUCCESS; - - int64_t execute_at = -1; - int64_t delay = -1; - bool check_job = false; - int64_t now = ObTimeUtility::current_time(); - - CK (OB_LIKELY(inited_)); - CK (job_info.valid()); - OZ (table_operator_.check_job_timeout(job_info)); - OZ (table_operator_.check_auto_drop(job_info)); - if (OB_FAIL(ret)) { - } else if (job_info.is_broken() || job_info.is_disabled()) { - execute_at = now + MIN_SCHEDULER_INTERVAL; - delay = MIN_SCHEDULER_INTERVAL; // every MIN_SCHEDULER_INTERVAL check job status - check_job = true; + if (OB_ISNULL(job_key)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("job key is null", K(ret)); + } else if (next_date == 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("next date should not be 0", K(ret), KPC(job_key), K(next_date)); } else { - OZ (table_operator_.calc_execute_at(job_info, execute_at, delay, ignore_nextdate)); - if (OB_FAIL(ret) || delay < 0) { - ret = OB_SUCCESS; - execute_at = now + MIN_SCHEDULER_INTERVAL; - delay = MIN_SCHEDULER_INTERVAL; - check_job = true; - } else if (delay > MIN_SCHEDULER_INTERVAL) { - // job may run later, but we need check job update. - execute_at = now + MIN_SCHEDULER_INTERVAL; - delay = MIN_SCHEDULER_INTERVAL; - check_job = true; + job_key->set_execute_at(next_date); + if (OB_FAIL(scheduler_task_.scheduler(job_key))) { + LOG_WARN("failed to scheduler job", K(ret), KPC(job_key)); } } - if (OB_FAIL(ret)) { - } else if (OB_ISNULL(job_key)) { - OZ (alloc_job_key( - job_key, - job_info.get_tenant_id(), - job_info.is_oracle_tenant(), - job_info.get_job_id(), - job_info.get_job_name(), - execute_at, - delay, - check_job)); - CK (OB_NOT_NULL(job_key)); - CK (job_key->is_valid()); - } else { - CK (job_key->get_tenant_id() == job_info.get_tenant_id()); - CK (job_key->get_job_id() == job_info.get_job_id()); - CK (job_key->get_job_name() == job_info.get_job_name()); - OX (job_key->set_execute_at(execute_at)); - OX (job_key->set_delay(delay)); - OX (job_key->set_check_job(check_job)); - } - OZ (scheduler_task_.scheduler(job_key)); - if (OB_FAIL(ret) && OB_NOT_NULL(job_key)) { - allocator_.free(job_key); - } - LOG_INFO("register dbms sched job", K(ret), K(job_info), KPC(job_key), K(ignore_nextdate)); - return ret; } diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_job_master.h b/src/observer/dbms_scheduler/ob_dbms_sched_job_master.h index 5d3eee0927..30766e9ee0 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_job_master.h +++ b/src/observer/dbms_scheduler/ob_dbms_sched_job_master.h @@ -51,16 +51,11 @@ class ObDBMSSchedJobKey : public common::ObLink { public: ObDBMSSchedJobKey( - uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name, - uint64_t execute_at, uint64_t delay, - bool check_job) + uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name) : tenant_id_(tenant_id), is_oracle_tenant_(is_oracle_tenant), job_id_(job_id), - job_name_(), - execute_at_(execute_at), - delay_(delay), - check_job_(check_job) { + job_name_() { job_name_.assign_buffer(job_name_buf_, JOB_NAME_MAX_SIZE); job_name_.write(job_name.ptr(), job_name.length()); } @@ -73,18 +68,9 @@ public: OB_INLINE uint64_t get_job_id() const { return job_id_; } OB_INLINE common::ObString &get_job_name() { return job_name_; } OB_INLINE uint64_t get_execute_at() const { return execute_at_;} - OB_INLINE uint64_t get_delay() const { return delay_; } - - OB_INLINE bool is_check() { return check_job_; } - OB_INLINE void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; } OB_INLINE void set_job_id(uint64_t job_id) { job_id_ = job_id; } - OB_INLINE void set_execute_at(uint64_t execute_at) { execute_at_ = execute_at; } - OB_INLINE void set_delay(uint64_t delay) { delay_ = delay; } - - OB_INLINE void set_check_job(bool check_job) { check_job_ = check_job; } - OB_INLINE uint64_t get_adjust_delay() const { uint64_t now = ObTimeUtility::current_time(); @@ -103,9 +89,7 @@ public: K_(is_oracle_tenant), K_(job_id), K_(job_name), - K_(execute_at), - K_(delay), - K_(check_job)); + K_(execute_at)); private: uint64_t tenant_id_; @@ -114,12 +98,9 @@ private: char job_name_buf_[JOB_NAME_MAX_SIZE]; common::ObString job_name_; uint64_t execute_at_; - uint64_t delay_; - - bool check_job_; // for check job update ... }; -class ObDBMSSchedJobTask : public ObTimerTask +class ObDBMSSchedJobTask { public: typedef common::ObSortedVector WaitVector; @@ -127,23 +108,19 @@ public: ObDBMSSchedJobTask() : inited_(false), - job_key_(NULL), - ready_queue_(NULL), - wait_vector_(0, NULL, ObModIds::VECTOR), - lock_(common::ObLatchIds::DBMS_SCHEDULER_TASK_LOCK) {} + allocator_(NULL), + wait_vector_(0, NULL, ObModIds::VECTOR) {} virtual ~ObDBMSSchedJobTask() {} int init(); - int start(dbms_job::ObDBMSJobQueue *ready_queue); + int start(common::ObVSliceAlloc *allocator); int stop(); int destroy(); - void runTimerTask(); - int scheduler(ObDBMSSchedJobKey *job_key); int add_new_job(ObDBMSSchedJobKey *job_key); - int immediately(ObDBMSSchedJobKey *job_key); + WaitVector &wait_vector() { return wait_vector_; } inline static bool compare_job_key( const ObDBMSSchedJobKey *lhs, const ObDBMSSchedJobKey *rhs); @@ -152,13 +129,8 @@ public: private: bool inited_; - ObDBMSSchedJobKey *job_key_; - dbms_job::ObDBMSJobQueue *ready_queue_; + common::ObVSliceAlloc *allocator_; WaitVector wait_vector_; - - ObSpinLock lock_; - ObTimer timer_; - private: DISALLOW_COPY_AND_ASSIGN(ObDBMSSchedJobTask); }; @@ -175,11 +147,54 @@ public: schema_service_(NULL), job_rpc_proxy_(NULL), self_addr_(), - lock_(common::ObLatchIds::DBMS_SCHEDULER_MASTER_LOCK), + allocator_(ObMemAttr(OB_SERVER_TENANT_ID, "DbmsScheduler"), OB_MALLOC_NORMAL_BLOCK_SIZE, block_alloc_), alive_jobs_() {} virtual ~ObDBMSSchedJobMaster() { alive_jobs_.destroy(); }; + class JobIdByTenant + { + public: + JobIdByTenant() + : tenant_id_(OB_INVALID_TENANT_ID), + job_id_(OB_INVALID_ID) {} + JobIdByTenant(const int64_t tenant_id, const int64_t job_id) + : tenant_id_(tenant_id), + job_id_(job_id) {} + ~JobIdByTenant() {} + bool operator ==(const JobIdByTenant &other) const + { + return tenant_id_ == other.tenant_id_ && job_id_ == other.job_id_; + } + bool operator !=(const JobIdByTenant &other) const + { + return !(*this == other); + } + uint64_t hash() const + { + uint64_t hash_val = 0; + + hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), hash_val); + hash_val = murmurhash(&job_id_, sizeof(job_id_), hash_val); + + return hash_val; + } + int hash(uint64_t &hash_val) const + { + hash_val = hash(); + return OB_SUCCESS; + } + void set_tenant_id(int64_t tenant_id) { tenant_id_ = tenant_id; } + void set_job_id(int64_t job_id) { job_id_ = job_id; } + int64_t get_tenant_id() { return tenant_id_; } + int64_t get_job_id() { return job_id_; } + TO_STRING_KV(K_(tenant_id), + K_(job_id)); + private: + int64_t tenant_id_; + int64_t job_id_; + }; + static ObDBMSSchedJobMaster &get_instance(); bool is_inited() { return inited_; } @@ -195,23 +210,23 @@ public: int alloc_job_key( ObDBMSSchedJobKey *&job_key, - uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name, - uint64_t execute_at, uint64_t delay, - bool check_job = false); - + uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name); + void free_job_key(ObDBMSSchedJobKey *&job_key); int server_random_pick(int64_t tenant_id, common::ObString &pick_zone, ObAddr &server); int get_execute_addr(ObDBMSSchedJobInfo &job_info, common::ObAddr &execute_addr); int check_all_tenants(); int check_new_jobs(uint64_t tenant_id, bool is_oracle_tenant); int register_new_jobs(uint64_t tenant_id, bool is_oracle_tenant, ObIArray &job_infos); - int register_job(ObDBMSSchedJobInfo &job_info, ObDBMSSchedJobKey *job_key = NULL, bool ignore_nextdate = false); - + int register_job(ObDBMSSchedJobKey *job_key, int64_t next_date); int scheduler_job(ObDBMSSchedJobKey *job_key); + int64_t calc_next_date(ObDBMSSchedJobInfo &job_info); + int64_t run_job(ObDBMSSchedJobInfo &job_info, ObDBMSSchedJobKey *job_key, int64_t next_date); private: const static int MAX_READY_JOBS_CAPACITY = 1024 * 1024; - const static int MIN_SCHEDULER_INTERVAL = 20 * 1000 * 1000; + const static int MIN_SCHEDULER_INTERVAL = 1 * 1000 * 1000; + const static int CHECK_NEW_INTERVAL = 20 * 1000 * 1000; bool inited_; bool stoped_; @@ -225,15 +240,14 @@ private: obrpc::ObDBMSSchedJobRpcProxy *job_rpc_proxy_; common::ObAddr self_addr_; - dbms_job::ObDBMSJobQueue ready_queue_; ObDBMSSchedJobTask scheduler_task_; ObDBMSSchedJobThread scheduler_thread_; ObDBMSSchedTableOperator table_operator_; - common::ObSpinLock lock_; - common::ObArenaAllocator allocator_; + common::ObBlockAllocMgr block_alloc_; + common::ObVSliceAlloc allocator_; - common::hash::ObHashSet alive_jobs_; + common::hash::ObHashSet alive_jobs_; private: DISALLOW_COPY_AND_ASSIGN(ObDBMSSchedJobMaster); diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_job_rpc_proxy.h b/src/observer/dbms_scheduler/ob_dbms_sched_job_rpc_proxy.h index cb54884eb9..f2aac5ea12 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_job_rpc_proxy.h +++ b/src/observer/dbms_scheduler/ob_dbms_sched_job_rpc_proxy.h @@ -49,7 +49,6 @@ public: { return common::is_valid_tenant_id(tenant_id_) && job_id_ != common::OB_INVALID_ID - && !job_name_.empty() && server_addr_.is_valid() && master_addr_.is_valid(); } diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.h b/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.h index 67173d5683..055fd23515 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.h +++ b/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.h @@ -145,6 +145,9 @@ public: int64_t get_last_modify() { return last_modify_; } int64_t get_interval_ts() { return interval_ts_; } int64_t get_max_run_duration() { return (max_run_duration_ == 0) ? 30 : max_run_duration_ ; } // 30s by default + int64_t get_start_date() { return start_date_; } + int64_t get_end_date() { return end_date_; } + int64_t get_auto_drop() { return auto_drop_; } bool is_broken() { return 0x1 == (flag_ & 0x1); } bool is_running(){ return this_date_ != 0; } diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_table_operator.cpp b/src/observer/dbms_scheduler/ob_dbms_sched_table_operator.cpp index aff711f8ef..cf3cb18183 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_table_operator.cpp +++ b/src/observer/dbms_scheduler/ob_dbms_sched_table_operator.cpp @@ -34,7 +34,6 @@ #include "share/schema/ob_multi_version_schema_service.h" #include "storage/mview/ob_mview_sched_job_utils.h" -#define TO_TS(second) 1000000L * second namespace oceanbase { @@ -48,8 +47,8 @@ using namespace storage; namespace dbms_scheduler { -int ObDBMSSchedTableOperator::update_for_start( - uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, bool update_nextdate) +int ObDBMSSchedTableOperator::update_next_date( + uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date) { int ret = OB_SUCCESS; @@ -57,50 +56,67 @@ int ObDBMSSchedTableOperator::update_for_start( ObSqlString sql; int64_t affected_rows = 0; const int64_t now = ObTimeUtility::current_time(); - int64_t delay = 0; - int64_t dummy_execute_at = 0; CK (OB_NOT_NULL(sql_proxy_)); CK (OB_LIKELY(tenant_id != OB_INVALID_ID)); CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID)); - OZ (calc_execute_at( - job_info, (update_nextdate ? job_info.next_date_ : dummy_execute_at), delay, true)); + OZ (dml.add_gmt_modified(now)); + OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id))); + OZ (dml.add_pk_column("job", job_info.job_)); + OZ (dml.add_pk_column("job_name", job_info.job_name_)); + OZ (dml.add_time_column("next_date", next_date)); + OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql)); + OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows)); + return ret; +} + + +int ObDBMSSchedTableOperator::update_for_start( + uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date) +{ + int ret = OB_SUCCESS; + + ObDMLSqlSplicer dml; + ObSqlString sql; + int64_t affected_rows = 0; + const int64_t now = ObTimeUtility::current_time(); + + CK (OB_NOT_NULL(sql_proxy_)); + CK (OB_LIKELY(tenant_id != OB_INVALID_ID)); + CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID)); OX (job_info.this_date_ = now); OZ (dml.add_gmt_modified(now)); OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id))); OZ (dml.add_pk_column("job", job_info.job_)); + OZ (dml.add_pk_column("job_name", job_info.job_name_)); OZ (dml.add_time_column("this_date", job_info.this_date_)); + OZ (dml.add_time_column("next_date", next_date)); OZ (dml.add_column("state", "SCHEDULED")); OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql)); + OZ (sql.append_fmt(" and this_date is null")); OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows)); - + CK (affected_rows == 1); return ret; } -int ObDBMSSchedTableOperator::update_nextdate( - uint64_t tenant_id, ObDBMSSchedJobInfo &job_info) +int ObDBMSSchedTableOperator::seperate_job_id_from_name(ObString &job_name, int64_t &job_id) { int ret = OB_SUCCESS; - - ObDMLSqlSplicer dml; - ObSqlString sql; - int64_t affected_rows = 0; - const int64_t now = ObTimeUtility::current_time(); - - CK (OB_NOT_NULL(sql_proxy_)); - CK (OB_LIKELY(tenant_id != OB_INVALID_ID)); - CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID)); - - OZ (dml.add_gmt_modified(now)); - OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id))); - OZ (dml.add_pk_column("job", job_info.job_)); - OZ (dml.add_column("interval_ts", job_info.interval_ts_)); - OZ (dml.add_time_column("next_date", job_info.next_date_)); - OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql)); - OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows)); - + const char* prefix = "JOB$_"; + job_id = 0; + if (job_name.prefix_match(prefix)) { + char nptr[JOB_NAME_MAX_SIZE]; + char *endptr = NULL; + snprintf(nptr, JOB_NAME_MAX_SIZE, "%.*s", job_name.length(), job_name.ptr()); + job_id = strtoll(nptr + strlen(prefix), &endptr, 10); + if (job_id <= 0) { + LOG_WARN("job_id is not right", K(job_name), K(nptr), K(job_id)); + } else if (*endptr != '\0' || job_id <= JOB_ID_OFFSET) { + job_id = 0; // use job_info.job_ when job_id is not formal + } + } return ret; } @@ -117,8 +133,6 @@ int ObDBMSSchedTableOperator::update_for_end( ObSqlString sql2; int64_t affected_rows = 0; const int64_t now = ObTimeUtility::current_time(); - int64_t next_date; - int64_t delay; UNUSED(errmsg); @@ -161,12 +175,13 @@ int ObDBMSSchedTableOperator::update_for_end( OZ (dml1.add_column("state", "COMPLETED")); OZ (dml1.add_column("enabled", job_info.enabled_)); } - CK (job_info.this_date_ > 0); - OX (job_info.total_ += (now - job_info.this_date_)); + CK (job_info.this_date_ > 0 || !errmsg.empty()); + OX (job_info.total_ += (job_info.this_date_ > 0 ? now - job_info.this_date_ : 0)); OZ (dml1.add_gmt_modified(now)); OZ (dml1.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id))); OZ (dml1.add_pk_column("job", job_info.job_)); + OZ (dml1.add_pk_column("job_name", job_info.job_name_)); OZ (dml1.add_column(true, "this_date")); OZ (dml1.add_time_column("last_date", job_info.this_date_)); OZ (dml1.add_time_column("next_date", job_info.next_date_)); @@ -196,7 +211,12 @@ int ObDBMSSchedTableOperator::update_for_end( OZ (dml2.add_gmt_create(now)); OZ (dml2.add_gmt_modified(now)); OZ (dml2.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id))); - OZ (dml2.add_pk_column("job", job_info.job_)); + int64_t job_id = 0; + OZ (seperate_job_id_from_name(job_info.get_job_name(), job_id)); + if (job_id <= 0) { + job_id = job_info.get_job_id(); + } + OZ (dml2.add_pk_column("job", job_id)); OZ (dml2.add_time_column("time", now)); OZ (dml2.add_column("code", err)); OZ (dml2.add_column( @@ -225,32 +245,7 @@ int ObDBMSSchedTableOperator::update_for_end( return ret; } -int ObDBMSSchedTableOperator::check_job_timeout(ObDBMSSchedJobInfo &job_info) -{ - int ret = OB_SUCCESS; - if ((!job_info.is_running()) || (job_info.get_max_run_duration() == 0)) { - //not running or not set timeout - } else if (ObTimeUtility::current_time() > (job_info.get_this_date() + TO_TS(job_info.get_max_run_duration()))) { - OZ(update_for_end(job_info.get_tenant_id(), job_info, 0, "check job timeout")); - LOG_WARN("job is timeout, force update for end", K(job_info), K(ObTimeUtility::current_time())); - } - return ret; -} - -int ObDBMSSchedTableOperator::check_auto_drop(ObDBMSSchedJobInfo &job_info) -{ - int ret = OB_SUCCESS; - if (job_info.is_running()) { - // running job not check - } else if (ObTimeUtility::current_time() > (job_info.end_date_) && - (true == job_info.auto_drop_)) { - OZ(update_for_end(job_info.get_tenant_id(), job_info, 0, "check auto drop expired job")); - LOG_WARN("auto drop miss out job", K(job_info), K(ObTimeUtility::current_time())); - } - return ret; -} - -int ObDBMSSchedTableOperator::check_job_can_running(int64_t tenant_id, bool &can_running) +int ObDBMSSchedTableOperator::check_job_can_running(int64_t tenant_id, int64_t alive_job_count, bool &can_running) { int ret = OB_SUCCESS; uint64_t job_queue_processor = 0; @@ -263,33 +258,40 @@ int ObDBMSSchedTableOperator::check_job_can_running(int64_t tenant_id, bool &can CK (tenant_config.is_valid()); OX (job_queue_processor = tenant_config->job_queue_processes); // found current running job count - OZ (sql.append_fmt("select count(*) from %s where this_date is not null", OB_ALL_TENANT_SCHEDULER_JOB_TNAME)); + if (OB_FAIL(ret)) { + } else if (alive_job_count <= job_queue_processor) { + can_running = true; + } else { + OZ (sql.append_fmt("select count(*) from %s where this_date is not null", OB_ALL_TENANT_SCHEDULER_JOB_TNAME)); - CK (OB_NOT_NULL(GCTX.schema_service_)); - OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, guard)); - OZ (guard.check_tenant_is_restore(tenant_id, is_restore)); + CK (OB_NOT_NULL(GCTX.schema_service_)); + OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, guard)); + OZ (guard.check_tenant_is_restore(tenant_id, is_restore)); - // job can not run in standy cluster and restore. - if (OB_SUCC(ret) && job_queue_processor > 0 - && !GCTX.is_standby_cluster() - && !is_restore) { - SMART_VAR(ObMySQLProxy::MySQLResult, result) { - if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { - LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id)); - } else if (OB_NOT_NULL(result.get_result())) { - if (OB_SUCCESS == (ret = result.get_result()->next())) { - int64_t int_value = 0; - if (OB_FAIL(result.get_result()->get_int(static_cast(0), int_value))) { - LOG_WARN("failed to get column in row. ", K(ret)); - } else { - job_running_cnt = static_cast(int_value); - } + // job can not run in standy cluster and restore. + if (OB_SUCC(ret) && job_queue_processor > 0 + && !is_restore) { + SMART_VAR(ObMySQLProxy::MySQLResult, result) { + if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { + LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id)); + } else if (OB_ISNULL(result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id)); } else { - LOG_WARN("failed to calc all running job, no row return", K(ret)); + if (OB_SUCCESS == (ret = result.get_result()->next())) { + int64_t int_value = 0; + if (OB_FAIL(result.get_result()->get_int(static_cast(0), int_value))) { + LOG_WARN("failed to get column in row. ", K(ret)); + } else { + job_running_cnt = static_cast(int_value); + } + } else { + LOG_WARN("failed to calc all running job, no row return", K(ret)); + } } } + OX (can_running = (job_queue_processor > job_running_cnt)); } - OX (can_running = (job_queue_processor > job_running_cnt)); } return ret; } @@ -399,16 +401,24 @@ int ObDBMSSchedTableOperator::get_dbms_sched_job_info( SMART_VAR(ObMySQLProxy::MySQLResult, result) { if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_id)); - } else if (OB_NOT_NULL(result.get_result())) { + } else if (OB_ISNULL(result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get result", K(ret), K(tenant_id), K(job_id)); + } else { if (OB_SUCCESS == (ret = result.get_result()->next())) { OZ (extract_info(*(result.get_result()), tenant_id, is_oracle_tenant, allocator, job_info)); - if (OB_SUCC(ret) && (result.get_result()->next()) != OB_ITER_END) { - LOG_ERROR("got more than one row for dbms sched job!", K(ret), K(tenant_id), K(job_id)); - ret = OB_ERR_UNEXPECTED; + if (OB_SUCC(ret)) { + int tmp_ret = result.get_result()->next(); + if (OB_SUCCESS == tmp_ret) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("got more than one row for dbms sched job!", K(ret), K(tenant_id), K(job_id)); + } else if (tmp_ret != OB_ITER_END) { + ret = tmp_ret; + LOG_ERROR("got next row for dbms sched job failed", K(ret), K(tenant_id), K(job_id)); + } } } else if (OB_ITER_END == ret) { - LOG_INFO("job not exists, may delete alreay!", K(ret), K(tenant_id), K(job_id)); - ret = OB_SUCCESS; // job not exist, do nothing ... + LOG_WARN("job not exists, may delete alreay!", K(ret), K(tenant_id), K(job_id)); } else { LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_id)); } @@ -437,7 +447,10 @@ int ObDBMSSchedTableOperator::get_dbms_sched_job_infos_in_tenant( SMART_VAR(ObMySQLProxy::MySQLResult, result) { if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id)); - } else if (OB_NOT_NULL(result.get_result())) { + } else if (OB_ISNULL(result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id)); + } else { do { if (OB_FAIL(result.get_result()->next())) { LOG_INFO("failed to get result", K(ret)); @@ -492,12 +505,21 @@ int ObDBMSSchedTableOperator::get_dbms_sched_job_class_info( SMART_VAR(ObMySQLProxy::MySQLResult, result) { if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_class_name)); - } else if (OB_NOT_NULL(result.get_result())) { + } else if (OB_ISNULL(result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id), K(job_class_name)); + } else { if (OB_SUCCESS == (ret = result.get_result()->next())) { OZ (extract_job_class_info(*(result.get_result()), tenant_id, is_oracle_tenant, allocator, job_class_info)); - if (OB_SUCC(ret) && (result.get_result()->next()) != OB_ITER_END) { - LOG_ERROR("got more than one row for dbms sched job class!", K(ret), K(tenant_id), K(job_class_name)); - ret = OB_ERR_UNEXPECTED; + if (OB_SUCC(ret)) { + int tmp_ret = result.get_result()->next(); + if (OB_SUCCESS == tmp_ret) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("got more than one row for dbms sched job class!", K(ret), K(tenant_id), K(job_class_name)); + } else if (tmp_ret != OB_ITER_END) { + ret = tmp_ret; + LOG_ERROR("got next row for dbms sched job class failed", K(ret), K(tenant_id), K(job_class_name)); + } } } else if (OB_ITER_END == ret) { LOG_INFO("job_class_name not exists, may delete alreay!", K(ret), K(tenant_id), K(job_class_name)); @@ -511,96 +533,6 @@ int ObDBMSSchedTableOperator::get_dbms_sched_job_class_info( return ret; } -int ObDBMSSchedTableOperator::calc_execute_at( - ObDBMSSchedJobInfo &job_info, int64_t &execute_at, int64_t &delay, bool ignore_nextdate) -{ - int ret = OB_SUCCESS; - - ObString &interval = job_info.get_interval(); - - const int64_t now = ObTimeUtility::current_time(); - int64_t last_sub_next = - (job_info.get_last_modify() / 1000 / 1000) - (job_info.get_next_date() / 1000/ 1000); - if (job_info.get_next_date() != 0 && (!ignore_nextdate || job_info.get_next_date() != execute_at)) { - if (job_info.get_next_date() > now) { - execute_at = job_info.get_next_date(); - delay = job_info.get_next_date() - now; - } else if (now - job_info.get_next_date() < TO_TS(job_info.get_max_run_duration())) { - LOG_WARN("job maybe missed, retry it", K(now), K(job_info), K(execute_at), K(delay), K(ignore_nextdate), K(lbt())); - execute_at = now; - delay = 0; - } else if (last_sub_next < 5 && last_sub_next >= -5) { - LOG_WARN("job maybe missed, retry it", K(last_sub_next), K(now), K(job_info), K(execute_at), K(delay), K(ignore_nextdate), K(lbt())); - execute_at = now; - delay = 0; - } else { - LOG_WARN("job maybe missed, ignore it", K(last_sub_next), K(now), K(job_info), K(execute_at), K(delay), K(ignore_nextdate), K(lbt())); - OZ(update_for_end(job_info.get_tenant_id(), job_info, 0, "check job missed")); - delay = -1; - } - } else { - delay = -1; - } - - if (delay < 0 && (job_info.get_interval_ts() != 0 - || (!interval.empty() && (0 != interval.case_compare("null"))))) { - ObSqlString sql; - common::ObISQLClient *sql_proxy = sql_proxy_; - ObOracleSqlProxy oracle_proxy(*(static_cast(sql_proxy_))); - // NOTE: we need utc timestamp. - if (lib::is_mysql_mode()) { - OZ (sql.append_fmt("select utc_timestamp() from dual;")); - } else { - OZ (sql.append_fmt( - "select cast(to_timestamp(sys_extract_utc(to_timestamp(sysdate))) as date) from dual;")); - sql_proxy = &oracle_proxy; - } - - SMART_VAR(ObMySQLProxy::MySQLResult, result) { - if (OB_FAIL(sql_proxy->read(result, job_info.get_tenant_id(), sql.ptr()))) { - LOG_WARN("execute query failed", K(ret), K(sql), K(job_info)); - } else if (OB_NOT_NULL(result.get_result())) { - if (OB_FAIL(result.get_result()->next())) { - LOG_WARN("failed to get result", K(ret)); - } else { - int64_t sysdate = 0; - int64_t col_idx = 0; - OZ (result.get_result()->get_datetime(col_idx, sysdate)); - if (OB_SUCC(ret) && job_info.is_date_expression_job_class()) { - int64_t next_date_utc_ts = 0; - if (OB_FAIL(ObMViewSchedJobUtils::calc_date_expression(job_info, next_date_utc_ts))) { - LOG_WARN("failed to calc date expression", KR(ret), K(job_info)); - } else { - job_info.interval_ts_ = next_date_utc_ts - sysdate; - } - } - if (OB_SUCC(ret)) { - execute_at = sysdate + job_info.get_interval_ts(); - } - if (OB_FAIL(ret)) { - } else if (job_info.get_next_date() > execute_at) { - execute_at = job_info.get_next_date(); - delay = execute_at - sysdate; - } else { - delay = execute_at - sysdate; - } - if (OB_SUCC(ret)) { - OX (job_info.next_date_ = execute_at); - OZ (update_nextdate(job_info.get_tenant_id(), job_info)); - } - } - } - } - LOG_INFO("repeat job update nextdate", K(job_info), K(execute_at), K(delay), K(ignore_nextdate)); - } else if (delay < 0 && job_info.get_interval_ts() == 0) { - OX (job_info.next_date_ = 64060560000000000); // 4000-01-01 - OZ (update_nextdate(job_info.get_tenant_id(), job_info)); - LOG_INFO("once job update nextdate", K(job_info), K(execute_at), K(delay), K(ignore_nextdate)); - } - - return ret; -} - int ObDBMSSchedTableOperator::register_default_job_class(uint64_t tenant_id) { int ret = OB_SUCCESS; diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_table_operator.h b/src/observer/dbms_scheduler/ob_dbms_sched_table_operator.h index 6aa98b7692..30a4c7ebb0 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_table_operator.h +++ b/src/observer/dbms_scheduler/ob_dbms_sched_table_operator.h @@ -46,14 +46,20 @@ class ObDBMSSchedTableOperator public: ObDBMSSchedTableOperator() : sql_proxy_(NULL) {} virtual ~ObDBMSSchedTableOperator() {}; + static constexpr int64_t JOB_NAME_MAX_SIZE = 128; + static const int64_t JOB_ID_OFFSET = (1LL<<50); int init(common::ObISQLClient *sql_proxy) { sql_proxy_ = sql_proxy; return common::OB_SUCCESS; } + int update_next_date( + uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date); + int update_for_start( - uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, bool update_nextdate = true); + uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date); int update_for_end( uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int err, const common::ObString &errmsg); - int update_nextdate(uint64_t tenant_id, ObDBMSSchedJobInfo &job_info); + + int seperate_job_id_from_name(common::ObString &job_name, int64_t &job_id); int get_dbms_sched_job_info( uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name, @@ -73,14 +79,7 @@ public: sqlclient::ObMySQLResult &result, int64_t tenant_id, bool is_oracle_tenant, ObIAllocator &allocator, ObDBMSSchedJobClassInfo &job_class_info); - int calc_execute_at( - ObDBMSSchedJobInfo &job_info, int64_t &execute_at, int64_t &delay, bool ignore_nextdate = false); - - int check_job_can_running(int64_t tenant_id, bool &can_running); - - int check_job_timeout(ObDBMSSchedJobInfo &job_info); - - int check_auto_drop(ObDBMSSchedJobInfo &job_info); + int check_job_can_running(int64_t tenant_id, int64_t alive_job_count, bool &can_running); int register_default_job_class(uint64_t tenant_id); int purge_run_detail_histroy(uint64_t tenant_id); diff --git a/src/share/inner_table/ob_inner_table_schema.21301_21350.cpp b/src/share/inner_table/ob_inner_table_schema.21301_21350.cpp index dbab422ac9..2bde1e98ba 100644 --- a/src/share/inner_table/ob_inner_table_schema.21301_21350.cpp +++ b/src/share/inner_table/ob_inner_table_schema.21301_21350.cpp @@ -1517,7 +1517,7 @@ int ObInnerTableSchema::dba_scheduler_windows_schema(ObTableSchema &table_schema table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); if (OB_SUCC(ret)) { - if (OB_FAIL(table_schema.set_view_definition(R"__(SELECT CAST(T.POWNER AS CHAR(128)) AS OWNER, CAST(T.JOB_NAME AS CHAR(128)) AS WINDOW_NAME, CAST(NULL AS CHAR(128)) AS RESOURCE_PLAN, CAST(NULL AS CHAR(4000)) AS SCHEDULE_OWNER, CAST(NULL AS CHAR(4000)) AS SCHEDULE_NAME, CAST(NULL AS CHAR(8)) AS SCHEDULE_TYPE, CAST(T.START_DATE AS DATETIME(6)) AS START_DATE, CAST(T.REPEAT_INTERVAL AS CHAR(4000)) AS REPEAT_INTERVAL, CAST(T.END_DATE AS DATETIME(6)) AS END_DATE, CAST(T.MAX_RUN_DURATION AS SIGNED) AS DURATION, CAST(NULL AS CHAR(4)) AS WINDOW_PRIORITY, CAST(T.NEXT_DATE AS DATETIME(6)) AS NEXT_RUN_DATE, CAST(T.LAST_DATE AS DATETIME(6)) AS LAST_START_DATE, CAST(T.ENABLED AS CHAR(5)) AS ENABLED, CAST(NULL AS CHAR(5)) AS ACTIVE, CAST(NULL AS DATETIME(6)) AS MANUAL_OPEN_TIME, CAST(NULL AS SIGNED) AS MANUAL_DURATION, CAST(T.COMMENTS AS CHAR(4000)) AS COMMENTS FROM oceanbase.__all_tenant_scheduler_job T WHERE T.JOB_NAME in ('MONDAY_WINDOW', 'TUESDAY_WINDOW', 'WEDNESDAY_WINDOW', 'THURSDAY_WINDOW', 'FRIDAY_WINDOW', 'SATURDAY_WINDOW', 'SUNDAY_WINDOW') )__"))) { + if (OB_FAIL(table_schema.set_view_definition(R"__(SELECT CAST(T.POWNER AS CHAR(128)) AS OWNER, CAST(T.JOB_NAME AS CHAR(128)) AS WINDOW_NAME, CAST(NULL AS CHAR(128)) AS RESOURCE_PLAN, CAST(NULL AS CHAR(4000)) AS SCHEDULE_OWNER, CAST(NULL AS CHAR(4000)) AS SCHEDULE_NAME, CAST(NULL AS CHAR(8)) AS SCHEDULE_TYPE, CAST(T.START_DATE AS DATETIME(6)) AS START_DATE, CAST(T.REPEAT_INTERVAL AS CHAR(4000)) AS REPEAT_INTERVAL, CAST(T.END_DATE AS DATETIME(6)) AS END_DATE, CAST(T.MAX_RUN_DURATION AS SIGNED) AS DURATION, CAST(NULL AS CHAR(4)) AS WINDOW_PRIORITY, CAST(T.NEXT_DATE AS DATETIME(6)) AS NEXT_RUN_DATE, CAST(T.LAST_DATE AS DATETIME(6)) AS LAST_START_DATE, CAST(T.ENABLED AS CHAR(5)) AS ENABLED, CAST(NULL AS CHAR(5)) AS ACTIVE, CAST(NULL AS DATETIME(6)) AS MANUAL_OPEN_TIME, CAST(NULL AS SIGNED) AS MANUAL_DURATION, CAST(T.COMMENTS AS CHAR(4000)) AS COMMENTS FROM oceanbase.__all_tenant_scheduler_job T WHERE T.JOB > 0 and T.JOB_NAME in ('MONDAY_WINDOW', 'TUESDAY_WINDOW', 'WEDNESDAY_WINDOW', 'THURSDAY_WINDOW', 'FRIDAY_WINDOW', 'SATURDAY_WINDOW', 'SUNDAY_WINDOW') )__"))) { LOG_ERROR("fail to set view_definition", K(ret)); } } diff --git a/src/share/inner_table/ob_inner_table_schema_def.py b/src/share/inner_table/ob_inner_table_schema_def.py index 3c330001a6..cdc950c747 100644 --- a/src/share/inner_table/ob_inner_table_schema_def.py +++ b/src/share/inner_table/ob_inner_table_schema_def.py @@ -26512,7 +26512,7 @@ def_table_schema( CAST(NULL AS DATETIME(6)) AS MANUAL_OPEN_TIME, CAST(NULL AS SIGNED) AS MANUAL_DURATION, CAST(T.COMMENTS AS CHAR(4000)) AS COMMENTS - FROM oceanbase.__all_tenant_scheduler_job T WHERE T.JOB_NAME in ('MONDAY_WINDOW', + FROM oceanbase.__all_tenant_scheduler_job T WHERE T.JOB > 0 and T.JOB_NAME in ('MONDAY_WINDOW', 'TUESDAY_WINDOW', 'WEDNESDAY_WINDOW', 'THURSDAY_WINDOW', 'FRIDAY_WINDOW', 'SATURDAY_WINDOW', 'SUNDAY_WINDOW') """.replace("\n", " ") ) diff --git a/src/share/ob_upgrade_utils.cpp b/src/share/ob_upgrade_utils.cpp index 99a97c936b..ffad4d5ee9 100755 --- a/src/share/ob_upgrade_utils.cpp +++ b/src/share/ob_upgrade_utils.cpp @@ -1147,6 +1147,44 @@ int ObUpgradeFor4200Processor::post_upgrade_for_heartbeat_and_server_zone_op_ser /* =========== 4200 upgrade processor end ============= */ +int ObUpgradeFor4211Processor::post_upgrade() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (OB_FAIL(post_upgrade_for_dbms_scheduler())) { + LOG_WARN("post for upgrade dbms scheduler failed", K(ret)); + } + return ret; +} + +int ObUpgradeFor4211Processor::post_upgrade_for_dbms_scheduler() +{ + int ret = OB_SUCCESS; + ObSqlString sql; + int64_t affected_rows = 0; + bool is_tenant_standby = false; + if (sql_proxy_ == NULL) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql_proxy is null", K(ret), K(tenant_id_)); + } else if (OB_FAIL(ObAllTenantInfoProxy::is_standby_tenant(sql_proxy_, tenant_id_, is_tenant_standby))) { + LOG_WARN("check is standby tenant failed", K(ret), K(tenant_id_)); + } else if (is_tenant_standby) { + LOG_INFO("tenant is standby, ignore", K(tenant_id_)); + } else { + OZ (sql.append_fmt( + "insert ignore into %s " + "(tenant_id,job_name,job,lowner,powner,cowner,next_date,`interval#`,flag) " + "select tenant_id, job_name,0,lowner,powner,cowner,next_date,`interval#`,flag from %s where job != 0", + OB_ALL_TENANT_SCHEDULER_JOB_TNAME, + OB_ALL_TENANT_SCHEDULER_JOB_TNAME)); // if has new colomn, use default value + OZ (sql_proxy_->write(tenant_id_, sql.ptr(), affected_rows)); + LOG_INFO("insert job_id=0 rows finished for dbms_scheduler old jobs", K(ret), K(tenant_id_), K(affected_rows)); + } + + return ret; +} +/* =========== 4211 upgrade processor end ============= */ /* =========== special upgrade processor end ============= */ } // end share diff --git a/src/share/ob_upgrade_utils.h b/src/share/ob_upgrade_utils.h index 7bb3dc1b33..30cdc9de7f 100755 --- a/src/share/ob_upgrade_utils.h +++ b/src/share/ob_upgrade_utils.h @@ -210,7 +210,17 @@ private: }; DEF_SIMPLE_UPGRARD_PROCESSER(4, 2, 1, 0) -DEF_SIMPLE_UPGRARD_PROCESSER(4, 2, 1, 1) +class ObUpgradeFor4211Processor : public ObBaseUpgradeProcessor +{ +public: + ObUpgradeFor4211Processor() : ObBaseUpgradeProcessor() {} + virtual ~ObUpgradeFor4211Processor() {} + virtual int pre_upgrade() override { return common::OB_SUCCESS; } + virtual int post_upgrade() override; +private: + int post_upgrade_for_dbms_scheduler(); + +}; DEF_SIMPLE_UPGRARD_PROCESSER(4, 2, 1, 2) DEF_SIMPLE_UPGRARD_PROCESSER(4, 2, 2, 0) DEF_SIMPLE_UPGRARD_PROCESSER(4, 3, 0, 0) diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index ee4fdba921..4aef148894 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -1394,7 +1394,7 @@ DEF_INT(connection_control_max_connection_delay, OB_TENANT_PARAMETER, "214748364 DEF_BOOL(ob_proxy_readonly_transaction_routing_policy, OB_TENANT_PARAMETER, "false", "Proxy route policy for readonly sql: whether regard begining read only stmts as in transaction", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); -DEF_INT(job_queue_processes, OB_TENANT_PARAMETER, "1000", "[0,1000]", +DEF_INT(job_queue_processes, OB_TENANT_PARAMETER, "1000", "[0,16384]", "specifies the maximum number of job slaves per instance that can be created " "for the execution of DBMS_JOB jobs and Oracle Scheduler (DBMS_SCHEDULER) jobs.", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/src/share/stat/ob_dbms_stats_maintenance_window.cpp b/src/share/stat/ob_dbms_stats_maintenance_window.cpp index 69470a6e38..2f446f1024 100644 --- a/src/share/stat/ob_dbms_stats_maintenance_window.cpp +++ b/src/share/stat/ob_dbms_stats_maintenance_window.cpp @@ -100,21 +100,39 @@ int ObDbmsStatsMaintenanceWindow::get_stats_maintenance_window_jobs_sql(const Ob } else if (OB_UNLIKELY(start_usec == -1 || job_action.empty())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected error", K(ret), K(start_usec), K(job_action)); - } else if (OB_FAIL(get_stat_window_job_sql(is_oracle_mode, - tenant_id, - job_id++, - windows_name[i], - exec_env, - start_usec, - job_action, - tmp_sql))) { - LOG_WARN("failed to get stat window job sql", K(ret)); - } else if (OB_FAIL(raw_sql.append_fmt("%s(%s)", (i == 0 ? "" : ","), tmp_sql.ptr()))) { - LOG_WARN("failed to append sql", K(ret)); } else { - ++ expected_affected_rows; - tmp_sql.reset(); - job_action.reset(); + if (OB_FAIL(get_stat_window_job_sql(is_oracle_mode, + tenant_id, + job_id++, + windows_name[i], + exec_env, + start_usec, + job_action, + tmp_sql))) { + LOG_WARN("failed to get stat window job sql", K(ret)); + } else if (OB_FAIL(raw_sql.append_fmt("%s(%s)", (i == 0 ? "" : ","), tmp_sql.ptr()))) { + LOG_WARN("failed to append sql", K(ret)); + } else { + ++ expected_affected_rows; + tmp_sql.reset(); + job_action.reset(); + if (OB_FAIL(get_stat_window_job_sql(is_oracle_mode, + tenant_id, + 0, + windows_name[i], + exec_env, + start_usec, + job_action, + tmp_sql))) { + LOG_WARN("failed to get stat window job sql", K(ret)); + } else if (OB_FAIL(raw_sql.append_fmt("%s(%s)", ",", tmp_sql.ptr()))) { + LOG_WARN("failed to append sql", K(ret)); + } else { + ++ expected_affected_rows; + tmp_sql.reset(); + job_action.reset(); + } + } } } if (OB_SUCC(ret)) { @@ -127,7 +145,17 @@ int ObDbmsStatsMaintenanceWindow::get_stats_maintenance_window_jobs_sql(const Ob } else { ++ expected_affected_rows; tmp_sql.reset(); + if (OB_FAIL(get_stats_history_manager_job_sql(is_oracle_mode, tenant_id, + 0, exec_env, tmp_sql))) { + LOG_WARN("failed to get stats history manager job sql", K(ret)); + } else if (OB_FAIL(raw_sql.append_fmt(", (%s)", tmp_sql.ptr()))) { + LOG_WARN("failed to append sql", K(ret)); + } else { + ++ expected_affected_rows; + tmp_sql.reset(); + } } + //set dummy guard job if (OB_FAIL(ret)) { } else if (OB_FAIL(get_dummy_guard_job_sql(tenant_id, job_id, tmp_sql))) {