patch 421 dbms scheduler to master

This commit is contained in:
obdev
2024-02-09 11:12:47 +00:00
committed by ob-robot
parent 7cc9d0baee
commit f5c1dfab2d
13 changed files with 625 additions and 570 deletions

View File

@ -74,120 +74,149 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job(
} else {
CK (job_info.valid());
CK ((job_info.get_what().length() != 0) || (job_info.get_program_name().length() != 0));
if (job_info.get_what().length() != 0) { // action
if (job_info.is_oracle_tenant_) {
OZ (what.append_fmt("BEGIN %.*s; END;",
job_info.get_what().length(), job_info.get_what().ptr()));
} else {
//mysql mode not support anonymous block
OZ (what.append_fmt("CALL %.*s;",
job_info.get_what().length(), job_info.get_what().ptr()));
}
} else { // program
ObSqlString sql;
ObString program_action;
uint64_t number_of_argument = 0;
OZ (sql.assign_fmt("select program_action, number_of_argument from %s where program_name = \'%.*s\'",
OB_ALL_TENANT_SCHEDULER_PROGRAM_TNAME,
job_info.get_program_name().length(),
job_info.get_program_name().ptr()));
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_info.get_program_name().ptr()), K(job_info.get_job_name().ptr()));
} else if (OB_NOT_NULL(result.get_result())) {
if (OB_SUCCESS == (ret = result.get_result()->next())) {
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "program_action", program_action);
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "number_of_argument", number_of_argument, uint64_t);
if (OB_SUCC(ret) && (result.get_result()->next()) != OB_ITER_END) {
LOG_ERROR("got more than one row for dbms sched program!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
ret = OB_ERR_UNEXPECTED;
}
} else if (OB_ITER_END == ret) {
LOG_INFO("program not exists, may delete alreay!", K(ret), K(tenant_id), K(job_info.get_program_name().ptr()), K(job_info.get_program_name().ptr()));
ret = OB_SUCCESS;
if (OB_SUCC(ret)) {
if (job_info.get_what().length() != 0) { // action
if (job_info.is_oracle_tenant_) {
OZ (what.append_fmt("BEGIN %.*s; END;",
job_info.get_what().length(), job_info.get_what().ptr()));
} else {
//mysql mode not support anonymous block
OZ (what.append_fmt("CALL %.*s;",
job_info.get_what().length(), job_info.get_what().ptr()));
}
} else { // program
ObSqlString sql;
ObString program_action;
uint64_t number_of_argument = 0;
OZ (sql.assign_fmt("select program_action, number_of_argument from %s where program_name = \'%.*s\'",
OB_ALL_TENANT_SCHEDULER_PROGRAM_TNAME,
job_info.get_program_name().length(),
job_info.get_program_name().ptr()));
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_info.get_program_name().ptr()), K(job_info.get_job_name().ptr()));
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get result", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
} else {
LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
if (OB_SUCCESS == (ret = result.get_result()->next())) {
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "program_action", program_action);
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "number_of_argument", number_of_argument, uint64_t);
if (OB_SUCC(ret)) {
int tmp_ret = result.get_result()->next();
if (OB_SUCCESS == tmp_ret) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("got more than one row for dbms sched program!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
} else if (tmp_ret != OB_ITER_END) {
ret = tmp_ret;
LOG_WARN("got next row for dbms sched program failed", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
}
}
} else if (OB_ITER_END == ret) {
LOG_INFO("program not exists, may delete alreay!", K(ret), K(tenant_id), K(job_info.get_program_name().ptr()), K(job_info.get_program_name().ptr()));
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
}
}
}
}
OZ (what.append_fmt("BEGIN %.*s(",
program_action.length(), program_action.ptr()));
if (OB_SUCC(ret) && (0 != number_of_argument)) {
ObString argument_value;
for (int i = 1; OB_SUCC(ret) && i <= number_of_argument; i++) {
argument_value.reset();
OZ (sql.assign_fmt("select default_value from %s where program_name = \'%.*s\' and job_name = \'%.*s\' and argument_position = %d and is_for_default = 0",
OB_ALL_TENANT_SCHEDULER_PROGRAM_ARGUMENT_TNAME,
job_info.get_program_name().length(),
job_info.get_program_name().ptr(),
job_info.get_job_name().length(),
job_info.get_job_name().ptr(),
i));
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(result.get_result()), K(tenant_id), K(job_info.get_job_name().ptr()));
} else if (OB_NOT_NULL(result.get_result())) {
if (OB_SUCCESS == (ret = result.get_result()->next())) {
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "default_value", argument_value);
if (OB_SUCC(ret) && (result.get_result()->next()) != OB_ITER_END) {
LOG_ERROR("got more than one row for argument!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
ret = OB_ERR_UNEXPECTED;
}
} else if (OB_ITER_END == ret) {
LOG_INFO("job argument not exists, use default");
ret = OB_SUCCESS;
OZ (sql.assign_fmt("select default_value from %s where program_name = \'%.*s\' and job_name = \'%s\' and argument_position = %d and is_for_default = 1",
OB_ALL_TENANT_SCHEDULER_PROGRAM_ARGUMENT_TNAME,
job_info.get_program_name().length(),
job_info.get_program_name().ptr(),
"default",
i));
SMART_VAR(ObMySQLProxy::MySQLResult, tmp_result) {
if (OB_FAIL(sql_proxy_->read(tmp_result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_info.get_job_name().ptr()));
} else if (OB_NOT_NULL(tmp_result.get_result())) {
if (OB_SUCCESS == (ret = tmp_result.get_result()->next())) {
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(tmp_result.get_result()), "default_value", argument_value);
if (OB_SUCC(ret) && (tmp_result.get_result()->next()) != OB_ITER_END) {
LOG_ERROR("got more than one row for argument!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
ret = OB_ERR_UNEXPECTED;
}
} else if (OB_ITER_END == ret) {
LOG_ERROR("program default argument not exists", K(sql.ptr()), K(job_info.get_program_name().ptr()));
} else {
LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()));
OZ (what.append_fmt("BEGIN %.*s(",
program_action.length(), program_action.ptr()));
if (OB_SUCC(ret) && (0 != number_of_argument)) {
ObString argument_value;
for (int i = 1; OB_SUCC(ret) && i <= number_of_argument; i++) {
argument_value.reset();
OZ (sql.assign_fmt("select default_value from %s where program_name = \'%.*s\' and job_name = \'%.*s\' and argument_position = %d and is_for_default = 0",
OB_ALL_TENANT_SCHEDULER_PROGRAM_ARGUMENT_TNAME,
job_info.get_program_name().length(),
job_info.get_program_name().ptr(),
job_info.get_job_name().length(),
job_info.get_job_name().ptr(),
i));
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(result.get_result()), K(tenant_id), K(job_info.get_job_name().ptr()));
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get result", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()));
} else {
if (OB_SUCCESS == (ret = result.get_result()->next())) {
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(result.get_result()), "default_value", argument_value);
if (OB_SUCC(ret)) {
int tmp_ret = result.get_result()->next();
if (OB_SUCCESS == tmp_ret) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("got more than one row for argument!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
} else if (tmp_ret != OB_ITER_END) {
ret = tmp_ret;
LOG_WARN("got next row for argument failed", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
}
}
} else if (OB_ITER_END == ret) {
LOG_INFO("job argument not exists, use default");
ret = OB_SUCCESS;
OZ (sql.assign_fmt("select default_value from %s where program_name = \'%.*s\' and job_name = \'%s\' and argument_position = %d and is_for_default = 1",
OB_ALL_TENANT_SCHEDULER_PROGRAM_ARGUMENT_TNAME,
job_info.get_program_name().length(),
job_info.get_program_name().ptr(),
"default",
i));
SMART_VAR(ObMySQLProxy::MySQLResult, tmp_result) {
if (OB_FAIL(sql_proxy_->read(tmp_result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_info.get_job_name().ptr()));
} else if (OB_ISNULL(tmp_result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get result", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()));
} else {
if (OB_SUCCESS == (ret = tmp_result.get_result()->next())) {
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*(tmp_result.get_result()), "default_value", argument_value);
if (OB_SUCC(ret)) {
int tmp_ret = tmp_result.get_result()->next();
if (OB_SUCCESS == tmp_ret) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("got more than one row for argument!", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
} else if (tmp_ret != OB_ITER_END) {
ret = tmp_ret;
LOG_WARN("got next row for argument failed", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()), K(job_info.get_program_name().ptr()));
}
}
} else if (OB_ITER_END == ret) {
LOG_ERROR("program default argument not exists", K(sql.ptr()), K(job_info.get_program_name().ptr()));
} else {
LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()));
}
}
}
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()));
}
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_info.get_job_name().ptr()));
}
}
if (i == 1) {
OZ (what.append_fmt("\'%.*s\'", argument_value.length(), argument_value.ptr()));
} else {
OZ (what.append_fmt(",\'%.*s\'", argument_value.length(), argument_value.ptr()));
}
}
if (i == 1) {
OZ (what.append_fmt("\'%.*s\'", argument_value.length(), argument_value.ptr()));
} else {
OZ (what.append_fmt(",\'%.*s\'", argument_value.length(), argument_value.ptr()));
}
OZ (what.append_fmt("); END;"));
} else {
LOG_ERROR("number_of_argument not exist or not right", K(ret), K(number_of_argument));
}
OZ (what.append_fmt("); END;"));
} else {
LOG_ERROR("number_of_argument not exist or not right", K(ret), K(number_of_argument));
}
}
if (job_info.is_oracle_tenant_) {
ObOracleSqlProxy oracle_proxy(*(static_cast<ObMySQLProxy *>(sql_proxy_)));
CK (OB_NOT_NULL(pool = static_cast<ObInnerSQLConnectionPool *>(oracle_proxy.get_pool())));
OZ (ObDBMSSchedJobUtils::init_env(job_info, *session_info));
OZ (pool->acquire_spi_conn(session_info, conn));
OZ (conn->execute_write(tenant_id, what.string().ptr(), affected_rows));
if (OB_NOT_NULL(conn)) {
sql_proxy_->close(conn, ret);
if (job_info.is_oracle_tenant_) {
ObOracleSqlProxy oracle_proxy(*(static_cast<ObMySQLProxy *>(sql_proxy_)));
CK (OB_NOT_NULL(pool = static_cast<ObInnerSQLConnectionPool *>(oracle_proxy.get_pool())));
OZ (ObDBMSSchedJobUtils::init_env(job_info, *session_info));
OZ (pool->acquire_spi_conn(session_info, conn));
OZ (conn->execute_write(tenant_id, what.string().ptr(), affected_rows));
if (OB_NOT_NULL(conn)) {
sql_proxy_->close(conn, ret);
}
} else {//mysql mode need use mysql proxy
OZ (ObDBMSSchedJobUtils::init_env(job_info, *session_info));
OZ (sql_proxy_->write(tenant_id, what.string().ptr(), affected_rows));
}
} else {//mysql mode need use mysql proxy
OZ (ObDBMSSchedJobUtils::init_env(job_info, *session_info));
OZ (sql_proxy_->write(tenant_id, what.string().ptr(), affected_rows));
}
}
if (NULL != session_info) {
@ -211,7 +240,6 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job(uint64_t tenant_id, bool is_oracl
OZ (table_operator_.get_dbms_sched_job_info(tenant_id, is_oracle_tenant, job_id, job_name, allocator, job_info));
if (OB_SUCC(ret)) {
OZ (table_operator_.update_for_start(tenant_id, job_info));
OZ (run_dbms_sched_job(tenant_id, job_info));

View File

@ -29,6 +29,8 @@
#include "observer/ob_server_struct.h"
#include "rootserver/ob_root_service.h"
#define TO_TS(second) (1000000L * second)
namespace oceanbase
{
using namespace common;
@ -48,26 +50,20 @@ int ObDBMSSchedJobTask::init()
if (inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret), K(inited_));
} else if (OB_FAIL(timer_.init())) {
LOG_WARN("fail to init timer", K(ret));
} else {
inited_ = true;
}
return ret;
}
int ObDBMSSchedJobTask::start(dbms_job::ObDBMSJobQueue *ready_queue)
int ObDBMSSchedJobTask::start(ObVSliceAlloc *allocator)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("dbms sched job task not inited", K(ret), K(inited_));
} else if (OB_ISNULL(ready_queue)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(ret), K(ready_queue));
}
ready_queue_ = ready_queue;
OZ (timer_.start());
allocator_ = allocator;
return ret;
}
@ -78,12 +74,15 @@ int ObDBMSSchedJobTask::stop()
ret = OB_NOT_INIT;
LOG_WARN("dbms sched job task not inited", K(ret), K(inited_));
} else {
timer_.cancel(*this);
timer_.stop();
timer_.wait();
if (allocator_ != NULL) {
for (WaitVectorIterator iter = wait_vector_.begin();
OB_SUCC(ret) && iter != wait_vector_.end(); ++iter) {
ObDBMSSchedJobKey *job_key = *iter;
allocator_->free(job_key);
}
}
wait_vector_.clear();
job_key_ = NULL;
ready_queue_ = NULL;
allocator_ = NULL;
}
return ret;
}
@ -94,50 +93,10 @@ int ObDBMSSchedJobTask::destroy()
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("scheduler task not inited", K(ret), K(inited_));
} else {
timer_.destroy();
}
return ret;
}
void ObDBMSSchedJobTask::runTimerTask()
{
int ret = OB_SUCCESS;
ObSpinLockGuard guard(lock_);
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("dbms sched job task not init", K(ret), K(inited_));
} else if (OB_ISNULL(job_key_)
|| OB_ISNULL(ready_queue_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null ptr", K(ret), K(job_key_), K(ready_queue_));
} else if (OB_FAIL(ready_queue_->push(job_key_, 0))) {
LOG_WARN("fail to push ready job to queue", K(ret), K(*job_key_));
} else {
job_key_ = NULL;
if (wait_vector_.count() > 0) {
job_key_ = wait_vector_[0];
if (OB_FAIL(wait_vector_.remove(wait_vector_.begin()))) {
job_key_ = NULL;
LOG_WARN("fail to remove job_id from sorted vector", K(ret));
} else if (OB_ISNULL(job_key_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(ret), K(job_key_));
} else if (OB_FAIL(timer_.schedule(*this, job_key_->get_adjust_delay()))) {
LOG_WARN("fail to schedule task", K(ret), K(*job_key_));
}
}
}
LOG_DEBUG("JobKEYS INFO HEADER ==== ", KPC(job_key_), K(wait_vector_.count()));
int i = 0;
for (WaitVectorIterator iter = wait_vector_.begin();
OB_SUCC(ret) && iter != wait_vector_.end(); ++iter, ++i) {
ObDBMSSchedJobKey *job = *iter;
LOG_DEBUG("JobKEYS INFO ELEMENT ====", K(i), KPC(job));
}
return;
}
int ObDBMSSchedJobTask::scheduler(ObDBMSSchedJobKey *job_key)
{
int ret = OB_SUCCESS;
@ -146,12 +105,10 @@ int ObDBMSSchedJobTask::scheduler(ObDBMSSchedJobKey *job_key)
LOG_WARN("dbms sched job task not init", K(ret));
} else if (OB_ISNULL(job_key)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr for job id", K(ret), KPC(job_key));
LOG_WARN("NULL ptr for job id", K(ret));
} else if (!job_key->is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("job id is invalid", K(ret), KPC(job_key));
} else if (0 == job_key->get_delay()) {
OZ (immediately(job_key), KPC(job_key));
} else {
OZ (add_new_job(job_key), KPC(job_key));
}
@ -169,40 +126,19 @@ int ObDBMSSchedJobTask::add_new_job(ObDBMSSchedJobKey *new_job_key)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(ret), KPC(new_job_key));
} else {
ObSpinLockGuard guard(lock_);
if (OB_ISNULL(job_key_)) {
job_key_ = new_job_key;
OZ (timer_.schedule(*this, job_key_->get_delay()));
} else if (new_job_key->get_execute_at() >= job_key_->get_execute_at()) {
WaitVectorIterator iter;
ObDBMSSchedJobKey *replace_job_key = NULL;
OZ (wait_vector_.replace(new_job_key, iter, compare_job_key, equal_job_key, replace_job_key));
} else {
WaitVectorIterator iter;
OX (timer_.cancel(*this));
OZ (wait_vector_.insert(job_key_, iter, compare_job_key));
OX (job_key_ = new_job_key);
OZ (timer_.schedule(*this, job_key_->get_delay()));
}
WaitVectorIterator iter;
ObDBMSSchedJobKey *replace_job_key = NULL;
OZ (wait_vector_.replace(new_job_key, iter, compare_job_key, equal_job_key, replace_job_key));
}
return ret;
}
int ObDBMSSchedJobTask::immediately(ObDBMSSchedJobKey *job_key)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("dbms sched job not init", K(ret), K(inited_));
} else if (OB_ISNULL(job_key) || OB_ISNULL(ready_queue_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(ret), K(job_key), K(ready_queue_));
} else {
ObSpinLockGuard guard(lock_);
if (OB_FAIL(ready_queue_->push(job_key, 0))) {
LOG_WARN("fail to push ready job to queue", K(ret), K(*job_key));
}
/*
LOG_DEBUG("JobKEYS INFO HEADER ==== ", KPC(new_job_key), K(wait_vector_.count()));
int i = 0;
for (WaitVectorIterator iter = wait_vector_.begin();
OB_SUCC(ret) && iter != wait_vector_.end(); ++iter, ++i) {
ObDBMSSchedJobKey *job = *iter;
LOG_DEBUG("JobKEYS INFO ELEMENT ====", K(i), KPC(job));
}
*/
return ret;
}
@ -258,8 +194,6 @@ int ObDBMSSchedJobMaster::init(ObUnitManager *unit_mgr,
) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null ptr", K(ret), K(unit_mgr), K(sql_client), K(schema_service));
} else if (FALSE_IT(ready_queue_.set_limit(MAX_READY_JOBS_CAPACITY))) {
// do-nothing
} else if (OB_FAIL(scheduler_task_.init())) {
LOG_WARN("fail to init ready queue", K(ret));
} else if (OB_FAIL(scheduler_thread_.init(1, 1))) {
@ -293,7 +227,7 @@ int ObDBMSSchedJobMaster::start()
// alreay running , do nothing ...
} else if (OB_FAIL(scheduler_thread_.push(static_cast<void *>(this)))) {
LOG_WARN("fail to start scheduler thread", K(ret));
} else if (OB_FAIL(scheduler_task_.start(&ready_queue_))) {
} else if (OB_FAIL(scheduler_task_.start(&allocator_))) {
LOG_WARN("fail to start ready queue", K(ret));
}
LOG_INFO("dbms sched job master started", K(ret));
@ -308,13 +242,52 @@ int ObDBMSSchedJobMaster::stop()
sleep(1);
}
scheduler_task_.stop();
ready_queue_.clear();
alive_jobs_.clear();
stoped_ = false;
LOG_INFO("dbms sched job master stoped", K(ret));
return ret;
}
int64_t ObDBMSSchedJobMaster::calc_next_date(ObDBMSSchedJobInfo &job_info)
{
int64_t next_date = 0;
const int64_t now = ObTimeUtility::current_time();
if (job_info.get_interval_ts() == 0) {
next_date = 64060560000000000;
} else {
int64_t N = (now - job_info.get_start_date()) / job_info.get_interval_ts();
next_date = job_info.get_start_date() + (N + 1) * job_info.get_interval_ts();
}
return next_date;
}
int64_t ObDBMSSchedJobMaster::run_job(ObDBMSSchedJobInfo &job_info, ObDBMSSchedJobKey *job_key, int64_t next_date)
{
int ret = OB_SUCCESS;
ObAddr execute_addr;
if (OB_FAIL((get_execute_addr(job_info, execute_addr)))) {
LOG_WARN("failed to get execute addr, retry soon", K(ret), K(job_info));
} else if (ObTimeUtility::current_time() > job_info.get_end_date()) {
LOG_INFO("job reach end date, not running", K(job_info));
} else if (OB_FAIL(table_operator_.update_for_start(job_info.get_tenant_id(), job_info, next_date))) {
LOG_WARN("failed to update for start", K(ret), K(job_info), KPC(job_key));
} else if (OB_FAIL(job_rpc_proxy_->run_dbms_sched_job(job_key->get_tenant_id(),
job_key->is_oracle_tenant(),
job_key->get_job_id(),
job_key->get_job_name(),
execute_addr,
self_addr_))) {
LOG_WARN("failed to run dbms sched job", K(ret), K(job_info), KPC(job_key));
if (is_server_down_error(ret)) {
int tmp = OB_SUCCESS;
if (OB_SUCCESS != (tmp = table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "send job rpc failed"))) {
LOG_WARN("update for end failed for send rpc failed job", K(tmp), K(job_info), KPC(job_key));
}
}
}
return ret;
}
int ObDBMSSchedJobMaster::scheduler()
{
int ret = OB_SUCCESS;
@ -331,30 +304,31 @@ int ObDBMSSchedJobMaster::scheduler()
lib::set_thread_name("DBMS_SCHEDULER");
while (OB_SUCC(ret) && !stoped_) {
ObLink* ptr = NULL;
int64_t timeout = MIN_SCHEDULER_INTERVAL;
ObDBMSSchedJobKey *job_key = NULL;
if (REACH_TIME_INTERVAL(MIN_SCHEDULER_INTERVAL)) {
int tmp_ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (REACH_TIME_INTERVAL(CHECK_NEW_INTERVAL)) {
if (OB_SUCCESS != (tmp_ret = check_all_tenants())) {
LOG_WARN("fail to check all tenants", K(tmp_ret));
}
}
if (OB_FAIL(ready_queue_.pop(ptr, timeout))) {
if (OB_ENTRY_NOT_EXIST == ret) {
LOG_INFO("dbms sched job master wait timeout, no entry", K(ret));
ret = OB_SUCCESS;
} else {
LOG_ERROR("fail to pop dbms sched job ready queue", K(ret), K(timeout));
}
} else if (OB_ISNULL(job_key = static_cast<ObDBMSSchedJobKey *>(ptr)) || !job_key->is_valid()) {
if (scheduler_task_.wait_vector().count() == 0) {
ob_usleep(MIN_SCHEDULER_INTERVAL);
} else if (OB_ISNULL(job_key = scheduler_task_.wait_vector()[0]) || !job_key->is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected error, invalid job key found in ready queue!", K(ret), KPC(job_key));
} else {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = scheduler_job(job_key))) {
LOG_WARN("fail to scheduler single dbms sched job", K(ret), K(tmp_ret), KPC(job_key));
int64_t delay = job_key->get_execute_at() - ObTimeUtility::current_time();
if (delay > MIN_SCHEDULER_INTERVAL) {
ob_usleep(MIN_SCHEDULER_INTERVAL);
} else {
LOG_INFO("success to scheduler single dbms sched job", K(ret), K(tmp_ret), KPC(job_key));
ob_usleep(max(0, delay));
if (OB_SUCCESS != (tmp_ret = scheduler_task_.wait_vector().remove(scheduler_task_.wait_vector().begin()))) {
LOG_WARN("fail to remove job_id from sorted vector", K(ret));
} else if (OB_SUCCESS != (tmp_ret = scheduler_job(job_key))) {
LOG_WARN("fail to scheduler single dbms sched job", K(ret), K(tmp_ret));
} else {
LOG_INFO("success to scheduler single dbms sched job", K(ret), K(tmp_ret));
}
}
}
}
@ -375,7 +349,7 @@ int ObDBMSSchedJobMaster::scheduler_job(ObDBMSSchedJobKey *job_key)
CK (OB_LIKELY(inited_));
CK (OB_NOT_NULL(job_key));
CK (OB_LIKELY(job_key->is_valid()));
JobIdByTenant job_id_by_tenant(job_key->get_tenant_id(), job_key->get_job_id());
if (OB_FAIL(ret)) {
LOG_WARN("fail to scheduler job", K(ret), KPC(job_key));
} else {
@ -383,78 +357,118 @@ int ObDBMSSchedJobMaster::scheduler_job(ObDBMSSchedJobKey *job_key)
OZ (table_operator_.get_dbms_sched_job_info(
job_key->get_tenant_id(), job_key->is_oracle_tenant(), job_key->get_job_id(), job_key->get_job_name(), allocator, job_info));
if (OB_FAIL(ret) || !job_info.valid()) {
int tmp = alive_jobs_.erase_refactored(job_key->get_job_id_with_tenant());
if (tmp != OB_SUCCESS) {
LOG_ERROR("failed delete invalid job from hash set", K(tmp), K(ret), K(job_info), KPC(job_key));
const int64_t now = ObTimeUtility::current_time();
int64_t next_check_date = now + MIN_SCHEDULER_INTERVAL;
if (OB_FAIL(ret) || !job_info.valid() || job_info.is_disabled() || job_info.is_broken()) {
free_job_key(job_key);
job_key = NULL;
} else if (job_info.is_running()) {
LOG_INFO("job is running now, retry later", K(job_info));
if (now > job_info.get_this_date() + TO_TS(job_info.get_max_run_duration())) {
if (OB_FAIL(table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "check job timeout"))) {
LOG_WARN("update for end failed for timeout job", K(ret));
} else {
LOG_WARN("job is timeout, force update for end", K(job_info), K(now));
}
}
} else if (now > job_info.get_end_date()) {
int tmp = OB_SUCCESS;
if (OB_SUCCESS != (tmp = table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "check expired job"))) {
LOG_WARN("update for end failed for auto drop job", K(tmp), K(job_info));
} else {
LOG_INFO("delete invalid job from hash set", K(tmp), K(ret), K(job_info), KPC(job_key));
LOG_WARN("update for end for expired job", K(job_info), K(now));
}
allocator_.free(job_key); // sql proxy error
} else{
bool ignore_nextdate = false;
if (!job_key->is_check() && !job_info.is_running() && !job_info.is_broken() && !job_info.is_disabled()) {
bool can_running = false;
OZ (table_operator_.check_job_can_running(job_info.get_tenant_id(), can_running));
if (OB_SUCC(ret) && can_running) {
OZ (get_execute_addr(job_info, execute_addr));
OZ (table_operator_.update_for_start(
job_info.get_tenant_id(), job_info));
OZ (job_rpc_proxy_->run_dbms_sched_job(
job_key->get_tenant_id(), job_key->is_oracle_tenant(), job_key->get_job_id(), job_key->get_job_name(), execute_addr, self_addr_));
free_job_key(job_key);
job_key = NULL;
} else if (now < job_info.get_next_date()) {
next_check_date = min(job_info.get_next_date(), now + CHECK_NEW_INTERVAL);
} else {
bool can_running = false;
if (OB_FAIL(table_operator_.check_job_can_running(job_info.get_tenant_id(), alive_jobs_.size(), can_running))) {
LOG_WARN("failed to check job can running, retry later", K(ret));
} else if (!can_running) {
LOG_INFO("job concurrency reach limit, retry later", K(ret), K(job_info), K(can_running));
} else if (now > job_info.get_next_date() + TO_TS(job_info.get_max_run_duration())) {
LOG_WARN("job maybe missed, ignore it", K(now), K(job_info));
int64_t new_next_date = calc_next_date(job_info);
int tmp = OB_SUCCESS;
if (OB_SUCCESS != (tmp = table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "check job missed"))) {
LOG_WARN("update for end failed for missed job", K(tmp));
} else if (OB_SUCCESS != (tmp = table_operator_.update_next_date(job_info.get_tenant_id(), job_info, new_next_date))){
LOG_WARN("update next date failed", K(tmp), K(job_info));
} else {
LOG_INFO("avoid duplicate job", K(ret), K(job_info), K(can_running));
next_check_date = min(new_next_date, now + CHECK_NEW_INTERVAL);
}
ignore_nextdate = true;
}
int tmp_ret = OB_SUCCESS;
// always add job to queue. we need this to check job status changes.
if (OB_SUCCESS != (tmp_ret = register_job(job_info, job_key, ignore_nextdate))) {
LOG_WARN("failed to register job to job queue", K(tmp_ret), K(job_info));
int tmp = alive_jobs_.erase_refactored(job_info.get_job_id_with_tenant());
if (tmp != OB_SUCCESS) {
LOG_ERROR("failed delete invalid job from hash set", K(tmp), K(job_info));
} else {
int64_t new_next_date = calc_next_date(job_info);
if (OB_FAIL(run_job(job_info, job_key, new_next_date))) {
LOG_WARN("failed to run job", K(ret), K(job_info), KPC(job_key));
} else {
LOG_WARN("delete register failed job from hash set", K(job_info));
next_check_date = min(new_next_date, now + CHECK_NEW_INTERVAL);
}
}
}
int tmp = OB_SUCCESS;
if (OB_NOT_NULL(job_key) && OB_SUCCESS != (tmp = register_job(job_key, next_check_date))) {
LOG_WARN("failed to register job", K(tmp), K(job_info));
free_job_key(job_key);
job_key = NULL;
}
}
return ret;
}
int ObDBMSSchedJobMaster::destroy()
{
ready_queue_.destroy();
scheduler_task_.destroy();
scheduler_thread_.destroy();
allocator_.clear();
allocator_.destroy();
return OB_SUCCESS;
}
int ObDBMSSchedJobMaster::alloc_job_key(
ObDBMSSchedJobKey *&job_key,
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const ObString &job_name,
uint64_t execute_at, uint64_t delay,
bool check_job)
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const ObString &job_name)
{
int ret = OB_SUCCESS;
ObSpinLockGuard guard(lock_);
void *ptr = NULL;
job_key = NULL;
if (OB_ISNULL(ptr = allocator_.alloc(sizeof(ObDBMSSchedJobKey)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", K(ret), K(ptr));
} else if (OB_ISNULL(job_key =
new(ptr)ObDBMSSchedJobKey(tenant_id, is_oracle_tenant, job_id, job_name,
execute_at, delay,
check_job))) {
new(ptr)ObDBMSSchedJobKey(tenant_id, is_oracle_tenant, job_id, job_name))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to init scheduler job id", K(ret), K(tenant_id));
} else {
JobIdByTenant job_id_by_tenant;
job_id_by_tenant.set_tenant_id(tenant_id);
job_id_by_tenant.set_job_id(job_id);
if (OB_FAIL(alive_jobs_.set_refactored(job_id_by_tenant))) {
LOG_WARN("faile to add job to alive_jobs", K(ret), K(tenant_id), K(job_id));
allocator_.free(job_key);
job_key = NULL;
}
}
return ret;
}
void ObDBMSSchedJobMaster::free_job_key(ObDBMSSchedJobKey *&job_key)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(job_key)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("job_key is null", K(ret));
} else {
JobIdByTenant job_id_by_tenant;
job_id_by_tenant.set_tenant_id(job_key->get_tenant_id());
job_id_by_tenant.set_job_id(job_key->get_job_id());
OZ (alive_jobs_.erase_refactored(job_id_by_tenant));
allocator_.free(job_key);
job_key = NULL;
}
}
int ObDBMSSchedJobMaster::get_execute_addr(ObDBMSSchedJobInfo &job_info, ObAddr &execute_addr)
{
int ret = OB_SUCCESS;
@ -520,7 +534,7 @@ int ObDBMSSchedJobMaster::server_random_pick(int64_t tenant_id, ObString &pick_z
bool is_alive = false;
bool is_active = false;
bool on_server = false;
int64_t pos = rand_.get(0,255) % total_server.count();
int64_t pos = rand_.get(0,65536) % total_server.count();
int64_t cnt = 0;
do {
pos = (pos + 1) % total_server.count();
@ -566,7 +580,27 @@ int ObDBMSSchedJobMaster::check_all_tenants()
const ObTenantSchema *tenant_schema = NULL;
OZ (schema_guard.get_tenant_info(tenant_ids.at(i), tenant_schema));
CK (OB_NOT_NULL(tenant_schema));
if (OB_SUCC(ret)) {
int64_t tenant_id = tenant_ids.at(i);
bool is_tenant_standby = false;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) {
LOG_WARN("check is standby tenant failed", K(ret), K(tenant_id));
} else if (is_tenant_standby) {
LOG_INFO("tenant is standby, not check new jobs, and remove exist jobs", K(tenant_id));
for (ObDBMSSchedJobTask::WaitVectorIterator iter = scheduler_task_.wait_vector().begin();
OB_SUCC(ret) && iter != scheduler_task_.wait_vector().end(); ++iter) {
ObDBMSSchedJobKey *job_key = *iter;
if (OB_NOT_NULL(job_key) && tenant_id == job_key->get_tenant_id()) {
if (OB_FAIL(scheduler_task_.wait_vector().remove(iter))) {
LOG_WARN("failed to remove job key from wait vector", K(ret), KPC(job_key));
} else {
LOG_INFO("remove job key", KPC(job_key));
iter--;
free_job_key(job_key);
}
}
}
} else {
uint64_t data_version = 0;
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_ids.at(i), data_version))) {
LOG_WARN("fail to get tenant data version", KR(ret), K(data_version));
@ -599,84 +633,54 @@ int ObDBMSSchedJobMaster::register_new_jobs(uint64_t tenant_id, bool is_oracle_t
{
int ret = OB_SUCCESS;
ObDBMSSchedJobInfo job_info;
JobIdByTenant job_id_by_tenant;
for (int64_t i = 0; OB_SUCC(ret) && i < job_infos.count(); i++) {
job_info = job_infos.at(i);
if (job_info.valid()) {
int tmp = alive_jobs_.exist_refactored(job_info.get_job_id_with_tenant());
if (job_info.valid() && !job_info.is_disabled() && !job_info.is_broken()) {
job_id_by_tenant.set_tenant_id(job_info.get_tenant_id());
job_id_by_tenant.set_job_id(job_info.get_job_id());
int tmp = alive_jobs_.exist_refactored(job_id_by_tenant);
if (OB_HASH_EXIST == tmp) {
// do nothing ...
} else if (OB_HASH_NOT_EXIST) {
OZ (register_job(job_info));
OZ (alive_jobs_.set_refactored(job_info.get_job_id_with_tenant()));
LOG_DEBUG("job exist", K(alive_jobs_), K(job_id_by_tenant));
} else if (OB_HASH_NOT_EXIST == tmp) {
ObDBMSSchedJobKey *job_key = NULL;
if (OB_FAIL(alloc_job_key(
job_key,
job_info.get_tenant_id(),
job_info.is_oracle_tenant(),
job_info.get_job_id(),
job_info.get_job_name()))) {
LOG_WARN("failed to alloc job key", K(ret), K(job_info));
} else if (OB_FAIL(register_job(job_key, ObTimeUtility::current_time()))) {
LOG_WARN("failed to register job", K(ret), K(job_info));
free_job_key(job_key);
job_key = NULL;
}
LOG_INFO("register new job", K(ret), K(tenant_id), K(job_info));
} else {
LOG_ERROR("dbms sched job master check job exist failed", K(ret), K(job_info));
LOG_ERROR("dbms sched job master check job exist failed", K(tmp), K(job_info));
}
}
}
return ret;
}
int ObDBMSSchedJobMaster::register_job(
ObDBMSSchedJobInfo &job_info, ObDBMSSchedJobKey *job_key, bool ignore_nextdate)
int ObDBMSSchedJobMaster::register_job(ObDBMSSchedJobKey *job_key, int64_t next_date)
{
int ret = OB_SUCCESS;
int64_t execute_at = -1;
int64_t delay = -1;
bool check_job = false;
int64_t now = ObTimeUtility::current_time();
CK (OB_LIKELY(inited_));
CK (job_info.valid());
OZ (table_operator_.check_job_timeout(job_info));
OZ (table_operator_.check_auto_drop(job_info));
if (OB_FAIL(ret)) {
} else if (job_info.is_broken() || job_info.is_disabled()) {
execute_at = now + MIN_SCHEDULER_INTERVAL;
delay = MIN_SCHEDULER_INTERVAL; // every MIN_SCHEDULER_INTERVAL check job status
check_job = true;
if (OB_ISNULL(job_key)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("job key is null", K(ret));
} else if (next_date == 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("next date should not be 0", K(ret), KPC(job_key), K(next_date));
} else {
OZ (table_operator_.calc_execute_at(job_info, execute_at, delay, ignore_nextdate));
if (OB_FAIL(ret) || delay < 0) {
ret = OB_SUCCESS;
execute_at = now + MIN_SCHEDULER_INTERVAL;
delay = MIN_SCHEDULER_INTERVAL;
check_job = true;
} else if (delay > MIN_SCHEDULER_INTERVAL) {
// job may run later, but we need check job update.
execute_at = now + MIN_SCHEDULER_INTERVAL;
delay = MIN_SCHEDULER_INTERVAL;
check_job = true;
job_key->set_execute_at(next_date);
if (OB_FAIL(scheduler_task_.scheduler(job_key))) {
LOG_WARN("failed to scheduler job", K(ret), KPC(job_key));
}
}
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(job_key)) {
OZ (alloc_job_key(
job_key,
job_info.get_tenant_id(),
job_info.is_oracle_tenant(),
job_info.get_job_id(),
job_info.get_job_name(),
execute_at,
delay,
check_job));
CK (OB_NOT_NULL(job_key));
CK (job_key->is_valid());
} else {
CK (job_key->get_tenant_id() == job_info.get_tenant_id());
CK (job_key->get_job_id() == job_info.get_job_id());
CK (job_key->get_job_name() == job_info.get_job_name());
OX (job_key->set_execute_at(execute_at));
OX (job_key->set_delay(delay));
OX (job_key->set_check_job(check_job));
}
OZ (scheduler_task_.scheduler(job_key));
if (OB_FAIL(ret) && OB_NOT_NULL(job_key)) {
allocator_.free(job_key);
}
LOG_INFO("register dbms sched job", K(ret), K(job_info), KPC(job_key), K(ignore_nextdate));
return ret;
}

View File

@ -51,16 +51,11 @@ class ObDBMSSchedJobKey : public common::ObLink
{
public:
ObDBMSSchedJobKey(
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name,
uint64_t execute_at, uint64_t delay,
bool check_job)
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name)
: tenant_id_(tenant_id),
is_oracle_tenant_(is_oracle_tenant),
job_id_(job_id),
job_name_(),
execute_at_(execute_at),
delay_(delay),
check_job_(check_job) {
job_name_() {
job_name_.assign_buffer(job_name_buf_, JOB_NAME_MAX_SIZE);
job_name_.write(job_name.ptr(), job_name.length());
}
@ -73,18 +68,9 @@ public:
OB_INLINE uint64_t get_job_id() const { return job_id_; }
OB_INLINE common::ObString &get_job_name() { return job_name_; }
OB_INLINE uint64_t get_execute_at() const { return execute_at_;}
OB_INLINE uint64_t get_delay() const { return delay_; }
OB_INLINE bool is_check() { return check_job_; }
OB_INLINE void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; }
OB_INLINE void set_job_id(uint64_t job_id) { job_id_ = job_id; }
OB_INLINE void set_execute_at(uint64_t execute_at) { execute_at_ = execute_at; }
OB_INLINE void set_delay(uint64_t delay) { delay_ = delay; }
OB_INLINE void set_check_job(bool check_job) { check_job_ = check_job; }
OB_INLINE uint64_t get_adjust_delay() const
{
uint64_t now = ObTimeUtility::current_time();
@ -103,9 +89,7 @@ public:
K_(is_oracle_tenant),
K_(job_id),
K_(job_name),
K_(execute_at),
K_(delay),
K_(check_job));
K_(execute_at));
private:
uint64_t tenant_id_;
@ -114,12 +98,9 @@ private:
char job_name_buf_[JOB_NAME_MAX_SIZE];
common::ObString job_name_;
uint64_t execute_at_;
uint64_t delay_;
bool check_job_; // for check job update ...
};
class ObDBMSSchedJobTask : public ObTimerTask
class ObDBMSSchedJobTask
{
public:
typedef common::ObSortedVector<ObDBMSSchedJobKey *> WaitVector;
@ -127,23 +108,19 @@ public:
ObDBMSSchedJobTask()
: inited_(false),
job_key_(NULL),
ready_queue_(NULL),
wait_vector_(0, NULL, ObModIds::VECTOR),
lock_(common::ObLatchIds::DBMS_SCHEDULER_TASK_LOCK) {}
allocator_(NULL),
wait_vector_(0, NULL, ObModIds::VECTOR) {}
virtual ~ObDBMSSchedJobTask() {}
int init();
int start(dbms_job::ObDBMSJobQueue *ready_queue);
int start(common::ObVSliceAlloc *allocator);
int stop();
int destroy();
void runTimerTask();
int scheduler(ObDBMSSchedJobKey *job_key);
int add_new_job(ObDBMSSchedJobKey *job_key);
int immediately(ObDBMSSchedJobKey *job_key);
WaitVector &wait_vector() { return wait_vector_; }
inline static bool compare_job_key(
const ObDBMSSchedJobKey *lhs, const ObDBMSSchedJobKey *rhs);
@ -152,13 +129,8 @@ public:
private:
bool inited_;
ObDBMSSchedJobKey *job_key_;
dbms_job::ObDBMSJobQueue *ready_queue_;
common::ObVSliceAlloc *allocator_;
WaitVector wait_vector_;
ObSpinLock lock_;
ObTimer timer_;
private:
DISALLOW_COPY_AND_ASSIGN(ObDBMSSchedJobTask);
};
@ -175,11 +147,54 @@ public:
schema_service_(NULL),
job_rpc_proxy_(NULL),
self_addr_(),
lock_(common::ObLatchIds::DBMS_SCHEDULER_MASTER_LOCK),
allocator_(ObMemAttr(OB_SERVER_TENANT_ID, "DbmsScheduler"), OB_MALLOC_NORMAL_BLOCK_SIZE, block_alloc_),
alive_jobs_() {}
virtual ~ObDBMSSchedJobMaster() { alive_jobs_.destroy(); };
class JobIdByTenant
{
public:
JobIdByTenant()
: tenant_id_(OB_INVALID_TENANT_ID),
job_id_(OB_INVALID_ID) {}
JobIdByTenant(const int64_t tenant_id, const int64_t job_id)
: tenant_id_(tenant_id),
job_id_(job_id) {}
~JobIdByTenant() {}
bool operator ==(const JobIdByTenant &other) const
{
return tenant_id_ == other.tenant_id_ && job_id_ == other.job_id_;
}
bool operator !=(const JobIdByTenant &other) const
{
return !(*this == other);
}
uint64_t hash() const
{
uint64_t hash_val = 0;
hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), hash_val);
hash_val = murmurhash(&job_id_, sizeof(job_id_), hash_val);
return hash_val;
}
int hash(uint64_t &hash_val) const
{
hash_val = hash();
return OB_SUCCESS;
}
void set_tenant_id(int64_t tenant_id) { tenant_id_ = tenant_id; }
void set_job_id(int64_t job_id) { job_id_ = job_id; }
int64_t get_tenant_id() { return tenant_id_; }
int64_t get_job_id() { return job_id_; }
TO_STRING_KV(K_(tenant_id),
K_(job_id));
private:
int64_t tenant_id_;
int64_t job_id_;
};
static ObDBMSSchedJobMaster &get_instance();
bool is_inited() { return inited_; }
@ -195,23 +210,23 @@ public:
int alloc_job_key(
ObDBMSSchedJobKey *&job_key,
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name,
uint64_t execute_at, uint64_t delay,
bool check_job = false);
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name);
void free_job_key(ObDBMSSchedJobKey *&job_key);
int server_random_pick(int64_t tenant_id, common::ObString &pick_zone, ObAddr &server);
int get_execute_addr(ObDBMSSchedJobInfo &job_info, common::ObAddr &execute_addr);
int check_all_tenants();
int check_new_jobs(uint64_t tenant_id, bool is_oracle_tenant);
int register_new_jobs(uint64_t tenant_id, bool is_oracle_tenant, ObIArray<ObDBMSSchedJobInfo> &job_infos);
int register_job(ObDBMSSchedJobInfo &job_info, ObDBMSSchedJobKey *job_key = NULL, bool ignore_nextdate = false);
int register_job(ObDBMSSchedJobKey *job_key, int64_t next_date);
int scheduler_job(ObDBMSSchedJobKey *job_key);
int64_t calc_next_date(ObDBMSSchedJobInfo &job_info);
int64_t run_job(ObDBMSSchedJobInfo &job_info, ObDBMSSchedJobKey *job_key, int64_t next_date);
private:
const static int MAX_READY_JOBS_CAPACITY = 1024 * 1024;
const static int MIN_SCHEDULER_INTERVAL = 20 * 1000 * 1000;
const static int MIN_SCHEDULER_INTERVAL = 1 * 1000 * 1000;
const static int CHECK_NEW_INTERVAL = 20 * 1000 * 1000;
bool inited_;
bool stoped_;
@ -225,15 +240,14 @@ private:
obrpc::ObDBMSSchedJobRpcProxy *job_rpc_proxy_;
common::ObAddr self_addr_;
dbms_job::ObDBMSJobQueue ready_queue_;
ObDBMSSchedJobTask scheduler_task_;
ObDBMSSchedJobThread scheduler_thread_;
ObDBMSSchedTableOperator table_operator_;
common::ObSpinLock lock_;
common::ObArenaAllocator allocator_;
common::ObBlockAllocMgr block_alloc_;
common::ObVSliceAlloc allocator_;
common::hash::ObHashSet<uint64_t> alive_jobs_;
common::hash::ObHashSet<JobIdByTenant> alive_jobs_;
private:
DISALLOW_COPY_AND_ASSIGN(ObDBMSSchedJobMaster);

View File

@ -49,7 +49,6 @@ public:
{
return common::is_valid_tenant_id(tenant_id_)
&& job_id_ != common::OB_INVALID_ID
&& !job_name_.empty()
&& server_addr_.is_valid()
&& master_addr_.is_valid();
}

View File

@ -145,6 +145,9 @@ public:
int64_t get_last_modify() { return last_modify_; }
int64_t get_interval_ts() { return interval_ts_; }
int64_t get_max_run_duration() { return (max_run_duration_ == 0) ? 30 : max_run_duration_ ; } // 30s by default
int64_t get_start_date() { return start_date_; }
int64_t get_end_date() { return end_date_; }
int64_t get_auto_drop() { return auto_drop_; }
bool is_broken() { return 0x1 == (flag_ & 0x1); }
bool is_running(){ return this_date_ != 0; }

View File

@ -34,7 +34,6 @@
#include "share/schema/ob_multi_version_schema_service.h"
#include "storage/mview/ob_mview_sched_job_utils.h"
#define TO_TS(second) 1000000L * second
namespace oceanbase
{
@ -48,8 +47,8 @@ using namespace storage;
namespace dbms_scheduler
{
int ObDBMSSchedTableOperator::update_for_start(
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, bool update_nextdate)
int ObDBMSSchedTableOperator::update_next_date(
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date)
{
int ret = OB_SUCCESS;
@ -57,50 +56,67 @@ int ObDBMSSchedTableOperator::update_for_start(
ObSqlString sql;
int64_t affected_rows = 0;
const int64_t now = ObTimeUtility::current_time();
int64_t delay = 0;
int64_t dummy_execute_at = 0;
CK (OB_NOT_NULL(sql_proxy_));
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
OZ (calc_execute_at(
job_info, (update_nextdate ? job_info.next_date_ : dummy_execute_at), delay, true));
OZ (dml.add_gmt_modified(now));
OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
OZ (dml.add_pk_column("job", job_info.job_));
OZ (dml.add_pk_column("job_name", job_info.job_name_));
OZ (dml.add_time_column("next_date", next_date));
OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql));
OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
return ret;
}
int ObDBMSSchedTableOperator::update_for_start(
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date)
{
int ret = OB_SUCCESS;
ObDMLSqlSplicer dml;
ObSqlString sql;
int64_t affected_rows = 0;
const int64_t now = ObTimeUtility::current_time();
CK (OB_NOT_NULL(sql_proxy_));
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
OX (job_info.this_date_ = now);
OZ (dml.add_gmt_modified(now));
OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
OZ (dml.add_pk_column("job", job_info.job_));
OZ (dml.add_pk_column("job_name", job_info.job_name_));
OZ (dml.add_time_column("this_date", job_info.this_date_));
OZ (dml.add_time_column("next_date", next_date));
OZ (dml.add_column("state", "SCHEDULED"));
OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql));
OZ (sql.append_fmt(" and this_date is null"));
OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
CK (affected_rows == 1);
return ret;
}
int ObDBMSSchedTableOperator::update_nextdate(
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info)
int ObDBMSSchedTableOperator::seperate_job_id_from_name(ObString &job_name, int64_t &job_id)
{
int ret = OB_SUCCESS;
ObDMLSqlSplicer dml;
ObSqlString sql;
int64_t affected_rows = 0;
const int64_t now = ObTimeUtility::current_time();
CK (OB_NOT_NULL(sql_proxy_));
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
OZ (dml.add_gmt_modified(now));
OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
OZ (dml.add_pk_column("job", job_info.job_));
OZ (dml.add_column("interval_ts", job_info.interval_ts_));
OZ (dml.add_time_column("next_date", job_info.next_date_));
OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql));
OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
const char* prefix = "JOB$_";
job_id = 0;
if (job_name.prefix_match(prefix)) {
char nptr[JOB_NAME_MAX_SIZE];
char *endptr = NULL;
snprintf(nptr, JOB_NAME_MAX_SIZE, "%.*s", job_name.length(), job_name.ptr());
job_id = strtoll(nptr + strlen(prefix), &endptr, 10);
if (job_id <= 0) {
LOG_WARN("job_id is not right", K(job_name), K(nptr), K(job_id));
} else if (*endptr != '\0' || job_id <= JOB_ID_OFFSET) {
job_id = 0; // use job_info.job_ when job_id is not formal
}
}
return ret;
}
@ -117,8 +133,6 @@ int ObDBMSSchedTableOperator::update_for_end(
ObSqlString sql2;
int64_t affected_rows = 0;
const int64_t now = ObTimeUtility::current_time();
int64_t next_date;
int64_t delay;
UNUSED(errmsg);
@ -161,12 +175,13 @@ int ObDBMSSchedTableOperator::update_for_end(
OZ (dml1.add_column("state", "COMPLETED"));
OZ (dml1.add_column("enabled", job_info.enabled_));
}
CK (job_info.this_date_ > 0);
OX (job_info.total_ += (now - job_info.this_date_));
CK (job_info.this_date_ > 0 || !errmsg.empty());
OX (job_info.total_ += (job_info.this_date_ > 0 ? now - job_info.this_date_ : 0));
OZ (dml1.add_gmt_modified(now));
OZ (dml1.add_pk_column("tenant_id",
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
OZ (dml1.add_pk_column("job", job_info.job_));
OZ (dml1.add_pk_column("job_name", job_info.job_name_));
OZ (dml1.add_column(true, "this_date"));
OZ (dml1.add_time_column("last_date", job_info.this_date_));
OZ (dml1.add_time_column("next_date", job_info.next_date_));
@ -196,7 +211,12 @@ int ObDBMSSchedTableOperator::update_for_end(
OZ (dml2.add_gmt_create(now));
OZ (dml2.add_gmt_modified(now));
OZ (dml2.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
OZ (dml2.add_pk_column("job", job_info.job_));
int64_t job_id = 0;
OZ (seperate_job_id_from_name(job_info.get_job_name(), job_id));
if (job_id <= 0) {
job_id = job_info.get_job_id();
}
OZ (dml2.add_pk_column("job", job_id));
OZ (dml2.add_time_column("time", now));
OZ (dml2.add_column("code", err));
OZ (dml2.add_column(
@ -225,32 +245,7 @@ int ObDBMSSchedTableOperator::update_for_end(
return ret;
}
int ObDBMSSchedTableOperator::check_job_timeout(ObDBMSSchedJobInfo &job_info)
{
int ret = OB_SUCCESS;
if ((!job_info.is_running()) || (job_info.get_max_run_duration() == 0)) {
//not running or not set timeout
} else if (ObTimeUtility::current_time() > (job_info.get_this_date() + TO_TS(job_info.get_max_run_duration()))) {
OZ(update_for_end(job_info.get_tenant_id(), job_info, 0, "check job timeout"));
LOG_WARN("job is timeout, force update for end", K(job_info), K(ObTimeUtility::current_time()));
}
return ret;
}
int ObDBMSSchedTableOperator::check_auto_drop(ObDBMSSchedJobInfo &job_info)
{
int ret = OB_SUCCESS;
if (job_info.is_running()) {
// running job not check
} else if (ObTimeUtility::current_time() > (job_info.end_date_) &&
(true == job_info.auto_drop_)) {
OZ(update_for_end(job_info.get_tenant_id(), job_info, 0, "check auto drop expired job"));
LOG_WARN("auto drop miss out job", K(job_info), K(ObTimeUtility::current_time()));
}
return ret;
}
int ObDBMSSchedTableOperator::check_job_can_running(int64_t tenant_id, bool &can_running)
int ObDBMSSchedTableOperator::check_job_can_running(int64_t tenant_id, int64_t alive_job_count, bool &can_running)
{
int ret = OB_SUCCESS;
uint64_t job_queue_processor = 0;
@ -263,33 +258,40 @@ int ObDBMSSchedTableOperator::check_job_can_running(int64_t tenant_id, bool &can
CK (tenant_config.is_valid());
OX (job_queue_processor = tenant_config->job_queue_processes);
// found current running job count
OZ (sql.append_fmt("select count(*) from %s where this_date is not null", OB_ALL_TENANT_SCHEDULER_JOB_TNAME));
if (OB_FAIL(ret)) {
} else if (alive_job_count <= job_queue_processor) {
can_running = true;
} else {
OZ (sql.append_fmt("select count(*) from %s where this_date is not null", OB_ALL_TENANT_SCHEDULER_JOB_TNAME));
CK (OB_NOT_NULL(GCTX.schema_service_));
OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, guard));
OZ (guard.check_tenant_is_restore(tenant_id, is_restore));
CK (OB_NOT_NULL(GCTX.schema_service_));
OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, guard));
OZ (guard.check_tenant_is_restore(tenant_id, is_restore));
// job can not run in standy cluster and restore.
if (OB_SUCC(ret) && job_queue_processor > 0
&& !GCTX.is_standby_cluster()
&& !is_restore) {
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
} else if (OB_NOT_NULL(result.get_result())) {
if (OB_SUCCESS == (ret = result.get_result()->next())) {
int64_t int_value = 0;
if (OB_FAIL(result.get_result()->get_int(static_cast<const int64_t>(0), int_value))) {
LOG_WARN("failed to get column in row. ", K(ret));
} else {
job_running_cnt = static_cast<uint64_t>(int_value);
}
// job can not run in standy cluster and restore.
if (OB_SUCC(ret) && job_queue_processor > 0
&& !is_restore) {
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id));
} else {
LOG_WARN("failed to calc all running job, no row return", K(ret));
if (OB_SUCCESS == (ret = result.get_result()->next())) {
int64_t int_value = 0;
if (OB_FAIL(result.get_result()->get_int(static_cast<const int64_t>(0), int_value))) {
LOG_WARN("failed to get column in row. ", K(ret));
} else {
job_running_cnt = static_cast<uint64_t>(int_value);
}
} else {
LOG_WARN("failed to calc all running job, no row return", K(ret));
}
}
}
OX (can_running = (job_queue_processor > job_running_cnt));
}
OX (can_running = (job_queue_processor > job_running_cnt));
}
return ret;
}
@ -399,16 +401,24 @@ int ObDBMSSchedTableOperator::get_dbms_sched_job_info(
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_id));
} else if (OB_NOT_NULL(result.get_result())) {
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get result", K(ret), K(tenant_id), K(job_id));
} else {
if (OB_SUCCESS == (ret = result.get_result()->next())) {
OZ (extract_info(*(result.get_result()), tenant_id, is_oracle_tenant, allocator, job_info));
if (OB_SUCC(ret) && (result.get_result()->next()) != OB_ITER_END) {
LOG_ERROR("got more than one row for dbms sched job!", K(ret), K(tenant_id), K(job_id));
ret = OB_ERR_UNEXPECTED;
if (OB_SUCC(ret)) {
int tmp_ret = result.get_result()->next();
if (OB_SUCCESS == tmp_ret) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("got more than one row for dbms sched job!", K(ret), K(tenant_id), K(job_id));
} else if (tmp_ret != OB_ITER_END) {
ret = tmp_ret;
LOG_ERROR("got next row for dbms sched job failed", K(ret), K(tenant_id), K(job_id));
}
}
} else if (OB_ITER_END == ret) {
LOG_INFO("job not exists, may delete alreay!", K(ret), K(tenant_id), K(job_id));
ret = OB_SUCCESS; // job not exist, do nothing ...
LOG_WARN("job not exists, may delete alreay!", K(ret), K(tenant_id), K(job_id));
} else {
LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_id));
}
@ -437,7 +447,10 @@ int ObDBMSSchedTableOperator::get_dbms_sched_job_infos_in_tenant(
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
} else if (OB_NOT_NULL(result.get_result())) {
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id));
} else {
do {
if (OB_FAIL(result.get_result()->next())) {
LOG_INFO("failed to get result", K(ret));
@ -492,12 +505,21 @@ int ObDBMSSchedTableOperator::get_dbms_sched_job_class_info(
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_class_name));
} else if (OB_NOT_NULL(result.get_result())) {
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id), K(job_class_name));
} else {
if (OB_SUCCESS == (ret = result.get_result()->next())) {
OZ (extract_job_class_info(*(result.get_result()), tenant_id, is_oracle_tenant, allocator, job_class_info));
if (OB_SUCC(ret) && (result.get_result()->next()) != OB_ITER_END) {
LOG_ERROR("got more than one row for dbms sched job class!", K(ret), K(tenant_id), K(job_class_name));
ret = OB_ERR_UNEXPECTED;
if (OB_SUCC(ret)) {
int tmp_ret = result.get_result()->next();
if (OB_SUCCESS == tmp_ret) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("got more than one row for dbms sched job class!", K(ret), K(tenant_id), K(job_class_name));
} else if (tmp_ret != OB_ITER_END) {
ret = tmp_ret;
LOG_ERROR("got next row for dbms sched job class failed", K(ret), K(tenant_id), K(job_class_name));
}
}
} else if (OB_ITER_END == ret) {
LOG_INFO("job_class_name not exists, may delete alreay!", K(ret), K(tenant_id), K(job_class_name));
@ -511,96 +533,6 @@ int ObDBMSSchedTableOperator::get_dbms_sched_job_class_info(
return ret;
}
int ObDBMSSchedTableOperator::calc_execute_at(
ObDBMSSchedJobInfo &job_info, int64_t &execute_at, int64_t &delay, bool ignore_nextdate)
{
int ret = OB_SUCCESS;
ObString &interval = job_info.get_interval();
const int64_t now = ObTimeUtility::current_time();
int64_t last_sub_next =
(job_info.get_last_modify() / 1000 / 1000) - (job_info.get_next_date() / 1000/ 1000);
if (job_info.get_next_date() != 0 && (!ignore_nextdate || job_info.get_next_date() != execute_at)) {
if (job_info.get_next_date() > now) {
execute_at = job_info.get_next_date();
delay = job_info.get_next_date() - now;
} else if (now - job_info.get_next_date() < TO_TS(job_info.get_max_run_duration())) {
LOG_WARN("job maybe missed, retry it", K(now), K(job_info), K(execute_at), K(delay), K(ignore_nextdate), K(lbt()));
execute_at = now;
delay = 0;
} else if (last_sub_next < 5 && last_sub_next >= -5) {
LOG_WARN("job maybe missed, retry it", K(last_sub_next), K(now), K(job_info), K(execute_at), K(delay), K(ignore_nextdate), K(lbt()));
execute_at = now;
delay = 0;
} else {
LOG_WARN("job maybe missed, ignore it", K(last_sub_next), K(now), K(job_info), K(execute_at), K(delay), K(ignore_nextdate), K(lbt()));
OZ(update_for_end(job_info.get_tenant_id(), job_info, 0, "check job missed"));
delay = -1;
}
} else {
delay = -1;
}
if (delay < 0 && (job_info.get_interval_ts() != 0
|| (!interval.empty() && (0 != interval.case_compare("null"))))) {
ObSqlString sql;
common::ObISQLClient *sql_proxy = sql_proxy_;
ObOracleSqlProxy oracle_proxy(*(static_cast<ObMySQLProxy *>(sql_proxy_)));
// NOTE: we need utc timestamp.
if (lib::is_mysql_mode()) {
OZ (sql.append_fmt("select utc_timestamp() from dual;"));
} else {
OZ (sql.append_fmt(
"select cast(to_timestamp(sys_extract_utc(to_timestamp(sysdate))) as date) from dual;"));
sql_proxy = &oracle_proxy;
}
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql_proxy->read(result, job_info.get_tenant_id(), sql.ptr()))) {
LOG_WARN("execute query failed", K(ret), K(sql), K(job_info));
} else if (OB_NOT_NULL(result.get_result())) {
if (OB_FAIL(result.get_result()->next())) {
LOG_WARN("failed to get result", K(ret));
} else {
int64_t sysdate = 0;
int64_t col_idx = 0;
OZ (result.get_result()->get_datetime(col_idx, sysdate));
if (OB_SUCC(ret) && job_info.is_date_expression_job_class()) {
int64_t next_date_utc_ts = 0;
if (OB_FAIL(ObMViewSchedJobUtils::calc_date_expression(job_info, next_date_utc_ts))) {
LOG_WARN("failed to calc date expression", KR(ret), K(job_info));
} else {
job_info.interval_ts_ = next_date_utc_ts - sysdate;
}
}
if (OB_SUCC(ret)) {
execute_at = sysdate + job_info.get_interval_ts();
}
if (OB_FAIL(ret)) {
} else if (job_info.get_next_date() > execute_at) {
execute_at = job_info.get_next_date();
delay = execute_at - sysdate;
} else {
delay = execute_at - sysdate;
}
if (OB_SUCC(ret)) {
OX (job_info.next_date_ = execute_at);
OZ (update_nextdate(job_info.get_tenant_id(), job_info));
}
}
}
}
LOG_INFO("repeat job update nextdate", K(job_info), K(execute_at), K(delay), K(ignore_nextdate));
} else if (delay < 0 && job_info.get_interval_ts() == 0) {
OX (job_info.next_date_ = 64060560000000000); // 4000-01-01
OZ (update_nextdate(job_info.get_tenant_id(), job_info));
LOG_INFO("once job update nextdate", K(job_info), K(execute_at), K(delay), K(ignore_nextdate));
}
return ret;
}
int ObDBMSSchedTableOperator::register_default_job_class(uint64_t tenant_id)
{
int ret = OB_SUCCESS;

View File

@ -46,14 +46,20 @@ class ObDBMSSchedTableOperator
public:
ObDBMSSchedTableOperator() : sql_proxy_(NULL) {}
virtual ~ObDBMSSchedTableOperator() {};
static constexpr int64_t JOB_NAME_MAX_SIZE = 128;
static const int64_t JOB_ID_OFFSET = (1LL<<50);
int init(common::ObISQLClient *sql_proxy) { sql_proxy_ = sql_proxy; return common::OB_SUCCESS; }
int update_next_date(
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date);
int update_for_start(
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, bool update_nextdate = true);
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date);
int update_for_end(
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int err, const common::ObString &errmsg);
int update_nextdate(uint64_t tenant_id, ObDBMSSchedJobInfo &job_info);
int seperate_job_id_from_name(common::ObString &job_name, int64_t &job_id);
int get_dbms_sched_job_info(
uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name,
@ -73,14 +79,7 @@ public:
sqlclient::ObMySQLResult &result, int64_t tenant_id, bool is_oracle_tenant,
ObIAllocator &allocator, ObDBMSSchedJobClassInfo &job_class_info);
int calc_execute_at(
ObDBMSSchedJobInfo &job_info, int64_t &execute_at, int64_t &delay, bool ignore_nextdate = false);
int check_job_can_running(int64_t tenant_id, bool &can_running);
int check_job_timeout(ObDBMSSchedJobInfo &job_info);
int check_auto_drop(ObDBMSSchedJobInfo &job_info);
int check_job_can_running(int64_t tenant_id, int64_t alive_job_count, bool &can_running);
int register_default_job_class(uint64_t tenant_id);
int purge_run_detail_histroy(uint64_t tenant_id);

