4.0 add enable to read interface

This commit is contained in:
godyangfight 2022-11-10 08:05:59 +00:00 committed by wangzelin.wzl
parent c5c2d1cc61
commit d54e36b2f6
12 changed files with 443 additions and 52 deletions

View File

@ -84,13 +84,14 @@ void ObLSCompleteMigrationCtx::reuse()
ObLSCompleteMigrationParam::ObLSCompleteMigrationParam()
: arg_(),
task_id_(),
result_(OB_SUCCESS)
result_(OB_SUCCESS),
rebuild_seq_(0)
{
}
bool ObLSCompleteMigrationParam::is_valid() const
{
return arg_.is_valid() && !task_id_.is_invalid();
return arg_.is_valid() && !task_id_.is_invalid() && rebuild_seq_ >= 0;
}
void ObLSCompleteMigrationParam::reset()
@ -98,6 +99,7 @@ void ObLSCompleteMigrationParam::reset()
arg_.reset();
task_id_.reset();
result_ = OB_SUCCESS;
rebuild_seq_ = 0;
}
@ -129,6 +131,7 @@ int ObLSCompleteMigrationDagNet::init_by_param(const ObIDagInitParam *param)
ctx_.tenant_id_ = MTL_ID();
ctx_.arg_ = init_param->arg_;
ctx_.task_id_ = init_param->task_id_;
ctx_.rebuild_seq_ = init_param->rebuild_seq_;
if (OB_SUCCESS != init_param->result_) {
if (OB_FAIL(ctx_.set_result(init_param->result_, false /*allow_retry*/))) {
LOG_WARN("failed to set result", K(ret), KPC(init_param));
@ -335,7 +338,6 @@ int ObLSCompleteMigrationDagNet::update_migration_status_(ObLS *ls)
LOG_WARN("tenant dag scheduler has set stop, stop migration dag net", K(ret), K(ctx_));
break;
} else {
ObLSLockGuard lock_ls(ls, true/*rdlock*/);
if (OB_FAIL(ls->get_migration_status(current_migration_status))) {
LOG_WARN("failed to get migration status", K(ret), K(ctx_));
} else if (ctx_.is_failed()) {
@ -359,7 +361,7 @@ int ObLSCompleteMigrationDagNet::update_migration_status_(ObLS *ls)
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ls->set_migration_status_without_lock(new_migration_status))) {
} else if (OB_FAIL(ls->set_migration_status(new_migration_status, ctx_.rebuild_seq_))) {
LOG_WARN("failed to set migration status", K(ret), K(current_migration_status), K(new_migration_status), K(ctx_));
} else {
is_finish = true;
@ -1180,6 +1182,7 @@ int ObStartCompleteMigrationTask::update_ls_migration_status_hold_()
int ret = OB_SUCCESS;
ObLS *ls = nullptr;
const ObMigrationStatus hold_status = ObMigrationStatus::OB_MIGRATION_STATUS_HOLD;
int64_t rebuild_seq = 0;
if (!is_inited_) {
ret = OB_NOT_INIT;
@ -1189,7 +1192,7 @@ int ObStartCompleteMigrationTask::update_ls_migration_status_hold_()
} else if (OB_ISNULL(ls = ls_handle_.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to change member list", K(ret), KP(ls));
} else if (OB_FAIL(ls->set_migration_status(hold_status))) {
} else if (OB_FAIL(ls->set_migration_status(hold_status, rebuild_seq))) {
LOG_WARN("failed to set migration status", K(ret), KPC(ls));
} else {
#ifdef ERRSIM

View File

@ -41,6 +41,7 @@ public:
int64_t start_ts_;
int64_t finish_ts_;
int64_t rebuild_seq_;
INHERIT_TO_STRING_KV(
"ObIHADagNetCtx", ObIHADagNetCtx,
@ -48,7 +49,8 @@ public:
K_(arg),
K_(task_id),
K_(start_ts),
K_(finish_ts));
K_(finish_ts),
K_(rebuild_seq));
private:
DISALLOW_COPY_AND_ASSIGN(ObLSCompleteMigrationCtx);
};
@ -61,10 +63,11 @@ public:
virtual bool is_valid() const override;
void reset();
VIRTUAL_TO_STRING_KV(K_(arg), K_(task_id), K_(result));
VIRTUAL_TO_STRING_KV(K_(arg), K_(task_id), K_(result), K_(rebuild_seq));
ObMigrationOpArg arg_;
share::ObTaskId task_id_;
int32_t result_;
int64_t rebuild_seq_;
};
class ObLSCompleteMigrationDagNet: public share::ObIDagNet

View File

@ -404,7 +404,6 @@ int ObLSMigrationHandler::add_ls_migration_task(
ObLSMigrationTask task;
task.task_id_ = task_id;
task.arg_ = arg;
if (OB_FAIL(task_list_.push_back(task))) {
LOG_WARN("failed to push task into list", K(ret), K(task));
} else {
@ -878,6 +877,7 @@ int ObLSMigrationHandler::schedule_complete_ls_dag_net_(
ObLSCompleteMigrationParam param;
param.arg_ = task.arg_;
param.task_id_ = task.task_id_;
param.rebuild_seq_ = ls_->get_rebuild_seq();
if (OB_FAIL(get_result_(result))) {
LOG_WARN("failed to get result", K(ret), KPC(ls_), K(task));

View File

@ -932,8 +932,7 @@ int ObLS::finish_slog_replay()
LOG_WARN("failed to trans fail status", K(ret), K(current_migration_status),
K(new_migration_status));
} else if (can_update_ls_meta(ls_meta_.ls_create_status_) &&
OB_FAIL(set_migration_status_without_lock(new_migration_status,
false /* no need write slog */))) {
OB_FAIL(ls_meta_.set_migration_status(new_migration_status, false /*no need write slog*/))) {
LOG_WARN("failed to set migration status", K(ret), K(new_migration_status));
} else if (is_need_gc()) {
LOG_INFO("this ls should be gc later", KPC(this));
@ -1368,5 +1367,110 @@ int ObLS::diagnose(DiagnoseInfo &info) const
STORAGE_LOG(INFO, "diagnose finish", K(ret), K(info), K(ls_id));
return ret;
}
int ObLS::set_migration_status(
const ObMigrationStatus &migration_status,
const int64_t rebuild_seq,
const bool write_slog)
{
int ret = OB_SUCCESS;
share::ObLSRestoreStatus restore_status;
int64_t read_lock = LSLOCKLS;
int64_t write_lock = LSLOCKLOGMETA;
ObLSLockGuard lock_myself(lock_, read_lock, write_lock);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ls is not inited", K(ret), K(ls_meta_));
} else if (!ObMigrationStatusHelper::is_valid(migration_status)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("set migration status get invalid argument", K(ret), K(migration_status));
} else if (OB_UNLIKELY(is_stopped_)) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_meta));
} else if (!can_update_ls_meta(ls_meta_.ls_create_status_)) {
ret = OB_STATE_NOT_MATCH;
STORAGE_LOG(WARN, "state not match, cannot update ls meta", K(ret), K(ls_meta_));
} else if (ls_meta_.get_rebuild_seq() != rebuild_seq) {
ret = OB_STATE_NOT_MATCH;
STORAGE_LOG(WARN, "rebuild seq not match, cannot update migration status", K(ret),
K(ls_meta_), K(rebuild_seq));
} else if (OB_FAIL(ls_meta_.get_restore_status(restore_status))) {
LOG_WARN("failed to get restore status", K(ret), K(ls_meta_));
} else if (OB_FAIL(ls_meta_.set_migration_status(migration_status, write_slog))) {
LOG_WARN("failed to set migration status", K(ret), K(migration_status));
} else if (ObMigrationStatus::OB_MIGRATION_STATUS_NONE == migration_status
&& restore_status.is_restore_none()) {
ls_tablet_svr_.enable_to_read();
} else {
ls_tablet_svr_.disable_to_read();
}
return ret;
}
int ObLS::set_restore_status(
const share::ObLSRestoreStatus &restore_status,
const int64_t rebuild_seq)
{
int ret = OB_SUCCESS;
ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
int64_t read_lock = LSLOCKLS;
int64_t write_lock = LSLOCKLOGMETA;
ObLSLockGuard lock_myself(lock_, read_lock, write_lock);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ls is not inited", K(ret), K(ls_meta_));
} else if (!restore_status.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("set restore status get invalid argument", K(ret), K(restore_status));
} else if (OB_UNLIKELY(is_stopped_)) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_meta));
} else if (!can_update_ls_meta(ls_meta_.ls_create_status_)) {
ret = OB_STATE_NOT_MATCH;
STORAGE_LOG(WARN, "state not match, cannot update ls meta", K(ret), K(ls_meta_));
} else if (ls_meta_.get_rebuild_seq() != rebuild_seq) {
ret = OB_STATE_NOT_MATCH;
STORAGE_LOG(WARN, "rebuild seq not match, cannot update restore status", K(ret),
K(ls_meta_), K(rebuild_seq));
} else if (OB_FAIL(ls_meta_.get_migration_status(migration_status))) {
LOG_WARN("failed to get migration status", K(ret), K(ls_meta_));
} else if (OB_FAIL(ls_meta_.set_restore_status(restore_status))) {
LOG_WARN("failed to set restore status", K(ret), K(restore_status));
} else if (ObMigrationStatus::OB_MIGRATION_STATUS_NONE == migration_status
&& restore_status.is_restore_none()) {
ls_tablet_svr_.enable_to_read();
} else {
ls_tablet_svr_.disable_to_read();
}
return ret;
}
int ObLS::set_ls_rebuild()
{
int ret = OB_SUCCESS;
int64_t read_lock = LSLOCKLS;
int64_t write_lock = LSLOCKLOGMETA;
ObLSLockGuard lock_myself(lock_, read_lock, write_lock);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ls is not inited", K(ret), K(ls_meta_));
} else if (OB_UNLIKELY(is_stopped_)) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_meta));
} else if (!can_update_ls_meta(ls_meta_.ls_create_status_)) {
ret = OB_STATE_NOT_MATCH;
STORAGE_LOG(WARN, "state not match, cannot update ls meta", K(ret), K(ls_meta_));
} else if (OB_FAIL(ls_meta_.set_ls_rebuild())) {
LOG_WARN("failed to set ls rebuild", K(ret), K(ls_meta_));
} else {
ls_tablet_svr_.disable_to_read();
}
return ret;
}
}
}

