[CP] [LOG] To merge the log4220 feature into the master branch.
This commit is contained in:
@ -212,55 +212,34 @@ int ObLogHandler::append(const void *buffer,
|
||||
SCN &scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t wait_times = 0;
|
||||
PalfAppendOptions opts;
|
||||
opts.need_nonblock = need_nonblock;
|
||||
opts.need_check_proposal_id = true;
|
||||
ObTimeGuard tg("ObLogHandler::append", 100000);
|
||||
while (true) {
|
||||
// generate opts
|
||||
opts.proposal_id = ATOMIC_LOAD(&proposal_id_);
|
||||
do {
|
||||
RLockGuard guard(lock_);
|
||||
CriticalGuard(ls_qs_);
|
||||
cb->set_append_start_ts(ObTimeUtility::fast_current_time());
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (is_in_stop_state_ || is_offline_) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
} else if (LEADER != ATOMIC_LOAD(&role_)) {
|
||||
ret = OB_NOT_MASTER;
|
||||
} else if (OB_FAIL(palf_handle_.append(opts, buffer, nbytes, ref_scn, lsn, scn))) {
|
||||
if (REACH_TIME_INTERVAL(1*1000*1000)) {
|
||||
CLOG_LOG(WARN, "palf_handle_ append failed", K(ret), KPC(this));
|
||||
}
|
||||
} else {
|
||||
cb->set_append_finish_ts(ObTimeUtility::fast_current_time());
|
||||
cb->__set_lsn(lsn);
|
||||
cb->__set_scn(scn);
|
||||
ret = apply_status_->push_append_cb(cb);
|
||||
CLOG_LOG(TRACE, "palf_handle_ push_append_cb success", K(lsn), K(scn), K(ret), K(id_));
|
||||
}
|
||||
} while (0);
|
||||
// check if need wait and retry append
|
||||
if (opts.need_nonblock) {
|
||||
// nonblock mode, end loop
|
||||
break;
|
||||
} else if (OB_EAGAIN == ret) {
|
||||
// block mode, need sleep and retry for -4023 ret code
|
||||
static const int64_t MAX_SLEEP_US = 100;
|
||||
++wait_times;
|
||||
int64_t sleep_us = wait_times * 10;
|
||||
if (sleep_us > MAX_SLEEP_US) {
|
||||
sleep_us = MAX_SLEEP_US;
|
||||
}
|
||||
ob_usleep(sleep_us);
|
||||
} else {
|
||||
// other ret code, end loop
|
||||
break;
|
||||
}
|
||||
if (nbytes > MAX_NORMAL_LOG_BODY_SIZE) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
CLOG_LOG(WARN, "nbytes is greater than expected size", K(nbytes), K(MAX_NORMAL_LOG_BODY_SIZE));
|
||||
} else if (OB_FAIL(append_(buffer, nbytes, ref_scn, need_nonblock, cb, lsn, scn))) {
|
||||
CLOG_LOG(WARN, "appending log fails", K(ret),
|
||||
K(buffer), K(nbytes), K(ref_scn), K(need_nonblock), K(lsn), K(scn));
|
||||
}
|
||||
append_cost_stat_.stat(tg.get_diff());
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogHandler::append_big_log(const void *buffer,
|
||||
const int64_t nbytes,
|
||||
const SCN &ref_scn,
|
||||
const bool need_nonblock,
|
||||
AppendCb *cb,
|
||||
LSN &lsn,
|
||||
SCN &scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (nbytes <= MAX_NORMAL_LOG_BODY_SIZE) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
CLOG_LOG(WARN, "nbytes is smaller than expected size", K(nbytes), K(MAX_NORMAL_LOG_BODY_SIZE));
|
||||
} else if (OB_FAIL(append_(buffer, nbytes, ref_scn, need_nonblock, cb, lsn, scn))) {
|
||||
CLOG_LOG(WARN, "append big log to palf failed", K(ret),
|
||||
K(buffer), K(nbytes), K(ref_scn), K(need_nonblock), K(lsn), K(scn));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1321,6 +1300,67 @@ int ObLogHandler::submit_config_change_cmd_(const LogConfigChangeCmd &req,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogHandler::append_(const void *buffer,
|
||||
const int64_t nbytes,
|
||||
const share::SCN &ref_scn,
|
||||
const bool need_nonblock,
|
||||
AppendCb *cb,
|
||||
palf::LSN &lsn,
|
||||
share::SCN &scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t wait_times = 0;
|
||||
PalfAppendOptions opts;
|
||||
opts.need_nonblock = need_nonblock;
|
||||
opts.need_check_proposal_id = true;
|
||||
ObTimeGuard tg("ObLogHandler::append", 100000);
|
||||
while (true) {
|
||||
// generate opts
|
||||
opts.proposal_id = ATOMIC_LOAD(&proposal_id_);
|
||||
do {
|
||||
RLockGuard guard(lock_);
|
||||
CriticalGuard(ls_qs_);
|
||||
cb->set_append_start_ts(ObTimeUtility::fast_current_time());
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (is_in_stop_state_ || is_offline_) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
} else if (LEADER != ATOMIC_LOAD(&role_)) {
|
||||
ret = OB_NOT_MASTER;
|
||||
} else if (OB_FAIL(palf_handle_.append(opts, buffer, nbytes, ref_scn, lsn, scn))) {
|
||||
if (REACH_TIME_INTERVAL(1*1000*1000)) {
|
||||
CLOG_LOG(WARN, "palf_handle_ append failed", K(ret), KPC(this));
|
||||
}
|
||||
} else {
|
||||
cb->set_append_finish_ts(ObTimeUtility::fast_current_time());
|
||||
cb->__set_lsn(lsn);
|
||||
cb->__set_scn(scn);
|
||||
ret = apply_status_->push_append_cb(cb);
|
||||
CLOG_LOG(TRACE, "palf_handle_ push_append_cb success", K(lsn), K(scn), K(ret), K(id_));
|
||||
}
|
||||
} while (0);
|
||||
// check if need wait and retry append
|
||||
if (opts.need_nonblock) {
|
||||
// nonblock mode, end loop
|
||||
break;
|
||||
} else if (OB_EAGAIN == ret) {
|
||||
// block mode, need sleep and retry for -4023 ret code
|
||||
static const int64_t MAX_SLEEP_US = 100;
|
||||
++wait_times;
|
||||
int64_t sleep_us = wait_times * 10;
|
||||
if (sleep_us > MAX_SLEEP_US) {
|
||||
sleep_us = MAX_SLEEP_US;
|
||||
}
|
||||
ob_usleep(sleep_us);
|
||||
} else {
|
||||
// other ret code, end loop
|
||||
break;
|
||||
}
|
||||
}
|
||||
append_cost_stat_.stat(tg.get_diff());
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogHandler::get_member_gc_stat(const common::ObAddr &addr,
|
||||
bool &is_valid_member,
|
||||
LogMemberGCStat &stat) const
|
||||
@ -1508,20 +1548,6 @@ int ObLogHandler::get_max_decided_scn(SCN &scn)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogHandler::set_region(const common::ObRegion ®ion)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
RLockGuard guard(lock_);
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (is_in_stop_state_) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
} else {
|
||||
ret = palf_handle_.set_region(region);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogHandler::disable_vote(const bool need_check_log_missing)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
Reference in New Issue
Block a user