Fix archive statistic core
This commit is contained in:
@ -218,7 +218,7 @@ int ObArchiveFetcher::submit_log_fetch_task(ObArchiveLogFetchTask *task)
|
|||||||
RETRY_FUNC_ON_ERROR(OB_SIZE_OVERFLOW, has_set_stop(), task_queue_, push, task);
|
RETRY_FUNC_ON_ERROR(OB_SIZE_OVERFLOW, has_set_stop(), task_queue_, push, task);
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
ARCHIVE_LOG(INFO, "submit log fetch task succ", KPC(task));
|
ARCHIVE_LOG(INFO, "submit log fetch task succ", KP(task));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -355,7 +355,7 @@ int ObArchiveFetcher::handle_log_fetch_task_(ObArchiveLogFetchTask &task)
|
|||||||
PalfHandleGuard palf_handle_guard;
|
PalfHandleGuard palf_handle_guard;
|
||||||
TmpMemoryHelper helper(unit_size_, allocator_);
|
TmpMemoryHelper helper(unit_size_, allocator_);
|
||||||
ObArchiveSendTask *send_task = NULL;
|
ObArchiveSendTask *send_task = NULL;
|
||||||
const ObLSID &id = task.get_ls_id();
|
const ObLSID id = task.get_ls_id();
|
||||||
const ArchiveWorkStation &station = task.get_station();
|
const ArchiveWorkStation &station = task.get_station();
|
||||||
ArchiveKey key = station.get_round();
|
ArchiveKey key = station.get_round();
|
||||||
LSN commit_lsn;
|
LSN commit_lsn;
|
||||||
@ -389,7 +389,7 @@ int ObArchiveFetcher::handle_log_fetch_task_(ObArchiveLogFetchTask &task)
|
|||||||
} else if (OB_FAIL(submit_fetch_log_(task, submit_log))) {
|
} else if (OB_FAIL(submit_fetch_log_(task, submit_log))) {
|
||||||
ARCHIVE_LOG(WARN, "submit send task failed", K(ret), KPC(send_task));
|
ARCHIVE_LOG(WARN, "submit send task failed", K(ret), KPC(send_task));
|
||||||
} else {
|
} else {
|
||||||
ARCHIVE_LOG(INFO, "handle log fetch task succ", K(task));
|
ARCHIVE_LOG(INFO, "handle log fetch task succ", K(id));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 0. task submit to sort queue, do nothing
|
// 0. task submit to sort queue, do nothing
|
||||||
@ -766,7 +766,7 @@ int ObArchiveFetcher::submit_fetch_log_(ObArchiveLogFetchTask &task, bool &submi
|
|||||||
ARCHIVE_LOG(WARN, "push fetch log failed", K(ret), K(task));
|
ARCHIVE_LOG(WARN, "push fetch log failed", K(ret), K(task));
|
||||||
} else {
|
} else {
|
||||||
submitted = true;
|
submitted = true;
|
||||||
ARCHIVE_LOG(INFO, "push fetch log succ", K(task));
|
ARCHIVE_LOG(INFO, "push fetch log succ", KP(&task));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -895,6 +895,7 @@ int ObArchiveFetcher::submit_residual_log_fetch_task_(ObArchiveLogFetchTask &tas
|
|||||||
const ObArchivePiece &cur_piece = task.get_piece();
|
const ObArchivePiece &cur_piece = task.get_piece();
|
||||||
const LSN &start_offset = task.get_start_offset();
|
const LSN &start_offset = task.get_start_offset();
|
||||||
const LSN &cur_offset = task.get_cur_offset();
|
const LSN &cur_offset = task.get_cur_offset();
|
||||||
|
const ObLSID id = task.get_ls_id();
|
||||||
|
|
||||||
if (OB_UNLIKELY(cur_piece.is_valid() && cur_offset == start_offset)) {
|
if (OB_UNLIKELY(cur_piece.is_valid() && cur_offset == start_offset)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
@ -902,7 +903,7 @@ int ObArchiveFetcher::submit_residual_log_fetch_task_(ObArchiveLogFetchTask &tas
|
|||||||
} else if (OB_FAIL(task_queue_.push(&task))) {
|
} else if (OB_FAIL(task_queue_.push(&task))) {
|
||||||
ARCHIVE_LOG(WARN, "push task failed", K(ret), K(task));
|
ARCHIVE_LOG(WARN, "push task failed", K(ret), K(task));
|
||||||
} else {
|
} else {
|
||||||
ARCHIVE_LOG(INFO, "submit residual log fetch task succ", K(task));
|
ARCHIVE_LOG(INFO, "submit residual log fetch task succ", KP(&task));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -913,7 +914,7 @@ int ObArchiveFetcher::submit_send_task_(ObArchiveSendTask *send_task)
|
|||||||
if (OB_FAIL(archive_sender_->submit_send_task(send_task))) {
|
if (OB_FAIL(archive_sender_->submit_send_task(send_task))) {
|
||||||
ARCHIVE_LOG(WARN, "submit send task failed", K(ret), KPC(send_task));
|
ARCHIVE_LOG(WARN, "submit send task failed", K(ret), KPC(send_task));
|
||||||
} else {
|
} else {
|
||||||
ARCHIVE_LOG(INFO, "submit send task succ");
|
ARCHIVE_LOG(INFO, "submit send task succ", KP(send_task));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -185,7 +185,7 @@ int ObArchiveSender::push_task_status(ObArchiveTaskStatus *task_status)
|
|||||||
} else if (OB_FAIL(task_queue_.push(task_status))) {
|
} else if (OB_FAIL(task_queue_.push(task_status))) {
|
||||||
ARCHIVE_LOG(WARN, "push fail", K(ret), KPC(task_status));
|
ARCHIVE_LOG(WARN, "push fail", K(ret), KPC(task_status));
|
||||||
} else {
|
} else {
|
||||||
ARCHIVE_LOG(INFO, "push succ", KPC(task_status));
|
ARCHIVE_LOG(INFO, "push succ", KP(task_status));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -261,7 +261,6 @@ void ObArchiveSender::do_thread_task_()
|
|||||||
} else if (! task_exist) {
|
} else if (! task_exist) {
|
||||||
} else if (FALSE_IT(handle(*task, consume_status))) {
|
} else if (FALSE_IT(handle(*task, consume_status))) {
|
||||||
} else {
|
} else {
|
||||||
ARCHIVE_LOG(TRACE, "after handle task", KPC(task), K(consume_status));
|
|
||||||
switch (consume_status) {
|
switch (consume_status) {
|
||||||
case TaskConsumeStatus::DONE:
|
case TaskConsumeStatus::DONE:
|
||||||
break;
|
break;
|
||||||
@ -546,7 +545,9 @@ int ObArchiveSender::archive_log_(const ObBackupDest &backup_dest,
|
|||||||
int64_t file_offset = 0;
|
int64_t file_offset = 0;
|
||||||
share::ObBackupPath path;
|
share::ObBackupPath path;
|
||||||
ObBackupPathString uri;
|
ObBackupPathString uri;
|
||||||
const ObLSID &id = task.get_ls_id();
|
const ObLSID id = task.get_ls_id();
|
||||||
|
const int64_t log_size = static_cast<int64_t>((task.get_end_lsn() - task.get_start_lsn()));
|
||||||
|
const int64_t buf_size = task.get_buf_size();
|
||||||
const ObArchivePiece &pre_piece = arg.tuple_.get_piece();
|
const ObArchivePiece &pre_piece = arg.tuple_.get_piece();
|
||||||
const ObArchivePiece &piece = task.get_piece();
|
const ObArchivePiece &piece = task.get_piece();
|
||||||
const ArchiveWorkStation &station = task.get_station();
|
const ArchiveWorkStation &station = task.get_station();
|
||||||
@ -601,7 +602,7 @@ int ObArchiveSender::archive_log_(const ObBackupDest &backup_dest,
|
|||||||
|
|
||||||
// 8. 统计
|
// 8. 统计
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
statistic(task, common::ObTimeUtility::current_time() - start_ts);
|
statistic(log_size, buf_size, common::ObTimeUtility::current_time() - start_ts);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -821,15 +822,15 @@ int ObArchiveSender::free_residual_task_()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObArchiveSender::statistic(const ObArchiveSendTask &task, const int64_t cost_ts)
|
void ObArchiveSender::statistic(const int64_t log_size, const int64_t buf_size, const int64_t cost_ts)
|
||||||
{
|
{
|
||||||
static __thread int64_t SEND_LOG_LSN_SIZE;
|
static __thread int64_t SEND_LOG_LSN_SIZE;
|
||||||
static __thread int64_t SEND_BUF_SIZE;
|
static __thread int64_t SEND_BUF_SIZE;
|
||||||
static __thread int64_t SEND_TASK_COUNT;
|
static __thread int64_t SEND_TASK_COUNT;
|
||||||
static __thread int64_t SEND_COST_TS;
|
static __thread int64_t SEND_COST_TS;
|
||||||
|
|
||||||
SEND_LOG_LSN_SIZE += static_cast<int64_t>((task.get_end_lsn() - task.get_start_lsn()));
|
SEND_LOG_LSN_SIZE += log_size;
|
||||||
SEND_BUF_SIZE += task.get_buf_size();
|
SEND_BUF_SIZE += buf_size;
|
||||||
SEND_TASK_COUNT++;
|
SEND_TASK_COUNT++;
|
||||||
SEND_COST_TS += cost_ts;
|
SEND_COST_TS += cost_ts;
|
||||||
|
|
||||||
|
|||||||
@ -175,7 +175,7 @@ private:
|
|||||||
bool is_retry_ret_code_(const int ret_code) const;
|
bool is_retry_ret_code_(const int ret_code) const;
|
||||||
bool is_ignore_ret_code_(const int ret_code) const;
|
bool is_ignore_ret_code_(const int ret_code) const;
|
||||||
|
|
||||||
void statistic(const ObArchiveSendTask &task, const int64_t cost_ts);
|
void statistic(const int64_t log_size, const int64_t buf_size, const int64_t cost_ts);
|
||||||
|
|
||||||
int try_free_send_task_();
|
int try_free_send_task_();
|
||||||
int do_free_send_task_();
|
int do_free_send_task_();
|
||||||
|
|||||||
@ -103,7 +103,8 @@ public:
|
|||||||
K_(cur_offset),
|
K_(cur_offset),
|
||||||
"unfinished_data_size", get_log_fetch_size(),
|
"unfinished_data_size", get_log_fetch_size(),
|
||||||
K_(max_scn),
|
K_(max_scn),
|
||||||
K_(send_task));
|
K_(send_task),
|
||||||
|
KP(this));
|
||||||
|
|
||||||
private:
|
private:
|
||||||
uint64_t tenant_id_;
|
uint64_t tenant_id_;
|
||||||
@ -204,7 +205,8 @@ public:
|
|||||||
K_(file_id),
|
K_(file_id),
|
||||||
K_(file_offset),
|
K_(file_offset),
|
||||||
K_(data),
|
K_(data),
|
||||||
K_(data_len));
|
K_(data_len),
|
||||||
|
KP(this));
|
||||||
private:
|
private:
|
||||||
static const int8_t INITAL_STATUS = 0;
|
static const int8_t INITAL_STATUS = 0;
|
||||||
static const int8_t ISSUE_STATUS = 1;
|
static const int8_t ISSUE_STATUS = 1;
|
||||||
|
|||||||
Reference in New Issue
Block a user