fix bug:direct_load has multithread safety issue with identity column
This commit is contained in:
@ -393,7 +393,7 @@ int ObTableLoadTransStoreWriter::cast_row(ObArenaAllocator &cast_allocator,
|
||||
session_id))) {
|
||||
LOG_WARN("fail to handle autoinc column", KR(ret), K(i), K(datum_row.storage_datums_[i]));
|
||||
} else if (column_schema->is_identity_column() &&
|
||||
OB_FAIL(handle_identity_column(column_schema, datum_row.storage_datums_[i]))) {
|
||||
OB_FAIL(handle_identity_column(column_schema, datum_row.storage_datums_[i], cast_allocator))) {
|
||||
LOG_WARN("fail to handle identity column", KR(ret), K(i), K(datum_row.storage_datums_[i]));
|
||||
}
|
||||
}
|
||||
@ -415,20 +415,17 @@ int ObTableLoadTransStoreWriter::handle_autoinc_column(const ObColumnSchemaV2 *c
|
||||
const ObObjTypeClass &tc, int32_t session_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t save_timeout_ts = THIS_WORKER.get_timeout_ts();
|
||||
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() +
|
||||
max(GCONF.rpc_timeout, RPC_TIMEOUT_US));
|
||||
if (OB_FAIL(ObTableLoadAutoincNextval::eval_nextval(
|
||||
&(store_ctx_->session_ctx_array_[session_id - 1].autoinc_param_), datum, tc,
|
||||
param_.sql_mode_))) {
|
||||
LOG_WARN("fail to get auto increment next value", KR(ret));
|
||||
}
|
||||
THIS_WORKER.set_timeout_ts(save_timeout_ts);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadTransStoreWriter::handle_identity_column(const ObColumnSchemaV2 *column_schema,
|
||||
ObStorageDatum &datum)
|
||||
ObStorageDatum &datum,
|
||||
ObArenaAllocator &cast_allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (column_schema->is_always_identity_column()) {
|
||||
@ -438,11 +435,12 @@ int ObTableLoadTransStoreWriter::handle_identity_column(const ObColumnSchemaV2 *
|
||||
ret = OB_ERR_INVALID_NOT_NULL_CONSTRAINT_ON_IDENTITY_COLUMN;
|
||||
LOG_WARN("default identity column has null value", KR(ret));
|
||||
} else if (column_schema->is_default_on_null_identity_column()) {
|
||||
ObSequenceValue seq_value;
|
||||
if (OB_FAIL(share::ObSequenceCache::get_instance().nextval(
|
||||
trans_ctx_->ctx_->store_ctx_->sequence_schema_, allocator_, seq_value_))) {
|
||||
trans_ctx_->ctx_->store_ctx_->sequence_schema_, cast_allocator, seq_value))) {
|
||||
LOG_WARN("fail get nextval for seq", KR(ret));
|
||||
} else if (datum.is_null()) {
|
||||
datum.set_number(seq_value_.val());
|
||||
datum.set_number(seq_value.val());
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -50,7 +50,6 @@ public:
|
||||
|
||||
class ObTableLoadTransStoreWriter
|
||||
{
|
||||
static const int64_t RPC_TIMEOUT_US = 20LL * 1000 * 1000; // 20s
|
||||
public:
|
||||
ObTableLoadTransStoreWriter(ObTableLoadTransStore *trans_store);
|
||||
~ObTableLoadTransStoreWriter();
|
||||
@ -82,7 +81,8 @@ private:
|
||||
const ObObjTypeClass &tc,
|
||||
int32_t session_id);
|
||||
int handle_identity_column(const share::schema::ObColumnSchemaV2 *column_schema,
|
||||
blocksstable::ObStorageDatum &datum);
|
||||
blocksstable::ObStorageDatum &datum,
|
||||
common::ObArenaAllocator &cast_allocator);
|
||||
int write_row_to_table_store(storage::ObDirectLoadTableStore &table_store,
|
||||
const common::ObTabletID &tablet_id,
|
||||
const blocksstable::ObDatumRow &datum_row);
|
||||
@ -114,7 +114,6 @@ private:
|
||||
SessionContext *session_ctx_array_;
|
||||
int64_t ref_count_ CACHE_ALIGNED;
|
||||
bool is_flush_;
|
||||
share::ObSequenceValue seq_value_;
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
|
Reference in New Issue
Block a user