fix bug:core-dump occurs in partition column with auto-increment when multi-node deployment
This commit is contained in:
parent
f8e03ff190
commit
a6e7d65447
@ -8,6 +8,8 @@
|
||||
#include "observer/table_load/ob_table_load_coordinator_trans.h"
|
||||
#include "observer/table_load/ob_table_load_table_ctx.h"
|
||||
#include "observer/table_load/ob_table_load_task_scheduler.h"
|
||||
#include "share/ob_autoincrement_service.h"
|
||||
#include "share/sequence/ob_sequence_cache.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -19,6 +21,7 @@ using namespace lib;
|
||||
using namespace table;
|
||||
using namespace sql;
|
||||
using namespace obrpc;
|
||||
using namespace share;
|
||||
|
||||
ObTableLoadCoordinatorCtx::ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx)
|
||||
: ctx_(ctx),
|
||||
@ -99,6 +102,14 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray<int64_t> &idx_array, uint64_t
|
||||
ctx_->param_.session_count_, allocator_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
||||
}
|
||||
// init session_ctx_array_
|
||||
else if (OB_FAIL(init_session_ctx_array())) {
|
||||
LOG_WARN("fail to init session ctx array", KR(ret));
|
||||
}
|
||||
// init sequence_cache_ and sequence_schema_
|
||||
else if (ctx_->schema_.has_identity_column_ && OB_FAIL(init_sequence())) {
|
||||
LOG_WARN("fail to init sequence", KR(ret));
|
||||
} else if (OB_FAIL(task_scheduler_->init())) {
|
||||
LOG_WARN("fail to init task scheduler", KR(ret));
|
||||
} else if (OB_FAIL(task_scheduler_->start())) {
|
||||
@ -142,6 +153,14 @@ void ObTableLoadCoordinatorCtx::destroy()
|
||||
ObTableLoadTransCtx *trans_ctx = iter->second;
|
||||
ctx_->free_trans_ctx(trans_ctx);
|
||||
}
|
||||
if (nullptr != session_ctx_array_) {
|
||||
for (int64_t i = 0; i < ctx_->param_.session_count_; ++i) {
|
||||
SessionContext *session_ctx = session_ctx_array_ + i;
|
||||
session_ctx->~SessionContext();
|
||||
}
|
||||
allocator_.free(session_ctx_array_);
|
||||
session_ctx_array_ = nullptr;
|
||||
}
|
||||
trans_ctx_map_.reuse();
|
||||
segment_ctx_map_.reset();
|
||||
commited_trans_ctx_array_.reset();
|
||||
@ -300,6 +319,138 @@ int ObTableLoadCoordinatorCtx::alloc_trans(const ObTableLoadSegmentID &segment_i
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadCoordinatorCtx::generate_autoinc_params(AutoincParam &autoinc_param)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
if (OB_FAIL(ObTableLoadSchema::get_table_schema(ctx_->param_.tenant_id_,
|
||||
ctx_->param_.table_id_,
|
||||
schema_guard, table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K(ctx_->param_.tenant_id_),
|
||||
K(ctx_->param_.table_id_));
|
||||
} else if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
LOG_WARN("table not exist", KR(ret), K(ctx_->param_.tenant_id_), K(ctx_->param_.table_id_));
|
||||
} else {
|
||||
//ddl对于auto increment是最后进行自增值同步,对于autoinc_param参数初始化得使用原表table id的table schema
|
||||
ObColumnSchemaV2 *autoinc_column_schema = nullptr;
|
||||
uint64_t column_id = 0;
|
||||
for (ObTableSchema::const_column_iterator iter = table_schema->column_begin();
|
||||
OB_SUCC(ret) && iter != table_schema->column_end(); ++iter) {
|
||||
ObColumnSchemaV2 *column_schema = *iter;
|
||||
if (OB_ISNULL(column_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("invalid column schema", KR(ret), KP(column_schema));
|
||||
} else {
|
||||
column_id = column_schema->get_column_id();
|
||||
if (column_schema->is_autoincrement() && column_id != OB_HIDDEN_PK_INCREMENT_COLUMN_ID) {
|
||||
autoinc_column_schema = column_schema;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}//end for
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_ISNULL(autoinc_column_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null autoinc column schema", KR(ret), KP(autoinc_column_schema));
|
||||
} else {
|
||||
autoinc_param.tenant_id_ = ctx_->param_.tenant_id_;
|
||||
autoinc_param.autoinc_table_id_ = ctx_->param_.table_id_;
|
||||
autoinc_param.autoinc_first_part_num_ = table_schema->get_first_part_num();
|
||||
autoinc_param.autoinc_table_part_num_ = table_schema->get_all_part_num();
|
||||
autoinc_param.autoinc_col_id_ = column_id;
|
||||
autoinc_param.auto_increment_cache_size_ = MAX_INCREMENT_CACHE_SIZE;
|
||||
autoinc_param.part_level_ = table_schema->get_part_level();
|
||||
autoinc_param.autoinc_col_type_ = autoinc_column_schema->get_data_type();
|
||||
autoinc_param.total_value_count_ = 1;
|
||||
autoinc_param.autoinc_desired_count_ = 0;
|
||||
autoinc_param.autoinc_mode_is_order_ = table_schema->is_order_auto_increment_mode();
|
||||
autoinc_param.autoinc_increment_ = 1;
|
||||
autoinc_param.autoinc_offset_ = 1;
|
||||
autoinc_param.part_value_no_order_ = true;
|
||||
if (autoinc_column_schema->is_tbl_part_key_column()) {
|
||||
// don't keep intra-partition value asc order when partkey column is auto inc
|
||||
autoinc_param.part_value_no_order_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadCoordinatorCtx::init_sequence()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = ctx_->param_.tenant_id_;
|
||||
const uint64_t table_id = ctx_->ddl_param_.dest_table_id_;
|
||||
share::schema::ObSchemaGetterGuard table_schema_guard;
|
||||
share::schema::ObSchemaGetterGuard sequence_schema_guard;
|
||||
const ObSequenceSchema *sequence_schema = nullptr;
|
||||
const ObTableSchema *target_table_schema = nullptr;
|
||||
uint64_t sequence_id = OB_INVALID_ID;
|
||||
if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, table_schema_guard,
|
||||
target_table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
|
||||
} else {
|
||||
//ddl对于identity是建表的时候进行自增值同步,对于sequence参数初始化得用隐藏表table id的table schema
|
||||
for (ObTableSchema::const_column_iterator iter = target_table_schema->column_begin();
|
||||
OB_SUCC(ret) && iter != target_table_schema->column_end(); ++iter) {
|
||||
ObColumnSchemaV2 *column_schema = *iter;
|
||||
if (OB_ISNULL(column_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("invalid column schema", K(column_schema));
|
||||
} else {
|
||||
uint64_t column_id = column_schema->get_column_id();
|
||||
if (column_schema->is_identity_column() && column_id != OB_HIDDEN_PK_INCREMENT_COLUMN_ID) {
|
||||
sequence_id = column_schema->get_sequence_id();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}//end for
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(GCTX.schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema service is null", KR(ret));
|
||||
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
|
||||
tenant_id,
|
||||
sequence_schema_guard))) {
|
||||
LOG_WARN("get schema guard failed", KR(ret));
|
||||
} else if (OB_FAIL(sequence_schema_guard.get_sequence_schema(
|
||||
tenant_id,
|
||||
sequence_id,
|
||||
sequence_schema))) {
|
||||
LOG_WARN("fail get sequence schema", K(sequence_id), KR(ret));
|
||||
} else if (OB_ISNULL(sequence_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("null unexpected", KR(ret));
|
||||
} else if (OB_FAIL(sequence_schema_.assign(*sequence_schema))) {
|
||||
LOG_WARN("cache sequence_schema fail", K(tenant_id), K(sequence_id), KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadCoordinatorCtx::init_session_ctx_array()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
void *buf = nullptr;
|
||||
AutoincParam autoinc_param;
|
||||
if (OB_ISNULL(buf = allocator_.alloc(sizeof(SessionContext) * ctx_->param_.session_count_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to allocate memory", KR(ret));
|
||||
} else if (ctx_->schema_.has_autoinc_column_ && OB_FAIL(generate_autoinc_params(autoinc_param))) {
|
||||
LOG_WARN("fail to init auto increment param", KR(ret));
|
||||
} else {
|
||||
session_ctx_array_ = new (buf) SessionContext[ctx_->param_.session_count_];
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < ctx_->param_.session_count_; ++i) {
|
||||
SessionContext *session_ctx = session_ctx_array_ + i;
|
||||
session_ctx->autoinc_param_ = autoinc_param;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadCoordinatorCtx::start_trans(const ObTableLoadSegmentID &segment_id,
|
||||
ObTableLoadCoordinatorTrans *&trans)
|
||||
{
|
||||
|
@ -11,11 +11,16 @@
|
||||
#include "observer/table_load/ob_table_load_partition_calc.h"
|
||||
#include "observer/table_load/ob_table_load_partition_location.h"
|
||||
#include "observer/table_load/ob_table_load_schema.h"
|
||||
#include "share/ob_autoincrement_param.h"
|
||||
#include "share/table/ob_table_load_array.h"
|
||||
#include "share/table/ob_table_load_define.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace share
|
||||
{
|
||||
class ObSequenceCache;
|
||||
} // namespace share
|
||||
namespace observer
|
||||
{
|
||||
class ObTableLoadTableCtx;
|
||||
@ -105,6 +110,9 @@ private:
|
||||
int alloc_trans_ctx(const table::ObTableLoadTransId &trans_id, ObTableLoadTransCtx *&trans_ctx);
|
||||
int alloc_trans(const table::ObTableLoadSegmentID &segment_id,
|
||||
ObTableLoadCoordinatorTrans *&trans);
|
||||
int init_session_ctx_array();
|
||||
int generate_autoinc_params(share::AutoincParam &autoinc_param);
|
||||
int init_sequence();
|
||||
public:
|
||||
ObTableLoadTableCtx * const ctx_;
|
||||
common::ObArenaAllocator allocator_;
|
||||
@ -116,6 +124,14 @@ public:
|
||||
common::ObArray<int64_t> idx_array_;
|
||||
table::ObTableLoadResultInfo result_info_;
|
||||
common::ObString credential_;
|
||||
share::schema::ObSequenceSchema sequence_schema_;
|
||||
struct SessionContext
|
||||
{
|
||||
SessionContext() {}
|
||||
~SessionContext() {}
|
||||
share::AutoincParam autoinc_param_;
|
||||
};
|
||||
SessionContext *session_ctx_array_;
|
||||
private:
|
||||
struct SegmentCtx : public common::LinkHashValue<table::ObTableLoadSegmentID>
|
||||
{
|
||||
|
@ -61,7 +61,9 @@ int ObTableLoadPartitionCalc::init_session()
|
||||
}
|
||||
|
||||
ObTableLoadPartitionCalc::ObTableLoadPartitionCalc()
|
||||
: tenant_id_(OB_INVALID_ID),
|
||||
: is_partition_with_autoinc_(false),
|
||||
partition_with_autoinc_idx_(OB_INVALID_INDEX),
|
||||
tenant_id_(OB_INVALID_ID),
|
||||
table_id_(OB_INVALID_ID),
|
||||
is_partitioned_(false),
|
||||
allocator_("TLD_PartCalc"),
|
||||
@ -166,6 +168,11 @@ int ObTableLoadPartitionCalc::init_rowkey_index(const ObTableSchema *table_schem
|
||||
rowkey_obj_index_[pos - 1].index_ = i - 1;
|
||||
} else {
|
||||
rowkey_obj_index_[pos - 1].index_ = i;
|
||||
if (column_schema->is_tbl_part_key_column() &&
|
||||
(column_schema->is_identity_column() || column_schema->is_autoincrement())) {
|
||||
is_partition_with_autoinc_ = true;
|
||||
partition_with_autoinc_idx_ = pos - 1;
|
||||
}
|
||||
}
|
||||
rowkey_obj_index_[pos - 1].column_schema_ = column_schema;
|
||||
}
|
||||
|
@ -61,6 +61,8 @@ public:
|
||||
table::ObTableLoadArray<IndexAndType> rowkey_obj_index_;
|
||||
common::ObTimeZoneInfo tz_info_;
|
||||
ObTableLoadTimeConverter time_cvrt_;
|
||||
bool is_partition_with_autoinc_;
|
||||
int64_t partition_with_autoinc_idx_;
|
||||
private:
|
||||
// data members
|
||||
uint64_t tenant_id_;
|
||||
|
@ -179,8 +179,9 @@ int ObTableLoadTransBucketWriter::write(int32_t session_id, ObTableLoadObjRowArr
|
||||
LOG_WARN("fail to write for non partitioned", KR(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(handle_partition_with_autoinc_identity(trans_ctx_->ctx_->store_ctx_, session_ctx,
|
||||
obj_rows, param_.sql_mode_, session_id))) {
|
||||
if (coordinator_ctx_->partition_calc_.is_partition_with_autoinc_ &&
|
||||
OB_FAIL(handle_partition_with_autoinc_identity(session_ctx, obj_rows, param_.sql_mode_,
|
||||
session_id))) {
|
||||
LOG_WARN("fail to handle partition column with autoincrement or identity", KR(ret));
|
||||
} else if (OB_FAIL(write_for_partitioned(session_ctx, obj_rows))) {
|
||||
LOG_WARN("fail to write for partitioned", KR(ret));
|
||||
@ -197,76 +198,71 @@ int ObTableLoadTransBucketWriter::write(int32_t session_id, ObTableLoadObjRowArr
|
||||
}
|
||||
|
||||
int ObTableLoadTransBucketWriter::handle_partition_with_autoinc_identity(
|
||||
ObTableLoadStoreCtx *&store_ctx, SessionContext &session_ctx,
|
||||
table::ObTableLoadObjRowArray &obj_rows, const uint64_t &sql_mode, int32_t session_id)
|
||||
SessionContext &session_ctx, table::ObTableLoadObjRowArray &obj_rows, const uint64_t &sql_mode,
|
||||
int32_t session_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t row_count = obj_rows.count();
|
||||
ObArenaAllocator autoinc_allocator("TLD_Autoinc", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_);
|
||||
ObDataTypeCastParams cast_params(&(coordinator_ctx_->partition_calc_.tz_info_));
|
||||
ObCastCtx cast_ctx(&autoinc_allocator, &cast_params, CM_NONE,
|
||||
ObCharset::get_system_collation());
|
||||
ObTableLoadCastObjCtx cast_obj_ctx(&(coordinator_ctx_->partition_calc_.time_cvrt_), &cast_ctx,
|
||||
false);
|
||||
ObObj out_obj;
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < row_count; ++j) {
|
||||
ObObj out_obj;
|
||||
ObStorageDatum storage_datum;
|
||||
ObTableLoadObjRow &obj_row = obj_rows.at(j);
|
||||
const int64_t rowkey_obj_count = coordinator_ctx_->partition_calc_.rowkey_obj_index_.count();
|
||||
ObDataTypeCastParams cast_params(&(coordinator_ctx_->partition_calc_.tz_info_));
|
||||
ObCastCtx cast_ctx(&(session_ctx.allocator_), &cast_params, CM_NONE,
|
||||
ObCharset::get_system_collation());
|
||||
ObTableLoadCastObjCtx cast_obj_ctx(&(coordinator_ctx_->partition_calc_.time_cvrt_), &cast_ctx,
|
||||
false);
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_obj_count; ++i) {
|
||||
out_obj.set_null();
|
||||
const ObTableLoadPartitionCalc::IndexAndType &index_and_type =
|
||||
coordinator_ctx_->partition_calc_.rowkey_obj_index_.at(i);
|
||||
const ObColumnSchemaV2 *column_schema = index_and_type.column_schema_;
|
||||
const int64_t obj_index = index_and_type.index_;
|
||||
if (OB_UNLIKELY(obj_index >= param_.column_count_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid length", KR(ret), K(obj_index), K(param_.column_count_));
|
||||
} else if ((column_schema->is_identity_column() || column_schema->is_autoincrement()) &&
|
||||
column_schema->is_tbl_part_key_column()) {
|
||||
if (!obj_row.cells_[obj_index].is_null() &&
|
||||
OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_,
|
||||
obj_row.cells_[obj_index], out_obj))) {
|
||||
LOG_WARN("fail to cast obj", KR(ret));
|
||||
} else if (OB_FAIL(storage_datum.from_obj_enhance(out_obj))) {
|
||||
LOG_WARN("fail to from obj enhance", KR(ret), K(out_obj));
|
||||
} else if (column_schema->is_autoincrement() &&
|
||||
OB_FAIL(handle_autoinc_column(store_ctx, storage_datum,
|
||||
column_schema->get_meta_type().get_type_class(),
|
||||
session_id, sql_mode))) {
|
||||
LOG_WARN("fail to handle autoinc column", KR(ret), K(i), K(storage_datum));
|
||||
} else if (column_schema->is_identity_column() &&
|
||||
OB_FAIL(handle_identity_column(store_ctx, column_schema, storage_datum,
|
||||
session_ctx.allocator_))) {
|
||||
LOG_WARN("fail to handle identity column", KR(ret), K(i), K(storage_datum));
|
||||
} else if (OB_FAIL(storage_datum.to_obj_enhance(obj_row.cells_[obj_index],
|
||||
column_schema->get_meta_type()))) {
|
||||
LOG_WARN("fail to obj enhance", KR(ret), K(obj_row.cells_[obj_index]));
|
||||
} else if (OB_FAIL(ob_write_obj(session_ctx.allocator_, obj_row.cells_[obj_index],
|
||||
obj_row.cells_[obj_index]))) {
|
||||
LOG_WARN("fail to deep copy obj", KR(ret), K(obj_row.cells_[obj_index]));
|
||||
}
|
||||
}
|
||||
out_obj.set_null();
|
||||
const ObTableLoadPartitionCalc::IndexAndType &index_and_type =
|
||||
coordinator_ctx_->partition_calc_.rowkey_obj_index_.at(
|
||||
coordinator_ctx_->partition_calc_.partition_with_autoinc_idx_);
|
||||
const ObColumnSchemaV2 *column_schema = index_and_type.column_schema_;
|
||||
const int64_t obj_index = index_and_type.index_;
|
||||
if (OB_UNLIKELY(obj_index >= param_.column_count_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid length", KR(ret), K(obj_index), K(param_.column_count_));
|
||||
} else if (!obj_row.cells_[obj_index].is_null() &&
|
||||
OB_FAIL(ObTableLoadObjCaster::cast_obj(cast_obj_ctx, index_and_type.column_schema_,
|
||||
obj_row.cells_[obj_index], out_obj))) {
|
||||
LOG_WARN("fail to cast obj", KR(ret));
|
||||
} else if (OB_FAIL(storage_datum.from_obj_enhance(out_obj))) {
|
||||
LOG_WARN("fail to from obj enhance", KR(ret), K(out_obj));
|
||||
} else if (column_schema->is_autoincrement() &&
|
||||
OB_FAIL(handle_autoinc_column(storage_datum,
|
||||
column_schema->get_meta_type().get_type_class(),
|
||||
session_id, sql_mode))) {
|
||||
LOG_WARN("fail to handle autoinc column", KR(ret), K(storage_datum));
|
||||
} else if (column_schema->is_identity_column() &&
|
||||
OB_FAIL(handle_identity_column(column_schema, storage_datum,
|
||||
autoinc_allocator))) {
|
||||
LOG_WARN("fail to handle identity column", KR(ret), K(storage_datum));
|
||||
} else if (OB_FAIL(storage_datum.to_obj_enhance(obj_row.cells_[obj_index],
|
||||
column_schema->get_meta_type()))) {
|
||||
LOG_WARN("fail to obj enhance", KR(ret), K(obj_row.cells_[obj_index]));
|
||||
} else if (OB_FAIL(ob_write_obj(obj_row.get_allocator_handler()->get_allocator(), obj_row.cells_[obj_index],
|
||||
obj_row.cells_[obj_index]))) {
|
||||
LOG_WARN("fail to deep copy obj", KR(ret), K(obj_row.cells_[obj_index]));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadTransBucketWriter::handle_autoinc_column(ObTableLoadStoreCtx *&store_ctx,
|
||||
ObStorageDatum &datum,
|
||||
int ObTableLoadTransBucketWriter::handle_autoinc_column(ObStorageDatum &datum,
|
||||
const ObObjTypeClass &tc,
|
||||
int32_t session_id,
|
||||
const uint64_t &sql_mode)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObTableLoadAutoincNextval::eval_nextval(
|
||||
&(store_ctx->session_ctx_array_[session_id - 1].autoinc_param_), datum, tc, sql_mode))) {
|
||||
&(coordinator_ctx_->session_ctx_array_[session_id - 1].autoinc_param_), datum, tc,
|
||||
sql_mode))) {
|
||||
LOG_WARN("fail to get auto increment next value", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadTransBucketWriter::handle_identity_column(ObTableLoadStoreCtx *&store_ctx,
|
||||
const ObColumnSchemaV2 *column_schema,
|
||||
int ObTableLoadTransBucketWriter::handle_identity_column(const ObColumnSchemaV2 *column_schema,
|
||||
ObStorageDatum &datum,
|
||||
ObArenaAllocator &cast_allocator)
|
||||
{
|
||||
@ -279,7 +275,7 @@ int ObTableLoadTransBucketWriter::handle_identity_column(ObTableLoadStoreCtx *&s
|
||||
LOG_WARN("default identity column has null value", KR(ret));
|
||||
} else if (column_schema->is_default_on_null_identity_column()) {
|
||||
ObSequenceValue seq_value;
|
||||
if (OB_FAIL(share::ObSequenceCache::get_instance().nextval(store_ctx->sequence_schema_,
|
||||
if (OB_FAIL(share::ObSequenceCache::get_instance().nextval(coordinator_ctx_->sequence_schema_,
|
||||
cast_allocator, seq_value))) {
|
||||
LOG_WARN("fail get nextval for seq", KR(ret));
|
||||
} else if (datum.is_null()) {
|
||||
|
@ -39,14 +39,12 @@ public:
|
||||
private:
|
||||
class SessionContext;
|
||||
int init_session_ctx_array();
|
||||
int handle_partition_with_autoinc_identity(ObTableLoadStoreCtx *&store_ctx,
|
||||
SessionContext &session_ctx,
|
||||
int handle_partition_with_autoinc_identity(SessionContext &session_ctx,
|
||||
table::ObTableLoadObjRowArray &obj_rows,
|
||||
const uint64_t &sql_mode, int32_t session_id);
|
||||
int handle_autoinc_column(ObTableLoadStoreCtx *&store_ctx, blocksstable::ObStorageDatum &datum,
|
||||
const ObObjTypeClass &tc, int32_t session_id, const uint64_t &sql_mode);
|
||||
int handle_identity_column(ObTableLoadStoreCtx *&store_ctx,
|
||||
const share::schema::ObColumnSchemaV2 *column_schema,
|
||||
int handle_autoinc_column(blocksstable::ObStorageDatum &datum, const ObObjTypeClass &tc,
|
||||
int32_t session_id, const uint64_t &sql_mode);
|
||||
int handle_identity_column(const share::schema::ObColumnSchemaV2 *column_schema,
|
||||
blocksstable::ObStorageDatum &datum,
|
||||
common::ObArenaAllocator &cast_allocator);
|
||||
// 非分区表
|
||||
|
@ -373,7 +373,7 @@ int ObTableLoadTransStoreWriter::cast_row(ObArenaAllocator &cast_allocator,
|
||||
LOG_WARN("fail to cast obj and check", KR(ret), K(i), K(row.cells_[i]));
|
||||
} else if (OB_FAIL(datum_row.storage_datums_[i].from_obj_enhance(out_obj))) {
|
||||
LOG_WARN("fail to from obj enhance", KR(ret), K(out_obj));
|
||||
} else if (column_schema->is_autoincrement() && !column_schema->is_tbl_part_key_column() &&
|
||||
} else if (column_schema->is_autoincrement() &&
|
||||
OB_FAIL(handle_autoinc_column(column_schema, datum_row.storage_datums_[i],
|
||||
column_schema->get_meta_type().get_type_class(),
|
||||
session_id))) {
|
||||
|
@ -32,6 +32,10 @@ public:
|
||||
{
|
||||
allocator_handle_ = allocator_handle;
|
||||
}
|
||||
ObTableLoadSharedAllocatorHandle& get_allocator_handler()
|
||||
{
|
||||
return allocator_handle_;
|
||||
}
|
||||
TO_STRING_KV(K_(count));
|
||||
|
||||
private:
|
||||
|
Loading…
x
Reference in New Issue
Block a user