[CP] fix alloc write row concurrently core dump for drop column.
This commit is contained in:
@ -814,7 +814,8 @@ int ObComplementPrepareTask::process()
|
|||||||
}
|
}
|
||||||
|
|
||||||
ObComplementWriteTask::ObComplementWriteTask()
|
ObComplementWriteTask::ObComplementWriteTask()
|
||||||
: ObITask(TASK_TYPE_COMPLEMENT_WRITE), is_inited_(false), task_id_(0), param_(nullptr),
|
: ObITask(TASK_TYPE_COMPLEMENT_WRITE), allocator_("WriteTaskAlloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||||
|
is_inited_(false), task_id_(0), param_(nullptr),
|
||||||
context_(nullptr), write_row_(),
|
context_(nullptr), write_row_(),
|
||||||
col_ids_(), org_col_ids_(), output_projector_()
|
col_ids_(), org_col_ids_(), output_projector_()
|
||||||
{
|
{
|
||||||
@ -822,6 +823,11 @@ ObComplementWriteTask::ObComplementWriteTask()
|
|||||||
|
|
||||||
ObComplementWriteTask::~ObComplementWriteTask()
|
ObComplementWriteTask::~ObComplementWriteTask()
|
||||||
{
|
{
|
||||||
|
col_ids_.reset();
|
||||||
|
org_col_ids_.reset();
|
||||||
|
output_projector_.reset();
|
||||||
|
write_row_.reset();
|
||||||
|
allocator_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObComplementWriteTask::init(const int64_t task_id, ObComplementDataParam ¶m,
|
int ObComplementWriteTask::init(const int64_t task_id, ObComplementDataParam ¶m,
|
||||||
@ -849,7 +855,7 @@ int ObComplementWriteTask::init(const int64_t task_id, ObComplementDataParam &pa
|
|||||||
} else if (OB_FAIL(hidden_table_schema->get_store_column_count(schema_stored_column_cnt))) {
|
} else if (OB_FAIL(hidden_table_schema->get_store_column_count(schema_stored_column_cnt))) {
|
||||||
LOG_WARN("get stored column cnt failed", K(ret));
|
LOG_WARN("get stored column cnt failed", K(ret));
|
||||||
} else if (OB_FAIL(write_row_.init(
|
} else if (OB_FAIL(write_row_.init(
|
||||||
param.allocator_, schema_stored_column_cnt + storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
|
allocator_, schema_stored_column_cnt + storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
|
||||||
LOG_WARN("Fail to init write row", K(ret));
|
LOG_WARN("Fail to init write row", K(ret));
|
||||||
} else {
|
} else {
|
||||||
write_row_.row_flag_.set_flag(ObDmlFlag::DF_INSERT);
|
write_row_.row_flag_.set_flag(ObDmlFlag::DF_INSERT);
|
||||||
|
|||||||
@ -43,8 +43,9 @@ public:
|
|||||||
is_inited_(false), orig_tenant_id_(common::OB_INVALID_TENANT_ID), dest_tenant_id_(common::OB_INVALID_TENANT_ID),
|
is_inited_(false), orig_tenant_id_(common::OB_INVALID_TENANT_ID), dest_tenant_id_(common::OB_INVALID_TENANT_ID),
|
||||||
orig_ls_id_(share::ObLSID::INVALID_LS_ID), dest_ls_id_(share::ObLSID::INVALID_LS_ID), orig_table_id_(common::OB_INVALID_ID),
|
orig_ls_id_(share::ObLSID::INVALID_LS_ID), dest_ls_id_(share::ObLSID::INVALID_LS_ID), orig_table_id_(common::OB_INVALID_ID),
|
||||||
dest_table_id_(common::OB_INVALID_ID), orig_tablet_id_(ObTabletID::INVALID_TABLET_ID), dest_tablet_id_(ObTabletID::INVALID_TABLET_ID),
|
dest_table_id_(common::OB_INVALID_ID), orig_tablet_id_(ObTabletID::INVALID_TABLET_ID), dest_tablet_id_(ObTabletID::INVALID_TABLET_ID),
|
||||||
allocator_("CompleteDataPar", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), row_store_type_(common::ENCODING_ROW_STORE), orig_schema_version_(0), dest_schema_version_(0),
|
row_store_type_(common::ENCODING_ROW_STORE), orig_schema_version_(0), dest_schema_version_(0),
|
||||||
snapshot_version_(0), concurrent_cnt_(0), task_id_(0), execution_id_(-1), tablet_task_id_(0), compat_mode_(lib::Worker::CompatMode::INVALID), data_format_version_(0)
|
snapshot_version_(0), concurrent_cnt_(0), task_id_(0), execution_id_(-1), tablet_task_id_(0), compat_mode_(lib::Worker::CompatMode::INVALID), data_format_version_(0),
|
||||||
|
allocator_("CompleteDataPar", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
|
||||||
{}
|
{}
|
||||||
~ObComplementDataParam() { destroy(); }
|
~ObComplementDataParam() { destroy(); }
|
||||||
int init(const obrpc::ObDDLBuildSingleReplicaRequestArg &arg);
|
int init(const obrpc::ObDDLBuildSingleReplicaRequestArg &arg);
|
||||||
@ -99,7 +100,6 @@ public:
|
|||||||
uint64_t dest_table_id_;
|
uint64_t dest_table_id_;
|
||||||
ObTabletID orig_tablet_id_;
|
ObTabletID orig_tablet_id_;
|
||||||
ObTabletID dest_tablet_id_;
|
ObTabletID dest_tablet_id_;
|
||||||
common::ObArenaAllocator allocator_;
|
|
||||||
common::ObRowStoreType row_store_type_;
|
common::ObRowStoreType row_store_type_;
|
||||||
int64_t orig_schema_version_;
|
int64_t orig_schema_version_;
|
||||||
int64_t dest_schema_version_;
|
int64_t dest_schema_version_;
|
||||||
@ -111,6 +111,8 @@ public:
|
|||||||
lib::Worker::CompatMode compat_mode_;
|
lib::Worker::CompatMode compat_mode_;
|
||||||
uint64_t data_format_version_;
|
uint64_t data_format_version_;
|
||||||
ObSEArray<common::ObStoreRange, 32> ranges_;
|
ObSEArray<common::ObStoreRange, 32> ranges_;
|
||||||
|
private:
|
||||||
|
common::ObArenaAllocator allocator_;
|
||||||
};
|
};
|
||||||
|
|
||||||
void add_ddl_event(const ObComplementDataParam *param, const ObString &stmt);
|
void add_ddl_event(const ObComplementDataParam *param, const ObString &stmt);
|
||||||
@ -120,8 +122,9 @@ struct ObComplementDataContext final
|
|||||||
public:
|
public:
|
||||||
ObComplementDataContext():
|
ObComplementDataContext():
|
||||||
is_inited_(false), is_major_sstable_exist_(false), complement_data_ret_(common::OB_SUCCESS),
|
is_inited_(false), is_major_sstable_exist_(false), complement_data_ret_(common::OB_SUCCESS),
|
||||||
allocator_("CompleteDataCtx", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), lock_(ObLatchIds::COMPLEMENT_DATA_CONTEXT_LOCK), concurrent_cnt_(0),
|
lock_(ObLatchIds::COMPLEMENT_DATA_CONTEXT_LOCK), concurrent_cnt_(0),
|
||||||
data_sstable_redo_writer_(), index_builder_(nullptr), start_scn_(share::SCN::min_scn()), tablet_direct_load_mgr_handle_(), row_scanned_(0), row_inserted_(0), context_id_(0)
|
data_sstable_redo_writer_(), index_builder_(nullptr), start_scn_(share::SCN::min_scn()), tablet_direct_load_mgr_handle_(), row_scanned_(0), row_inserted_(0), context_id_(0),
|
||||||
|
allocator_("CompleteDataCtx", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
|
||||||
{}
|
{}
|
||||||
~ObComplementDataContext() { destroy(); }
|
~ObComplementDataContext() { destroy(); }
|
||||||
int init(const ObComplementDataParam ¶m, const blocksstable::ObDataStoreDesc &desc);
|
int init(const ObComplementDataParam ¶m, const blocksstable::ObDataStoreDesc &desc);
|
||||||
@ -139,7 +142,6 @@ public:
|
|||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
bool is_major_sstable_exist_;
|
bool is_major_sstable_exist_;
|
||||||
int complement_data_ret_;
|
int complement_data_ret_;
|
||||||
common::ObArenaAllocator allocator_;
|
|
||||||
ObSpinLock lock_;
|
ObSpinLock lock_;
|
||||||
int64_t concurrent_cnt_;
|
int64_t concurrent_cnt_;
|
||||||
ObDDLRedoLogWriter data_sstable_redo_writer_;
|
ObDDLRedoLogWriter data_sstable_redo_writer_;
|
||||||
@ -151,6 +153,8 @@ public:
|
|||||||
int64_t context_id_;
|
int64_t context_id_;
|
||||||
ObArray<int64_t> report_col_checksums_;
|
ObArray<int64_t> report_col_checksums_;
|
||||||
ObArray<int64_t> report_col_ids_;
|
ObArray<int64_t> report_col_ids_;
|
||||||
|
private:
|
||||||
|
common::ObArenaAllocator allocator_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObComplementPrepareTask;
|
class ObComplementPrepareTask;
|
||||||
@ -229,6 +233,7 @@ private:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
static const int64_t RETRY_INTERVAL = 100 * 1000; // 100ms
|
static const int64_t RETRY_INTERVAL = 100 * 1000; // 100ms
|
||||||
|
common::ObArenaAllocator allocator_;
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
int64_t task_id_;
|
int64_t task_id_;
|
||||||
ObComplementDataParam *param_;
|
ObComplementDataParam *param_;
|
||||||
|
|||||||
Reference in New Issue
Block a user