View File

@ -320,8 +320,7 @@ public:
int update_id_meta_without_writing_slog(const int64_t service_type,
const int64_t limited_id,
const int64_t latest_log_ts);
// int set_ls_rebuild();
UPDATE_LSMETA_WITH_LOCK(ls_meta_, set_ls_rebuild);
int set_ls_rebuild();
// protect in ls lock
// int set_gc_state(const logservice::LSGCState &gc_state);
UPDATE_LSMETA_WITH_LOCK(ls_meta_, set_gc_state);
@ -345,24 +344,18 @@ public:
UPDATE_LSMETA_WITHOUT_LOCK(ls_meta_, update_ls_meta);
CONST_DELEGATE_WITH_RET(ls_meta_, get_rebuild_seq, int64_t);
CONST_DELEGATE_WITH_RET(ls_meta_, get_tablet_change_checkpoint_ts, int64_t);
// set restore status
// @param [in] restore status.
// int set_restore_status(const share::ObLSRestoreStatus &status);
UPDATE_LSMETA_WITH_LOCK(ls_meta_, set_restore_status);
// int set_restore_status_without_lock(const share::ObLSRestoreStatus &status);
UPDATE_LSMETA_WITHOUT_LOCK(ls_meta_, set_restore_status);
int set_restore_status(
const share::ObLSRestoreStatus &restore_status,
const int64_t rebuild_seq);
// get restore status
// @param [out] restore status.
// int get_restore_status(share::ObLSRestoreStatus &status);
DELEGATE_WITH_RET(ls_meta_, get_restore_status, int);
// set migration status
// @param [in] migration status.
// int set_migration_status_without_lock(const ObMigrationStatus &migration_status,
// const bool write_slog = true);
UPDATE_LSMETA_WITHOUT_LOCK(ls_meta_, set_migration_status);
// int set_migration_status(const ObMigrationStatus &migration_status,
// const bool write_slog = true);
UPDATE_LSMETA_WITH_LOCK(ls_meta_, set_migration_status);
int set_migration_status(
const ObMigrationStatus &migration_status,
const int64_t rebuild_seq,
const bool write_slog = true);
// get migration status
// @param [out] migration status.
// int get_migration_status(ObMigrationstatus &status);
@ -394,6 +387,7 @@ public:
// @param [out] meta_package
// @param [out] tablet_ids
int get_ls_meta_package_and_tablet_ids(ObLSMetaPackage &meta_package, common::ObIArray<common::ObTabletID> &tablet_ids);
DELEGATE_WITH_RET(ls_meta_, get_migration_and_restore_status, int);
// ObLSTabletService interface:
// create tablets in a ls
@ -441,7 +435,8 @@ public:
DELEGATE_WITH_RET(ls_tablet_svr_, update_tablet_ha_data_status, int);
DELEGATE_WITH_RET(ls_tablet_svr_, update_tablet_restore_status, int);
DELEGATE_WITH_RET(ls_tablet_svr_, create_or_update_migration_tablet, int);
DELEGATE_WITH_RET(ls_tablet_svr_, enable_to_read, void);
DELEGATE_WITH_RET(ls_tablet_svr_, disable_to_read, void);
// ObLockTable interface:
// check whether the lock op is conflict with exist lock.

