fix event_history delete

This commit is contained in:
maosy
2023-10-11 03:39:52 +00:00
committed by ob-robot
parent c3fc7a64ed
commit a51bfc97fd
8 changed files with 89 additions and 61 deletions

View File

@ -37,30 +37,14 @@ ObAllServerEventHistoryTableOperator &ObAllServerEventHistoryTableOperator::get_
static ObAllServerEventHistoryTableOperator instance; static ObAllServerEventHistoryTableOperator instance;
return instance; return instance;
} }
int ObAllServerEventHistoryTableOperator::async_delete() int ObAllServerEventHistoryTableOperator::async_delete()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (!is_inited()) { if (!is_inited()) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
SHARE_LOG(WARN, "not init", K(ret)); SHARE_LOG(WARN, "not init", KR(ret));
} else { } else if (OB_FAIL(default_async_delete())) {
const int64_t now = ObTimeUtility::current_time(); SHARE_LOG(WARN, "failed to default async delete", KR(ret));
ObSqlString sql;
const bool is_delete = true;
if (OB_SUCCESS == ret) {
const int64_t server_delete_timestap = now - GCONF.ob_event_history_recycle_interval;
// OB_ALL_SERVER_EVENT_HISTORY has 16 partitions
for (int64_t i = 0; OB_SUCCESS == ret && i < 16; ++i) {
sql.reset();
if (OB_FAIL(sql.assign_fmt("DELETE FROM %s PARTITION(p%ld) WHERE gmt_create < usec_to_time(%ld) LIMIT 1024",
share::OB_ALL_SERVER_EVENT_HISTORY_TNAME, i, server_delete_timestap))) {
SHARE_LOG(WARN, "assign_fmt failed", K(ret));
} else if (OB_FAIL(add_task(sql, is_delete))) {
SHARE_LOG(WARN, "add_task failed", K(sql), K(is_delete), K(ret));
}
} // end for
}
} }
return ret; return ret;
} }

View File

@ -5345,6 +5345,7 @@ int ObRootService::load_server_manager()
return ret; return ret;
} }
ERRSIM_POINT_DEF(ERROR_EVENT_TABLE_CLEAR_INTERVAL);
int ObRootService::start_timer_tasks() int ObRootService::start_timer_tasks()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -5354,11 +5355,12 @@ int ObRootService::start_timer_tasks()
} }
if (OB_SUCCESS == ret && !task_queue_.exist_timer_task(event_table_clear_task_)) { if (OB_SUCCESS == ret && !task_queue_.exist_timer_task(event_table_clear_task_)) {
const int64_t delay = ObEventHistoryTableOperator::EVENT_TABLE_CLEAR_INTERVAL; const int64_t delay = ERROR_EVENT_TABLE_CLEAR_INTERVAL ? 10 * 1000 * 1000 :
ObEventHistoryTableOperator::EVENT_TABLE_CLEAR_INTERVAL;
if (OB_FAIL(task_queue_.add_repeat_timer_task_schedule_immediately(event_table_clear_task_, delay))) { if (OB_FAIL(task_queue_.add_repeat_timer_task_schedule_immediately(event_table_clear_task_, delay))) {
LOG_WARN("start event table clear task failed", K(delay), K(ret)); LOG_WARN("start event table clear task failed", K(delay), K(ret));
} else { } else {
LOG_INFO("added event_table_clear_task"); LOG_INFO("added event_table_clear_task", K(delay));
} }
} }

View File

@ -44,20 +44,9 @@ int ObRsEventHistoryTableOperator::async_delete()
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (!is_inited()) { if (!is_inited()) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
SHARE_LOG(WARN, "not init", K(ret)); SHARE_LOG(WARN, "not init", KR(ret));
} else { } else if (OB_FAIL(default_async_delete())) {
const int64_t now = ObTimeUtility::current_time(); SHARE_LOG(WARN, "failed to default async delete", KR(ret));
ObSqlString sql;
const bool is_delete = true;
if (OB_SUCCESS == ret) {
const int64_t rs_delete_timestap = now - GCONF.ob_event_history_recycle_interval;
if (OB_FAIL(sql.assign_fmt("DELETE FROM %s WHERE gmt_create < usec_to_time(%ld) LIMIT 1024",
share::OB_ALL_ROOTSERVICE_EVENT_HISTORY_TNAME, rs_delete_timestap))) {
SHARE_LOG(WARN, "assign_fmt failed", K(ret));
} else if (OB_FAIL(add_task(sql, is_delete))) {
SHARE_LOG(WARN, "add_task failed", K(sql), K(is_delete), K(ret));
}
}
} }
return ret; return ret;
} }

