Fix backup do not support bandwidth throttle

This commit is contained in:
oceanoverflow 2024-06-27 05:02:28 +00:00 committed by ob-robot
parent e71cac5148
commit 4f8426eb0b
13 changed files with 121 additions and 57 deletions

View File

@ -202,10 +202,12 @@ void TestLSTabletInfoWR::fill_tablet_meta()
TEST_F(TestLSTabletInfoWR, testTabletInfoWriterAndReader)
{
int ret = OB_SUCCESS;
ObInOutBandwidthThrottle bandwidth_throttle;
ASSERT_EQ(OB_SUCCESS, bandwidth_throttle.init(1024 * 1024 * 60));
LOG_INFO("test tablet info", K(tablet_metas.count()), K(backup_set_dest_));
backup::ObExternTabletMetaWriter writer;
backup::ObExternTabletMetaReader reader;
ASSERT_EQ(OB_SUCCESS, writer.init(backup_set_dest_, ObLSID(TEST_LS_ID), 1, 0));
ASSERT_EQ(OB_SUCCESS, writer.init(backup_set_dest_, ObLSID(TEST_LS_ID), 1, 0, bandwidth_throttle));
for (int i = 0; i < tablet_metas.count(); i++) {
blocksstable::ObSelfBufferWriter buffer_writer("TestBuff");
blocksstable::ObBufferReader buffer_reader;

View File

@ -291,7 +291,7 @@ ObBackupDataCtx::~ObBackupDataCtx()
}
int ObBackupDataCtx::open(const ObLSBackupDataParam &param, const share::ObBackupDataType &backup_data_type,
const int64_t file_id)
const int64_t file_id, common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
static const int64_t BUF_SIZE = 4 * 1024 * 1024;
@ -315,7 +315,7 @@ int ObBackupDataCtx::open(const ObLSBackupDataParam &param, const share::ObBacku
file_id_ = file_id;
file_offset_ = 0;
backup_data_type_ = backup_data_type;
if (OB_FAIL(prepare_file_write_ctx_(param, backup_data_type, file_id))) {
if (OB_FAIL(prepare_file_write_ctx_(param, backup_data_type, file_id, bandwidth_throttle))) {
LOG_WARN("failed to prepare file write ctx", K(ret), K(param), K(backup_data_type), K(file_id));
} else {
is_inited_ = true;
@ -442,7 +442,8 @@ int ObBackupDataCtx::open_file_writer_(const share::ObBackupPath &backup_path)
}
int ObBackupDataCtx::prepare_file_write_ctx_(
const ObLSBackupDataParam &param, const share::ObBackupDataType &type, const int64_t file_id)
const ObLSBackupDataParam &param, const share::ObBackupDataType &type, const int64_t file_id,
common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
share::ObBackupPath backup_path;
@ -451,7 +452,7 @@ int ObBackupDataCtx::prepare_file_write_ctx_(
LOG_WARN("failed to get macro block backup path", K(ret), K(file_id));
} else if (OB_FAIL(open_file_writer_(backup_path))) {
LOG_WARN("failed to open file writer", K(ret), K(backup_path));
} else if (OB_FAIL(file_write_ctx_.open(data_file_size, io_fd_, *dev_handle_))) {
} else if (OB_FAIL(file_write_ctx_.open(data_file_size, io_fd_, *dev_handle_, bandwidth_throttle))) {
LOG_WARN("failed to open file write ctx", K(ret), K(param), K(type), K(backup_path), K(data_file_size), K(file_id));
}
return ret;
@ -855,7 +856,8 @@ ObLSBackupCtx::ObLSBackupCtx()
rebuild_seq_(),
check_tablet_info_cost_time_(),
backup_tx_table_filled_tx_scn_(share::SCN::min_scn()),
tablet_checker_()
tablet_checker_(),
bandwidth_throttle_(NULL)
{}
ObLSBackupCtx::~ObLSBackupCtx()
@ -865,7 +867,7 @@ ObLSBackupCtx::~ObLSBackupCtx()
int ObLSBackupCtx::open(
const ObLSBackupParam &param, const share::ObBackupDataType &backup_data_type,
common::ObMySQLProxy &sql_proxy, ObBackupIndexKVCache &index_kv_cache)
common::ObMySQLProxy &sql_proxy, ObBackupIndexKVCache &index_kv_cache, common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
ObArray<common::ObTabletID> tablet_list;
@ -898,6 +900,7 @@ int ObLSBackupCtx::open(
sql_proxy_ = &sql_proxy;
rebuild_seq_ = 0;
check_tablet_info_cost_time_ = 0;
bandwidth_throttle_ = &bandwidth_throttle;
is_inited_ = true;
if (OB_FAIL(prepare_tablet_id_reader_(reader))) {
LOG_WARN("failed to prepare tablet id reader", K(ret), K(param));

View File

@ -133,7 +133,8 @@ struct ObBackupDataCtx {
public:
ObBackupDataCtx();
virtual ~ObBackupDataCtx();
int open(const ObLSBackupDataParam &param, const share::ObBackupDataType &type, const int64_t file_id);
int open(const ObLSBackupDataParam &param, const share::ObBackupDataType &type, const int64_t file_id,
common::ObInOutBandwidthThrottle &bandwidth_throttle);
int write_backup_file_header(const ObBackupFileHeader &file_header);
int write_macro_block_data(const blocksstable::ObBufferReader &macro_data,
const blocksstable::ObLogicMacroBlockId &logic_id, ObBackupMacroBlockIndex &macro_index);
@ -157,7 +158,8 @@ private:
}
int open_file_writer_(const share::ObBackupPath &backup_path);
int prepare_file_write_ctx_(
const ObLSBackupDataParam &param, const share::ObBackupDataType &type, const int64_t file_id);
const ObLSBackupDataParam &param, const share::ObBackupDataType &type, const int64_t file_id,
common::ObInOutBandwidthThrottle &bandwidth_throttle);
int get_macro_block_backup_path_(const int64_t file_id, share::ObBackupPath &backup_path);
int write_macro_block_data_(const blocksstable::ObBufferReader &buffer, const blocksstable::ObLogicMacroBlockId &logic_id,
ObBackupMacroBlockIndex &macro_index);
@ -229,8 +231,9 @@ struct ObLSBackupCtx {
public:
ObLSBackupCtx();
virtual ~ObLSBackupCtx();
int open(const ObLSBackupParam &param, const share::ObBackupDataType &backup_data_type,
common::ObMySQLProxy &sql_proxy, ObBackupIndexKVCache &index_kv_cache);
int open(
const ObLSBackupParam &param, const share::ObBackupDataType &backup_data_type, common::ObMySQLProxy &sql_proxy,
ObBackupIndexKVCache &index_kv_cache, common::ObInOutBandwidthThrottle &bandwidth_throttle);
int next(common::ObTabletID &tablet_id);
void set_backup_data_type(const share::ObBackupDataType &backup_data_type);
int set_tablet(const common::ObTabletID &tablet_id, ObBackupTabletHandleRef *tablet_handle);
@ -316,6 +319,7 @@ public:
int64_t check_tablet_info_cost_time_;
share::SCN backup_tx_table_filled_tx_scn_;
ObBackupTabletChecker tablet_checker_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(ObLSBackupCtx);
};

View File

@ -339,7 +339,7 @@ int64_t ObTabletInfoTrailer::get_serialize_size_() const
int ObExternTabletMetaWriter::init(
const share::ObBackupDest &backup_set_dest, const share::ObLSID &ls_id,
const int64_t turn_id, const int64_t retry_id)
const int64_t turn_id, const int64_t retry_id, common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
const int64_t start_file_id = 1;
@ -357,6 +357,7 @@ int ObExternTabletMetaWriter::init(
ls_id_ = ls_id;
turn_id_ = turn_id;
retry_id_ = retry_id;
bandwidth_throttle_ = &bandwidth_throttle;
if (OB_FAIL(prepare_backup_file_(start_file_id))) {
LOG_WARN("failed to prepare backup file", K(ret), K(start_file_id));
} else {
@ -381,8 +382,8 @@ int ObExternTabletMetaWriter::prepare_backup_file_(const int64_t file_id)
} else if (OB_FAIL(util.open_with_access_type(
dev_handle_, io_fd_, backup_set_dest_.get_storage_info(), backup_path.get_obstr(), access_type))) {
LOG_WARN("failed to open with access type", K(ret), K(backup_set_dest_), K(backup_path));
} else if (OB_FAIL(file_write_ctx_.open(data_file_size, io_fd_, *dev_handle_))) {
LOG_WARN("failed to open file write ctx", K(ret), K(backup_path), K(data_file_size), K(file_id));
} else if (OB_FAIL(file_write_ctx_.open(data_file_size, io_fd_, *dev_handle_, *bandwidth_throttle_))) {
LOG_WARN("failed to open file write ctx", K(ret), K(backup_path), K(data_file_size), K(file_id), KP_(bandwidth_throttle));
} else {
file_trailer_.reset();
file_trailer_.file_id_ = file_id;

View File

@ -91,10 +91,11 @@ public:
static const int64_t BUF_SIZE = 4 * 1024 * 1024;
static const int64_t TRAILER_BUF = 1024;
ObExternTabletMetaWriter(): is_inited_(false), backup_set_dest_(), ls_id_(), turn_id_(0), retry_id_(0),
io_fd_(0), dev_handle_(nullptr), file_write_ctx_(), file_trailer_(), tmp_buffer_("BackupExtInfo") {};
io_fd_(0), dev_handle_(nullptr), file_write_ctx_(), file_trailer_(), tmp_buffer_("BackupExtInfo"),
bandwidth_throttle_(NULL) {};
~ObExternTabletMetaWriter() {};
int init(const share::ObBackupDest &backup_set_dest, const share::ObLSID &ls_id,
const int64_t turn_id, const int64_t retry_id);
const int64_t turn_id, const int64_t retry_id, common::ObInOutBandwidthThrottle &bandwidth_throttle);
int write_meta_data(const blocksstable::ObBufferReader &meta_data, const common::ObTabletID &tablet_id);
int close();
private:
@ -124,6 +125,7 @@ private:
ObBackupFileWriteCtx file_write_ctx_;
ObTabletInfoTrailer file_trailer_;
blocksstable::ObSelfBufferWriter tmp_buffer_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(ObExternTabletMetaWriter);
};

View File

@ -17,6 +17,8 @@ namespace oceanbase {
namespace backup {
ERRSIM_POINT_DEF(EN_BACKUP_TRIGGER_BANDWIDTH_THROTTLE);
/* ObBackupFileWriteCtx */
ObBackupFileWriteCtx::ObBackupFileWriteCtx()
@ -25,13 +27,16 @@ ObBackupFileWriteCtx::ObBackupFileWriteCtx()
max_file_size_(0),
io_fd_(),
dev_handle_(NULL),
data_buffer_("BackupFileWriteCtx")
data_buffer_("BackupFileWriteCtx"),
bandwidth_throttle_(NULL),
last_active_time_(0)
{}
ObBackupFileWriteCtx::~ObBackupFileWriteCtx()
{}
int ObBackupFileWriteCtx::open(const int64_t max_file_size, const common::ObIOFd &io_fd, common::ObIODevice &dev_handle)
int ObBackupFileWriteCtx::open(const int64_t max_file_size, const common::ObIOFd &io_fd, common::ObIODevice &dev_handle,
common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -47,6 +52,7 @@ int ObBackupFileWriteCtx::open(const int64_t max_file_size, const common::ObIOFd
max_file_size_ = max_file_size;
io_fd_ = io_fd;
dev_handle_ = &dev_handle;
bandwidth_throttle_ = &bandwidth_throttle;
is_inited_ = true;
}
return ret;
@ -109,17 +115,34 @@ int ObBackupFileWriteCtx::flush_buffer_(const bool is_last_part)
const int64_t offset = file_size_;
if (!check_can_flush_(is_last_part)) {
LOG_DEBUG("can not flush now", K(is_last_part), K(data_buffer_));
} else if (OB_ISNULL(dev_handle_)) {
} else if (OB_ISNULL(dev_handle_) || OB_ISNULL(bandwidth_throttle_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dev handle should not be null", K(ret));
LOG_WARN("dev handle or bandwidth throttle should not be null", K(ret), KP_(dev_handle), KP_(bandwidth_throttle));
} else if (OB_FAIL(dev_handle_->pwrite(io_fd_, offset, data_buffer_.length(), data_buffer_.data(), write_size))) {
LOG_WARN("failed to write data buffer", K(ret), K(data_buffer_));
} else if (data_buffer_.length() != write_size) {
ret = OB_IO_ERROR;
LOG_WARN("write length not equal buffer length", K(offset), K(data_buffer_.length()), K(write_size));
} else {
file_size_ += write_size;
data_buffer_.reuse();
int64_t bytes = write_size;
#ifdef ERRSIM
if (OB_SUCC(ret)) {
ret = EN_BACKUP_TRIGGER_BANDWIDTH_THROTTLE ? : OB_SUCCESS;
if (OB_FAIL(ret)) {
STORAGE_LOG(ERROR, "fake EN_BACKUP_TRIGGER_BANDWIDTH_THROTTLE", K(ret));
// in case of errsim, that the bytes be larger to trigger bandwidth throttle
bytes = write_size * 10;
ret = OB_SUCCESS;
}
}
#endif
if (OB_FAIL(bandwidth_throttle_->limit_out_and_sleep(bytes, last_active_time_, INT64_MAX))) {
LOG_WARN("failed to limit out and sleep", K(ret));
} else {
file_size_ += write_size;
data_buffer_.reuse();
last_active_time_ = ObTimeUtility::current_time();
}
}
return ret;
}

View File

@ -14,6 +14,7 @@
#define STORAGE_LOG_STREAM_BACKUP_FILE_WRITER_CTX_H_
#include "common/storage/ob_io_device.h"
#include "storage/blocksstable/ob_data_buffer.h"
#include "lib/utility/utility.h"
namespace oceanbase {
namespace backup {
@ -22,7 +23,8 @@ struct ObBackupFileWriteCtx {
public:
ObBackupFileWriteCtx();
virtual ~ObBackupFileWriteCtx();
int open(const int64_t max_file_size, const common::ObIOFd &io_fd, common::ObIODevice &device_handle);
int open(const int64_t max_file_size, const common::ObIOFd &io_fd, common::ObIODevice &device_handle,
common::ObInOutBandwidthThrottle &bandwidth_throttle);
bool is_opened() const
{
return is_inited_;
@ -47,6 +49,8 @@ private:
common::ObIOFd io_fd_;
common::ObIODevice *dev_handle_;
blocksstable::ObSelfBufferWriter data_buffer_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
int64_t last_active_time_;
DISALLOW_COPY_AND_ASSIGN(ObBackupFileWriteCtx);
};

View File

@ -600,10 +600,12 @@ int ObIBackupIndexMerger::open_file_writer_(const share::ObBackupPath &path, con
return ret;
}
int ObIBackupIndexMerger::prepare_file_write_ctx_(ObBackupFileWriteCtx &write_ctx)
int ObIBackupIndexMerger::prepare_file_write_ctx_(
common::ObInOutBandwidthThrottle &bandwidth_throttle,
ObBackupFileWriteCtx &write_ctx)
{
int ret = OB_SUCCESS;
if (OB_FAIL(write_ctx.open(OB_MAX_BACKUP_FILE_SIZE, io_fd_, *dev_handle_))) {
if (OB_FAIL(write_ctx.open(OB_MAX_BACKUP_FILE_SIZE, io_fd_, *dev_handle_, bandwidth_throttle))) {
LOG_WARN("failed to open backup file write ctx", K(ret));
}
return ret;
@ -727,7 +729,8 @@ ObBackupMacroBlockIndexMerger::~ObBackupMacroBlockIndexMerger()
reset();
}
int ObBackupMacroBlockIndexMerger::init(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy)
int ObBackupMacroBlockIndexMerger::init(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy,
common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
const ObBackupBlockType block_type = BACKUP_BLOCK_MACRO_DATA;
@ -743,7 +746,7 @@ int ObBackupMacroBlockIndexMerger::init(const ObBackupIndexMergeParam &merge_par
LOG_WARN("failed to ensure space", K(ret));
} else if (OB_FAIL(buffer_node_.init(merge_param.tenant_id_, block_type, node_level))) {
LOG_WARN("failed to init buffer node", K(ret), K(merge_param), K(block_type), K(node_level));
} else if (OB_FAIL(prepare_merge_ctx_(merge_param, sql_proxy))) {
} else if (OB_FAIL(prepare_merge_ctx_(merge_param, sql_proxy, bandwidth_throttle))) {
LOG_WARN("failed to prepare merge ctx", K(ret), K(merge_param));
} else if (OB_FAIL(write_backup_file_header_(file_type))) {
LOG_WARN("failed to write backup file header", K(ret));
@ -848,7 +851,8 @@ int ObBackupMacroBlockIndexMerger::merge_index()
}
int ObBackupMacroBlockIndexMerger::prepare_merge_ctx_(
const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy)
const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy,
common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
ObArray<ObBackupRetryDesc> retry_list;
@ -869,7 +873,7 @@ int ObBackupMacroBlockIndexMerger::prepare_merge_ctx_(
LOG_WARN("failed to get output file path", K(ret), K(merge_param));
} else if (OB_FAIL(open_file_writer_(backup_path, merge_param.backup_dest_.get_storage_info()))) {
LOG_WARN("failed to prepare file writer", K(ret), K(backup_path), K(merge_param));
} else if (OB_FAIL(prepare_file_write_ctx_(write_ctx_))) {
} else if (OB_FAIL(prepare_file_write_ctx_(bandwidth_throttle, write_ctx_))) {
LOG_WARN("failed to prepare file write ctx", K(ret));
}
return ret;
@ -1257,7 +1261,7 @@ ObBackupMetaIndexMerger::~ObBackupMetaIndexMerger()
}
int ObBackupMetaIndexMerger::init(const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta,
common::ObISQLClient &sql_proxy)
common::ObISQLClient &sql_proxy, common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
const ObBackupBlockType block_type = BACKUP_BLOCK_META_DATA;
@ -1273,7 +1277,7 @@ int ObBackupMetaIndexMerger::init(const ObBackupIndexMergeParam &merge_param, co
LOG_WARN("failed to ensure space", K(ret));
} else if (OB_FAIL(buffer_node_.init(merge_param.tenant_id_, block_type, node_level))) {
LOG_WARN("failed to init buffer node", K(ret), K(merge_param), K(block_type), K(node_level));
} else if (OB_FAIL(prepare_merge_ctx_(merge_param, is_sec_meta, sql_proxy))) {
} else if (OB_FAIL(prepare_merge_ctx_(merge_param, is_sec_meta, sql_proxy, bandwidth_throttle))) {
LOG_WARN("failed to prepare merge ctx", K(ret), K(merge_param), K(is_sec_meta));
} else if (OB_FAIL(write_backup_file_header_(file_type))) {
LOG_WARN("failed to write backup file header", K(ret));
@ -1369,7 +1373,8 @@ int ObBackupMetaIndexMerger::merge_index()
}
int ObBackupMetaIndexMerger::prepare_merge_ctx_(
const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, common::ObISQLClient &sql_proxy)
const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta,
common::ObISQLClient &sql_proxy, common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
ObArray<ObBackupRetryDesc> retry_list;
@ -1390,7 +1395,7 @@ int ObBackupMetaIndexMerger::prepare_merge_ctx_(
LOG_WARN("failed to get output file path", K(ret), K(merge_param));
} else if (OB_FAIL(open_file_writer_(backup_path, merge_param.backup_dest_.get_storage_info()))) {
LOG_WARN("failed to prepare file writer", K(ret), K(backup_path), K(merge_param));
} else if (OB_FAIL(prepare_file_write_ctx_(write_ctx_))) {
} else if (OB_FAIL(prepare_file_write_ctx_(bandwidth_throttle, write_ctx_))) {
LOG_WARN("failed to prepare file write ctx", K(ret));
}
return ret;

View File

@ -165,7 +165,7 @@ protected:
virtual int get_all_retries_(const int64_t task_id, const uint64_t tenant_id, const share::ObBackupDataType &backup_data_type,
const share::ObLSID &ls_id, common::ObISQLClient &sql_proxy, common::ObIArray<ObBackupRetryDesc> &retry_list);
int open_file_writer_(const share::ObBackupPath &backup_path, const share::ObBackupStorageInfo *storage_info);
int prepare_file_write_ctx_(ObBackupFileWriteCtx &write_ctx);
int prepare_file_write_ctx_(common::ObInOutBandwidthThrottle &bandwidth_throttle, ObBackupFileWriteCtx &write_ctx);
template <class IndexType>
int encode_index_to_buffer_(const common::ObIArray<IndexType> &index_list, blocksstable::ObBufferWriter &buffer_writer);
template <class IndexType, class IndexIndexType>
@ -196,7 +196,8 @@ class ObBackupMacroBlockIndexMerger : public ObIBackupIndexMerger {
public:
ObBackupMacroBlockIndexMerger();
virtual ~ObBackupMacroBlockIndexMerger();
int init(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy);
int init(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy,
common::ObInOutBandwidthThrottle &bandwidth_throttle);
void reset();
virtual int merge_index() override;
@ -205,7 +206,8 @@ protected:
const ObBackupRetryDesc &desc, ObIMacroBlockIndexIterator *&iter);
private:
int prepare_merge_ctx_(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy);
int prepare_merge_ctx_(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy,
common::ObInOutBandwidthThrottle &bandwidth_throttle);
int prepare_merge_iters_(const ObBackupIndexMergeParam &merge_param,
const common::ObIArray<ObBackupRetryDesc> &retry_list, common::ObISQLClient &sql_proxy,
MERGE_ITER_ARRAY &merge_iters);
@ -243,7 +245,8 @@ class ObBackupMetaIndexMerger : public ObIBackupIndexMerger {
public:
ObBackupMetaIndexMerger();
virtual ~ObBackupMetaIndexMerger();
int init(const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, common::ObISQLClient &sql_proxy);
int init(const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, common::ObISQLClient &sql_proxy,
common::ObInOutBandwidthThrottle &bandwidth_throttle);
void reset();
virtual int merge_index() override;
@ -253,7 +256,8 @@ protected:
private:
int prepare_merge_ctx_(
const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, common::ObISQLClient &sql_proxy);
const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, common::ObISQLClient &sql_proxy,
common::ObInOutBandwidthThrottle &bandwidth_throttle);
int prepare_merge_iters_(const ObBackupIndexMergeParam &merge_param,
const common::ObIArray<ObBackupRetryDesc> &retry_list, const bool is_sec_meta, MERGE_ITER_ARRAY &merge_iters);
int get_unfinished_iters_(const MERGE_ITER_ARRAY &merge_iters, MERGE_ITER_ARRAY &unfinished_iters);

View File

@ -569,7 +569,7 @@ int ObLSBackupDataDagNet::inner_init_before_run_()
if (OB_FAIL(param_.convert_to(backup_param))) {
LOG_WARN("failed to convert param", K(param_));
} else if (OB_FAIL(ls_backup_ctx_.open(backup_param, backup_data_type_,
*param_.report_ctx_.sql_proxy_, OB_BACKUP_INDEX_CACHE))) {
*param_.report_ctx_.sql_proxy_, OB_BACKUP_INDEX_CACHE, *GCTX.bandwidth_throttle_))) {
LOG_WARN("failed to open log stream backup ctx", K(ret), K(backup_param));
} else if (OB_FAIL(prepare_backup_tablet_provider_(backup_param, backup_data_type_, ls_backup_ctx_,
OB_BACKUP_INDEX_CACHE, *param_.report_ctx_.sql_proxy_, provider_))) {
@ -1030,8 +1030,9 @@ int ObLSBackupComplementLogDagNet::start_running()
param_.retry_id_,
compl_start_scn_,
compl_end_scn_,
report_ctx_,
is_only_calc_stat_,
report_ctx_))) {
*GCTX.bandwidth_throttle_))) {
LOG_WARN("failed to init child dag", K(ret), K_(param), K_(compl_start_scn), K_(compl_end_scn));
} else if (OB_FAIL(complement_dag->create_first_task())) {
LOG_WARN("failed to create first task for child dag", K(ret), KPC(complement_dag));
@ -1843,7 +1844,8 @@ ObLSBackupComplementLogDag::ObLSBackupComplementLogDag()
compl_start_scn_(),
compl_end_scn_(),
is_only_calc_stat_(false),
report_ctx_()
report_ctx_(),
bandwidth_throttle_(NULL)
{}
ObLSBackupComplementLogDag::~ObLSBackupComplementLogDag()
@ -1852,7 +1854,7 @@ ObLSBackupComplementLogDag::~ObLSBackupComplementLogDag()
int ObLSBackupComplementLogDag::init(const ObBackupJobDesc &job_desc, const ObBackupDest &backup_dest,
const uint64_t tenant_id, const int64_t dest_id, const share::ObBackupSetDesc &backup_set_desc, const share::ObLSID &ls_id,
const int64_t turn_id, const int64_t retry_id, const SCN &start_scn, const SCN &end_scn,
const bool is_only_calc_stat, const ObBackupReportCtx &report_ctx)
const ObBackupReportCtx &report_ctx, const bool is_only_calc_stat, common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -1884,6 +1886,7 @@ int ObLSBackupComplementLogDag::init(const ObBackupJobDesc &job_desc, const ObBa
compl_end_scn_ = end_scn;
report_ctx_ = report_ctx;
is_only_calc_stat_ = is_only_calc_stat;
bandwidth_throttle_ = &bandwidth_throttle;
is_inited_ = true;
}
return ret;
@ -1896,7 +1899,7 @@ int ObLSBackupComplementLogDag::create_first_task()
if (OB_FAIL(alloc_task(task))) {
LOG_WARN("failed to alloc task", K(ret));
} else if (OB_FAIL(task->init(job_desc_, backup_dest_, tenant_id_, dest_id_, backup_set_desc_, ls_id_, compl_start_scn_,
compl_end_scn_, turn_id_, retry_id_, is_only_calc_stat_, report_ctx_))) {
compl_end_scn_, turn_id_, retry_id_, report_ctx_, is_only_calc_stat_, *bandwidth_throttle_))) {
LOG_WARN("failed to init task", K(ret), K_(tenant_id), K_(backup_set_desc), K_(ls_id), K_(compl_start_scn), K_(compl_end_scn));
} else if (OB_FAIL(add_task(*task))) {
LOG_WARN("failed to add task", K(ret));
@ -2576,8 +2579,8 @@ int ObLSBackupDataTask::init(const int64_t task_id, const share::ObBackupDataTyp
LOG_WARN("get invalid args", K(ret), K(task_id), K(param), K(report_ctx));
} else if (OB_FAIL(backup_items_.assign(backup_items))) {
LOG_WARN("failed to assign", K(ret));
} else if (OB_FAIL(backup_data_ctx_.open(param, backup_data_type, task_id))) {
LOG_WARN("failed to open backup data ctx", K(ret), K(param), K(backup_data_type));
} else if (OB_FAIL(backup_data_ctx_.open(param, backup_data_type, task_id, *ls_backup_ctx.bandwidth_throttle_))) {
LOG_WARN("failed to open backup data ctx", K(ret), K(param), K(backup_data_type), KP(ls_backup_ctx.bandwidth_throttle_));
} else if (OB_FAIL(disk_checker_.init(ls_backup_ctx.get_tablet_holder()))) {
LOG_WARN("failed to init disk checker", K(ret));
} else if (OB_FAIL(param_.assign(param))) {
@ -3851,7 +3854,7 @@ int ObLSBackupMetaTask::backup_ls_meta_and_tablet_metas_(const uint64_t tenant_i
LOG_WARN("log stream not exist", K(ret), K(ls_id));
} else if (OB_FAIL(share::ObBackupPathUtil::construct_backup_set_dest(param_.backup_dest_, param_.backup_set_desc_, backup_set_dest))) {
LOG_WARN("failed to construct backup set dest", K(ret), K(param_));
} else if (OB_FAIL(writer.init(backup_set_dest, param_.ls_id_, param_.turn_id_, param_.retry_id_))) {
} else if (OB_FAIL(writer.init(backup_set_dest, param_.ls_id_, param_.turn_id_, param_.retry_id_, *ls_backup_ctx_->bandwidth_throttle_))) {
LOG_WARN("failed to init tablet info writer", K(ret));
} else {
const int64_t WAIT_GC_LOCK_TIMEOUT = 30 * 60 * 1000 * 1000; // 30 min TODO(zeyong) optimization timeout later 4.3
@ -4712,7 +4715,7 @@ int ObBackupIndexRebuildTask::merge_macro_index_()
ObBackupMacroBlockIndexMerger macro_index_merger;
if (OB_FAIL(param_.convert_to(index_level_, merge_param))) {
LOG_WARN("failed to convert param", K(ret), K_(param), K_(index_level));
} else if (OB_FAIL(macro_index_merger.init(merge_param, *report_ctx_.sql_proxy_))) {
} else if (OB_FAIL(macro_index_merger.init(merge_param, *report_ctx_.sql_proxy_, *GCTX.bandwidth_throttle_))) {
LOG_WARN("failed to init index merger", K(ret), K(merge_param));
} else if (OB_FAIL(macro_index_merger.merge_index())) {
LOG_WARN("failed to merge macro index", K(ret), K_(param));
@ -4729,7 +4732,7 @@ int ObBackupIndexRebuildTask::merge_meta_index_(const bool is_sec_meta)
ObBackupMetaIndexMerger meta_index_merger;
if (OB_FAIL(param_.convert_to(index_level_, merge_param))) {
LOG_WARN("failed to convert param", K(ret), K_(param), K_(index_level));
} else if (OB_FAIL(meta_index_merger.init(merge_param, is_sec_meta, *report_ctx_.sql_proxy_))) {
} else if (OB_FAIL(meta_index_merger.init(merge_param, is_sec_meta, *report_ctx_.sql_proxy_, *GCTX.bandwidth_throttle_))) {
LOG_WARN("failed to init index merger", K(ret), K(merge_param));
} else if (OB_FAIL(meta_index_merger.merge_index())) {
LOG_WARN("failed to merge meta index", K(ret), K_(param));
@ -4950,7 +4953,9 @@ ObLSBackupComplementLogTask::ObLSBackupComplementLogTask()
turn_id_(0),
retry_id_(-1),
archive_dest_(),
report_ctx_()
report_ctx_(),
bandwidth_throttle_(NULL),
last_active_time_(0)
{}
ObLSBackupComplementLogTask::~ObLSBackupComplementLogTask()
@ -4959,7 +4964,7 @@ ObLSBackupComplementLogTask::~ObLSBackupComplementLogTask()
int ObLSBackupComplementLogTask::init(const ObBackupJobDesc &job_desc, const ObBackupDest &backup_dest,
const uint64_t tenant_id, const int64_t dest_id, const share::ObBackupSetDesc &backup_set_desc, const share::ObLSID &ls_id,
const SCN &start_scn, const SCN &end_scn, const int64_t turn_id, const int64_t retry_id,
const bool is_only_calc_stat, const ObBackupReportCtx &report_ctx)
const ObBackupReportCtx &report_ctx, const bool is_only_calc_stat, common::ObInOutBandwidthThrottle &bandwidth_throttle)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -4991,6 +4996,7 @@ int ObLSBackupComplementLogTask::init(const ObBackupJobDesc &job_desc, const ObB
retry_id_ = retry_id;
is_only_calc_stat_ = is_only_calc_stat;
report_ctx_ = report_ctx;
bandwidth_throttle_ = &bandwidth_throttle;
is_inited_ = true;
}
return ret;
@ -5814,6 +5820,10 @@ int ObLSBackupComplementLogTask::inner_transfer_clog_file_(const ObBackupPath &s
LOG_WARN("read len not expected", K(ret), K(read_len), K(transfer_len));
} else if (OB_FAIL(device_handle->pwrite(fd, dst_len, transfer_len, buf, write_size))) {
LOG_WARN("failed to write multipart upload file", K(ret));
} else if (OB_FAIL(bandwidth_throttle_->limit_out_and_sleep(write_size, last_active_time_, INT64_MAX))) {
LOG_WARN("failed to limit out and sleep", K(ret));
} else {
last_active_time_ = ObTimeUtility::current_time();
}
return ret;
}

View File

@ -203,6 +203,7 @@ private:
share::SCN compl_start_scn_;
share::SCN compl_end_scn_;
bool is_only_calc_stat_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(ObLSBackupComplementLogDagNet);
};
@ -374,8 +375,8 @@ public:
virtual ~ObLSBackupComplementLogDag();
int init(const ObBackupJobDesc &job_desc, const share::ObBackupDest &backup_dest, const uint64_t tenant_id,
const int64_t dest_id, const share::ObBackupSetDesc &backup_set_desc, const share::ObLSID &ls_id, const int64_t turn_id,
const int64_t retry_id, const share::SCN &start_scn, const share::SCN &end_scn,
const bool is_only_calc_stat, const ObBackupReportCtx &report_ctx);
const int64_t retry_id, const share::SCN &start_scn, const share::SCN &end_scn, const ObBackupReportCtx &report_ctx,
const bool is_only_calc_stat, common::ObInOutBandwidthThrottle &bandwidth_throttle);
virtual int create_first_task() override;
virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override;
virtual int fill_dag_key(char *buf, const int64_t buf_len) const override;
@ -399,6 +400,7 @@ private:
share::SCN compl_end_scn_;
bool is_only_calc_stat_;
ObBackupReportCtx report_ctx_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(ObLSBackupComplementLogDag);
};
@ -693,8 +695,8 @@ public:
virtual ~ObLSBackupComplementLogTask();
int init(const ObBackupJobDesc &job_desc, const share::ObBackupDest &backup_dest, const uint64_t tenant_id, const int64_t dest_id,
const share::ObBackupSetDesc &backup_set_desc, const share::ObLSID &ls_id, const share::SCN &start_scn,
const share::SCN &end_scn, const int64_t turn_id, const int64_t retry_id,
const bool is_only_calc_stat, const ObBackupReportCtx &report_ctx);
const share::SCN &end_scn, const int64_t turn_id, const int64_t retry_id, const ObBackupReportCtx &report_ctx,
const bool is_only_calc_stat, common::ObInOutBandwidthThrottle &bandwidth_throttle);
virtual int process() override;
private:
@ -767,6 +769,8 @@ private:
share::ObBackupDest archive_dest_;
bool is_only_calc_stat_;
ObBackupReportCtx report_ctx_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
int64_t last_active_time_;
DISALLOW_COPY_AND_ASSIGN(ObLSBackupComplementLogTask);
};

View File

@ -385,7 +385,7 @@ void TestBackupIndexMerger::fake_init_meta_index_merger_(ObFakeBackupMetaIndexMe
ObBackupIndexMergeParam merge_param;
build_backup_index_merge_param_(merge_param);
const bool is_sec_meta = false;
ret = merger.init(merge_param, is_sec_meta, sql_proxy);
ret = merger.init(merge_param, is_sec_meta, sql_proxy, throttle_);
EXPECT_EQ(OB_SUCCESS, ret);
}
@ -398,7 +398,7 @@ void TestBackupIndexMerger::fake_init_macro_index_merger_(ObFakeBackupMacroIndex
const int64_t file_count = 80;
const int64_t per_file_item_count = 1024;
merger.set_count(file_count, per_file_item_count);
ret = merger.init(merge_param, sql_proxy);
ret = merger.init(merge_param, sql_proxy, throttle_);
EXPECT_EQ(OB_SUCCESS, ret);
}

View File

@ -68,6 +68,7 @@ protected:
common::ObInOutBandwidthThrottle throttle_;
char test_dir_[OB_MAX_URI_LENGTH];
char test_dir_uri_[OB_MAX_URI_LENGTH];
ObInOutBandwidthThrottle bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(TestBackupIndexIterator);
};
@ -125,6 +126,7 @@ void TestBackupIndexIterator::SetUp()
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
inner_init_();
ASSERT_EQ(OB_SUCCESS, bandwidth_throttle_.init(1024 * 1024 * 60));
}
void TestBackupIndexIterator::TearDown()
@ -175,7 +177,7 @@ void TestBackupIndexIterator::fake_init_macro_index_merger_(const int64_t file_c
ObBackupIndexMergeParam merge_param;
build_backup_index_merge_param_(merge_param);
merger.set_count(file_count, per_file_item_count);
ret = merger.init(merge_param, sql_proxy);
ret = merger.init(merge_param, sql_proxy, bandwidth_throttle_);
ASSERT_EQ(OB_SUCCESS, ret);
}