View File

@ -201,20 +201,6 @@ int64_t ObLSMeta::get_rebuild_seq() const
return rebuild_seq_;
}
int ObLSMeta::inc_rebuild_seq()
{
ObSpinLockTimeGuard guard(lock_);
ObLSMeta tmp(*this);
++tmp.rebuild_seq_;
int ret = OB_SUCCESS;
if (OB_FAIL(write_slog_(tmp))) {
LOG_WARN("rebuild_lsn write slog failed", K(ret));
} else {
++rebuild_seq_;
}
return ret;
}
int ObLSMeta::set_migration_status(const ObMigrationStatus &migration_status,
const bool write_slog)
{
@ -628,6 +614,25 @@ int ObLSMeta::update_id_meta(const int64_t service_type,
return ret;
}
int ObLSMeta::get_migration_and_restore_status(
ObMigrationStatus &migration_status,
share::ObLSRestoreStatus &ls_restore_status)
{
int ret = OB_SUCCESS;
migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
ls_restore_status = ObLSRestoreStatus::LS_RESTORE_STATUS_MAX;
ObSpinLockTimeGuard guard(lock_);
if (!is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls meta is not valid, cannot get", K(ret), K(*this));
} else {
migration_status = migration_status_;
ls_restore_status = restore_status_;
}
return ret;
}
ObLSMeta::ObSpinLockTimeGuard::ObSpinLockTimeGuard(common::ObSpinLock &lock,
const int64_t warn_threshold)
: time_guard_("ls_meta", warn_threshold),

View File

@ -49,7 +49,6 @@ public:
void reset();
bool is_valid() const;
int64_t get_rebuild_seq() const;
int inc_rebuild_seq();
int set_migration_status(const ObMigrationStatus &migration_status,
const bool write_slog = true);
int get_migration_status (ObMigrationStatus &migration_status) const;
@ -82,6 +81,10 @@ public:
int build_saved_info();
int set_saved_info(const ObLSSavedInfo &saved_info);
int clear_saved_info();
int get_migration_and_restore_status(
ObMigrationStatus &migration_status,
share::ObLSRestoreStatus &ls_restore_status);
int init(
const uint64_t tenant_id,
const share::ObLSID &ls_id,

View File

@ -80,6 +80,7 @@ ObLSTabletService::ObLSTabletService()
tablet_id_set_(),
bucket_lock_(),
rs_reporter_(nullptr),
allow_to_read_mgr_(),
is_inited_(false)
{
}
@ -104,6 +105,8 @@ int ObLSTabletService::init(
} else if (OB_FAIL(bucket_lock_.init(ObTabletCommon::BUCKET_LOCK_BUCKET_CNT,
ObLatchIds::TABLET_BUCKET_LOCK))) {
LOG_WARN("failed to init bucket lock", K(ret));
} else if (OB_FAIL(set_allow_to_read_(ls))) {
LOG_WARN("failed to set allow to read", K(ret));
} else {
ls_ = ls;
rs_reporter_ = rs_reporter;
@ -712,10 +715,15 @@ int ObLSTabletService::table_scan(ObTableScanIterator &iter, ObTableScanParam &p
{
int ret = OB_SUCCESS;
ObTabletHandle data_tablet;
AllowToReadMgr::AllowToReadInfo read_info;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret), K_(is_inited));
} else if (FALSE_IT(allow_to_read_mgr_.load_allow_to_read_info(read_info))) {
} else if (!read_info.allow_to_read()) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
} else if (OB_FAIL(prepare_scan_table_param(param, *(MTL(ObTenantSchemaService*)->get_schema_service())))) {
LOG_WARN("failed to prepare scan table param", K(ret), K(param));
} else if (OB_FAIL(get_tablet_with_timeout(param.tablet_id_, data_tablet, param.timeout_))) {
@ -725,6 +733,15 @@ int ObLSTabletService::table_scan(ObTableScanIterator &iter, ObTableScanParam &p
} else {
result = &iter;
}
if (OB_SUCC(ret)) {
bool is_same = false;
allow_to_read_mgr_.check_read_info_same(read_info, is_same);
if (!is_same) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
}
}
return ret;
}
@ -732,6 +749,7 @@ int ObLSTabletService::table_rescan(ObTableScanParam &param, ObNewRowIterator *&
{
int ret = OB_SUCCESS;
ObTabletHandle data_tablet;
AllowToReadMgr::AllowToReadInfo read_info;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
@ -739,6 +757,10 @@ int ObLSTabletService::table_rescan(ObTableScanParam &param, ObNewRowIterator *&
} else if (OB_ISNULL(result)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else if (FALSE_IT(allow_to_read_mgr_.load_allow_to_read_info(read_info))) {
} else if (!read_info.allow_to_read()) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
} else if (OB_FAIL(prepare_scan_table_param(param, *(MTL(ObTenantSchemaService*)->get_schema_service())))) {
LOG_WARN("failed to prepare scan table param", K(ret), K(result), K(param));
} else {
@ -749,6 +771,15 @@ int ObLSTabletService::table_rescan(ObTableScanParam &param, ObNewRowIterator *&
LOG_WARN("failed to do table scan", K(ret), K(result), K(param));
}
}
if (OB_SUCC(ret)) {
bool is_same = false;
allow_to_read_mgr_.check_read_info_same(read_info, is_same);
if (!is_same) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
}
}
return ret;
}
@ -4820,9 +4851,15 @@ int ObLSTabletService::get_multi_ranges_cost(
int ret = OB_SUCCESS;
ObTabletTableIterator iter;
const int64_t max_snapshot_version = INT64_MAX;
AllowToReadMgr::AllowToReadInfo read_info;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (FALSE_IT(allow_to_read_mgr_.load_allow_to_read_info(read_info))) {
} else if (!read_info.allow_to_read()) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
} else if (OB_FAIL(get_read_tables(tablet_id, max_snapshot_version, iter))) {
LOG_WARN("fail to get all read tables", K(ret), K(tablet_id), K(max_snapshot_version));
} else {
@ -4848,9 +4885,15 @@ int ObLSTabletService::split_multi_ranges(
int ret = OB_SUCCESS;
ObTabletTableIterator iter;
const int64_t max_snapshot_version = INT64_MAX;
AllowToReadMgr::AllowToReadInfo read_info;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (FALSE_IT(allow_to_read_mgr_.load_allow_to_read_info(read_info))) {
} else if (!read_info.allow_to_read()) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
} else if (OB_FAIL(get_read_tables(tablet_id, max_snapshot_version, iter))) {
LOG_WARN("fail to get all read tables", K(ret), K(tablet_id), K(max_snapshot_version));
} else {
@ -4865,6 +4908,15 @@ int ObLSTabletService::split_multi_ranges(
LOG_WARN("fail to get splitted ranges", K(ret), K(ranges), K(expected_task_count));
}
}
if (OB_SUCC(ret)) {
bool is_same = false;
allow_to_read_mgr_.check_read_info_same(read_info, is_same);
if (!is_same) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
}
}
return ret;
}
@ -4879,6 +4931,8 @@ int ObLSTabletService::estimate_row_count(
ObPartitionEst batch_est;
ObTabletTableIterator tablet_iter;
common::ObSEArray<ObITable*, 4> tables;
AllowToReadMgr::AllowToReadInfo read_info;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret), K_(is_inited));
@ -4886,6 +4940,10 @@ int ObLSTabletService::estimate_row_count(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(param), K(scan_range));
} else if (scan_range.is_empty()) {
} else if (FALSE_IT(allow_to_read_mgr_.load_allow_to_read_info(read_info))) {
} else if (!read_info.allow_to_read()) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
} else {
const int64_t snapshot_version = -1 == param.frozen_version_ ?
GET_BATCH_ROWS_READ_SNAPSHOT_VERSION : param.frozen_version_;
@ -4924,8 +4982,15 @@ int ObLSTabletService::estimate_row_count(
}
}
if (OB_SUCC(ret)) {
logical_row_count = batch_est.logical_row_count_;
physical_row_count = batch_est.physical_row_count_;
bool is_same = false;
allow_to_read_mgr_.check_read_info_same(read_info, is_same);
if (!is_same) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
} else {
logical_row_count = batch_est.logical_row_count_;
physical_row_count = batch_est.physical_row_count_;
}
}
LOG_DEBUG("estimate result", K(ret), K(batch_est), K(est_records));
return ret;
@ -4940,10 +5005,15 @@ int ObLSTabletService::estimate_block_count(
macro_block_count = 0;
micro_block_count = 0;
ObTabletTableIterator tablet_iter;
AllowToReadMgr::AllowToReadInfo read_info;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret), K_(is_inited));
} else if (FALSE_IT(allow_to_read_mgr_.load_allow_to_read_info(read_info))) {
} else if (!read_info.allow_to_read()) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
} else if (OB_FAIL(get_read_tables(tablet_id, INT64_MAX, tablet_iter, false/*allow_no_ready_read*/))) {
LOG_WARN("failed to get read tables", K(ret));
}
@ -4978,6 +5048,15 @@ int ObLSTabletService::estimate_block_count(
}
}
}
if (OB_SUCC(ret)) {
bool is_same = false;
allow_to_read_mgr_.check_read_info_same(read_info, is_same);
if (!is_same) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("ls is not allow to read", K(ret), KPC(ls_));
}
}
return ret;
}
@ -5300,6 +5379,40 @@ int ObLSTabletService::build_tablet_iter(ObLSTabletIDIterator &iter)
return ret;
}
int ObLSTabletService::set_allow_to_read_(ObLS *ls)
{
int ret = OB_SUCCESS;
ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
share::ObLSRestoreStatus restore_status;
if (OB_ISNULL(ls)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("set allow to read get invalid argument", K(ret), KP(ls));
} else {
if (OB_FAIL(ls->get_migration_and_restore_status(migration_status, restore_status))) {
LOG_WARN("failed to get ls migration and restore status", K(ret), KPC(ls));
} else if (ObMigrationStatus::OB_MIGRATION_STATUS_NONE != migration_status
&& ObLSRestoreStatus::RESTORE_MAJOR_DATA != restore_status
&& ObLSRestoreStatus::RESTORE_NONE != restore_status) {
allow_to_read_mgr_.disable_to_read();
FLOG_INFO("set ls do not allow to read", KPC(ls), K(migration_status), K(restore_status));
} else {
allow_to_read_mgr_.enable_to_read();
}
}
return ret;
}
void ObLSTabletService::enable_to_read()
{
allow_to_read_mgr_.enable_to_read();
}
void ObLSTabletService::disable_to_read()
{
allow_to_read_mgr_.disable_to_read();
}
ObLSTabletService::DeleteTabletInfo::DeleteTabletInfo()
: delete_data_tablet_(false),
@ -5476,5 +5589,50 @@ int ObLSTabletService::DestroyMemtableAndMemberOperator::operator()(const common
return ret;
}
void ObLSTabletService::AllowToReadMgr::disable_to_read()
{
AllowToReadInfo read_info;
AllowToReadInfo next_read_info;
do {
LOAD128(read_info, &read_info_);
if (!read_info.allow_to_read()) {
break;
} else {
next_read_info.info_.allow_to_read_ = 0;
next_read_info.info_.seq_ = read_info.info_.seq_ + 1;
}
} while (!CAS128(&read_info_, read_info, next_read_info));
}
void ObLSTabletService::AllowToReadMgr::enable_to_read()
{
AllowToReadInfo read_info;
AllowToReadInfo next_read_info;
do {
LOAD128(read_info, &read_info_);
if (read_info.allow_to_read()) {
break;
} else {
next_read_info.info_.allow_to_read_ = 1;
next_read_info.info_.seq_ = read_info.info_.seq_;
}
} while (!CAS128(&read_info_, read_info, next_read_info));
}
void ObLSTabletService::AllowToReadMgr::load_allow_to_read_info(
AllowToReadInfo &read_info)
{
LOAD128(read_info, &read_info_);
}
void ObLSTabletService::AllowToReadMgr::check_read_info_same(
const AllowToReadInfo &read_info, bool &is_same)
{
AllowToReadInfo current_read_info;
LOAD128(current_read_info, &read_info_);
is_same = read_info == current_read_info;
}
} // namespace storage
} // namespace oceanbase

