support auto_refresh of alert log external table
This commit is contained in:
parent
5f6497f95c
commit
d7d94eca02
@ -162,7 +162,7 @@ int ObPLPackageManager::read_and_exec_package_sql(
|
||||
// but we need to create system packages with oralce compatibility
|
||||
// here hack to oracle mode
|
||||
bool eof = false;
|
||||
bool skip_affected_rows_check = false;
|
||||
bool create_external_table = false;
|
||||
ObSessionParam param;
|
||||
ObSessionParam *param_ptr = nullptr;
|
||||
char *last_slash = strrchr(const_cast<char*>(package_full_path), '/');
|
||||
@ -170,7 +170,7 @@ int ObPLPackageManager::read_and_exec_package_sql(
|
||||
int64_t sql_mode = SMO_STRICT_ALL_TABLES | SMO_NO_ZERO_IN_DATE | SMO_NO_AUTO_CREATE_USER;
|
||||
// allow affected_rows > 0 when exec sql in external_table_alert_log.sql
|
||||
if (strcmp(pacakge_filename, "external_table_alert_log.sql") == 0) {
|
||||
skip_affected_rows_check = true;
|
||||
create_external_table = true;
|
||||
param.sql_mode_ = &sql_mode;
|
||||
param_ptr = ¶m;
|
||||
}
|
||||
@ -185,7 +185,7 @@ int ObPLPackageManager::read_and_exec_package_sql(
|
||||
static_cast<int64_t>(compa_mode),
|
||||
param_ptr))) {
|
||||
LOG_WARN("fail to exec package sql", K(sql_buf), K(ret));
|
||||
} else if (affected_rows != 0 && !skip_affected_rows_check) {
|
||||
} else if (affected_rows != 0 && !create_external_table) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("affected_rows expected to be zero", K(affected_rows), K(ret));
|
||||
} else {
|
||||
@ -194,6 +194,23 @@ int ObPLPackageManager::read_and_exec_package_sql(
|
||||
}
|
||||
}
|
||||
fclose(file);
|
||||
if (create_external_table && OB_SUCC(ret)) {
|
||||
uint64_t data_version = 0;
|
||||
common::ObString alter_table_sql("alter external table sys_external_tbs.__all_external_alert_log_info auto_refresh immediate");
|
||||
if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, data_version))) {
|
||||
LOG_WARN("fail to get sys tenant data version", KR(ret), K(data_version));
|
||||
} else if (data_version >= DATA_VERSION_4_3_3_0) {
|
||||
if (OB_FAIL(sql_proxy.write(OB_SYS_TENANT_ID,
|
||||
alter_table_sql,
|
||||
affected_rows,
|
||||
static_cast<int64_t>(compa_mode),
|
||||
param_ptr))) {
|
||||
LOG_WARN("fail to alter auto_refresh flag of external table ", K(ret), K(alter_table_sql));
|
||||
} else {
|
||||
LOG_INFO("seccess to alter auto_refresh flag", KR(ret), K(alter_table_sql));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -680,7 +680,8 @@ ObRootService::ObRootService()
|
||||
#endif
|
||||
disaster_recovery_task_executor_(),
|
||||
disaster_recovery_task_mgr_(),
|
||||
global_ctx_task_(*this)
|
||||
global_ctx_task_(*this),
|
||||
alter_log_external_table_task_(*this)
|
||||
{
|
||||
}
|
||||
|
||||
@ -1657,6 +1658,30 @@ int ObRootService::schedule_refresh_io_calibration_task()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRootService::schedule_alter_log_external_table_task()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const bool did_repeat = false;
|
||||
const int64_t delay = 1L * 1000L * 1000L; //1s
|
||||
uint64_t current_data_version = 0;
|
||||
if (!inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (task_queue_.exist_timer_task(alter_log_external_table_task_)) {
|
||||
// ignore error
|
||||
LOG_WARN("already have one alter_log_external_table_task, ignore this");
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, current_data_version))) {
|
||||
LOG_WARN("fail to get current data version", KR(ret), "tenant_id", OB_SYS_TENANT_ID);
|
||||
} else if (OB_FAIL(alter_log_external_table_task_.init(current_data_version))) {
|
||||
LOG_WARN("fail to init alter log external table task", KR(ret), K(current_data_version));
|
||||
} else if (OB_FAIL(task_queue_.add_timer_task(alter_log_external_table_task_, delay, did_repeat))) {
|
||||
LOG_WARN("fail to add timer task", KR(ret));
|
||||
} else {
|
||||
LOG_INFO("add alter_log_external_table_task task success", KR(ret), K(current_data_version));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRootService::submit_update_rslist_task(const bool force_update)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -11128,6 +11153,75 @@ void ObRootService::ObTenantGlobalContextCleanTimerTask::runTimerTask()
|
||||
}
|
||||
}
|
||||
|
||||
/////////////////////////
|
||||
ObRootService::ObAlterLogExternalTableTask::ObAlterLogExternalTableTask(ObRootService &root_service)
|
||||
: ObAsyncTimerTask(root_service.task_queue_),
|
||||
root_service_(root_service)
|
||||
{
|
||||
set_retry_times(INT64_MAX);
|
||||
}
|
||||
|
||||
int ObRootService::ObAlterLogExternalTableTask::init(const uint64_t &data_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
pre_data_version_ = data_version;
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObAsyncTask *ObRootService::ObAlterLogExternalTableTask::deep_copy(char *buf,
|
||||
const int64_t buf_size) const
|
||||
{
|
||||
ObAlterLogExternalTableTask *task = NULL;
|
||||
if (NULL == buf || buf_size < static_cast<int64_t>(sizeof(*this))) {
|
||||
LOG_WARN_RET(OB_BUF_NOT_ENOUGH, "buffer not large enough", K(buf_size), KP(buf));
|
||||
} else {
|
||||
task = new (buf) ObAlterLogExternalTableTask(root_service_);
|
||||
task->init(pre_data_version_);
|
||||
}
|
||||
return task;
|
||||
}
|
||||
|
||||
int ObRootService::ObAlterLogExternalTableTask::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t current_data_version = 0;
|
||||
if (pre_data_version_ >= CLUSTER_VERSION_4_3_3_0) {
|
||||
LOG_INFO("table has been altered, no need to alter log external table again", KR(ret), K(pre_data_version_));
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, current_data_version))) {
|
||||
LOG_WARN("fail to get current data version", KR(ret), "tenant_id", OB_SYS_TENANT_ID);
|
||||
} else if (current_data_version < CLUSTER_VERSION_4_3_3_0) {
|
||||
ret = OB_NEED_WAIT;
|
||||
LOG_INFO("upgrade is not finished, cannot run alter log external table task", KR(ret), K(current_data_version));
|
||||
} else if (OB_FAIL(alter_log_external_table_())) {
|
||||
// alter log external table failed, will retry
|
||||
LOG_WARN("fail to alter log external table", KR(ret));
|
||||
} else {
|
||||
// alter log external table succeeded, change pre_data_version_
|
||||
pre_data_version_ = current_data_version;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRootService::ObAlterLogExternalTableTask::alter_log_external_table_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t affected_rows = 0;
|
||||
const char *alter_table_sql = "alter external table sys_external_tbs.__all_external_alert_log_info auto_refresh immediate";
|
||||
if (OB_ISNULL(GCTX.sql_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("sql_proxy_ expected not null", KR(ret), K(lbt()));
|
||||
} else if (OB_FAIL(GCTX.sql_proxy_->write(OB_SYS_TENANT_ID, alter_table_sql, affected_rows))) {
|
||||
LOG_WARN("fail to execute sql", KR(ret), K(alter_table_sql));
|
||||
} else if (0 != affected_rows) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows), K(alter_table_sql));
|
||||
} else {
|
||||
LOG_INFO("seccess to alter auto_refresh flag", KR(ret), K(alter_table_sql));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/////////////////////////
|
||||
int ObRootService::handle_cancel_backup_backup(const obrpc::ObBackupManageArg &arg)
|
||||
{
|
||||
int ret = OB_NOT_SUPPORTED;
|
||||
|
@ -357,6 +357,24 @@ public:
|
||||
ObRootService &root_service_;
|
||||
};
|
||||
|
||||
class ObAlterLogExternalTableTask : public common::ObAsyncTimerTask
|
||||
{
|
||||
public:
|
||||
ObAlterLogExternalTableTask(ObRootService &root_service);
|
||||
virtual ~ObAlterLogExternalTableTask() {}
|
||||
int init(const uint64_t &data_version);
|
||||
public:
|
||||
virtual int process() override;
|
||||
virtual int64_t get_deep_copy_size() const override { return sizeof(*this); }
|
||||
virtual ObAsyncTask *deep_copy(char *buf, const int64_t buf_size) const override;
|
||||
private:
|
||||
int alter_log_external_table_();
|
||||
private:
|
||||
ObRootService &root_service_;
|
||||
uint64_t pre_data_version_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObAlterLogExternalTableTask);
|
||||
};
|
||||
|
||||
public:
|
||||
ObRootService();
|
||||
virtual ~ObRootService();
|
||||
@ -809,6 +827,7 @@ public:
|
||||
|
||||
int schedule_load_ddl_task();
|
||||
int schedule_refresh_io_calibration_task();
|
||||
int schedule_alter_log_external_table_task();
|
||||
// ob_admin command, must be called in ddl thread
|
||||
int force_create_sys_table(const obrpc::ObForceCreateSysTableArg &arg);
|
||||
int force_set_locality(const obrpc::ObForceSetLocalityArg &arg);
|
||||
@ -1057,6 +1076,7 @@ private:
|
||||
ObDRTaskMgr disaster_recovery_task_mgr_;
|
||||
// application context
|
||||
ObTenantGlobalContextCleanTimerTask global_ctx_task_;
|
||||
ObAlterLogExternalTableTask alter_log_external_table_task_; // repeat to succeed & no retry
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObRootService);
|
||||
};
|
||||
|
@ -1205,6 +1205,7 @@ int ObUpgradeExecutor::run_upgrade_end_action_(
|
||||
const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
DEBUG_SYNC(BEFROE_UPDATE_DATA_VERSION);
|
||||
if (OB_FAIL(check_inner_stat_())) {
|
||||
LOG_WARN("fail to check inner stat", KR(ret));
|
||||
} else if (OB_FAIL(check_stop())) {
|
||||
|
@ -616,6 +616,7 @@ class ObString;
|
||||
ACT(BEFOR_EXEC_REBUILD_TASK,)\
|
||||
ACT(BEFORE_CREATE_HIDDEN_TABLE_IN_LOAD,)\
|
||||
ACT(BEFORE_RESTORE_CREATE_TABLETS_SSTABLE,)\
|
||||
ACT(BEFROE_UPDATE_DATA_VERSION,)\
|
||||
ACT(MAX_DEBUG_SYNC_POINT,)
|
||||
|
||||
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
||||
|
@ -1309,6 +1309,7 @@ int ObUpgradeFor4310Processor::post_upgrade_for_create_replication_role_in_oracl
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
/* =========== 4310 upgrade processor end ============= */
|
||||
|
||||
int ObUpgradeFor4320Processor::post_upgrade()
|
||||
{
|
||||
@ -1452,7 +1453,33 @@ int ObUpgradeFor4320Processor::post_upgrade_for_online_estimate_percent()
|
||||
} return ret;
|
||||
}
|
||||
|
||||
/* =========== 4310 upgrade processor end ============= */
|
||||
/* =========== 4320 upgrade processor end ============= */
|
||||
|
||||
int ObUpgradeFor4330Processor::post_upgrade()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(check_inner_stat())) {
|
||||
LOG_WARN("fail to check inner stat", KR(ret));
|
||||
} else if (OB_FAIL(post_upgrade_for_external_table_flag())) {
|
||||
LOG_WARN("fail to alter log external table flag", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObUpgradeFor4330Processor::post_upgrade_for_external_table_flag()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (tenant_id_ != OB_SYS_TENANT_ID) {
|
||||
// do nothing
|
||||
} else if (OB_ISNULL(GCTX.root_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("rootservice is null", KR(ret));
|
||||
} else if (OB_FAIL(GCTX.root_service_->schedule_alter_log_external_table_task())) {
|
||||
LOG_WARN("schedule alter log external table task failed", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
/* =========== 4330 upgrade processor end ============= */
|
||||
|
||||
/* =========== special upgrade processor end ============= */
|
||||
} // end share
|
||||
|
@ -262,7 +262,16 @@ private:
|
||||
|
||||
DEF_SIMPLE_UPGRARD_PROCESSER(4, 3, 2, 1)
|
||||
|
||||
DEF_SIMPLE_UPGRARD_PROCESSER(4, 3, 3, 0)
|
||||
class ObUpgradeFor4330Processor : public ObBaseUpgradeProcessor
|
||||
{
|
||||
public:
|
||||
ObUpgradeFor4330Processor() : ObBaseUpgradeProcessor() {}
|
||||
virtual ~ObUpgradeFor4330Processor() {}
|
||||
virtual int pre_upgrade() override { return common::OB_SUCCESS; }
|
||||
virtual int post_upgrade() override;
|
||||
private:
|
||||
int post_upgrade_for_external_table_flag();
|
||||
};
|
||||
/* =========== special upgrade processor end ============= */
|
||||
|
||||
/* =========== upgrade processor end ============= */
|
||||
|
Loading…
x
Reference in New Issue
Block a user