Fix empty CO checksum report bug, fix CS replica LS migration bug, and enhance diagnostics
This commit is contained in:
parent
efd643a35d
commit
976bbf5cc4
@ -150,7 +150,7 @@ int ObMajorChecksumInfo::init_from_sstable(
|
||||
tmp_col_ckm_array.set_attr(ObMemAttr(MTL_ID(), "MajorCkmInfo"));
|
||||
if (sstable.is_co_sstable()) {
|
||||
const ObCOSSTableV2 &co_sstable = static_cast<const ObCOSSTableV2 &>(sstable);
|
||||
if (OB_FAIL(co_sstable.fill_column_ckm_array(storage_schema, tmp_col_ckm_array, false /*need_process_cs_replica*/))) {
|
||||
if (OB_FAIL(co_sstable.fill_column_ckm_array(storage_schema, tmp_col_ckm_array))) {
|
||||
LOG_WARN("fail to fill column checksum array", K(ret), KPC(this), K(sstable));
|
||||
}
|
||||
} else if (OB_FAIL(sstable.fill_column_ckm_array(tmp_col_ckm_array))) {
|
||||
|
@ -920,12 +920,10 @@ int ObCOSSTableV2::multi_get(
|
||||
|
||||
int ObCOSSTableV2::fill_column_ckm_array(
|
||||
const ObStorageSchema &storage_schema,
|
||||
ObIArray<int64_t> &column_checksums,
|
||||
bool need_process_cs_replica) const
|
||||
ObIArray<int64_t> &column_checksums) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// for a cgs-empty co table, only report the rowkey cg / all cg column checksums; a cs replica, however, needs all columns reported
|
||||
if (is_cgs_empty_co_table() && !need_process_cs_replica) {
|
||||
if (is_all_cg_base()) {
|
||||
ret = ObSSTable::fill_column_ckm_array(column_checksums);
|
||||
} else {
|
||||
const common::ObIArray<ObStorageColumnGroupSchema> &column_groups = storage_schema.get_column_groups();
|
||||
@ -938,7 +936,8 @@ int ObCOSSTableV2::fill_column_ckm_array(
|
||||
}
|
||||
|
||||
common::ObArray<ObSSTableWrapper> cg_tables;
|
||||
if (FAILEDx(get_all_tables(cg_tables))) {
|
||||
if (is_empty()) {
|
||||
} else if (FAILEDx(get_all_tables(cg_tables))) {
|
||||
LOG_WARN("fail to get_all_tables", K(ret));
|
||||
} else {
|
||||
ObSSTableMetaHandle cg_table_meta_hdl;
|
||||
|
@ -226,8 +226,7 @@ public:
|
||||
ObStoreRowIterator *&row_iter) override;
|
||||
int fill_column_ckm_array(
|
||||
const ObStorageSchema &storage_schema,
|
||||
ObIArray<int64_t> &column_checksums,
|
||||
bool need_process_cs_replica) const;
|
||||
ObIArray<int64_t> &column_checksums) const;
|
||||
INHERIT_TO_STRING_KV("ObSSTable", ObSSTable, KP(this), K_(cs_meta),
|
||||
K_(base_type), K_(is_cgs_empty_co), K_(valid_for_cs_reading));
|
||||
private:
|
||||
|
@ -971,9 +971,10 @@ int ObTenantTabletScheduler::check_ready_for_major_merge(
|
||||
LOG_WARN("failed to get migration status", K(tmp_ret), KPC(ls));
|
||||
} else if (ObMigrationStatus::OB_MIGRATION_STATUS_NONE == migration_status) {
|
||||
ObDagId co_dag_net_id;
|
||||
int schedule_ret = OB_SUCCESS;
|
||||
co_dag_net_id.init(GCTX.self_addr());
|
||||
if (OB_TMP_FAIL(schedule_convert_co_merge_dag_net(ls_id, tablet, 0 /*retry_times*/, co_dag_net_id))) {
|
||||
LOG_WARN("failed to schedule convert co merge for cs replica", K(tmp_ret), K(ls_id), K(tablet));
|
||||
if (OB_TMP_FAIL(schedule_convert_co_merge_dag_net(ls_id, tablet, 0 /*retry_times*/, co_dag_net_id, schedule_ret))) {
|
||||
LOG_WARN("failed to schedule convert co merge for cs replica", K(tmp_ret), K(ls_id), K(tablet), K(schedule_ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1032,15 +1033,19 @@ int ObTenantTabletScheduler::schedule_convert_co_merge_dag_net(
|
||||
const ObLSID &ls_id,
|
||||
const ObTablet &tablet,
|
||||
const int64_t retry_times,
|
||||
const ObDagId& curr_dag_net_id)
|
||||
const ObDagId& curr_dag_net_id,
|
||||
int &schedule_ret)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
schedule_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(
|
||||
ls_id, tablet, compaction::ObMergeType::CONVERT_CO_MAJOR_MERGE, tablet.get_last_major_snapshot_version(), EXEC_MODE_LOCAL, &curr_dag_net_id))) {
|
||||
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
|
||||
ret = tmp_ret;
|
||||
LOG_WARN("failed to schedule co merge dag net for cs replica", K(ret), K(ls_id), "tablet_id", tablet.get_tablet_id());
|
||||
} else {
|
||||
schedule_ret = tmp_ret;
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("[CS-Replica] schedule COMergeDagNet to convert row store to column store", K(retry_times), K(ls_id), "tablet_id", tablet.get_tablet_id(), K(curr_dag_net_id));
|
||||
|
@ -250,7 +250,8 @@ public:
|
||||
const ObLSID &ls_id,
|
||||
const ObTablet &tablet,
|
||||
const int64_t retry_times,
|
||||
const ObDagId& curr_dag_net_id);
|
||||
const ObDagId& curr_dag_net_id,
|
||||
int &schedule_ret);
|
||||
static int schedule_tablet_ddl_major_merge(
|
||||
ObLSHandle &ls_handle,
|
||||
ObTabletHandle &tablet_handle);
|
||||
|
@ -21,6 +21,7 @@ namespace oceanbase
|
||||
namespace storage
|
||||
{
|
||||
ERRSIM_POINT_DEF(EN_ALL_STATE_DETERMINISTIC_FALSE);
|
||||
ERRSIM_POINT_DEF(EN_DISABLE_WAITING_CONVERT_CO_WHEN_MIGRATION);
|
||||
|
||||
/*----------------------------- ObTabletCOConvertCtx -----------------------------*/
|
||||
ObTabletCOConvertCtx::ObTabletCOConvertCtx()
|
||||
@ -28,6 +29,7 @@ ObTabletCOConvertCtx::ObTabletCOConvertCtx()
|
||||
co_dag_net_id_(),
|
||||
status_(Status::MAX_STATUS),
|
||||
retry_cnt_(0),
|
||||
eagain_cnt_(0),
|
||||
is_inited_(false)
|
||||
{
|
||||
}
|
||||
@ -63,6 +65,7 @@ void ObTabletCOConvertCtx::reset()
|
||||
co_dag_net_id_.reset();
|
||||
status_ = Status::MAX_STATUS;
|
||||
retry_cnt_ = 0;
|
||||
eagain_cnt_ = 0;
|
||||
is_inited_ = false;
|
||||
}
|
||||
|
||||
@ -73,6 +76,7 @@ bool ObTabletCOConvertCtx::is_valid() const
|
||||
&& status_ >= Status::UNKNOWN
|
||||
&& status_ < Status::MAX_STATUS
|
||||
&& retry_cnt_ >= 0
|
||||
&& eagain_cnt_ >= 0
|
||||
&& is_inited_;
|
||||
}
|
||||
|
||||
@ -258,7 +262,9 @@ int ObHATabletGroupCOConvertCtx::check_need_convert(const ObTablet &tablet, bool
|
||||
need_convert = false;
|
||||
common::ObArenaAllocator tmp_allocator; // for schema_on_tablet
|
||||
ObStorageSchema *schema_on_tablet = nullptr;
|
||||
if (OB_FAIL(tablet.load_storage_schema(tmp_allocator, schema_on_tablet))) {
|
||||
if (0 == tablet.get_last_major_snapshot_version()) {
|
||||
// no major, may be doing ddl, do not need to convert
|
||||
} else if (OB_FAIL(tablet.load_storage_schema(tmp_allocator, schema_on_tablet))) {
|
||||
LOG_WARN("failed to load storage schema", K(ret),K(tablet));
|
||||
} else {
|
||||
need_convert = ObCSReplicaUtil::check_need_convert_cs_when_migration(tablet, *schema_on_tablet);
|
||||
@ -325,6 +331,7 @@ int ObHATabletGroupCOConvertCtx::inner_get_valid_convert_ctx_idx(const ObTabletI
|
||||
int ObHATabletGroupCOConvertCtx::inner_check_and_schedule(ObLS &ls, const ObTabletID &tablet_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int schedule_ret = OB_SUCCESS;
|
||||
int64_t idx = 0;
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
ObTabletHandle tablet_handle;
|
||||
@ -370,8 +377,14 @@ int ObHATabletGroupCOConvertCtx::inner_check_and_schedule(ObLS &ls, const ObTabl
|
||||
} else if (OB_FAIL(MTL(ObTenantDagScheduler *)->check_dag_net_exist(convert_ctxs_[idx].co_dag_net_id_, is_dag_net_exist))) {
|
||||
LOG_WARN("failed to check dag exists", K(ret), K(convert_ctxs_[idx]), K(tablet_id));
|
||||
} else if (is_dag_net_exist) {
|
||||
} else if (OB_FAIL(compaction::ObTenantTabletScheduler::schedule_convert_co_merge_dag_net(ls_id, *tablet, convert_ctxs_[idx].retry_cnt_, convert_ctxs_[idx].co_dag_net_id_))) {
|
||||
} else if (OB_FAIL(compaction::ObTenantTabletScheduler::schedule_convert_co_merge_dag_net(ls_id, *tablet, convert_ctxs_[idx].retry_cnt_, convert_ctxs_[idx].co_dag_net_id_, schedule_ret))) {
|
||||
LOG_WARN("failed to schedule convert co merge", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_EAGAIN == schedule_ret && !convert_ctxs_[idx].is_eagain_exhausted()) {
|
||||
if (REACH_TENANT_TIME_INTERVAL(10 * 60 * 1000 * 1000L /*10min*/)) {
|
||||
LOG_INFO("[CS-Replica] convert co merge is doing now, please wait for a while, or set EN_DISABLE_WAITING_CONVERT_CO_WHEN_MIGRATION tracepoint to skip it",
|
||||
K(schedule_ret), K(ls_id), K(tablet_id), K(convert_ctxs_[idx]));
|
||||
}
|
||||
convert_ctxs_[idx].inc_eagain_cnt();
|
||||
} else {
|
||||
convert_ctxs_[idx].inc_retry_cnt();
|
||||
if (convert_ctxs_[idx].is_retry_exhausted()) {
|
||||
@ -444,6 +457,10 @@ int ObDataTabletsCheckCOConvertDag::inner_check_can_schedule(
|
||||
#ifdef ERRSIM
|
||||
LOG_INFO("migration dag net failed, make check dag schedule");
|
||||
#endif
|
||||
} else if (EN_DISABLE_WAITING_CONVERT_CO_WHEN_MIGRATION) {
|
||||
can_schedule = true;
|
||||
reason = ObCheckScheduleReason::CONVERT_DISABLED;
|
||||
FLOG_INFO("[CS-Replica] schedule check convert dag right now since waiting convert is disabled", K(ret), K(reason), K(migration_ctx.tablet_group_mgr_));
|
||||
} else {
|
||||
const int64_t tablet_group_cnt = migration_ctx.tablet_group_mgr_.get_tablet_group_ctx_count();
|
||||
ObHATabletGroupCtx *ctx = nullptr;
|
||||
@ -477,9 +494,9 @@ int ObDataTabletsCheckCOConvertDag::inner_check_can_schedule(
|
||||
|
||||
const int64_t cost_time = ObTimeUtility::current_time() - current_time;
|
||||
if (REACH_TENANT_TIME_INTERVAL(OB_DATA_TABLETS_NOT_CHECK_CONVERT_THRESHOLD)) {
|
||||
LOG_INFO("[CS-Replica] finish check_can_schedule", K(ret), K(can_schedule), K(reason), K(wait_one_round_time), K(total_wait_time), K(cost_time), KPC(this), K(migration_ctx.tablet_group_mgr_));
|
||||
LOG_INFO("[CS-Replica] finish check_can_schedule", K(ret), K(can_schedule), K(reason), K(wait_one_round_time), K(total_wait_time), K(cost_time), K(migration_ctx.tablet_group_mgr_));
|
||||
} else {
|
||||
LOG_TRACE("[CS-Replica] finish check_can_schedule", K(ret), K(can_schedule), K(reason), K(wait_one_round_time), K(total_wait_time), K(cost_time), KPC(this), K(migration_ctx.tablet_group_mgr_));
|
||||
LOG_TRACE("[CS-Replica] finish check_can_schedule", K(ret), K(can_schedule), K(reason), K(wait_one_round_time), K(total_wait_time), K(cost_time), K(migration_ctx.tablet_group_mgr_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -547,6 +564,17 @@ int ObDataTabletsCheckCOConvertDag::create_first_task()
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Reports this dag's result via ObStorageHADag::report_result(), unless the dag
// ended with OB_EAGAIN, in which case nothing is reported.
// Returns OB_SUCCESS when the report is skipped; otherwise the outcome of the
// base-class report (a failure is logged with LOG_WARN).
int ObDataTabletsCheckCOConvertDag::report_result()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_EAGAIN == dag_ret_) {
|
||||
// OB_EAGAIN here is the "still waiting for convert co" error code: deliberately
// not reported, to prevent the migration dag net from retrying
|
||||
} else if (OB_FAIL(ObStorageHADag::report_result())) {
|
||||
LOG_WARN("failed to report result", K(ret), KPC(this));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObDataTabletsCheckCOConvertDag::operator == (const ObIDag &other) const
|
||||
{
|
||||
bool is_same = true;
|
||||
@ -682,8 +710,12 @@ int ObDataTabletsCheckConvertTask::process()
|
||||
LOG_INFO("migration dag net failed, make check dag exit");
|
||||
#endif
|
||||
} else if (!all_state_deterministic) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_WARN("not wait all tablets convert finish, failed this task, make dag retry", K(ret), K(all_state_deterministic), KPC_(ctx));
|
||||
if (EN_DISABLE_WAITING_CONVERT_CO_WHEN_MIGRATION) {
|
||||
FLOG_INFO("[CS-Replica] stop waiting convert co when migration if there are too many tablets", K(ret));
|
||||
} else {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_WARN("not wait all tablets convert finish, failed this task, make dag retry", K(ret), K(all_state_deterministic), KPC_(ctx));
|
||||
}
|
||||
}
|
||||
LOG_TRACE("[CS-Replica] Finish process check data tablets convert to column store", K(ret), KPC_(ls), KPC_(ctx));
|
||||
return ret;
|
||||
|
@ -43,17 +43,21 @@ public:
|
||||
// True when status_ is Status::PROGRESSING.
OB_INLINE bool is_progressing() const { return Status::PROGRESSING == status_; }
|
||||
// True when status_ is Status::FINISHED.
OB_INLINE bool is_finished() const { return Status::FINISHED == status_; }
|
||||
// True once retry_cnt_ has reached MAX_RETRY_CNT.
OB_INLINE bool is_retry_exhausted() const { return retry_cnt_ >= MAX_RETRY_CNT; }
|
||||
// True once eagain_cnt_ has reached MAX_EAGAIN_CNT, i.e. no more OB_EAGAIN
// scheduling results are tolerated for this tablet.
OB_INLINE bool is_eagain_exhausted() const { return eagain_cnt_ >= MAX_EAGAIN_CNT; }
|
||||
void set_progressing();
|
||||
// Sets status_ to Status::FINISHED.
OB_INLINE void set_finished() { status_ = Status::FINISHED; }
|
||||
// Sets status_ to Status::RETRY_EXHAUSTED.
OB_INLINE void set_retry_exhausted() { status_ = Status::RETRY_EXHAUSTED; }
|
||||
// Increments retry_cnt_, the counter compared against MAX_RETRY_CNT.
OB_INLINE void inc_retry_cnt() { retry_cnt_++; }
|
||||
// Increments eagain_cnt_, the counter compared against MAX_EAGAIN_CNT.
OB_INLINE void inc_eagain_cnt() { eagain_cnt_++; }
|
||||
public:
|
||||
const static int64_t MAX_RETRY_CNT = 3;
|
||||
const static int64_t MAX_EAGAIN_CNT = 9; // allow at most 9 * OB_DATA_TABLETS_NOT_CHECK_CONVERT_THRESHOLD (20min) = 3h
|
||||
public:
|
||||
ObTabletID tablet_id_;
|
||||
share::ObDagId co_dag_net_id_;
|
||||
Status status_;
|
||||
int64_t retry_cnt_;
|
||||
int64_t eagain_cnt_;
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
@ -107,13 +111,15 @@ public:
|
||||
READY_TO_CHECK = 1,
|
||||
ALL_DETERMINISTIC = 2,
|
||||
WAIT_TIME_EXCEED = 3,
|
||||
MAX_NOT_SCHEDULE = 4,
|
||||
CONVERT_DISABLED = 4,
|
||||
MAX_NOT_SCHEDULE,
|
||||
};
|
||||
public:
|
||||
ObDataTabletsCheckCOConvertDag();
|
||||
virtual ~ObDataTabletsCheckCOConvertDag();
|
||||
virtual bool check_can_schedule() override;
|
||||
virtual int create_first_task() override;
|
||||
virtual int report_result() override;
|
||||
int init(
|
||||
ObIHADagNetCtx *ha_dag_net_ctx,
|
||||
ObLS *ls);
|
||||
@ -129,7 +135,7 @@ public:
|
||||
#ifdef ERRSIM
|
||||
const static int64_t OB_DATA_TABLETS_NOT_CHECK_CONVERT_THRESHOLD = 30 * 1000 * 1000; /*30s*/
|
||||
#else
|
||||
const static int64_t OB_DATA_TABLETS_NOT_CHECK_CONVERT_THRESHOLD = 10 * 60 * 1000 * 1000; /*10min*/
|
||||
const static int64_t OB_DATA_TABLETS_NOT_CHECK_CONVERT_THRESHOLD = 20 * 60 * 1000 * 1000; /*20min*/
|
||||
#endif
|
||||
private:
|
||||
ObLS *ls_;
|
||||
|
@ -3522,9 +3522,10 @@ void ObTabletFinishMigrationTask::schedule_convert_co_merge(
|
||||
// Specific dag net id for co merge dag net to convert row store tablet into columnar store one.
|
||||
// Use ObDataTabletsCheckCOConvertDag to check the convert result and re-schedule dag net if it failed, with the same dag net id.
|
||||
ObDagId co_dag_net_id;
|
||||
int schedule_ret = OB_SUCCESS;
|
||||
if (OB_FAIL(group_convert_ctx->get_co_dag_net_id(tablet_id, co_dag_net_id))) {
|
||||
LOG_WARN("failed to get convert ctx", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(compaction::ObTenantTabletScheduler::schedule_convert_co_merge_dag_net(ls_id, *tablet, 0 /*retry_times*/, co_dag_net_id))) {
|
||||
} else if (OB_FAIL(compaction::ObTenantTabletScheduler::schedule_convert_co_merge_dag_net(ls_id, *tablet, 0 /*retry_times*/, co_dag_net_id, schedule_ret))) {
|
||||
LOG_WARN("failed to schedule convert co merge for cs replica", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(group_convert_ctx->set_convert_progressing(tablet_id))) {
|
||||
LOG_WARN("failed to set convert progressing", K(ret), K(tablet_id));
|
||||
|
@ -7992,7 +7992,7 @@ int ObTablet::get_sstable_column_checksum(
|
||||
ObArenaAllocator allocator;
|
||||
if (OB_FAIL(load_storage_schema(allocator, storage_schema))) {
|
||||
LOG_WARN("fail to load storage schema", K(ret));
|
||||
} else if (OB_FAIL(static_cast<const ObCOSSTableV2 *>(&sstable)->fill_column_ckm_array(*storage_schema, column_checksums, storage_schema->is_cs_replica_compat()))) {
|
||||
} else if (OB_FAIL(static_cast<const ObCOSSTableV2 *>(&sstable)->fill_column_ckm_array(*storage_schema, column_checksums))) {
|
||||
LOG_WARN("fail to fill_column_ckm_array", K(ret), K(sstable));
|
||||
}
|
||||
ObTabletObjLoadHelper::free(allocator, storage_schema);
|
||||
|
Loading…
x
Reference in New Issue
Block a user