View File

@ -1517,7 +1517,7 @@ int ObInnerTableSchema::dba_scheduler_windows_schema(ObTableSchema &table_schema
table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__(SELECT CAST(T.POWNER AS CHAR(128)) AS OWNER, CAST(T.JOB_NAME AS CHAR(128)) AS WINDOW_NAME, CAST(NULL AS CHAR(128)) AS RESOURCE_PLAN, CAST(NULL AS CHAR(4000)) AS SCHEDULE_OWNER, CAST(NULL AS CHAR(4000)) AS SCHEDULE_NAME, CAST(NULL AS CHAR(8)) AS SCHEDULE_TYPE, CAST(T.START_DATE AS DATETIME(6)) AS START_DATE, CAST(T.REPEAT_INTERVAL AS CHAR(4000)) AS REPEAT_INTERVAL, CAST(T.END_DATE AS DATETIME(6)) AS END_DATE, CAST(T.MAX_RUN_DURATION AS SIGNED) AS DURATION, CAST(NULL AS CHAR(4)) AS WINDOW_PRIORITY, CAST(T.NEXT_DATE AS DATETIME(6)) AS NEXT_RUN_DATE, CAST(T.LAST_DATE AS DATETIME(6)) AS LAST_START_DATE, CAST(T.ENABLED AS CHAR(5)) AS ENABLED, CAST(NULL AS CHAR(5)) AS ACTIVE, CAST(NULL AS DATETIME(6)) AS MANUAL_OPEN_TIME, CAST(NULL AS SIGNED) AS MANUAL_DURATION, CAST(T.COMMENTS AS CHAR(4000)) AS COMMENTS FROM oceanbase.__all_tenant_scheduler_job T WHERE T.JOB_NAME in ('MONDAY_WINDOW', 'TUESDAY_WINDOW', 'WEDNESDAY_WINDOW', 'THURSDAY_WINDOW', 'FRIDAY_WINDOW', 'SATURDAY_WINDOW', 'SUNDAY_WINDOW') )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__(SELECT CAST(T.POWNER AS CHAR(128)) AS OWNER, CAST(T.JOB_NAME AS CHAR(128)) AS WINDOW_NAME, CAST(NULL AS CHAR(128)) AS RESOURCE_PLAN, CAST(NULL AS CHAR(4000)) AS SCHEDULE_OWNER, CAST(NULL AS CHAR(4000)) AS SCHEDULE_NAME, CAST(NULL AS CHAR(8)) AS SCHEDULE_TYPE, CAST(T.START_DATE AS DATETIME(6)) AS START_DATE, CAST(T.REPEAT_INTERVAL AS CHAR(4000)) AS REPEAT_INTERVAL, CAST(T.END_DATE AS DATETIME(6)) AS END_DATE, CAST(T.MAX_RUN_DURATION AS SIGNED) AS DURATION, CAST(NULL AS CHAR(4)) AS WINDOW_PRIORITY, CAST(T.NEXT_DATE AS DATETIME(6)) AS NEXT_RUN_DATE, CAST(T.LAST_DATE AS DATETIME(6)) AS LAST_START_DATE, CAST(T.ENABLED AS CHAR(5)) AS ENABLED, CAST(NULL AS CHAR(5)) AS ACTIVE, CAST(NULL AS DATETIME(6)) AS MANUAL_OPEN_TIME, CAST(NULL AS SIGNED) AS MANUAL_DURATION, CAST(T.COMMENTS AS CHAR(4000)) AS COMMENTS FROM oceanbase.__all_tenant_scheduler_job T WHERE T.JOB > 0 and T.JOB_NAME in ('MONDAY_WINDOW', 'TUESDAY_WINDOW', 'WEDNESDAY_WINDOW', 'THURSDAY_WINDOW', 'FRIDAY_WINDOW', 'SATURDAY_WINDOW', 'SUNDAY_WINDOW') )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}