View File

@ -92,6 +92,7 @@ class ObTabletTxMultiSourceDataUnit;
struct ObMigrationTabletParam;
class ObTableScanRange;
class ObLSTabletService : public logservice::ObIReplaySubHandler,
public logservice::ObIRoleChangeSubHandler,
public logservice::ObICheckpointSubHandler
@ -106,7 +107,46 @@ public:
void destroy();
int offline();
int online();
public:
class AllowToReadMgr final
{
public:
struct AllowToReadInfo final
{
AllowToReadInfo() { info_.seq_ = 0; info_.allow_to_read_ = 0; info_.reserved_ = 0; }
~AllowToReadInfo() = default;
bool allow_to_read() const { return info_.allow_to_read_ == 1; }
bool operator==(const AllowToReadInfo &other) const {
return info_.seq_ == other.info_.seq_
&& info_.allow_to_read_ == other.info_.allow_to_read_
&& info_.reserved_ == other.info_.reserved_;
}
TO_STRING_KV(K_(info));
static const int32_t RESERVED = 63;
union InfoUnion
{
struct types::uint128_t v128_;
struct
{
uint64_t seq_ : 64;
uint8_t allow_to_read_ : 1;
uint64_t reserved_ : RESERVED;
};
TO_STRING_KV(K_(seq), K_(allow_to_read), K_(reserved));
};
InfoUnion info_;
} __attribute__((__aligned__(16)));
public:
AllowToReadMgr(): read_info_() {}
~AllowToReadMgr() = default;
void disable_to_read();
void enable_to_read();
void load_allow_to_read_info(AllowToReadInfo &read_info);
void check_read_info_same(const AllowToReadInfo &read_info, bool &is_same);
private:
AllowToReadInfo read_info_;
};
private:
// for replay
virtual int replay(
@ -327,6 +367,9 @@ public:
int build_ha_tablet_new_table_store(
const ObTabletID &tablet_id,
const ObBatchUpdateTableStoreParam &param);
void enable_to_read();
void disable_to_read();
protected:
virtual int prepare_dml_running_ctx(
const common::ObIArray<uint64_t> *column_ids,
@ -719,6 +762,8 @@ private:
ObRelativeTable &data_table,
ObStoreCtx &store_ctx,
const ObStoreRow &tbl_row);
int set_allow_to_read_(ObLS *ls);
private:
friend class ObLSTabletIterator;
@ -729,7 +774,7 @@ private:
ObTabletIDSet tablet_id_set_;
common::ObBucketLock bucket_lock_; // for tablet update, not for dml
observer::ObIMetaReport *rs_reporter_;
AllowToReadMgr allow_to_read_mgr_;
bool is_inited_;
};