View File

@ -24,6 +24,7 @@ public:
virtual ~ObRsEventHistoryTableOperator() {} virtual ~ObRsEventHistoryTableOperator() {}
int init(common::ObMySQLProxy &proxy, const common::ObAddr &self_addr); int init(common::ObMySQLProxy &proxy, const common::ObAddr &self_addr);
virtual int async_delete() override; virtual int async_delete() override;
static ObRsEventHistoryTableOperator &get_instance(); static ObRsEventHistoryTableOperator &get_instance();

View File

@ -557,6 +557,7 @@ class ObString;
ACT(BEFORE_FINISH_UNIT_NUM,)\ ACT(BEFORE_FINISH_UNIT_NUM,)\
ACT(BEFORE_CHECK_PRIMARY_ZONE,)\ ACT(BEFORE_CHECK_PRIMARY_ZONE,)\
ACT(BEFORE_RELOAD_UNIT,)\ ACT(BEFORE_RELOAD_UNIT,)\
ACT(BEFORE_PROCESS_EVENT_TASK,)\
ACT(MAX_DEBUG_SYNC_POINT,) ACT(MAX_DEBUG_SYNC_POINT,)
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);

View File

@ -39,6 +39,7 @@ ObEventTableClearTask::ObEventTableClearTask(
int ObEventTableClearTask::process() int ObEventTableClearTask::process()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (!rs_event_operator_.is_inited()) { if (!rs_event_operator_.is_inited()) {
ret = OB_INNER_STAT_ERROR; ret = OB_INNER_STAT_ERROR;
LOG_WARN("rs_event_operator not init", K(ret)); LOG_WARN("rs_event_operator not init", K(ret));
@ -48,12 +49,16 @@ int ObEventTableClearTask::process()
} else if (!deadlock_history_operator_.is_inited()) { } else if (!deadlock_history_operator_.is_inited()) {
ret = OB_INNER_STAT_ERROR; ret = OB_INNER_STAT_ERROR;
LOG_WARN("deadlock_history_operator_ not init", K(ret)); LOG_WARN("deadlock_history_operator_ not init", K(ret));
} else if (OB_FAIL(rs_event_operator_.async_delete())) { } else {
LOG_WARN("async_delete failed", K(ret)); if (OB_TMP_FAIL(rs_event_operator_.async_delete())) {
} else if (OB_FAIL(server_event_operator_.async_delete())) { LOG_WARN("async_delete failed", KR(tmp_ret));
LOG_WARN("async_delete failed", K(ret)); }
} else if (OB_FAIL(deadlock_history_operator_.async_delete())) { if (OB_TMP_FAIL(server_event_operator_.async_delete())) {
LOG_WARN("async_delete failed", K(ret)); LOG_WARN("async_delete failed", KR(tmp_ret));
}
if (OB_TMP_FAIL(deadlock_history_operator_.async_delete())) {
LOG_WARN("async_delete failed", KR(tmp_ret));
}
} }
return ret; return ret;
} }
@ -74,8 +79,9 @@ ObAsyncTask *ObEventTableClearTask::deep_copy(char *buf, const int64_t buf_size)
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
ObEventHistoryTableOperator::ObEventTableUpdateTask::ObEventTableUpdateTask( ObEventHistoryTableOperator::ObEventTableUpdateTask::ObEventTableUpdateTask(
ObEventHistoryTableOperator &table_operator, const bool is_delete) ObEventHistoryTableOperator &table_operator, const bool is_delete, const int64_t create_time)
: IObDedupTask(T_RS_ET_UPDATE), table_operator_(table_operator), is_delete_(is_delete) : IObDedupTask(T_RS_ET_UPDATE), table_operator_(table_operator), is_delete_(is_delete),
create_time_(create_time)
{ {
} }
@ -128,6 +134,7 @@ bool ObEventHistoryTableOperator::ObEventTableUpdateTask::operator==(
} else { } else {
is_equal = (&(this->table_operator_) == &(o.table_operator_)) is_equal = (&(this->table_operator_) == &(o.table_operator_))
&& this->sql_ == o.sql_ && this->is_delete_ == o.is_delete_; && this->sql_ == o.sql_ && this->is_delete_ == o.is_delete_;
//no need take care of create_time
} }
} }
return is_equal; return is_equal;
@ -143,7 +150,7 @@ IObDedupTask *ObEventHistoryTableOperator::ObEventTableUpdateTask::deep_copy(
LOG_WARN_RET(OB_INVALID_ARGUMENT, "invalid argument", "buf", reinterpret_cast<int64_t>(buf), K(buf_size), LOG_WARN_RET(OB_INVALID_ARGUMENT, "invalid argument", "buf", reinterpret_cast<int64_t>(buf), K(buf_size),
"need size", get_deep_copy_size()); "need size", get_deep_copy_size());
} else { } else {
task = new (buf) ObEventTableUpdateTask(table_operator_, is_delete_); task = new (buf) ObEventTableUpdateTask(table_operator_, is_delete_, create_time_);
char *ptr = buf + sizeof(ObEventTableUpdateTask); char *ptr = buf + sizeof(ObEventTableUpdateTask);
MEMCPY(ptr, sql_.ptr(), sql_.length()); MEMCPY(ptr, sql_.ptr(), sql_.length());
task->assign_ptr(ptr, sql_.length()); task->assign_ptr(ptr, sql_.length());
@ -157,8 +164,8 @@ int ObEventHistoryTableOperator::ObEventTableUpdateTask::process()
if (!this->is_valid()) { if (!this->is_valid()) {
ret = OB_INNER_STAT_ERROR; ret = OB_INNER_STAT_ERROR;
LOG_WARN("invalid event task update task", "task", *this, K(ret)); LOG_WARN("invalid event task update task", "task", *this, K(ret));
} else if (OB_FAIL(table_operator_.process_task(sql_, is_delete_))) { } else if (OB_FAIL(table_operator_.process_task(sql_, is_delete_, create_time_))) {
LOG_WARN("process_task failed", K_(sql), K_(is_delete), K(ret)); LOG_WARN("process_task failed", K_(sql), K_(is_delete), KR(ret), K(create_time_));
} }
return ret; return ret;
} }
@ -268,7 +275,28 @@ int ObEventHistoryTableOperator::gen_event_ts(int64_t &event_ts)
return ret; return ret;
} }
int ObEventHistoryTableOperator::add_task(const ObSqlString &sql, const bool is_delete) int ObEventHistoryTableOperator::default_async_delete()
{
int ret = OB_SUCCESS;
if (!is_inited()) {
ret = OB_NOT_INIT;
SHARE_LOG(WARN, "not init", K(ret));
} else {
const int64_t now = ObTimeUtility::current_time();
ObSqlString sql;
const bool is_delete = true;
const int64_t delete_timestap = now - GCONF.ob_event_history_recycle_interval;
if (OB_FAIL(sql.assign_fmt("DELETE FROM %s WHERE gmt_create < usec_to_time(%ld) LIMIT 1024",
event_table_name_, delete_timestap))) {
SHARE_LOG(WARN, "assign_fmt failed", K(ret), K(event_table_name_));
} else if (OB_FAIL(add_task(sql, is_delete, now))) {
SHARE_LOG(WARN, "add_task failed", K(sql), K(is_delete), K(ret));
}
}
return ret;
}
int ObEventHistoryTableOperator::add_task(const ObSqlString &sql, const bool is_delete, const int64_t create_time)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (!inited_) { if (!inited_) {
@ -281,7 +309,9 @@ int ObEventHistoryTableOperator::add_task(const ObSqlString &sql, const bool is_
ret = OB_CANCELED; ret = OB_CANCELED;
LOG_WARN("observer is stopped, cancel task", K(sql), K(is_delete), K(ret)); LOG_WARN("observer is stopped, cancel task", K(sql), K(is_delete), K(ret));
} else { } else {
ObEventTableUpdateTask task(*this, is_delete); int64_t new_create_time = OB_INVALID_TIMESTAMP == create_time ?
ObTimeUtility::current_time() : create_time;
ObEventTableUpdateTask task(*this, is_delete, new_create_time);
if (OB_FAIL(task.init(sql.ptr(), sql.length() + 1))) { // extra byte for '\0' if (OB_FAIL(task.init(sql.ptr(), sql.length() + 1))) { // extra byte for '\0'
LOG_WARN("task init error", K(ret)); LOG_WARN("task init error", K(ret));
} else if (OB_FAIL(event_queue_.add_task(task))) { } else if (OB_FAIL(event_queue_.add_task(task))) {
@ -289,7 +319,7 @@ int ObEventHistoryTableOperator::add_task(const ObSqlString &sql, const bool is_
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("duplicated task is not expected to exist", K(task), K(ret)); LOG_WARN("duplicated task is not expected to exist", K(task), K(ret));
} else { } else {
LOG_WARN("event_queue_ add_task failed", K(task), K(ret)); LOG_WARN("event_queue_ add_task failed", K(task), K(ret), K(new_create_time));
} }
} else { } else {
// do nothing // do nothing
@ -299,9 +329,11 @@ int ObEventHistoryTableOperator::add_task(const ObSqlString &sql, const bool is_
return ret; return ret;
} }
int ObEventHistoryTableOperator::process_task(const ObString &sql, const bool is_delete) int ObEventHistoryTableOperator::process_task(const ObString &sql, const bool is_delete, const int64_t create_time)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
DEBUG_SYNC(BEFORE_PROCESS_EVENT_TASK);
if (!inited_) { if (!inited_) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret)); LOG_WARN("not init", K(ret));
@ -324,17 +356,35 @@ int ObEventHistoryTableOperator::process_task(const ObString &sql, const bool is
} else { } else {
int64_t cnt = 0; int64_t cnt = 0;
const int64_t MAX_DELETE_TIMES = 10; const int64_t MAX_DELETE_TIMES = 10;
int tmp_ret = OB_SUCCESS;
while (OB_SUCCESS == ret && !stopped_) { while (OB_SUCCESS == ret && !stopped_) {
if (OB_FAIL(proxy_->write(sql.ptr(), affected_rows))) { if (OB_FAIL(proxy_->write(sql.ptr(), affected_rows))) {
LOG_WARN("execute sql failed", K(sql), K(ret)); LOG_WARN("execute sql failed", K(sql), K(ret));
} else if (0 == affected_rows) { } else if (0 == affected_rows) {
LOG_INFO("finished to delete from event history table", K(sql)); LOG_INFO("finished to delete from event history table", K(sql), K(create_time));
break; break;
} else if (cnt > MAX_DELETE_TIMES) { } else if (cnt > MAX_DELETE_TIMES) {
LOG_INFO("delete cnt reach limit, schedule next round", K(sql)); LOG_INFO("delete cnt reach limit, schedule next round", K(sql), K(create_time));
if (OB_INVALID_TIMESTAMP == create_time) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("create time is invalid", KR(ret), K(create_time), K(sql));
} else if (ObTimeUtility::current_time() - create_time > EVENT_TABLE_CLEAR_INTERVAL) {
//has new clear task, no need add task again
LOG_INFO("maybe has new clear task, no need add task again", K(create_time));
} else {
ObSqlString new_sql;
const bool is_delete = true;
if (OB_TMP_FAIL(new_sql.assign(sql))) {
LOG_WARN("failed to assign sql", KR(tmp_ret), K(sql));
} else if (OB_TMP_FAIL(add_task(new_sql, is_delete, create_time))) {
LOG_WARN("failed to add task", KR(tmp_ret), K(new_sql), K(create_time));
} else {
LOG_INFO("has event need delete, add task again", K(new_sql), K(create_time));
}
}
break; break;
} else { } else {
LOG_INFO("delete rows from event history table", K(affected_rows), K(sql)); LOG_INFO("delete rows from event history table", K(affected_rows), K(sql), K(cnt));
cnt++; cnt++;
} }
} }

View File

@ -68,7 +68,8 @@ public:
class ObEventTableUpdateTask : public common::IObDedupTask class ObEventTableUpdateTask : public common::IObDedupTask
{ {
public: public:
ObEventTableUpdateTask(ObEventHistoryTableOperator &table_operator, const bool is_delete); ObEventTableUpdateTask(ObEventHistoryTableOperator &table_operator, const bool is_delete,
const int64_t create_time);
virtual ~ObEventTableUpdateTask() {} virtual ~ObEventTableUpdateTask() {}
int init(const char *ptr, const int64_t buf_size); int init(const char *ptr, const int64_t buf_size);
bool is_valid() const; bool is_valid() const;
@ -82,11 +83,12 @@ public:
void assign_ptr(char *ptr, const int64_t buf_size) void assign_ptr(char *ptr, const int64_t buf_size)
{ sql_.assign_ptr(ptr, static_cast<int32_t>(buf_size));} { sql_.assign_ptr(ptr, static_cast<int32_t>(buf_size));}
TO_STRING_KV(K_(sql), K_(is_delete)); TO_STRING_KV(K_(sql), K_(is_delete), K_(create_time));
private: private:
ObEventHistoryTableOperator &table_operator_; ObEventHistoryTableOperator &table_operator_;
common::ObString sql_; common::ObString sql_;
bool is_delete_; bool is_delete_;
int64_t create_time_;
DISALLOW_COPY_AND_ASSIGN(ObEventTableUpdateTask); DISALLOW_COPY_AND_ASSIGN(ObEventTableUpdateTask);
}; };
@ -139,6 +141,7 @@ public:
virtual int async_delete() = 0; virtual int async_delete() = 0;
protected: protected:
virtual int default_async_delete();
// recursive begin // recursive begin
template <int Floor, typename Name, typename Value, typename ...Rest> template <int Floor, typename Name, typename Value, typename ...Rest>
int sync_add_event_helper_(share::ObDMLSqlSplicer &dml, Name &&name, Value &&value, Rest &&...others); int sync_add_event_helper_(share::ObDMLSqlSplicer &dml, Name &&name, Value &&value, Rest &&...others);
@ -160,7 +163,8 @@ protected:
const common::ObAddr &get_addr() const { return self_addr_; } const common::ObAddr &get_addr() const { return self_addr_; }
void set_event_table(const char* tname) { event_table_name_ = tname; } void set_event_table(const char* tname) { event_table_name_ = tname; }
const char *get_event_table() const { return event_table_name_; } const char *get_event_table() const { return event_table_name_; }
int add_task(const common::ObSqlString &sql, const bool is_delete = false); int add_task(const common::ObSqlString &sql, const bool is_delete = false,
const int64_t create_time = OB_INVALID_TIMESTAMP);
int gen_event_ts(int64_t &event_ts); int gen_event_ts(int64_t &event_ts);
protected: protected:
static constexpr const char * names[7] = {"name1", "name2", "name3", "name4", "name5", "name6", "extra_info"}; // only valid in compile time static constexpr const char * names[7] = {"name1", "name2", "name3", "name4", "name5", "name6", "extra_info"}; // only valid in compile time
@ -170,12 +174,9 @@ protected:
static const int64_t PAGE_SIZE = common::OB_MALLOC_NORMAL_BLOCK_SIZE; static const int64_t PAGE_SIZE = common::OB_MALLOC_NORMAL_BLOCK_SIZE;
static const int64_t TASK_MAP_SIZE = 20 * 1024; static const int64_t TASK_MAP_SIZE = 20 * 1024;
static const int64_t TASK_QUEUE_SIZE = 20 *1024; static const int64_t TASK_QUEUE_SIZE = 20 *1024;
static const int64_t RS_EVENT_HISTORY_DELETE_TIME = 7L * 24L * 3600L * 1000L * 1000L; // 7DAY
static const int64_t SERVER_EVENT_HISTORY_DELETE_TIME = 2L * 24L * 3600L * 1000L * 1000L; // 2DAY
static const int64_t UNIT_LOAD_HISTORY_DELETE_TIME = 7L * 24L * 3600L * 1000L * 1000L; // 2DAY
static const int64_t MAX_RETRY_COUNT = 12; static const int64_t MAX_RETRY_COUNT = 12;
virtual int process_task(const common::ObString &sql, const bool is_delete); virtual int process_task(const common::ObString &sql, const bool is_delete, const int64_t create_time);
private: private:
bool inited_; bool inited_;
volatile bool stopped_; volatile bool stopped_;

View File

@ -114,7 +114,7 @@ IS_TENANT_STATUS(prepare_flashback_for_switch_to_primary)
TO_STRING_KV(K_(tenant_id), K_(tenant_role), K_(switchover_status), TO_STRING_KV(K_(tenant_id), K_(tenant_role), K_(switchover_status),
K_(switchover_epoch), K_(sync_scn), K_(replayable_scn), K_(switchover_epoch), K_(sync_scn), K_(replayable_scn),
K_(standby_scn), K_(recovery_until_scn), K_(log_mode)); K_(standby_scn), K_(recovery_until_scn), K_(log_mode), K_(max_ls_id));
DECLARE_TO_YSON_KV; DECLARE_TO_YSON_KV;
// Getter&Setter // Getter&Setter