fix migration copy ddl sstable issue
This commit is contained in:
committed by
wangzelin.wzl
parent
4132ca4695
commit
c8de32d7f2
@ -417,6 +417,9 @@ class ObString;
|
||||
ACT(BEFORE_BACKUP_TASK_FINISH,)\
|
||||
ACT(BEFORE_UPDATE_LS_META_TABLE,)\
|
||||
ACT(BEFORE_LOAD_ARCHIVE_ROUND,)\
|
||||
ACT(BEFORE_MIG_DDL_TABLE_MERGE_TASK,)\
|
||||
ACT(BEFORE_COPY_DDL_SSTABLE,)\
|
||||
ACT(BEFORE_DDL_WRITE_PREPARE_LOG,)\
|
||||
ACT(MAX_DEBUG_SYNC_POINT,)
|
||||
|
||||
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
||||
|
||||
@ -1291,6 +1291,21 @@ DEF_STR_LIST(sanity_whitelist, OB_CLUSTER_PARAMETER, "", "vip who wouldn't leadi
|
||||
DEF_TIME(_advance_checkpoint_timeout, OB_CLUSTER_PARAMETER, "30m", "[10s,180m]",
|
||||
"the timeout for backup/migrate advance checkpoint Range: [10s,180m]",
|
||||
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
ERRSIM_DEF_INT(errsim_migration_tablet_id, OB_CLUSTER_PARAMETER, "0", "[0,)",
|
||||
"the tablet id that migration want to insert error"
|
||||
"Range: [0,) in integer",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
ERRSIM_DEF_INT(errsim_max_ddl_sstable_count, OB_CLUSTER_PARAMETER, "0", "[0,)",
|
||||
"max ddl sstable count in errsim mode"
|
||||
"Range: [0,) in integer",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
ERRSIM_DEF_INT(errsim_max_ddl_block_count, OB_CLUSTER_PARAMETER, "0", "[0,)",
|
||||
"max ddl block count in errsim mode"
|
||||
"Range: [0,) in integer",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
ERRSIM_DEF_STR(errsim_migration_src_server_addr, OB_CLUSTER_PARAMETER, "",
|
||||
"the server dest ls choose as src when in errsim mode",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
|
||||
DEF_BOOL(enable_cgroup, OB_CLUSTER_PARAMETER, "True",
|
||||
"when set to false, cgroup will not init; when set to true but cgroup root dir is not ready, print ERROR",
|
||||
|
||||
@ -288,7 +288,13 @@ int ObDDLTableMergeTask::init(const ObDDLTableMergeDagParam &ddl_dag_param)
|
||||
int ObDDLTableMergeTask::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t MAX_DDL_SSTABLE = 128;
|
||||
int64_t MAX_DDL_SSTABLE = 128;
|
||||
#ifdef ERRSIM
|
||||
if (0 != GCONF.errsim_max_ddl_sstable_count) {
|
||||
MAX_DDL_SSTABLE = GCONF.errsim_max_ddl_sstable_count;
|
||||
LOG_INFO("set max ddl sstable in errsim mode", K(MAX_DDL_SSTABLE));
|
||||
}
|
||||
#endif
|
||||
LOG_INFO("ddl merge task start process", K(*this));
|
||||
ObTabletHandle tablet_handle;
|
||||
ObDDLKvMgrHandle ddl_kv_mgr_handle;
|
||||
@ -312,6 +318,14 @@ int ObDDLTableMergeTask::process()
|
||||
LOG_WARN("get ddl sstable handles failed", K(ret));
|
||||
} else if (ddl_sstable_handles.get_count() >= MAX_DDL_SSTABLE || merge_param_.is_commit_) {
|
||||
DEBUG_SYNC(BEFORE_DDL_TABLE_MERGE_TASK);
|
||||
#ifdef ERRSIM
|
||||
static int64_t counter = 0;
|
||||
counter++;
|
||||
if (counter >= 2) {
|
||||
DEBUG_SYNC(BEFORE_MIG_DDL_TABLE_MERGE_TASK);
|
||||
}
|
||||
#endif
|
||||
|
||||
ObTabletDDLParam ddl_param;
|
||||
ObTableHandleV2 table_handle;
|
||||
bool is_data_complete = false;
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
#include "storage/tx/ob_ts_mgr.h"
|
||||
#include "storage/ddl/ob_tablet_ddl_kv_mgr.h"
|
||||
#include "observer/ob_server_event_history_table_operator.h"
|
||||
|
||||
using namespace oceanbase::common;
|
||||
using namespace oceanbase::storage;
|
||||
@ -1018,6 +1019,14 @@ int ObDDLSSTableRedoWriter::write_prepare_log(const ObITable::TableKey &table_ke
|
||||
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
#ifdef ERRSIM
|
||||
SERVER_EVENT_SYNC_ADD("storage_ddl", "before_write_prepare_log",
|
||||
"table_key", table_key,
|
||||
"table_id", table_id,
|
||||
"execution_id", execution_id,
|
||||
"ddl_task_id", ddl_task_id);
|
||||
DEBUG_SYNC(BEFORE_DDL_WRITE_PREPARE_LOG);
|
||||
#endif
|
||||
prepare_log_ts = 0;
|
||||
ObLS *ls = nullptr;
|
||||
ObDDLPrepareLog log;
|
||||
|
||||
@ -210,6 +210,12 @@ int ObDDLKV::set_macro_block(const ObDDLMacroBlock ¯o_block)
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t MAX_DDL_BLOCK_COUNT = 10L * 1024L * 1024L * 1024L / OB_SERVER_BLOCK_MGR.get_macro_block_size();
|
||||
int64_t freeze_block_count = MAX_DDL_BLOCK_COUNT;
|
||||
#ifdef ERRSIM
|
||||
if (0 != GCONF.errsim_max_ddl_block_count) {
|
||||
freeze_block_count = GCONF.errsim_max_ddl_block_count;
|
||||
LOG_INFO("ddl set macro block count", K(freeze_block_count));
|
||||
}
|
||||
#endif
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ddl kv is not init", K(ret));
|
||||
|
||||
@ -1085,8 +1085,8 @@ int ObStartMigrationTask::choose_src_()
|
||||
int64_t local_clog_checkpoint_ts = 0;
|
||||
if (OB_FAIL(get_local_ls_checkpoint_ts_(local_clog_checkpoint_ts))) {
|
||||
LOG_WARN("failed to get local ls checkpoint ts", K(ret));
|
||||
} else if (OB_FAIL(src_provider.init(tenant_id, storage_rpc_))) {
|
||||
LOG_WARN("failed to init src provider", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(src_provider.init(tenant_id, ctx_->arg_.type_, storage_rpc_))) {
|
||||
LOG_WARN("failed to init src provider", K(ret), K(tenant_id), "type", ctx_->arg_.type_);
|
||||
} else if (OB_FAIL(src_provider.choose_ob_src(ls_id, local_clog_checkpoint_ts, src_info))) {
|
||||
LOG_WARN("failed to choose ob src", K(ret), K(tenant_id), K(ls_id), K(local_clog_checkpoint_ts));
|
||||
} else if (OB_FAIL(fetch_ls_info_(tenant_id, ls_id, src_info.src_addr_, ls_info))) {
|
||||
|
||||
@ -18,13 +18,18 @@
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
|
||||
ObStorageHASrcProvider::ObStorageHASrcProvider() : is_inited_(false), tenant_id_(OB_INVALID_ID), storage_rpc_(nullptr)
|
||||
ObStorageHASrcProvider::ObStorageHASrcProvider()
|
||||
: is_inited_(false),
|
||||
tenant_id_(OB_INVALID_ID),
|
||||
type_(ObMigrationOpType::MAX_LS_OP),
|
||||
storage_rpc_(nullptr)
|
||||
{}
|
||||
|
||||
ObStorageHASrcProvider::~ObStorageHASrcProvider()
|
||||
{}
|
||||
|
||||
int ObStorageHASrcProvider::init(const uint64_t tenant_id, storage::ObStorageRpc *storage_rpc)
|
||||
int ObStorageHASrcProvider::init(const uint64_t tenant_id, const ObMigrationOpType::TYPE &type,
|
||||
storage::ObStorageRpc *storage_rpc)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_inited_) {
|
||||
@ -35,6 +40,7 @@ int ObStorageHASrcProvider::init(const uint64_t tenant_id, storage::ObStorageRpc
|
||||
LOG_WARN("get invalid argument", K(ret), K(tenant_id), K(ls_id), KP(storage_rpc));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
type_ = type;
|
||||
storage_rpc_ = storage_rpc;
|
||||
is_inited_ = true;
|
||||
}
|
||||
@ -62,8 +68,27 @@ int ObStorageHASrcProvider::choose_ob_src(const share::ObLSID &ls_id, const int6
|
||||
} else {
|
||||
src_info.src_addr_ = chosen_src_addr;
|
||||
src_info.cluster_id_ = GCONF.cluster_id;
|
||||
SERVER_EVENT_ADD(
|
||||
"storage_ha", "choose_src", "tenant_id", tenant_id_, "ls_id", ls_id.id(), "src_addr", chosen_src_addr);
|
||||
#ifdef ERRSIM
|
||||
if (ObMigrationOpType::ADD_LS_OP == type_ || ObMigrationOpType::MIGRATE_LS_OP == type_) {
|
||||
const ObString &errsim_server = GCONF.errsim_migration_src_server_addr.str();
|
||||
if (!errsim_server.empty()) {
|
||||
common::ObAddr tmp_errsim_addr;
|
||||
if (OB_FAIL(tmp_errsim_addr.parse_from_string(errsim_server))) {
|
||||
LOG_WARN("failed to parse from string", K(ret), K(errsim_server));
|
||||
} else {
|
||||
src_info.src_addr_ = tmp_errsim_addr;
|
||||
src_info.cluster_id_ = GCONF.cluster_id;
|
||||
LOG_INFO("storage ha choose errsim src", K(tmp_errsim_addr));
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
SERVER_EVENT_ADD("storage_ha", "choose_src",
|
||||
"tenant_id", tenant_id_,
|
||||
"ls_id", ls_id.id(),
|
||||
"src_addr", src_info.src_addr_,
|
||||
"op_type", ObMigrationOpType::get_str(type_));
|
||||
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -24,7 +24,7 @@ class ObStorageHASrcProvider {
|
||||
public:
|
||||
ObStorageHASrcProvider();
|
||||
virtual ~ObStorageHASrcProvider();
|
||||
int init(const uint64_t tenant_id, storage::ObStorageRpc *storage_rpc);
|
||||
int init(const uint64_t tenant_id, const ObMigrationOpType::TYPE &type, storage::ObStorageRpc *storage_rpc);
|
||||
int choose_ob_src(const share::ObLSID &ls_id, const int64_t local_clog_checkpoint_ts,
|
||||
ObStorageHASrcInfo &src_info);
|
||||
|
||||
@ -41,6 +41,7 @@ private:
|
||||
private:
|
||||
bool is_inited_;
|
||||
uint64_t tenant_id_;
|
||||
ObMigrationOpType::TYPE type_;
|
||||
storage::ObStorageRpc *storage_rpc_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObStorageHASrcProvider);
|
||||
};
|
||||
|
||||
@ -533,6 +533,15 @@ int ObStorageHATabletsBuilder::build_copy_tablet_sstable_info_arg_(
|
||||
ObTablet *tablet = nullptr;
|
||||
arg.reset();
|
||||
|
||||
#ifdef ERRSIM
|
||||
const int64_t errsim_tablet_id = GCONF.errsim_migration_tablet_id;
|
||||
if (errsim_tablet_id == tablet_id.id()) {
|
||||
SERVER_EVENT_SYNC_ADD("storage_ha", "before_copy_ddl_sstable",
|
||||
"tablet_id", tablet_id);
|
||||
DEBUG_SYNC(BEFORE_COPY_DDL_SSTABLE);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("storage ha tablets builder do not init", K(ret));
|
||||
@ -703,6 +712,14 @@ int ObStorageHATabletsBuilder::get_need_copy_ddl_sstable_range_(
|
||||
need_copy_log_ts_range.start_log_ts_ = ddl_start_log_ts;
|
||||
need_copy_log_ts_range.end_log_ts_ = ddl_checkpoint_ts;
|
||||
}
|
||||
#ifdef ERRSIM
|
||||
LOG_INFO("ddl checkpoint pushed", K(ddl_checkpoint_pushed), K(ddl_sstable_array), K(ddl_start_log_ts), K(ddl_checkpoint_ts));
|
||||
SERVER_EVENT_SYNC_ADD("storage_ha", "get_need_copy_ddl_sstable_range",
|
||||
"tablet_id", tablet->get_tablet_meta().tablet_id_,
|
||||
"dest_ddl_checkpoint_pushed", ddl_checkpoint_pushed,
|
||||
"start_log_ts", need_copy_log_ts_range.start_log_ts_,
|
||||
"end_log_ts", need_copy_log_ts_range.end_log_ts_);
|
||||
#endif
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("checkpoint ts should be greater than start ts",
|
||||
@ -718,12 +735,15 @@ int ObStorageHATabletsBuilder::get_ddl_sstable_min_start_log_ts_(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArray<ObITable *> sstables;
|
||||
min_start_log_ts = 0;
|
||||
min_start_log_ts = INT64_MAX;
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("storage ha tables builder do not init", K(ret));
|
||||
} else if (ddl_sstable_array.count() > 0 && OB_FAIL(ddl_sstable_array.get_all_tables(sstables))) {
|
||||
} else if (ddl_sstable_array.empty()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ddl sstable should not be empty", K(ret));
|
||||
} else if (OB_FAIL(ddl_sstable_array.get_all_tables(sstables))) {
|
||||
LOG_WARN("failed to get all tables", K(ret), K(param_));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < sstables.count(); ++i) {
|
||||
|
||||
Reference in New Issue
Block a user