add compaction obtest & unittest

This commit is contained in:
yangqise7en 2024-07-12 03:16:39 +00:00 committed by ob-robot
parent 84d44176d9
commit 3e720b0574
28 changed files with 257 additions and 231 deletions

View File

@ -425,11 +425,22 @@ GLOBAL_ERRSIM_POINT_DEF(739, EN_CO_MREGE_DAG_SCHEDULE_REST, "");
GLOBAL_ERRSIM_POINT_DEF(740, EN_COMPACTION_SCHEDULE_MEDIUM_MERGE_AFTER_MINI, "");
GLOBAL_ERRSIM_POINT_DEF(741, EN_COMPACTION_MEDIUM_INIT_LARGE_PARALLEL_RANGE, "");
GLOBAL_ERRSIM_POINT_DEF(742, EN_GET_TABLET_LS_PAIR_IN_RS, "");
GLOBAL_ERRSIM_POINT_DEF(743, EN_SHARED_STORAGE_COMPACTION_CHOOSE_EXEC_SVR, "");
GLOBAL_ERRSIM_POINT_DEF(744, EN_SHARED_STORAGE_SKIP_USER_TABLET_REFRESH, "");
GLOBAL_ERRSIM_POINT_DEF(745, EN_SHARED_STORAGE_SCHEULD_TABLET_IN_IDLE, "");
GLOBAL_ERRSIM_POINT_DEF(746, EN_SHARED_STORAGE_DONT_UPDATE_LS_STATE, "");
GLOBAL_ERRSIM_POINT_DEF(747, EN_MAKE_DATA_CKM_ERROR_BY_WRITE_WRONG_ROW, "change last datum of row into int(999) for making checksum error");
GLOBAL_ERRSIM_POINT_DEF(748, EN_COMPACTION_ITER_SET_BATCH_CNT, "");
// compaction end at 750
// please add new trace point after 750
GLOBAL_ERRSIM_POINT_DEF(751, EN_SESSION_LEAK_COUNT_THRESHOLD, "used to control the threshold of report session leak ERROR");
GLOBAL_ERRSIM_POINT_DEF(800, EN_END_PARTICIPANT, "");
// compaction 801 - 899
// compaction 801 - 899
//LS Migration Related 900 - 1000
GLOBAL_ERRSIM_POINT_DEF(900, EN_INITIAL_MIGRATION_TASK_FAILED, "");
GLOBAL_ERRSIM_POINT_DEF(901, EN_START_MIGRATION_TASK_FAILED, "");

View File

@ -91,7 +91,7 @@ int ObAllVirtualDagWarningHistory::process_curr_tenant(ObNewRow *&row)
break;
case TENANT_ID:
//tenant_id
cells[i].set_int(dag_warning_info_.tenant_id_);
cells[i].set_int(MTL_ID());
break;
case TASK_ID:
//table_id

View File

@ -97,7 +97,7 @@ int ObAllVirtualTabletCompactionHistory::process_curr_tenant(ObNewRow *&row)
cells[i].set_int(ObServerConfig::get_instance().self_addr_.get_port());
break;
case TENANT_ID:
cells[i].set_int(merge_info_.tenant_id_);
cells[i].set_int(MTL_ID());
break;
case LS_ID:
// index_id

View File

