fix incremental direct load no update storage schema
This commit is contained in:
parent
905a5e35fa
commit
a8be021dda
@ -37,6 +37,7 @@
|
||||
#include "storage/ddl/ob_direct_insert_sstable_ctx_new.h"
|
||||
#include "storage/column_store/ob_column_oriented_sstable.h"
|
||||
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
|
||||
#include "storage/ob_storage_schema_util.h"
|
||||
|
||||
using namespace oceanbase::observer;
|
||||
using namespace oceanbase::share::schema;
|
||||
@ -1392,6 +1393,12 @@ int ObTabletDDLUtil::compact_ddl_kv(
|
||||
}
|
||||
#endif
|
||||
|
||||
if (OB_SUCC(ret) && is_incremental_direct_load(ddl_param.direct_load_type_)) {
|
||||
if (OB_FAIL(update_storage_schema(tablet, ddl_param, arena, storage_schema, frozen_ddl_kvs))) {
|
||||
LOG_WARN("fail to update storage schema", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (ddl_param.table_key_.is_co_sstable()) {
|
||||
if (OB_FAIL(compact_co_ddl_sstable(tablet, ddl_sstable_iter, frozen_ddl_kvs, ddl_param, storage_schema, allocator, compacted_cg_sstable_handles, compacted_sstable_handle))) {
|
||||
@ -1414,6 +1421,96 @@ int ObTabletDDLUtil::compact_ddl_kv(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int get_schema_info_from_ddl_kvs(
|
||||
ObTablet &tablet,
|
||||
const ObIArray<ObDDLKVHandle> &frozen_ddl_kvs,
|
||||
const int64_t column_cnt_in_schema,
|
||||
int64_t &max_column_cnt_in_memtable,
|
||||
int64_t &max_schema_version_in_memtable)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t max_column_cnt_on_recorder = 0;
|
||||
ObDDLKV *ddl_kv = nullptr;
|
||||
for (int i = frozen_ddl_kvs.count() - 1; OB_SUCC(ret) && i >= 0; --i) {
|
||||
const ObDDLKVHandle &ddl_kv_handle = frozen_ddl_kvs.at(i);
|
||||
if (OB_ISNULL(ddl_kv = ddl_kv_handle.get_obj())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected ddl kv is null", KR(ret), K(ddl_kv_handle));
|
||||
} else if (OB_FAIL(ddl_kv->get_schema_info(column_cnt_in_schema,
|
||||
max_schema_version_in_memtable,
|
||||
max_column_cnt_in_memtable))) {
|
||||
LOG_WARN("fail to get schema info from ddl kv", KR(ret), KPC(ddl_kv));
|
||||
}
|
||||
}
|
||||
if (FAILEDx(tablet.get_max_column_cnt_on_schema_recorder(max_column_cnt_on_recorder))) {
|
||||
LOG_WARN("fail to get max column cnt on schema recorder", KR(ret));
|
||||
} else {
|
||||
max_column_cnt_in_memtable = MAX(max_column_cnt_in_memtable, max_column_cnt_on_recorder);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLUtil::update_storage_schema(
|
||||
ObTablet &tablet,
|
||||
const ObTabletDDLParam &ddl_param,
|
||||
ObArenaAllocator &allocator,
|
||||
ObStorageSchema *&storage_schema,
|
||||
const ObIArray<ObDDLKVHandle> &frozen_ddl_kvs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(nullptr == storage_schema || frozen_ddl_kvs.empty())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KP(storage_schema), K(frozen_ddl_kvs));
|
||||
} else if (OB_UNLIKELY(!is_incremental_direct_load(ddl_param.direct_load_type_))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected not incremental direct load", KR(ret), K(ddl_param));
|
||||
} else {
|
||||
ObStorageSchema *schema_on_tablet = storage_schema;
|
||||
int64_t column_cnt_in_schema = 0;
|
||||
int64_t max_column_cnt_in_memtable = 0;
|
||||
int64_t max_schema_version_in_memtable = 0;
|
||||
bool column_info_simplified = false;
|
||||
if (OB_FAIL(schema_on_tablet->get_store_column_count(column_cnt_in_schema, true/*full_col*/))) {
|
||||
LOG_WARN("fail to get storage column count", KR(ret));
|
||||
} else if (OB_FAIL(get_schema_info_from_ddl_kvs(tablet,
|
||||
frozen_ddl_kvs,
|
||||
column_cnt_in_schema,
|
||||
max_column_cnt_in_memtable,
|
||||
max_schema_version_in_memtable))) {
|
||||
LOG_WARN("fail to get schema info from ddl kvs", KR(ret));
|
||||
} else if (FALSE_IT(column_info_simplified =
|
||||
max_column_cnt_in_memtable > column_cnt_in_schema)) {
|
||||
// can't get new added column info from memtable, need simplify column info
|
||||
} else if (column_info_simplified ||
|
||||
max_schema_version_in_memtable > schema_on_tablet->get_schema_version()) {
|
||||
// need alloc new storage schema & set column cnt
|
||||
ObStorageSchema *new_storage_schema = nullptr;
|
||||
if (OB_FAIL(ObStorageSchemaUtil::alloc_storage_schema(allocator, new_storage_schema))) {
|
||||
LOG_WARN("failed to alloc storage schema", K(ret));
|
||||
} else if (OB_FAIL(new_storage_schema->init(allocator, *schema_on_tablet, column_info_simplified))) {
|
||||
LOG_WARN("fail to init storage schema", K(ret), K(schema_on_tablet));
|
||||
ObStorageSchemaUtil::free_storage_schema(allocator, new_storage_schema);
|
||||
new_storage_schema = nullptr;
|
||||
} else {
|
||||
// only update column cnt by memtable, use schema version on tablet_schema
|
||||
new_storage_schema->column_cnt_ = MAX(new_storage_schema->column_cnt_, max_column_cnt_in_memtable);
|
||||
new_storage_schema->store_column_cnt_ = MAX(column_cnt_in_schema, max_column_cnt_in_memtable);
|
||||
new_storage_schema->schema_version_ = MAX(max_schema_version_in_memtable, schema_on_tablet->get_schema_version());
|
||||
storage_schema = new_storage_schema;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
FLOG_INFO("get storage schema to merge", KPC(storage_schema), KPC(schema_on_tablet),
|
||||
K(max_column_cnt_in_memtable), K(max_schema_version_in_memtable));
|
||||
if (schema_on_tablet != storage_schema) {
|
||||
ObStorageSchemaUtil::free_storage_schema(allocator, schema_on_tablet);
|
||||
schema_on_tablet = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int check_ddl_sstable_expired(const SCN &ddl_start_scn, ObTableStoreIterator &ddl_sstable_iter)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -155,6 +155,13 @@ public:
|
||||
common::ObArenaAllocator &allocator,
|
||||
ObTableHandleV2 &compacted_sstable_handle);
|
||||
|
||||
static int update_storage_schema(
|
||||
ObTablet &tablet,
|
||||
const ObTabletDDLParam &ddl_param,
|
||||
common::ObArenaAllocator &allocator,
|
||||
ObStorageSchema *&storage_schema,
|
||||
const ObIArray<ObDDLKVHandle> &frozen_ddl_kvs);
|
||||
|
||||
static int report_ddl_checksum(const share::ObLSID &ls_id,
|
||||
const ObTabletID &tablet_id,
|
||||
const uint64_t table_id,
|
||||
|
Loading…
x
Reference in New Issue
Block a user