View File

@ -46,7 +46,8 @@ ObLSRestoreHandler::ObLSRestoreHandler()
ls_(nullptr),
ls_restore_arg_(),
state_handler_(nullptr),
allocator_()
allocator_(),
rebuild_seq_(0)
{
}
@ -70,6 +71,7 @@ int ObLSRestoreHandler::init(ObLS *ls)
} else {
allocator_.set_label(OB_LS_RESTORE_HANDLER);
ls_ = ls;
rebuild_seq_ = ls->get_rebuild_seq();
is_inited_ = true;
}
return ret;
@ -511,6 +513,25 @@ int ObLSRestoreHandler::safe_to_destroy(bool &is_safe)
return ret;
}
int ObLSRestoreHandler::update_rebuild_seq()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls restore handler do not init", K(ret));
} else {
lib::ObMutexGuard guard(mtx_);
rebuild_seq_ = ls_->get_rebuild_seq();
}
return ret;
}
int64_t ObLSRestoreHandler::get_rebuild_seq()
{
lib::ObMutexGuard guard(mtx_);
return rebuild_seq_;
}
//================================ObILSRestoreState=======================================
ObILSRestoreState::ObILSRestoreState(const share::ObLSRestoreStatus::Status &status)
@ -599,16 +620,44 @@ int ObILSRestoreState::handle_pull_tablet(
return ret;
}
int ObILSRestoreState::update_restore_status_(
storage::ObLS &ls, const share::ObLSRestoreStatus &next_status)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
storage::ObLSRestoreHandler *ls_restore_handler = ls.get_ls_restore_handler();
int64_t rebuild_seq = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_ISNULL(ls_restore_handler)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls restore handler can't be nullptr", K(ret));
} else if (FALSE_IT(rebuild_seq = ls_restore_handler->get_rebuild_seq())) {
} else if (OB_FAIL(ls.set_restore_status(next_status, rebuild_seq))) {
LOG_WARN("fail to advance ls meta status", K(ret), K(next_status), K(ls), K(rebuild_seq));
if (OB_STATE_NOT_MATCH == ret) {
if (OB_SUCCESS != (tmp_ret = ls_restore_handler->update_rebuild_seq())) {
LOG_ERROR("failed to update rebuild seq", K(ret), K(tmp_ret), K(rebuild_seq));
}
//overwrite ret
ret = OB_SUCCESS;
}
}
return ret;
}
int ObILSRestoreState::deal_failed_restore(const ObLSRestoreResultMgr &result_mgr)
{
int ret = OB_SUCCESS;
ObLSRestoreStatus next_status(ObLSRestoreStatus::Status::RESTORE_FAILED);
ObLSRestoreResultMgr::Comment comment;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_FAIL(ls_->set_restore_status(next_status))) {
LOG_WARN("fail to set restore status to failed", K(ret), K(next_status), KPC(ls_));
} else if (OB_FAIL(update_restore_status_(*ls_, next_status))) {
LOG_WARN("failed to update restore status", K(ret), KPC(ls_), K(next_status));
} else if (OB_FAIL(result_mgr.get_comment_str(comment))) {
LOG_WARN("fail to get comment str", K(ret));
} else if (OB_FAIL(report_ls_restore_progress_(*ls_, next_status, result_mgr.get_trace_id(),
@ -631,8 +680,8 @@ int ObILSRestoreState::advance_status_(
} else if (OB_ISNULL(ls_restore_handler)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls restore handler can't be nullptr", K(ret));
} else if (OB_FAIL(ls.set_restore_status(next_status))) {
LOG_WARN("fail to advance ls meta status", K(ret), K(next_status), K(ls));
} else if (OB_FAIL(update_restore_status_(ls, next_status))) {
LOG_WARN("failed to update restore status", K(ret), K(ls), K(next_status));
} else {
ls_restore_handler->wakeup();
LOG_INFO("success advance status", K(ls), K(next_status));
@ -1570,6 +1619,7 @@ int ObLSRestoreSysTabletState::leader_restore_sys_tablet_()
int ret = OB_SUCCESS;
ObLSRestoreStatus next_status(ObLSRestoreStatus::Status::WAIT_RESTORE_SYS_TABLETS);
ObArray<common::ObTabletID> no_use_tablet_ids;
storage::ObLSRestoreHandler *ls_restore_handler = nullptr;
LOG_INFO("ready to restore leader sys tablet", K(ls_restore_status_), KPC(ls_));
if (tablet_mgr_.has_no_task()) {
if (OB_FAIL(do_restore_sys_tablet())) {
@ -1582,6 +1632,8 @@ int ObLSRestoreSysTabletState::leader_restore_sys_tablet_()
// next term to retry
} else if (OB_FAIL(ls_->load_ls_inner_tablet())) {
LOG_WARN("fail to load ls inner tablet", K(ret));
} else if (OB_FAIL(ls_->get_ls_restore_handler()->update_rebuild_seq())) {
LOG_WARN("failed to update rebuild seq", K(ret), KPC(ls_));
} else if (OB_FAIL(advance_status_(*ls_, next_status))) {
LOG_WARN("fail to advance status", K(ret), KPC(ls_), K(next_status));
} else {
@ -1611,6 +1663,8 @@ int ObLSRestoreSysTabletState::follower_restore_sys_tablet_()
// next term to retry
} else if (OB_FAIL(ls_->load_ls_inner_tablet())) {
LOG_WARN("fail to load ls inner tablet", K(ret));
} else if (OB_FAIL(ls_->get_ls_restore_handler()->update_rebuild_seq())) {
LOG_WARN("failed to update rebuild seq", K(ret), KPC(ls_));
} else if (OB_FAIL(advance_status_(*ls_, next_status))) {
LOG_WARN("fail to advance status", K(ret), KPC(ls_), K(next_status));
} else {
@ -2304,7 +2358,7 @@ int ObLSRestoreWaitState::leader_wait_follower_()
} else if (!all_finish) {
} else if (OB_FAIL(advance_status_(*ls_, next_status))) {
LOG_WARN("fail to advance status", K(ret), K(next_status), KPC(ls_));
}
}
return ret;
}
@ -2337,7 +2391,6 @@ int ObLSRestoreWaitState::follower_wait_leader_()
} else {
LOG_INFO("follower success advance status", K(next_status), KPC(ls_));
}
return ret;
}

View File

@ -88,6 +88,8 @@ public:
void stop() { ATOMIC_STORE(&is_stop_, true); } // when remove ls, set this
int safe_to_destroy(bool &is_safe);
bool is_stop() { return is_stop_; }
int update_rebuild_seq();
int64_t get_rebuild_seq();
private:
int check_before_do_restore_(bool &can_do_restore);
int update_state_handle_();
@ -106,6 +108,7 @@ private:
ObTenantRestoreCtx ls_restore_arg_;
ObILSRestoreState *state_handler_;
common::ObFIFOAllocator allocator_;
int64_t rebuild_seq_;
DISALLOW_COPY_AND_ASSIGN(ObLSRestoreHandler);
};
@ -170,6 +173,9 @@ protected:
int enable_replay_();
void disable_replay_();
int update_restore_status_(
storage::ObLS &ls,
const share::ObLSRestoreStatus &next_status);
protected:
bool is_inited_;

