[FEAT MERGE] support baseline data transformation from row store to column store in delayed mode

This commit is contained in:
HoniiTro19 2024-11-22 10:15:19 +00:00 committed by ob-robot
parent beae2e24c9
commit b1e23e14c6
40 changed files with 403 additions and 88 deletions

View File

@ -3182,6 +3182,39 @@ int ObDDLOperator::insert_column_ids_into_column_group(ObMySQLTransaction &trans
return ret;
}
int ObDDLOperator::update_origin_column_group_with_new_schema(ObMySQLTransaction &trans,
const ObTableSchema &origin_table_schema,
const ObTableSchema &new_table_schema)
{
int ret = OB_SUCCESS;
int64_t new_delete_version = OB_INVALID_VERSION;
int64_t new_insert_version = OB_INVALID_VERSION;
uint64_t origin_tenant_id = origin_table_schema.get_tenant_id();
uint64_t new_tenant_id = new_table_schema.get_tenant_id();
ObSchemaService *schema_service = schema_service_.get_schema_service();
if (OB_ISNULL(schema_service)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema_service is NULL", K(ret));
} else if (OB_UNLIKELY(origin_tenant_id != new_tenant_id)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("origin tenant id does not equal to new tenant id", K(ret), K(origin_tenant_id), K(new_tenant_id));
} else if (OB_FAIL(schema_service_.gen_new_schema_version(origin_tenant_id, new_delete_version))) {
LOG_WARN("fail to generate new schema version for delete operation", K(ret), K(new_delete_version));
} else if (OB_FAIL(schema_service_.gen_new_schema_version(new_tenant_id, new_insert_version))) {
LOG_WARN("fail to generate new schema version for create operation", K(ret), K(new_insert_version));
} else if (OB_FAIL(schema_service->get_table_sql_service().update_origin_column_group_with_new_schema(trans,
new_delete_version,
new_insert_version,
origin_table_schema,
new_table_schema))) {
LOG_WARN("fail to update origin column group with new schema", K(ret),
K(new_delete_version),
K(new_insert_version),
K(origin_table_schema),
K(new_table_schema));
}
return ret;
}
int ObDDLOperator::insert_single_column(ObMySQLTransaction &trans,
const ObTableSchema &new_table_schema,

View File

@ -310,6 +310,9 @@ public:
const ObTableSchema &new_table_schema,
const ObIArray<uint64_t> &column_ids,
const ObColumnGroupSchema &column_group);
int update_origin_column_group_with_new_schema(ObMySQLTransaction &trans,
const ObTableSchema &origin_table_schema,
const ObTableSchema &new_table_schema);
int insert_single_column(common::ObMySQLTransaction &trans,
const share::schema::ObTableSchema &new_table_schema,
share::schema::ObColumnSchemaV2 &new_column);

View File

@ -4897,6 +4897,28 @@ int ObDDLService::alter_column_group(obrpc::ObAlterTableArg &alter_table_arg,
K(alter_table_arg.alter_table_schema_.alter_type_));
}
}
LOG_DEBUG("ddl service alter column group finish", K(ret), K(orig_table_schema), K(new_table_schema));
}
return ret;
}
int ObDDLService::update_column_group_table_inplace(const share::schema::ObTableSchema &origin_table_schema,
const share::schema::ObTableSchema &new_table_schema,
ObDDLOperator &ddl_operator,
common::ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!origin_table_schema.is_valid()
|| !new_table_schema.is_valid()
|| !new_table_schema.is_column_store_supported())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(new_table_schema));
} else if (OB_FAIL(ddl_operator.update_origin_column_group_with_new_schema(trans,
origin_table_schema,
new_table_schema))) {
LOG_WARN("fail to clear origin table schema and insert new schema", K(ret),
K(origin_table_schema),
K(new_table_schema));
}
return ret;
}
@ -14160,6 +14182,35 @@ int ObDDLService::alter_table_in_trans(obrpc::ObAlterTableArg &alter_table_arg,
}
}
// alter column group delayed
if (OB_SUCC(ret) && (ObDDLType::DDL_ALTER_COLUMN_GROUP_DELAYED == ddl_type)) {
if (tenant_data_version < DATA_VERSION_4_3_5_0) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("compat version not support", K(ret), K(tenant_data_version));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "tenant data version is less than 4.3.5, alter column group delayed");
} else if (GCTX.is_shared_storage_mode()) {
ret = OB_NOT_SUPPORTED;
SQL_RESV_LOG(WARN, "alter column group delayed does not support shared storage mode", K(ret));
} else if (OB_FAIL(ObSchemaUtils::mock_default_cg(orig_table_schema->get_tenant_id(), new_table_schema))) {
LOG_WARN("fail to mock default cg", K(ret), K(orig_table_schema), K(new_table_schema));
} else if (OB_FAIL(alter_column_group(alter_table_arg,
*orig_table_schema,
new_table_schema,
schema_guard,
ddl_operator,
trans))) {
LOG_WARN("failed to alter table column group", K(ret));
} else if (OB_FAIL(update_column_group_table_inplace(*orig_table_schema,
new_table_schema,
ddl_operator,
trans))) {
LOG_WARN("failed to alter table column group table", K(ret));
} else {
// only change schemas here, leave data reshaping in daily merge
LOG_DEBUG("alter column group in trans", K(ret), K(new_table_schema));
}
}
if (OB_SUCC(ret)) {
ObSchemaOperationType operation_type = OB_DDL_ALTER_TABLE;
if (obrpc::ObAlterTableArg::PARTITIONED_TABLE == alter_table_arg.alter_part_type_) {
@ -14670,7 +14721,11 @@ int ObDDLService::check_alter_column_group(const obrpc::ObAlterTableArg &alter_t
int ret = OB_SUCCESS;
if (OB_DDL_ADD_COLUMN_GROUP == alter_table_arg.alter_table_schema_.alter_type_ ||
OB_DDL_DROP_COLUMN_GROUP == alter_table_arg.alter_table_schema_.alter_type_) {
ddl_type = ObDDLType::DDL_ALTER_COLUMN_GROUP;
if (true == alter_table_arg.is_alter_column_group_delayed_) {
ddl_type = ObDDLType::DDL_ALTER_COLUMN_GROUP_DELAYED;
} else {
ddl_type = ObDDLType::DDL_ALTER_COLUMN_GROUP;
}
if (alter_table_arg.alter_table_schema_.get_column_group_count() <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument, alter table arg don't have any column group when alter column group",
@ -15342,7 +15397,7 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar
trans,
alter_table_arg.allocator_,
tenant_data_version))) {
LOG_WARN("fail to create user_hidden table", K(ret));
LOG_WARN("fail to create user_hidden table", K(ret), KPC(orig_table_schema), K(new_table_schema));
}
}
@ -17804,7 +17859,7 @@ int ObDDLService::alter_table(obrpc::ObAlterTableArg &alter_table_arg,
LOG_INFO("refresh session active time of temp tables succeed!", K(ret));
}
} else if (OB_FAIL(check_is_offline_ddl(alter_table_arg, data_version, ddl_type, ddl_need_retry_at_executor))) {
LOG_WARN("failed to check is offline ddl", K(ret));
LOG_WARN("failed to check is offline ddl", K(ret), K(alter_table_arg));
} else if (((MOCK_DATA_VERSION_4_2_1_3 <= data_version && DATA_VERSION_4_3_0_0 > data_version)
|| (DATA_VERSION_4_3_2_0 <= data_version)) // [4213, 430) & [4320, )
&& OB_FAIL(check_is_oracle_mode_add_column_not_null_ddl(alter_table_arg,

View File

@ -1812,6 +1812,10 @@ private:
common::ObMySQLTransaction &trans);
int update_column_group_table_inplace(const share::schema::ObTableSchema &origin_table_schema,
const share::schema::ObTableSchema &new_table_schema,
ObDDLOperator &ddl_operator,
common::ObMySQLTransaction &trans);
int check_alter_table_constraint(
const obrpc::ObAlterTableArg &alter_table_arg,
const ObTableSchema &orig_table_schema,

View File

@ -2789,7 +2789,8 @@ OB_DEF_SERIALIZE(ObAlterTableArg)
client_session_id_,
client_session_create_ts_,
lock_priority_,
is_direct_load_partition_);
is_direct_load_partition_,
is_alter_column_group_delayed_);
if (OB_SUCC(ret)) {
if (OB_FAIL(rebuild_index_arg_list_.serialize(buf, buf_len, pos))) {
@ -2893,7 +2894,8 @@ OB_DEF_DESERIALIZE(ObAlterTableArg)
client_session_id_,
client_session_create_ts_,
lock_priority_,
is_direct_load_partition_);
is_direct_load_partition_,
is_alter_column_group_delayed_);
return ret;
}
@ -2945,7 +2947,8 @@ OB_DEF_SERIALIZE_SIZE(ObAlterTableArg)
client_session_id_,
client_session_create_ts_,
lock_priority_,
is_direct_load_partition_);
is_direct_load_partition_,
is_alter_column_group_delayed_);
}
if (OB_FAIL(ret)) {

View File

@ -2363,7 +2363,8 @@ public:
client_session_id_(0),
client_session_create_ts_(0),
lock_priority_(transaction::tablelock::ObTableLockPriority::NORMAL),
is_direct_load_partition_(false)
is_direct_load_partition_(false),
is_alter_column_group_delayed_(false)
{
}
virtual ~ObAlterTableArg()
@ -2451,7 +2452,8 @@ public:
K_(client_session_id),
K_(client_session_create_ts),
K_(lock_priority),
K_(is_direct_load_partition));
K_(is_direct_load_partition),
K_(is_alter_column_group_delayed));
private:
int alloc_index_arg(const ObIndexArg::IndexActionType index_action_type, ObIndexArg *&index_arg);
public:
@ -2492,6 +2494,7 @@ public:
int64_t client_session_create_ts_;
transaction::tablelock::ObTableLockPriority lock_priority_;
bool is_direct_load_partition_;
bool is_alter_column_group_delayed_;
int serialize_index_args(char *buf, const int64_t data_len, int64_t &pos) const;
int deserialize_index_args(const char *buf, const int64_t data_len, int64_t &pos);
int64_t get_index_args_serialize_size() const;

