fix missing new add default value after co merge to row store
This commit is contained in:
@ -174,11 +174,14 @@ int ObCOTabletMergeCtx::build_ctx(bool &finish_flag)
|
||||
LOG_WARN("failed to build basic ctx", KR(ret), "param", get_dag_param(), KPC(this));
|
||||
} else if (OB_FAIL(init_major_sstable_status())) {
|
||||
LOG_WARN("failed to get major sstable status", K(ret));
|
||||
} else if (OB_FAIL(static_param_.init_co_major_merge_params())) {
|
||||
} else if (is_major_merge_type(get_merge_type())) {
|
||||
// meta major merge not support row col switch now
|
||||
if (OB_FAIL(static_param_.init_co_major_merge_params())) {
|
||||
LOG_WARN("failed to set major merge params", K(ret), K(static_param_));
|
||||
} else if (is_build_row_store_from_rowkey_cg() && OB_FAIL(init_table_read_info())) {
|
||||
STORAGE_LOG(WARN, "fail to init table read info", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -487,8 +490,8 @@ int ObCOTabletMergeCtx::create_sstable(const ObSSTable *&new_sstable)
|
||||
CTX_SET_DIAGNOSE_LOCATION(*this);
|
||||
} else {
|
||||
// only exist one co table here, emtpy or empty cg sstables
|
||||
LOG_DEBUG("chengkong debug: FinsihTask with only one co sstable");
|
||||
new_sstable = static_cast<ObSSTable *>(base_co_table);
|
||||
LOG_DEBUG("[RowColSwitch] FinsihTask with only one co sstable", KPC(new_sstable));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -535,7 +538,7 @@ int ObCOTabletMergeCtx::inner_add_cg_sstables(const ObSSTable *&new_sstable)
|
||||
LOG_WARN("failed to fill cg sstables to co sstable", K(ret), KPC(base_co_table));
|
||||
} else {
|
||||
new_sstable = base_co_table;
|
||||
LOG_DEBUG("chengkong debug, success to fill cg sstables to co sstable", KPC(new_sstable));
|
||||
LOG_DEBUG("[RowColSwitch] Success to fill cg sstables to co sstable", KPC(new_sstable));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -626,6 +629,8 @@ int ObCOTabletMergeCtx::get_cg_schema_for_merge(const int64_t idx, const ObStora
|
||||
} else if (OB_UNLIKELY(!cg_schema_ptr->is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get invalid cg schema", K(ret), KPC(cg_schema_ptr), K(idx), K(cg_schemas.at(idx)), KPC(this));
|
||||
} else {
|
||||
LOG_DEBUG("[RowColSwitch] get cg schema for merge", K(idx), KPC(cg_schema_ptr));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -649,6 +654,29 @@ int ObCOTabletMergeCtx::init_major_sstable_status()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCOTabletMergeCtx::construct_column_param(
|
||||
const uint64_t column_id,
|
||||
const ObStorageColumnSchema *column_schema,
|
||||
ObColumnParam &column_param)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
column_param.set_column_id(column_id);
|
||||
if (OB_ISNULL(column_schema)) {
|
||||
if (OB_HIDDEN_TRANS_VERSION_COLUMN_ID == column_id
|
||||
|| OB_HIDDEN_SQL_SEQUENCE_COLUMN_ID == column_id) {
|
||||
ObObjMeta meta_type;
|
||||
meta_type.set_int();
|
||||
column_param.set_meta_type(meta_type);
|
||||
} else {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "invalid argument", K(ret), K(column_id));
|
||||
}
|
||||
} else if (OB_FAIL(column_schema->construct_column_param(column_param))) {
|
||||
STORAGE_LOG(WARN, "fail to construct column param from column schema", K(ret), K(column_id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCOTabletMergeCtx::init_table_read_info()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -672,16 +700,19 @@ int ObCOTabletMergeCtx::init_table_read_info()
|
||||
int32_t col_index = OB_INVALID_INDEX; // to construct table_read_info, extra_rowkey_cnt is ignored
|
||||
int32_t multi_version_col_cnt = 0;
|
||||
bool is_multi_version_col = false;
|
||||
const ObStorageColumnSchema *column_schema = nullptr;
|
||||
ObColumnParam *column = nullptr;
|
||||
for (int32_t i = 0; OB_SUCC(ret) && i < column_cnt; i++) {
|
||||
column_id = all_column_ids.at(i).col_id_;
|
||||
cg_idx = OB_INVALID_INDEX;
|
||||
col_index = OB_INVALID_INDEX;
|
||||
column_schema = storage_schema->get_column_schema(column_id);
|
||||
column = nullptr;
|
||||
is_multi_version_col = OB_HIDDEN_TRANS_VERSION_COLUMN_ID == column_id || OB_HIDDEN_SQL_SEQUENCE_COLUMN_ID == column_id;
|
||||
if (OB_FAIL(ObTableParam::alloc_column(mem_ctx_.get_allocator(), column))) {
|
||||
STORAGE_LOG(WARN, "fail to alloc column", K(ret));
|
||||
} else if (FALSE_IT(column->set_column_id(column_id))) {
|
||||
} else if (OB_FAIL(construct_column_param(column_id, column_schema, *column))) {
|
||||
STORAGE_LOG(WARN, "fail to construct column param", K(ret), K(column_id), K(column_schema));
|
||||
} else if (OB_FAIL(tmp_cols.push_back(column))) {
|
||||
STORAGE_LOG(WARN, "fail to push back column param", K(ret));
|
||||
} else if (is_multi_version_col) {
|
||||
@ -714,7 +745,7 @@ int ObCOTabletMergeCtx::init_table_read_info()
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG("chengkong debug: Generate read info for co merge", K(ret), K(table_read_info_));
|
||||
LOG_INFO("[RowColSwitch] Generate read info for co merge", K(ret), K(table_read_info_));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -108,6 +108,10 @@ struct ObCOTabletMergeCtx : public ObBasicTabletMergeCtx
|
||||
const uint32_t end_cg_idx,
|
||||
const bool check_info_ready) const;
|
||||
int init_major_sstable_status();
|
||||
int construct_column_param(
|
||||
const uint64_t column_id,
|
||||
const ObStorageColumnSchema *column_schema,
|
||||
ObColumnParam &column_param);
|
||||
int init_table_read_info();
|
||||
int inner_loop_prepare_index_tree(
|
||||
const uint32_t start_cg_idx,
|
||||
|
||||
@ -263,7 +263,7 @@ int ObCOMerger:: alloc_row_writers(
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "invalid cg idx for mock row store cg schema", K(ret), K(start_cg_idx_), K(end_cg_idx_));
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "chengkong debug: mock row store cg", K(ctx->mocked_row_store_cg_));
|
||||
STORAGE_LOG(DEBUG, "[RowColSwitch] mock row store cg", K(ctx->mocked_row_store_cg_));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -716,6 +716,11 @@ void ObBasicTabletMergeCtx::add_sstable_merge_info(
|
||||
if (ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE != static_param_.merge_reason_) {
|
||||
ADD_COMMENT("merge_reason", ObAdaptiveMergePolicy::merge_reason_to_str(static_param_.merge_reason_));
|
||||
}
|
||||
if (is_major_merge_type(get_merge_type())
|
||||
&& ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE != static_param_.co_major_merge_type_) {
|
||||
ADD_COMMENT("major", static_param_.major_sstable_status_);
|
||||
ADD_COMMENT("co", ObCOMajorMergePolicy::co_major_merge_type_to_str(static_param_.co_major_merge_type_));
|
||||
}
|
||||
int64_t mem_peak_mb = mem_ctx_.get_total_mem_peak() >> 20;
|
||||
if (mem_peak_mb > 0) {
|
||||
ADD_COMMENT("cost_mb", mem_peak_mb);
|
||||
|
||||
@ -457,7 +457,7 @@ int ObPartitionRowMergeIter::construct_out_cols_project(const ObMergeParameter &
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
access_param_.iter_param_.out_cols_project_ = &out_cols_project_;
|
||||
LOG_DEBUG("chengkong debug: construct out cols project", K(out_cols_project_));
|
||||
LOG_DEBUG("[RowColSwitch] Construct out cols project", K(out_cols_project_));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -1847,14 +1847,16 @@ int ObCOMajorMergePolicy::decide_co_major_merge_type(
|
||||
ObCOSSTableV2 *co_sstable = nullptr;
|
||||
ObCOMajorSSTableStatus major_sstable_status = ObCOMajorSSTableStatus::INVALID_CO_MAJOR_SSTABLE_STATUS;
|
||||
int64_t estimate_row_cnt = 0;
|
||||
ObTabletID tablet_id;
|
||||
|
||||
if (OB_ISNULL(first_sstable) || OB_UNLIKELY(!first_sstable->is_co_sstable())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("first sstable in tables handle is null or not co sstable", K(ret), K(result.handle_));
|
||||
} else if (FALSE_IT(co_sstable = static_cast<ObCOSSTableV2 *>(first_sstable))) {
|
||||
} else if (FALSE_IT(tablet_id = co_sstable->get_key().tablet_id_)) {
|
||||
} else if (OB_FAIL(decide_co_major_sstable_status(*co_sstable, storage_schema, major_sstable_status))) {
|
||||
LOG_WARN("failed to decide co major sstable status");
|
||||
} else if (OB_FAIL(estimate_row_cnt_for_major_merge(co_sstable->get_key().tablet_id_.id(), result.handle_, storage_schema, tablet_handle, estimate_row_cnt))) {
|
||||
} else if (OB_FAIL(estimate_row_cnt_for_major_merge(tablet_id.id(), result.handle_, storage_schema, tablet_handle, estimate_row_cnt))) {
|
||||
// if estimate row cnt failed, make major sstable match schema
|
||||
major_merge_type = is_major_sstable_match_schema(major_sstable_status) ? BUILD_COLUMN_STORE_MERGE : REBUILD_COLUMN_STORE_MERGE;
|
||||
LOG_WARN("failed to estimate row count for co major merge, build column store by default", "estimate_ret", ret, K(major_sstable_status), K(major_merge_type));
|
||||
@ -1873,7 +1875,7 @@ int ObCOMajorMergePolicy::decide_co_major_merge_type(
|
||||
} else {
|
||||
major_merge_type = BUILD_ROW_STORE_MERGE;
|
||||
}
|
||||
LOG_DEBUG("chengkong debug: finish decide major merge type", K(major_sstable_status), K(major_merge_type), K(estimate_row_cnt), K(column_cnt));
|
||||
LOG_DEBUG("[RowColSwitch] finish decide major merge type", K(tablet_id), K(major_sstable_status), K(major_merge_type), K(estimate_row_cnt), K(column_cnt));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -96,6 +96,29 @@ bool ObStorageColumnSchema::is_valid() const
|
||||
return common::ob_is_valid_obj_type(static_cast<ObObjType>(meta_type_.get_type()));
|
||||
}
|
||||
|
||||
int ObStorageColumnSchema::construct_column_param(share::schema::ObColumnParam &column_param) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
column_param.set_meta_type(meta_type_);
|
||||
if (orig_default_value_.is_fixed_len_char_type()) {
|
||||
blocksstable::ObStorageDatum datum;
|
||||
ObObj obj;
|
||||
if (OB_FAIL(datum.from_obj(orig_default_value_))) {
|
||||
STORAGE_LOG(WARN, "fail to covent datum from obj", K(ret), K(orig_default_value_));
|
||||
} else {
|
||||
(void) ObStorageSchema::trim(orig_default_value_.get_collation_type(), datum);
|
||||
if (OB_FAIL(datum.to_obj_enhance(obj, orig_default_value_.get_meta()))) {
|
||||
STORAGE_LOG(WARN, "failed to transfer datum to obj", K(ret), K(datum));
|
||||
} else if (OB_FAIL(column_param.set_orig_default_value(obj))) {
|
||||
STORAGE_LOG(WARN, "fail to set orig default value", K(ret));
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(column_param.set_orig_default_value(orig_default_value_))) {
|
||||
STORAGE_LOG(WARN, "fail to set orig default value", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStorageColumnSchema::deep_copy_default_val(ObIAllocator &allocator, const ObObj &default_val)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1609,7 +1632,7 @@ int ObStorageSchema::get_orig_default_row(
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObStorageSchema::trim(const ObCollationType type, blocksstable::ObStorageDatum &storage_datum) const
|
||||
void ObStorageSchema::trim(const ObCollationType type, blocksstable::ObStorageDatum &storage_datum)
|
||||
{
|
||||
const char *str = storage_datum.ptr_;
|
||||
int32_t len = storage_datum.len_;
|
||||
|
||||
@ -66,6 +66,7 @@ public:
|
||||
void reset();
|
||||
void destroy(ObIAllocator &allocator);
|
||||
bool is_valid() const;
|
||||
int construct_column_param(share::schema::ObColumnParam &column_param) const;
|
||||
inline common::ColumnType get_data_type() const { return meta_type_.get_type(); }
|
||||
inline bool is_generated_column() const { return is_generated_column_; }
|
||||
inline const common::ObObj &get_orig_default_value() const { return orig_default_value_; }
|
||||
@ -278,6 +279,8 @@ public:
|
||||
"rowkey_cnt", rowkey_array_.count(), K_(rowkey_array), "column_cnt", column_array_.count(), K_(column_array),
|
||||
"skip_index_cnt", skip_idx_attr_array_.count(), K_(skip_idx_attr_array),
|
||||
"column_group_cnt", column_group_array_.count(), K_(column_group_array), K_(has_all_column_group));
|
||||
public:
|
||||
static void trim(const ObCollationType type, blocksstable::ObStorageDatum &storage_datum);
|
||||
private:
|
||||
void copy_from(const share::schema::ObMergeSchema &input_schema);
|
||||
int deep_copy_str(const ObString &src, ObString &dest);
|
||||
@ -308,7 +311,6 @@ private:
|
||||
int64_t get_array_serialize_length(const common::ObIArray<T> &array) const;
|
||||
template <typename T>
|
||||
bool check_column_array_valid(const common::ObIArray<T> &array) const;
|
||||
void trim(const ObCollationType type, blocksstable::ObStorageDatum &storage_datum) const;
|
||||
|
||||
public:
|
||||
static const uint32_t INVALID_ID = UINT32_MAX;
|
||||
|
||||
Reference in New Issue
Block a user