From 4f8426eb0baafdc2a81ac2bc8336baccb383aabb Mon Sep 17 00:00:00 2001 From: oceanoverflow Date: Thu, 27 Jun 2024 05:02:28 +0000 Subject: [PATCH] Fix backup do not support bandwidth throttle --- .../test_ls_tablet_info_writer_and_reader.cpp | 4 ++- src/storage/backup/ob_backup_ctx.cpp | 15 ++++---- src/storage/backup/ob_backup_ctx.h | 12 ++++--- .../backup/ob_backup_extern_info_mgr.cpp | 7 ++-- .../backup/ob_backup_extern_info_mgr.h | 6 ++-- .../backup/ob_backup_file_writer_ctx.cpp | 35 +++++++++++++++---- .../backup/ob_backup_file_writer_ctx.h | 6 +++- src/storage/backup/ob_backup_index_merger.cpp | 25 +++++++------ src/storage/backup/ob_backup_index_merger.h | 14 +++++--- src/storage/backup/ob_backup_task.cpp | 34 +++++++++++------- src/storage/backup/ob_backup_task.h | 12 ++++--- .../backup/test_backup_index_merger.cpp | 4 +-- .../storage/backup/test_backup_iterator.cpp | 4 ++- 13 files changed, 121 insertions(+), 57 deletions(-) diff --git a/mittest/mtlenv/storage/test_ls_tablet_info_writer_and_reader.cpp b/mittest/mtlenv/storage/test_ls_tablet_info_writer_and_reader.cpp index ed54b7353..8ce7b4b33 100644 --- a/mittest/mtlenv/storage/test_ls_tablet_info_writer_and_reader.cpp +++ b/mittest/mtlenv/storage/test_ls_tablet_info_writer_and_reader.cpp @@ -202,10 +202,12 @@ void TestLSTabletInfoWR::fill_tablet_meta() TEST_F(TestLSTabletInfoWR, testTabletInfoWriterAndReader) { int ret = OB_SUCCESS; + ObInOutBandwidthThrottle bandwidth_throttle; + ASSERT_EQ(OB_SUCCESS, bandwidth_throttle.init(1024 * 1024 * 60)); LOG_INFO("test tablet info", K(tablet_metas.count()), K(backup_set_dest_)); backup::ObExternTabletMetaWriter writer; backup::ObExternTabletMetaReader reader; - ASSERT_EQ(OB_SUCCESS, writer.init(backup_set_dest_, ObLSID(TEST_LS_ID), 1, 0)); + ASSERT_EQ(OB_SUCCESS, writer.init(backup_set_dest_, ObLSID(TEST_LS_ID), 1, 0, bandwidth_throttle)); for (int i = 0; i < tablet_metas.count(); i++) { blocksstable::ObSelfBufferWriter buffer_writer("TestBuff"); blocksstable::ObBufferReader buffer_reader; diff --git a/src/storage/backup/ob_backup_ctx.cpp b/src/storage/backup/ob_backup_ctx.cpp index aaeb83235..e3702ef3f 100644 --- a/src/storage/backup/ob_backup_ctx.cpp +++ b/src/storage/backup/ob_backup_ctx.cpp @@ -291,7 +291,7 @@ ObBackupDataCtx::~ObBackupDataCtx() } int ObBackupDataCtx::open(const ObLSBackupDataParam ¶m, const share::ObBackupDataType &backup_data_type, - const int64_t file_id) + const int64_t file_id, common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; static const int64_t BUF_SIZE = 4 * 1024 * 1024; @@ -315,7 +315,7 @@ int ObBackupDataCtx::open(const ObLSBackupDataParam ¶m, const share::ObBacku file_id_ = file_id; file_offset_ = 0; backup_data_type_ = backup_data_type; - if (OB_FAIL(prepare_file_write_ctx_(param, backup_data_type, file_id))) { + if (OB_FAIL(prepare_file_write_ctx_(param, backup_data_type, file_id, bandwidth_throttle))) { LOG_WARN("failed to prepare file write ctx", K(ret), K(param), K(backup_data_type), K(file_id)); } else { is_inited_ = true; @@ -442,7 +442,8 @@ int ObBackupDataCtx::open_file_writer_(const share::ObBackupPath &backup_path) } int ObBackupDataCtx::prepare_file_write_ctx_( - const ObLSBackupDataParam ¶m, const share::ObBackupDataType &type, const int64_t file_id) + const ObLSBackupDataParam ¶m, const share::ObBackupDataType &type, const int64_t file_id, + common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; share::ObBackupPath backup_path; @@ -451,7 +452,7 @@ int ObBackupDataCtx::prepare_file_write_ctx_( LOG_WARN("failed to get macro block backup path", K(ret), K(file_id)); } else if (OB_FAIL(open_file_writer_(backup_path))) { LOG_WARN("failed to open file writer", K(ret), K(backup_path)); - } else if (OB_FAIL(file_write_ctx_.open(data_file_size, io_fd_, *dev_handle_))) { + } else if (OB_FAIL(file_write_ctx_.open(data_file_size, io_fd_, *dev_handle_, bandwidth_throttle))) { LOG_WARN("failed to open file write ctx", K(ret), K(param), K(type), K(backup_path), K(data_file_size), K(file_id)); } return ret; @@ -855,7 +856,8 @@ ObLSBackupCtx::ObLSBackupCtx() rebuild_seq_(), check_tablet_info_cost_time_(), backup_tx_table_filled_tx_scn_(share::SCN::min_scn()), - tablet_checker_() + tablet_checker_(), + bandwidth_throttle_(NULL) {} ObLSBackupCtx::~ObLSBackupCtx() @@ -865,7 +867,7 @@ ObLSBackupCtx::~ObLSBackupCtx() int ObLSBackupCtx::open( const ObLSBackupParam ¶m, const share::ObBackupDataType &backup_data_type, - common::ObMySQLProxy &sql_proxy, ObBackupIndexKVCache &index_kv_cache) + common::ObMySQLProxy &sql_proxy, ObBackupIndexKVCache &index_kv_cache, common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; ObArray tablet_list; @@ -898,6 +900,7 @@ int ObLSBackupCtx::open( sql_proxy_ = &sql_proxy; rebuild_seq_ = 0; check_tablet_info_cost_time_ = 0; + bandwidth_throttle_ = &bandwidth_throttle; is_inited_ = true; if (OB_FAIL(prepare_tablet_id_reader_(reader))) { LOG_WARN("failed to prepare tablet id reader", K(ret), K(param)); diff --git a/src/storage/backup/ob_backup_ctx.h b/src/storage/backup/ob_backup_ctx.h index 590c0432b..12681b787 100644 --- a/src/storage/backup/ob_backup_ctx.h +++ b/src/storage/backup/ob_backup_ctx.h @@ -133,7 +133,8 @@ struct ObBackupDataCtx { public: ObBackupDataCtx(); virtual ~ObBackupDataCtx(); - int open(const ObLSBackupDataParam ¶m, const share::ObBackupDataType &type, const int64_t file_id); + int open(const ObLSBackupDataParam ¶m, const share::ObBackupDataType &type, const int64_t file_id, + common::ObInOutBandwidthThrottle &bandwidth_throttle); int write_backup_file_header(const ObBackupFileHeader &file_header); int write_macro_block_data(const blocksstable::ObBufferReader ¯o_data, const blocksstable::ObLogicMacroBlockId &logic_id, ObBackupMacroBlockIndex ¯o_index); @@ -157,7 +158,8 @@ private: } int open_file_writer_(const share::ObBackupPath &backup_path); int prepare_file_write_ctx_( - const ObLSBackupDataParam ¶m, const share::ObBackupDataType &type, const int64_t file_id); + const ObLSBackupDataParam ¶m, const share::ObBackupDataType &type, const int64_t file_id, + common::ObInOutBandwidthThrottle &bandwidth_throttle); int get_macro_block_backup_path_(const int64_t file_id, share::ObBackupPath &backup_path); int write_macro_block_data_(const blocksstable::ObBufferReader &buffer, const blocksstable::ObLogicMacroBlockId &logic_id, ObBackupMacroBlockIndex ¯o_index); @@ -229,8 +231,9 @@ struct ObLSBackupCtx { public: ObLSBackupCtx(); virtual ~ObLSBackupCtx(); - int open(const ObLSBackupParam ¶m, const share::ObBackupDataType &backup_data_type, - common::ObMySQLProxy &sql_proxy, ObBackupIndexKVCache &index_kv_cache); + int open( + const ObLSBackupParam ¶m, const share::ObBackupDataType &backup_data_type, common::ObMySQLProxy &sql_proxy, + ObBackupIndexKVCache &index_kv_cache, common::ObInOutBandwidthThrottle &bandwidth_throttle); int next(common::ObTabletID &tablet_id); void set_backup_data_type(const share::ObBackupDataType &backup_data_type); int set_tablet(const common::ObTabletID &tablet_id, ObBackupTabletHandleRef *tablet_handle); @@ -316,6 +319,7 @@ public: int64_t check_tablet_info_cost_time_; share::SCN backup_tx_table_filled_tx_scn_; ObBackupTabletChecker tablet_checker_; + common::ObInOutBandwidthThrottle *bandwidth_throttle_; DISALLOW_COPY_AND_ASSIGN(ObLSBackupCtx); }; diff --git a/src/storage/backup/ob_backup_extern_info_mgr.cpp b/src/storage/backup/ob_backup_extern_info_mgr.cpp index 1c3fade7b..064f690fb 100644 --- a/src/storage/backup/ob_backup_extern_info_mgr.cpp +++ b/src/storage/backup/ob_backup_extern_info_mgr.cpp @@ -339,7 +339,7 @@ int64_t ObTabletInfoTrailer::get_serialize_size_() const int ObExternTabletMetaWriter::init( const share::ObBackupDest &backup_set_dest, const share::ObLSID &ls_id, - const int64_t turn_id, const int64_t retry_id) + const int64_t turn_id, const int64_t retry_id, common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; const int64_t start_file_id = 1; @@ -357,6 +357,7 @@ int ObExternTabletMetaWriter::init( ls_id_ = ls_id; turn_id_ = turn_id; retry_id_ = retry_id; + bandwidth_throttle_ = &bandwidth_throttle; if (OB_FAIL(prepare_backup_file_(start_file_id))) { LOG_WARN("failed to prepare backup file", K(ret), K(start_file_id)); } else { @@ -381,8 +382,8 @@ int ObExternTabletMetaWriter::prepare_backup_file_(const int64_t file_id) } else if (OB_FAIL(util.open_with_access_type( dev_handle_, io_fd_, backup_set_dest_.get_storage_info(), backup_path.get_obstr(), access_type))) { LOG_WARN("failed to open with access type", K(ret), K(backup_set_dest_), K(backup_path)); - } else if (OB_FAIL(file_write_ctx_.open(data_file_size, io_fd_, *dev_handle_))) { - LOG_WARN("failed to open file write ctx", K(ret), K(backup_path), K(data_file_size), K(file_id)); + } else if (OB_FAIL(file_write_ctx_.open(data_file_size, io_fd_, *dev_handle_, *bandwidth_throttle_))) { + LOG_WARN("failed to open file write ctx", K(ret), K(backup_path), K(data_file_size), K(file_id), KP_(bandwidth_throttle)); } else { file_trailer_.reset(); file_trailer_.file_id_ = file_id; diff --git a/src/storage/backup/ob_backup_extern_info_mgr.h b/src/storage/backup/ob_backup_extern_info_mgr.h index c6fe56384..714ab6546 100644 --- a/src/storage/backup/ob_backup_extern_info_mgr.h +++ b/src/storage/backup/ob_backup_extern_info_mgr.h @@ -91,10 +91,11 @@ public: static const int64_t BUF_SIZE = 4 * 1024 * 1024; static const int64_t TRAILER_BUF = 1024; ObExternTabletMetaWriter(): is_inited_(false), backup_set_dest_(), ls_id_(), turn_id_(0), retry_id_(0), - io_fd_(0), dev_handle_(nullptr), file_write_ctx_(), file_trailer_(), tmp_buffer_("BackupExtInfo") {}; + io_fd_(0), dev_handle_(nullptr), file_write_ctx_(), file_trailer_(), tmp_buffer_("BackupExtInfo"), + bandwidth_throttle_(NULL) {}; ~ObExternTabletMetaWriter() {}; int init(const share::ObBackupDest &backup_set_dest, const share::ObLSID &ls_id, - const int64_t turn_id, const int64_t retry_id); + const int64_t turn_id, const int64_t retry_id, common::ObInOutBandwidthThrottle &bandwidth_throttle); int write_meta_data(const blocksstable::ObBufferReader &meta_data, const common::ObTabletID &tablet_id); int close(); private: @@ -124,6 +125,7 @@ private: ObBackupFileWriteCtx file_write_ctx_; ObTabletInfoTrailer file_trailer_; blocksstable::ObSelfBufferWriter tmp_buffer_; + common::ObInOutBandwidthThrottle *bandwidth_throttle_; DISALLOW_COPY_AND_ASSIGN(ObExternTabletMetaWriter); }; diff --git a/src/storage/backup/ob_backup_file_writer_ctx.cpp b/src/storage/backup/ob_backup_file_writer_ctx.cpp index 8c2750702..745ef5c66 100644 --- a/src/storage/backup/ob_backup_file_writer_ctx.cpp +++ b/src/storage/backup/ob_backup_file_writer_ctx.cpp @@ -17,6 +17,8 @@ namespace oceanbase { namespace backup { +ERRSIM_POINT_DEF(EN_BACKUP_TRIGGER_BANDWIDTH_THROTTLE); + /* ObBackupFileWriteCtx */ ObBackupFileWriteCtx::ObBackupFileWriteCtx() @@ -25,13 +27,16 @@ ObBackupFileWriteCtx::ObBackupFileWriteCtx() max_file_size_(0), io_fd_(), dev_handle_(NULL), - data_buffer_("BackupFileWriteCtx") + data_buffer_("BackupFileWriteCtx"), + bandwidth_throttle_(NULL), + last_active_time_(0) {} ObBackupFileWriteCtx::~ObBackupFileWriteCtx() {} -int ObBackupFileWriteCtx::open(const int64_t max_file_size, const common::ObIOFd &io_fd, common::ObIODevice &dev_handle) +int ObBackupFileWriteCtx::open(const int64_t max_file_size, const common::ObIOFd &io_fd, common::ObIODevice &dev_handle, + common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -47,6 +52,7 @@ int ObBackupFileWriteCtx::open(const int64_t max_file_size, const common::ObIOFd max_file_size_ = max_file_size; io_fd_ = io_fd; dev_handle_ = &dev_handle; + bandwidth_throttle_ = &bandwidth_throttle; is_inited_ = true; } return ret; @@ -109,17 +115,34 @@ int ObBackupFileWriteCtx::flush_buffer_(const bool is_last_part) const int64_t offset = file_size_; if (!check_can_flush_(is_last_part)) { LOG_DEBUG("can not flush now", K(is_last_part), K(data_buffer_)); - } else if (OB_ISNULL(dev_handle_)) { + } else if (OB_ISNULL(dev_handle_) || OB_ISNULL(bandwidth_throttle_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("dev handle should not be null", K(ret)); + LOG_WARN("dev handle or bandwidth throttle should not be null", K(ret), KP_(dev_handle), KP_(bandwidth_throttle)); } else if (OB_FAIL(dev_handle_->pwrite(io_fd_, offset, data_buffer_.length(), data_buffer_.data(), write_size))) { LOG_WARN("failed to write data buffer", K(ret), K(data_buffer_)); } else if (data_buffer_.length() != write_size) { ret = OB_IO_ERROR; LOG_WARN("write length not equal buffer length", K(offset), K(data_buffer_.length()), K(write_size)); } else { - file_size_ += write_size; - data_buffer_.reuse(); + int64_t bytes = write_size; +#ifdef ERRSIM + if (OB_SUCC(ret)) { + ret = EN_BACKUP_TRIGGER_BANDWIDTH_THROTTLE ? : OB_SUCCESS; + if (OB_FAIL(ret)) { + STORAGE_LOG(ERROR, "fake EN_BACKUP_TRIGGER_BANDWIDTH_THROTTLE", K(ret)); + // in case of errsim, that the bytes be larger to trigger bandwidth throttle + bytes = write_size * 10; + ret = OB_SUCCESS; + } + } +#endif + if (OB_FAIL(bandwidth_throttle_->limit_out_and_sleep(bytes, last_active_time_, INT64_MAX))) { + LOG_WARN("failed to limit out and sleep", K(ret)); + } else { + file_size_ += write_size; + data_buffer_.reuse(); + last_active_time_ = ObTimeUtility::current_time(); + } } return ret; } diff --git a/src/storage/backup/ob_backup_file_writer_ctx.h b/src/storage/backup/ob_backup_file_writer_ctx.h index bf69cecf3..bd2f8aa93 100644 --- a/src/storage/backup/ob_backup_file_writer_ctx.h +++ b/src/storage/backup/ob_backup_file_writer_ctx.h @@ -14,6 +14,7 @@ #define STORAGE_LOG_STREAM_BACKUP_FILE_WRITER_CTX_H_ #include "common/storage/ob_io_device.h" #include "storage/blocksstable/ob_data_buffer.h" +#include "lib/utility/utility.h" namespace oceanbase { namespace backup { @@ -22,7 +23,8 @@ struct ObBackupFileWriteCtx { public: ObBackupFileWriteCtx(); virtual ~ObBackupFileWriteCtx(); - int open(const int64_t max_file_size, const common::ObIOFd &io_fd, common::ObIODevice &device_handle); + int open(const int64_t max_file_size, const common::ObIOFd &io_fd, common::ObIODevice &device_handle, + common::ObInOutBandwidthThrottle &bandwidth_throttle); bool is_opened() const { return is_inited_; @@ -47,6 +49,8 @@ private: common::ObIOFd io_fd_; common::ObIODevice *dev_handle_; blocksstable::ObSelfBufferWriter data_buffer_; + common::ObInOutBandwidthThrottle *bandwidth_throttle_; + int64_t last_active_time_; DISALLOW_COPY_AND_ASSIGN(ObBackupFileWriteCtx); }; diff --git a/src/storage/backup/ob_backup_index_merger.cpp b/src/storage/backup/ob_backup_index_merger.cpp index 125f3f3a6..9445658dd 100644 --- a/src/storage/backup/ob_backup_index_merger.cpp +++ b/src/storage/backup/ob_backup_index_merger.cpp @@ -600,10 +600,12 @@ int ObIBackupIndexMerger::open_file_writer_(const share::ObBackupPath &path, con return ret; } -int ObIBackupIndexMerger::prepare_file_write_ctx_(ObBackupFileWriteCtx &write_ctx) +int ObIBackupIndexMerger::prepare_file_write_ctx_( + common::ObInOutBandwidthThrottle &bandwidth_throttle, + ObBackupFileWriteCtx &write_ctx) { int ret = OB_SUCCESS; - if (OB_FAIL(write_ctx.open(OB_MAX_BACKUP_FILE_SIZE, io_fd_, *dev_handle_))) { + if (OB_FAIL(write_ctx.open(OB_MAX_BACKUP_FILE_SIZE, io_fd_, *dev_handle_, bandwidth_throttle))) { LOG_WARN("failed to open backup file write ctx", K(ret)); } return ret; @@ -727,7 +729,8 @@ ObBackupMacroBlockIndexMerger::~ObBackupMacroBlockIndexMerger() reset(); } -int ObBackupMacroBlockIndexMerger::init(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy) +int ObBackupMacroBlockIndexMerger::init(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy, + common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; const ObBackupBlockType block_type = BACKUP_BLOCK_MACRO_DATA; @@ -743,7 +746,7 @@ int ObBackupMacroBlockIndexMerger::init(const ObBackupIndexMergeParam &merge_par LOG_WARN("failed to ensure space", K(ret)); } else if (OB_FAIL(buffer_node_.init(merge_param.tenant_id_, block_type, node_level))) { LOG_WARN("failed to init buffer node", K(ret), K(merge_param), K(block_type), K(node_level)); - } else if (OB_FAIL(prepare_merge_ctx_(merge_param, sql_proxy))) { + } else if (OB_FAIL(prepare_merge_ctx_(merge_param, sql_proxy, bandwidth_throttle))) { LOG_WARN("failed to prepare merge ctx", K(ret), K(merge_param)); } else if (OB_FAIL(write_backup_file_header_(file_type))) { LOG_WARN("failed to write backup file header", K(ret)); @@ -848,7 +851,8 @@ int ObBackupMacroBlockIndexMerger::merge_index() } int ObBackupMacroBlockIndexMerger::prepare_merge_ctx_( - const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy) + const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy, + common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; ObArray retry_list; @@ -869,7 +873,7 @@ int ObBackupMacroBlockIndexMerger::prepare_merge_ctx_( LOG_WARN("failed to get output file path", K(ret), K(merge_param)); } else if (OB_FAIL(open_file_writer_(backup_path, merge_param.backup_dest_.get_storage_info()))) { LOG_WARN("failed to prepare file writer", K(ret), K(backup_path), K(merge_param)); - } else if (OB_FAIL(prepare_file_write_ctx_(write_ctx_))) { + } else if (OB_FAIL(prepare_file_write_ctx_(bandwidth_throttle, write_ctx_))) { LOG_WARN("failed to prepare file write ctx", K(ret)); } return ret; @@ -1257,7 +1261,7 @@ ObBackupMetaIndexMerger::~ObBackupMetaIndexMerger() } int ObBackupMetaIndexMerger::init(const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, - common::ObISQLClient &sql_proxy) + common::ObISQLClient &sql_proxy, common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; const ObBackupBlockType block_type = BACKUP_BLOCK_META_DATA; @@ -1273,7 +1277,7 @@ int ObBackupMetaIndexMerger::init(const ObBackupIndexMergeParam &merge_param, co LOG_WARN("failed to ensure space", K(ret)); } else if (OB_FAIL(buffer_node_.init(merge_param.tenant_id_, block_type, node_level))) { LOG_WARN("failed to init buffer node", K(ret), K(merge_param), K(block_type), K(node_level)); - } else if (OB_FAIL(prepare_merge_ctx_(merge_param, is_sec_meta, sql_proxy))) { + } else if (OB_FAIL(prepare_merge_ctx_(merge_param, is_sec_meta, sql_proxy, bandwidth_throttle))) { LOG_WARN("failed to prepare merge ctx", K(ret), K(merge_param), K(is_sec_meta)); } else if (OB_FAIL(write_backup_file_header_(file_type))) { LOG_WARN("failed to write backup file header", K(ret)); @@ -1369,7 +1373,8 @@ int ObBackupMetaIndexMerger::merge_index() } int ObBackupMetaIndexMerger::prepare_merge_ctx_( - const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, common::ObISQLClient &sql_proxy) + const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, + common::ObISQLClient &sql_proxy, common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; ObArray retry_list; @@ -1390,7 +1395,7 @@ int ObBackupMetaIndexMerger::prepare_merge_ctx_( LOG_WARN("failed to get output file path", K(ret), K(merge_param)); } else if (OB_FAIL(open_file_writer_(backup_path, merge_param.backup_dest_.get_storage_info()))) { LOG_WARN("failed to prepare file writer", K(ret), K(backup_path), K(merge_param)); - } else if (OB_FAIL(prepare_file_write_ctx_(write_ctx_))) { + } else if (OB_FAIL(prepare_file_write_ctx_(bandwidth_throttle, write_ctx_))) { LOG_WARN("failed to prepare file write ctx", K(ret)); } return ret; diff --git a/src/storage/backup/ob_backup_index_merger.h b/src/storage/backup/ob_backup_index_merger.h index 918b423d0..b5fc93ef7 100644 --- a/src/storage/backup/ob_backup_index_merger.h +++ b/src/storage/backup/ob_backup_index_merger.h @@ -165,7 +165,7 @@ protected: virtual int get_all_retries_(const int64_t task_id, const uint64_t tenant_id, const share::ObBackupDataType &backup_data_type, const share::ObLSID &ls_id, common::ObISQLClient &sql_proxy, common::ObIArray &retry_list); int open_file_writer_(const share::ObBackupPath &backup_path, const share::ObBackupStorageInfo *storage_info); - int prepare_file_write_ctx_(ObBackupFileWriteCtx &write_ctx); + int prepare_file_write_ctx_(common::ObInOutBandwidthThrottle &bandwidth_throttle, ObBackupFileWriteCtx &write_ctx); template int encode_index_to_buffer_(const common::ObIArray &index_list, blocksstable::ObBufferWriter &buffer_writer); template @@ -196,7 +196,8 @@ class ObBackupMacroBlockIndexMerger : public ObIBackupIndexMerger { public: ObBackupMacroBlockIndexMerger(); virtual ~ObBackupMacroBlockIndexMerger(); - int init(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy); + int init(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy, + common::ObInOutBandwidthThrottle &bandwidth_throttle); void reset(); virtual int merge_index() override; @@ -205,7 +206,8 @@ protected: const ObBackupRetryDesc &desc, ObIMacroBlockIndexIterator *&iter); private: - int prepare_merge_ctx_(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy); + int prepare_merge_ctx_(const ObBackupIndexMergeParam &merge_param, common::ObISQLClient &sql_proxy, + common::ObInOutBandwidthThrottle &bandwidth_throttle); int prepare_merge_iters_(const ObBackupIndexMergeParam &merge_param, const common::ObIArray &retry_list, common::ObISQLClient &sql_proxy, MERGE_ITER_ARRAY &merge_iters); @@ -243,7 +245,8 @@ class ObBackupMetaIndexMerger : public ObIBackupIndexMerger { public: ObBackupMetaIndexMerger(); virtual ~ObBackupMetaIndexMerger(); - int init(const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, common::ObISQLClient &sql_proxy); + int init(const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, common::ObISQLClient &sql_proxy, + common::ObInOutBandwidthThrottle &bandwidth_throttle); void reset(); virtual int merge_index() override; @@ -253,7 +256,8 @@ protected: private: int prepare_merge_ctx_( - const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, common::ObISQLClient &sql_proxy); + const ObBackupIndexMergeParam &merge_param, const bool is_sec_meta, common::ObISQLClient &sql_proxy, + common::ObInOutBandwidthThrottle &bandwidth_throttle); int prepare_merge_iters_(const ObBackupIndexMergeParam &merge_param, const common::ObIArray &retry_list, const bool is_sec_meta, MERGE_ITER_ARRAY &merge_iters); int get_unfinished_iters_(const MERGE_ITER_ARRAY &merge_iters, MERGE_ITER_ARRAY &unfinished_iters); diff --git a/src/storage/backup/ob_backup_task.cpp b/src/storage/backup/ob_backup_task.cpp index 4cbccfc9c..17c081cdf 100644 --- a/src/storage/backup/ob_backup_task.cpp +++ b/src/storage/backup/ob_backup_task.cpp @@ -569,7 +569,7 @@ int ObLSBackupDataDagNet::inner_init_before_run_() if (OB_FAIL(param_.convert_to(backup_param))) { LOG_WARN("failed to convert param", K(param_)); } else if (OB_FAIL(ls_backup_ctx_.open(backup_param, backup_data_type_, - *param_.report_ctx_.sql_proxy_, OB_BACKUP_INDEX_CACHE))) { + *param_.report_ctx_.sql_proxy_, OB_BACKUP_INDEX_CACHE, *GCTX.bandwidth_throttle_))) { LOG_WARN("failed to open log stream backup ctx", K(ret), K(backup_param)); } else if (OB_FAIL(prepare_backup_tablet_provider_(backup_param, backup_data_type_, ls_backup_ctx_, OB_BACKUP_INDEX_CACHE, *param_.report_ctx_.sql_proxy_, provider_))) { @@ -1030,8 +1030,9 @@ int ObLSBackupComplementLogDagNet::start_running() param_.retry_id_, compl_start_scn_, compl_end_scn_, + report_ctx_, is_only_calc_stat_, - report_ctx_))) { + *GCTX.bandwidth_throttle_))) { LOG_WARN("failed to init child dag", K(ret), K_(param), K_(compl_start_scn), K_(compl_end_scn)); } else if (OB_FAIL(complement_dag->create_first_task())) { LOG_WARN("failed to create first task for child dag", K(ret), KPC(complement_dag)); @@ -1843,7 +1844,8 @@ ObLSBackupComplementLogDag::ObLSBackupComplementLogDag() compl_start_scn_(), compl_end_scn_(), is_only_calc_stat_(false), - report_ctx_() + report_ctx_(), + bandwidth_throttle_(NULL) {} ObLSBackupComplementLogDag::~ObLSBackupComplementLogDag() @@ -1852,7 +1854,7 @@ ObLSBackupComplementLogDag::~ObLSBackupComplementLogDag() int ObLSBackupComplementLogDag::init(const ObBackupJobDesc &job_desc, const ObBackupDest &backup_dest, const uint64_t tenant_id, const int64_t dest_id, const share::ObBackupSetDesc &backup_set_desc, const share::ObLSID &ls_id, const int64_t turn_id, const int64_t retry_id, const SCN &start_scn, const SCN &end_scn, - const bool is_only_calc_stat, const ObBackupReportCtx &report_ctx) + const ObBackupReportCtx &report_ctx, const bool is_only_calc_stat, common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -1884,6 +1886,7 @@ int ObLSBackupComplementLogDag::init(const ObBackupJobDesc &job_desc, const ObBa compl_end_scn_ = end_scn; report_ctx_ = report_ctx; is_only_calc_stat_ = is_only_calc_stat; + bandwidth_throttle_ = &bandwidth_throttle; is_inited_ = true; } return ret; @@ -1896,7 +1899,7 @@ int ObLSBackupComplementLogDag::create_first_task() if (OB_FAIL(alloc_task(task))) { LOG_WARN("failed to alloc task", K(ret)); } else if (OB_FAIL(task->init(job_desc_, backup_dest_, tenant_id_, dest_id_, backup_set_desc_, ls_id_, compl_start_scn_, - compl_end_scn_, turn_id_, retry_id_, is_only_calc_stat_, report_ctx_))) { + compl_end_scn_, turn_id_, retry_id_, report_ctx_, is_only_calc_stat_, *bandwidth_throttle_))) { LOG_WARN("failed to init task", K(ret), K_(tenant_id), K_(backup_set_desc), K_(ls_id), K_(compl_start_scn), K_(compl_end_scn)); } else if (OB_FAIL(add_task(*task))) { LOG_WARN("failed to add task", K(ret)); @@ -2576,8 +2579,8 @@ int ObLSBackupDataTask::init(const int64_t task_id, const share::ObBackupDataTyp LOG_WARN("get invalid args", K(ret), K(task_id), K(param), K(report_ctx)); } else if (OB_FAIL(backup_items_.assign(backup_items))) { LOG_WARN("failed to assign", K(ret)); - } else if (OB_FAIL(backup_data_ctx_.open(param, backup_data_type, task_id))) { - LOG_WARN("failed to open backup data ctx", K(ret), K(param), K(backup_data_type)); + } else if (OB_FAIL(backup_data_ctx_.open(param, backup_data_type, task_id, *ls_backup_ctx.bandwidth_throttle_))) { + LOG_WARN("failed to open backup data ctx", K(ret), K(param), K(backup_data_type), KP(ls_backup_ctx.bandwidth_throttle_)); } else if (OB_FAIL(disk_checker_.init(ls_backup_ctx.get_tablet_holder()))) { LOG_WARN("failed to init disk checker", K(ret)); } else if (OB_FAIL(param_.assign(param))) { @@ -3851,7 +3854,7 @@ int ObLSBackupMetaTask::backup_ls_meta_and_tablet_metas_(const uint64_t tenant_i LOG_WARN("log stream not exist", K(ret), K(ls_id)); } else if (OB_FAIL(share::ObBackupPathUtil::construct_backup_set_dest(param_.backup_dest_, param_.backup_set_desc_, backup_set_dest))) { LOG_WARN("failed to construct backup set dest", K(ret), K(param_)); - } else if (OB_FAIL(writer.init(backup_set_dest, param_.ls_id_, param_.turn_id_, param_.retry_id_))) { + } else if (OB_FAIL(writer.init(backup_set_dest, param_.ls_id_, param_.turn_id_, param_.retry_id_, *ls_backup_ctx_->bandwidth_throttle_))) { LOG_WARN("failed to init tablet info writer", K(ret)); } else { const int64_t WAIT_GC_LOCK_TIMEOUT = 30 * 60 * 1000 * 1000; // 30 min TODO(zeyong) optimization timeout later 4.3 @@ -4712,7 +4715,7 @@ int ObBackupIndexRebuildTask::merge_macro_index_() ObBackupMacroBlockIndexMerger macro_index_merger; if (OB_FAIL(param_.convert_to(index_level_, merge_param))) { LOG_WARN("failed to convert param", K(ret), K_(param), K_(index_level)); - } else if (OB_FAIL(macro_index_merger.init(merge_param, *report_ctx_.sql_proxy_))) { + } else if (OB_FAIL(macro_index_merger.init(merge_param, *report_ctx_.sql_proxy_, *GCTX.bandwidth_throttle_))) { LOG_WARN("failed to init index merger", K(ret), K(merge_param)); } else if (OB_FAIL(macro_index_merger.merge_index())) { LOG_WARN("failed to merge macro index", K(ret), K_(param)); @@ -4729,7 +4732,7 @@ int ObBackupIndexRebuildTask::merge_meta_index_(const bool is_sec_meta) ObBackupMetaIndexMerger meta_index_merger; if (OB_FAIL(param_.convert_to(index_level_, merge_param))) { LOG_WARN("failed to convert param", K(ret), K_(param), K_(index_level)); - } else if (OB_FAIL(meta_index_merger.init(merge_param, is_sec_meta, *report_ctx_.sql_proxy_))) { + } else if (OB_FAIL(meta_index_merger.init(merge_param, is_sec_meta, *report_ctx_.sql_proxy_, *GCTX.bandwidth_throttle_))) { LOG_WARN("failed to init index merger", K(ret), K(merge_param)); } else if (OB_FAIL(meta_index_merger.merge_index())) { LOG_WARN("failed to merge meta index", K(ret), K_(param)); @@ -4950,7 +4953,9 @@ ObLSBackupComplementLogTask::ObLSBackupComplementLogTask() turn_id_(0), retry_id_(-1), archive_dest_(), - report_ctx_() + report_ctx_(), + bandwidth_throttle_(NULL), + last_active_time_(0) {} ObLSBackupComplementLogTask::~ObLSBackupComplementLogTask() @@ -4959,7 +4964,7 @@ ObLSBackupComplementLogTask::~ObLSBackupComplementLogTask() int ObLSBackupComplementLogTask::init(const ObBackupJobDesc &job_desc, const ObBackupDest &backup_dest, const uint64_t tenant_id, const int64_t dest_id, const share::ObBackupSetDesc &backup_set_desc, const share::ObLSID &ls_id, const SCN &start_scn, const SCN &end_scn, const int64_t turn_id, const int64_t retry_id, - const bool is_only_calc_stat, const ObBackupReportCtx &report_ctx) + const ObBackupReportCtx &report_ctx, const bool is_only_calc_stat, common::ObInOutBandwidthThrottle &bandwidth_throttle) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -4991,6 +4996,7 @@ int ObLSBackupComplementLogTask::init(const ObBackupJobDesc &job_desc, const ObB retry_id_ = retry_id; is_only_calc_stat_ = is_only_calc_stat; report_ctx_ = report_ctx; + bandwidth_throttle_ = &bandwidth_throttle; is_inited_ = true; } return ret; @@ -5814,6 +5820,10 @@ int ObLSBackupComplementLogTask::inner_transfer_clog_file_(const ObBackupPath &s LOG_WARN("read len not expected", K(ret), K(read_len), K(transfer_len)); } else if (OB_FAIL(device_handle->pwrite(fd, dst_len, transfer_len, buf, write_size))) { LOG_WARN("failed to write multipart upload file", K(ret)); + } else if (OB_FAIL(bandwidth_throttle_->limit_out_and_sleep(write_size, last_active_time_, INT64_MAX))) { + LOG_WARN("failed to limit out and sleep", K(ret)); + } else { + last_active_time_ = ObTimeUtility::current_time(); } return ret; } diff --git a/src/storage/backup/ob_backup_task.h b/src/storage/backup/ob_backup_task.h index c991f89ab..89694899c 100644 --- a/src/storage/backup/ob_backup_task.h +++ b/src/storage/backup/ob_backup_task.h @@ -203,6 +203,7 @@ private: share::SCN compl_start_scn_; share::SCN compl_end_scn_; bool is_only_calc_stat_; + common::ObInOutBandwidthThrottle *bandwidth_throttle_; DISALLOW_COPY_AND_ASSIGN(ObLSBackupComplementLogDagNet); }; @@ -374,8 +375,8 @@ public: virtual ~ObLSBackupComplementLogDag(); int init(const ObBackupJobDesc &job_desc, const share::ObBackupDest &backup_dest, const uint64_t tenant_id, const int64_t dest_id, const share::ObBackupSetDesc &backup_set_desc, const share::ObLSID &ls_id, const int64_t turn_id, - const int64_t retry_id, const share::SCN &start_scn, const share::SCN &end_scn, - const bool is_only_calc_stat, const ObBackupReportCtx &report_ctx); + const int64_t retry_id, const share::SCN &start_scn, const share::SCN &end_scn, const ObBackupReportCtx &report_ctx, + const bool is_only_calc_stat, common::ObInOutBandwidthThrottle &bandwidth_throttle); virtual int create_first_task() override; virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override; virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; @@ -399,6 +400,7 @@ private: share::SCN compl_end_scn_; bool is_only_calc_stat_; ObBackupReportCtx report_ctx_; + common::ObInOutBandwidthThrottle *bandwidth_throttle_; DISALLOW_COPY_AND_ASSIGN(ObLSBackupComplementLogDag); }; @@ -693,8 +695,8 @@ public: virtual ~ObLSBackupComplementLogTask(); int init(const ObBackupJobDesc &job_desc, const share::ObBackupDest &backup_dest, const uint64_t tenant_id, const int64_t dest_id, const share::ObBackupSetDesc &backup_set_desc, const share::ObLSID &ls_id, const share::SCN &start_scn, - const share::SCN &end_scn, const int64_t turn_id, const int64_t retry_id, - const bool is_only_calc_stat, const ObBackupReportCtx &report_ctx); + const share::SCN &end_scn, const int64_t turn_id, const int64_t retry_id, const ObBackupReportCtx &report_ctx, + const bool is_only_calc_stat, common::ObInOutBandwidthThrottle &bandwidth_throttle); virtual int process() override; private: @@ -767,6 +769,8 @@ private: share::ObBackupDest archive_dest_; bool is_only_calc_stat_; ObBackupReportCtx report_ctx_; + common::ObInOutBandwidthThrottle *bandwidth_throttle_; + int64_t last_active_time_; DISALLOW_COPY_AND_ASSIGN(ObLSBackupComplementLogTask); }; diff --git a/unittest/storage/backup/test_backup_index_merger.cpp b/unittest/storage/backup/test_backup_index_merger.cpp index b0b93a450..b744d8da5 100644 --- a/unittest/storage/backup/test_backup_index_merger.cpp +++ b/unittest/storage/backup/test_backup_index_merger.cpp @@ -385,7 +385,7 @@ void TestBackupIndexMerger::fake_init_meta_index_merger_(ObFakeBackupMetaIndexMe ObBackupIndexMergeParam merge_param; build_backup_index_merge_param_(merge_param); const bool is_sec_meta = false; - ret = merger.init(merge_param, is_sec_meta, sql_proxy); + ret = merger.init(merge_param, is_sec_meta, sql_proxy, throttle_); EXPECT_EQ(OB_SUCCESS, ret); } @@ -398,7 +398,7 @@ void TestBackupIndexMerger::fake_init_macro_index_merger_(ObFakeBackupMacroIndex const int64_t file_count = 80; const int64_t per_file_item_count = 1024; merger.set_count(file_count, per_file_item_count); - ret = merger.init(merge_param, sql_proxy); + ret = merger.init(merge_param, sql_proxy, throttle_); EXPECT_EQ(OB_SUCCESS, ret); } diff --git a/unittest/storage/backup/test_backup_iterator.cpp b/unittest/storage/backup/test_backup_iterator.cpp index b04089ae1..e75bac649 100644 --- a/unittest/storage/backup/test_backup_iterator.cpp +++ b/unittest/storage/backup/test_backup_iterator.cpp @@ -68,6 +68,7 @@ protected: common::ObInOutBandwidthThrottle throttle_; char test_dir_[OB_MAX_URI_LENGTH]; char test_dir_uri_[OB_MAX_URI_LENGTH]; + ObInOutBandwidthThrottle bandwidth_throttle_; DISALLOW_COPY_AND_ASSIGN(TestBackupIndexIterator); }; @@ -125,6 +126,7 @@ void TestBackupIndexIterator::SetUp() tenant_ctx.set(io_service); ObTenantEnv::set_tenant(&tenant_ctx); inner_init_(); + ASSERT_EQ(OB_SUCCESS, bandwidth_throttle_.init(1024 * 1024 * 60)); } void TestBackupIndexIterator::TearDown() @@ -175,7 +177,7 @@ void TestBackupIndexIterator::fake_init_macro_index_merger_(const int64_t file_c ObBackupIndexMergeParam merge_param; build_backup_index_merge_param_(merge_param); merger.set_count(file_count, per_file_item_count); - ret = merger.init(merge_param, sql_proxy); + ret = merger.init(merge_param, sql_proxy, bandwidth_throttle_); ASSERT_EQ(OB_SUCCESS, ret); }