[OBCDC] fix a bug which would cause obcdc exit when fetch missing log failed
This commit is contained in:
parent
1d4a6804a0
commit
5f2370f1c7
@ -268,16 +268,36 @@ int LSFetchCtx::append_log(const char *buf, const int64_t buf_len)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(mem_storage_.append(buf, buf_len))) {
|
||||
LOG_ERROR("append log into mem_storage_ failed", KR(ret), K_(tls_id), KP(buf), K(buf_len), K_(mem_storage), K_(group_iterator));
|
||||
} else {
|
||||
ATOMIC_AAF(&fetched_log_size_, buf_len);
|
||||
LOG_DEBUG("append_log succ", K(buf_len), KPC(this));
|
||||
if (! mem_storage_.is_inited()) {
|
||||
const LSN &start_lsn = progress_.get_next_lsn();
|
||||
|
||||
if (OB_FAIL(mem_storage_.init(start_lsn))) {
|
||||
LOG_ERROR("init mem_storage_ failed", KR(ret), K_(tls_id), K(start_lsn));
|
||||
} else if (OB_FAIL(group_iterator_.reuse(start_lsn))) {
|
||||
LOG_ERROR("MemPalfBufferIterator resuse failed", KR(ret), K_(tls_id), K(start_lsn));
|
||||
} else {
|
||||
LOG_DEBUG("mem_storage_ init and MemPalfBufferIterator resuse succ", K_(tls_id), K(start_lsn));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(mem_storage_.append(buf, buf_len))) {
|
||||
LOG_ERROR("append log into mem_storage_ failed", KR(ret), K_(tls_id), KP(buf), K(buf_len),
|
||||
K_(mem_storage), K_(group_iterator));
|
||||
} else {
|
||||
ATOMIC_AAF(&fetched_log_size_, buf_len);
|
||||
LOG_DEBUG("append_log succ", K(buf_len), KPC(this));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void LSFetchCtx::reset_memory_storage()
|
||||
{
|
||||
mem_storage_.destroy();
|
||||
}
|
||||
|
||||
int LSFetchCtx::get_next_group_entry(palf::LogGroupEntry &group_entry, palf::LSN &lsn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -99,6 +99,7 @@ public:
|
||||
IObLogLSFetchMgr &ls_fetch_mgr);
|
||||
|
||||
int append_log(const char *buf, const int64_t buf_len);
|
||||
void reset_memory_storage();
|
||||
int get_next_group_entry(palf::LogGroupEntry &group_entry, palf::LSN &lsn);
|
||||
int get_next_remote_group_entry(
|
||||
palf::LogGroupEntry &group_entry,
|
||||
|
@ -1405,8 +1405,10 @@ int FetchStream::read_log_(
|
||||
decode_log_entry_time += (get_timestamp() - begin_time);
|
||||
if (OB_FAIL(read_group_entry_(group_entry, group_start_lsn,
|
||||
stop_flag, kick_out_info, tsi))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("read group entry failed", KR(ret));
|
||||
if (OB_IN_STOP_STATE != ret && OB_NEED_RETRY != ret) {
|
||||
LOG_ERROR("read group entry failed", KR(ret), KPC(this));
|
||||
} else if (OB_NEED_RETRY == ret) {
|
||||
ls_fetch_ctx_->reset_memory_storage();
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user