View File

@ -1785,18 +1785,27 @@ int ObSchemaServiceSQLImpl::fetch_all_column_group_mapping(
}
}
bool non_default_cg_exist = false;
// get all column_group ids of this table
ObArray<uint64_t> cg_ids;
ObTableSchema::const_column_group_iterator it_begin = table_schema->column_group_begin();
ObTableSchema::const_column_group_iterator it_end = table_schema->column_group_end();
const ObColumnGroupSchema *column_group = NULL;
for (; OB_SUCC(ret) && (it_begin != it_end); ++it_begin) {
column_group = *it_begin;
if (OB_ISNULL(column_group)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("column_group schema should not be null", KR(ret));
} else if (OB_FAIL(cg_ids.push_back(column_group->get_column_group_id()))) {
LOG_WARN("fail to push back column_group id", KR(ret), KPC(column_group));
if (FAILEDx(table_schema->has_non_default_column_group(non_default_cg_exist))) {
LOG_WARN("fail to check table schema has non default column group", K(ret), K(table_schema));
} else {
for (; OB_SUCC(ret) && (it_begin != it_end); ++it_begin) {
column_group = *it_begin;
if (OB_ISNULL(column_group)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("column_group schema should not be null", KR(ret));
} else if (ObColumnGroupType::DEFAULT_COLUMN_GROUP == column_group->get_column_group_type() && non_default_cg_exist) {
LOG_INFO("filter old version default column group", K(non_default_cg_exist), KPC(column_group));
// support multi-version column group read for alter column group delayed, filter old version default_cg
continue;
} else if (OB_FAIL(cg_ids.push_back(column_group->get_column_group_id()))) {
LOG_WARN("fail to push back column_group id", KR(ret), KPC(column_group));
}
}
}
@ -1814,7 +1823,7 @@ int ObSchemaServiceSQLImpl::fetch_all_column_group_mapping(
LOG_WARN("fail to append sql", KR(ret), K(schema_version));
} else if (OB_FAIL(sql.append_fmt(" ORDER BY TENANT_ID DESC, TABLE_ID DESC, COLUMN_GROUP_ID ASC, COLUMN_ID ASC"))) {
LOG_WARN("fail to append sql", KR(ret));
} else if (is_history && OB_FAIL(sql.append_fmt(", SCHEMA_VERSION DESC"))) {
} else if (is_history && OB_FAIL(sql.append_fmt(", SCHEMA_VERSION ASC"))) {
LOG_WARN("fail to append sql", KR(ret));
} else {
const bool check_deleted = is_history;

View File

@ -629,6 +629,7 @@ ObTableParam::ObTableParam(ObIAllocator &allocator)
is_fts_index_(false),
is_multivalue_index_(false),
is_column_replica_table_(false),
is_normal_cgs_at_the_end_(false),
is_vec_index_(false),
is_partition_table_(false)
{
@ -660,6 +661,7 @@ void ObTableParam::reset()
is_fts_index_ = false;
is_multivalue_index_ = false;
is_column_replica_table_ = false;
is_normal_cgs_at_the_end_ = false;
is_vec_index_ = false;
is_partition_table_ = false;
}
@ -703,6 +705,9 @@ OB_DEF_SERIALIZE(ObTableParam)
if (OB_SUCC(ret)) {
OB_UNIS_ENCODE(is_column_replica_table_);
}
if (OB_SUCC(ret)) {
OB_UNIS_ENCODE(is_normal_cgs_at_the_end_);
}
if (OB_SUCC(ret)) {
OB_UNIS_ENCODE(is_vec_index_);
}
@ -797,6 +802,10 @@ OB_DEF_DESERIALIZE(ObTableParam)
LST_DO_CODE(OB_UNIS_DECODE,
is_column_replica_table_);
}
if (OB_SUCC(ret)) {
LST_DO_CODE(OB_UNIS_DECODE,
is_normal_cgs_at_the_end_);
}
if (OB_SUCC(ret)) {
LST_DO_CODE(OB_UNIS_DECODE,
is_vec_index_);
@ -849,6 +858,10 @@ OB_DEF_SERIALIZE_SIZE(ObTableParam)
LST_DO_CODE(OB_UNIS_ADD_LEN,
is_column_replica_table_);
}
if (OB_SUCC(ret)) {
LST_DO_CODE(OB_UNIS_ADD_LEN,
is_normal_cgs_at_the_end_);
}
if (OB_SUCC(ret)) {
LST_DO_CODE(OB_UNIS_ADD_LEN,
is_vec_index_);
@ -1216,6 +1229,14 @@ int ObTableParam::construct_columns_and_projector(
}
}
}
if (OB_SUCC(ret) && 0 < table_schema.get_column_group_count()) {
const ObColumnGroupType column_group_type = (*table_schema.column_group_begin())->get_column_group_type();
if (ROWKEY_COLUMN_GROUP == column_group_type || ALL_COLUMN_GROUP == column_group_type) {
// delayed column transform already has the column group information that meets the cs replica requirements before the storage schema update
is_normal_cgs_at_the_end_ = true;
}
}
return ret;
}
@ -1581,7 +1602,8 @@ int64_t ObTableParam::to_string(char *buf, const int64_t buf_len) const
K_(is_fts_index),
K_(parser_name),
K_(is_vec_index),
K_(is_column_replica_table));
K_(is_column_replica_table),
K_(is_normal_cgs_at_the_end));
J_OBJ_END();
return pos;

