Fix bugs and add a compaction check for LS offline
This commit is contained in:
parent
5b1587cae5
commit
4afce61098
@ -2029,6 +2029,27 @@ int ObTenantDagScheduler::get_all_compaction_dag_info(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantDagScheduler::check_ls_compaction_dag_exist(const ObLSID &ls_id, bool &exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
exist = false;
|
||||
compaction::ObTabletMergeDag *dag = nullptr;
|
||||
ObThreadCondGuard guard(scheduler_sync_);
|
||||
for (int64_t i = 0; i < ObIDag::MergeDagPrioCnt; ++i) {
|
||||
ObIDag *head = dag_list_[READY_DAG_LIST].get_head(ObIDag::MergeDagPrio[i]);
|
||||
ObIDag *cur = head->get_next();
|
||||
while (head != cur) {
|
||||
dag = static_cast<compaction::ObTabletMergeDag *>(cur);
|
||||
if (ls_id == dag->get_ctx().param_.ls_id_) {
|
||||
exist = true;
|
||||
break;
|
||||
}
|
||||
cur = cur->get_next();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// get max estimated_finish_time to update server_progress
|
||||
int ObTenantDagScheduler::get_max_major_finish_time(const int64_t version, int64_t &estimated_finish_time)
|
||||
{
|
||||
|
@ -790,6 +790,7 @@ public:
|
||||
ObIArray<compaction::ObTabletCompactionProgress *> &progress_array);
|
||||
int get_max_major_finish_time(const int64_t version, int64_t &estimated_finish_time);
|
||||
int diagnose_dag(const ObIDag *dag, compaction::ObDiagnoseTabletCompProgress &input_progress);
|
||||
int check_ls_compaction_dag_exist(const ObLSID &ls_id, bool &exist);
|
||||
int check_dag_net_exist(
|
||||
const ObDagId &dag_id, bool &exist);
|
||||
private:
|
||||
|
@ -32,6 +32,13 @@ using namespace share;
|
||||
namespace compaction
|
||||
{
|
||||
|
||||
// Hash of this suspect info: the ObMergeDagHash::inner_hash() value is used as
// the seed and tenant_id_ is mixed into it with murmurhash.
int64_t ObScheduleSuspectInfo::hash() const
{
  const int64_t base_hash = ObMergeDagHash::inner_hash();
  const int64_t mixed_hash = common::murmurhash(&tenant_id_, sizeof(tenant_id_), base_hash);
  return mixed_hash;
}
|
||||
|
||||
bool ObScheduleSuspectInfo::is_valid() const
|
||||
{
|
||||
bool bret = true;
|
||||
@ -45,6 +52,7 @@ bool ObScheduleSuspectInfo::is_valid() const
|
||||
|
||||
ObScheduleSuspectInfo & ObScheduleSuspectInfo::operator = (const ObScheduleSuspectInfo &other)
|
||||
{
|
||||
tenant_id_ = other.tenant_id_;
|
||||
merge_type_ = other.merge_type_;
|
||||
ls_id_ = other.ls_id_;
|
||||
tablet_id_ = other.tablet_id_;
|
||||
@ -320,13 +328,14 @@ int ObCompactionDiagnoseMgr::get_suspect_info(
|
||||
ObScheduleSuspectInfo &ret_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
compaction::ObMergeDagHash dag_hash;
|
||||
dag_hash.merge_type_ = merge_type;
|
||||
dag_hash.ls_id_ = ls_id;
|
||||
dag_hash.tablet_id_ = tablet_id;
|
||||
if (OB_FAIL(ObScheduleSuspectInfoMgr::get_instance().get_suspect_info(dag_hash.inner_hash(), ret_info))) {
|
||||
ObScheduleSuspectInfo input_info;
|
||||
input_info.tenant_id_ = MTL_ID();
|
||||
input_info.merge_type_ = merge_type;
|
||||
input_info.ls_id_ = ls_id;
|
||||
input_info.tablet_id_ = tablet_id;
|
||||
if (OB_FAIL(ObScheduleSuspectInfoMgr::get_instance().get_suspect_info(input_info.hash(), ret_info))) {
|
||||
if (OB_HASH_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get suspect info", K(ret), K(dag_hash));
|
||||
LOG_WARN("failed to get suspect info", K(ret), K(input_info));
|
||||
}
|
||||
} else if (ret_info.add_time_ + SUSPECT_INFO_WARNING_THRESHOLD < ObTimeUtility::fast_current_time()) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
@ -413,7 +422,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
|
||||
SET_DIAGNOSE_INFO(
|
||||
info_array_[idx_++],
|
||||
MINI_MERGE,
|
||||
MTL_ID(),
|
||||
ret_info.tenant_id_,
|
||||
ls_id,
|
||||
ObTabletID(INT64_MAX),
|
||||
ObCompactionDiagnoseInfo::DIA_STATUS_FAILED,
|
||||
|
@ -32,13 +32,16 @@ struct ObScheduleSuspectInfo : public common::ObDLinkBase<ObScheduleSuspectInfo>
|
||||
{
|
||||
ObScheduleSuspectInfo()
|
||||
: ObMergeDagHash(),
|
||||
tenant_id_(OB_INVALID_ID),
|
||||
add_time_(0),
|
||||
suspect_info_("\0")
|
||||
{}
|
||||
int64_t hash() const;
|
||||
bool is_valid() const;
|
||||
ObScheduleSuspectInfo & operator = (const ObScheduleSuspectInfo &other);
|
||||
|
||||
TO_STRING_KV(K_(merge_type), K_(ls_id), K_(tablet_id), K_(add_time), K_(suspect_info));
|
||||
TO_STRING_KV(K_(tenant_id), K_(merge_type), K_(ls_id), K_(tablet_id), K_(add_time), K_(suspect_info));
|
||||
int64_t tenant_id_;
|
||||
int64_t add_time_;
|
||||
char suspect_info_[common::OB_DIAGNOSE_INFO_LENGTH];
|
||||
};
|
||||
@ -212,6 +215,7 @@ private:
|
||||
int64_t __pos = 0; \
|
||||
int ret = OB_SUCCESS; \
|
||||
compaction::ObScheduleSuspectInfo info; \
|
||||
info.tenant_id_ = MTL_ID(); \
|
||||
info.merge_type_ = type; \
|
||||
info.ls_id_ = ls_id; \
|
||||
info.tablet_id_ = tablet_id; \
|
||||
@ -226,7 +230,7 @@ private:
|
||||
buf[__pos++] = '.'; \
|
||||
} \
|
||||
SIMPLE_TO_STRING_##n \
|
||||
if (OB_FAIL(ObScheduleSuspectInfoMgr::get_instance().add_suspect_info(info.inner_hash(), info))) { \
|
||||
if (OB_FAIL(ObScheduleSuspectInfoMgr::get_instance().add_suspect_info(info.hash(), info))) { \
|
||||
STORAGE_LOG(WARN, "failed to add suspect info", K(ret), K(info)); \
|
||||
} else { \
|
||||
STORAGE_LOG(DEBUG, "success to add suspect info", K(ret), K(info)); \
|
||||
|
@ -604,6 +604,9 @@ int ObTabletMergePrepareTask::process()
|
||||
&& !MTL(ObTenantTabletScheduler *)->could_major_merge_start())) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_INFO("Merge has been paused", K(ret), K(ctx));
|
||||
} else if (ctx->ls_handle_.get_ls()->is_offline()) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_INFO("ls offline, skip merge", K(ret), K(ctx));
|
||||
} else if (FALSE_IT(ctx->time_guard_.click(ObCompactionTimeGuard::DAG_WAIT_TO_SCHEDULE))) {
|
||||
} else if (OB_FAIL(ctx->ls_handle_.get_ls()->get_tablet(ctx->param_.tablet_id_,
|
||||
ctx->tablet_handle_,
|
||||
@ -890,6 +893,10 @@ int ObTabletMergeFinishTask::process()
|
||||
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(ctx.merge_progress_)) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
// update merge info
|
||||
if (OB_TMP_FAIL(ctx.merge_progress_->update_merge_info(ctx.merge_info_.get_sstable_merge_info()))) {
|
||||
STORAGE_LOG(WARN, "fail to update update merge info", K(tmp_ret));
|
||||
}
|
||||
if (OB_TMP_FAIL(compaction::ObCompactionSuggestionMgr::get_instance().analyze_merge_info(
|
||||
ctx.merge_info_,
|
||||
*ctx.merge_progress_))) {
|
||||
|
@ -151,6 +151,7 @@ int ObTenantCompactionProgressMgr::loop_major_sstable_(
|
||||
int64_t &size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObTimeGuard timeguard("loop_major_sstable_to_calc_progress_size", 30 * 1000 * 1000); // 30s
|
||||
ObSharedGuard<ObLSIterator> ls_iter_guard;
|
||||
ObLS *ls = nullptr;
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls_iter(ls_iter_guard, ObLSGetMod::STORAGE_MOD))) {
|
||||
|
@ -152,6 +152,7 @@ int ObTenantFreezeInfoMgr::get_min_dependent_freeze_info(FreezeInfo &freeze_info
|
||||
idx = info_list.count() - MIN_DEPENDENT_FREEZE_INFO_GAP;
|
||||
}
|
||||
ret = get_info_nolock(idx, freeze_info);
|
||||
LOG_INFO("get min dependent freeze info", K(ret), K(freeze_info)); // diagnose code for issue 45841468
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -347,6 +347,26 @@ int ObTenantTabletScheduler::update_upper_trans_version_and_gc_sstable()
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Block until the tenant dag scheduler reports no compaction dag for ls_id.
//
// Polls check_ls_compaction_dag_exist() and sleeps 100ms between polls. There
// is no timeout: the loop ends only when no dag exists or the check fails.
//
// @param ls_id  log stream to wait for; must be valid
// @return OB_INVALID_ARGUMENT for an invalid ls_id, the error of the check if
//         it fails, OB_SUCCESS otherwise
int ObTenantTabletScheduler::wait_ls_compaction_finish(const share::ObLSID &ls_id)
{
  int ret = OB_SUCCESS;
  bool exist = false;
  if (OB_UNLIKELY(!ls_id.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(ls_id));
  } else {
    bool finished = false;
    while (OB_SUCC(ret) && !finished) {
      if (OB_FAIL(MTL(ObTenantDagScheduler*)->check_ls_compaction_dag_exist(ls_id, exist))) {
        LOG_WARN("failed to check ls compaction dag", K(ret), K(ls_id));
      } else if (exist) {
        // A dag is still queued for this ls; back off before polling again.
        ob_usleep(100 * 1000); // 100ms
      } else {
        finished = true;
      }
    }
  }
  return ret;
}
|
||||
|
||||
int ObTenantTabletScheduler::schedule_build_bloomfilter(
|
||||
const uint64_t table_id,
|
||||
const blocksstable::MacroBlockId &macro_id,
|
||||
@ -528,6 +548,8 @@ int ObTenantTabletScheduler::check_ls_state(ObLS &ls, bool &need_merge)
|
||||
need_merge = false;
|
||||
if (ls.is_deleted()) {
|
||||
LOG_INFO("ls is deleted", K(ret), K(ls));
|
||||
} else if (ls.is_offline()) {
|
||||
LOG_INFO("ls is offline", K(ret), K(ls));
|
||||
} else {
|
||||
need_merge = true;
|
||||
}
|
||||
|
@ -112,6 +112,7 @@ public:
|
||||
int merge_all();
|
||||
int schedule_merge(const int64_t broadcast_version);
|
||||
int update_upper_trans_version_and_gc_sstable();
|
||||
int wait_ls_compaction_finish(const share::ObLSID &ls_id);
|
||||
|
||||
// Schedule an async task to build bloomfilter for the given macro block.
|
||||
// The bloomfilter build task will be ignored if a same build task exists in the queue.
|
||||
|
@ -162,6 +162,7 @@ public:
|
||||
void destroy();
|
||||
int offline();
|
||||
int online();
|
||||
// Placeholder: unconditionally reports "not offline" until the real
// implementation replaces this mock.
bool is_offline() const { return false; } // mock function, TODO(@yanyuan)
|
||||
|
||||
ObLSTxService *get_tx_svr() { return &ls_tx_svr_; }
|
||||
ObLockTable *get_lock_table() { return &lock_table_; }
|
||||
|
@ -36,16 +36,9 @@ using namespace share::schema;
|
||||
namespace storage
|
||||
{
|
||||
|
||||
int ObStorageSchemaRecorder::ObStorageSchemaLogCb::set_table_version(const int64_t table_version)
|
||||
void ObStorageSchemaRecorder::ObStorageSchemaLogCb::set_table_version(const int64_t table_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_UNLIKELY(!ATOMIC_BCAS(&table_version_, OB_INVALID_VERSION, table_version))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("double set table_version", K(ret), K(table_version_), K(table_version));
|
||||
}
|
||||
|
||||
return ret;
|
||||
ATOMIC_SET(&table_version_, table_version);
|
||||
}
|
||||
|
||||
int ObStorageSchemaRecorder::ObStorageSchemaLogCb::on_success()
|
||||
@ -490,24 +483,25 @@ int ObStorageSchemaRecorder::submit_schema_log(const int64_t table_id)
|
||||
logcb_ptr_ = new(buf) ObStorageSchemaLogCb(*this);
|
||||
}
|
||||
}
|
||||
if (FAILEDx(logcb_ptr_->set_table_version(storage_schema_->get_schema_version()))) {
|
||||
LOG_ERROR("fail to set table version", K(ret), K_(tablet_id));
|
||||
} else if (FALSE_IT(ATOMIC_STORE(&logcb_finish_flag_, false))) {
|
||||
} else if (FALSE_IT(storage_schema_->set_sync_finish(false))) {
|
||||
} else if (OB_FAIL(tablet_handle_.get_obj()->save_multi_source_data_unit(storage_schema_,
|
||||
ObLogTsRange::MAX_TS, false/*for_replay*/, memtable::MemtableRefOp::INC_REF))) {
|
||||
if (OB_BLOCK_FROZEN != ret) {
|
||||
LOG_WARN("failed to inc ref for storage schema", K(ret), K_(tablet_id), K(storage_schema_));
|
||||
if (OB_SUCC(ret)) {
|
||||
logcb_ptr_->set_table_version(storage_schema_->get_schema_version());
|
||||
ATOMIC_STORE(&logcb_finish_flag_, false);
|
||||
storage_schema_->set_sync_finish(false);
|
||||
if (OB_FAIL(tablet_handle_.get_obj()->save_multi_source_data_unit(storage_schema_,
|
||||
ObLogTsRange::MAX_TS, false/*for_replay*/, memtable::MemtableRefOp::INC_REF))) {
|
||||
if (OB_BLOCK_FROZEN != ret) {
|
||||
LOG_WARN("failed to inc ref for storage schema", K(ret), K_(tablet_id), K(storage_schema_));
|
||||
}
|
||||
} else if (OB_FAIL(log_handler_->append(clog_buf_, clog_len_, ref_ts_ns, need_nonblock, logcb_ptr_, lsn, clog_ts_))) {
|
||||
LOG_WARN("fail to submit log", K(ret), K_(tablet_id));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(dec_ref_on_memtable(false))) {
|
||||
LOG_ERROR("failed to dec ref on memtable", K(tmp_ret), K_(ls_id), K_(tablet_id));
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("submit schema log succeed", K(ret), K_(ls_id), K_(tablet_id), K_(clog_ts), K_(clog_len),
|
||||
"schema_version", storage_schema_->get_schema_version());
|
||||
}
|
||||
} else if (OB_FAIL(log_handler_->append(clog_buf_, clog_len_, ref_ts_ns, need_nonblock, logcb_ptr_, lsn, clog_ts_))) {
|
||||
LOG_WARN("fail to submit log", K(ret), K_(tablet_id));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(dec_ref_on_memtable(false))) {
|
||||
LOG_ERROR("failed to dec ref on memtable", K(tmp_ret), K_(ls_id), K_(tablet_id));
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("submit schema log succeed", K(ret), K_(ls_id), K_(tablet_id), K_(clog_ts), K_(clog_len),
|
||||
"schema_version", storage_schema_->get_schema_version());
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -59,6 +59,14 @@ public:
|
||||
logservice::ObLogHandler *log_handler);
|
||||
void reset();
|
||||
// Returns the is_inited_ flag.
bool is_inited() const { return is_inited_; }
|
||||
bool is_valid() const
|
||||
{
|
||||
return is_inited_
|
||||
&& ls_id_.is_valid()
|
||||
&& tablet_id_.is_valid()
|
||||
&& nullptr != log_handler_
|
||||
&& max_saved_table_version_ >= 0;
|
||||
}
|
||||
|
||||
// follower
|
||||
int replay_schema_log(const int64_t log_ts, const char *buf, const int64_t size, int64_t &pos);
|
||||
@ -72,6 +80,7 @@ public:
|
||||
ObStorageSchemaRecorder(const ObStorageSchemaRecorder&) = delete;
|
||||
ObStorageSchemaRecorder& operator=(const ObStorageSchemaRecorder&) = delete;
|
||||
// Returns max_saved_table_version_, read with ATOMIC_LOAD.
int64_t get_max_sync_version() const { return ATOMIC_LOAD(&max_saved_table_version_); }
|
||||
TO_STRING_KV(K_(is_inited), K_(ls_id), K_(tablet_id));
|
||||
|
||||
private:
|
||||
class ObStorageSchemaLogCb : public logservice::AppendCb
|
||||
@ -80,7 +89,7 @@ private:
|
||||
virtual int on_success() override;
|
||||
virtual int on_failure() override;
|
||||
|
||||
int set_table_version(const int64_t table_version);
|
||||
void set_table_version(const int64_t table_version);
|
||||
|
||||
ObStorageSchemaLogCb(ObStorageSchemaRecorder &recorder)
|
||||
: recorder_(recorder),
|
||||
|
@ -2811,6 +2811,9 @@ int ObTablet::check_max_sync_schema_version() const
|
||||
if (OB_FAIL(get_memtable_mgr(memtable_mgr))) {
|
||||
LOG_WARN("failed to get memtable mgr", K(ret));
|
||||
} else if (FALSE_IT(data_memtable_mgr = static_cast<ObTabletMemtableMgr *>(memtable_mgr))) {
|
||||
} else if (OB_UNLIKELY(!data_memtable_mgr->get_storage_schema_recorder().is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("schema recorder is invalid", K(ret), K_(tablet_meta), KPC(data_memtable_mgr));
|
||||
} else if (OB_FAIL(data_memtable_mgr->get_multi_source_data_unit(&storage_schema, &tmp_allocator))) {
|
||||
LOG_ERROR("failed to storage schema from memtable, max_sync_schema_version is invalid", K(ret),
|
||||
K(max_sync_schema_version), KPC(data_memtable_mgr));
|
||||
|
@ -796,6 +796,7 @@ int64_t ObTabletMemtableMgr::to_string(char *buf, const int64_t buf_len) const
|
||||
J_COMMA();
|
||||
}
|
||||
}
|
||||
J_KV("schema_recorder", schema_recorder_);
|
||||
J_ARRAY_END();
|
||||
J_OBJ_END();
|
||||
J_OBJ_END();
|
||||
|
Loading…
x
Reference in New Issue
Block a user