[CP] Make ObTabletTableIterator and ObTableStoreIterator deep copy safe
This commit is contained in:
@ -208,10 +208,10 @@ int ObComplementDataParam::split_task_ranges(
|
|||||||
total_size,
|
total_size,
|
||||||
expected_task_count))) {
|
expected_task_count))) {
|
||||||
LOG_WARN("compute total task count failed", K(ret));
|
LOG_WARN("compute total task count failed", K(ret));
|
||||||
} else if (OB_FAIL(tablet_service->split_multi_ranges(tablet_id,
|
} else if (OB_FAIL(tablet_service->split_multi_ranges(tablet_id,
|
||||||
ranges,
|
ranges,
|
||||||
min(min(max(expected_task_count, 1), hint_parallelism), ObMacroDataSeq::MAX_PARALLEL_IDX + 1),
|
min(min(max(expected_task_count, 1), hint_parallelism), ObMacroDataSeq::MAX_PARALLEL_IDX + 1),
|
||||||
allocator_,
|
allocator_,
|
||||||
multi_range_split_array))) {
|
multi_range_split_array))) {
|
||||||
LOG_WARN("split multi ranges failed", K(ret));
|
LOG_WARN("split multi ranges failed", K(ret));
|
||||||
if (OB_REPLICA_NOT_READABLE == ret) {
|
if (OB_REPLICA_NOT_READABLE == ret) {
|
||||||
@ -1773,9 +1773,10 @@ int ObLocalScan::construct_multiple_scan_merge(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
void *buf = nullptr;
|
void *buf = nullptr;
|
||||||
get_table_param_.tablet_iter_ = table_iter;
|
|
||||||
LOG_INFO("start to do output_store.scan");
|
LOG_INFO("start to do output_store.scan");
|
||||||
if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObMultipleScanMerge)))) {
|
if (OB_FAIL(get_table_param_.tablet_iter_.assign(table_iter))) {
|
||||||
|
LOG_WARN("fail to assign tablet iterator", K(ret));
|
||||||
|
} else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObMultipleScanMerge)))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("fail to alloc memory for ObMultipleScanMerge", K(ret));
|
LOG_WARN("fail to alloc memory for ObMultipleScanMerge", K(ret));
|
||||||
} else if (FALSE_IT(scan_merge_ = new(buf)ObMultipleScanMerge())) {
|
} else if (FALSE_IT(scan_merge_ = new(buf)ObMultipleScanMerge())) {
|
||||||
|
|||||||
@ -5506,9 +5506,10 @@ void ObLSTabletService::dump_diag_info_for_old_row_loss(
|
|||||||
ObSingleMerge *get_merge = nullptr;
|
ObSingleMerge *get_merge = nullptr;
|
||||||
ObGetTableParam get_table_param;
|
ObGetTableParam get_table_param;
|
||||||
ObDatumRow *row = nullptr;
|
ObDatumRow *row = nullptr;
|
||||||
get_table_param.tablet_iter_ = data_table.tablet_iter_;
|
|
||||||
void *buf = nullptr;
|
void *buf = nullptr;
|
||||||
if (OB_ISNULL(buf = allocator.alloc(sizeof(ObSingleMerge)))) {
|
if (OB_FAIL(get_table_param.tablet_iter_.assign(data_table.tablet_iter_))) {
|
||||||
|
LOG_WARN("Failed to assign tablet iterator", K(ret));
|
||||||
|
} else if (OB_ISNULL(buf = allocator.alloc(sizeof(ObSingleMerge)))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("Failed to alloc memory for single merge", K(ret));
|
LOG_WARN("Failed to alloc memory for single merge", K(ret));
|
||||||
} else if (FALSE_IT(get_merge = new(buf)ObSingleMerge())) {
|
} else if (FALSE_IT(get_merge = new(buf)ObSingleMerge())) {
|
||||||
|
|||||||
@ -195,27 +195,43 @@ int64_t ObTabletHandle::to_string(char *buf, const int64_t buf_len) const
|
|||||||
return pos;
|
return pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObTabletTableIterator::operator=(const ObTabletTableIterator& other)
|
int ObTabletTableIterator::assign(const ObTabletTableIterator& other)
|
||||||
{
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
if (this != &other) {
|
if (this != &other) {
|
||||||
if (other.tablet_handle_.is_valid()) {
|
if (other.tablet_handle_.is_valid()) {
|
||||||
tablet_handle_ = other.tablet_handle_;
|
tablet_handle_ = other.tablet_handle_;
|
||||||
} else if (tablet_handle_.is_valid()) {
|
} else if (tablet_handle_.is_valid()) {
|
||||||
tablet_handle_.reset();
|
tablet_handle_.reset();
|
||||||
}
|
}
|
||||||
if (other.table_store_iter_.is_valid()) {
|
if (OB_FAIL(ret)) {
|
||||||
table_store_iter_ = other.table_store_iter_;
|
} else if (other.table_store_iter_.is_valid()) {
|
||||||
|
if (OB_FAIL(table_store_iter_.assign(other.table_store_iter_))) {
|
||||||
|
LOG_WARN("assign table store iter fail", K(ret));
|
||||||
|
}
|
||||||
} else if (table_store_iter_.is_valid()) {
|
} else if (table_store_iter_.is_valid()) {
|
||||||
table_store_iter_.reset();
|
table_store_iter_.reset();
|
||||||
}
|
}
|
||||||
if (OB_UNLIKELY(nullptr != other.transfer_src_handle_)) {
|
|
||||||
if (nullptr == transfer_src_handle_) {
|
if (OB_FAIL(ret)) {
|
||||||
void *tablet_hdl_buf = ob_malloc(sizeof(ObTabletHandle), ObMemAttr(MTL_ID(), "TransferMetaH"));
|
} else {
|
||||||
transfer_src_handle_ = new (tablet_hdl_buf) ObTabletHandle();
|
if (OB_UNLIKELY(nullptr != other.transfer_src_handle_)) {
|
||||||
|
if (nullptr == transfer_src_handle_) {
|
||||||
|
void *tablet_hdl_buf = ob_malloc(sizeof(ObTabletHandle), ObMemAttr(MTL_ID(), "TransferMetaH"));
|
||||||
|
if (OB_ISNULL(tablet_hdl_buf)) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
LOG_WARN("fail to allocator memory for handle", K(ret));
|
||||||
|
} else {
|
||||||
|
transfer_src_handle_ = new (tablet_hdl_buf) ObTabletHandle();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
*transfer_src_handle_ = *(other.transfer_src_handle_);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
*transfer_src_handle_ = *(other.transfer_src_handle_);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObTableStoreIterator *ObTabletTableIterator::table_iter()
|
ObTableStoreIterator *ObTabletTableIterator::table_iter()
|
||||||
@ -242,7 +258,7 @@ int ObTabletTableIterator::set_transfer_src_tablet_handle(const ObTabletHandle &
|
|||||||
void *tablet_hdl_buf = ob_malloc(sizeof(ObTabletHandle), ObMemAttr(MTL_ID(), "TransferTblH"));
|
void *tablet_hdl_buf = ob_malloc(sizeof(ObTabletHandle), ObMemAttr(MTL_ID(), "TransferTblH"));
|
||||||
if (OB_ISNULL(tablet_hdl_buf)) {
|
if (OB_ISNULL(tablet_hdl_buf)) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("fail to allocator memory for handles");
|
LOG_WARN("fail to allocator memory for handles", K(ret));
|
||||||
} else {
|
} else {
|
||||||
transfer_src_handle_ = new (tablet_hdl_buf) ObTabletHandle();
|
transfer_src_handle_ = new (tablet_hdl_buf) ObTabletHandle();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -75,9 +75,7 @@ class ObTabletTableIterator final
|
|||||||
public:
|
public:
|
||||||
ObTabletTableIterator() : tablet_handle_(), table_store_iter_(), transfer_src_handle_(nullptr) {}
|
ObTabletTableIterator() : tablet_handle_(), table_store_iter_(), transfer_src_handle_(nullptr) {}
|
||||||
explicit ObTabletTableIterator(const bool is_reverse) : tablet_handle_(), table_store_iter_(is_reverse), transfer_src_handle_(nullptr) {}
|
explicit ObTabletTableIterator(const bool is_reverse) : tablet_handle_(), table_store_iter_(is_reverse), transfer_src_handle_(nullptr) {}
|
||||||
|
int assign(const ObTabletTableIterator& other);
|
||||||
ObTabletTableIterator(const ObTabletTableIterator& other) { *this = other; } ;
|
|
||||||
void operator=(const ObTabletTableIterator& other);
|
|
||||||
~ObTabletTableIterator() { reset(); }
|
~ObTabletTableIterator() { reset(); }
|
||||||
void reset()
|
void reset()
|
||||||
{
|
{
|
||||||
@ -106,6 +104,7 @@ private:
|
|||||||
ObTabletHandle tablet_handle_;
|
ObTabletHandle tablet_handle_;
|
||||||
ObTableStoreIterator table_store_iter_;
|
ObTableStoreIterator table_store_iter_;
|
||||||
ObTabletHandle *transfer_src_handle_;
|
ObTabletHandle *transfer_src_handle_;
|
||||||
|
DISALLOW_COPY_AND_ASSIGN(ObTabletTableIterator);
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ObGetTableParam final
|
struct ObGetTableParam final
|
||||||
|
|||||||
@ -1875,7 +1875,9 @@ int ObPartitionIncrementalRangeSpliter::ObIncrementalIterator::prepare_get_table
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
*get_tbl_param_.tablet_iter_.table_iter() = tbls_iter_;
|
if (OB_FAIL(get_tbl_param_.tablet_iter_.table_iter()->assign(tbls_iter_))) {
|
||||||
|
STORAGE_LOG(WARN, "Failed to assign tablet iterator", KR(ret));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -192,9 +192,12 @@ int ObSingleRowGetter::init_dml_access_param(ObRelativeTable &relative_table,
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
relative_table_ = &relative_table;
|
relative_table_ = &relative_table;
|
||||||
get_table_param_.tablet_iter_ = relative_table.tablet_iter_;
|
|
||||||
const share::schema::ObTableSchemaParam *schema_param = relative_table.get_schema_param();
|
const share::schema::ObTableSchemaParam *schema_param = relative_table.get_schema_param();
|
||||||
output_projector_.set_capacity(out_col_ids.count());
|
output_projector_.set_capacity(out_col_ids.count());
|
||||||
|
if (OB_FAIL(get_table_param_.tablet_iter_.assign(relative_table.tablet_iter_))) {
|
||||||
|
LOG_WARN("assign tablet iterator fail", K(ret));
|
||||||
|
}
|
||||||
for (int32_t i = 0; OB_SUCC(ret) && i < out_col_ids.count(); ++i) {
|
for (int32_t i = 0; OB_SUCC(ret) && i < out_col_ids.count(); ++i) {
|
||||||
int idx = OB_INVALID_INDEX;
|
int idx = OB_INVALID_INDEX;
|
||||||
if (OB_FAIL(schema_param->get_col_map().get(out_col_ids.at(i), idx))) {
|
if (OB_FAIL(schema_param->get_col_map().get(out_col_ids.at(i), idx))) {
|
||||||
|
|||||||
@ -38,8 +38,9 @@ ObTableStoreIterator::ObTableStoreIterator(const bool reverse, const bool need_l
|
|||||||
sstable_handle_array_.set_attr(ObMemAttr(MTL_ID(), "TblHdlArray"));
|
sstable_handle_array_.set_attr(ObMemAttr(MTL_ID(), "TblHdlArray"));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObTableStoreIterator::operator=(const ObTableStoreIterator& other)
|
int ObTableStoreIterator::assign(const ObTableStoreIterator& other)
|
||||||
{
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
if (this != &other) {
|
if (this != &other) {
|
||||||
need_load_sstable_ = other.need_load_sstable_;
|
need_load_sstable_ = other.need_load_sstable_;
|
||||||
if (other.table_store_handle_.is_valid()) {
|
if (other.table_store_handle_.is_valid()) {
|
||||||
@ -47,23 +48,42 @@ void ObTableStoreIterator::operator=(const ObTableStoreIterator& other)
|
|||||||
} else if (table_store_handle_.is_valid()) {
|
} else if (table_store_handle_.is_valid()) {
|
||||||
table_store_handle_.reset();
|
table_store_handle_.reset();
|
||||||
}
|
}
|
||||||
if (other.sstable_handle_array_.count() > 0) {
|
|
||||||
sstable_handle_array_ = other.sstable_handle_array_;
|
if (OB_FAIL(ret)) {
|
||||||
|
} else if (other.sstable_handle_array_.count() > 0) {
|
||||||
|
if (OB_FAIL(sstable_handle_array_.assign(other.sstable_handle_array_))) {
|
||||||
|
LOG_WARN("assign sstable handle array fail", K(ret));
|
||||||
|
}
|
||||||
} else if (sstable_handle_array_.count() > 0) {
|
} else if (sstable_handle_array_.count() > 0) {
|
||||||
sstable_handle_array_.reset();
|
sstable_handle_array_.reset();
|
||||||
}
|
}
|
||||||
table_ptr_array_ = other.table_ptr_array_;
|
|
||||||
pos_ = other.pos_;
|
if (OB_FAIL(ret)) {
|
||||||
step_ = other.step_;
|
} else if (OB_FAIL(table_ptr_array_.assign(other.table_ptr_array_))) {
|
||||||
memstore_retired_ = other.memstore_retired_;
|
LOG_WARN("assign table ptr array fail", K(ret));
|
||||||
if (OB_UNLIKELY(nullptr != other.transfer_src_table_store_handle_)) {
|
} else {
|
||||||
|
pos_ = other.pos_;
|
||||||
|
step_ = other.step_;
|
||||||
|
memstore_retired_ = other.memstore_retired_;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
} else if (OB_UNLIKELY(nullptr != other.transfer_src_table_store_handle_)) {
|
||||||
if (nullptr == transfer_src_table_store_handle_) {
|
if (nullptr == transfer_src_table_store_handle_) {
|
||||||
void *meta_hdl_buf = ob_malloc(sizeof(ObStorageMetaHandle), ObMemAttr(MTL_ID(), "TransferMetaH"));
|
void *meta_hdl_buf = ob_malloc(sizeof(ObStorageMetaHandle), ObMemAttr(MTL_ID(), "TransferMetaH"));
|
||||||
transfer_src_table_store_handle_ = new (meta_hdl_buf) ObStorageMetaHandle();
|
if (OB_ISNULL(meta_hdl_buf)) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
LOG_WARN("fail to allocator memory for handle", K(ret));
|
||||||
|
} else {
|
||||||
|
transfer_src_table_store_handle_ = new (meta_hdl_buf) ObStorageMetaHandle();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
*transfer_src_table_store_handle_ = *(other.transfer_src_table_store_handle_);
|
||||||
}
|
}
|
||||||
*transfer_src_table_store_handle_ = *(other.transfer_src_table_store_handle_);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObTableStoreIterator::~ObTableStoreIterator()
|
ObTableStoreIterator::~ObTableStoreIterator()
|
||||||
|
|||||||
@ -26,7 +26,7 @@ class ObTableHandleV2;
|
|||||||
class ObSSTableArray;
|
class ObSSTableArray;
|
||||||
class ObMemtableArray;
|
class ObMemtableArray;
|
||||||
|
|
||||||
class ObTableStoreIterator
|
class ObTableStoreIterator final
|
||||||
{
|
{
|
||||||
// TODO: currently, we will load all related tables into memory on initializetion of iterator,
|
// TODO: currently, we will load all related tables into memory on initializetion of iterator,
|
||||||
// maybe we should init with sstable address and prefetch sstable on iteratring for more smooth memory usage
|
// maybe we should init with sstable address and prefetch sstable on iteratring for more smooth memory usage
|
||||||
@ -48,8 +48,7 @@ public:
|
|||||||
typedef common::ObSEArray<ObStorageMetaHandle, DEFAULT_TABLE_HANDLE_CNT> SSTableHandleArray;
|
typedef common::ObSEArray<ObStorageMetaHandle, DEFAULT_TABLE_HANDLE_CNT> SSTableHandleArray;
|
||||||
typedef common::ObSEArray<TablePtr, DEFAULT_TABLE_CNT> TableArray;
|
typedef common::ObSEArray<TablePtr, DEFAULT_TABLE_CNT> TableArray;
|
||||||
ObTableStoreIterator(const bool is_reverse = false, const bool need_load_sstable = true);
|
ObTableStoreIterator(const bool is_reverse = false, const bool need_load_sstable = true);
|
||||||
ObTableStoreIterator(const ObTableStoreIterator& other) { *this = other; } ;
|
int assign(const ObTableStoreIterator& other);
|
||||||
void operator=(const ObTableStoreIterator& other);
|
|
||||||
virtual ~ObTableStoreIterator();
|
virtual ~ObTableStoreIterator();
|
||||||
|
|
||||||
OB_INLINE bool is_valid() const { return table_ptr_array_.count() > 0; }
|
OB_INLINE bool is_valid() const { return table_ptr_array_.count() > 0; }
|
||||||
@ -96,6 +95,7 @@ private:
|
|||||||
int64_t step_;
|
int64_t step_;
|
||||||
bool * memstore_retired_;
|
bool * memstore_retired_;
|
||||||
ObStorageMetaHandle *transfer_src_table_store_handle_;
|
ObStorageMetaHandle *transfer_src_table_store_handle_;
|
||||||
|
DISALLOW_COPY_AND_ASSIGN(ObTableStoreIterator);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user