View File

@ -322,6 +322,7 @@ public:
inline bool has_virtual_column() const { return has_virtual_column_; }
inline int64_t get_rowid_version() const { return rowid_version_; }
inline bool is_column_replica_table() const { return is_column_replica_table_; }
inline bool is_normal_cgs_at_the_end() const { return is_normal_cgs_at_the_end_; }
inline const common::ObIArray<int32_t> &get_rowid_projector() const { return rowid_projector_; }
inline const common::ObIArray<int32_t> &get_output_projector() const { return output_projector_; }
inline const common::ObIArray<int32_t> &get_aggregate_projector() const { return aggregate_projector_; }
@ -412,6 +413,7 @@ private:
bool is_fts_index_;
bool is_multivalue_index_;
bool is_column_replica_table_;
bool is_normal_cgs_at_the_end_;
bool is_vec_index_;
bool is_partition_table_;
};

View File

@ -9499,6 +9499,28 @@ int ObTableSchema::has_all_column_group(bool &has_all_column_group) const
return ret;
}
int ObTableSchema::has_non_default_column_group(bool &has_non_default_column_group) const
{
int ret = OB_SUCCESS;
has_non_default_column_group = false;
ObSEArray<const ObColumnGroupSchema *, 8> column_group_metas;
if (OB_FAIL(get_store_column_groups(column_group_metas, false/*filter_empty_cg*/))) {
LOG_WARN("Failed to get column group metas", K(ret), KPC(this));
} else {
const ObColumnGroupSchema *cg_schema = nullptr;
for (int64_t idx = 0; OB_SUCC(ret) && idx < column_group_metas.count(); ++idx) {
if (OB_ISNULL(cg_schema = column_group_metas.at(idx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null cg_schema", K(ret));
} else if (DEFAULT_COLUMN_GROUP != cg_schema->get_column_group_type()) {
has_non_default_column_group = true;
break;
}
}
}
return ret;
}
int ObTableSchema::get_column_group_by_id(
const uint64_t column_group_id,
ObColumnGroupSchema *&column_group) const
@ -9680,7 +9702,16 @@ int ObTableSchema::add_column_group_to_array(ObColumnGroupSchema *column_group)
}
if (OB_SUCC(ret)) {
column_group_arr_[column_group_cnt_++] = column_group;
if (ROWKEY_COLUMN_GROUP == column_group->get_column_group_type()
|| ALL_COLUMN_GROUP == column_group->get_column_group_type()) {
for (int64_t idx = column_group_cnt_ - 1; idx >= 0; --idx) {
column_group_arr_[idx + 1] = column_group_arr_[idx];
}
column_group_arr_[0] = column_group;
++column_group_cnt_;
} else {
column_group_arr_[column_group_cnt_++] = column_group;
}
}
}

View File

@ -1678,6 +1678,7 @@ public:
const bool filter_empty_cg = true) const;
int remove_column_group(const uint64_t column_group_id);
int has_all_column_group(bool &has_all_column_group) const;
int has_non_default_column_group(bool &has_non_default_column_group) const;
// materialized view log related
template <typename Allocator>
static int build_mlog_table_name(Allocator &allocator,

View File

@ -6128,6 +6128,38 @@ int ObTableSqlService::update_single_column_group(ObISQLClient &sql_client,
return ret;
}
int ObTableSqlService::update_origin_column_group_with_new_schema(ObISQLClient &sql_client,
const int64_t delete_schema_version,
const int64_t insert_schema_version,
const ObTableSchema &origin_table_schema,
const ObTableSchema &new_table_schema)
{
int ret = OB_SUCCESS;
uint64_t data_version = OB_INVALID_VERSION;
uint64_t origin_tenant_id = origin_table_schema.get_tenant_id();
uint64_t new_tenant_id = new_table_schema.get_tenant_id();
if (OB_UNLIKELY(!sql_client.is_active()
|| !origin_table_schema.is_valid()
|| !new_table_schema.is_valid()
|| origin_tenant_id != new_tenant_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(origin_table_schema), K(new_table_schema));
} else if (OB_FAIL(GET_MIN_DATA_VERSION(origin_tenant_id, data_version))) {
LOG_WARN("fail to get min data version", KR(ret), K(origin_table_schema));
} else if (OB_FAIL(check_column_store_valid(origin_table_schema, data_version))) {
LOG_WARN("fail to check column store valid for origin table schema", KR(ret), K(origin_table_schema));
} else if (OB_FAIL(check_column_store_valid(new_table_schema, data_version))) {
LOG_WARN("fail to check column store valid for new table schema", KR(ret), K(new_table_schema));
} else if (OB_FAIL(delete_from_column_group(sql_client, origin_table_schema, delete_schema_version, false /*history table*/))) {
LOG_WARN("fail to delete __all_column_group for origin table schema", KR(ret), K(origin_table_schema));
} else if (OB_FAIL(delete_from_column_group_mapping(sql_client, origin_table_schema, delete_schema_version, false /*history table*/))) {
LOG_WARN("fail to delete __all_column_group_mapping for origin table schema", KR(ret), K(origin_table_schema));
} else if (OB_FAIL(add_column_groups(sql_client, new_table_schema, insert_schema_version, false/*only_history*/))) {
LOG_WARN("fail to add column groups from new table schema", KR(ret), K(new_table_schema), K(insert_schema_version));
}
return ret;
}
// Three scenes :
// 1. drop fk parent table
// 2. create child table with a fk references a mock fk parent table not exist

View File

@ -442,6 +442,11 @@ public:
const ObTableSchema &new_table_schema,
const ObColumnGroupSchema &ori_cg_schema,
const ObColumnGroupSchema &new_cg_schema);
int update_origin_column_group_with_new_schema(ObISQLClient &sql_client,
const int64_t delete_schema_version,
const int64_t insert_schema_version,
const ObTableSchema &orig_table_schema,
const ObTableSchema &new_table_schema);
private:
int log_operation_wrapper(
ObSchemaOperation &opt,

View File

@ -466,7 +466,7 @@ END_P SET_VAR DELIMITER
%type <ival> opt_with_consistent_snapshot opt_config_scope opt_index_keyname opt_full opt_mode_flag opt_extended opt_extended_or_full
%type <node> opt_priority opt_low_priority delete_option delete_option_list opt_delete_option_list
%type <node> opt_work begin_stmt commit_stmt rollback_stmt opt_ignore opt_ignore_or_replace ignore_or_replace xa_begin_stmt xa_end_stmt xa_prepare_stmt xa_commit_stmt xa_rollback_stmt xa_recover_stmt xa_xid opt_join_or_resume opt_suspend opt_one_phase opt_convert_xid
%type <node> alter_table_stmt alter_table_actions alter_table_action_list alter_table_action alter_column_option alter_index_option alter_constraint_option standalone_alter_action alter_partition_option opt_to alter_tablegroup_option opt_table opt_tablegroup_option_list alter_tg_partition_option alter_column_group_option
%type <node> alter_table_stmt alter_table_actions alter_table_action_list alter_table_action alter_column_option alter_index_option alter_constraint_option standalone_alter_action alter_partition_option opt_to alter_tablegroup_option opt_table opt_tablegroup_option_list alter_tg_partition_option alter_column_group_delayed_desc alter_column_group_option
%type <node> tablegroup_option_list tablegroup_option alter_tablegroup_actions alter_tablegroup_action tablegroup_option_list_space_seperated
%type <node> opt_tg_partition_option tg_hash_partition_option tg_key_partition_option tg_range_partition_option tg_subpartition_option tg_list_partition_option
%type <node> alter_column_behavior opt_set opt_position_column
@ -18437,18 +18437,29 @@ VISIBLE
}
;
alter_column_group_delayed_desc:
/* EMPTY */
{
$$ = NULL;
}
| DELAYED
{
malloc_terminal_node($$, result->malloc_pool_, T_ALTER_COLUMN_GROUP_DELAYED);
}
;
alter_column_group_option:
ADD COLUMN GROUP '(' column_group_list ')'
ADD COLUMN GROUP '(' column_group_list ')' alter_column_group_delayed_desc
{
ParseNode *column_group_list = NULL;
merge_nodes(column_group_list, result, T_COLUMN_GROUP_ADD, $5);
malloc_non_terminal_node($$, result->malloc_pool_, T_ALTER_COLUMN_GROUP_OPTION, 1, column_group_list);
malloc_non_terminal_node($$, result->malloc_pool_, T_ALTER_COLUMN_GROUP_OPTION, 2, column_group_list, $7);
}
| DROP COLUMN GROUP '(' column_group_list ')'
| DROP COLUMN GROUP '(' column_group_list ')' alter_column_group_delayed_desc
{
ParseNode *column_group_list = NULL;
merge_nodes(column_group_list, result, T_COLUMN_GROUP_DROP,$5);
malloc_non_terminal_node($$, result->malloc_pool_, T_ALTER_COLUMN_GROUP_OPTION, 1, column_group_list);
malloc_non_terminal_node($$, result->malloc_pool_, T_ALTER_COLUMN_GROUP_OPTION, 2, column_group_list, $7);
}
;

