fix bug:partition column with auto-increment is not supported

This commit is contained in:
Revendell
2023-02-24 14:52:21 +00:00
committed by ob-robot
parent e058f14334
commit 9856c9374e
6 changed files with 127 additions and 17 deletions

View File

@ -58,9 +58,11 @@ int ObTableLoadObjCaster::convert_obj(const ObObjType &expect_type, const ObObj
{
int ret = OB_SUCCESS;
dest = &src;
if (!src.is_null() && lib::is_mysql_mode() && 0 == src.get_val_len() && !ob_is_string_tc(expect_type)) {
if (src.is_string_type() && !src.is_null() && lib::is_mysql_mode() && 0 == src.get_val_len() &&
!ob_is_string_tc(expect_type)) {
dest = &zero_obj;
} else if (lib::is_oracle_mode() && (src.is_null_oracle() || 0 == src.get_val_len())) {
} else if (src.is_string_type() && lib::is_oracle_mode() &&
(src.is_null_oracle() || 0 == src.get_val_len())) {
dest = &null_obj;
}
return ret;

View File

@ -238,12 +238,6 @@ int ObTableLoadPartitionCalc::get_row(const ObTableLoadObjRow &obj_row, int32_t
if (OB_UNLIKELY(obj_index >= length)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid length", KR(ret), K(obj_index), K(length));
} else if (index_and_type.column_schema_->is_identity_column() && obj_row.cells_[obj_index].is_null()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("partition column with identity cannot be null", KR(ret), K(index_and_type.column_schema_->get_column_name()));
} else if (index_and_type.column_schema_->is_autoincrement() && obj_row.cells_[obj_index].is_null()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("partition column with auto_increment cannot be null", KR(ret), K(index_and_type.column_schema_->get_column_name()));
} else if (OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_,
obj_row.cells_[obj_index], rowkey_objs[i]))) {
LOG_WARN("fail to cast obj", KR(ret));

View File

@ -49,7 +49,7 @@ private:
common::ObIAllocator &allocator) const;
int get_partition_by_row(common::ObIArray<common::ObNewRow> &part_rows,
common::ObIArray<table::ObTableLoadPartitionId> &partition_ids) const;
private:
public:
struct IndexAndType
{
IndexAndType() : index_(-1) {}
@ -57,6 +57,10 @@ private:
const share::schema::ObColumnSchemaV2 *column_schema_;
TO_STRING_KV(K_(index), KP_(column_schema));
};
public:
table::ObTableLoadArray<IndexAndType> rowkey_obj_index_;
common::ObTimeZoneInfo tz_info_;
ObTableLoadTimeConverter time_cvrt_;
private:
// data members
uint64_t tenant_id_;
@ -69,9 +73,6 @@ private:
sql::ObSqlCtx sql_ctx_;
sql::ObExecContext exec_ctx_;
sql::ObTableLocation table_location_;
table::ObTableLoadArray<IndexAndType> rowkey_obj_index_;
common::ObTimeZoneInfo tz_info_;
ObTableLoadTimeConverter time_cvrt_;
bool is_inited_;
DISALLOW_COPY_AND_ASSIGN(ObTableLoadPartitionCalc);
};

View File

@ -4,18 +4,24 @@
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_autoinc_nextval.h"
#include "observer/table_load/ob_table_load_trans_bucket_writer.h"
#include "observer/table_load/ob_table_load_coordinator.h"
#include "observer/table_load/ob_table_load_coordinator_ctx.h"
#include "observer/table_load/ob_table_load_obj_cast.h"
#include "observer/table_load/ob_table_load_partition_calc.h"
#include "observer/table_load/ob_table_load_stat.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_trans_ctx.h"
#include "share/ob_autoincrement_service.h"
#include "share/sequence/ob_sequence_cache.h"
namespace oceanbase
{
namespace observer
{
using namespace blocksstable;
using namespace common;
using namespace common::hash;
using namespace share::schema;
@ -157,7 +163,7 @@ int ObTableLoadTransBucketWriter::advance_sequence_no(int32_t session_id, uint64
return ret;
}
int ObTableLoadTransBucketWriter::write(int32_t session_id, const ObTableLoadObjRowArray &obj_rows)
int ObTableLoadTransBucketWriter::write(int32_t session_id, ObTableLoadObjRowArray &obj_rows)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -173,7 +179,10 @@ int ObTableLoadTransBucketWriter::write(int32_t session_id, const ObTableLoadObj
LOG_WARN("fail to write for non partitioned", KR(ret));
}
} else {
if (OB_FAIL(write_for_partitioned(session_ctx, obj_rows))) {
if (OB_FAIL(handle_partition_with_autoinc_identity(trans_ctx_->ctx_->store_ctx_, session_ctx,
obj_rows, param_.sql_mode_, session_id))) {
LOG_WARN("fail to handle partition column with autoincrement or identity", KR(ret));
} else if (OB_FAIL(write_for_partitioned(session_ctx, obj_rows))) {
LOG_WARN("fail to write for partitioned", KR(ret));
}
}
@ -187,6 +196,99 @@ int ObTableLoadTransBucketWriter::write(int32_t session_id, const ObTableLoadObj
return ret;
}
int ObTableLoadTransBucketWriter::handle_partition_with_autoinc_identity(
ObTableLoadStoreCtx *&store_ctx, SessionContext &session_ctx,
table::ObTableLoadObjRowArray &obj_rows, const uint64_t &sql_mode, int32_t session_id)
{
int ret = OB_SUCCESS;
const int64_t row_count = obj_rows.count();
for (int64_t j = 0; OB_SUCC(ret) && j < row_count; ++j) {
ObObj out_obj;
ObStorageDatum storage_datum;
ObTableLoadObjRow &obj_row = obj_rows.at(j);
const int64_t rowkey_obj_count = coordinator_ctx_->partition_calc_.rowkey_obj_index_.count();
ObDataTypeCastParams cast_params(&(coordinator_ctx_->partition_calc_.tz_info_));
ObCastCtx cast_ctx(&(session_ctx.allocator_), &cast_params, CM_NONE,
ObCharset::get_system_collation());
ObTableLoadCastObjCtx cast_obj_ctx(&(coordinator_ctx_->partition_calc_.time_cvrt_), &cast_ctx,
false);
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_obj_count; ++i) {
out_obj.set_null();
const ObTableLoadPartitionCalc::IndexAndType &index_and_type =
coordinator_ctx_->partition_calc_.rowkey_obj_index_.at(i);
const ObColumnSchemaV2 *column_schema = index_and_type.column_schema_;
const int64_t obj_index = index_and_type.index_;
if (OB_UNLIKELY(obj_index >= param_.column_count_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid length", KR(ret), K(obj_index), K(param_.column_count_));
} else if ((column_schema->is_identity_column() || column_schema->is_autoincrement()) &&
column_schema->is_tbl_part_key_column()) {
if (!obj_row.cells_[obj_index].is_null() &&
OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_,
obj_row.cells_[obj_index], out_obj))) {
LOG_WARN("fail to cast obj", KR(ret));
} else if (OB_FAIL(storage_datum.from_obj_enhance(out_obj))) {
LOG_WARN("fail to from obj enhance", KR(ret), K(out_obj));
} else if (column_schema->is_autoincrement() &&
OB_FAIL(handle_autoinc_column(store_ctx, storage_datum,
column_schema->get_meta_type().get_type_class(),
session_id, sql_mode))) {
LOG_WARN("fail to handle autoinc column", KR(ret), K(i), K(storage_datum));
} else if (column_schema->is_identity_column() &&
OB_FAIL(handle_identity_column(store_ctx, column_schema, storage_datum,
session_ctx.allocator_))) {
LOG_WARN("fail to handle identity column", KR(ret), K(i), K(storage_datum));
} else if (OB_FAIL(storage_datum.to_obj_enhance(obj_row.cells_[obj_index],
column_schema->get_meta_type()))) {
LOG_WARN("fail to obj enhance", KR(ret), K(obj_row.cells_[obj_index]));
} else if (OB_FAIL(ob_write_obj(session_ctx.allocator_, obj_row.cells_[obj_index],
obj_row.cells_[obj_index]))) {
LOG_WARN("fail to deep copy obj", KR(ret), K(obj_row.cells_[obj_index]));
}
}
}
}
return ret;
}
int ObTableLoadTransBucketWriter::handle_autoinc_column(ObTableLoadStoreCtx *&store_ctx,
ObStorageDatum &datum,
const ObObjTypeClass &tc,
int32_t session_id,
const uint64_t &sql_mode)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObTableLoadAutoincNextval::eval_nextval(
&(store_ctx->session_ctx_array_[session_id - 1].autoinc_param_), datum, tc, sql_mode))) {
LOG_WARN("fail to get auto increment next value", KR(ret));
}
return ret;
}
int ObTableLoadTransBucketWriter::handle_identity_column(ObTableLoadStoreCtx *&store_ctx,
const ObColumnSchemaV2 *column_schema,
ObStorageDatum &datum,
ObArenaAllocator &cast_allocator)
{
int ret = OB_SUCCESS;
if (column_schema->is_always_identity_column()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support always identity column", KR(ret));
} else if (column_schema->is_default_identity_column() && datum.is_null()) {
ret = OB_ERR_INVALID_NOT_NULL_CONSTRAINT_ON_IDENTITY_COLUMN;
LOG_WARN("default identity column has null value", KR(ret));
} else if (column_schema->is_default_on_null_identity_column()) {
ObSequenceValue seq_value;
if (OB_FAIL(share::ObSequenceCache::get_instance().nextval(store_ctx->sequence_schema_,
cast_allocator, seq_value))) {
LOG_WARN("fail get nextval for seq", KR(ret));
} else if (datum.is_null()) {
datum.set_number(seq_value.val());
}
}
return ret;
}
int ObTableLoadTransBucketWriter::write_for_non_partitioned(SessionContext &session_ctx,
const ObTableLoadObjRowArray &obj_rows)
{

View File

@ -16,6 +16,7 @@ namespace oceanbase
namespace observer
{
class ObTableLoadParam;
class ObTableLoadStoreCtx;
class ObTableLoadTransCtx;
class ObTableLoadCoordinatorCtx;
@ -27,7 +28,7 @@ public:
int init();
int advance_sequence_no(int32_t session_id, uint64_t sequence_no, ObTableLoadMutexGuard &guard);
// 只在对应工作线程中调用, 串行执行
int write(int32_t session_id, const table::ObTableLoadObjRowArray &obj_rows);
int write(int32_t session_id, table::ObTableLoadObjRowArray &obj_rows);
int flush(int32_t session_id);
public:
void set_is_flush() { is_flush_ = true; }
@ -38,6 +39,16 @@ public:
private:
class SessionContext;
int init_session_ctx_array();
int handle_partition_with_autoinc_identity(ObTableLoadStoreCtx *&store_ctx,
SessionContext &session_ctx,
table::ObTableLoadObjRowArray &obj_rows,
const uint64_t &sql_mode, int32_t session_id);
int handle_autoinc_column(ObTableLoadStoreCtx *&store_ctx, blocksstable::ObStorageDatum &datum,
const ObObjTypeClass &tc, int32_t session_id, const uint64_t &sql_mode);
int handle_identity_column(ObTableLoadStoreCtx *&store_ctx,
const share::schema::ObColumnSchemaV2 *column_schema,
blocksstable::ObStorageDatum &datum,
common::ObArenaAllocator &cast_allocator);
// 非分区表
int write_for_non_partitioned(SessionContext &session_ctx,
const table::ObTableLoadObjRowArray &obj_rows);

View File

@ -373,12 +373,12 @@ int ObTableLoadTransStoreWriter::cast_row(ObArenaAllocator &cast_allocator,
LOG_WARN("fail to cast obj and check", KR(ret), K(i), K(row.cells_[i]));
} else if (OB_FAIL(datum_row.storage_datums_[i].from_obj_enhance(out_obj))) {
LOG_WARN("fail to from obj enhance", KR(ret), K(out_obj));
} else if (column_schema->is_autoincrement() &&
} else if (column_schema->is_autoincrement() && !column_schema->is_tbl_part_key_column() &&
OB_FAIL(handle_autoinc_column(column_schema, datum_row.storage_datums_[i],
column_schema->get_meta_type().get_type_class(),
session_id))) {
LOG_WARN("fail to handle autoinc column", KR(ret), K(i), K(datum_row.storage_datums_[i]));
} else if (column_schema->is_identity_column() &&
} else if (column_schema->is_identity_column() && !column_schema->is_tbl_part_key_column() &&
OB_FAIL(handle_identity_column(column_schema, datum_row.storage_datums_[i], cast_allocator))) {
LOG_WARN("fail to handle identity column", KR(ret), K(i), K(datum_row.storage_datums_[i]));
}