support concurrent plus archivelog

This commit is contained in:
oceanoverflow 2024-10-30 07:47:15 +00:00 committed by ob-robot
parent ff3b08f9c9
commit bc3b4f168a
11 changed files with 2645 additions and 1749 deletions

View File

@ -184,6 +184,7 @@ int ObBackupSetTaskMgr::process()
break;
}
case ObBackupStatus::Status::BACKUP_LOG: {
DEBUG_SYNC(BEFORE_BACKUP_COMPLEMENT_LOG);
if (OB_FAIL(backup_completing_log_())) {
LOG_WARN("[DATA_BACKUP]failed to backup completing log", K(ret), K(set_task_attr_));
}

View File

@ -416,12 +416,8 @@ int ObBackupDataBaseTask::set_optional_servers_(const ObIArray<common::ObAddr> &
uint64_t tenant_id = get_tenant_id();
share::ObLSTableOperator *lst_operator = GCTX.lst_operator_;
int64_t cluster_id = GCONF.cluster_id;
ObLSID server_ls_id = execute_on_sys_server_() ? ObLSID(ObLSID::SYS_LS_ID) : ls_id_;
if (nullptr == lst_operator) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("lst_operator ptr is null", K(ret));
} else if (OB_FAIL(lst_operator->get(cluster_id, tenant_id, server_ls_id, share::ObLSTable::DEFAULT_MODE, ls_info))) {
LOG_WARN("failed to get log stream info", K(ret), K(cluster_id), K(tenant_id), K(ls_id_));
if (OB_FAIL(get_ls_replica_array_(ls_info))) {
LOG_WARN("failed to get ls replica array", K(ret));
} else {
const ObLSInfo::ReplicaArray &replica_array = ls_info.get_replicas();
for (int i = 0; OB_SUCC(ret) && i < replica_array.count(); ++i) {
@ -464,6 +460,30 @@ int ObBackupDataBaseTask::set_optional_servers_(const ObIArray<common::ObAddr> &
return ret;
}
int ObBackupDataBaseTask::get_ls_replica_array_(ObLSInfo &ls_info)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = get_tenant_id();
const int64_t cluster_id = GCONF.cluster_id;
share::ObLSTableOperator *lst_operator = GCTX.lst_operator_;
ObLSID server_ls_id = execute_on_sys_server_() ? ObLSID(ObLSID::SYS_LS_ID) : ls_id_;
if (nullptr == lst_operator) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("lst_operator ptr is null", K(ret));
} else if (OB_FAIL(lst_operator->get(cluster_id, tenant_id, server_ls_id, share::ObLSTable::DEFAULT_MODE, ls_info))) {
LOG_WARN("failed to get log stream info", K(ret), K(cluster_id), K(tenant_id), K(ls_id_));
} else if (0 != ls_info.get_replicas().count()) {
// do nothing
} else if (!server_ls_id.is_sys_ls() && fallback_to_sys_server_when_needed_()) {
LOG_INFO("fall back to sys server", K_(ls_id), K(ls_info));
server_ls_id = ObLSID(ObLSID::SYS_LS_ID);
if (OB_FAIL(lst_operator->get(cluster_id, tenant_id, server_ls_id, share::ObLSTable::DEFAULT_MODE, ls_info))) {
LOG_WARN("failed to get log stream info", K(ret), K(cluster_id), K(tenant_id), K(ls_id_));
}
}
return ret;
}
bool ObBackupDataBaseTask::check_replica_in_black_server_(const ObLSReplica &replica, const ObIArray<common::ObAddr> &black_servers)
{
bool is_in_black_servers = false;

View File

@ -273,6 +273,8 @@ private:
virtual int do_update_dst_and_doing_status_(common::ObISQLClient &sql_proxy, common::ObAddr &dst,
share::ObTaskId &trace_id) final override;
virtual bool execute_on_sys_server_() const { return false; }
virtual bool fallback_to_sys_server_when_needed_() const { return false; }
int get_ls_replica_array_(ObLSInfo &ls_info);
bool check_replica_in_black_server_(const share::ObLSReplica &replica, const ObIArray<common::ObAddr> &black_servers);
public:
INHERIT_TO_STRING_KV("ObBackupScheduleTask", ObBackupScheduleTask, K_(incarnation_id), K_(backup_set_id),
@ -322,7 +324,8 @@ public:
private:
int calc_start_replay_scn_(const share::ObBackupJobAttr &job_attr, const share::ObBackupSetTaskAttr &set_task_attr,
const share::ObBackupLSTaskAttr &ls_attr, share::SCN &scn);
bool execute_on_sys_server_() const override { return true; }
bool execute_on_sys_server_() const override { return false; }
virtual bool fallback_to_sys_server_when_needed_() const { return true; }
private:
DISALLOW_COPY_AND_ASSIGN(ObBackupComplLogTask);
};

View File

@ -146,7 +146,9 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_PREFETCH_BACKUP_INFO, ObDagPrio::DAG_PRIO_HA
false, 7, {"tenant_id", "backup_set_id", "backup_data_type", "ls_id", "turn_id", "retry_id", "task_id"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BACKUP_INDEX_REBUILD, ObDagPrio::DAG_PRIO_HA_LOW, ObSysTaskType::BACKUP_TASK, "BACKUP_INDEX_REBUILD", "BACKUP",
false, 6, {"tenant_id", "backup_set_id", "backup_data_type", "ls_id", "turn_id", "retry_id"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BACKUP_COMPLEMENT_LOG, ObDagPrio::DAG_PRIO_HA_LOW, ObSysTaskType::BACKUP_TASK, "BACKUP_COMPLEMENT_LOG", "BACKUP",
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BACKUP_LS_LOG_GROUP, ObDagPrio::DAG_PRIO_HA_LOW, ObSysTaskType::BACKUP_TASK, "BACKUP_COMPLEMENT_LOG_LS_GROUP", "BACKUP",
false, 3, {"tenant_id", "backup_set_id", "ls_id"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BACKUP_LS_LOG, ObDagPrio::DAG_PRIO_HA_LOW, ObSysTaskType::BACKUP_TASK, "BACKUP_COMPLEMENT_LOG_LS", "BACKUP",
false, 3, {"tenant_id", "backup_set_id", "ls_id"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_INITIAL_BACKUP_FUSE, ObDagPrio::DAG_PRIO_HA_LOW, ObSysTaskType::BACKUP_TASK, "INITIAL_BACKUP_FUSE", "BACKUP",
false, 0, {})

View File

@ -235,6 +235,10 @@ public:
TASK_TYPE_VECTOR_INDEX_MEMDATA_SYNC = 80,
TASK_TYPE_DELETE_LOB_META_ROW = 81,
TASK_TYPE_TRANSFER_BUILD_TABLET_INFO = 82,
TASK_TYPE_BACKUP_LS_LOG_GROUP = 83,
TASK_TYPE_BACKUP_LS_LOG = 84,
TASK_TYPE_BACKUP_LS_LOG_FILE = 85,
TASK_TYPE_BACKUP_LS_LOG_FINISH = 86,
TASK_TYPE_MAX,
};

View File

@ -300,6 +300,7 @@ ob_set_subtarget(ob_storage backup
backup/ob_backup_operator.cpp
backup/ob_backup_extern_info_mgr.cpp
backup/ob_backup_restore_util.cpp
backup/ob_backup_complement_log.cpp
backup/ob_ls_backup_clean_mgr.cpp
backup/ob_backup_linked_item.cpp
backup/ob_backup_linked_block_writer.cpp

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,347 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef STORAGE_LOG_STREAM_BACKUP_COMPLEMENT_LOG_H_
#define STORAGE_LOG_STREAM_BACKUP_COMPLEMENT_LOG_H_
#include "lib/oblog/ob_log_module.h"
#include "lib/utility/ob_tracepoint.h"
#include "observer/ob_server_event_history_table_operator.h"
#include "share/backup/ob_archive_struct.h"
#include "share/ob_common_rpc_proxy.h"
#include "share/ob_rs_mgr.h"
#include "share/scheduler/ob_tenant_dag_scheduler.h"
#include "share/scheduler/ob_dag_scheduler_config.h"
#include "storage/backup/ob_backup_factory.h"
#include "storage/backup/ob_backup_iterator.h"
#include "storage/backup/ob_backup_operator.h"
#include "storage/backup/ob_backup_utils.h"
#include "storage/backup/ob_backup_restore_util.h"
#include "storage/blocksstable/ob_data_buffer.h"
#include "storage/blocksstable/ob_macro_block_checker.h"
#include "storage/ls/ob_ls.h"
#include "storage/tablet/ob_tablet_iterator.h"
#include "storage/tx/ob_ts_mgr.h"
#include "storage/tx_storage/ob_ls_map.h"
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/checkpoint/ob_checkpoint_executor.h"
#include "storage/meta_mem/ob_tablet_handle.h"
#include "logservice/archiveservice/ob_archive_file_utils.h"
#include "share/ls/ob_ls_table_operator.h"
#include "share/scheduler/ob_dag_warning_history_mgr.h"
#include "storage/ob_storage_rpc.h"
#include "storage/blocksstable/ob_logic_macro_id.h"
#include "share/backup/ob_backup_struct.h"
#include "share/backup/ob_archive_persist_helper.h"
#include "share/backup/ob_archive_path.h"
#include "share/backup/ob_archive_store.h"
#include "share/backup/ob_backup_data_table_operator.h"
#include "share/backup/ob_backup_connectivity.h"
#include "share/rc/ob_tenant_base.h"
#include "observer/omt/ob_tenant.h"
#include "storage/high_availability/ob_storage_ha_utils.h"
#include "storage/backup/ob_backup_task.h"
#include <algorithm>
namespace oceanbase {
namespace backup {
struct ObBackupPieceFile {
ObBackupPieceFile();
void reset();
int set(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const share::ObLSID &ls_id,
const int64_t file_id, const share::SCN &start_scn, const ObBackupPathString &path);
TO_STRING_KV(K_(dest_id), K_(round_id), K_(piece_id), K_(ls_id), K_(file_id), K_(path));
int64_t dest_id_;
int64_t round_id_;
int64_t piece_id_;
share::ObLSID ls_id_;
int64_t file_id_;
share::SCN start_scn_;
ObBackupPathString path_;
};
struct ObBackupComplementLogCtx final {
ObBackupJobDesc job_desc_;
share::ObBackupDest backup_dest_;
uint64_t tenant_id_;
int64_t dest_id_;
share::ObBackupSetDesc backup_set_desc_;
share::ObLSID ls_id_;
share::SCN compl_start_scn_;
share::SCN compl_end_scn_;
int64_t turn_id_;
int64_t retry_id_;
ObBackupReportCtx report_ctx_;
bool is_only_calc_stat_;
bool is_valid() const;
bool operator==(const ObBackupComplementLogCtx &other) const;
uint64_t calc_hash(uint64_t seed) const;
TO_STRING_KV(K_(backup_dest), K_(tenant_id), K_(dest_id), K_(backup_set_desc), K_(ls_id), K_(turn_id));
public:
int set_result(const int32_t result, const bool need_retry,
const enum share::ObDagType::ObDagTypeEnum type = ObDagType::DAG_TYPE_MAX);
bool is_failed() const;
int get_result(int32_t &result);
int check_allow_retry(bool &allow_retry);
private:
mutable lib::ObMutex mutex_;
ObStorageHAResultMgr result_mgr_;
};
class ObBackupPieceOp : public ObBaseDirEntryOperator {
public:
ObBackupPieceOp();
virtual int func(const dirent *entry) override;
int get_file_id_list(common::ObIArray<int64_t> &files) const;
TO_STRING_KV(K_(file_id_list));
private:
ObArray<int64_t> file_id_list_;
};
struct CompareArchivePiece {
bool operator()(const share::ObTenantArchivePieceAttr &lhs, const share::ObTenantArchivePieceAttr &rhs) const;
};
class ObBackupComplementLogDagNet : public ObBackupDagNet {
public:
ObBackupComplementLogDagNet();
virtual ~ObBackupComplementLogDagNet();
virtual int init_by_param(const share::ObIDagInitParam *param) override;
virtual int start_running() override;
virtual bool operator==(const share::ObIDagNet &other) const override;
virtual bool is_valid() const override;
virtual int64_t hash() const override;
virtual int fill_comment(char *buf, const int64_t buf_len) const override;
virtual int fill_dag_net_key(char *buf, const int64_t buf_len) const override;
INHERIT_TO_STRING_KV("ObIDagNet", share::ObIDagNet, K_(ctx));
private:
bool is_inited_;
ObBackupComplementLogCtx ctx_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(ObBackupComplementLogDagNet);
};
class ObBackupLSLogGroupDag : public share::ObIDag {
public:
ObBackupLSLogGroupDag();
virtual ~ObBackupLSLogGroupDag();
int init(const share::ObLSID &ls_id, ObBackupComplementLogCtx *ctx, common::ObInOutBandwidthThrottle *bandwidth_throttle);
virtual bool operator == (const share::ObIDag &other) const override;
virtual int64_t hash() const override;
virtual int fill_dag_key(char *buf, const int64_t buf_len) const override;
virtual int create_first_task() override;
virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override;
virtual lib::Worker::CompatMode get_compat_mode() const { return lib::Worker::CompatMode::MYSQL; }
virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; }
virtual bool is_ha_dag() const override { return true; }
protected:
bool is_inited_;
share::ObLSID ls_id_;
ObBackupComplementLogCtx *ctx_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(ObBackupLSLogGroupDag);
};
class ObBackupLSLogGroupTask : public share::ObITask {
public:
ObBackupLSLogGroupTask();
virtual ~ObBackupLSLogGroupTask();
int init(const share::ObLSID &ls_id, ObBackupComplementLogCtx *ctx, common::ObInOutBandwidthThrottle *bandwidth_throttle);
virtual int process() override;
private:
int get_active_round_dest_id_(const uint64_t tenant_id, int64_t &dest_id);
int get_newly_created_ls_in_piece_(const int64_t dest_id, const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn);
int inner_get_newly_created_ls_in_piece_(const int64_t dest_id, const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn, common::ObIArray<share::ObLSID> &ls_array);
int get_next_ls_id_(share::ObLSID &ls_id);
int generate_ls_dag_();
int record_server_event_();
private:
bool is_inited_;
share::ObLSID ls_id_;
ObBackupComplementLogCtx *ctx_;
int64_t current_idx_;
common::ObArray<share::ObLSID> ls_ids_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(ObBackupLSLogGroupTask);
};
class ObBackupLSLogDag : public share::ObIDag {
public:
ObBackupLSLogDag();
virtual ~ObBackupLSLogDag();
int init(const share::ObLSID &ls_id, ObBackupComplementLogCtx *ctx,
common::ObInOutBandwidthThrottle *bandwidth_throttle);
virtual int create_first_task() override;
virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override;
virtual int fill_dag_key(char *buf, const int64_t buf_len) const override;
virtual bool operator == (const share::ObIDag &other) const override;
virtual int64_t hash() const override;
virtual lib::Worker::CompatMode get_compat_mode() const { return lib::Worker::CompatMode::MYSQL; }
virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; }
virtual bool is_ha_dag() const override { return true; }
protected:
bool is_inited_;
share::ObLSID ls_id_;
ObBackupComplementLogCtx *ctx_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(ObBackupLSLogDag);
};
class ObBackupLSLogTask : public share::ObITask {
public:
ObBackupLSLogTask();
virtual ~ObBackupLSLogTask();
int init(const share::ObLSID &ls_id, ObBackupComplementLogCtx *ctx, common::ObInOutBandwidthThrottle *bandwidth_throttle);
virtual int process() override;
private:
int get_active_round_dest_id_(const uint64_t tenant_id, int64_t &dest_id);
int inner_process_(const int64_t archive_dest_id, const share::ObLSID &ls_id);
int deal_with_piece_meta_(const common::ObIArray<ObTenantArchivePieceAttr> &piece_list);
int inner_deal_with_piece_meta_(const ObTenantArchivePieceAttr &piece_attr);
int generate_ls_copy_task_(const bool is_only_calc_stat, const share::ObLSID &ls_id);
int get_complement_log_dir_path_(share::ObBackupPath &backup_path);
private:
int calc_backup_file_range_(const int64_t dest_id, const share::ObLSID &ls_id,
common::ObArray<ObTenantArchivePieceAttr> &piece_list, common::ObIArray<ObBackupPieceFile> &file_list);
int update_ls_task_stat_(const share::ObBackupStats &old_backup_stat,
const int64_t compl_log_file_count, share::ObBackupStats &new_backup_stat);
int report_complement_log_stat_(const common::ObIArray<ObBackupPieceFile> &file_list);
int get_piece_id_by_scn_(const uint64_t tenant_id, const int64_t dest_id, const share::SCN &scn, int64_t &piece_id);
int get_all_pieces_(const uint64_t tenant_id, const int64_t dest_id, const int64_t start_piece_id, const int64_t end_piece_id,
common::ObArray<share::ObTenantArchivePieceAttr> &piece_list);
int wait_pieces_frozen_(const common::ObArray<share::ObTenantArchivePieceAttr> &piece_list);
int wait_piece_frozen_(const share::ObTenantArchivePieceAttr &piece);
int check_piece_frozen_(const share::ObTenantArchivePieceAttr &piece, bool &is_frozen);
int get_all_piece_file_list_(const uint64_t tenant_id, const share::ObLSID &ls_id,
const common::ObIArray<share::ObTenantArchivePieceAttr> &piece_list, const share::SCN &start_scn, const share::SCN &end_scn,
common::ObIArray<ObBackupPieceFile> &piece_file_list);
int inner_get_piece_file_list_(const share::ObLSID &ls_id, const ObTenantArchivePieceAttr &piece_attr, common::ObIArray<ObBackupPieceFile> &piece_file_list);
int locate_archive_file_id_by_scn_(const ObTenantArchivePieceAttr &piece_attr, const share::ObLSID &ls_id, const SCN &scn, int64_t &file_id);
int get_file_in_between_(const int64_t start_file_id, const int64_t end_file_id, common::ObIArray<ObBackupPieceFile> &list);
int filter_file_id_smaller_than_(const int64_t file_id, common::ObIArray<ObBackupPieceFile> &list);
int filter_file_id_larger_than_(const int64_t file_id, common::ObIArray<ObBackupPieceFile> &list);
int get_src_backup_piece_dir_(const share::ObLSID &ls_id, const ObTenantArchivePieceAttr &piece_attr, share::ObBackupPath &backup_path);
private:
int write_format_file_();
int generate_format_desc_(share::ObBackupFormatDesc &format_desc);
int transform_and_copy_meta_file_(const ObTenantArchivePieceAttr &piece_attr);
// ls_file_info
int copy_ls_file_info_(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// piece_file_info
int copy_piece_file_info_(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// single_piece_info
int copy_single_piece_info_(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// tenant_archive_piece_infos
int copy_tenant_archive_piece_infos_(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// checkpoint_info
int copy_checkpoint_info_(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// round_start
int copy_round_start_file_(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// piece_start
int copy_piece_start_file_(const ObTenantArchivePieceAttr &piece_attr, const share::ObBackupDest &src, const share::ObBackupDest &dest);
int get_archive_backup_dest_(const ObBackupPathString &path, share::ObBackupDest &archive_dest);
int get_copy_src_and_dest_(const ObTenantArchivePieceAttr &piece_file, share::ObBackupDest &src, share::ObBackupDest &dest);
int record_server_event_();
private:
bool is_inited_;
share::ObLSID ls_id_;
ObBackupComplementLogCtx *ctx_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
common::ObArray<ObBackupPieceFile> file_list_;
share::ObBackupDest archive_dest_;
DISALLOW_COPY_AND_ASSIGN(ObBackupLSLogTask);
};
class ObBackupLSLogFinishTask;
class ObBackupLSLogFileTask : public share::ObITask
{
public:
ObBackupLSLogFileTask();
virtual ~ObBackupLSLogFileTask();
int init(const share::ObLSID &ls_id, common::ObInOutBandwidthThrottle *bandwidth_throttle,
ObBackupComplementLogCtx *ctx, ObBackupLSLogFinishTask *finish_task);
virtual int process() override;
virtual int generate_next_task(ObITask *&next_task) override;
private:
int build_backup_piece_file_info_(ObBackupLSLogFinishTask *finish_task);
int inner_process_(const ObBackupPieceFile &piece_file);
int inner_backup_complement_log_(const share::ObBackupPath &src_path, const share::ObBackupPath &dst_path);
int get_src_backup_file_path_(const ObBackupPieceFile &piece_file, share::ObBackupPath &backup_path);
int get_dst_backup_file_path_(const ObBackupPieceFile &piece_file, share::ObBackupPath &backup_path);
int transfer_clog_file_(const share::ObBackupPath &src_path, const share::ObBackupPath &dst_path);
int inner_transfer_clog_file_(const ObBackupPath &src_path, const ObBackupPath &dst_path,
ObIODevice *&device_handle, ObIOFd &fd, const int64_t dst_len, int64_t &transfer_len);
int get_transfer_length_(const int64_t delta_len, int64_t &transfer_len);
int get_file_length_(const common::ObString &path, const share::ObBackupStorageInfo *storage_info, int64_t &length);
int get_archive_backup_dest_(const ObBackupPathString &path, share::ObBackupDest &archive_dest);
int record_server_event_();
int report_progress_();
private:
bool is_inited_;
share::ObLSID ls_id_;
ObBackupComplementLogCtx *ctx_;
ObBackupLSLogFinishTask *finish_task_;
ObBackupPieceFile backup_piece_file_;
int64_t last_active_time_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
share::ObBackupDest archive_dest_;
DISALLOW_COPY_AND_ASSIGN(ObBackupLSLogFileTask);
};
class ObBackupLSLogFinishTask : public share::ObITask
{
public:
ObBackupLSLogFinishTask();
virtual ~ObBackupLSLogFinishTask();
int init(const share::ObLSID &ls_id, const common::ObIArray<ObBackupPieceFile> &file_list,
ObBackupComplementLogCtx *ctx);
virtual int process() override;
int check_is_iter_end(bool &is_iter_end);
int get_copy_file_info(ObBackupPieceFile &piece_file);
private:
int report_task_result_();
int record_server_event_();
private:
bool is_inited_;
ObMutex mutex_;
int64_t idx_;
share::ObLSID ls_id_;
ObBackupComplementLogCtx *ctx_;
common::ObArray<ObBackupPieceFile> file_list_;
DISALLOW_COPY_AND_ASSIGN(ObBackupLSLogFinishTask);
};
} // namespace backup
} // namespace oceanbase
#endif

View File

@ -18,6 +18,7 @@
#include "storage/backup/ob_backup_handler.h"
#include "share/backup/ob_backup_connectivity.h"
#include "storage/backup/ob_backup_fuse_tablet_dag.h"
#include "storage/backup/ob_backup_complement_log.h"
using namespace oceanbase::share;
using namespace oceanbase::storage;
@ -227,7 +228,7 @@ int ObBackupHandler::schedule_backup_complement_log_dag(const ObBackupJobDesc &j
LOG_WARN("failed to deep copy backup dest", K(ret), K(backup_dest));
} else if (OB_FAIL(ObBackupStorageInfoOperator::get_dest_id(*sql_proxy, tenant_id, backup_dest, param.dest_id_))) {
LOG_WARN("failed to get dest id", K(ret), K(backup_dest));
} else if (OB_FAIL(dag_scheduler->create_and_add_dag_net<ObLSBackupComplementLogDagNet>(&param))) {
} else if (OB_FAIL(dag_scheduler->create_and_add_dag_net<ObBackupComplementLogDagNet>(&param))) {
LOG_WARN("failed to create log stream backup dag net", K(ret), K(param));
} else {
FLOG_INFO("success to create log stream backup dag net", K(ret), K(param));

File diff suppressed because it is too large Load Diff

View File

@ -185,30 +185,6 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObBackupBuildTenantIndexDagNet);
};
class ObLSBackupComplementLogDagNet : public ObBackupDagNet {
public:
ObLSBackupComplementLogDagNet();
virtual ~ObLSBackupComplementLogDagNet();
virtual int init_by_param(const share::ObIDagInitParam *param) override;
virtual int start_running() override;
virtual bool operator==(const share::ObIDagNet &other) const override;
virtual bool is_valid() const override;
virtual int64_t hash() const override;
virtual int fill_comment(char *buf, const int64_t buf_len) const override;
virtual int fill_dag_net_key(char *buf, const int64_t buf_len) const override;
INHERIT_TO_STRING_KV("ObIDagNet", share::ObIDagNet, K_(param), K_(compl_start_scn), K_(compl_end_scn));
private:
bool is_inited_;
ObLSBackupDagInitParam param_;
ObBackupReportCtx report_ctx_;
share::SCN compl_start_scn_;
share::SCN compl_end_scn_;
bool is_only_calc_stat_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(ObLSBackupComplementLogDagNet);
};
class ObLSBackupMetaDag : public share::ObIDag {
public:
ObLSBackupMetaDag();
@ -379,42 +355,6 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObLSBackupIndexRebuildDag);
};
class ObLSBackupComplementLogDag : public share::ObIDag {
public:
ObLSBackupComplementLogDag();
virtual ~ObLSBackupComplementLogDag();
int init(const ObBackupJobDesc &job_desc, const share::ObBackupDest &backup_dest, const uint64_t tenant_id,
const int64_t dest_id, const share::ObBackupSetDesc &backup_set_desc, const share::ObLSID &ls_id, const int64_t turn_id,
const int64_t retry_id, const share::SCN &start_scn, const share::SCN &end_scn, const ObBackupReportCtx &report_ctx,
const bool is_only_calc_stat, common::ObInOutBandwidthThrottle &bandwidth_throttle);
virtual int create_first_task() override;
virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override;
virtual int fill_dag_key(char *buf, const int64_t buf_len) const override;
virtual bool operator==(const ObIDag &other) const override;
virtual int64_t hash() const override;
virtual lib::Worker::CompatMode get_compat_mode() const override { return compat_mode_; }
virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; }
virtual bool is_ha_dag() const override { return true; }
private:
bool is_inited_;
ObBackupJobDesc job_desc_;
share::ObBackupDest backup_dest_;
uint64_t tenant_id_;
int64_t dest_id_;
share::ObBackupSetDesc backup_set_desc_;
share::ObLSID ls_id_;
int64_t turn_id_;
int64_t retry_id_;
share::SCN compl_start_scn_;
share::SCN compl_end_scn_;
bool is_only_calc_stat_;
ObBackupReportCtx report_ctx_;
lib::Worker::CompatMode compat_mode_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
DISALLOW_COPY_AND_ASSIGN(ObLSBackupComplementLogDag);
};
class ObLSBackupMetaTask : public share::ObITask {
public:
ObLSBackupMetaTask();
@ -704,125 +644,6 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObLSBackupFinishTask);
};
class ObLSBackupComplementLogTask : public share::ObITask {
struct BackupPieceFile {
BackupPieceFile();
void reset();
int set(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const share::ObLSID &ls_id,
const int64_t file_id, const share::SCN &start_scn, const ObBackupPathString &path);
TO_STRING_KV(K_(dest_id), K_(round_id), K_(piece_id), K_(ls_id), K_(file_id), K_(path));
int64_t dest_id_;
int64_t round_id_;
int64_t piece_id_;
share::ObLSID ls_id_;
int64_t file_id_;
share::SCN start_scn_;
ObBackupPathString path_;
};
class BackupPieceOp : public ObBaseDirEntryOperator {
public:
BackupPieceOp();
virtual int func(const dirent *entry) override;
int get_file_id_list(common::ObIArray<int64_t> &files) const;
TO_STRING_KV(K_(file_id_list));
private:
ObArray<int64_t> file_id_list_;
};
struct CompareArchivePiece {
bool operator()(const share::ObTenantArchivePieceAttr &lhs, const share::ObTenantArchivePieceAttr &rhs) const;
};
public:
ObLSBackupComplementLogTask();
virtual ~ObLSBackupComplementLogTask();
int init(const ObBackupJobDesc &job_desc, const share::ObBackupDest &backup_dest, const uint64_t tenant_id, const int64_t dest_id,
const share::ObBackupSetDesc &backup_set_desc, const share::ObLSID &ls_id, const share::SCN &start_scn,
const share::SCN &end_scn, const int64_t turn_id, const int64_t retry_id, const ObBackupReportCtx &report_ctx,
const bool is_only_calc_stat, common::ObInOutBandwidthThrottle &bandwidth_throttle);
virtual int process() override;
private:
int get_newly_created_ls_in_piece_(const int64_t dest_id, const uint64_t tenant_id,
const share::SCN &start_scn, const share::SCN &end_scn, common::ObIArray<share::ObLSID> &ls_array);
int inner_process_(const int64_t archive_dest_id, const share::ObLSID &ls_id);
int get_complement_log_dir_path_(share::ObBackupPath &backup_path);
int write_format_file_();
int generate_format_desc_(share::ObBackupFormatDesc &format_desc);
int calc_backup_file_range_(const int64_t dest_id, const share::ObLSID &ls_id,
common::ObIArray<ObTenantArchivePieceAttr> &piece_list, common::ObIArray<BackupPieceFile> &file_list);
int get_active_round_dest_id_(const uint64_t tenant_id, int64_t &dest_id);
int get_piece_id_by_scn_(const uint64_t tenant_id, const int64_t dest_id, const share::SCN &scn, int64_t &piece_id);
int get_all_pieces_(const uint64_t tenant_id, const int64_t dest_id, const int64_t start_piece_id, const int64_t end_piece_id,
common::ObIArray<share::ObTenantArchivePieceAttr> &piece_list);
int wait_pieces_frozen_(const common::ObIArray<share::ObTenantArchivePieceAttr> &piece_list);
int wait_piece_frozen_(const share::ObTenantArchivePieceAttr &piece);
int check_piece_frozen_(const share::ObTenantArchivePieceAttr &piece, bool &is_frozen);
int get_all_piece_file_list_(const uint64_t tenant_id, const share::ObLSID &ls_id,
const common::ObIArray<share::ObTenantArchivePieceAttr> &piece_list, const share::SCN &start_scn, const share::SCN &end_scn,
common::ObIArray<BackupPieceFile> &piece_file_list);
int inner_get_piece_file_list_(const share::ObLSID &ls_id, const ObTenantArchivePieceAttr &piece_attr, common::ObIArray<BackupPieceFile> &piece_file_list);
int locate_archive_file_id_by_scn_(const ObTenantArchivePieceAttr &piece_attr, const share::ObLSID &ls_id, const SCN &scn, int64_t &file_id);
int get_file_in_between_(const int64_t start_file_id, const int64_t end_file_id, common::ObIArray<BackupPieceFile> &list);
int filter_file_id_smaller_than_(const int64_t file_id, common::ObIArray<BackupPieceFile> &list);
int filter_file_id_larger_than_(const int64_t file_id, common::ObIArray<BackupPieceFile> &list);
int get_src_backup_piece_dir_(const share::ObLSID &ls_id, const ObTenantArchivePieceAttr &piece_attr, share::ObBackupPath &backup_path);
int get_src_backup_file_path_(const BackupPieceFile &piece_file, share::ObBackupPath &backup_path);
int get_dst_backup_file_path_(const BackupPieceFile &piece_file, share::ObBackupPath &backup_path);
int update_ls_task_stat_(const share::ObBackupStats &old_backup_stat,
const int64_t compl_log_file_count, share::ObBackupStats &new_backup_stat);
int report_progress_();
int report_complement_log_stat_(const common::ObIArray<BackupPieceFile> &list);
int backup_complement_log_(const common::ObIArray<ObTenantArchivePieceAttr> &piece_list, const common::ObIArray<BackupPieceFile> &path_list);
int filter_file_for_piece_(const ObTenantArchivePieceAttr &piece, const common::ObIArray<BackupPieceFile> &path_list,
common::ObIArray<BackupPieceFile> &filter_file_list);
int backup_complement_log_for_piece_(const ObTenantArchivePieceAttr &piece, const common::ObIArray<BackupPieceFile> &path_list);
int inner_backup_complement_log_(const share::ObBackupPath &src_path, const share::ObBackupPath &dst_path);
int transfer_clog_file_(const share::ObBackupPath &src_path, const share::ObBackupPath &dst_path);
int inner_transfer_clog_file_(const ObBackupPath &src_path, const ObBackupPath &dst_path,
ObIODevice *&device_handle, ObIOFd &fd, const int64_t dst_len, int64_t &transfer_len);
int get_transfer_length_(const int64_t delta_len, int64_t &transfer_len);
int get_file_length_(const common::ObString &path, const share::ObBackupStorageInfo *storage_info, int64_t &length);
int get_copy_src_and_dest_(const ObTenantArchivePieceAttr &piece_attr, share::ObBackupDest &src, share::ObBackupDest &dest);
int transform_and_copy_meta_file_(const ObTenantArchivePieceAttr &piece_attr);
// ls_file_info
int copy_ls_file_info_(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// piece_file_info
int copy_piece_file_info_(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// single_piece_info
int copy_single_piece_info_(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// tenant_archive_piece_infos
int copy_tenant_archive_piece_infos(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// checkpoint_info
int copy_checkpoint_info(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// round_start
int copy_round_start_file(const ObTenantArchivePieceAttr &piece_attr, const share::ObArchiveStore &src_store, const share::ObArchiveStore &dest_store);
// piece_start
int copy_piece_start_file(const ObTenantArchivePieceAttr &piece_attr, const share::ObBackupDest &src, const share::ObBackupDest &dest);
int get_archive_backup_dest_(const ObBackupPathString &path, share::ObBackupDest &archive_dest);
private:
bool is_inited_;
ObBackupJobDesc job_desc_;
share::ObBackupDest backup_dest_;
uint64_t tenant_id_;
int64_t dest_id_;
share::ObBackupSetDesc backup_set_desc_;
share::ObLSID ls_id_;
share::SCN compl_start_scn_;
share::SCN compl_end_scn_;
int64_t turn_id_;
int64_t retry_id_;
share::ObBackupDest archive_dest_;
bool is_only_calc_stat_;
ObBackupReportCtx report_ctx_;
common::ObInOutBandwidthThrottle *bandwidth_throttle_;
int64_t last_active_time_;
DISALLOW_COPY_AND_ASSIGN(ObLSBackupComplementLogTask);
};
} // namespace backup
} // namespace oceanbase