@ -56,13 +56,14 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_CO_MERGE_FINISH, ObDagPrio::DAG_PRIO_COMPACT
false, 3, {"ls_id", "tablet_id", "compaction_scn"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TX_TABLE_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::SPECIAL_TABLE_MERGE_TASK, "TX_TABLE_MERGE", "COMPACTION",
false, 3, {"ls_id", "tablet_id", "compaction_scn"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_WRITE_CKPT, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::WRITE_CKPT_TASK, "WRITE_CKPT", "COMPACTION",
false, 2, {"ls_id", "tablet_id"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MDS_MINI_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::MDS_MINI_MERGE_TASK, "MDS_MINI_MERGE", "COMPACTION",
false, 3, {"ls_id", "tablet_id", "flush_scn"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BATCH_FREEZE_TABLETS, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::BATCH_FREEZE_TABLET_TASK, "BATCH_FREEZE", "COMPACTION",
false, 2, {"ls_id", "tablet_count"})
// NOTICE: if you add/delete a compaction dag type here, remember to alter function is_compaction_dag and get_diagnose_tablet_type in ob_tenant_dag_scheduler.h
/*
* NOTICE: if you add/delete a compaction dag type here, remember to alter function is_compaction_dag and get_diagnose_tablet_type in ob_tenant_dag_scheduler.h
* AND update check_ls_compaction_dag_exist_with_cancel
*/
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_DDL, ObDagPrio::DAG_PRIO_DDL, ObSysTaskType::DDL_TASK, "DDL_COMPLEMENT", "DDL",
true, 7, {"ls_id", "source_tablet_id", "dest_tablet_id", "data_table_id", "target_table_id", "schema_version", "snapshot_version"})
@ -260,7 +261,7 @@ struct ObDagTypeStruct
struct ObDagType
{
enum ObDagTypeEnum
enum ObDagTypeEnum : uint8_t
{
#define DAG_SCHEDULER_DAG_TYPE_DEF(dag_type, init_dag_prio, sys_task_type, dag_type_str, dag_module_str, diagnose_with_comment, diagnose_priority, diagnose_int_info_cnt, ...) dag_type,
#include "ob_dag_scheduler_config.h"

View File

@ -39,8 +39,8 @@ const char * ObDagWarningInfo::get_dag_status_str(enum ObDagStatus status)
return str;
}
ObDagWarningInfo::ObDagWarningInfo() :
compaction::ObIDiagnoseInfo(),
ObDagWarningInfo::ObDagWarningInfo(const bool need_free_param) :
compaction::ObIDiagnoseInfo(need_free_param),
task_id_(),
dag_type_(share::ObDagType::DAG_TYPE_MAX),
dag_ret_(OB_SUCCESS),
@ -62,7 +62,6 @@ void ObDagWarningInfo::shallow_copy(ObIDiagnoseInfo *other)
{
ObDagWarningInfo *info = nullptr;
if (OB_NOT_NULL(other) && OB_NOT_NULL(info = dynamic_cast<ObDagWarningInfo *>(other))) {
tenant_id_ = info->tenant_id_;
priority_ = info->priority_;
task_id_ = info->task_id_;
dag_type_ = info->dag_type_;

View File

@ -40,14 +40,18 @@ public:
static const char *get_dag_status_str(enum ObDagStatus status);
ObDagWarningInfo();
ObDagWarningInfo(const bool need_free_param = true);
~ObDagWarningInfo();
OB_INLINE void reset();
TO_STRING_KV(K_(tenant_id), K_(task_id), K_(dag_type), K_(dag_ret), K_(dag_status),
TO_STRING_KV(K_(task_id), K_(dag_type), K_(dag_ret), K_(dag_status),
K_(gmt_create), K_(gmt_modified), K_(retry_cnt), K_(hash), K_(location));
virtual void shallow_copy(ObIDiagnoseInfo *other) override;
virtual void update(ObIDiagnoseInfo *other) override;
virtual int64_t get_hash() const override;
int64_t get_deep_copy_size() const
{ // for unittest
return sizeof(ObDagWarningInfo) + (OB_NOT_NULL(info_param_) ? info_param_->get_deep_copy_size() : 0);
}
public:
share::ObDagId task_id_;
share::ObDagType::ObDagTypeEnum dag_type_;
@ -62,7 +66,6 @@ public:
OB_INLINE void ObDagWarningInfo::reset()
{
tenant_id_ = 0;
task_id_.reset();
dag_type_ = share::ObDagType::DAG_TYPE_MAX;
info_param_ = NULL;

View File

@ -62,7 +62,7 @@ struct ObDiagnoseInfoStruct {
const char *info_str_fmt[DIAGNOSE_INFO_STR_FMT_MAX_NUM];
};
enum ObSuspectInfoType
enum ObSuspectInfoType : uint8_t
{
#define SUSPECT_INFO_TYPE_DEF(suspect_info_type, info_priority, with_comment, info_str, int_info_cnt, ...) suspect_info_type,
#include "ob_diagnose_config.h"

View File

@ -846,7 +846,6 @@ int ObIDag::gene_warning_info(ObDagWarningInfo &info, ObIAllocator &allocator)
info.gmt_modified_ = ObTimeUtility::fast_current_time();
info.location_ = error_location_;
info.dag_type_ = type_;
info.tenant_id_ = MTL_ID();
info.priority_ = static_cast<uint32_t>(ObDiagnoseInfoPrio::DIAGNOSE_PRIORITY_HIGH);
info.gmt_create_ = info.gmt_modified_;
info.dag_status_ = ObDagWarningInfo::ODS_WARNING;

View File

@ -189,7 +189,6 @@ int ObCOTabletMergeCtx::collect_running_info()
{
int ret = OB_SUCCESS;
ObSSTableMergeInfo fake_merge_info;
fake_merge_info.tenant_id_ = MTL_ID();
fake_merge_info.ls_id_ = get_ls_id();
fake_merge_info.is_fake_ = true;
fake_merge_info.tablet_id_ = get_tablet_id();

View File

@ -760,12 +760,11 @@ void ObBasicTabletMergeCtx::add_sstable_merge_info(
}
ObScheduleSuspectInfo ret_info;
int64_t suspect_info_hash = ObScheduleSuspectInfo::gen_hash(MTL_ID(), hash);
info_allocator.reuse();
if (OB_SUCCESS == MTL(compaction::ObScheduleSuspectInfoMgr *)->get_with_param(suspect_info_hash, &ret_info, info_allocator)) {
if (OB_SUCCESS == MTL(compaction::ObScheduleSuspectInfoMgr *)->get_with_param(hash, &ret_info, info_allocator)) {
sstable_merge_info.suspect_add_time_ = ret_info.add_time_;
sstable_merge_info.info_param_ = ret_info.info_param_;
if (OB_TMP_FAIL(MTL(compaction::ObScheduleSuspectInfoMgr *)->delete_info(suspect_info_hash))) {
if (OB_TMP_FAIL(MTL(compaction::ObScheduleSuspectInfoMgr *)->delete_info(hash))) {
LOG_WARN_RET(tmp_ret, "failed to delete old suspect info", K(sstable_merge_info));
}
}

View File

@ -45,9 +45,7 @@ namespace compaction
* */
int64_t ObScheduleSuspectInfo::hash() const
{
int64_t hash_value = ObMergeDagHash::inner_hash();
hash_value = common::murmurhash(&tenant_id_, sizeof(tenant_id_), hash_value);
return hash_value;
return ObMergeDagHash::inner_hash();
}
bool ObScheduleSuspectInfo::is_valid() const
@ -61,13 +59,6 @@ bool ObScheduleSuspectInfo::is_valid() const
return bret;
}
int64_t ObScheduleSuspectInfo::gen_hash(int64_t tenant_id, int64_t dag_hash)
{
int64_t hash_value = dag_hash;
hash_value = common::murmurhash(&tenant_id, sizeof(tenant_id), hash_value);
return hash_value;
}
void ObScheduleSuspectInfo::shallow_copy(ObIDiagnoseInfo *other)
{
ObScheduleSuspectInfo *info = nullptr;
@ -75,7 +66,6 @@ void ObScheduleSuspectInfo::shallow_copy(ObIDiagnoseInfo *other)
merge_type_ = info->merge_type_;
ls_id_ = info->ls_id_;
tablet_id_ = info->tablet_id_;
tenant_id_ = info->tenant_id_;
priority_ = info->priority_;
add_time_ = info->add_time_;
hash_ = info->hash_;
@ -211,7 +201,7 @@ int ObIDiagnoseInfoMgr::init(bool with_map,
page_size_ = std::max(page_size, static_cast<int64_t>(INFO_PAGE_SIZE_LIMIT));
max_size = upper_align(max_size, page_size_);
if (OB_FAIL(allocator_.init(ObMallocAllocator::get_instance(),
page_size,
page_size_,
lib::ObMemAttr(tenant_id, pool_label_),
0,
max_size,
@ -304,8 +294,8 @@ int ObIDiagnoseInfoMgr::get_with_param(const int64_t key, ObIDiagnoseInfo *out_i
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "info_param is null", K(ret), K(info));
} else {
out_info->shallow_copy(info);
if (OB_FAIL(info->info_param_->deep_copy(allocator, out_info->info_param_))) {
out_info->shallow_copy(info/*src*/);
if (OB_FAIL(info->info_param_->deep_copy(allocator, out_info->info_param_/*dst*/))) {
STORAGE_LOG(WARN, "failed to deep copy info param", K(ret));
}
}
@ -492,7 +482,7 @@ int ObIDiagnoseInfoMgr::purge_with_rw_lock(bool batch_purge)
if (OB_SUCC(ret)) {
STORAGE_LOG(INFO, "success to purge", K(ret), K(batch_purge), K(batch_size), "max_size", allocator_.get_max(),
"used_size", allocator_.used(), "total_size", allocator_.total(), K(purge_count));
"used_size", allocator_.used(), "total_size", allocator_.total(), K(purge_count), K(info_list_.get_size()));
}
++version_;
return ret;
@ -533,7 +523,7 @@ int ObScheduleSuspectInfoMgr::add_suspect_info(const int64_t key, ObScheduleSusp
* */
#define ADD_DIAGNOSE_INFO(merge_type, ls_id, tablet_id, status, time, ...) \
SET_DIAGNOSE_INFO(info_array_[idx_++], merge_type, MTL_ID(), ls_id, tablet_id, status, time, __VA_ARGS__)
SET_DIAGNOSE_INFO(info_array_[idx_++], merge_type, ls_id, tablet_id, status, time, __VA_ARGS__)
#define ADD_DIAGNOSE_INFO_FOR_TABLET(merge_type, status, time, ...) \
ADD_DIAGNOSE_INFO(merge_type, ls_id, tablet_id, status, time, __VA_ARGS__)
#define ADD_COMMON_DIAGNOSE_INFO(merge_type, status, time, ...) \
@ -772,7 +762,6 @@ int ObCompactionDiagnoseMgr::get_suspect_info(
int ret = OB_SUCCESS;
suspect_info_type = share::ObSuspectInfoType::SUSPECT_INFO_TYPE_MAX;
ObScheduleSuspectInfo input_info;
input_info.tenant_id_ = MTL_ID();
input_info.merge_type_ = merge_type;
input_info.ls_id_ = ls_id;
input_info.tablet_id_ = tablet_id;
@ -1577,7 +1566,7 @@ int ObCompactionDiagnoseMgr::get_suspect_and_warning_info(
dag_hash.merge_type_ = merge_type;
dag_hash.ls_id_ = ls_id;
dag_hash.tablet_id_ = tablet_id;
if (OB_FAIL(MTL(ObScheduleSuspectInfoMgr *)->get_with_param(ObScheduleSuspectInfo::gen_hash(MTL_ID(), dag_hash.inner_hash()), &info, allocator))) {
if (OB_FAIL(MTL(ObScheduleSuspectInfoMgr *)->get_with_param(dag_hash.inner_hash(), &info, allocator))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("failed to get suspect info", K(ret), K(dag_hash));
} else { // no schedule suspect info

View File

@ -21,10 +21,6 @@
namespace oceanbase
{
namespace storage
{
class ObTenantTabletIterator;
}
namespace rootserver
{
class ObMajorFreezeService;
@ -40,7 +36,7 @@ namespace compaction
class ObIDiagnoseInfoMgr;
struct ObDiagnoseTabletCompProgress;
enum ObInfoParamStructType {
enum ObInfoParamStructType : uint8_t {
SUSPECT_INFO_PARAM = 0,
DAG_WARNING_INFO_PARAM,
INFO_PARAM_TYPE_MAX
@ -72,7 +68,7 @@ struct ObIBasicInfoParam
virtual int fill_comment(char *buf, const int64_t buf_len) const = 0;
virtual int deep_copy(ObIAllocator &allocator, ObIBasicInfoParam *&out_param) const = 0;
virtual int deep_copy(void *dest_buf, const int64_t buf_len, ObIBasicInfoParam *&out_param) const = 0;
static const int64_t MAX_INFO_PARAM_SIZE = 256;
ObInfoParamType type_;
@ -93,22 +89,22 @@ struct ObDiagnoseInfoParam : public ObIBasicInfoParam
virtual int64_t get_deep_copy_size() const override;
virtual int fill_comment(char *buf, const int64_t buf_len) const override;
virtual int deep_copy(ObIAllocator &allocator, ObIBasicInfoParam *&out_param) const override;
virtual int deep_copy(void *buf, const int64_t buf_len, ObIBasicInfoParam *&out_param) const override;
int64_t param_int_[int_size];
char comment_[str_size];
};
struct ObIDiagnoseInfo : public common::ObDLinkBase<ObIDiagnoseInfo> {
ObIDiagnoseInfo()
ObIDiagnoseInfo(const bool need_free_param)
: is_deleted_(false),
need_free_param_(need_free_param),
priority_(0),
seq_num_(0),
tenant_id_(OB_INVALID_ID),
info_param_(nullptr)
{}
virtual void destroy(ObIAllocator &allocator)
{
if (OB_NOT_NULL(info_param_)) {
if (OB_NOT_NULL(info_param_) && need_free_param_) {
info_param_->destroy();
allocator.free(info_param_);
info_param_ = nullptr;
@ -123,10 +119,11 @@ struct ObIDiagnoseInfo : public common::ObDLinkBase<ObIDiagnoseInfo> {
int deep_copy(ObIAllocator &allocator, T *&out_info);
bool is_deleted() const { return ATOMIC_LOAD(&is_deleted_); }
void set_deleted() { ATOMIC_SET(&is_deleted_, true); }
bool is_deleted_; // for iterator
bool need_free_param_;
uint32_t priority_;
uint64_t seq_num_; // for iterator
uint64_t tenant_id_;
ObIBasicInfoParam *info_param_;
};
@ -137,15 +134,18 @@ int ObIDiagnoseInfo::deep_copy(ObIAllocator &allocator, T *&out_info)
int ret = OB_SUCCESS;
void *buf = nullptr;
out_info = nullptr;
if(OB_ISNULL(buf = allocator.alloc(sizeof(T)))) {
const int64_t deep_copy_size = OB_NOT_NULL(info_param_) ? info_param_->get_deep_copy_size() : 0;
const int64_t alloc_size = sizeof(T) + deep_copy_size;
if (OB_ISNULL(buf = allocator.alloc(alloc_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc", K(ret), K(alloc_size), K(allocator.used()));
} else {
T *info = nullptr;
if (OB_ISNULL(info = new (buf) T())) {
if (OB_ISNULL(info = new (buf) T(false/*need_free_param*/))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "new diagnose info is nullptr", K(ret));
} else if (OB_NOT_NULL(info_param_)) {
if (OB_FAIL(info_param_->deep_copy(allocator, info->info_param_))){
if (OB_FAIL(info_param_->deep_copy((char *)buf + sizeof(T), deep_copy_size, info->info_param_))){
STORAGE_LOG(WARN, "fail to deep copy info param", K(ret));
}
}
@ -164,8 +164,8 @@ int ObIDiagnoseInfo::deep_copy(ObIAllocator &allocator, T *&out_info)
struct ObScheduleSuspectInfo : public ObIDiagnoseInfo, public ObMergeDagHash
{
ObScheduleSuspectInfo()
: ObIDiagnoseInfo(),
ObScheduleSuspectInfo(const bool need_free_param = true)
: ObIDiagnoseInfo(need_free_param),
ObMergeDagHash(),
add_time_(0),
hash_(0)
@ -175,8 +175,7 @@ struct ObScheduleSuspectInfo : public ObIDiagnoseInfo, public ObMergeDagHash
virtual void shallow_copy(ObIDiagnoseInfo *other) override;
virtual int64_t get_add_time() const override;
virtual int64_t get_hash() const override;
static int64_t gen_hash(int64_t tenant_id, int64_t dag_hash);
TO_STRING_KV(K_(tenant_id), "merge_type", merge_type_to_str(merge_type_), K_(ls_id), K_(tablet_id), K_(add_time), K_(hash));
TO_STRING_KV("merge_type", merge_type_to_str(merge_type_), K_(ls_id), K_(tablet_id), K_(add_time), K_(hash));
int64_t add_time_;
int64_t hash_;
@ -270,7 +269,7 @@ public:
static const int64_t GC_LOW_PERCENTAGE = 40; // GC_LOW_PERCENTAGE/100
static const int64_t INFO_BUCKET_LIMIT = 1000;
static const int64_t INFO_PAGE_SIZE = (1 << 16); // 64KB
static const int64_t INFO_PAGE_SIZE_LIMIT = (1 << 12); // 4KB
static const int64_t INFO_PAGE_SIZE_LIMIT = (1 << 13); // 8KB
static const int64_t INFO_IDLE_SIZE = 16LL * 1024LL * 1024LL; // 16MB
static const int64_t INFO_MAX_SIZE = 16LL * 1024LL * 1024LL; // 16MB // lowest
typedef common::hash::ObHashMap<int64_t, ObIDiagnoseInfo *> InfoMap;
@ -313,7 +312,7 @@ int ObIDiagnoseInfoMgr::alloc_and_add(const int64_t key, T *input_info)
}
}
if (OB_HASH_EXIST == ret) {
// do nothing
// OB_HASH_EXIST means info in map have higher priority than input info
ret = OB_SUCCESS;
} else if (OB_HASH_NOT_EXIST == ret || OB_SUCC(ret)) {
ret = OB_SUCCESS;
@ -372,6 +371,14 @@ protected:
struct ObCompactionDiagnoseInfo
{
ObCompactionDiagnoseInfo()
: merge_type_(),
tenant_id_(0),
ls_id_(0),
tablet_id_(0),
timestamp_(0),
status_(DIA_STATUS_MAX)
{}
enum ObDiagnoseStatus
{
DIA_STATUS_NOT_SCHEDULE = 0,
@ -384,11 +391,11 @@ struct ObCompactionDiagnoseInfo
};
const static char *ObDiagnoseStatusStr[DIA_STATUS_MAX];
static const char * get_diagnose_status_str(ObDiagnoseStatus status);
TO_STRING_KV("merge_type", merge_type_to_str(merge_type_), K_(tenant_id), K_(ls_id), K_(tablet_id), K_(status), K_(timestamp),
K_(diagnose_info));
TO_STRING_KV("merge_type", merge_type_to_str(merge_type_), K_(tenant_id), K_(ls_id), K_(tablet_id),
"status", get_diagnose_status_str(status_), K_(timestamp), K_(diagnose_info));
compaction::ObMergeType merge_type_;
int64_t tenant_id_;
uint64_t tenant_id_;
int64_t ls_id_;
int64_t tablet_id_;
int64_t timestamp_;
@ -408,11 +415,11 @@ public:
const static char *ObCompactionDiagnoseTypeStr[COMPACTION_DIAGNOSE_TYPE_MAX];
static const char * get_compaction_diagnose_type_str(ObCompactionDiagnoseType type);
static ObMergeType get_compaction_diagnose_merge_type(ObCompactionDiagnoseType type);
struct ObLSCheckStatus
struct ObLSCheckStatus
{
public:
ObLSCheckStatus() { reset(); }
ObLSCheckStatus(bool weak_read_ts_ready, bool need_merge)
ObLSCheckStatus(const bool weak_read_ts_ready, const bool need_merge)
: weak_read_ts_ready_(weak_read_ts_ready),
need_merge_(need_merge)
{}
@ -594,29 +601,34 @@ private:
#define IS_UNKNOW_LS_ID(ls_id) (ObLSID(INT64_MAX) == ls_id)
#define UNKNOW_TABLET_ID ObTabletID(INT64_MAX)
#define IS_UNKNOW_TABLET_ID(tablet_id) (ObTabletID(INT64_MAX) == tablet_id)
#define DEL_SUSPECT_INFO(type, ls_id, tablet_id, diagnose_type) \
{ \
compaction::ObMergeDagHash dag_hash; \
dag_hash.merge_type_ = type; \
dag_hash.ls_id_ = ls_id; \
dag_hash.tablet_id_ = tablet_id; \
int64_t tenant_id = MTL_ID(); \
int64_t hash_value = compaction::ObScheduleSuspectInfo::gen_hash(tenant_id, dag_hash.inner_hash()); \
if (OB_TMP_FAIL(MTL(compaction::ObScheduleSuspectInfoMgr *)->delete_info(hash_value))) { \
if (OB_HASH_NOT_EXIST != tmp_ret) { \
STORAGE_LOG(WARN, "failed to delete suspect info", K(tmp_ret), K(dag_hash), K(tenant_id)); \
} \
} else if (OB_TMP_FAIL(MTL(compaction::ObDiagnoseTabletMgr *)->delete_diagnose_tablet(ls_id, tablet_id, diagnose_type))) { \
STORAGE_LOG(WARN, "failed to delete diagnose tablet", K(tmp_ret), K(ls_id), K(tablet_id)); \
} else { \
STORAGE_LOG(DEBUG, "success to delete suspect info", K(tmp_ret), K(dag_hash), K(tenant_id)); \
} \
}
#define DEL_SUSPECT_INFO(type, ls_id, tablet_id, diagnose_type) \
{ \
compaction::ObMergeDagHash dag_hash; \
dag_hash.merge_type_ = type; \
dag_hash.ls_id_ = ls_id; \
dag_hash.tablet_id_ = tablet_id; \
int64_t hash_value = dag_hash.inner_hash(); \
if (OB_TMP_FAIL(MTL(compaction::ObScheduleSuspectInfoMgr *) \
->delete_info(hash_value))) { \
if (OB_HASH_NOT_EXIST != tmp_ret) { \
STORAGE_LOG(WARN, "failed to delete suspect info", K(tmp_ret), \
K(dag_hash)); \
} \
} else if (OB_TMP_FAIL(MTL(compaction::ObDiagnoseTabletMgr *) \
->delete_diagnose_tablet(ls_id, tablet_id, \
diagnose_type))) { \
STORAGE_LOG(WARN, "failed to delete diagnose tablet", K(tmp_ret), \
K(ls_id), K(tablet_id)); \
} else { \
STORAGE_LOG(DEBUG, "success to delete suspect info", K(tmp_ret), \
K(dag_hash)); \
} \
}
#define DEFINE_DIAGNOSE_PRINT_KV(n) \
template <LOG_TYPENAME_TN##n> \
int SET_DIAGNOSE_INFO(ObCompactionDiagnoseInfo &diagnose_info, compaction::ObMergeType type, \
const int64_t tenant_id, const ObLSID ls_id, const ObTabletID tablet_id, \
const ObLSID ls_id, const ObTabletID tablet_id, \
ObCompactionDiagnoseInfo::ObDiagnoseStatus status, \
const int64_t timestamp, \
LOG_PARAMETER_KV##n) \
@ -625,7 +637,7 @@ private:
int ret = OB_SUCCESS; \
diagnose_info.merge_type_ = type; \
diagnose_info.ls_id_ = ls_id.id(); \
diagnose_info.tenant_id_ = tenant_id; \
diagnose_info.tenant_id_ = MTL_ID(); \
diagnose_info.tablet_id_ = tablet_id.id(); \
diagnose_info.status_ = status; \
diagnose_info.timestamp_ = timestamp; \
@ -763,7 +775,6 @@ ADD_SUSPECT_INFO(merge_type, diagnose_type, UNKNOW_LS_ID, UNKNOW_TABLET_ID, info
int64_t __pos = 0; \
int ret = OB_SUCCESS; \
compaction::ObScheduleSuspectInfo info; \
info.tenant_id_ = MTL_ID(); \
info.priority_ = static_cast<uint32_t>(OB_SUSPECT_INFO_TYPES[info_type].priority); \
info.merge_type_ = type; \
info.ls_id_ = ls_id; \
@ -796,7 +807,6 @@ ADD_SUSPECT_INFO(merge_type, diagnose_type, UNKNOW_LS_ID, UNKNOW_TABLET_ID, info
int64_t __pos = 0; \
int ret = OB_SUCCESS; \
compaction::ObScheduleSuspectInfo info; \
info.tenant_id_ = MTL_ID(); \
info.priority_ = static_cast<uint32_t>(OB_SUSPECT_INFO_TYPES[info_type].priority); \
info.merge_type_ = type; \
info.ls_id_ = ls_id; \
@ -890,6 +900,21 @@ int ObDiagnoseInfoParam<int_size, str_size>::deep_copy(ObIAllocator &allocator,
if (OB_ISNULL(buf = allocator.alloc(get_deep_copy_size()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc memory", K(ret));
} else if (OB_FAIL(deep_copy(buf, get_deep_copy_size(), out_param))) {
STORAGE_LOG(WARN, "fail to deep copy", K(ret));
allocator.free(buf);
}
return ret;
}
template <int64_t int_size, int64_t str_size>
int ObDiagnoseInfoParam<int_size, str_size>::deep_copy(void *buf, const int64_t buf_len, ObIBasicInfoParam *&out_param) const
{
int ret = OB_SUCCESS;
out_param = NULL;
if (OB_UNLIKELY(NULL == buf || get_deep_copy_size() < buf_len)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "buf len is less than deep copy size", K(ret), KP(buf), K(get_deep_copy_size()), K(buf_len));
} else {
ObDiagnoseInfoParam<int_size, str_size> *info_param = nullptr;
if (OB_ISNULL(info_param = (new (buf) ObDiagnoseInfoParam<int_size, str_size>()))) {
@ -900,11 +925,7 @@ int ObDiagnoseInfoParam<int_size, str_size>::deep_copy(ObIAllocator &allocator,
info_param->struct_type_ = struct_type_;
MEMCPY(info_param->param_int_, param_int_, int_size * sizeof(int64_t));
MEMCPY(info_param->comment_, comment_, str_size);
}
if (OB_SUCC(ret)) {
out_param = info_param;
} else {
allocator.free(buf);
}
}
return ret;

View File

@ -556,7 +556,6 @@ int ObCompactionSuggestionMgr::analyze_merge_info(
if (strlen(buf) > 0) {
suggestion.merge_type_ = merge_info.merge_type_;
suggestion.tenant_id_ = merge_info.tenant_id_;
suggestion.ls_id_ = merge_info.ls_id_.id();
suggestion.tablet_id_ = merge_info.tablet_id_.id();
suggestion.merge_start_time_ = merge_info.merge_start_time_;

View File

@ -1057,7 +1057,7 @@ int ObMediumCompactionScheduleFunc::get_table_schema_to_merge(
}
#endif
int64_t storage_schema_version = ObStorageSchema::STORAGE_SCHEMA_VERSION_V3;
int64_t storage_schema_version = ObStorageSchema::STORAGE_SCHEMA_VERSION_LATEST;
if (medium_compat_version < ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2) {
storage_schema_version = ObStorageSchema::STORAGE_SCHEMA_VERSION;

View File

@ -62,7 +62,6 @@ int ObDataDescHelper::build(
// init merge info
const ObStaticMergeParam &static_param = merge_param.static_param_;
output_merge_info.reset();
output_merge_info.tenant_id_ = MTL_ID();
output_merge_info.ls_id_ = static_param.get_ls_id();
output_merge_info.tablet_id_ = static_param.get_tablet_id();
output_merge_info.merge_type_ = static_param.get_merge_type();
@ -654,6 +653,21 @@ int ObPartitionMajorMerger::inner_init()
return ret;
}
#ifdef ERRSIM
void write_wrong_row(const ObTabletID &tablet_id, const ObDatumRow &row)
{
int ret = OB_SUCCESS;
ret = OB_E(EventTable::EN_MAKE_DATA_CKM_ERROR_BY_WRITE_WRONG_ROW) ret;
if (OB_FAIL(ret)
&& tablet_id.id() > ObTabletID::MIN_USER_TABLET_ID
&& (OB_CHECKSUM_ERROR == ret || GCTX.server_id_ == -ret)) {
ObDatumRow &tmp_row = const_cast<ObDatumRow &>(row);
tmp_row.storage_datums_[tmp_row.get_column_count() - 1].set_int(999);
LOG_ERROR("ERRSIM EN_MAKE_DATA_CKM_ERROR_BY_WRITE_WRONG_ROW", K(ret), K(tablet_id), K(row));
}
}
#endif
int ObPartitionMajorMerger::inner_process(const ObDatumRow &row)
{
int ret = OB_SUCCESS;
@ -661,6 +675,11 @@ int ObPartitionMajorMerger::inner_process(const ObDatumRow &row)
if (is_delete) {
// drop del row
} else {
#ifdef ERRSIM
if (data_store_desc_.get_row_column_count() > data_store_desc_.get_rowkey_column_count()) {
write_wrong_row(data_store_desc_.get_tablet_id(), row);
}
#endif
const blocksstable::ObMacroBlockDesc *macro_desc;
if (OB_FAIL(get_base_iter_curr_macro_block(macro_desc))) {
STORAGE_LOG(WARN, "Failed to get base iter macro", K(ret));

View File

@ -73,7 +73,6 @@ int ObTabletMergeInfo::init(const ObBasicTabletMergeCtx &ctx, bool need_check/*t
void ObTabletMergeInfo::build_sstable_merge_info(const ObBasicTabletMergeCtx &ctx)
{
const ObStaticMergeParam &static_param = ctx.static_param_;
sstable_merge_info_.tenant_id_ = MTL_ID();
sstable_merge_info_.ls_id_ = ctx.get_ls_id();
sstable_merge_info_.tablet_id_ = ctx.get_tablet_id();
sstable_merge_info_.compaction_scn_ = static_param.get_compaction_scn();

View File

@ -275,7 +275,7 @@ int ObLSTabletService::prepare_for_safe_destroy()
LOG_WARN("fail to delete all tablets", K(ret));
}
#ifdef ERRSIM
if (!ls_->get_ls_id().is_sys_ls()) {
if (OB_NOT_NULL(ls_) && !ls_->get_ls_id().is_sys_ls()) {
SERVER_EVENT_SYNC_ADD("ls_tablet_service", "after_delete_all_tablets",
"tenant_id", MTL_ID(),
"ls_id", ls_->get_ls_id().id());

View File

@ -113,8 +113,8 @@ void PartTableInfo::fill_info(char *buf, const int64_t buf_len) const
/*
* ObSSTableMergeInfo func
* */
ObSSTableMergeInfo::ObSSTableMergeInfo()
: compaction::ObIDiagnoseInfo(),
ObSSTableMergeInfo::ObSSTableMergeInfo(const bool need_free_param)
: compaction::ObIDiagnoseInfo(need_free_param),
ls_id_(),
tablet_id_(),
is_fake_(false),
@ -158,7 +158,7 @@ ObSSTableMergeInfo::ObSSTableMergeInfo()
bool ObSSTableMergeInfo::is_valid() const
{
bool bret = true;
if (OB_UNLIKELY(tenant_id_ <= 0 || !ls_id_.is_valid() || !tablet_id_.is_valid() || compaction_scn_ <= 0)) {
if (OB_UNLIKELY(!ls_id_.is_valid() || !tablet_id_.is_valid() || compaction_scn_ <= 0)) {
bret = false;
}
return bret;
@ -195,7 +195,6 @@ int ObSSTableMergeInfo::add(const ObSSTableMergeInfo &other)
void ObSSTableMergeInfo::reset()
{
tenant_id_ = 0;
ls_id_.reset();
tablet_id_.reset();
is_fake_ = false;
@ -283,7 +282,6 @@ void ObSSTableMergeInfo::shallow_copy(ObIDiagnoseInfo *other)
{
ObSSTableMergeInfo *info = nullptr;
if (OB_NOT_NULL(other) && OB_NOT_NULL(info = dynamic_cast<ObSSTableMergeInfo *>(other))) {
tenant_id_ = info->tenant_id_;
priority_ = info->priority_;
ls_id_ = info->ls_id_;
tablet_id_ = info->tablet_id_;

View File

@ -111,14 +111,14 @@ struct PartTableInfo {
struct ObSSTableMergeInfo final : public compaction::ObIDiagnoseInfo
{
public:
ObSSTableMergeInfo();
ObSSTableMergeInfo(const bool need_free_param = true);
~ObSSTableMergeInfo() = default;
bool is_valid() const;
int add(const ObSSTableMergeInfo &other);
OB_INLINE bool is_major_merge_type() const { return compaction::is_major_merge_type(merge_type_); }
void dump_info(const char *msg);
void reset();
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), K_(compaction_scn),
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(compaction_scn),
"merge_type", merge_type_to_str(merge_type_), "merge_cost_time", merge_finish_time_ - merge_start_time_,
K_(merge_start_time), K_(merge_finish_time), K_(dag_id), K_(occupy_size), K_(new_flush_occupy_size), K_(original_size),
K_(compressed_size), K_(macro_block_count), K_(multiplexed_macro_block_count),

View File

@ -434,7 +434,7 @@ int ObStorageSchema::init(
const ObTableSchema &input_schema,
const lib::Worker::CompatMode compat_mode,
const bool skip_column_info/* = false*/,
const int64_t compat_version/* = STORAGE_SCHEMA_VERSION_V3*/)
const int64_t compat_version/* = STORAGE_SCHEMA_VERSION_LATEST*/)
{
int ret = OB_SUCCESS;

View File

@ -167,7 +167,7 @@ public:
const share::schema::ObTableSchema &input_schema,
const lib::Worker::CompatMode compat_mode,
const bool skip_column_info = false,
const int64_t compat_version = STORAGE_SCHEMA_VERSION_V3);
const int64_t compat_version = STORAGE_SCHEMA_VERSION_LATEST);
int init(
common::ObIAllocator &allocator,
const ObStorageSchema &old_schema,
@ -330,7 +330,7 @@ public:
static const int64_t STORAGE_SCHEMA_VERSION = 1;
static const int64_t STORAGE_SCHEMA_VERSION_V2 = 2; // add for store_column_cnt_
static const int64_t STORAGE_SCHEMA_VERSION_V3 = 3; // add for cg_group
static const int64_t STORAGE_SCHEMA_VERSION_LATEST = STORAGE_SCHEMA_VERSION_V3;
common::ObIAllocator *allocator_;
int64_t storage_schema_version_;

View File

@ -50,12 +50,14 @@ int ObStorageSchemaUtil::update_tablet_storage_schema(
const int64_t param_schema_version = param_schema.schema_version_;
const int64_t old_schema_column_group_cnt = old_schema_on_tablet.get_column_group_count();
const int64_t param_schema_column_group_cnt = param_schema.get_column_group_count();
// param schema may from major merge, will have column info, so if col cnt equal use param schema instead of tablet schema
const ObStorageSchema *column_group_schema = old_schema_column_group_cnt > param_schema_column_group_cnt
? &old_schema_on_tablet
: &param_schema;
const ObStorageSchema *input_schema = tablet_schema_stored_col_cnt > param_schema_stored_col_cnt
? &old_schema_on_tablet
: &param_schema;
if (OB_FAIL(alloc_storage_schema(allocator, new_storage_schema_ptr))) {
LOG_WARN("failed to alloc mem for tmp storage schema", K(ret), K(param_schema), K(old_schema_on_tablet));
} else if (OB_FAIL(new_storage_schema_ptr->init(allocator, *input_schema, false/*skip_solumn_info*/, column_group_schema))) {
@ -66,7 +68,7 @@ int ObStorageSchemaUtil::update_tablet_storage_schema(
new_storage_schema_ptr->store_column_cnt_ = MAX(tablet_schema_stored_col_cnt, param_schema_stored_col_cnt);
new_storage_schema_ptr->schema_version_ = MAX(tablet_schema_version, param_schema_version);
new_storage_schema_ptr->column_info_simplified_ =
(new_storage_schema_ptr->column_cnt_ != new_storage_schema_ptr->get_store_column_schemas().count());
(new_storage_schema_ptr->store_column_cnt_ != new_storage_schema_ptr->get_store_column_schemas().count());
if (param_schema_version > tablet_schema_version
|| param_schema_stored_col_cnt > tablet_schema_stored_col_cnt
|| param_schema_column_group_cnt > old_schema_column_group_cnt) {

View File

@ -1593,7 +1593,7 @@ int ObMigrationTabletParam::construct_placeholder_storage_schema_and_medium(
storage_schema.rowkey_array_.set_allocator(&allocator);
storage_schema.column_array_.set_allocator(&allocator);
storage_schema.storage_schema_version_ = ObStorageSchema::STORAGE_SCHEMA_VERSION_V3;
storage_schema.storage_schema_version_ = ObStorageSchema::STORAGE_SCHEMA_VERSION_LATEST;
storage_schema.is_use_bloomfilter_ = false;
storage_schema.table_type_ = ObTableType::USER_TABLE;
//storage_schema.table_mode_

View File

@ -23,6 +23,7 @@
#include "observer/omt/ob_tenant_node_balancer.h"
#include "share/scheduler/ob_dag_warning_history_mgr.h"
#include "storage/compaction/ob_tenant_compaction_progress.h"
#include "storage/compaction/ob_tablet_merge_task.h"
int64_t dag_cnt = 1;
int64_t stress_time= 1; // 100ms
@ -51,6 +52,7 @@ using namespace common;
using namespace lib;
using namespace share;
using namespace omt;
using namespace compaction;
namespace unittest
{
@ -1664,15 +1666,19 @@ TEST_F(TestDagScheduler, test_check_ls_compaction_dag_exist_with_cancel)
ASSERT_TRUE(nullptr != scheduler);
ASSERT_EQ(OB_SUCCESS, scheduler->init(MTL_ID(), time_slice, 64));
EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_MID, 1));
EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_HIGH, 1));
EXPECT_EQ(1, scheduler->prio_sche_[ObDagPrio::DAG_PRIO_COMPACTION_MID].limits_);
EXPECT_EQ(1, scheduler->prio_sche_[ObDagPrio::DAG_PRIO_COMPACTION_HIGH].limits_);
LoopWaitTask *wait_task = nullptr;
LoopWaitTask *wait_task2 = nullptr;
const int64_t dag_cnt = 6;
// add 6 dag at prio = DAG_PRIO_COMPACTION_MID
ObLSID ls_ids[2] = {ObLSID(1), ObLSID(2)};
bool finish_flag[2] = {false, false};
const int64_t ls_cnt = 2;
ObLSID ls_ids[ls_cnt] = {ObLSID(1), ObLSID(2)};
bool finish_flag[ls_cnt] = {false, false};
for (int64_t i = 0; i < dag_cnt; ++i) {
const int64_t idx = i % 2;
const int64_t idx = i % ls_cnt;
TestCompMidCancelDag *dag = NULL;
EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag));
dag->ls_id_ = ls_ids[idx];
@ -1682,10 +1688,21 @@ TEST_F(TestDagScheduler, test_check_ls_compaction_dag_exist_with_cancel)
EXPECT_EQ(OB_SUCCESS, dag->add_task(*wait_task));
EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag));
}
// add 2 dag at prio = DAG_PRIO_COMPACTION_HIGH
for (int64_t i = 0; i < ls_cnt; ++i) {
ObBatchFreezeTabletsDag *batch_freeze_dag = NULL;
EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(batch_freeze_dag));
batch_freeze_dag->param_.ls_id_ = ls_ids[i];
EXPECT_EQ(OB_SUCCESS, alloc_task(*batch_freeze_dag, wait_task2));
EXPECT_EQ(OB_SUCCESS, wait_task2->init(1, 2, finish_flag[i]));
EXPECT_EQ(OB_SUCCESS, batch_freeze_dag->add_task(*wait_task2));
EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(batch_freeze_dag));
}
EXPECT_EQ(dag_cnt, scheduler->dag_cnts_[ObDagType::DAG_TYPE_MERGE_EXECUTE]);
CHECK_EQ_UTIL_TIMEOUT(1, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_MID));
CHECK_EQ_UTIL_TIMEOUT(1, scheduler->get_running_task_cnt(ObDagPrio::DAG_PRIO_COMPACTION_HIGH));
// cancel two waiting dag of ls_ids[0]
// cancel waiting dag of ls_ids[0], all dag of ls_ids[1] will be destroyed when check_cancel
bool exist = false;
EXPECT_EQ(OB_SUCCESS, scheduler->check_ls_compaction_dag_exist_with_cancel(ls_ids[0], exist));
EXPECT_EQ(exist, true);
@ -1693,13 +1710,14 @@ TEST_F(TestDagScheduler, test_check_ls_compaction_dag_exist_with_cancel)
EXPECT_EQ(OB_SUCCESS, scheduler->check_ls_compaction_dag_exist_with_cancel(ls_ids[1], exist));
EXPECT_EQ(exist, false);
EXPECT_EQ(1, scheduler->dag_cnts_[ObDagType::DAG_TYPE_MERGE_EXECUTE]);
finish_flag[0] = true;
wait_scheduler();
EXPECT_EQ(OB_SUCCESS, scheduler->check_ls_compaction_dag_exist_with_cancel(ls_ids[0], exist));
EXPECT_EQ(exist, false);
EXPECT_EQ(OB_SUCCESS, scheduler->check_ls_compaction_dag_exist_with_cancel(ls_ids[0], exist));
EXPECT_EQ(exist, false);
}
TEST_F(TestDagScheduler, test_cancel_running_dag)

View File

@ -688,6 +688,7 @@ public:
ls_meta_.tenant_id_ = 1001;
ls_meta_.ls_id_ = ObLSID(100);
}
~FakeLS() {}
int64_t get_min_reserved_snapshot() { return 10; }
};

View File

@ -26,10 +26,11 @@ using namespace common;
using namespace storage;
using namespace share;
using namespace lib;
using namespace compaction;
namespace unittest
{
static const int64_t INFO_PAGE_SIZE = (1 << 12); // 4KB
static const int64_t INFO_PAGE_SIZE = (1 << 13); // 8KB
class TestDagWarningHistory : public ::testing::Test
{
public:
@ -60,6 +61,7 @@ public:
ASSERT_EQ(OB_SUCCESS, tenant_base_.init());
}
void calc_info_cnt_per_page(ObIDag &dag, int64_t &info_mem, int64_t &info_cnt_per_page);
void TearDown()
{
dag_history_mgr_->~ObDagWarningHistoryManager();
@ -75,6 +77,21 @@ private:
DISALLOW_COPY_AND_ASSIGN(TestDagWarningHistory);
};
void TestDagWarningHistory::calc_info_cnt_per_page(ObIDag &dag, int64_t &info_mem_size, int64_t &info_cnt_per_page)
{
int ret = OB_SUCCESS;
ObDagWarningInfo tmp_info;
compaction::ObInfoParamBuffer allocator;
if (OB_FAIL(dag.gene_warning_info(tmp_info, allocator))) {
COMMON_LOG(WARN, "failed to gene dag warning info", K(ret));
}
// every time will contain a header(16B)
info_mem_size = tmp_info.get_deep_copy_size();
info_cnt_per_page = (INFO_PAGE_SIZE - sizeof(ObFIFOAllocator::NormalPageHeader))
/ (info_mem_size + sizeof(ObFIFOAllocator::AllocHeader));
STORAGE_LOG(INFO, "size", K(info_mem_size), K(info_cnt_per_page));
}
class ObBasicDag : public ObIDag
{
public:
@ -177,7 +194,6 @@ TEST_F(TestDagWarningHistory, simple_add)
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(TRUE, ret_info.dag_ret_ == ObBasicDag::DAG_RET_START);
STORAGE_LOG(DEBUG, "", K(ret_info));
ASSERT_EQ(TRUE, ret_info.tenant_id_ == tenant_id_);
char comment[common::OB_DAG_WARNING_INFO_LENGTH];
memset(comment, '\0', sizeof(comment));
@ -294,7 +310,16 @@ TEST_F(TestDagWarningHistory, resize)
ret = MTL(ObDagWarningHistoryManager *)->init(true, MTL_ID(), "DagWarnHis", INFO_PAGE_SIZE);
ASSERT_EQ(OB_SUCCESS, ret);
const int64_t max_cnt = 40;
int64_t info_cnt_per_page = 0;
int64_t info_mem_size = 0;
ObSeqDag dag;
dag.init();
dag.set_dag_status(ObBasicDag::ObDagStatus::DAG_STATUS_ABORT);
dag.set_dag_ret(ObBasicDag::DAG_RET_START);
calc_info_cnt_per_page(dag, info_mem_size, info_cnt_per_page);
ASSERT_TRUE(info_cnt_per_page > 0);
const int64_t max_cnt = info_cnt_per_page * 3;
for (int i = 0; i < max_cnt; ++i) {
ObSeqDag dag;
dag.init();
@ -302,18 +327,20 @@ TEST_F(TestDagWarningHistory, resize)
dag.set_dag_ret(ObBasicDag::DAG_RET_START + i);
ASSERT_EQ(OB_SUCCESS, MTL(ObDagWarningHistoryManager *)->add_dag_warning_info(&dag));
}
ASSERT_EQ(max_cnt, MTL(ObDagWarningHistoryManager *)->size());
// every info is 264 bytes, each page contains 13 info
// 40 infos are in 4 pages (13 13 13 1), set_max will left 12 info (12 * 264 < 4096 * 0.4), means 2 page (11 1)
ret = MTL(ObDagWarningHistoryManager *)->set_max(2 * INFO_PAGE_SIZE);
// after set_max, mgr will gc info util memory usage below GC_LOW_PERCENTAGE * mem_max
const int64_t new_mem_max = 2 * INFO_PAGE_SIZE;
ret = MTL(ObDagWarningHistoryManager *)->set_max(new_mem_max);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(12, MTL(ObDagWarningHistoryManager *)->size());
const int64_t new_size = MTL(ObDagWarningHistoryManager *)->size();
const int64_t gc_cnt = max_cnt - new_size;
STORAGE_LOG(INFO, "new size", K(new_size));
ASSERT_TRUE(new_size * info_mem_size < ObIDiagnoseInfoMgr::GC_LOW_PERCENTAGE * new_mem_max / 100.0);
ret = MTL(ObDagWarningHistoryManager *)->set_max(3 * INFO_PAGE_SIZE);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(12, MTL(ObDagWarningHistoryManager *)->size());
ASSERT_EQ(new_size, MTL(ObDagWarningHistoryManager *)->size());
compaction::ObIDiagnoseInfoMgr::Iterator iterator;
ret = MTL(ObDagWarningHistoryManager *)->open_iter(iterator);
@ -321,14 +348,14 @@ TEST_F(TestDagWarningHistory, resize)
ObDagWarningInfo read_info;
char comment[common::OB_DAG_WARNING_INFO_LENGTH];
int i = 28;
int64_t i = gc_cnt;
while (OB_SUCC(ret)) {
if (OB_FAIL(iterator.get_next(&read_info, comment, sizeof(comment)))) {
ASSERT_EQ(OB_ITER_END, ret);
} else {
ASSERT_EQ(TRUE, read_info.dag_ret_ == (ObBasicDag::DAG_RET_START + i));
++i;
}
++i;
}
}
@ -339,11 +366,21 @@ TEST_F(TestDagWarningHistory, gc_info)
ASSERT_TRUE(nullptr != manager);
ret = MTL(ObDagWarningHistoryManager *)->init(true, MTL_ID(), "DagWarnHis", INFO_PAGE_SIZE);
ASSERT_EQ(OB_SUCCESS, ret);
ret = MTL(ObDagWarningHistoryManager *)->set_max(10 * INFO_PAGE_SIZE);
const int64_t page_cnt = 10;
ret = MTL(ObDagWarningHistoryManager *)->set_max(page_cnt * INFO_PAGE_SIZE);
ASSERT_EQ(OB_SUCCESS, ret);
int64_t hash_value = ObBasicDag::KEY_START;
const int64_t max_cnt = 129;
ObComplexDag dag(1);
dag.init();
dag.set_dag_status(ObBasicDag::ObDagStatus::DAG_STATUS_ABORT);
dag.set_dag_ret(ObBasicDag::DAG_RET_START);
int64_t info_mem_size = 0;
int64_t info_cnt_per_page = 0;
calc_info_cnt_per_page(dag, info_mem_size, info_cnt_per_page);
const int64_t max_cnt = page_cnt * info_cnt_per_page - 1;
for (int i = 0; i < max_cnt; ++i) {
ObComplexDag dag(hash_value++);
dag.init();
@ -351,116 +388,18 @@ TEST_F(TestDagWarningHistory, gc_info)
dag.set_dag_ret(ObBasicDag::DAG_RET_START + i);
ASSERT_EQ(OB_SUCCESS, MTL(ObDagWarningHistoryManager *)->add_dag_warning_info(&dag));
}
// 9 page full, 1 page contain 12 info (13 13 13 13 13 13 13 13 13 12)
ASSERT_TRUE(info_cnt_per_page > 0);
// 9 page full, 1 page remain one empty space
ASSERT_EQ(max_cnt, MTL(ObDagWarningHistoryManager *)->size());
for (int i = 0; i < 30; ++i) {
ASSERT_EQ(OB_SUCCESS, MTL(ObDagWarningHistoryManager *)->delete_info(ObBasicDag::KEY_START+i));
const int64_t delete_cnt = 30;
for (int i = 0; i < delete_cnt; ++i) {
ASSERT_EQ(OB_SUCCESS, MTL(ObDagWarningHistoryManager *)->delete_info(ObBasicDag::KEY_START + i));
}
// every info is 264 bytes, each page contains 13 info
// 129 * 264 > 40960 * 0.8 // after gc, left 62 info (62 * 264 < 40960 * 0.4), means 5 pages (11 13 13 13 12)
ASSERT_EQ(OB_SUCCESS, MTL(ObDagWarningHistoryManager *)->gc_info());
ASSERT_EQ(62, MTL(ObDagWarningHistoryManager *)->size());
}
TEST_F(TestDagWarningHistory, complex_test)
{
int ret = OB_SUCCESS;
ObDagWarningHistoryManager* manager = MTL(ObDagWarningHistoryManager *);
ASSERT_TRUE(nullptr != manager);
ret = MTL(ObDagWarningHistoryManager *)->init(true, MTL_ID(), "DagWarnHis", INFO_PAGE_SIZE);
ASSERT_EQ(OB_SUCCESS, ret);
ret = MTL(ObDagWarningHistoryManager *)->set_max(2 * INFO_PAGE_SIZE);
ASSERT_EQ(OB_SUCCESS, ret);
int64_t hash_value = ObBasicDag::KEY_START;
const int64_t max_cnt = 39;
for (int i = 0; i < max_cnt; ++i) {
ObComplexDag dag(hash_value++);
dag.init();
dag.set_dag_status(ObBasicDag::ObDagStatus::DAG_STATUS_ABORT);
dag.set_dag_ret(ObBasicDag::DAG_RET_START + i);
ASSERT_EQ(OB_SUCCESS, MTL(ObDagWarningHistoryManager *)->add_dag_warning_info(&dag));
}
// every info is 264 bytes, each page contains 13 infos
// 20 info will be purged , left 2 pages (6 13)
ASSERT_EQ(19, MTL(ObDagWarningHistoryManager *)->size());
compaction::ObIDiagnoseInfoMgr::Iterator iterator;
ret = MTL(ObDagWarningHistoryManager *)->open_iter(iterator);
ASSERT_EQ(OB_SUCCESS, ret);
ObDagWarningInfo read_info;
char comment[common::OB_DAG_WARNING_INFO_LENGTH];
// the first 20 info have been purged
int i = 20;
while (OB_SUCC(ret)) {
if (OB_FAIL(iterator.get_next(&read_info, comment, sizeof(comment)))) {
ASSERT_EQ(OB_ITER_END, ret);
} else {
ASSERT_EQ(TRUE, read_info.dag_ret_ == (ObBasicDag::DAG_RET_START + i));
++i;
}
}
// test when two iter is accessing info pool
compaction::ObIDiagnoseInfoMgr::Iterator iterator1;
compaction::ObIDiagnoseInfoMgr::Iterator iterator2;
ret = MTL(ObDagWarningHistoryManager *)->open_iter(iterator1);
ASSERT_EQ(OB_SUCCESS, ret);
ret = MTL(ObDagWarningHistoryManager *)->open_iter(iterator2);
ASSERT_EQ(OB_SUCCESS, ret);
i = 20;
ASSERT_EQ(OB_SUCCESS, iterator2.get_next(&read_info, comment, sizeof(comment)));
ASSERT_EQ(TRUE, read_info.dag_ret_ == (ObBasicDag::DAG_RET_START + i++));
ASSERT_EQ(OB_SUCCESS, iterator2.get_next(&read_info, comment, sizeof(comment)));
ASSERT_EQ(TRUE, read_info.dag_ret_ == (ObBasicDag::DAG_RET_START + i++));
i = 20;
ASSERT_EQ(OB_SUCCESS, iterator1.get_next(&read_info, comment, sizeof(comment)));
ASSERT_EQ(TRUE, read_info.dag_ret_ == (ObBasicDag::DAG_RET_START + i++));
ASSERT_EQ(OB_SUCCESS, iterator1.get_next(&read_info, comment, sizeof(comment)));
ASSERT_EQ(TRUE, read_info.dag_ret_ == (ObBasicDag::DAG_RET_START + i++));
// let the iterator1 in iter_end
while (OB_SUCC(ret)) {
if (OB_FAIL(iterator1.get_next(&read_info, comment, sizeof(comment)))) {
ASSERT_EQ(OB_ITER_END, ret);
} else {
ASSERT_EQ(TRUE, read_info.dag_ret_ == (ObBasicDag::DAG_RET_START + i));
++i;
}
}
// before set_max (6 13), after set_max (6)
ret = MTL(ObDagWarningHistoryManager *)->set_max(INFO_PAGE_SIZE);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(6, MTL(ObDagWarningHistoryManager *)->size());
{
ObComplexDag dag(hash_value++);
dag.init();
dag.set_dag_status(ObBasicDag::ObDagStatus::DAG_STATUS_ABORT);
dag.set_dag_ret(ObBasicDag::DAG_RET_START + i++);
ASSERT_EQ(OB_SUCCESS, MTL(ObDagWarningHistoryManager *)->add_dag_warning_info(&dag));
ASSERT_EQ(1, MTL(ObDagWarningHistoryManager *)->size());
}
ASSERT_EQ(OB_ITER_END, iterator1.get_next(&read_info, comment, sizeof(comment)));
ASSERT_EQ(OB_SUCCESS, iterator2.get_next(&read_info, comment, sizeof(comment)));
ASSERT_EQ(TRUE, read_info.dag_ret_ == (ObBasicDag::DAG_RET_START + max_cnt));
// test purge cuz add when there are some deleted info on list
ASSERT_EQ(OB_SUCCESS, MTL(ObDagWarningHistoryManager *)->delete_info(hash_value-1));
for (int i = 0; i < max_cnt; ++i) {
ObComplexDag dag(hash_value++);
dag.init();
dag.set_dag_status(ObBasicDag::ObDagStatus::DAG_STATUS_ABORT);
dag.set_dag_ret(ObBasicDag::DAG_RET_START + i);
ASSERT_EQ(OB_SUCCESS, MTL(ObDagWarningHistoryManager *)->add_dag_warning_info(&dag));
}
const int64_t cnt_after_gc = MTL(ObDagWarningHistoryManager *)->size();
ASSERT_TRUE(cnt_after_gc * sizeof(ObDagWarningInfo) <
page_cnt * INFO_PAGE_SIZE * ObIDiagnoseInfoMgr::GC_HIGH_PERCENTAGE * 1.0);
}
} // end namespace unittest

View File

@ -90,7 +90,6 @@ TEST_F(TestSSTableMergeInfoMgr, normal)
ret = MTL(ObTenantSSTableMergeInfoMgr*)->add_sstable_merge_info(merge_info);
ASSERT_NE(OB_SUCCESS, ret);
merge_info.tenant_id_ = 1;
merge_info.ls_id_ = 1;
merge_info.tablet_id_ = 2;
merge_info.compaction_scn_ = 100;
@ -119,7 +118,6 @@ TEST_F(TestSSTableMergeInfoMgr, iterator)
merge_info.info_param_ = &info_param;
const uint64_t tenant_id = 1001;
merge_info.tenant_id_ = 1;
merge_info.ls_id_ = 1;
merge_info.tablet_id_ = 3;
merge_info.compaction_scn_ = 100;
@ -201,7 +199,6 @@ TEST_F(TestSSTableMergeInfoMgr, resize)
merge_info.info_param_ = &info_param;
const uint64_t tenant_id = 1001;
merge_info.tenant_id_ = tenant_id;
merge_info.ls_id_ = 1;
merge_info.compaction_scn_ = 100;
merge_info.merge_type_ = ObMergeType::MINOR_MERGE;

View File

@ -20,6 +20,7 @@
#include "share/ob_encryption_util.h"
#include "storage/test_schema_prepare.h"
#include "mittest/mtlenv/mock_tenant_module_env.h"
#include "storage/ob_storage_schema_util.h"
namespace oceanbase
{
@ -280,6 +281,38 @@ TEST_F(TestStorageSchema, compat_serialize_and_deserialize)
ASSERT_EQ(true, judge_storage_schema_equal(storage_schema, des_storage_schema));
}
TEST_F(TestStorageSchema, test_update_tablet_store_schema)
{
int ret = OB_SUCCESS;
share::schema::ObTableSchema table_schema;
ObStorageSchema storage_schema1;
ObStorageSchema storage_schema2;
TestSchemaPrepare::prepare_schema(table_schema);
ASSERT_EQ(OB_SUCCESS, storage_schema1.init(allocator_, table_schema, lib::Worker::CompatMode::MYSQL));
ASSERT_EQ(OB_SUCCESS, storage_schema2.init(allocator_, table_schema, lib::Worker::CompatMode::MYSQL));
storage_schema2.store_column_cnt_ += 1;
storage_schema2.schema_version_ += 100;
// schema 2 have large store column cnt
ObStorageSchema *result_storage_schema = NULL;
ret = ObStorageSchemaUtil::update_tablet_storage_schema(ObTabletID(1), allocator_, storage_schema1, storage_schema2, result_storage_schema);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(result_storage_schema->schema_version_, storage_schema2.schema_version_);
ASSERT_EQ(result_storage_schema->store_column_cnt_, storage_schema2.store_column_cnt_);
ASSERT_EQ(result_storage_schema->is_column_info_simplified(), true);
ObStorageSchemaUtil::free_storage_schema(allocator_, result_storage_schema);
// schema_on_tablet and schema1 have same store column cnt, but storage_schema1 have full column info
ObStorageSchema schema_on_tablet;
ASSERT_EQ(OB_SUCCESS, schema_on_tablet.init(allocator_, storage_schema1, true/*skip_column_info*/));
ret = ObStorageSchemaUtil::update_tablet_storage_schema(ObTabletID(1), allocator_, schema_on_tablet, storage_schema1, result_storage_schema);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(true, judge_storage_schema_equal(storage_schema1, *result_storage_schema));
ASSERT_EQ(result_storage_schema->is_column_info_simplified(), false);
ObStorageSchemaUtil::free_storage_schema(allocator_, result_storage_schema);
}
} // namespace unittest
} // namespace oceanbase