View File

@ -6940,6 +6940,7 @@ int ObAlterTableResolver::resolve_alter_column_groups(const ParseNode &node)
SQL_RESV_LOG(WARN, "get alter table stmt failed", K(ret), K(node.type_), KP(node.children_));
} else {
const ParseNode *column_group_node = node.children_[0];
const ParseNode *delayed_node = nullptr;
uint64_t compat_version = 0;
ObAlterTableArg &alter_table_arg = alter_table_stmt->get_alter_table_arg();
@ -6973,8 +6974,38 @@ int ObAlterTableResolver::resolve_alter_column_groups(const ParseNode &node)
ret = OB_ERR_UNEXPECTED;
SQL_RESV_LOG(WARN, "invalid parse tree ", K(ret), K(column_group_node->type_));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(parse_column_group(column_group_node, *table_schema_, alter_table_schema))) {
} else if (node.num_child_ > 1) {
if (compat_version < DATA_VERSION_4_3_5_0) {
ret = OB_NOT_SUPPORTED;
SQL_RESV_LOG(WARN, "alter column group delayed gets unsupported data_version", K(ret), K(compat_version));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "tenant data version is less than 4.3.5, alter column group delayed");
} else if (GCTX.is_shared_storage_mode()) {
ret = OB_NOT_SUPPORTED;
SQL_RESV_LOG(WARN, "alter column group delayed does not support shared storage mode", K(ret));
} else if (FALSE_IT(delayed_node = node.children_[1])) {
} else if (OB_ISNULL(delayed_node)) {
alter_table_stmt->get_alter_table_arg().is_alter_column_group_delayed_ = false;
} else if (T_ALTER_COLUMN_GROUP_DELAYED == delayed_node->type_) {
if (T_COLUMN_GROUP_DROP == column_group_node->type_) {
ret = OB_NOT_SUPPORTED;
SQL_RESV_LOG(WARN, "drop column group in delayed mode is not supported", K(ret));
} else {
alter_table_stmt->get_alter_table_arg().is_alter_column_group_delayed_ = true;
SQL_RESV_LOG(INFO, "set is_alter_column_group_delayed_ to true");
}
} else {
ret = OB_ERR_UNEXPECTED;
SQL_RESV_LOG(WARN, "invalid alter column group delayed type", K(ret), "type", delayed_node->type_);
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(parse_column_group(column_group_node,
*table_schema_,
alter_table_schema,
alter_table_stmt->get_alter_table_arg().is_alter_column_group_delayed_))) {
LOG_WARN("fail to parse column gorup list", K(ret));
}
}

View File

@ -13171,7 +13171,8 @@ int ObDDLResolver::deep_copy_column_expr_name(common::ObIAllocator &allocator,
int ObDDLResolver::parse_column_group(const ParseNode *column_group_node,
const ObTableSchema &table_schema,
ObTableSchema &dst_table_schema)
ObTableSchema &dst_table_schema,
const bool is_alter_column_group_delayed)
{
int ret = OB_SUCCESS;
bool sql_exist_all_column_group = false;
@ -13218,6 +13219,13 @@ int ObDDLResolver::parse_column_group(const ParseNode *column_group_node,
}
}
if (OB_SUCC(ret)
&& OB_UNLIKELY(sql_exist_all_column_group && !sql_exist_single_column_group && is_alter_column_group_delayed)) {
ret = OB_NOT_SUPPORTED;
SQL_RESV_LOG(WARN, "alter table add all column groups not supprt",
K(ret), K(sql_exist_all_column_group), K(sql_exist_single_column_group), K(is_alter_column_group_delayed));
}
/* all column group */
/* column group in resolver do not use real column group id*/
/* ddl service use column group name to distingush them*/

View File

@ -565,7 +565,10 @@ protected:
const uint64_t cg_id,
share::schema::ObColumnGroupSchema &column_group);
int parse_cg_node(const ParseNode &cg_node, obrpc::ObCreateIndexArg &create_index_arg) const;
int parse_column_group(const ParseNode *cg_node,const share::schema::ObTableSchema &table_schema, share::schema::ObTableSchema &dst_table_schema);
int parse_column_group(const ParseNode *cg_node,
const share::schema::ObTableSchema &table_schema,
share::schema::ObTableSchema &dst_table_schema,
const bool is_alter_column_group_delayed = false);
int resolve_index_column_group(const ParseNode *node, obrpc::ObCreateIndexArg &create_index_arg);
bool need_column_group(const ObTableSchema &table_schema);
int resolve_hints(const ParseNode *parse_node, ObDDLStmt &stmt, const ObTableSchema &table_schema);

View File

@ -384,7 +384,7 @@ int ObTableAccessParam::check_valid_before_query_init(
if (OB_UNLIKELY(!tablet_handle.is_valid() || OB_ISNULL(tablet = tablet_handle.get_obj()))) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid table handle", K(ret), K(tablet_handle), KPC(tablet));
} else if (OB_UNLIKELY(tablet->is_cs_replica_compat() && !table_param.is_column_replica_table())) {
} else if (OB_UNLIKELY(tablet->is_cs_replica_compat() && !table_param.is_column_replica_table() && !table_param.is_normal_cgs_at_the_end())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid table param for cs replica tablet", K(ret), K(table_param), KPC(tablet));
}

View File