View File

@ -51,6 +51,7 @@ public:
virtual void SetUp() override;
virtual void TearDown() override;
void FakeLs(ObLS &ls);
private:
static constexpr uint64_t TEST_TENANT_ID = 500;
@ -88,6 +89,17 @@ void TestMetaPointerMap::TearDown()
tenant_base_.destroy();
}
void TestMetaPointerMap::FakeLs(ObLS &ls)
{
ls.ls_meta_.tenant_id_ = 1;
ls.ls_meta_.ls_id_.id_ = 1001;
ls.ls_meta_.gc_state_ = logservice::LSGCState::NORMAL;
ls.ls_meta_.migration_status_ = ObMigrationStatus::OB_MIGRATION_STATUS_NONE;
ls.ls_meta_.restore_status_ = ObLSRestoreStatus::RESTORE_NONE;
ls.ls_meta_.replica_type_ = ObReplicaType::REPLICA_TYPE_FULL;
}
class CalculateSize final
{
public:
@ -120,6 +132,8 @@ int CalculateSize::operator()(common::hash::HashMapPair<ObTabletMapKey, ObTablet
TEST_F(TestMetaPointerMap, test_meta_pointer_handle)
{
ObLS fake_ls;
FakeLs(fake_ls);
observer::ObIMetaReport *fake_reporter = (observer::ObIMetaReport *)0xff;
ObLSTabletService *tablet_svr = fake_ls.get_tablet_svr();
@ -186,6 +200,7 @@ TEST_F(TestMetaPointerMap, test_meta_pointer_handle)
TEST_F(TestMetaPointerMap, test_meta_pointer_map)
{
ObLS fake_ls;
FakeLs(fake_ls);
observer::ObIMetaReport *fake_reporter = (observer::ObIMetaReport *)0xff;
ObLSTabletService *tablet_svr = fake_ls.get_tablet_svr();
@ -267,6 +282,7 @@ TEST_F(TestMetaPointerMap, test_meta_pointer_map)
TEST_F(TestMetaPointerMap, test_erase_and_load_concurrency)
{
ObLS fake_ls;
FakeLs(fake_ls);
observer::ObIMetaReport *fake_reporter = (observer::ObIMetaReport *)0xff;
ObLSTabletService *tablet_svr = fake_ls.get_tablet_svr();