decrease memory usage for cdc service
This commit is contained in:
@ -348,6 +348,8 @@ int ObCdcFetcher::fetch_log_in_archive_(
|
|||||||
ClientLSCtx &ctx)
|
ClientLSCtx &ctx)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
// always reserve 4K for archive header
|
||||||
|
const int64_t SINGLE_READ_SIZE = 16 * 1024 * 1024L - 4 * 1024;
|
||||||
if (OB_ISNULL(ctx.get_source()) && OB_FAIL(init_archive_source_(ctx, ls_id))) {
|
if (OB_ISNULL(ctx.get_source()) && OB_FAIL(init_archive_source_(ctx, ls_id))) {
|
||||||
LOG_WARN("init archive source failed", K(ctx), K(ls_id));
|
LOG_WARN("init archive source failed", K(ctx), K(ls_id));
|
||||||
} else {
|
} else {
|
||||||
@ -358,7 +360,7 @@ int ObCdcFetcher::fetch_log_in_archive_(
|
|||||||
LOG_WARN("convert progress to scn failed", KR(ret), K(ctx));
|
LOG_WARN("convert progress to scn failed", KR(ret), K(ctx));
|
||||||
} else if (need_init_iter && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn,
|
} else if (need_init_iter && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn,
|
||||||
start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_,
|
start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_,
|
||||||
log_ext_handler_))) {
|
log_ext_handler_, SINGLE_READ_SIZE))) {
|
||||||
LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id));
|
LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id));
|
||||||
} else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) {
|
} else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) {
|
||||||
// expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned.
|
// expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned.
|
||||||
|
|||||||
@ -35,6 +35,7 @@
|
|||||||
#include "share/rc/ob_tenant_base.h"
|
#include "share/rc/ob_tenant_base.h"
|
||||||
#include "logservice/archiveservice/large_buffer_pool.h"
|
#include "logservice/archiveservice/large_buffer_pool.h"
|
||||||
#include "logservice/ob_log_external_storage_handler.h" // ObLogExternalHandler
|
#include "logservice/ob_log_external_storage_handler.h" // ObLogExternalHandler
|
||||||
|
#include "logservice/archiveservice/ob_archive_define.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
|
|||||||
@ -52,7 +52,7 @@ int ObRemoteLogIterator<LogEntryType>::init(const uint64_t tenant_id,
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObRemoteLogParent *source = NULL;
|
ObRemoteLogParent *source = NULL;
|
||||||
const int64_t DEFAULT_BUF_SIZE = 64 * 1024 * 1024L;
|
const int64_t BUF_SIZE = single_read_size + archive::ARCHIVE_FILE_HEADER_SIZE;
|
||||||
if (OB_UNLIKELY(inited_)) {
|
if (OB_UNLIKELY(inited_)) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
CLOG_LOG(WARN, "ObRemoteLogIterator already init", K(ret), K(inited_), K(id_));
|
CLOG_LOG(WARN, "ObRemoteLogIterator already init", K(ret), K(inited_), K(id_));
|
||||||
@ -75,11 +75,11 @@ int ObRemoteLogIterator<LogEntryType>::init(const uint64_t tenant_id,
|
|||||||
&& ! share::is_raw_path_log_source_type(source->get_source_type()))) {
|
&& ! share::is_raw_path_log_source_type(source->get_source_type()))) {
|
||||||
ret = OB_NOT_SUPPORTED;
|
ret = OB_NOT_SUPPORTED;
|
||||||
CLOG_LOG(WARN, "source type not support", K(ret), K(id), KPC(source));
|
CLOG_LOG(WARN, "source type not support", K(ret), K(id), KPC(source));
|
||||||
} else if (OB_ISNULL(buf_ = buffer_pool_->acquire(DEFAULT_BUF_SIZE))) {
|
} else if (OB_ISNULL(buf_ = buffer_pool_->acquire(BUF_SIZE))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
CLOG_LOG(WARN, "acquire buf failed", K(ret));
|
CLOG_LOG(WARN, "acquire buf failed", K(ret));
|
||||||
} else {
|
} else {
|
||||||
buf_size_ = DEFAULT_BUF_SIZE;
|
buf_size_ = BUF_SIZE;
|
||||||
tenant_id_ = tenant_id;
|
tenant_id_ = tenant_id;
|
||||||
id_ = id;
|
id_ = id;
|
||||||
start_lsn_ = start_lsn;
|
start_lsn_ = start_lsn;
|
||||||
@ -276,11 +276,6 @@ int ObRemoteLogIterator<LogEntryType>::next_entry_(LogEntryType &entry, LSN &lsn
|
|||||||
|
|
||||||
} while (OB_SUCCESS == ret && ! done);
|
} while (OB_SUCCESS == ret && ! done);
|
||||||
|
|
||||||
if (OB_NEED_RETRY == ret) {
|
|
||||||
ret = OB_ITER_END;
|
|
||||||
CLOG_LOG(WARN, "read data from archive not atomic, rewrite ret_code", KPC(this));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (OB_FAIL(ret) && OB_ITER_END != ret && ! is_io_error(ret)) {
|
if (OB_FAIL(ret) && OB_ITER_END != ret && ! is_io_error(ret)) {
|
||||||
mark_source_error_(ret);
|
mark_source_error_(ret);
|
||||||
}
|
}
|
||||||
@ -294,7 +289,7 @@ int ObRemoteLogIterator<LogEntryType>::next_entry_(LogEntryType &entry, LSN &lsn
|
|||||||
template<class LogEntryType>
|
template<class LogEntryType>
|
||||||
bool ObRemoteLogIterator<LogEntryType>::need_prepare_buf_(const int ret_code) const
|
bool ObRemoteLogIterator<LogEntryType>::need_prepare_buf_(const int ret_code) const
|
||||||
{
|
{
|
||||||
return OB_BUF_NOT_ENOUGH == ret_code;
|
return OB_BUF_NOT_ENOUGH == ret_code || OB_NEED_RETRY == ret_code;
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class LogEntryType>
|
template<class LogEntryType>
|
||||||
|
|||||||
Reference in New Issue
Block a user