diff --git a/src/logservice/cdcservice/ob_cdc_fetcher.cpp b/src/logservice/cdcservice/ob_cdc_fetcher.cpp index f7962f3541..f956ee93d4 100644 --- a/src/logservice/cdcservice/ob_cdc_fetcher.cpp +++ b/src/logservice/cdcservice/ob_cdc_fetcher.cpp @@ -348,6 +348,8 @@ int ObCdcFetcher::fetch_log_in_archive_( ClientLSCtx &ctx) { int ret = OB_SUCCESS; + // always reserve 4K for archive header + const int64_t SINGLE_READ_SIZE = 16 * 1024 * 1024L - 4 * 1024; if (OB_ISNULL(ctx.get_source()) && OB_FAIL(init_archive_source_(ctx, ls_id))) { LOG_WARN("init archive source failed", K(ctx), K(ls_id)); } else { @@ -358,7 +360,7 @@ int ObCdcFetcher::fetch_log_in_archive_( LOG_WARN("convert progress to scn failed", KR(ret), K(ctx)); } else if (need_init_iter && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn, start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_, - log_ext_handler_))) { + log_ext_handler_, SINGLE_READ_SIZE))) { LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id)); } else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) { // expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned. diff --git a/src/logservice/restoreservice/ob_remote_log_iterator.h b/src/logservice/restoreservice/ob_remote_log_iterator.h index 94afddcd88..3f19bb1e4a 100644 --- a/src/logservice/restoreservice/ob_remote_log_iterator.h +++ b/src/logservice/restoreservice/ob_remote_log_iterator.h @@ -35,6 +35,7 @@ #include "share/rc/ob_tenant_base.h" #include "logservice/archiveservice/large_buffer_pool.h" #include "logservice/ob_log_external_storage_handler.h" // ObLogExternalHandler +#include "logservice/archiveservice/ob_archive_define.h" namespace oceanbase { diff --git a/src/logservice/restoreservice/ob_remote_log_iterator.ipp b/src/logservice/restoreservice/ob_remote_log_iterator.ipp index 7fb851c24f..59d853c138 100644 --- a/src/logservice/restoreservice/ob_remote_log_iterator.ipp +++ b/src/logservice/restoreservice/ob_remote_log_iterator.ipp @@ -52,7 +52,7 @@ int ObRemoteLogIterator::init(const uint64_t tenant_id, { int ret = OB_SUCCESS; ObRemoteLogParent *source = NULL; - const int64_t DEFAULT_BUF_SIZE = 64 * 1024 * 1024L; + const int64_t BUF_SIZE = single_read_size + archive::ARCHIVE_FILE_HEADER_SIZE; if (OB_UNLIKELY(inited_)) { ret = OB_INIT_TWICE; CLOG_LOG(WARN, "ObRemoteLogIterator already init", K(ret), K(inited_), K(id_)); @@ -75,11 +75,11 @@ int ObRemoteLogIterator::init(const uint64_t tenant_id, && ! share::is_raw_path_log_source_type(source->get_source_type()))) { ret = OB_NOT_SUPPORTED; CLOG_LOG(WARN, "source type not support", K(ret), K(id), KPC(source)); - } else if (OB_ISNULL(buf_ = buffer_pool_->acquire(DEFAULT_BUF_SIZE))) { + } else if (OB_ISNULL(buf_ = buffer_pool_->acquire(BUF_SIZE))) { ret = OB_ALLOCATE_MEMORY_FAILED; CLOG_LOG(WARN, "acquire buf failed", K(ret)); } else { - buf_size_ = DEFAULT_BUF_SIZE; + buf_size_ = BUF_SIZE; tenant_id_ = tenant_id; id_ = id; start_lsn_ = start_lsn; @@ -276,11 +276,6 @@ int ObRemoteLogIterator::next_entry_(LogEntryType &entry, LSN &lsn } while (OB_SUCCESS == ret && ! done); - if (OB_NEED_RETRY == ret) { - ret = OB_ITER_END; - CLOG_LOG(WARN, "read data from archive not atomic, rewrite ret_code", KPC(this)); - } - if (OB_FAIL(ret) && OB_ITER_END != ret && ! is_io_error(ret)) { mark_source_error_(ret); } @@ -294,7 +289,7 @@ int ObRemoteLogIterator::next_entry_(LogEntryType &entry, LSN &lsn template bool ObRemoteLogIterator::need_prepare_buf_(const int ret_code) const { - return OB_BUF_NOT_ENOUGH == ret_code; + return OB_BUF_NOT_ENOUGH == ret_code || OB_NEED_RETRY == ret_code; } template