[legacy bugfix] auto retry major compaction with forced flat store format on encoding estimate size overflow error detected
This commit is contained in:
1
deps/oblib/src/lib/utility/ob_tracepoint.h
vendored
1
deps/oblib/src/lib/utility/ob_tracepoint.h
vendored
@ -621,6 +621,7 @@ class EventTable
|
|||||||
EN_SCHEDULE_MEDIUM_COMPACTION = 709,
|
EN_SCHEDULE_MEDIUM_COMPACTION = 709,
|
||||||
EN_SCHEDULE_MAJOR_GET_TABLE_SCHEMA = 710,
|
EN_SCHEDULE_MAJOR_GET_TABLE_SCHEMA = 710,
|
||||||
EN_SKIP_INDEX_MAJOR = 711,
|
EN_SKIP_INDEX_MAJOR = 711,
|
||||||
|
EN_BUILD_DATA_MICRO_BLOCK = 712,
|
||||||
|
|
||||||
// please add new trace point after 750
|
// please add new trace point after 750
|
||||||
EN_SESSION_LEAK_COUNT_THRESHOLD = 751,
|
EN_SESSION_LEAK_COUNT_THRESHOLD = 751,
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
@ -448,6 +448,7 @@ DEFINE_ERROR(OB_ERR_OBSERVER_START, -4393, -1, "HY000", "observer start process
|
|||||||
DEFINE_ERROR(OB_ERR_OBSERVER_STOP, -4394, -1, "HY000", "observer stop process failure");
|
DEFINE_ERROR(OB_ERR_OBSERVER_STOP, -4394, -1, "HY000", "observer stop process failure");
|
||||||
DEFINE_ERROR(OB_ERR_OBSERVICE_START, -4395, -1, "HY000", "observice start process has failure");
|
DEFINE_ERROR(OB_ERR_OBSERVICE_START, -4395, -1, "HY000", "observice start process has failure");
|
||||||
DEFINE_ERROR_DEP(OB_ERR_THREAD_PANIC, -4396, -1, "HY000", "Worker thread pannic, thread may be terminated or hung");
|
DEFINE_ERROR_DEP(OB_ERR_THREAD_PANIC, -4396, -1, "HY000", "Worker thread pannic, thread may be terminated or hung");
|
||||||
|
DEFINE_ERROR(OB_ENCODING_EST_SIZE_OVERFLOW, -4397, -1, "HY000", "Encoding estimated size overflow");
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////
|
||||||
//error code for root server & server management -4500 ---- -5000
|
//error code for root server & server management -4500 ---- -5000
|
||||||
|
|||||||
@ -262,6 +262,7 @@ constexpr int OB_DISK_HUNG = -4392;
|
|||||||
constexpr int OB_ERR_OBSERVER_START = -4393;
|
constexpr int OB_ERR_OBSERVER_START = -4393;
|
||||||
constexpr int OB_ERR_OBSERVER_STOP = -4394;
|
constexpr int OB_ERR_OBSERVER_STOP = -4394;
|
||||||
constexpr int OB_ERR_OBSERVICE_START = -4395;
|
constexpr int OB_ERR_OBSERVICE_START = -4395;
|
||||||
|
constexpr int OB_ENCODING_EST_SIZE_OVERFLOW = -4397;
|
||||||
constexpr int OB_IMPORT_NOT_IN_SERVER = -4505;
|
constexpr int OB_IMPORT_NOT_IN_SERVER = -4505;
|
||||||
constexpr int OB_CONVERT_ERROR = -4507;
|
constexpr int OB_CONVERT_ERROR = -4507;
|
||||||
constexpr int OB_BYPASS_TIMEOUT = -4510;
|
constexpr int OB_BYPASS_TIMEOUT = -4510;
|
||||||
@ -2074,6 +2075,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
|
|||||||
#define OB_ERR_OBSERVER_STOP__USER_ERROR_MSG "observer stop process failure"
|
#define OB_ERR_OBSERVER_STOP__USER_ERROR_MSG "observer stop process failure"
|
||||||
#define OB_ERR_OBSERVICE_START__USER_ERROR_MSG "observice start process has failure"
|
#define OB_ERR_OBSERVICE_START__USER_ERROR_MSG "observice start process has failure"
|
||||||
#define OB_ERR_THREAD_PANIC__USER_ERROR_MSG "Worker thread pannic, thread may be terminated or hung"
|
#define OB_ERR_THREAD_PANIC__USER_ERROR_MSG "Worker thread pannic, thread may be terminated or hung"
|
||||||
|
#define OB_ENCODING_EST_SIZE_OVERFLOW__USER_ERROR_MSG "Encoding estimated size overflow"
|
||||||
#define OB_IMPORT_NOT_IN_SERVER__USER_ERROR_MSG "Import not in service"
|
#define OB_IMPORT_NOT_IN_SERVER__USER_ERROR_MSG "Import not in service"
|
||||||
#define OB_CONVERT_ERROR__USER_ERROR_MSG "Convert error"
|
#define OB_CONVERT_ERROR__USER_ERROR_MSG "Convert error"
|
||||||
#define OB_BYPASS_TIMEOUT__USER_ERROR_MSG "Bypass timeout"
|
#define OB_BYPASS_TIMEOUT__USER_ERROR_MSG "Bypass timeout"
|
||||||
@ -4123,6 +4125,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
|
|||||||
#define OB_ERR_OBSERVER_STOP__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4394, observer stop process failure"
|
#define OB_ERR_OBSERVER_STOP__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4394, observer stop process failure"
|
||||||
#define OB_ERR_OBSERVICE_START__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4395, observice start process has failure"
|
#define OB_ERR_OBSERVICE_START__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4395, observice start process has failure"
|
||||||
#define OB_ERR_THREAD_PANIC__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4396, Worker thread pannic, thread may be terminated or hung"
|
#define OB_ERR_THREAD_PANIC__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4396, Worker thread pannic, thread may be terminated or hung"
|
||||||
|
#define OB_ENCODING_EST_SIZE_OVERFLOW__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4397, Encoding estimated size overflow"
|
||||||
#define OB_IMPORT_NOT_IN_SERVER__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4505, Import not in service"
|
#define OB_IMPORT_NOT_IN_SERVER__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4505, Import not in service"
|
||||||
#define OB_CONVERT_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4507, Convert error"
|
#define OB_CONVERT_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4507, Convert error"
|
||||||
#define OB_BYPASS_TIMEOUT__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4510, Bypass timeout"
|
#define OB_BYPASS_TIMEOUT__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4510, Bypass timeout"
|
||||||
@ -5814,7 +5817,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
|
|||||||
#define OB_ERR_DATA_TOO_LONG_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-12899: value too large for column %.*s (actual: %ld, maximum: %ld)"
|
#define OB_ERR_DATA_TOO_LONG_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-12899: value too large for column %.*s (actual: %ld, maximum: %ld)"
|
||||||
#define OB_ERR_INVALID_DATE_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-01861: Incorrect datetime value for column '%.*s' at row %ld"
|
#define OB_ERR_INVALID_DATE_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-01861: Incorrect datetime value for column '%.*s' at row %ld"
|
||||||
|
|
||||||
extern int g_all_ob_errnos[2045];
|
extern int g_all_ob_errnos[2046];
|
||||||
|
|
||||||
const char *ob_error_name(const int oberr);
|
const char *ob_error_name(const int oberr);
|
||||||
const char* ob_error_cause(const int oberr);
|
const char* ob_error_cause(const int oberr);
|
||||||
|
|||||||
@ -669,6 +669,13 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
|
|||||||
} else {
|
} else {
|
||||||
// Print status of current encoders for debugging
|
// Print status of current encoders for debugging
|
||||||
print_micro_block_encoder_status();
|
print_micro_block_encoder_status();
|
||||||
|
if (OB_BUF_NOT_ENOUGH == ret) {
|
||||||
|
// buffer not enough when building a block with pivoted rows, probably caused by estimate
|
||||||
|
// maximum block size after encoding failed, rewrite errno with OB_ENCODING_EST_SIZE_OVERFLOW
|
||||||
|
// to force compaction task retry with flat row store type.
|
||||||
|
ret = OB_ENCODING_EST_SIZE_OVERFLOW;
|
||||||
|
LOG_WARN("build block failed by probably estimated maximum encoding data size overflow", K(ret));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
|
|||||||
@ -1675,7 +1675,8 @@ int ObDataIndexBlockBuilder::init(
|
|||||||
STORAGE_LOG(WARN, "fail to init referemce pointer members", K(ret));
|
STORAGE_LOG(WARN, "fail to init referemce pointer members", K(ret));
|
||||||
} else if (OB_UNLIKELY(index_store_desc->row_store_type_ != data_store_desc.row_store_type_
|
} else if (OB_UNLIKELY(index_store_desc->row_store_type_ != data_store_desc.row_store_type_
|
||||||
&& (index_store_desc->row_store_type_ == FLAT_ROW_STORE
|
&& (index_store_desc->row_store_type_ == FLAT_ROW_STORE
|
||||||
|| data_store_desc.row_store_type_ == FLAT_ROW_STORE))) {
|
|| data_store_desc.row_store_type_ == FLAT_ROW_STORE)
|
||||||
|
&& !data_store_desc.is_force_flat_store_type_)) {
|
||||||
// since n-1 micro block should keep format same with data_blocks
|
// since n-1 micro block should keep format same with data_blocks
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
STORAGE_LOG(WARN, "expect row store type equal", K(ret), KPC(index_store_desc), K(data_store_desc));
|
STORAGE_LOG(WARN, "expect row store type equal", K(ret), KPC(index_store_desc), K(data_store_desc));
|
||||||
@ -1689,16 +1690,19 @@ int ObDataIndexBlockBuilder::init(
|
|||||||
STORAGE_LOG(WARN, "failed to init idx read info", KPC(index_store_desc), K(ret));
|
STORAGE_LOG(WARN, "failed to init idx read info", KPC(index_store_desc), K(ret));
|
||||||
} else if (OB_FAIL(micro_helper_.open(*index_store_desc, idx_read_info_, *sstable_allocator_))) {
|
} else if (OB_FAIL(micro_helper_.open(*index_store_desc, idx_read_info_, *sstable_allocator_))) {
|
||||||
STORAGE_LOG(WARN, "fail to open base writer", K(ret));
|
STORAGE_LOG(WARN, "fail to open base writer", K(ret));
|
||||||
} else if (OB_FAIL(ObMacroBlockWriter::build_micro_writer(
|
|
||||||
index_store_desc,
|
|
||||||
*sstable_allocator_,
|
|
||||||
meta_block_writer_,
|
|
||||||
GCONF.micro_block_merge_verify_level))) {
|
|
||||||
STORAGE_LOG(WARN, "fail to init meta block writer", K(ret));
|
|
||||||
} else if (OB_FAIL(meta_row_.init(index_store_desc->row_column_count_))) {
|
} else if (OB_FAIL(meta_row_.init(index_store_desc->row_column_count_))) {
|
||||||
STORAGE_LOG(WARN, "fail to init flat writer", K(ret));
|
STORAGE_LOG(WARN, "fail to init flat writer", K(ret));
|
||||||
} else if (OB_FAIL(leaf_store_desc_.assign(*index_store_desc))) {
|
} else if (OB_FAIL(leaf_store_desc_.assign(*index_store_desc))) {
|
||||||
STORAGE_LOG(WARN, "fail to assign leaf store desc", K(ret));
|
STORAGE_LOG(WARN, "fail to assign leaf store desc", K(ret));
|
||||||
|
} else if (data_store_desc.is_force_flat_store_type_) {
|
||||||
|
leaf_store_desc_.force_flat_store_type();
|
||||||
|
}
|
||||||
|
if (FAILEDx(ObMacroBlockWriter::build_micro_writer(
|
||||||
|
&leaf_store_desc_,
|
||||||
|
*sstable_allocator_,
|
||||||
|
meta_block_writer_,
|
||||||
|
GCONF.micro_block_merge_verify_level))) {
|
||||||
|
STORAGE_LOG(WARN, "fail to init meta block writer", K(ret));
|
||||||
} else {
|
} else {
|
||||||
// only increase the size limit of n-1 level micro block
|
// only increase the size limit of n-1 level micro block
|
||||||
leaf_store_desc_.micro_block_size_ = leaf_store_desc_.micro_block_size_limit_; // nearly 2M
|
leaf_store_desc_.micro_block_size_ = leaf_store_desc_.micro_block_size_limit_; // nearly 2M
|
||||||
@ -1961,6 +1965,10 @@ int ObDataIndexBlockBuilder::append_index_micro_block(ObMacroBlock ¯o_block,
|
|||||||
micro_writer_->dump_diagnose_info();
|
micro_writer_->dump_diagnose_info();
|
||||||
STORAGE_LOG(INFO, "print error meta block");
|
STORAGE_LOG(INFO, "print error meta block");
|
||||||
meta_block_writer_->dump_diagnose_info();
|
meta_block_writer_->dump_diagnose_info();
|
||||||
|
if (common::ObStoreFormat::is_row_store_type_with_encoding(macro_block.get_row_store_type())) {
|
||||||
|
ret = OB_ENCODING_EST_SIZE_OVERFLOW;
|
||||||
|
STORAGE_LOG(WARN, "build macro block failed by probably estimated maximum encoding data size overflow", K(ret));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
clean_status();
|
clean_status();
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -275,7 +275,30 @@ bool ObDataStoreDesc::is_valid() const
|
|||||||
&& ls_id_.is_valid()
|
&& ls_id_.is_valid()
|
||||||
&& tablet_id_.is_valid()
|
&& tablet_id_.is_valid()
|
||||||
&& compressor_type_ > ObCompressorType::INVALID_COMPRESSOR
|
&& compressor_type_ > ObCompressorType::INVALID_COMPRESSOR
|
||||||
&& snapshot_version_ > 0;
|
&& snapshot_version_ > 0
|
||||||
|
&& is_store_type_valid();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ObDataStoreDesc::is_store_type_valid() const
|
||||||
|
{
|
||||||
|
bool ret = false;
|
||||||
|
bool is_major = storage::is_major_merge_type(merge_type_)
|
||||||
|
|| storage::is_meta_major_merge(merge_type_);
|
||||||
|
if (!ObStoreFormat::is_row_store_type_valid(row_store_type_)) {
|
||||||
|
// invalid row store type
|
||||||
|
} else if (is_force_flat_store_type_) {
|
||||||
|
ret = (ObRowStoreType::FLAT_ROW_STORE == row_store_type_ && is_major);
|
||||||
|
} else if (!is_major) {
|
||||||
|
ret = (!ObStoreFormat::is_row_store_type_with_encoding(row_store_type_));
|
||||||
|
} else {
|
||||||
|
ret = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ret) {
|
||||||
|
STORAGE_LOG(WARN, "invalid row store type",
|
||||||
|
K_(row_store_type), K(is_major), K_(is_force_flat_store_type));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObDataStoreDesc::reset()
|
void ObDataStoreDesc::reset()
|
||||||
@ -308,6 +331,7 @@ void ObDataStoreDesc::reset()
|
|||||||
sstable_index_builder_ = nullptr;
|
sstable_index_builder_ = nullptr;
|
||||||
is_ddl_ = false;
|
is_ddl_ = false;
|
||||||
need_pre_warm_ = false;
|
need_pre_warm_ = false;
|
||||||
|
is_force_flat_store_type_ = false;
|
||||||
col_desc_array_.reset();
|
col_desc_array_.reset();
|
||||||
datum_utils_.reset();
|
datum_utils_.reset();
|
||||||
allocator_.reset();
|
allocator_.reset();
|
||||||
@ -342,6 +366,7 @@ int ObDataStoreDesc::assign(const ObDataStoreDesc &desc)
|
|||||||
major_working_cluster_version_ = desc.major_working_cluster_version_;
|
major_working_cluster_version_ = desc.major_working_cluster_version_;
|
||||||
is_ddl_ = desc.is_ddl_;
|
is_ddl_ = desc.is_ddl_;
|
||||||
need_pre_warm_ = desc.need_pre_warm_;
|
need_pre_warm_ = desc.need_pre_warm_;
|
||||||
|
is_force_flat_store_type_ = desc.is_force_flat_store_type_;
|
||||||
col_desc_array_.reset();
|
col_desc_array_.reset();
|
||||||
datum_utils_.reset();
|
datum_utils_.reset();
|
||||||
sstable_index_builder_ = desc.sstable_index_builder_;
|
sstable_index_builder_ = desc.sstable_index_builder_;
|
||||||
|
|||||||
@ -79,6 +79,7 @@ struct ObDataStoreDesc
|
|||||||
int64_t major_working_cluster_version_;
|
int64_t major_working_cluster_version_;
|
||||||
bool is_ddl_;
|
bool is_ddl_;
|
||||||
bool need_pre_warm_;
|
bool need_pre_warm_;
|
||||||
|
bool is_force_flat_store_type_;
|
||||||
common::ObArenaAllocator allocator_;
|
common::ObArenaAllocator allocator_;
|
||||||
common::ObFixedArray<share::schema::ObColDesc, common::ObIAllocator> col_desc_array_;
|
common::ObFixedArray<share::schema::ObColDesc, common::ObIAllocator> col_desc_array_;
|
||||||
blocksstable::ObStorageDatumUtils datum_utils_;
|
blocksstable::ObStorageDatumUtils datum_utils_;
|
||||||
@ -95,6 +96,12 @@ struct ObDataStoreDesc
|
|||||||
void reset();
|
void reset();
|
||||||
int assign(const ObDataStoreDesc &desc);
|
int assign(const ObDataStoreDesc &desc);
|
||||||
bool encoding_enabled() const { return ObStoreFormat::is_row_store_type_with_encoding(row_store_type_); }
|
bool encoding_enabled() const { return ObStoreFormat::is_row_store_type_with_encoding(row_store_type_); }
|
||||||
|
void force_flat_store_type()
|
||||||
|
{
|
||||||
|
row_store_type_ = ObRowStoreType::FLAT_ROW_STORE;
|
||||||
|
is_force_flat_store_type_ = true;
|
||||||
|
}
|
||||||
|
bool is_store_type_valid() const;
|
||||||
OB_INLINE bool is_major_merge() const { return storage::is_major_merge_type(merge_type_); }
|
OB_INLINE bool is_major_merge() const { return storage::is_major_merge_type(merge_type_); }
|
||||||
OB_INLINE bool is_meta_major_merge() const { return storage::is_meta_major_merge(merge_type_); }
|
OB_INLINE bool is_meta_major_merge() const { return storage::is_meta_major_merge(merge_type_); }
|
||||||
OB_INLINE bool is_use_pct_free() const { return macro_block_size_ != macro_store_size_; }
|
OB_INLINE bool is_use_pct_free() const { return macro_block_size_ != macro_store_size_; }
|
||||||
@ -177,6 +184,10 @@ public:
|
|||||||
OB_INLINE char *get_data_buf() { return data_.data(); }
|
OB_INLINE char *get_data_buf() { return data_.data(); }
|
||||||
OB_INLINE int32_t get_row_count() const { return macro_header_.fixed_header_.row_count_; }
|
OB_INLINE int32_t get_row_count() const { return macro_header_.fixed_header_.row_count_; }
|
||||||
OB_INLINE int32_t get_micro_block_count() const { return macro_header_.fixed_header_.micro_block_count_; }
|
OB_INLINE int32_t get_micro_block_count() const { return macro_header_.fixed_header_.micro_block_count_; }
|
||||||
|
OB_INLINE ObRowStoreType get_row_store_type() const
|
||||||
|
{
|
||||||
|
return static_cast<ObRowStoreType>(macro_header_.fixed_header_.row_store_type_);
|
||||||
|
}
|
||||||
void update_max_merged_trans_version(const int64_t max_merged_trans_version)
|
void update_max_merged_trans_version(const int64_t max_merged_trans_version)
|
||||||
{
|
{
|
||||||
if (max_merged_trans_version > max_merged_trans_version_) {
|
if (max_merged_trans_version > max_merged_trans_version_) {
|
||||||
|
|||||||
@ -947,6 +947,13 @@ int ObMacroBlockWriter::build_micro_block()
|
|||||||
}
|
}
|
||||||
data_block_pre_warmer_.reuse();
|
data_block_pre_warmer_.reuse();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef ERRSIM
|
||||||
|
if (data_store_desc_->encoding_enabled()) {
|
||||||
|
ret = OB_E(EventTable::EN_BUILD_DATA_MICRO_BLOCK) ret;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
micro_writer_->reuse();
|
micro_writer_->reuse();
|
||||||
if (data_store_desc_->need_build_hash_index_for_micro_block_) {
|
if (data_store_desc_->need_build_hash_index_for_micro_block_) {
|
||||||
|
|||||||
@ -519,7 +519,14 @@ ObPartitionMajorMerger::~ObPartitionMajorMerger()
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObPartitionMajorMerger::open(ObTabletMergeCtx &ctx, const int64_t idx)
|
void ObPartitionMajorMerger::reset()
|
||||||
|
{
|
||||||
|
rewrite_block_cnt_ = 0;
|
||||||
|
need_rewrite_block_cnt_ = 0;
|
||||||
|
ObPartitionMerger::reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObPartitionMajorMerger::open(ObTabletMergeCtx &ctx, const int64_t idx, const bool force_flat_format)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
@ -537,6 +544,9 @@ int ObPartitionMajorMerger::open(ObTabletMergeCtx &ctx, const int64_t idx)
|
|||||||
merge_progress_ = ctx.merge_progress_;
|
merge_progress_ = ctx.merge_progress_;
|
||||||
}
|
}
|
||||||
task_idx_ = idx;
|
task_idx_ = idx;
|
||||||
|
if (force_flat_format) {
|
||||||
|
data_store_desc_.force_flat_store_type();
|
||||||
|
}
|
||||||
data_store_desc_.sstable_index_builder_ = ctx.get_merge_info().get_index_builder();
|
data_store_desc_.sstable_index_builder_ = ctx.get_merge_info().get_index_builder();
|
||||||
rewrite_block_cnt_ = 0;
|
rewrite_block_cnt_ = 0;
|
||||||
need_rewrite_block_cnt_ = 0;
|
need_rewrite_block_cnt_ = 0;
|
||||||
@ -594,13 +604,16 @@ int ObPartitionMajorMerger::init_partition_fuser(const ObMergeParameter &merge_p
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObPartitionMajorMerger::merge_partition(ObTabletMergeCtx &ctx, const int64_t idx)
|
int ObPartitionMajorMerger::merge_partition(
|
||||||
|
ObTabletMergeCtx &ctx,
|
||||||
|
const int64_t idx,
|
||||||
|
const bool force_flat_format)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObMergeParameter merge_param;
|
ObMergeParameter merge_param;
|
||||||
ObPartitionMajorMergeHelper merge_helper;
|
ObPartitionMajorMergeHelper merge_helper;
|
||||||
|
|
||||||
if (OB_FAIL(open(ctx, idx))) {
|
if (OB_FAIL(open(ctx, idx, force_flat_format))) {
|
||||||
STORAGE_LOG(WARN, "Failed to open partition major merge fuse", K(ret));
|
STORAGE_LOG(WARN, "Failed to open partition major merge fuse", K(ret));
|
||||||
} else if (OB_FAIL(prepare_merge_partition(merge_param, merge_helper))) {
|
} else if (OB_FAIL(prepare_merge_partition(merge_param, merge_helper))) {
|
||||||
STORAGE_LOG(WARN, "Failed to prepare merge partition", K(ret));
|
STORAGE_LOG(WARN, "Failed to prepare merge partition", K(ret));
|
||||||
@ -838,7 +851,6 @@ int ObPartitionMajorMerger::reuse_base_sstable(ObPartitionMajorMergeHelper &merg
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
*ObPartitionMinorMergerV2
|
*ObPartitionMinorMergerV2
|
||||||
*/
|
*/
|
||||||
@ -868,16 +880,16 @@ void ObPartitionMinorMerger::reset()
|
|||||||
ObPartitionMerger::reset();
|
ObPartitionMerger::reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObPartitionMinorMerger::open(ObTabletMergeCtx &ctx, const int64_t idx)
|
int ObPartitionMinorMerger::open(ObTabletMergeCtx &ctx, const int64_t idx, const bool force_flat_format)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
if (IS_INIT) {
|
if (IS_INIT) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
STORAGE_LOG(WARN, "ObPartitionMergerV2 is init twice", K(ret), K(*this));
|
STORAGE_LOG(WARN, "ObPartitionMergerV2 is init twice", K(ret), K(*this));
|
||||||
} else if (OB_UNLIKELY(!ctx.is_valid() || idx < 0)) {
|
} else if (OB_UNLIKELY(!ctx.is_valid() || idx < 0 || force_flat_format)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
STORAGE_LOG(WARN, "Invalid argument to init ObPartitionMergerV2", K(ret), K(ctx), K(idx));
|
STORAGE_LOG(WARN, "Invalid argument to init ObPartitionMergerV2", K(ret), K(ctx), K(idx), K(force_flat_format));
|
||||||
} else if (OB_FAIL(init_data_store_desc(ctx))) {
|
} else if (OB_FAIL(init_data_store_desc(ctx))) {
|
||||||
STORAGE_LOG(WARN, "Failed to init data store desc", K(ret));
|
STORAGE_LOG(WARN, "Failed to init data store desc", K(ret));
|
||||||
} else {
|
} else {
|
||||||
@ -1099,14 +1111,17 @@ int ObPartitionMinorMerger::init_partition_fuser(const ObMergeParameter &merge_p
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObPartitionMinorMerger::merge_partition(ObTabletMergeCtx &ctx, const int64_t idx)
|
int ObPartitionMinorMerger::merge_partition(
|
||||||
|
ObTabletMergeCtx &ctx,
|
||||||
|
const int64_t idx,
|
||||||
|
const bool force_flat_format)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObPartitionMinorMergeHelper merge_helper;
|
ObPartitionMinorMergeHelper merge_helper;
|
||||||
ObMergeParameter merge_param;
|
ObMergeParameter merge_param;
|
||||||
int64_t need_rewrite_block_cnt;
|
int64_t need_rewrite_block_cnt;
|
||||||
|
|
||||||
if (OB_FAIL(open(ctx, idx))) {
|
if (OB_FAIL(open(ctx, idx, force_flat_format))) {
|
||||||
STORAGE_LOG(WARN, "Failed to open partition minor merge fuse", K(ret));
|
STORAGE_LOG(WARN, "Failed to open partition minor merge fuse", K(ret));
|
||||||
} else if (OB_FAIL(prepare_merge_partition(merge_param, merge_helper))) {
|
} else if (OB_FAIL(prepare_merge_partition(merge_param, merge_helper))) {
|
||||||
STORAGE_LOG(WARN, "Failed to prepare merge partition", K(ret));
|
STORAGE_LOG(WARN, "Failed to prepare merge partition", K(ret));
|
||||||
|
|||||||
@ -48,10 +48,13 @@ public:
|
|||||||
ObPartitionMerger();
|
ObPartitionMerger();
|
||||||
virtual ~ObPartitionMerger();
|
virtual ~ObPartitionMerger();
|
||||||
virtual void reset();
|
virtual void reset();
|
||||||
virtual int merge_partition(ObTabletMergeCtx &ctx, const int64_t idx) = 0;
|
virtual int merge_partition(
|
||||||
|
ObTabletMergeCtx &ctx,
|
||||||
|
const int64_t idx,
|
||||||
|
const bool force_flat_format = false) = 0;
|
||||||
VIRTUAL_TO_STRING_KV(K_(is_inited), K_(task_idx), K_(data_store_desc), K_(minimum_iters), K_(merge_info));
|
VIRTUAL_TO_STRING_KV(K_(is_inited), K_(task_idx), K_(data_store_desc), K_(minimum_iters), K_(merge_info));
|
||||||
protected:
|
protected:
|
||||||
virtual int open(ObTabletMergeCtx &ctx, const int64_t idx) = 0;
|
virtual int open(ObTabletMergeCtx &ctx, const int64_t idx, const bool force_flat_format) = 0;
|
||||||
virtual int inner_process(const blocksstable::ObDatumRow &row) = 0;
|
virtual int inner_process(const blocksstable::ObDatumRow &row) = 0;
|
||||||
virtual int close();
|
virtual int close();
|
||||||
virtual int process(const blocksstable::ObMicroBlock µ_block);
|
virtual int process(const blocksstable::ObMicroBlock µ_block);
|
||||||
@ -97,10 +100,14 @@ class ObPartitionMajorMerger : public ObPartitionMerger
|
|||||||
public:
|
public:
|
||||||
ObPartitionMajorMerger();
|
ObPartitionMajorMerger();
|
||||||
~ObPartitionMajorMerger();
|
~ObPartitionMajorMerger();
|
||||||
virtual int merge_partition(ObTabletMergeCtx &ctx, const int64_t idx) override;
|
virtual void reset() override;
|
||||||
|
virtual int merge_partition(
|
||||||
|
ObTabletMergeCtx &ctx,
|
||||||
|
const int64_t idx,
|
||||||
|
const bool force_flat_format = false) override;
|
||||||
INHERIT_TO_STRING_KV("ObPartitionMajorMerger", ObPartitionMerger, KPC(merge_progress_));
|
INHERIT_TO_STRING_KV("ObPartitionMajorMerger", ObPartitionMerger, KPC(merge_progress_));
|
||||||
protected:
|
protected:
|
||||||
virtual int open(ObTabletMergeCtx &ctx, const int64_t idx) override;
|
virtual int open(ObTabletMergeCtx &ctx, const int64_t idx, const bool force_flat_format) override;
|
||||||
virtual int inner_process(const blocksstable::ObDatumRow &row) override;
|
virtual int inner_process(const blocksstable::ObDatumRow &row) override;
|
||||||
virtual int init_partition_fuser(const ObMergeParameter &merge_param) override;
|
virtual int init_partition_fuser(const ObMergeParameter &merge_param) override;
|
||||||
virtual int try_rewrite_macro_block(const ObMacroBlockDesc ¯o_desc, bool &rewrite) override;
|
virtual int try_rewrite_macro_block(const ObMacroBlockDesc ¯o_desc, bool &rewrite) override;
|
||||||
@ -120,11 +127,14 @@ public:
|
|||||||
ObPartitionMinorMerger();
|
ObPartitionMinorMerger();
|
||||||
~ObPartitionMinorMerger();
|
~ObPartitionMinorMerger();
|
||||||
virtual void reset() override;
|
virtual void reset() override;
|
||||||
virtual int merge_partition(ObTabletMergeCtx &ctx, const int64_t idx) override;
|
virtual int merge_partition(
|
||||||
|
ObTabletMergeCtx &ctx,
|
||||||
|
const int64_t idx,
|
||||||
|
const bool force_flat_format = false) override;
|
||||||
INHERIT_TO_STRING_KV("ObPartitionMinorMerger", ObPartitionMerger, K_(minimum_iter_idxs),
|
INHERIT_TO_STRING_KV("ObPartitionMinorMerger", ObPartitionMerger, K_(minimum_iter_idxs),
|
||||||
K_(need_build_bloom_filter), KP_(cols_id_map));
|
K_(need_build_bloom_filter), KP_(cols_id_map));
|
||||||
protected:
|
protected:
|
||||||
virtual int open(ObTabletMergeCtx &ctx, const int64_t idx) override;
|
virtual int open(ObTabletMergeCtx &ctx, const int64_t idx, const bool force_flat_format) override;
|
||||||
virtual int close() override;
|
virtual int close() override;
|
||||||
virtual int inner_process(const blocksstable::ObDatumRow &row) override;
|
virtual int inner_process(const blocksstable::ObDatumRow &row) override;
|
||||||
virtual int init_partition_fuser(const ObMergeParameter &merge_param) override;
|
virtual int init_partition_fuser(const ObMergeParameter &merge_param) override;
|
||||||
|
|||||||
@ -1417,8 +1417,24 @@ int ObTabletMergeTask::process()
|
|||||||
STORAGE_LOG(WARN, "Unexpected null partition merger", K(ret));
|
STORAGE_LOG(WARN, "Unexpected null partition merger", K(ret));
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(merger_->merge_partition(*ctx_, idx_))) {
|
if (OB_FAIL(merger_->merge_partition(*ctx_, idx_))) {
|
||||||
STORAGE_LOG(WARN, "failed to merge partition", K(ret));
|
if (is_major_merge_type(ctx_->param_.merge_type_) && OB_ENCODING_EST_SIZE_OVERFLOW == ret) {
|
||||||
|
STORAGE_LOG(WARN, "failed to merge partition with possibly encoding error, "
|
||||||
|
"retry with flat row store type", K(ret), KPC(ctx_), K_(idx));
|
||||||
|
merger_->reset();
|
||||||
|
const bool force_flat_format = true;
|
||||||
|
if (OB_FAIL(merger_->merge_partition(*ctx_, idx_, force_flat_format))) {
|
||||||
|
if (OB_ALLOCATE_MEMORY_FAILED == ret || OB_TIMEOUT == ret || OB_IO_ERROR == ret) {
|
||||||
|
STORAGE_LOG(WARN, "retry merge partition with flat row store type failed", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
STORAGE_LOG(ERROR, "retry merge partition with flat row store type failed", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
STORAGE_LOG(WARN, "failed to merge partition", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
FLOG_INFO("merge macro blocks ok", K(idx_), "task", *this);
|
FLOG_INFO("merge macro blocks ok", K(idx_), "task", *this);
|
||||||
}
|
}
|
||||||
merger_->reset();
|
merger_->reset();
|
||||||
|
|||||||
Reference in New Issue
Block a user