View File

@ -26512,7 +26512,7 @@ def_table_schema(
CAST(NULL AS DATETIME(6)) AS MANUAL_OPEN_TIME,
CAST(NULL AS SIGNED) AS MANUAL_DURATION,
CAST(T.COMMENTS AS CHAR(4000)) AS COMMENTS
FROM oceanbase.__all_tenant_scheduler_job T WHERE T.JOB_NAME in ('MONDAY_WINDOW',
FROM oceanbase.__all_tenant_scheduler_job T WHERE T.JOB > 0 and T.JOB_NAME in ('MONDAY_WINDOW',
'TUESDAY_WINDOW', 'WEDNESDAY_WINDOW', 'THURSDAY_WINDOW', 'FRIDAY_WINDOW', 'SATURDAY_WINDOW', 'SUNDAY_WINDOW')
""".replace("\n", " ")
)

View File

@ -1147,6 +1147,44 @@ int ObUpgradeFor4200Processor::post_upgrade_for_heartbeat_and_server_zone_op_ser
/* =========== 4200 upgrade processor end ============= */
int ObUpgradeFor4211Processor::post_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(post_upgrade_for_dbms_scheduler())) {
LOG_WARN("post for upgrade dbms scheduler failed", K(ret));
}
return ret;
}
int ObUpgradeFor4211Processor::post_upgrade_for_dbms_scheduler()
{
int ret = OB_SUCCESS;
ObSqlString sql;
int64_t affected_rows = 0;
bool is_tenant_standby = false;
if (sql_proxy_ == NULL) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql_proxy is null", K(ret), K(tenant_id_));
} else if (OB_FAIL(ObAllTenantInfoProxy::is_standby_tenant(sql_proxy_, tenant_id_, is_tenant_standby))) {
LOG_WARN("check is standby tenant failed", K(ret), K(tenant_id_));
} else if (is_tenant_standby) {
LOG_INFO("tenant is standby, ignore", K(tenant_id_));
} else {
OZ (sql.append_fmt(
"insert ignore into %s "
"(tenant_id,job_name,job,lowner,powner,cowner,next_date,`interval#`,flag) "
"select tenant_id, job_name,0,lowner,powner,cowner,next_date,`interval#`,flag from %s where job != 0",
OB_ALL_TENANT_SCHEDULER_JOB_TNAME,
OB_ALL_TENANT_SCHEDULER_JOB_TNAME)); // if has new colomn, use default value
OZ (sql_proxy_->write(tenant_id_, sql.ptr(), affected_rows));
LOG_INFO("insert job_id=0 rows finished for dbms_scheduler old jobs", K(ret), K(tenant_id_), K(affected_rows));
}
return ret;
}
/* =========== 4211 upgrade processor end ============= */
/* =========== special upgrade processor end ============= */
} // end share

View File

@ -210,7 +210,17 @@ private:
};
DEF_SIMPLE_UPGRARD_PROCESSER(4, 2, 1, 0)
DEF_SIMPLE_UPGRARD_PROCESSER(4, 2, 1, 1)
class ObUpgradeFor4211Processor : public ObBaseUpgradeProcessor
{
public:
ObUpgradeFor4211Processor() : ObBaseUpgradeProcessor() {}
virtual ~ObUpgradeFor4211Processor() {}
virtual int pre_upgrade() override { return common::OB_SUCCESS; }
virtual int post_upgrade() override;
private:
int post_upgrade_for_dbms_scheduler();
};
DEF_SIMPLE_UPGRARD_PROCESSER(4, 2, 1, 2)
DEF_SIMPLE_UPGRARD_PROCESSER(4, 2, 2, 0)
DEF_SIMPLE_UPGRARD_PROCESSER(4, 3, 0, 0)

View File

@ -1394,7 +1394,7 @@ DEF_INT(connection_control_max_connection_delay, OB_TENANT_PARAMETER, "214748364
DEF_BOOL(ob_proxy_readonly_transaction_routing_policy, OB_TENANT_PARAMETER, "false",
"Proxy route policy for readonly sql: whether regard begining read only stmts as in transaction",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(job_queue_processes, OB_TENANT_PARAMETER, "1000", "[0,1000]",
DEF_INT(job_queue_processes, OB_TENANT_PARAMETER, "1000", "[0,16384]",
"specifies the maximum number of job slaves per instance that can be created "
"for the execution of DBMS_JOB jobs and Oracle Scheduler (DBMS_SCHEDULER) jobs.",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -100,21 +100,39 @@ int ObDbmsStatsMaintenanceWindow::get_stats_maintenance_window_jobs_sql(const Ob
} else if (OB_UNLIKELY(start_usec == -1 || job_action.empty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(ret), K(start_usec), K(job_action));
} else if (OB_FAIL(get_stat_window_job_sql(is_oracle_mode,
tenant_id,
job_id++,
windows_name[i],
exec_env,
start_usec,
job_action,
tmp_sql))) {
LOG_WARN("failed to get stat window job sql", K(ret));
} else if (OB_FAIL(raw_sql.append_fmt("%s(%s)", (i == 0 ? "" : ","), tmp_sql.ptr()))) {
LOG_WARN("failed to append sql", K(ret));
} else {
++ expected_affected_rows;
tmp_sql.reset();
job_action.reset();
if (OB_FAIL(get_stat_window_job_sql(is_oracle_mode,
tenant_id,
job_id++,
windows_name[i],
exec_env,
start_usec,
job_action,
tmp_sql))) {
LOG_WARN("failed to get stat window job sql", K(ret));
} else if (OB_FAIL(raw_sql.append_fmt("%s(%s)", (i == 0 ? "" : ","), tmp_sql.ptr()))) {
LOG_WARN("failed to append sql", K(ret));
} else {
++ expected_affected_rows;
tmp_sql.reset();
job_action.reset();
if (OB_FAIL(get_stat_window_job_sql(is_oracle_mode,
tenant_id,
0,
windows_name[i],
exec_env,
start_usec,
job_action,
tmp_sql))) {
LOG_WARN("failed to get stat window job sql", K(ret));
} else if (OB_FAIL(raw_sql.append_fmt("%s(%s)", ",", tmp_sql.ptr()))) {
LOG_WARN("failed to append sql", K(ret));
} else {
++ expected_affected_rows;
tmp_sql.reset();
job_action.reset();
}
}
}
}
if (OB_SUCC(ret)) {
@ -127,7 +145,17 @@ int ObDbmsStatsMaintenanceWindow::get_stats_maintenance_window_jobs_sql(const Ob
} else {
++ expected_affected_rows;
tmp_sql.reset();
if (OB_FAIL(get_stats_history_manager_job_sql(is_oracle_mode, tenant_id,
0, exec_env, tmp_sql))) {
LOG_WARN("failed to get stats history manager job sql", K(ret));
} else if (OB_FAIL(raw_sql.append_fmt(", (%s)", tmp_sql.ptr()))) {
LOG_WARN("failed to append sql", K(ret));
} else {
++ expected_affected_rows;
tmp_sql.reset();
}
}
//set dummy guard job
if (OB_FAIL(ret)) {
} else if (OB_FAIL(get_dummy_guard_job_sql(tenant_id, job_id, tmp_sql))) {