fix column array access out of bounds problem when there are virtual generated columns

This commit is contained in:
obdev 2023-03-07 05:43:45 +00:00 committed by ob-robot
parent 851f6655e8
commit cb8394b24e
15 changed files with 67 additions and 45 deletions

View File

@ -165,7 +165,7 @@ int ObTableLoadBeginP::init_idx_array(const ObTableSchema *table_schema)
{
int ret = OB_SUCCESS;
ObSEArray<ObColDesc, 64> column_descs;
if (OB_FAIL(table_schema->get_column_ids(column_descs))) {
if (OB_FAIL(table_schema->get_column_ids(column_descs, false))) {
LOG_WARN("fail to get column ids", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && (i < column_descs.count()); ++i) {

View File

@ -189,9 +189,7 @@ int ObTableLoadMerger::build_merge_ctx()
merge_param.table_id_ = param_.table_id_;
merge_param.target_table_id_ = store_ctx_->ctx_->ddl_param_.dest_table_id_;
merge_param.rowkey_column_num_ = store_ctx_->ctx_->schema_.rowkey_column_count_;
merge_param.schema_column_count_ =
(store_ctx_->ctx_->schema_.is_heap_table_ ? store_ctx_->table_data_desc_.column_count_ + 1
: store_ctx_->table_data_desc_.column_count_);
merge_param.store_column_count_ = store_ctx_->ctx_->schema_.store_column_count_;
merge_param.table_data_desc_ = store_ctx_->table_data_desc_;
merge_param.datum_utils_ = &(store_ctx_->ctx_->schema_.datum_utils_);
merge_param.col_descs_ = &(store_ctx_->ctx_->schema_.column_descs_);

View File

@ -127,7 +127,7 @@ int ObTableLoadPartitionCalc::init_rowkey_index(const ObTableSchema *table_schem
{
int ret = OB_SUCCESS;
ObSEArray<ObColDesc, 64> column_descs;
if (OB_FAIL(table_schema->get_column_ids(column_descs))) {
if (OB_FAIL(table_schema->get_column_ids(column_descs, false))) {
LOG_WARN("fail to get column ids", KR(ret));
} else if (OB_UNLIKELY(column_descs.empty())) {
ret = OB_ERR_UNEXPECTED;

View File

@ -146,7 +146,7 @@ ObTableLoadSchema::ObTableLoadSchema()
has_autoinc_column_(false),
has_identity_column_(false),
rowkey_column_count_(0),
column_count_(0),
store_column_count_(0),
collation_type_(CS_TYPE_INVALID),
schema_version_(0),
is_inited_(false)
@ -168,7 +168,7 @@ void ObTableLoadSchema::reset()
has_autoinc_column_ = false;
has_identity_column_ = false;
rowkey_column_count_ = 0;
column_count_ = 0;
store_column_count_ = 0;
collation_type_ = CS_TYPE_INVALID;
schema_version_ = 0;
column_descs_.reset();
@ -211,15 +211,16 @@ int ObTableLoadSchema::init_table_schema(const ObTableSchema *table_schema)
is_heap_table_ = table_schema->is_heap_table();
has_autoinc_column_ = (table_schema->get_autoinc_column_id() != 0);
rowkey_column_count_ = table_schema->get_rowkey_column_num();
column_count_ = table_schema->get_column_count();
collation_type_ = table_schema->get_collation_type();
schema_version_ = table_schema->get_schema_version();
if (OB_FAIL(ObTableLoadUtils::deep_copy(table_schema->get_table_name_str(), table_name_,
allocator_))) {
LOG_WARN("fail to deep copy table name", KR(ret));
} else if (OB_FAIL(table_schema->get_column_ids(column_descs_))) {
} else if (OB_FAIL(table_schema->get_store_column_count(store_column_count_))) {
LOG_WARN("fail to get store column count", KR(ret));
} else if (OB_FAIL(table_schema->get_column_ids(column_descs_, false))) {
LOG_WARN("fail to get column descs", KR(ret));
} else if (OB_FAIL(table_schema->get_multi_version_column_descs(multi_version_column_descs_))) {
} else if (OB_FAIL(table_schema->get_multi_version_column_descs(multi_version_column_descs_))) {
LOG_WARN("fail to get multi version column descs", KR(ret));
} else if (OB_FAIL(datum_utils_.init(multi_version_column_descs_, rowkey_column_count_,
lib::is_oracle_mode(), allocator_))) {

View File

@ -40,8 +40,8 @@ public:
void reset();
int init(uint64_t tenant_id, uint64_t table_id);
bool is_valid() const { return is_inited_; }
TO_STRING_KV(K_(table_name), K_(is_partitioned_table), K_(is_heap_table),
K_(has_autoinc_column), K_(has_identity_column), K_(rowkey_column_count), K_(column_count),
TO_STRING_KV(K_(table_name), K_(is_partitioned_table), K_(is_heap_table), K_(has_autoinc_column),
K_(has_identity_column), K_(rowkey_column_count), K_(store_column_count),
K_(collation_type), K_(column_descs), K_(is_inited));
private:
int init_table_schema(const share::schema::ObTableSchema *table_schema);
@ -53,9 +53,12 @@ public:
bool has_autoinc_column_;
bool has_identity_column_;
int64_t rowkey_column_count_;
int64_t column_count_;
// column count in store, does not contain virtual generated columns
int64_t store_column_count_;
common::ObCollationType collation_type_;
int64_t schema_version_;
// if it is a heap table, it contains hidden primary key column
// does not contain virtual generated columns
common::ObArray<share::schema::ObColDesc> column_descs_;
common::ObArray<share::schema::ObColDesc> multi_version_column_descs_;
blocksstable::ObStorageDatumUtils datum_utils_;

View File

@ -55,6 +55,11 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDD
if (OB_FAIL(schema_.init(param_.tenant_id_, param_.table_id_))) {
LOG_WARN("fail to init table load schema", KR(ret), K(param_.tenant_id_),
K(param_.table_id_));
} else if (OB_UNLIKELY(param.column_count_ != (schema_.is_heap_table_
? (schema_.store_column_count_ - 1)
: schema_.store_column_count_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected column count", KR(ret), K(param.column_count_), K(schema_.store_column_count_), K(schema_.is_heap_table_));
} else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", param_.tenant_id_))) {
LOG_WARN("fail to init allocator", KR(ret));
} else if (OB_FAIL(trans_ctx_allocator_.init("TLD_TCtxPool", param_.tenant_id_))) {

View File

@ -106,7 +106,6 @@ ObTableLoadTransStoreWriter::ObTableLoadTransStoreWriter(ObTableLoadTransStore *
param_(trans_ctx_->ctx_->param_),
allocator_("TLD_TSWriter", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_),
table_data_desc_(nullptr),
column_descs_(nullptr),
ref_count_(0),
is_flush_(false),
is_inited_(false)
@ -134,13 +133,12 @@ int ObTableLoadTransStoreWriter::init()
} else if (OB_UNLIKELY(trans_store_->session_store_array_.count() != param_.session_count_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KPC(trans_store_));
} else if (OB_FAIL(OTTZ_MGR.get_tenant_tz(param_.tenant_id_, tz_info_.get_tz_map_wrap()))) {
LOG_WARN("fail to get tenant time zone", KR(ret), K(param_.tenant_id_));
} else {
table_data_desc_ = &trans_ctx_->ctx_->store_ctx_->table_data_desc_;
table_data_desc_ = &store_ctx_->table_data_desc_;
collation_type_ = trans_ctx_->ctx_->schema_.collation_type_;
column_descs_ = &(trans_ctx_->ctx_->schema_.column_descs_);
if (OB_FAIL(init_session_ctx_array())) {
if (OB_FAIL(OTTZ_MGR.get_tenant_tz(param_.tenant_id_, tz_info_.get_tz_map_wrap()))) {
LOG_WARN("fail to get tenant time zone", KR(ret), K(param_.tenant_id_));
} else if (OB_FAIL(init_session_ctx_array())) {
LOG_WARN("fail to init session ctx array", KR(ret));
} else if (OB_FAIL(init_column_schemas())) {
LOG_WARN("fail to init column schemas", KR(ret));
@ -154,15 +152,16 @@ int ObTableLoadTransStoreWriter::init()
int ObTableLoadTransStoreWriter::init_column_schemas()
{
int ret = OB_SUCCESS;
const ObIArray<ObColDesc> &column_descs = store_ctx_->ctx_->schema_.column_descs_;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
if (OB_FAIL(ObTableLoadSchema::get_table_schema(param_.tenant_id_, param_.table_id_, schema_guard,
table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(param_));
}
for (int64_t i = 0; OB_SUCC(ret) && i < table_schema->get_column_count(); ++i) {
for (int64_t i = 0; OB_SUCC(ret) && i < column_descs.count(); ++i) {
const ObColumnSchemaV2 *column_schema =
table_schema->get_column_schema(column_descs_->at(i).col_id_);
table_schema->get_column_schema(column_descs.at(i).col_id_);
if (column_schema->is_hidden()) {
} else if (OB_FAIL(column_schemas_.push_back(column_schema))) {
LOG_WARN("failed to push back column schema", K(ret), K(i), KPC(column_schema));

View File

@ -96,7 +96,8 @@ private:
common::ObCollationType collation_type_;
common::ObTimeZoneInfo tz_info_;
ObTableLoadTimeConverter time_cvrt_;
const common::ObIArray<share::schema::ObColDesc> *column_descs_;
// does not contain hidden primary key columns
// and does not contain virtual generated columns
common::ObArray<const share::schema::ObColumnSchemaV2 *> column_schemas_;
struct SessionContext
{

View File

@ -21,7 +21,10 @@ using namespace sql;
*/
ObDirectLoadDataFuseParam::ObDirectLoadDataFuseParam()
: datum_utils_(nullptr), error_row_handler_(nullptr), result_info_(nullptr)
: store_column_count_(0),
datum_utils_(nullptr),
error_row_handler_(nullptr),
result_info_(nullptr)
{
}
@ -31,8 +34,8 @@ ObDirectLoadDataFuseParam::~ObDirectLoadDataFuseParam()
bool ObDirectLoadDataFuseParam::is_valid() const
{
return tablet_id_.is_valid() && table_data_desc_.is_valid() && nullptr != datum_utils_ &&
nullptr != error_row_handler_ && nullptr != result_info_;
return tablet_id_.is_valid() && store_column_count_ > 0 && table_data_desc_.is_valid() &&
nullptr != datum_utils_ && nullptr != error_row_handler_ && nullptr != result_info_;
}
/**
@ -186,6 +189,9 @@ int ObDirectLoadDataFuse::supply_consume()
} else {
ret = OB_SUCCESS;
}
} else if (OB_UNLIKELY(item.datum_row_->count_ != param_.store_column_count_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected column count", KR(ret), K(item.datum_row_->count_), K(param_.store_column_count_));
} else {
item.iter_idx_ = iter_idx;
if (OB_FAIL(rows_merger_.push(item))) {

View File

@ -25,10 +25,11 @@ public:
ObDirectLoadDataFuseParam();
~ObDirectLoadDataFuseParam();
bool is_valid() const;
TO_STRING_KV(K_(tablet_id), K_(table_data_desc), KP_(datum_utils), KP_(error_row_handler),
KP_(result_info));
TO_STRING_KV(K_(tablet_id), K_(store_column_count), K_(table_data_desc), KP_(datum_utils),
KP_(error_row_handler), KP_(result_info));
public:
common::ObTabletID tablet_id_;
int64_t store_column_count_;
ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
observer::ObTableLoadErrorRowHandler *error_row_handler_;

View File

@ -38,7 +38,7 @@ ObDirectLoadMergeParam::ObDirectLoadMergeParam()
: table_id_(OB_INVALID_ID),
target_table_id_(OB_INVALID_ID),
rowkey_column_num_(0),
schema_column_count_(0),
store_column_count_(0),
datum_utils_(nullptr),
col_descs_(nullptr),
is_heap_table_(false),
@ -56,7 +56,7 @@ ObDirectLoadMergeParam::~ObDirectLoadMergeParam()
bool ObDirectLoadMergeParam::is_valid() const
{
return OB_INVALID_ID != table_id_ && 0 < rowkey_column_num_ && 0 < schema_column_count_ &&
return OB_INVALID_ID != table_id_ && 0 < rowkey_column_num_ && 0 < store_column_count_ &&
table_data_desc_.is_valid() && nullptr != datum_utils_ && nullptr != col_descs_ &&
nullptr != insert_table_ctx_ && nullptr != error_row_handler_;
}
@ -209,8 +209,7 @@ int ObDirectLoadTabletMergeCtx::collect_sql_statistics(
} else {
int64_t table_row_cnt = 0;
int64_t table_avg_len = 0;
int64_t col_cnt =
param_.is_heap_table_ ? param_.schema_column_count_ - 1 : param_.schema_column_count_;
int64_t col_cnt = param_.table_data_desc_.column_count_;
ObOptTableStat *table_stat = nullptr;
ObOptDmlStat dml_stat;
StatLevel stat_level;

View File

@ -42,7 +42,7 @@ public:
ObDirectLoadMergeParam();
~ObDirectLoadMergeParam();
bool is_valid() const;
TO_STRING_KV(K_(table_id), K_(target_table_id), K_(rowkey_column_num), K_(schema_column_count),
TO_STRING_KV(K_(table_id), K_(target_table_id), K_(rowkey_column_num), K_(store_column_count),
K_(table_data_desc), KP_(datum_utils), K_(is_heap_table), K_(is_fast_heap_table),
K_(online_opt_stat_gather), KP_(insert_table_ctx), KP_(error_row_handler),
KP_(result_info));
@ -50,7 +50,7 @@ public:
uint64_t table_id_;
uint64_t target_table_id_;
int64_t rowkey_column_num_;
int64_t schema_column_count_;
int64_t store_column_count_;
storage::ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
const common::ObIArray<share::schema::ObColDesc> *col_descs_;

View File

@ -225,9 +225,11 @@ int ObDirectLoadOriginTableScanner::init_table_access_param()
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
const uint64_t table_id = origin_table_->get_meta().table_id_;
const ObTabletID &tablet_id = origin_table_->get_meta().tablet_id_;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
ObRelativeTable relative_table;
int64_t store_column_count = 0;
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id,
schema_guard))) {
LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id));
@ -238,11 +240,12 @@ int ObDirectLoadOriginTableScanner::init_table_access_param()
LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(schema_param_.convert(table_schema))) {
LOG_WARN("fail to convert schema para", KR(ret));
} else if (OB_FAIL(relative_table.init(&schema_param_, origin_table_->get_meta().tablet_id_))) {
} else if (OB_FAIL(relative_table.init(&schema_param_, tablet_id))) {
LOG_WARN("fail to init relative table", KR(ret));
} else if (OB_FAIL(table_schema->get_store_column_count(store_column_count))) {
LOG_WARN("fail to get store column count", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < schema_param_.get_read_info().get_schema_column_count(); ++i) {
for (int64_t i = 0; OB_SUCC(ret) && i < store_column_count; ++i) {
if (OB_FAIL(col_ids_.push_back(i))) {
LOG_WARN("fail to push back col id", KR(ret), K(i));
}
@ -262,8 +265,12 @@ int ObDirectLoadOriginTableScanner::init_table_access_ctx()
{
int ret = OB_SUCCESS;
const int64_t snapshot_version = ObTimeUtil::current_time_ns();
ObQueryFlag query_flag(ObQueryFlag::Forward, false /*daily_merge*/, true /*optimize*/,
false /*whole_macro_scan*/, false /*full_row*/, false /*index_back*/,
ObQueryFlag query_flag(ObQueryFlag::Forward,
false /*daily_merge*/,
true /*optimize*/,
false /*whole_macro_scan*/,
false /*full_row*/,
false /*index_back*/,
false /*query_stat*/); //whole_macro_scan use false,otherwise query range is not overlap with sstable range will report error
ObVersionRange trans_version_range;
query_flag.multi_version_minor_merge_ = false;

View File

@ -77,7 +77,7 @@ private:
private:
common::ObArenaAllocator allocator_;
ObDirectLoadOriginTable *origin_table_;
ObSEArray<int32_t, OB_MAX_COLUMN_NUMBER> col_ids_;
ObArray<int32_t> col_ids_;
share::schema::ObTableSchemaParam schema_param_;
ObTableAccessParam table_access_param_;
ObDirectLoadIOCallback io_callback_;

View File

@ -232,6 +232,7 @@ int ObDirectLoadPartitionRangeMergeTask::RowIterator::init(
// init data_fuse_
ObDirectLoadDataFuseParam data_fuse_param;
data_fuse_param.tablet_id_ = tablet_id;
data_fuse_param.store_column_count_ = merge_param.store_column_count_;
data_fuse_param.table_data_desc_ = merge_param.table_data_desc_;
data_fuse_param.datum_utils_ = merge_param.datum_utils_;
data_fuse_param.error_row_handler_ = merge_param.error_row_handler_;
@ -241,7 +242,7 @@ int ObDirectLoadPartitionRangeMergeTask::RowIterator::init(
LOG_WARN("fail to init data fuse", KR(ret));
}
// init datum_row_
else if (OB_FAIL(datum_row_.init(merge_param.schema_column_count_ +
else if (OB_FAIL(datum_row_.init(merge_param.store_column_count_ +
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
LOG_WARN("fail to init datum row", KR(ret));
} else {
@ -386,6 +387,7 @@ int ObDirectLoadPartitionRangeMultipleMergeTask::RowIterator::init(
// init data_fuse_
ObDirectLoadDataFuseParam data_fuse_param;
data_fuse_param.tablet_id_ = tablet_id;
data_fuse_param.store_column_count_ = merge_param.store_column_count_;
data_fuse_param.table_data_desc_ = merge_param.table_data_desc_;
data_fuse_param.datum_utils_ = merge_param.datum_utils_;
data_fuse_param.error_row_handler_ = merge_param.error_row_handler_;
@ -395,7 +397,7 @@ int ObDirectLoadPartitionRangeMultipleMergeTask::RowIterator::init(
LOG_WARN("fail to init data fuse", KR(ret));
}
// init datum_row_
else if (OB_FAIL(datum_row_.init(merge_param.schema_column_count_ +
else if (OB_FAIL(datum_row_.init(merge_param.store_column_count_ +
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
LOG_WARN("fail to init datum row", KR(ret));
} else {
@ -550,7 +552,7 @@ int ObDirectLoadPartitionHeapTableMergeTask::RowIterator::init(
LOG_WARN("fail to init fragment scanner", KR(ret));
}
// init datum_row_
else if (OB_FAIL(datum_row_.init(merge_param.schema_column_count_ +
else if (OB_FAIL(datum_row_.init(merge_param.store_column_count_ +
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
LOG_WARN("fail to init datum row", KR(ret));
} else {
@ -560,7 +562,7 @@ int ObDirectLoadPartitionHeapTableMergeTask::RowIterator::init(
datum_row_.storage_datums_[merge_param.rowkey_column_num_ + 1].set_int(0); // fill sql_no
deserialize_datums_ = datum_row_.storage_datums_ + merge_param.rowkey_column_num_ +
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
deserialize_datum_cnt_ = merge_param.schema_column_count_ - merge_param.rowkey_column_num_;
deserialize_datum_cnt_ = merge_param.store_column_count_ - merge_param.rowkey_column_num_;
pk_interval_ = pk_interval;
result_info_ = merge_param.result_info_;
is_inited_ = true;
@ -702,7 +704,7 @@ int ObDirectLoadPartitionHeapTableMultipleMergeTask::RowIterator::init(
LOG_WARN("fail to init tablet whole scanner", KR(ret));
}
// init datum_row_
else if (OB_FAIL(datum_row_.init(merge_param.schema_column_count_ +
else if (OB_FAIL(datum_row_.init(merge_param.store_column_count_ +
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
LOG_WARN("fail to init datum row", KR(ret));
} else {
@ -712,7 +714,7 @@ int ObDirectLoadPartitionHeapTableMultipleMergeTask::RowIterator::init(
datum_row_.storage_datums_[merge_param.rowkey_column_num_ + 1].set_int(0); // fill sql_no
deserialize_datums_ = datum_row_.storage_datums_ + merge_param.rowkey_column_num_ +
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
deserialize_datum_cnt_ = merge_param.schema_column_count_ - merge_param.rowkey_column_num_;
deserialize_datum_cnt_ = merge_param.store_column_count_ - merge_param.rowkey_column_num_;
pk_interval_ = pk_interval;
result_info_ = merge_param.result_info_;
is_inited_ = true;