fix merge 4103
This commit is contained in:
@ -181,7 +181,8 @@ int ObCOMerger::init_writers(ObSSTable *sstable)
|
|||||||
K(start_cg_idx_), K(cg_array.count()));
|
K(start_cg_idx_), K(cg_array.count()));
|
||||||
} else if (OB_FAIL(default_row.init(merger_arena_, merge_param_.static_param_.multi_version_column_descs_.count()))) {
|
} else if (OB_FAIL(default_row.init(merger_arena_, merge_param_.static_param_.multi_version_column_descs_.count()))) {
|
||||||
STORAGE_LOG(WARN, "Failed to init datum row", K(ret));
|
STORAGE_LOG(WARN, "Failed to init datum row", K(ret));
|
||||||
} else if (OB_FAIL(merge_param_.static_param_.schema_->get_orig_default_row(merge_param_.static_param_.multi_version_column_descs_, default_row))) {
|
} else if (OB_FAIL(merge_param_.static_param_.schema_->get_orig_default_row(merge_param_.static_param_.multi_version_column_descs_,
|
||||||
|
merge_infos[start_cg_idx_]->get_sstable_build_desc().get_static_desc().major_working_cluster_version_ >= DATA_VERSION_4_3_1_0, default_row))) {
|
||||||
STORAGE_LOG(WARN, "Failed to get default row from table schema", K(ret), K(merge_param_.static_param_.multi_version_column_descs_));
|
STORAGE_LOG(WARN, "Failed to get default row from table schema", K(ret), K(merge_param_.static_param_.multi_version_column_descs_));
|
||||||
} else if (OB_FAIL(ObLobManager::fill_lob_header(merger_arena_, merge_param_.static_param_.multi_version_column_descs_, default_row))) {
|
} else if (OB_FAIL(ObLobManager::fill_lob_header(merger_arena_, merge_param_.static_param_.multi_version_column_descs_, default_row))) {
|
||||||
STORAGE_LOG(WARN, "fail to fill lob header for default row", K(ret));
|
STORAGE_LOG(WARN, "fail to fill lob header for default row", K(ret));
|
||||||
|
|||||||
@ -241,9 +241,11 @@ int ObMajorPartitionMergeFuser::inner_init(const ObMergeParameter &merge_param)
|
|||||||
const common::ObIArray<share::schema::ObColDesc> &multi_version_column_ids = merge_param.static_param_.multi_version_column_descs_;
|
const common::ObIArray<share::schema::ObColDesc> &multi_version_column_ids = merge_param.static_param_.multi_version_column_descs_;
|
||||||
const ObStorageSchema *schema = merge_param.get_schema();
|
const ObStorageSchema *schema = merge_param.get_schema();
|
||||||
column_cnt_ = multi_version_column_ids.count();
|
column_cnt_ = multi_version_column_ids.count();
|
||||||
|
const bool need_trim_default_row = cluster_version_ >= DATA_VERSION_4_3_1_0 || cluster_version_ < DATA_VERSION_4_3_0_0;
|
||||||
|
|
||||||
if (OB_FAIL(default_row_.init(allocator_, column_cnt_))) {
|
if (OB_FAIL(default_row_.init(allocator_, column_cnt_))) {
|
||||||
STORAGE_LOG(WARN, "Failed to init datum row", K(ret), K_(column_cnt));
|
STORAGE_LOG(WARN, "Failed to init datum row", K(ret), K_(column_cnt));
|
||||||
} else if (OB_FAIL(schema->get_orig_default_row(multi_version_column_ids, default_row_))) {
|
} else if (OB_FAIL(schema->get_orig_default_row(multi_version_column_ids, need_trim_default_row, default_row_))) {
|
||||||
STORAGE_LOG(WARN, "Failed to get default row from table schema", K(ret), K(multi_version_column_ids));
|
STORAGE_LOG(WARN, "Failed to get default row from table schema", K(ret), K(multi_version_column_ids));
|
||||||
} else if (OB_FAIL(ObLobManager::fill_lob_header(allocator_, multi_version_column_ids, default_row_))) {
|
} else if (OB_FAIL(ObLobManager::fill_lob_header(allocator_, multi_version_column_ids, default_row_))) {
|
||||||
STORAGE_LOG(WARN, "fail to fill lob header for default row", K(ret), K(multi_version_column_ids));
|
STORAGE_LOG(WARN, "fail to fill lob header for default row", K(ret), K(multi_version_column_ids));
|
||||||
@ -373,6 +375,7 @@ int ObMinorPartitionMergeFuser::preprocess_fuse_row(const blocksstable::ObDatumR
|
|||||||
|
|
||||||
|
|
||||||
int ObMergeFuserBuilder::build(const ObMergeParameter &merge_param,
|
int ObMergeFuserBuilder::build(const ObMergeParameter &merge_param,
|
||||||
|
const int64_t cluster_version,
|
||||||
ObIAllocator &allocator,
|
ObIAllocator &allocator,
|
||||||
ObIPartitionMergeFuser *&partition_fuser)
|
ObIPartitionMergeFuser *&partition_fuser)
|
||||||
{
|
{
|
||||||
@ -388,7 +391,7 @@ int ObMergeFuserBuilder::build(const ObMergeParameter &merge_param,
|
|||||||
partition_fuser = alloc_helper<ObCOMinorSSTableFuser>(allocator, allocator);
|
partition_fuser = alloc_helper<ObCOMinorSSTableFuser>(allocator, allocator);
|
||||||
} else if (is_major_or_meta_merge_type(merge_type)) {
|
} else if (is_major_or_meta_merge_type(merge_type)) {
|
||||||
is_fuse_row_flag = false;
|
is_fuse_row_flag = false;
|
||||||
partition_fuser = alloc_helper<ObMajorPartitionMergeFuser>(allocator, allocator);
|
partition_fuser = alloc_helper<ObMajorPartitionMergeFuser>(allocator, allocator, cluster_version);
|
||||||
} else {
|
} else {
|
||||||
partition_fuser = alloc_helper<ObMinorPartitionMergeFuser>(allocator, allocator);
|
partition_fuser = alloc_helper<ObMinorPartitionMergeFuser>(allocator, allocator);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -145,10 +145,11 @@ protected:
|
|||||||
class ObMajorPartitionMergeFuser : public ObIPartitionMergeFuser
|
class ObMajorPartitionMergeFuser : public ObIPartitionMergeFuser
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObMajorPartitionMergeFuser(common::ObIAllocator &allocator)
|
ObMajorPartitionMergeFuser(common::ObIAllocator &allocator, const int64_t cluster_version)
|
||||||
: ObIPartitionMergeFuser(allocator),
|
: ObIPartitionMergeFuser(allocator),
|
||||||
default_row_(),
|
default_row_(),
|
||||||
generated_cols_(allocator_)
|
generated_cols_(allocator_),
|
||||||
|
cluster_version_(cluster_version)
|
||||||
{}
|
{}
|
||||||
virtual ~ObMajorPartitionMergeFuser();
|
virtual ~ObMajorPartitionMergeFuser();
|
||||||
virtual int end_fuse_row(const storage::ObNopPos &nop_pos, blocksstable::ObDatumRow &result_row) override;
|
virtual int end_fuse_row(const storage::ObNopPos &nop_pos, blocksstable::ObDatumRow &result_row) override;
|
||||||
@ -158,22 +159,11 @@ protected:
|
|||||||
protected:
|
protected:
|
||||||
blocksstable::ObDatumRow default_row_;
|
blocksstable::ObDatumRow default_row_;
|
||||||
ObFixedArray<int32_t, ObIAllocator> generated_cols_;
|
ObFixedArray<int32_t, ObIAllocator> generated_cols_;
|
||||||
|
const int64_t cluster_version_;
|
||||||
private:
|
private:
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObMajorPartitionMergeFuser);
|
DISALLOW_COPY_AND_ASSIGN(ObMajorPartitionMergeFuser);
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObMetaPartitionMergeFuser : public ObMajorPartitionMergeFuser
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
ObMetaPartitionMergeFuser(common::ObIAllocator &allocator) : ObMajorPartitionMergeFuser(allocator) {}
|
|
||||||
virtual ~ObMetaPartitionMergeFuser() {}
|
|
||||||
INHERIT_TO_STRING_KV("ObMajorPartitionMergeFuser", ObMajorPartitionMergeFuser,
|
|
||||||
"cur_fuser", "ObMetaPartitionMergeFuser");
|
|
||||||
private:
|
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObMetaPartitionMergeFuser);
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
class ObMinorPartitionMergeFuser : public ObIPartitionMergeFuser
|
class ObMinorPartitionMergeFuser : public ObIPartitionMergeFuser
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -197,6 +187,7 @@ protected:
|
|||||||
class ObMergeFuserBuilder {
|
class ObMergeFuserBuilder {
|
||||||
public:
|
public:
|
||||||
static int build(const ObMergeParameter &merge_param,
|
static int build(const ObMergeParameter &merge_param,
|
||||||
|
const int64_t cluster_version,
|
||||||
ObIAllocator &allocator,
|
ObIAllocator &allocator,
|
||||||
ObIPartitionMergeFuser *&partition_fuser);
|
ObIPartitionMergeFuser *&partition_fuser);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -263,7 +263,7 @@ int ObMerger::prepare_merge(ObBasicTabletMergeCtx &ctx, const int64_t idx)
|
|||||||
} else {
|
} else {
|
||||||
merge_param_.trans_state_mgr_ = &trans_state_mgr_;
|
merge_param_.trans_state_mgr_ = &trans_state_mgr_;
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ObMergeFuserBuilder::build(merge_param_, merger_arena_, partition_fuser_))) {
|
if (OB_FAIL(ObMergeFuserBuilder::build(merge_param_, ctx.static_desc_.major_working_cluster_version_ ,merger_arena_, partition_fuser_))) {
|
||||||
STORAGE_LOG(WARN, "failed to build partition fuser", K(ret), K(merge_param_));
|
STORAGE_LOG(WARN, "failed to build partition fuser", K(ret), K(merge_param_));
|
||||||
} else if (OB_FAIL(inner_prepare_merge(ctx, idx))) {
|
} else if (OB_FAIL(inner_prepare_merge(ctx, idx))) {
|
||||||
STORAGE_LOG(WARN, "failed to inner prepare merge", K(ret), K(ctx));
|
STORAGE_LOG(WARN, "failed to inner prepare merge", K(ret), K(ctx));
|
||||||
|
|||||||
@ -1493,6 +1493,7 @@ int ObStorageSchema::init_column_meta_array(
|
|||||||
|
|
||||||
int ObStorageSchema::get_orig_default_row(
|
int ObStorageSchema::get_orig_default_row(
|
||||||
const common::ObIArray<ObColDesc> &column_ids,
|
const common::ObIArray<ObColDesc> &column_ids,
|
||||||
|
bool need_trim,
|
||||||
blocksstable::ObDatumRow &default_row) const
|
blocksstable::ObDatumRow &default_row) const
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -1513,12 +1514,30 @@ int ObStorageSchema::get_orig_default_row(
|
|||||||
STORAGE_LOG(WARN, "column id not found", K(ret), K(column_ids.at(i)));
|
STORAGE_LOG(WARN, "column id not found", K(ret), K(column_ids.at(i)));
|
||||||
} else if (OB_FAIL(default_row.storage_datums_[i].from_obj_enhance(col_schema->get_orig_default_value()))) {
|
} else if (OB_FAIL(default_row.storage_datums_[i].from_obj_enhance(col_schema->get_orig_default_value()))) {
|
||||||
STORAGE_LOG(WARN, "Failed to transfer obj to datum", K(ret));
|
STORAGE_LOG(WARN, "Failed to transfer obj to datum", K(ret));
|
||||||
|
} else if (need_trim && col_schema->get_orig_default_value().is_fixed_len_char_type()) {
|
||||||
|
trim(col_schema->get_orig_default_value().get_collation_type(), default_row.storage_datums_[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ObStorageSchema::trim(const ObCollationType type, blocksstable::ObStorageDatum &storage_datum) const
|
||||||
|
{
|
||||||
|
const char *str = storage_datum.ptr_;
|
||||||
|
int32_t len = storage_datum.len_;
|
||||||
|
ObString space_pattern = ObCharsetUtils::get_const_str(type, ' ');
|
||||||
|
|
||||||
|
for (; len >= space_pattern.length(); len -= space_pattern.length()) {
|
||||||
|
if (0 != MEMCMP(str + len - space_pattern.length(),
|
||||||
|
space_pattern.ptr(),
|
||||||
|
space_pattern.length())) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
storage_datum.len_ = len;
|
||||||
|
}
|
||||||
|
|
||||||
const ObStorageColumnSchema *ObStorageSchema::get_column_schema(const int64_t column_idx) const
|
const ObStorageColumnSchema *ObStorageSchema::get_column_schema(const int64_t column_idx) const
|
||||||
{
|
{
|
||||||
const ObStorageColumnSchema *found_col = nullptr;
|
const ObStorageColumnSchema *found_col = nullptr;
|
||||||
|
|||||||
@ -22,6 +22,7 @@ namespace oceanbase
|
|||||||
namespace blocksstable
|
namespace blocksstable
|
||||||
{
|
{
|
||||||
struct ObSSTableColumnMeta;
|
struct ObSSTableColumnMeta;
|
||||||
|
class ObStorageDatum;
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace storage
|
namespace storage
|
||||||
@ -245,6 +246,7 @@ public:
|
|||||||
virtual int init_column_meta_array(
|
virtual int init_column_meta_array(
|
||||||
common::ObIArray<blocksstable::ObSSTableColumnMeta> &meta_array) const override;
|
common::ObIArray<blocksstable::ObSSTableColumnMeta> &meta_array) const override;
|
||||||
int get_orig_default_row(const common::ObIArray<share::schema::ObColDesc> &column_ids,
|
int get_orig_default_row(const common::ObIArray<share::schema::ObColDesc> &column_ids,
|
||||||
|
bool need_trim,
|
||||||
blocksstable::ObDatumRow &default_row) const;
|
blocksstable::ObDatumRow &default_row) const;
|
||||||
const ObStorageColumnSchema *get_column_schema(const int64_t column_id) const;
|
const ObStorageColumnSchema *get_column_schema(const int64_t column_id) const;
|
||||||
|
|
||||||
@ -299,6 +301,7 @@ private:
|
|||||||
int64_t get_array_serialize_length(const common::ObIArray<T> &array) const;
|
int64_t get_array_serialize_length(const common::ObIArray<T> &array) const;
|
||||||
template <typename T>
|
template <typename T>
|
||||||
bool check_column_array_valid(const common::ObIArray<T> &array) const;
|
bool check_column_array_valid(const common::ObIArray<T> &array) const;
|
||||||
|
void trim(const ObCollationType type, blocksstable::ObStorageDatum &storage_datum) const;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
static const uint32_t INVALID_ID = UINT32_MAX;
|
static const uint32_t INVALID_ID = UINT32_MAX;
|
||||||
|
|||||||
Reference in New Issue
Block a user