fix: dbms_scheduler patch 421bp4 bugfix to master

This commit is contained in:
obdev
2024-03-04 13:21:04 +00:00
committed by ob-robot
parent 8ca85b06ab
commit 2fb8107632
5 changed files with 247 additions and 110 deletions

View File

@ -247,7 +247,7 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job(uint64_t tenant_id, bool is_oracl
errmsg = ObString(strlen(ob_errpkt_strerror(ret, lib::is_oracle_mode())),
ob_errpkt_strerror(ret, lib::is_oracle_mode()));
}
if ((tmp_ret = table_operator_.update_for_end(tenant_id, job_info, ret, errmsg)) != OB_SUCCESS) {
if ((tmp_ret = table_operator_.update_for_end(job_info, ret, errmsg)) != OB_SUCCESS) {
LOG_WARN("update dbms sched job failed", K(tmp_ret), K(ret));
}
ret = OB_SUCCESS == ret ? tmp_ret : ret;

View File

@ -294,7 +294,7 @@ int64_t ObDBMSSchedJobMaster::run_job(ObDBMSSchedJobInfo &job_info, ObDBMSSchedJ
LOG_WARN("failed to run dbms sched job", K(ret), K(job_info), KPC(job_key));
if (is_server_down_error(ret)) {
int tmp = OB_SUCCESS;
if (OB_SUCCESS != (tmp = table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "send job rpc failed"))) {
if (OB_SUCCESS != (tmp = table_operator_.update_for_rollback(job_info))) {
LOG_WARN("update for end failed for send rpc failed job", K(tmp), K(job_info), KPC(job_key));
}
}
@ -379,7 +379,7 @@ int ObDBMSSchedJobMaster::scheduler_job(ObDBMSSchedJobKey *job_key)
} else if (job_info.is_running()) {
LOG_INFO("job is running now, retry later", K(job_info));
if (now > job_info.get_this_date() + TO_TS(job_info.get_max_run_duration())) {
if (OB_FAIL(table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "check job timeout"))) {
if (OB_FAIL(table_operator_.update_for_timeout(job_info))) {
LOG_WARN("update for end failed for timeout job", K(ret));
} else {
LOG_WARN("job is timeout, force update for end", K(job_info), K(now));
@ -387,7 +387,7 @@ int ObDBMSSchedJobMaster::scheduler_job(ObDBMSSchedJobKey *job_key)
}
} else if (now > job_info.get_end_date()) {
int tmp = OB_SUCCESS;
if (OB_SUCCESS != (tmp = table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "check expired job"))) {
if (OB_SUCCESS != (tmp = table_operator_.update_for_enddate(job_info))) {
LOG_WARN("update for end failed for auto drop job", K(tmp), K(job_info));
} else {
LOG_WARN("update for end for expired job", K(job_info), K(now));
@ -406,7 +406,7 @@ int ObDBMSSchedJobMaster::scheduler_job(ObDBMSSchedJobKey *job_key)
LOG_WARN("job maybe missed, ignore it", K(now), K(job_info));
int64_t new_next_date = calc_next_date(job_info);
int tmp = OB_SUCCESS;
if (OB_SUCCESS != (tmp = table_operator_.update_for_end(job_info.get_tenant_id(), job_info, 0, "check job missed"))) {
if (OB_SUCCESS != (tmp = table_operator_.update_for_missed(job_info))) {
LOG_WARN("update for end failed for missed job", K(tmp));
} else if (OB_SUCCESS != (tmp = table_operator_.update_next_date(job_info.get_tenant_id(), job_info, new_next_date))){
LOG_WARN("update next date failed", K(tmp), K(job_info));

View File

@ -81,6 +81,7 @@ int ObDBMSSchedJobInfo::deep_copy(ObIAllocator &allocator, const ObDBMSSchedJobI
OZ (ob_write_string(allocator, other.job_name_, job_name_));
OZ (ob_write_string(allocator, other.job_class_, job_class_));
OZ (ob_write_string(allocator, other.program_name_, program_name_));
OZ (ob_write_string(allocator, other.state_, state_));
return ret;
}

View File

@ -120,121 +120,166 @@ int ObDBMSSchedTableOperator::seperate_job_id_from_name(ObString &job_name, int6
}
int ObDBMSSchedTableOperator::update_for_end(
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int err, const ObString &errmsg)
int ObDBMSSchedTableOperator::_build_job_drop_dml(int64_t now, ObDBMSSchedJobInfo &job_info, ObSqlString &sql)
{
int ret = OB_SUCCESS;
ObDMLSqlSplicer dml;
int64_t tenant_id = job_info.tenant_id_;
OZ (dml.add_gmt_modified(now));
OZ (dml.add_pk_column("tenant_id",
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
OZ (dml.add_pk_column("job_name", job_info.job_name_));
OZ (dml.splice_delete_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql));
return ret;
}
int ObDBMSSchedTableOperator::_build_job_finished_dml(int64_t now, ObDBMSSchedJobInfo &job_info, ObSqlString &sql)
{
int ret = OB_SUCCESS;
ObDMLSqlSplicer dml;
int64_t tenant_id = job_info.tenant_id_;
OZ (dml.add_gmt_modified(now));
OZ (dml.add_pk_column("tenant_id",
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
OZ (dml.add_pk_column("job", job_info.job_));
OZ (dml.add_pk_column("job_name", job_info.job_name_));
OZ (dml.add_column("state", job_info.state_));
if (0 == job_info.state_.case_compare("COMPLETED")) {
OZ (dml.add_column("enabled", false));
}
OZ (dml.add_column(true, "this_date"));
OZ (dml.add_time_column("last_date", job_info.this_date_));
OZ (dml.add_time_column("next_date", job_info.next_date_));
OZ (dml.add_column("failures", job_info.failures_));
OZ (dml.add_column("flag", job_info.flag_));
OZ (dml.add_column("total", job_info.total_));
OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql));
return ret;
}
int ObDBMSSchedTableOperator::_build_job_rollback_start_dml(ObDBMSSchedJobInfo &job_info, ObSqlString &sql)
{
int ret = OB_SUCCESS;
ObDMLSqlSplicer dml;
int64_t tenant_id = job_info.tenant_id_;
OZ (dml.add_pk_column("tenant_id",
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
OZ (dml.add_pk_column("job", job_info.job_));
OZ (dml.add_pk_column("job_name", job_info.job_name_));
OZ (dml.add_column(true, "this_date"));
OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql));
return ret;
}
int ObDBMSSchedTableOperator::_build_job_log_dml(
int64_t now, ObDBMSSchedJobInfo &job_info, int err, const ObString &errmsg, ObSqlString &sql)
{
int ret = OB_SUCCESS;
ObDMLSqlSplicer dml;
int64_t tenant_id = job_info.tenant_id_;
OZ (dml.add_gmt_create(now));
OZ (dml.add_gmt_modified(now));
OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
int64_t job_id = 0;
OZ (seperate_job_id_from_name(job_info.get_job_name(), job_id));
if (job_id <= 0) {
job_id = job_info.get_job_id();
}
OZ (dml.add_pk_column("job", job_id));
OZ (dml.add_time_column("time", now));
OZ (dml.add_column("code", err));
OZ (dml.add_column("message", ObHexEscapeSqlStr(errmsg.empty() ? ObString("SUCCESS") : errmsg)));
OZ (dml.add_column("job_class", job_info.job_class_));
OZ (dml.splice_insert_sql(OB_ALL_TENANT_SCHEDULER_JOB_RUN_DETAIL_TNAME, sql));
return ret;
}
int ObDBMSSchedTableOperator::_check_need_record(ObDBMSSchedJobInfo &job_info, bool &need_record, bool err_state)
{
int ret = OB_SUCCESS;
uint64_t data_version = 0;
int64_t tenant_id = job_info.tenant_id_;
need_record = true;
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("fail to get tenant data version", KR(ret), K(data_version));
} else if (MOCK_DATA_VERSION <= data_version) {
ObDBMSSchedJobClassInfo job_class_info;
ObArenaAllocator allocator("DBMSSchedTmp");
CK (OB_LIKELY(!job_info.job_class_.empty()));
OZ (get_dbms_sched_job_class_info(tenant_id, job_info.is_oracle_tenant(), job_info.get_job_class(), allocator, job_class_info));
if (OB_SUCC(ret)) {
ObString logging_level = job_class_info.get_logging_level();
if (logging_level.empty()) {
LOG_WARN("logging_level may not assigned");
} else if (0 == logging_level.case_compare("OFF")) {
need_record = false;
} else if (0 == logging_level.case_compare("RUNS")) {
need_record = true;
} else if (0 == logging_level.case_compare("FAILED RUNS") && err_state) {
need_record = true;
}
}
}
return ret;
}
int ObDBMSSchedTableOperator::update_for_missed(ObDBMSSchedJobInfo &job_info)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
ObDMLSqlSplicer dml1;
ObSqlString sql1;
ObDMLSqlSplicer dml2;
ObSqlString sql2;
ObSqlString sql;
int64_t affected_rows = 0;
const int64_t now = ObTimeUtility::current_time();
UNUSED(errmsg);
bool need_record = true;
int64_t tenant_id = job_info.tenant_id_;
CK (OB_NOT_NULL(sql_proxy_));
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
OZ (_check_need_record(job_info, need_record));
uint64_t data_version = 0;
if (OB_SUCC(ret)) {
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("fail to get tenant data version", KR(ret), K(data_version));
} else if (MOCK_DATA_VERSION <= data_version) {
CK (OB_LIKELY(!job_info.job_class_.empty()));
}
if (OB_SUCC(ret) && need_record) {
OZ (_build_job_log_dml(now, job_info, 0, "check job missed", sql));
OZ (trans.start(sql_proxy_, tenant_id, true));
OZ (trans.write(tenant_id, sql.ptr(), affected_rows));
}
ObDBMSSchedJobClassInfo job_class_info;
ObArenaAllocator allocator("DBMSSchedTmp");
if (MOCK_DATA_VERSION <= data_version) {
OZ (get_dbms_sched_job_class_info(tenant_id, job_info.is_oracle_tenant(), job_info.get_job_class(), allocator, job_class_info));
}
// when if failures > 16 then set broken flag.
OX (job_info.failures_ = errmsg.empty() ? 0 : (job_info.failures_ + 1));
OX (job_info.flag_ = job_info.failures_ > 15 ? (job_info.flag_ | 0x1) : (job_info.flag_ & 0xfffffffffffffffE));
if ((now >= job_info.end_date_ || job_info.get_interval_ts() == 0) && (true == job_info.auto_drop_)) {
// when end_date is reach or no interval set, and auto_drop is set true, drop job.
OZ (dml1.add_gmt_modified(now));
OZ (dml1.add_pk_column("tenant_id",
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
OZ (dml1.add_pk_column("job_name", job_info.job_name_));
OZ (dml1.splice_delete_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql1));
} else {
if (OB_SUCC(ret) && ((job_info.flag_ & 0x1) != 0)) {
// when if failures > 16 then set broken state.
job_info.next_date_ = 64060560000000000; // 4000-01-01
OZ (dml1.add_column("state", "BROKEN"));
} else if (now >= job_info.end_date_) {
// when end_date is reach and auto_drop is set false, disable set completed state.
job_info.enabled_ = false;
OZ (dml1.add_column("state", "COMPLETED"));
OZ (dml1.add_column("enabled", job_info.enabled_));
}
CK (job_info.this_date_ > 0 || !errmsg.empty());
OX (job_info.total_ += (job_info.this_date_ > 0 ? now - job_info.this_date_ : 0));
OZ (dml1.add_gmt_modified(now));
OZ (dml1.add_pk_column("tenant_id",
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
OZ (dml1.add_pk_column("job", job_info.job_));
OZ (dml1.add_pk_column("job_name", job_info.job_name_));
OZ (dml1.add_column(true, "this_date"));
OZ (dml1.add_time_column("last_date", job_info.this_date_));
OZ (dml1.add_time_column("next_date", job_info.next_date_));
OZ (dml1.add_column("failures", job_info.failures_));
OZ (dml1.add_column("flag", job_info.failures_ > 16 ? 1 : job_info.flag_));
OZ (dml1.add_column("total", job_info.total_));
OZ (dml1.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql1));
}
//If a non-existent JOB CLASS is entered when creating a JOB,
//job_run_detail still needs to be recorded.
bool need_write_job_run_detail = true;
if (MOCK_DATA_VERSION <= data_version) {
ObString logging_level = job_class_info.get_logging_level();
if (logging_level.empty()) {
LOG_WARN("logging_level may not assigned");
} else if (0 == logging_level.case_compare("OFF")) {
need_write_job_run_detail = false;
} else if (0 == logging_level.case_compare("RUNS")) {
need_write_job_run_detail = true;
} else if (0 == logging_level.case_compare("FAILED RUNS") && !errmsg.empty()) {
need_write_job_run_detail = true;
}
}
if (need_write_job_run_detail) {
OZ (dml2.add_gmt_create(now));
OZ (dml2.add_gmt_modified(now));
OZ (dml2.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
int64_t job_id = 0;
OZ (seperate_job_id_from_name(job_info.get_job_name(), job_id));
if (job_id <= 0) {
job_id = job_info.get_job_id();
}
OZ (dml2.add_pk_column("job", job_id));
OZ (dml2.add_time_column("time", now));
OZ (dml2.add_column("code", err));
OZ (dml2.add_column(
"message", ObHexEscapeSqlStr(errmsg.empty() ? ObString("SUCCESS") : errmsg)));
if (MOCK_DATA_VERSION <= data_version) {
OZ (dml2.add_column("job_class", job_info.job_class_));
}
OZ (dml2.splice_insert_sql(OB_ALL_TENANT_SCHEDULER_JOB_RUN_DETAIL_TNAME, sql2));
}
OZ (trans.start(sql_proxy_, tenant_id, true));
OZ (trans.write(tenant_id, sql1.ptr(), affected_rows));
if (need_write_job_run_detail) {
OZ (trans.write(tenant_id, sql2.ptr(), affected_rows));
}
int tmp_ret = OB_SUCCESS;
if (trans.is_started()) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
}
return ret;
}
int ObDBMSSchedTableOperator::update_for_rollback(ObDBMSSchedJobInfo &job_info)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
ObSqlString sql1;
ObSqlString sql2;
int64_t affected_rows = 0;
const int64_t now = ObTimeUtility::current_time();
bool need_record = true;
int64_t tenant_id = job_info.tenant_id_;
CK (OB_NOT_NULL(sql_proxy_));
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
OZ (_check_need_record(job_info, need_record));
OZ (_build_job_rollback_start_dml(job_info, sql1));
if (OB_SUCC(ret) && need_record) {
OZ (_build_job_log_dml(now, job_info, 0, "send job rpc failed", sql2));
}
OZ (trans.start(sql_proxy_, job_info.tenant_id_, true));
OZ (trans.write(job_info.tenant_id_, sql1.ptr(), affected_rows));
if (OB_SUCC(ret) && need_record) {
OZ (trans.write(job_info.tenant_id_, sql2.ptr(), affected_rows));
}
if (trans.is_started()) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
@ -244,6 +289,72 @@ int ObDBMSSchedTableOperator::update_for_end(
return ret;
}
int ObDBMSSchedTableOperator::update_for_enddate(ObDBMSSchedJobInfo &job_info)
{
int ret = OB_SUCCESS;
OZ (update_for_end(job_info, 0, "check job enddate"));
return ret;
}
int ObDBMSSchedTableOperator::update_for_timeout(ObDBMSSchedJobInfo &job_info)
{
int ret = OB_SUCCESS;
OZ (update_for_end(job_info, -4012, "check job timeout"));
return ret;
}
int ObDBMSSchedTableOperator::update_for_end(ObDBMSSchedJobInfo &job_info, int err, const ObString &errmsg)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
ObSqlString sql1;
ObSqlString sql2;
int64_t affected_rows = 0;
const int64_t now = ObTimeUtility::current_time();
bool need_record = true;
int64_t tenant_id = job_info.tenant_id_;
CK (OB_NOT_NULL(sql_proxy_));
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
OZ (_check_need_record(job_info, need_record, false));
if (OB_FAIL(ret)) {
} else if ((now >= job_info.end_date_ || job_info.get_interval_ts() == 0) && (true == job_info.auto_drop_)) {
OZ (_build_job_drop_dml(now, job_info, sql1));
} else {
OX (job_info.failures_ = (err == 0) ? 0 : (job_info.failures_ + 1));
OX (job_info.flag_ = job_info.failures_ > 15 ? (job_info.flag_ | 0x1) : (job_info.flag_ & 0xfffffffffffffffE));
OX (job_info.total_ += (job_info.this_date_ > 0 ? now - job_info.this_date_ : 0));
if (OB_SUCC(ret) && ((job_info.flag_ & 0x1) != 0)) {
// when if failures > 16 then set broken state.
job_info.next_date_ = 64060560000000000; // 4000-01-01
job_info.state_ = ObString("BROKEN");
} else if (now >= job_info.end_date_) {
// when end_date is reach and auto_drop is set false, disable set completed state.
job_info.state_ = ObString("COMPLETED");
}
OZ (_build_job_finished_dml(now, job_info, sql1));
}
if (OB_SUCC(ret) && need_record) {
OZ (_build_job_log_dml(now, job_info, err, errmsg, sql2));
}
OZ (trans.start(sql_proxy_, tenant_id, true));
OZ (trans.write(tenant_id, sql1.ptr(), affected_rows));
if (OB_SUCC(ret) && need_record) {
OZ (trans.write(tenant_id, sql2.ptr(), affected_rows));
}
if (trans.is_started()) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
}
return ret;
}
int ObDBMSSchedTableOperator::check_job_can_running(int64_t tenant_id, int64_t alive_job_count, bool &can_running)
{
int ret = OB_SUCCESS;
@ -338,6 +449,10 @@ do { \
} while (false)
EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "gmt_modified", job_info_local.last_modify_);
//lowner not used
//powner not used
//cowner not used
//last_modify not used
EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "last_date", job_info_local.last_date_);
EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "this_date", job_info_local.this_date_);
EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "next_date", job_info_local.next_date_);
@ -358,12 +473,24 @@ do { \
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "scheduler_flags", job_info_local.scheduler_flags_, uint64_t);
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "exec_env", job_info_local.exec_env_);
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "job_name", job_info_local.job_name_);
//job_style not used
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "job_class", job_info_local.job_class_);
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "program_name", job_info_local.program_name_);
//job_type not used
//job_action not used
//number_of_argument not used
//repeat_interval not used
EXTRACT_BOOL_FIELD_MYSQL_SKIP_RET(result, "enabled", job_info_local.enabled_);
EXTRACT_BOOL_FIELD_MYSQL_SKIP_RET(result, "auto_drop", job_info_local.auto_drop_);
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "state", job_info_local.state_);
//run_count not used
//retry_count not used
//last_run_duration not used
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "interval_ts", job_info_local.interval_ts_, uint64_t);
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "max_run_duration", job_info_local.max_run_duration_, uint64_t);
//comments not used
//credential_name not used
//destination_name not used
OZ (job_info.deep_copy(allocator, job_info_local));

View File

@ -56,8 +56,17 @@ public:
int update_for_start(
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date);
int update_for_end(
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int err, const common::ObString &errmsg);
int _build_job_drop_dml(int64_t now, ObDBMSSchedJobInfo &job_info, ObSqlString &sql);
int _build_job_finished_dml(int64_t now, ObDBMSSchedJobInfo &job_info, ObSqlString &sql);
int _build_job_rollback_start_dml(ObDBMSSchedJobInfo &job_info, ObSqlString &sql);
int _build_job_log_dml(int64_t now, ObDBMSSchedJobInfo &job_info, int err, const ObString &errmsg, ObSqlString &sql);
int _check_need_record(ObDBMSSchedJobInfo &job_info, bool &need_record, bool err_state = true);
int update_for_missed(ObDBMSSchedJobInfo &job_info);
int update_for_enddate(ObDBMSSchedJobInfo &job_info);
int update_for_rollback(ObDBMSSchedJobInfo &job_info);
int update_for_timeout(ObDBMSSchedJobInfo &job_info);
int update_for_end(ObDBMSSchedJobInfo &job_info, int err, const common::ObString &errmsg);
int seperate_job_id_from_name(common::ObString &job_name, int64_t &job_id);