swap tablet in compaction and destroy memtabls array & fix wrong allocator in multi_source_data
This commit is contained in:
@ -446,7 +446,8 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableStoreIterator table_iter;
|
||||
medium_info.cluster_id_ = GCONF.cluster_id;
|
||||
medium_info.cluster_id_ = GCONF.cluster_id; // set cluster id
|
||||
|
||||
if (medium_info.is_major_compaction()) {
|
||||
// get table schema
|
||||
if (OB_UNLIKELY(result.schema_version_ <= 0)) {
|
||||
@ -457,14 +458,35 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info(
|
||||
LOG_WARN("failed to get table schema", K(ret), KPC(this), K(medium_info));
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(medium_info.save_storage_schema(allocator_, tablet_.get_storage_schema()))) {
|
||||
LOG_WARN("failed to save storage schema", K(ret), K(tablet_.get_storage_schema()));
|
||||
}
|
||||
} else {
|
||||
ObStorageSchema tmp_storage_schema;
|
||||
bool use_storage_schema_on_tablet = true;
|
||||
if (medium_info.medium_snapshot_ > tablet_.get_snapshot_version()) {
|
||||
ObSEArray<ObITable*, MAX_MEMSTORE_CNT> memtables;
|
||||
if (OB_FAIL(tablet_.get_table_store().get_memtables(memtables, true/*need_active*/))) {
|
||||
LOG_WARN("failed to get memtables", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_latest_storage_schema_from_memtable(
|
||||
allocator_, memtables, tmp_storage_schema))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS; // clear errno
|
||||
} else {
|
||||
LOG_WARN("failed to get storage schema from memtable", K(ret));
|
||||
}
|
||||
} else {
|
||||
use_storage_schema_on_tablet = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (FAILEDx(medium_info.save_storage_schema(
|
||||
allocator_,
|
||||
use_storage_schema_on_tablet ? tablet_.get_storage_schema() : tmp_storage_schema))) {
|
||||
LOG_WARN("failed to save storage schema", K(ret), K(use_storage_schema_on_tablet), K(tmp_storage_schema));
|
||||
}
|
||||
}
|
||||
if (FAILEDx(init_parallel_range(result, medium_info))) {
|
||||
LOG_WARN("failed to init parallel range", K(ret), K(medium_info));
|
||||
} else {
|
||||
LOG_INFO("success to init parallel range", K(ret), K(medium_info));
|
||||
LOG_INFO("success to prepare medium info", K(ret), K(medium_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -855,5 +877,38 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::get_latest_storage_schema_from_memtable(
|
||||
ObIAllocator &allocator,
|
||||
const ObIArray<ObITable *> &memtables,
|
||||
ObStorageSchema &storage_schema)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObITable *table = nullptr;
|
||||
memtable::ObMemtable * memtable = nullptr;
|
||||
bool found = false;
|
||||
for (int64_t i = memtables.count() - 1; OB_SUCC(ret) && i >= 0; --i) {
|
||||
if (OB_ISNULL(table = memtables.at(i))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table in tables_handle is invalid", K(ret), KPC(table));
|
||||
} else if (OB_ISNULL(memtable = dynamic_cast<memtable::ObMemtable *>(table))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table pointer does not point to a ObMemtable object", KPC(table));
|
||||
} else if (OB_FAIL(memtable->get_multi_source_data_unit(&storage_schema, &allocator))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS; // clear OB_ENTRY_NOT_EXIST
|
||||
} else {
|
||||
LOG_WARN("failed to get storage schema from memtable", K(ret), KPC(table));
|
||||
}
|
||||
} else {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
} // end for
|
||||
if (OB_SUCC(ret) && !found) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} //namespace compaction
|
||||
} // namespace oceanbase
|
||||
|
||||
Reference in New Issue
Block a user