add apply queue idx in log print
This commit is contained in:
@ -128,7 +128,8 @@ ObApplyServiceQueueTask::ObApplyServiceQueueTask()
|
|||||||
: ObApplyServiceTask(),
|
: ObApplyServiceTask(),
|
||||||
total_submit_cb_cnt_(0),
|
total_submit_cb_cnt_(0),
|
||||||
last_check_submit_cb_cnt_(0),
|
last_check_submit_cb_cnt_(0),
|
||||||
total_apply_cb_cnt_(0)
|
total_apply_cb_cnt_(0),
|
||||||
|
idx_(-1)
|
||||||
{
|
{
|
||||||
type_ = ObApplyServiceTaskType::APPLY_LOG_TASK;
|
type_ = ObApplyServiceTaskType::APPLY_LOG_TASK;
|
||||||
}
|
}
|
||||||
@ -148,21 +149,24 @@ void ObApplyServiceQueueTask::reset()
|
|||||||
total_submit_cb_cnt_ = 0;
|
total_submit_cb_cnt_ = 0;
|
||||||
last_check_submit_cb_cnt_ = 0;
|
last_check_submit_cb_cnt_ = 0;
|
||||||
total_apply_cb_cnt_ = 0;
|
total_apply_cb_cnt_ = 0;
|
||||||
|
idx_ = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObApplyServiceQueueTask::init(ObApplyStatus *apply_status)
|
int ObApplyServiceQueueTask::init(ObApplyStatus *apply_status,
|
||||||
|
const int64_t idx)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_ISNULL(apply_status)) {
|
if (OB_ISNULL(apply_status) || idx < 0) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
CLOG_LOG(WARN, "invalid argument", K(type_), K(apply_status), K(ret));
|
CLOG_LOG(WARN, "invalid argument", K(type_), K(apply_status), K(idx), K(ret));
|
||||||
} else {
|
} else {
|
||||||
apply_status_ = apply_status;
|
apply_status_ = apply_status;
|
||||||
type_ = ObApplyServiceTaskType::APPLY_LOG_TASK;
|
type_ = ObApplyServiceTaskType::APPLY_LOG_TASK;
|
||||||
total_submit_cb_cnt_ = 0;
|
total_submit_cb_cnt_ = 0;
|
||||||
last_check_submit_cb_cnt_ = 0;
|
last_check_submit_cb_cnt_ = 0;
|
||||||
total_apply_cb_cnt_ = 0;
|
total_apply_cb_cnt_ = 0;
|
||||||
CLOG_LOG(INFO, "ObApplyServiceQueueTask init success", K(type_), K(apply_status));
|
idx_ = idx;
|
||||||
|
CLOG_LOG(INFO, "ObApplyServiceQueueTask init success", K(type_), K(apply_status), K(idx_));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -235,6 +239,11 @@ int ObApplyServiceQueueTask::is_apply_done(bool &is_done)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t ObApplyServiceQueueTask::idx() const
|
||||||
|
{
|
||||||
|
return idx_;
|
||||||
|
}
|
||||||
|
|
||||||
//---------------ObApplyStatus---------------//
|
//---------------ObApplyStatus---------------//
|
||||||
ObApplyStatus::ObApplyStatus()
|
ObApplyStatus::ObApplyStatus()
|
||||||
: is_inited_(false),
|
: is_inited_(false),
|
||||||
@ -284,7 +293,7 @@ int ObApplyStatus::init(const share::ObLSID &id,
|
|||||||
CLOG_LOG(WARN, "failed to init submit_task", K(ret), K(id));
|
CLOG_LOG(WARN, "failed to init submit_task", K(ret), K(id));
|
||||||
} else {
|
} else {
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < APPLY_TASK_QUEUE_SIZE; ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < APPLY_TASK_QUEUE_SIZE; ++i) {
|
||||||
if (OB_FAIL(cb_queues_[i].init(this))) {
|
if (OB_FAIL(cb_queues_[i].init(this, i))) {
|
||||||
CLOG_LOG(WARN, "failed to init cb_queues", K(ret), K(id));
|
CLOG_LOG(WARN, "failed to init cb_queues", K(ret), K(id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -449,6 +458,7 @@ int ObApplyStatus::try_handle_cb_queue(ObApplyServiceQueueTask *cb_queue,
|
|||||||
int64_t append_finish_time = OB_INVALID_TIMESTAMP;
|
int64_t append_finish_time = OB_INVALID_TIMESTAMP;
|
||||||
int64_t cb_first_handle_time = OB_INVALID_TIMESTAMP;
|
int64_t cb_first_handle_time = OB_INVALID_TIMESTAMP;
|
||||||
int64_t cb_start_time = OB_INVALID_TIMESTAMP;
|
int64_t cb_start_time = OB_INVALID_TIMESTAMP;
|
||||||
|
int64_t idx = cb_queue->idx();
|
||||||
RLockGuard guard(lock_);
|
RLockGuard guard(lock_);
|
||||||
do {
|
do {
|
||||||
ObLink *link = NULL;
|
ObLink *link = NULL;
|
||||||
@ -474,7 +484,7 @@ int ObApplyStatus::try_handle_cb_queue(ObApplyServiceQueueTask *cb_queue,
|
|||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
}
|
}
|
||||||
statistics_cb_cost_(lsn, log_ts, append_start_time, append_finish_time,
|
statistics_cb_cost_(lsn, log_ts, append_start_time, append_finish_time,
|
||||||
cb_first_handle_time, cb_start_time);
|
cb_first_handle_time, cb_start_time, idx);
|
||||||
cb_queue->inc_total_apply_cb_cnt();
|
cb_queue->inc_total_apply_cb_cnt();
|
||||||
}
|
}
|
||||||
} else if (FOLLOWER == role_) {
|
} else if (FOLLOWER == role_) {
|
||||||
@ -490,7 +500,7 @@ int ObApplyStatus::try_handle_cb_queue(ObApplyServiceQueueTask *cb_queue,
|
|||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
}
|
}
|
||||||
statistics_cb_cost_(lsn, log_ts, append_start_time, append_finish_time,
|
statistics_cb_cost_(lsn, log_ts, append_start_time, append_finish_time,
|
||||||
cb_first_handle_time, cb_start_time);
|
cb_first_handle_time, cb_start_time, idx);
|
||||||
cb_queue->inc_total_apply_cb_cnt();
|
cb_queue->inc_total_apply_cb_cnt();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -811,6 +821,7 @@ int ObApplyStatus::handle_drop_cb_queue_(ObApplyServiceQueueTask &cb_queue)
|
|||||||
int64_t append_finish_time = OB_INVALID_TIMESTAMP;
|
int64_t append_finish_time = OB_INVALID_TIMESTAMP;
|
||||||
int64_t cb_first_handle_time = OB_INVALID_TIMESTAMP;
|
int64_t cb_first_handle_time = OB_INVALID_TIMESTAMP;
|
||||||
int64_t cb_start_time = OB_INVALID_TIMESTAMP;
|
int64_t cb_start_time = OB_INVALID_TIMESTAMP;
|
||||||
|
int64_t idx = cb_queue.idx();
|
||||||
do {
|
do {
|
||||||
ObLink *link = NULL;
|
ObLink *link = NULL;
|
||||||
AppendCb *cb = NULL;
|
AppendCb *cb = NULL;
|
||||||
@ -833,7 +844,7 @@ int ObApplyStatus::handle_drop_cb_queue_(ObApplyServiceQueueTask &cb_queue)
|
|||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
}
|
}
|
||||||
statistics_cb_cost_(lsn, log_ts, append_start_time, append_finish_time,
|
statistics_cb_cost_(lsn, log_ts, append_start_time, append_finish_time,
|
||||||
cb_first_handle_time, cb_start_time);
|
cb_first_handle_time, cb_start_time, idx);
|
||||||
cb_queue.inc_total_apply_cb_cnt();
|
cb_queue.inc_total_apply_cb_cnt();
|
||||||
}
|
}
|
||||||
} while (OB_SUCC(ret) && (!is_queue_empty));
|
} while (OB_SUCC(ret) && (!is_queue_empty));
|
||||||
@ -864,7 +875,8 @@ void ObApplyStatus::statistics_cb_cost_(const LSN &lsn,
|
|||||||
const int64_t append_start_time,
|
const int64_t append_start_time,
|
||||||
const int64_t append_finish_time,
|
const int64_t append_finish_time,
|
||||||
const int64_t cb_first_handle_time,
|
const int64_t cb_first_handle_time,
|
||||||
const int64_t cb_start_time)
|
const int64_t cb_start_time,
|
||||||
|
const int64_t idx)
|
||||||
{
|
{
|
||||||
// no need to print debug log when config [default value is true] is false;
|
// no need to print debug log when config [default value is true] is false;
|
||||||
if (GCONF.enable_record_trace_log) {
|
if (GCONF.enable_record_trace_log) {
|
||||||
@ -879,7 +891,7 @@ void ObApplyStatus::statistics_cb_cost_(const LSN &lsn,
|
|||||||
cb_wait_commit_stat_.stat(cb_wait_commit_time);
|
cb_wait_commit_stat_.stat(cb_wait_commit_time);
|
||||||
cb_execute_stat_.stat(cb_cost_time);
|
cb_execute_stat_.stat(cb_cost_time);
|
||||||
if (total_cost_time > 1000 * 1000) { //1s
|
if (total_cost_time > 1000 * 1000) { //1s
|
||||||
CLOG_LOG(WARN, "cb cost too much time", K(lsn), K(log_ts), K(total_cost_time), K(append_cost_time),
|
CLOG_LOG(WARN, "cb cost too much time", K(lsn), K(log_ts), K(idx), K(total_cost_time), K(append_cost_time),
|
||||||
K(cb_wait_thread_time), K(cb_wait_commit_time), K(cb_cost_time), K(append_start_time), K(append_finish_time),
|
K(cb_wait_thread_time), K(cb_wait_commit_time), K(cb_cost_time), K(append_start_time), K(append_finish_time),
|
||||||
K(cb_first_handle_time), K(cb_first_handle_time), K(cb_finish_time));
|
K(cb_first_handle_time), K(cb_first_handle_time), K(cb_finish_time));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -105,7 +105,8 @@ public:
|
|||||||
public:
|
public:
|
||||||
ObApplyServiceQueueTask();
|
ObApplyServiceQueueTask();
|
||||||
~ObApplyServiceQueueTask();
|
~ObApplyServiceQueueTask();
|
||||||
int init(ObApplyStatus *apply_status);
|
int init(ObApplyStatus *apply_status,
|
||||||
|
const int64_t idx);
|
||||||
void reset() override;
|
void reset() override;
|
||||||
public:
|
public:
|
||||||
Link *top();
|
Link *top();
|
||||||
@ -118,15 +119,18 @@ public:
|
|||||||
void set_snapshot_check_submit_cb_cnt();
|
void set_snapshot_check_submit_cb_cnt();
|
||||||
int is_snapshot_apply_done(bool &is_done);
|
int is_snapshot_apply_done(bool &is_done);
|
||||||
int is_apply_done(bool &is_done);
|
int is_apply_done(bool &is_done);
|
||||||
|
int64_t idx() const;
|
||||||
INHERIT_TO_STRING_KV("ObApplyServiceQueueTask", ObApplyServiceTask,
|
INHERIT_TO_STRING_KV("ObApplyServiceQueueTask", ObApplyServiceTask,
|
||||||
K(total_submit_cb_cnt_),
|
K(total_submit_cb_cnt_),
|
||||||
K(last_check_submit_cb_cnt_),
|
K(last_check_submit_cb_cnt_),
|
||||||
K(total_apply_cb_cnt_));
|
K(total_apply_cb_cnt_),
|
||||||
|
K(idx_));
|
||||||
private:
|
private:
|
||||||
int64_t total_submit_cb_cnt_;
|
int64_t total_submit_cb_cnt_;
|
||||||
int64_t last_check_submit_cb_cnt_;
|
int64_t last_check_submit_cb_cnt_;
|
||||||
int64_t total_apply_cb_cnt_;
|
int64_t total_apply_cb_cnt_;
|
||||||
common::ObSpLinkQueue queue_;
|
common::ObSpLinkQueue queue_;
|
||||||
|
int64_t idx_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObApplyStatus
|
class ObApplyStatus
|
||||||
@ -183,7 +187,8 @@ private:
|
|||||||
const int64_t append_start_time,
|
const int64_t append_start_time,
|
||||||
const int64_t append_finish_time,
|
const int64_t append_finish_time,
|
||||||
const int64_t cb_first_handle_time,
|
const int64_t cb_first_handle_time,
|
||||||
const int64_t cb_start_time);
|
const int64_t cb_start_time,
|
||||||
|
const int64_t idx);
|
||||||
private:
|
private:
|
||||||
typedef common::RWLock RWLock;
|
typedef common::RWLock RWLock;
|
||||||
typedef RWLock::RLockGuard RLockGuard;
|
typedef RWLock::RLockGuard RLockGuard;
|
||||||
|
|||||||
Reference in New Issue
Block a user