[CP] Deserialize table schema problem in replay thread

This commit is contained in:
tino247
2023-11-15 02:10:35 +00:00
committed by ob-robot
parent 83ca5fdc52
commit f0c5512266
4 changed files with 64 additions and 44 deletions

View File

@ -2455,6 +2455,7 @@ OB_INLINE const char*& ob_get_origin_thread_name()
}
static const char* PARALLEL_DDL_THREAD_NAME = "DDLPQueueTh";
static const char* REPLAY_SERVICE_THREAD_NAME = "ReplaySrv";
// There are many clusters in arbitration server, we need a field identify the different clusters.
OB_INLINE int64_t &ob_get_cluster_id()

View File

@ -100,6 +100,7 @@ int ObCompatModeGetter::get_tablet_compat_mode(
int ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(const uint64_t tenant_id, bool &is_oracle_mode)
{
int ret = OB_SUCCESS;
is_oracle_mode = false;
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
if (OB_FAIL(instance().get_tenant_compat_mode(tenant_id, compat_mode))) {
@ -122,6 +123,7 @@ int ObCompatModeGetter::check_is_oracle_mode_with_table_id(
bool &is_oracle_mode)
{
int ret = OB_SUCCESS;
is_oracle_mode = false;
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
if (OB_FAIL(instance().get_table_compat_mode(tenant_id, table_id, compat_mode))) {
LOG_WARN("fail to get tenant mode", KR(ret), K(tenant_id), K(table_id));

View File

@ -6088,19 +6088,7 @@ int ObSimpleTableSchemaV2::check_if_oracle_compat_mode(bool &is_oracle_mode) con
const uint64_t tenant_id = get_tenant_id();
const int64_t table_id = get_table_id();
is_oracle_mode = false;
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
if (OB_FAIL(ObCompatModeGetter::get_table_compat_mode(tenant_id, table_id, compat_mode))) {
LOG_WARN("fail to get tenant mode", KR(ret), K(tenant_id), K(table_id));
} else if (lib::Worker::CompatMode::ORACLE == compat_mode) {
is_oracle_mode = true;
} else if (lib::Worker::CompatMode::MYSQL == compat_mode) {
is_oracle_mode = false;
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("compat_mode should not be INVALID.", KR(ret), K(tenant_id), K(table_id));
}
return ret;
return ObCompatModeGetter::check_is_oracle_mode_with_table_id(tenant_id, table_id, is_oracle_mode);
}
int ObSimpleTableSchemaV2::check_is_duplicated(

View File

@ -2029,59 +2029,79 @@ int ObTableSchema::add_column(const ColumnType &column)
int ret = common::OB_SUCCESS;
char *buf = NULL;
ColumnType *local_column = NULL;
bool is_oracle_mode = lib::is_oracle_mode();
bool is_oracle_mode = false;
const char* thread_name = ob_get_origin_thread_name();
const bool in_replay_thread = OB_NOT_NULL(thread_name)
&& 0 == STRCMP(thread_name, REPLAY_SERVICE_THREAD_NAME);
const uint64_t mtl_tenant_id = MTL_ID();
if (!column.is_valid()) {
ret = common::OB_INVALID_ARGUMENT;
SHARE_SCHEMA_LOG(WARN, "The column is not valid", K(ret));
SHARE_SCHEMA_LOG(WARN, "The column is not valid", KR(ret));
} else if (is_user_table() && column_cnt_ > common::OB_USER_ROW_MAX_COLUMNS_COUNT) {
ret = common::OB_ERR_TOO_MANY_COLUMNS;
} else if (column.is_autoincrement() && (autoinc_column_id_ != 0)
&& (autoinc_column_id_ != column.get_column_id())) {
ret = common::OB_ERR_WRONG_AUTO_KEY;
SHARE_SCHEMA_LOG(WARN, "Only one auto increment row is allowed", K(ret));
SHARE_SCHEMA_LOG(WARN, "Only one auto increment row is allowed", KR(ret));
} else if (NULL == (buf = static_cast<char*>(alloc(sizeof(ColumnType))))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
SHARE_SCHEMA_LOG(ERROR, "Fail to allocate memory, ", "size", sizeof(ColumnType), K(ret));
} else if (static_cast<int64_t>(table_id_) > 0 // resolver will add column when table_id is invalid
&& OB_FAIL(check_if_oracle_compat_mode(is_oracle_mode))) {
// When deserialize column in physical restore, tenant id is wrong. We need to use lib::is_oracle_mode() to do this check.
SHARE_SCHEMA_LOG(WARN, "check if_oracle_compat_mode failed", K(ret), K(tenant_id_), K(table_id_));
is_oracle_mode = lib::is_oracle_mode();
ret = OB_SUCCESS;
SHARE_SCHEMA_LOG(WARN, "replace error code to OB_SUCCESS, because tenant_id is invalid in physical restore",
K(ret), K(tenant_id_), K(table_id_), K(is_oracle_mode));
} else if (static_cast<int64_t>(table_id_) <= 0 // deserialize create table arg
&& OB_FAIL(ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(tenant_id_, is_oracle_mode))) {
SHARE_SCHEMA_LOG(WARN, "check if_oracle_compat_mode failed", K(ret), K(tenant_id_), K(table_id_));
SHARE_SCHEMA_LOG(ERROR, "Fail to allocate memory", KR(ret), "size", sizeof(ColumnType));
} else if (static_cast<int64_t>(table_id_) > 0
&& is_ls_reserved_table(table_id_)) {
// create or replay ls inner tablet (which should use tenant's compat mode
if (OB_FAIL(ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(mtl_tenant_id, is_oracle_mode))) {
SHARE_SCHEMA_LOG(WARN, "check if_oracle_compat_mode failed",
KR(ret), K(mtl_tenant_id), K(tenant_id_), K(table_id_));
}
} else if (in_replay_thread) {
// For replay thread in physical restore/standby tenant,
// either tenant_id which is persisted in log or lib::is_oracle_mode() is not credible.
// Compat mode should be fetched by MTL_ID().
// bugfix: 52769689
if (OB_FAIL(ObCompatModeGetter::check_is_oracle_mode_with_table_id(mtl_tenant_id, table_id_, is_oracle_mode))) {
SHARE_SCHEMA_LOG(WARN, "check if_oracle_compat_mode failed",
KR(ret), K(mtl_tenant_id), K(tenant_id_), K(table_id_));
}
} else if (static_cast<int64_t>(table_id_) > 0) {
if (OB_FAIL(check_if_oracle_compat_mode(is_oracle_mode))) {
SHARE_SCHEMA_LOG(WARN, "failed to get tenant's compat_mode",
KR(ret), K(mtl_tenant_id), K(tenant_id_), K(table_id_));
}
} else if (static_cast<int64_t>(table_id_) <= 0) {
// arg deserialize or ddl resolver
if (OB_FAIL(ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(tenant_id_, is_oracle_mode))) {
SHARE_SCHEMA_LOG(WARN, "check if_oracle_compat_mode failed",
KR(ret), K(mtl_tenant_id), K(tenant_id_), K(table_id_));
}
}
if (OB_FAIL(ret)) {
} else if (!is_view_table() && OB_FAIL(check_row_length(is_oracle_mode, NULL, &column))) {
SHARE_SCHEMA_LOG(WARN, "check row length failed", K(ret));
SHARE_SCHEMA_LOG(WARN, "check row length failed", KR(ret), K(tenant_id_), K(table_id_), K(column));
} else {
if (NULL == (local_column = new (buf) ColumnType(allocator_))) {
ret = common::OB_ERR_UNEXPECTED;
SHARE_SCHEMA_LOG(WARN, "Fail to new local_column", K(ret));
SHARE_SCHEMA_LOG(WARN, "Fail to new local_column", KR(ret));
} else {
*local_column = column;
ret = local_column->get_err_ret();
local_column->set_table_id(table_id_);
if (OB_FAIL(ret)) {
SHARE_SCHEMA_LOG(WARN, "failed copy assign column", K(ret), K(column));
SHARE_SCHEMA_LOG(WARN, "failed copy assign column", KR(ret), K(column));
} else if (!local_column->is_valid()) {
ret = common::OB_ERR_UNEXPECTED;
SHARE_SCHEMA_LOG(WARN, "The local column is not valid", K(ret));
SHARE_SCHEMA_LOG(WARN, "The local column is not valid", KR(ret));
} else if (OB_FAIL(add_column_update_prev_id(local_column))) {
SHARE_SCHEMA_LOG(WARN, "Fail to update previous next column id", K(ret));
SHARE_SCHEMA_LOG(WARN, "Fail to update previous next column id", KR(ret));
} else if (OB_FAIL(add_col_to_id_hash_array(local_column))) {
SHARE_SCHEMA_LOG(WARN, "Fail to add column to id_hash_array", K(ret));
SHARE_SCHEMA_LOG(WARN, "Fail to add column to id_hash_array", KR(ret));
} else if (OB_FAIL(add_col_to_name_hash_array(is_oracle_mode, local_column))) {
SHARE_SCHEMA_LOG(WARN, "Fail to add column to name_hash_array", K(ret));
SHARE_SCHEMA_LOG(WARN, "Fail to add column to name_hash_array", KR(ret));
} else if (OB_FAIL(add_col_to_column_array(local_column))) {
SHARE_SCHEMA_LOG(WARN, "Fail to push column to array", K(ret));
SHARE_SCHEMA_LOG(WARN, "Fail to push column to array", KR(ret));
} else {
if (column.is_rowkey_column()) {
if (OB_FAIL(set_rowkey_info(column))) {
SHARE_SCHEMA_LOG(WARN, "set rowkey info to table schema failed", K(ret));
SHARE_SCHEMA_LOG(WARN, "set rowkey info to table schema failed", KR(ret));
}
}
if (OB_SUCC(ret) && column.is_index_column()) {
@ -2092,7 +2112,7 @@ int ObTableSchema::add_column(const ColumnType &column)
index_column.fulltext_flag_ = column.is_fulltext_column();
index_column.spatial_flag_ = column.is_spatial_generated_column();
if (OB_FAIL(index_info_.set_column(column.get_index_position() - 1, index_column))) {
SHARE_SCHEMA_LOG(WARN, "Fail to set column to index info", K(ret));
SHARE_SCHEMA_LOG(WARN, "Fail to set column to index info", KR(ret));
} else {
if (index_column_num_ < index_info_.get_size()) {
index_column_num_ = index_info_.get_size();
@ -2105,14 +2125,14 @@ int ObTableSchema::add_column(const ColumnType &column)
}
if (OB_SUCC(ret) && column.is_generated_column()) {
if (OB_FAIL(generated_columns_.add_member(column.get_column_id() - common::OB_APP_MIN_COLUMN_ID))) {
SHARE_SCHEMA_LOG(WARN, "add column id to generated columns failed", K(column));
SHARE_SCHEMA_LOG(WARN, "add column id to generated columns failed", KR(ret), K(column));
} else if (!column.is_column_stored_in_sstable()) {
++virtual_column_cnt_;
}
}
if (OB_SUCC(ret) && column.is_label_se_column()) {
if (OB_FAIL(label_se_column_ids_.push_back(column.get_column_id()))) {
SHARE_SCHEMA_LOG(WARN, "fail to do array push back", K(ret), K_(label_se_column_ids));
SHARE_SCHEMA_LOG(WARN, "fail to do array push back", KR(ret), K_(label_se_column_ids));
}
}
}
@ -2125,7 +2145,7 @@ int ObTableSchema::add_column(const ColumnType &column)
if (column.is_part_key_column()) {
if (OB_FAIL(partition_key_info_.set_column(column.get_part_key_pos() - 1,
partition_key_column))) {
SHARE_SCHEMA_LOG(WARN, "Failed to set partition coumn");
SHARE_SCHEMA_LOG(WARN, "Failed to set partition coumn", KR(ret));
} else {
part_key_column_num_ = partition_key_info_.get_size();
}
@ -2135,7 +2155,7 @@ int ObTableSchema::add_column(const ColumnType &column)
if (column.is_subpart_key_column()) {
if (OB_FAIL(subpartition_key_info_.set_column(column.get_subpart_key_pos() - 1,
partition_key_column))) {
SHARE_SCHEMA_LOG(WARN, "Failed to set subpartition column", K(ret));
SHARE_SCHEMA_LOG(WARN, "Failed to set subpartition column", KR(ret));
} else {
subpart_key_column_num_ = subpartition_key_info_.get_size();
}
@ -2158,10 +2178,10 @@ int ObTableSchema::add_column(const ColumnType &column)
const common::ObRowkeyColumn *tmp_column = NULL;
if (NULL == (tmp_column = rowkey_info_.get_column(i))) {
ret = common::OB_ERR_UNEXPECTED;
SHARE_SCHEMA_LOG(WARN, "the column is NULL, ", K(i));
SHARE_SCHEMA_LOG(WARN, "the column is NULL, ", KR(ret), K(i));
} else if (tmp_column->column_id_ > common::OB_MIN_SHADOW_COLUMN_ID) {
if (OB_FAIL(shadow_rowkey_info_.set_column(shadow_pk_pos, *tmp_column))) {
SHARE_SCHEMA_LOG(WARN, "fail to set column to shadow rowkey info", K(ret), K(*tmp_column));
SHARE_SCHEMA_LOG(WARN, "fail to set column to shadow rowkey info", KR(ret), KPC(tmp_column));
} else {
++shadow_pk_pos;
}
@ -2174,6 +2194,15 @@ int ObTableSchema::add_column(const ColumnType &column)
set_max_used_column_id(column.get_column_id());
}
}
if (OB_FAIL(ret)) {
SHARE_SCHEMA_LOG(WARN, "add column failed", KR(ret), K(mtl_tenant_id),
K(tenant_id_), K(table_id_), K(is_oracle_mode), K(in_replay_thread),
"thead_name", OB_NOT_NULL(thread_name) ? thread_name : "NULL", K(column));
} else {
SHARE_SCHEMA_LOG(TRACE, "add column success", KR(ret), K(mtl_tenant_id),
K(tenant_id_), K(table_id_), K(is_oracle_mode), K(in_replay_thread),
"thead_name", OB_NOT_NULL(thread_name) ? thread_name : "NULL", K(column));
}
return ret;
}