@ -213,6 +213,17 @@ int ObCOTabletMergeCtx::prepare_cs_replica_param()
return ret;
}
// major_sstable_status_ is decided in static_param_, according to the storage schema in medium compaction info
int ObCOTabletMergeCtx::check_and_set_build_redundant_row_merge()
{
int ret = OB_SUCCESS;
if (static_param_.is_cs_replica_ && static_param_.is_build_redundent_row_store_from_rowkey_cg()) {
static_param_.co_major_merge_type_ = ObCOMajorMergePolicy::BUILD_REDUNDANT_ROW_STORE_MERGE;
LOG_INFO("[CS-Replica] Decide build redundant row store from rowkey cg for cs replica", K(ret), K_(static_param));
}
return ret;
}
int ObCOTabletMergeCtx::check_convert_co_checksum(const ObSSTable *new_sstable)
{
int ret = OB_SUCCESS;
@ -263,13 +274,15 @@ int ObCOTabletMergeCtx::prepare_schema()
} else if (OB_FAIL(get_medium_compaction_info())) {
// have checked medium info inside
LOG_WARN("failed to get medium compaction info", K(ret), KPC(this));
} else {
LOG_INFO("[CS-Replica] finish prepare schema for co merge", K(ret),
"is_cs_replica", static_param_.is_cs_replica_, KPC(this));
}
if (FAILEDx(prepare_row_store_cg_schema())) {
LOG_WARN("failed to init major sstable status", K(ret));
} else if (OB_FAIL(check_and_set_build_redundant_row_merge())) {
LOG_WARN("failed to check and set build redundant row merge", K(ret), KPC(this));
} else {
LOG_INFO("[CS-Replica] finish prepare schema for co merge", K(ret),
"is_cs_replica", static_param_.is_cs_replica_, KPC(this));
}
return ret;
}
@ -282,8 +295,10 @@ int ObCOTabletMergeCtx::build_ctx(bool &finish_flag)
LOG_WARN("failed to build basic ctx", KR(ret), "param", get_dag_param(), KPC(this));
} else if (is_major_merge_type(get_merge_type())) {
// meta major merge not support row col switch now
if (is_build_row_store_from_rowkey_cg() && OB_FAIL(mock_row_store_table_read_info())) {
STORAGE_LOG(WARN, "fail to init table read info", K(ret));
if (is_build_row_store_from_rowkey_cg() || is_build_redundant_row_store_from_rowkey_cg()) {
if (OB_FAIL(mock_row_store_table_read_info())) {
STORAGE_LOG(WARN, "fail to init table read info", K(ret));
}
}
}
return ret;
@ -298,7 +313,8 @@ int ObCOTabletMergeCtx::check_merge_ctx_valid()
if (OB_UNLIKELY(!tablet_handle_.is_valid()) || OB_ISNULL(tablet = tablet_handle_.get_obj())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid tablet", K(ret), K_(tablet_handle));
} else if (tablet->is_row_store() && OB_UNLIKELY(!is_convert_co_major_merge(merge_type))) {
} else if (tablet->is_row_store()
&& OB_UNLIKELY(!is_convert_co_major_merge(merge_type) && !ObCOMajorMergePolicy::is_use_rs_build_schema_match_merge(static_param_.co_major_merge_type_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("only row store tablet need to do convert co major merge", K(ret), K(merge_type), KPC(tablet));
} else if (OB_ISNULL(base_table = static_param_.tables_handle_.get_table(0))) {
@ -337,6 +353,8 @@ int ObCOTabletMergeCtx::cal_merge_param()
} else if (ObCOMajorMergePolicy::is_use_rs_build_schema_match_merge(co_major_merge_type)) {
force_full_merge = true;
static_param_.is_rebuild_column_store_ = true;
} else if (ObCOMajorMergePolicy::is_build_redundent_row_store_merge(co_major_merge_type)) {
force_full_merge = true;
}
if (FAILEDx(ObBasicTabletMergeCtx::cal_major_merge_param(force_full_merge, progressive_merge_mgr_))) {
LOG_WARN("failed to calc major merge param", KR(ret), K(force_full_merge));
@ -834,7 +852,16 @@ int ObCOTabletMergeCtx::prepare_row_store_cg_schema()
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get sstable", K(ret), K(sstable), K(static_param_.tables_handle_));
} else if (OB_UNLIKELY(!sstable->is_co_sstable())) {
// maybe cs replica, processed in ObCOTabletMergeCtx::prepare_cs_replica_param
if (static_param_.is_cs_replica_) {
// should be cs replica, processed in ObCOTabletMergeCtx::prepare_cs_replica_param
} else if (sstable->is_major_sstable() && !static_param_.schema_->is_row_store()) {
static_param_.major_sstable_status_ = ObCOMajorSSTableStatus::DELAYED_TRANSFORM_MAJOR;
// should be delayed column group transform
LOG_INFO("set major sstable status with column store schema", K(ret), KPC(sstable), K_(static_param));
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("first table should be major in co merge", K(ret), KPC(sstable), K_(static_param));
}
} else if (OB_FAIL(ObCOMajorMergePolicy::decide_co_major_sstable_status(
*static_cast<ObCOSSTableV2 *>(sstable),
*get_schema(),

View File

@ -93,7 +93,7 @@ struct ObCOTabletMergeCtx : public ObBasicTabletMergeCtx
const ObITableReadInfo *get_full_read_info() const
{
const ObITableReadInfo *ret_info = NULL;
if (is_build_row_store_from_rowkey_cg()) {
if (is_build_row_store_from_rowkey_cg() || is_build_redundant_row_store_from_rowkey_cg()) {
ret_info = &mocked_row_store_table_read_info_;
} else {
ret_info = &read_info_;
@ -147,8 +147,10 @@ struct ObCOTabletMergeCtx : public ObBasicTabletMergeCtx
int prepare_mocked_row_store_cg_schema();
bool should_mock_row_store_cg_schema();
int prepare_cs_replica_param();
int check_and_set_build_redundant_row_merge();
int check_convert_co_checksum(const ObSSTable *new_sstable);
OB_INLINE bool is_build_row_store_from_rowkey_cg() const { return static_param_.is_build_row_store_from_rowkey_cg(); }
OB_INLINE bool is_build_redundant_row_store_from_rowkey_cg() const { return static_param_.is_build_redundent_row_store_from_rowkey_cg(); }
OB_INLINE bool is_build_row_store() const { return static_param_.is_build_row_store(); }
int get_cg_schema_for_merge(const int64_t idx, const ObStorageColumnGroupSchema *&cg_schema_ptr);
const ObSSTableMergeHistory &get_merge_history() { return dag_net_merge_history_; }

View File

@ -273,10 +273,11 @@ int ObCOMerger:: alloc_row_writers(
ObSSTable *cg_sstable = nullptr;
ObITableReadInfo *read_info = nullptr;
bool add_column = false;
const bool is_iter_co_build_row_store = ctx->is_build_row_store_from_rowkey_cg()
|| (ctx->is_build_redundant_row_store_from_rowkey_cg() && 0 == idx);
if (OB_FAIL(ctx->get_cg_schema_for_merge(idx, cg_schema_ptr))) {
LOG_WARN("fail to get cg schema for merge", K(ret), K(idx));
} else if (OB_ISNULL(writer = OB_NEWx(ObCOMergeRowWriter, &merger_arena_, ctx->is_build_row_store_from_rowkey_cg()))) {
} else if (OB_ISNULL(writer = OB_NEWx(ObCOMergeRowWriter, &merger_arena_, is_iter_co_build_row_store))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "Failed to allocate memory for ObCOMergeWriter", K(ret));
} else if (OB_ISNULL(merge_infos[idx])) {

View File

@ -125,7 +125,9 @@ enum ObCOMajorSSTableStatus: uint8_t {
+-----------------------+---------------+---------------+-------+
| COL_REPLICA_MAJOR | ROW STORE | ROW STORE | YES |
+-----------------------+---------------+---------------+-------+
|DELAYED_TRANSFORM_MAJOR| ALL+EACH/EACH | ROW STORE | NO |
|DELAYED_TRANSFORM_MAJOR| ALL+EACH/EACH | ROW STORE | NO |
+-----------------------+---------------+---------------+-------+
| PURE_COL_WITH_ALL | ALL+EACH | EACH | NO |
+-----------------------+---------------+---------------+-------+
*/
inline bool is_valid_co_major_sstable_status(const ObCOMajorSSTableStatus& major_sstable_status)
@ -144,6 +146,10 @@ inline bool is_major_sstable_match_schema(const ObCOMajorSSTableStatus& major_ss
{
return major_sstable_status == COL_WITH_ALL || major_sstable_status == PURE_COL;
}
inline bool is_build_redundent_row_store(const ObCOMajorSSTableStatus& major_sstable_status)
{
return PURE_COL_WITH_ALL == major_sstable_status;
}
const char* co_major_sstable_status_to_str(const ObCOMajorSSTableStatus& major_sstable_status);
/*

View File

@ -120,7 +120,8 @@ int ObBasicScheduleTabletFunc::check_with_schedule_scn(
const ObTablet &tablet,
const int64_t schedule_scn,
const ObTabletStatusCache &tablet_status,
bool &can_merge)
bool &can_merge,
const ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type)
{
const ObLSID &ls_id = ls_status_.ls_id_;
can_merge = false;
@ -139,7 +140,7 @@ int ObBasicScheduleTabletFunc::check_with_schedule_scn(
} else if (need_force_freeze) {
tablet_cnt_.force_freeze_cnt_++;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(freeze_param_.tablet_info_array_.push_back(ObTabletSchedulePair(tablet_id, schedule_scn)))) {
if (OB_TMP_FAIL(freeze_param_.tablet_info_array_.push_back(ObTabletSchedulePair(tablet_id, schedule_scn, co_major_merge_type)))) {
LOG_WARN("failed to push back tablet_id for batch_freeze", KR(tmp_ret), K(tablet_id));
}
}

View File

@ -47,7 +47,8 @@ protected:
const storage::ObTablet &tablet,
const int64_t schedule_scn,
const ObTabletStatusCache &tablet_status,
bool &can_merge);
bool &can_merge,
const ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type = ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE);
int check_need_force_freeze(
const storage::ObTablet &tablet,
const int64_t schedule_scn,

View File

@ -314,6 +314,11 @@ bool ObStaticMergeParam::is_build_row_store() const
return ObCOMajorMergePolicy::is_build_row_store_merge(co_major_merge_type_);
}
bool ObStaticMergeParam::is_build_redundent_row_store_from_rowkey_cg() const
{
return is_build_redundent_row_store(major_sstable_status_);
}
ObMergeLevel ObStaticMergeParam::get_merge_level_for_sstable(
const ObSSTable &sstable) const
{

View File

@ -46,6 +46,7 @@ struct ObStaticMergeParam final
int cal_major_merge_param(const bool force_full_merge, ObProgressiveMergeMgr &progressive_mgr);
bool is_build_row_store_from_rowkey_cg() const;
bool is_build_row_store() const;
bool is_build_redundent_row_store_from_rowkey_cg() const;
OB_INLINE void set_full_merge_and_level(bool is_full_merge)
{

View File

@ -131,7 +131,7 @@ int ObBatchFreezeTabletsTask::schedule_tablet_major_after_freeze(
// no need to schedule merge
} else if (OB_FAIL(ObTenantTabletScheduler::schedule_merge_dag(
ls.get_ls_id(), *tablet, MEDIUM_MERGE,
cur_pair.schedule_merge_scn_, EXEC_MODE_LOCAL))) {
cur_pair.schedule_merge_scn_, EXEC_MODE_LOCAL, nullptr/*dag_net_id*/, cur_pair.co_major_merge_type_))) {
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
LOG_WARN("failed to schedule medium merge dag", K(ret), "ls_id", ls.get_ls_id(), K(cur_pair));
}

View File

@ -19,21 +19,25 @@ struct ObTabletSchedulePair
public:
ObTabletSchedulePair()
: tablet_id_(),
schedule_merge_scn_(0)
schedule_merge_scn_(0),
co_major_merge_type_(ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE)
{ }
ObTabletSchedulePair(
const common::ObTabletID &tablet_id,
const int64_t schedule_merge_scn)
const int64_t schedule_merge_scn,
const ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type)
: tablet_id_(tablet_id),
schedule_merge_scn_(schedule_merge_scn)
schedule_merge_scn_(schedule_merge_scn),
co_major_merge_type_(co_major_merge_type)
{ }
bool is_valid() const { return tablet_id_.is_valid() && schedule_merge_scn_ > 0; }
bool need_force_freeze() const { return schedule_merge_scn_ > 0; }
void reset() { tablet_id_.reset(); schedule_merge_scn_ = 0; }
TO_STRING_KV(K_(tablet_id), K_(schedule_merge_scn));
void reset() { tablet_id_.reset(); schedule_merge_scn_ = 0; co_major_merge_type_ = ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE; }
TO_STRING_KV(K_(tablet_id), K_(schedule_merge_scn), K_(co_major_merge_type));
public:
common::ObTabletID tablet_id_;
int64_t schedule_merge_scn_;
ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type_;
};
struct ObBatchFreezeTabletsParam : public ObBatchExecParam<ObTabletSchedulePair>

View File

@ -756,6 +756,7 @@ int ObMediumCompactionScheduleFunc::init_parallel_range_and_schema_changed_and_c
int ret = OB_SUCCESS;
int64_t expected_task_count = 0;
const int64_t tablet_size = medium_info.storage_schema_.get_tablet_size();
const bool is_column_store_medium_info = !medium_info.storage_schema_.is_row_store();
const ObSSTable *first_sstable = static_cast<const ObSSTable *>(result.handle_.get_table(0));
ObTablet *tablet = nullptr;
@ -809,7 +810,7 @@ int ObMediumCompactionScheduleFunc::init_parallel_range_and_schema_changed_and_c
// determine co major type && check if schema changed for sn
if (OB_FAIL(ret)) {
} else if (!tablet->is_row_store() && OB_FAIL(init_co_major_merge_type(result, medium_info))) {
} else if (is_column_store_medium_info && OB_FAIL(init_co_major_merge_type(result, medium_info))) {
STORAGE_LOG(WARN, "failed to init co major merge type", K(ret), K(tablet));
#ifdef OB_BUILD_SHARED_STORAGE
} else if (GCTX.is_shared_storage_mode()) {
@ -895,12 +896,14 @@ int ObMediumCompactionScheduleFunc::init_co_major_merge_type(
ObCOMajorMergePolicy::ObCOMajorMergeType major_merge_type = ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE;
ObTabletTableIterator iter;
ObSEArray<ObITable*, OB_DEFAULT_SE_ARRAY_COUNT> tables;
if (OB_ISNULL(first_sstable) || OB_UNLIKELY(!first_sstable->is_co_sstable())) {
if (OB_ISNULL(first_sstable)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("first sstable in tables handle is null or not co sstable", K(ret), K(result.handle_));
} else if (ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP == merge_reason_) {
// REBUILD_COLUMN_GROUP is requested by user, only use row store to build column store
} else if (ObAdaptiveMergePolicy::REBUILD_COLUMN_GROUP == merge_reason_ || !first_sstable->is_co_sstable()) {
// REBUILD_COLUMN_GROUP is requested by user or implicitly required by delayed column group transform
// only use row store to build column store
medium_info.co_major_merge_type_ = ObCOMajorMergePolicy::USE_RS_BUILD_SCHEMA_MATCH_MERGE;
LOG_INFO("use row store to build column store", K(ret), K(merge_reason_), K(result.handle_));
} else if (FALSE_IT(co_sstable = static_cast<ObCOSSTableV2 *>(first_sstable))) {
} else if (OB_FAIL(iter.set_tablet_handle(tablet_handle_))) {
LOG_WARN("failed to set tablet handle", K(ret), K(iter), K(tablet_handle_));

View File

@ -735,7 +735,8 @@ int ObMediumCompactionInfoList::get_next_schedule_info(
const int64_t major_frozen_snapshot,
const bool is_mv_refresh_tablet,
ObMediumCompactionInfo::ObCompactionType &compaction_type,
int64_t &schedule_scn) const
int64_t &schedule_scn,
ObCOMajorMergePolicy::ObCOMajorMergeType &co_major_merge_type) const
{
int ret = OB_SUCCESS;
DLIST_FOREACH_X(info, get_list(), OB_SUCC(ret)) {
@ -747,6 +748,7 @@ int ObMediumCompactionInfoList::get_next_schedule_info(
|| is_mv_refresh_tablet) {
schedule_scn = info->medium_snapshot_;
compaction_type = (ObMediumCompactionInfo::ObCompactionType)info->compaction_type_;
co_major_merge_type = static_cast<ObCOMajorMergePolicy::ObCOMajorMergeType>(info->co_major_merge_type_);
}
break; // found one unfinish medium info, loop end
}

View File

@ -137,7 +137,8 @@ public:
const int64_t major_frozen_snapshot,
const bool is_mv_refresh_tablet,
ObMediumCompactionInfo::ObCompactionType &compaction_type,
int64_t &schedule_scn) const;
int64_t &schedule_scn,
ObCOMajorMergePolicy::ObCOMajorMergeType &co_major_merge_type) const;
OB_INLINE ObMediumCompactionInfo::ObCompactionType get_last_compaction_type() const
{
return static_cast<ObMediumCompactionInfo::ObCompactionType>(extra_info_.last_compaction_type_);

View File

@ -1863,6 +1863,8 @@ int ObCOMajorMergePolicy::decide_co_major_sstable_status(
if (storage_schema.has_all_column_group()) {
if (co_sstable.is_row_store_only_co_table()) {
major_sstable_status = ObCOMajorSSTableStatus::COL_ONLY_ALL;
} else if (co_sstable.is_rowkey_cg_base()) {
major_sstable_status = ObCOMajorSSTableStatus::PURE_COL_WITH_ALL;
} else {
major_sstable_status = ObCOMajorSSTableStatus::COL_WITH_ALL;
}

View File

@ -311,6 +311,7 @@ public:
1) ALL+EACH
ALL+EACH --(BUILD_ROW_STORE_MERGE)--> ALL --(USE_RS_BUILD_SCHEMA_MATCH_MERGE)--> ALL+EACH
ALL+EACH --(BUILD_COLUMN_STORE_MERGE)--> ALL+EACH
EACH --(BUILD_REDUNDANT_ROW_STORE_MERGE)--> ALL+EACH
2) EACH
EACH --(BUILD_ROW_STORE_MERGE)--> ALL --(USE_RS_BUILD_SCHEMA_MATCH_MERGE)--> EACH
EACH --(BUILD_COLUMN_STORE_MERGE)--> EACH
@ -349,6 +350,10 @@ public:
{
return USE_RS_BUILD_SCHEMA_MATCH_MERGE == major_merge_type;
}
static inline bool is_build_redundent_row_store_merge(const ObCOMajorMergeType &major_merge_type)
{
return BUILD_REDUNDANT_ROW_STORE_MERGE == major_merge_type;
}
static int decide_co_major_sstable_status(
const ObCOSSTableV2 &co_sstable,
const ObStorageSchema &storage_schema,

View File

@ -202,18 +202,19 @@ int ObScheduleTabletFunc::schedule_tablet_execute(
const ObTabletID &tablet_id = tablet.get_tablet_id();
bool can_merge = false;
int64_t schedule_scn = 0;
ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type = ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE;
if (OB_FAIL(ObTenantTabletScheduler::check_ready_for_major_merge(ls_id, tablet, MEDIUM_MERGE))) {
LOG_WARN("failed to check ready for major merge", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(get_schedule_execute_info(tablet, schedule_scn))) {
} else if (OB_FAIL(get_schedule_execute_info(tablet, schedule_scn, co_major_merge_type))) {
if (OB_NO_NEED_MERGE == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get schedule execute info", KR(ret), K_(ls_status), K(tablet_id));
}
} else if (OB_FAIL(check_with_schedule_scn(tablet, schedule_scn, tablet_status_, can_merge))) {
} else if (OB_FAIL(check_with_schedule_scn(tablet, schedule_scn, tablet_status_, can_merge, co_major_merge_type))) {
LOG_WARN("failed to check with schedule scn", KR(ret), K(schedule_scn));
} else if (can_merge) {
if (OB_FAIL(ObTenantTabletScheduler::schedule_merge_dag(ls_id, tablet, MEDIUM_MERGE, schedule_scn, EXEC_MODE_LOCAL))) {
if (OB_FAIL(ObTenantTabletScheduler::schedule_merge_dag(ls_id, tablet, MEDIUM_MERGE, schedule_scn, EXEC_MODE_LOCAL, nullptr/*dag_net_id*/, co_major_merge_type))) {
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
LOG_WARN("failed to schedule medium merge dag", K(ret), K_(ls_status), K(tablet_id));
}
@ -228,7 +229,8 @@ int ObScheduleTabletFunc::schedule_tablet_execute(
int ObScheduleTabletFunc::get_schedule_execute_info(
ObTablet &tablet,
int64_t &schedule_scn)
int64_t &schedule_scn,
ObCOMajorMergePolicy::ObCOMajorMergeType &co_major_merge_type)
{
int ret = OB_SUCCESS;
const ObLSID &ls_id = ls_status_.ls_id_;
@ -265,7 +267,7 @@ int ObScheduleTabletFunc::get_schedule_execute_info(
LOG_INFO("mv creation has not finished, can not schedule mv tablet", K(ret),
K(last_major_snapshot));
} else if (OB_FAIL(tablet_status_.medium_list()->get_next_schedule_info(
last_major_snapshot, merge_version_, is_mv_major_refresh_tablet, compaction_type, schedule_scn))) {
last_major_snapshot, merge_version_, is_mv_major_refresh_tablet, compaction_type, schedule_scn, co_major_merge_type))) {
if (OB_NO_NEED_MERGE != ret) {
LOG_WARN("failed to get next schedule info", KR(ret), K(last_major_snapshot), K_(merge_version));
}

View File

@ -46,7 +46,8 @@ private:
storage::ObTablet &tablet);
int get_schedule_execute_info(
storage::ObTablet &tablet,
int64_t &schedule_scn);
int64_t &schedule_scn,
ObCOMajorMergePolicy::ObCOMajorMergeType &co_major_merge_type);
private:
ObTabletStatusCache tablet_status_;
ObCompactionScheduleTimeGuard time_guard_;

View File

@ -1041,12 +1041,14 @@ int ObTenantTabletScheduler::schedule_merge_dag(
const ObMergeType merge_type,
const int64_t &merge_snapshot_version,
const ObExecMode exec_mode,
const ObDagId *dag_net_id /*= nullptr*/)
const ObDagId *dag_net_id /*= nullptr*/,
const ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type /*= ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE*/)
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_ready_for_major_merge(ls_id, tablet, merge_type))) {
LOG_WARN("failed to check ready for major merge", K(ret), K(ls_id), K(tablet), K(merge_type));
} else if (is_major_merge_type(merge_type) && (!tablet.is_row_store() || is_convert_co_major_merge(merge_type))) {
} else if (is_major_merge_type(merge_type)
&& (!tablet.is_row_store() || is_convert_co_major_merge(merge_type) || ObCOMajorMergePolicy::is_use_rs_build_schema_match_merge(co_major_merge_type))) {
ObCOMergeDagParam param;
if (OB_FAIL(ObDagParamFunc::fill_param(ls_id, tablet, merge_type, merge_snapshot_version, exec_mode, dag_net_id, param))) {
LOG_WARN("failed to fill param", KR(ret));

View File

@ -231,7 +231,8 @@ public:
const ObMergeType merge_type,
const int64_t &merge_snapshot_version,
const ObExecMode exec_mode,
const ObDagId *dag_net_id = nullptr);
const ObDagId *dag_net_id = nullptr,
const ObCOMajorMergePolicy::ObCOMajorMergeType co_major_merge_type = ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE);
static int schedule_convert_co_merge_dag_net(
const ObLSID &ls_id,
const ObTablet &tablet,

View File

@ -526,11 +526,15 @@ int ObStorageSchema::init(
STORAGE_LOG(WARN, "Failed to generate column group array", K(ret));
}
bool is_column_table_schema = false;
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(!ObStorageSchema::is_valid())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "storage schema is invalid", K(ret));
} else if (OB_FAIL(input_schema.get_is_column_store(is_column_table_schema))) {
STORAGE_LOG(WARN, "fail to check is column store", K(ret));
} else {
is_column_table_schema_ = is_column_table_schema;
is_cs_replica_compat_ = is_cg_array_generated_in_cs_replica();
is_inited_ = true;
}
@ -538,7 +542,6 @@ int ObStorageSchema::init(
if (OB_UNLIKELY(!is_inited_)) {
reset();
}
return ret;
}
@ -611,6 +614,7 @@ int ObStorageSchema::init(
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "storage schema is invalid", K(ret));
} else {
is_column_table_schema_ = old_schema.is_column_table_schema_;
is_cs_replica_compat_ = is_cg_array_generated_in_cs_replica();
is_inited_ = true;
}
@ -1407,10 +1411,10 @@ int ObStorageSchema::get_column_group_index(
bool ObStorageSchema::is_cg_array_generated_in_cs_replica() const
{
bool bret = false;
if (column_group_array_.count() <= 1) {
// row store
if (column_group_array_.count() <= 1 || is_column_table_schema_) {
// row store or column store table schema after v435 (all/rowkey cg is placed in the front of cg array)
} else {
bret = column_group_array_.at(0).is_rowkey_column_group(); // only cs replica set rowkey cg in the front of cg array
bret = column_group_array_.at(0).is_rowkey_column_group(); // cs replica will set rowkey cg in the front of cg array
}
return bret;
}

View File

@ -383,7 +383,7 @@ public:
static const int32_t SS_ONE_BIT = 1;
static const int32_t SS_HALF_BYTE = 4;
static const int32_t SS_ONE_BYTE = 8;
static const int32_t SS_RESERVED_BITS = 17;
static const int32_t SS_RESERVED_BITS = 16;
// STORAGE_SCHEMA_VERSION is for serde compatibility.
// Currently we do not use "standard" serde function macro,
@ -408,6 +408,7 @@ public:
uint32_t is_use_bloomfilter_ :SS_ONE_BIT;
uint32_t column_info_simplified_ :SS_ONE_BIT;
uint32_t is_cs_replica_compat_ :SS_ONE_BIT; // for storage schema on tablet
uint32_t is_column_table_schema_ :SS_ONE_BIT;
uint32_t reserved_ :SS_RESERVED_BITS;
};
};

View File

@ -522,7 +522,7 @@ int ObTablet::init_for_merge(
} else if (FALSE_IT(set_initial_addr())) {
} else if (OB_FAIL(inner_inc_macro_ref_cnt())) {
LOG_WARN("failed to increase macro ref cnt", K(ret));
} else if (OB_FAIL(check_tablet_schema_mismatch(old_tablet, *param.storage_schema_, is_convert_co_merge))) {
} else if (OB_FAIL(check_tablet_schema_mismatch(old_tablet, *storage_schema_addr_.ptr_, is_convert_co_merge))) {
LOG_ERROR("find error while checking tablet schema mismatch", K(ret), KPC(param.storage_schema_), K(old_tablet), K(param.compaction_info_));
} else if (OB_FAIL(check_table_store_flag_match_with_table_store_(table_store_addr_.get_ptr()))) {
LOG_WARN("failed to check table store flag match with table store", K(ret), K(old_tablet), K_(table_store_addr));
@ -782,8 +782,6 @@ int ObTablet::init_with_migrate_param(
} else if (FALSE_IT(set_initial_addr())) {
} else if (OB_FAIL(inner_inc_macro_ref_cnt())) {
LOG_WARN("failed to increase macro ref cnt", K(ret));
} else if (OB_FAIL(check_tablet_schema_mismatch(*this, *storage_schema_addr_.ptr_, false/*is_convert_co_major_merge*/))) {
LOG_ERROR("find error while checking tablet schema mismatch", K(ret), KPC(storage_schema_addr_.ptr_), KPC(this));
} else {
is_inited_ = true;
LOG_INFO("succeeded to init tablet with migration tablet param", K(ret), K(param), KPC(this));
@ -991,7 +989,7 @@ int ObTablet::init_for_sstable_replace(
LOG_WARN("update restore status for tablet split failed", K(ret), K(param), KPC(this));
} else if (OB_FAIL(try_update_start_scn())) {
LOG_WARN("failed to update start scn", K(ret), K(param), K(table_store_addr_));
} else if (OB_FAIL(check_tablet_schema_mismatch(old_tablet, *storage_schema, false/*is_convert_co_major_merge*/))) {
} else if (OB_FAIL(check_tablet_schema_mismatch(old_tablet, *storage_schema_addr_.ptr_, false/*is_convert_co_major_merge*/))) {
LOG_ERROR("find error while checking tablet schema mismatch", K(ret), KPC(storage_schema), K(old_tablet));
} else if (OB_FAIL(table_store_cache_.init(table_store_addr_.get_ptr()->get_major_sstables(),
table_store_addr_.get_ptr()->get_minor_sstables(),
@ -1307,23 +1305,12 @@ int ObTablet::check_tablet_schema_mismatch(
LOG_WARN("ls handle is invalid or nullptr", K(ret), K(ls_handle), KP(ls));
} else if (ls->is_cs_replica()) {
LOG_INFO("For columns tore replica, allow old tablet and new schema mismatch", K(ret), K(old_tablet), K(storage_schema));
} else if (is_old_tablet_row_store) {
if (is_storage_schema_row_store) {
// row store status match
} else if (is_convert_co_major_merge) {
// convert co major merge
LOG_INFO("convert co major merge, old tablet is row store and new storage schema is column store", K(ret), K(old_tablet), K(storage_schema));
} else {
// delayed add column group
LOG_INFO("should be delayed add column group, old tablet is row store and new storage schema is column store", K(ret), K(old_tablet), K(storage_schema));
}
} else {
if (!is_storage_schema_row_store) {
// column store status match
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected schema, old tablet is column store while new storage schema is column store", K(ret), K(old_tablet), K(storage_schema));
}
} else if (!is_old_tablet_row_store && is_storage_schema_row_store) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected schema, old tablet is column store while new storage schema is column store", K(ret), K(old_tablet), K(storage_schema));
} else if (is_old_tablet_row_store
&& (!is_storage_schema_row_store || is_convert_co_major_merge)) {
LOG_INFO("old tablet is row store and new storage schema is column store", K(ret), K(old_tablet), K(storage_schema), K(is_storage_schema_row_store), K(is_convert_co_major_merge));
}
return ret;
}