when creating a tablet, replace table schema with storage schema in clog.

This commit is contained in:
obdev
2023-11-16 13:42:24 +00:00
committed by ob-robot
parent 729902b847
commit 129e7dd5ff
19 changed files with 573 additions and 261 deletions

View File

@ -7939,7 +7939,7 @@ bool ObBatchCreateTabletArg::is_inited() const
bool ObBatchCreateTabletArg::is_valid() const
{
bool valid = true;
if (is_inited() && tablets_.count() > 0 && table_schemas_.count() > 0) {
if (is_inited() && tablets_.count() > 0 && (create_tablet_schemas_.count() > 0 || table_schemas_.count() > 0)) {
for (int64_t i = 0; valid && i < tablets_.count(); ++i) {
const ObCreateTabletInfo &info = tablets_[i];
const common::ObSArray<int64_t> &table_schema_index = info.table_schema_index_;
@ -7949,7 +7949,7 @@ bool ObBatchCreateTabletArg::is_valid() const
for (int64_t j = 0; valid && j < table_schema_index.count(); ++j) {
const int64_t index = table_schema_index[j];
if (index < 0 || index >= table_schemas_.count()) {
if (index < 0 || (index >= create_tablet_schemas_.count() && index >= table_schemas_.count())) {
valid = false;
}
}
@ -7968,11 +7968,17 @@ void ObBatchCreateTabletArg::reset()
table_schemas_.reset();
need_check_tablet_cnt_ = false;
is_old_mds_ = false;
for (int64_t i = 0; i < create_tablet_schemas_.count(); ++i) {
create_tablet_schemas_[i]->~ObCreateTabletSchema();
}
create_tablet_schemas_.reset();
allocator_.reset();
}
int ObBatchCreateTabletArg::assign(const ObBatchCreateTabletArg &arg)
{
int ret = OB_SUCCESS;
const common::ObSArray<storage::ObCreateTabletSchema*> &create_tablet_schemas = arg.create_tablet_schemas_;
if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("arg is invalid", KR(ret), K(arg));
@ -7980,6 +7986,32 @@ int ObBatchCreateTabletArg::assign(const ObBatchCreateTabletArg &arg)
LOG_WARN("failed to assign tablets", KR(ret), K(arg));
} else if (OB_FAIL(table_schemas_.assign(arg.table_schemas_))) {
LOG_WARN("failed to assign table schema", KR(ret), K(arg));
} else if (OB_FAIL(create_tablet_schemas_.reserve(create_tablet_schemas.count()))) {
STORAGE_LOG(WARN, "Fail to reserve schema array", K(ret), K(create_tablet_schemas.count()));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < create_tablet_schemas.count(); ++i) {
if (OB_ISNULL(create_tablet_schemas[i])) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(i), KPC(this));
} else {
ObCreateTabletSchema *create_tablet_schema = NULL;
void *create_tablet_schema_ptr = allocator_.alloc(sizeof(ObCreateTabletSchema));
if (OB_ISNULL(create_tablet_schema_ptr)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate storage schema", KR(ret));
} else if (FALSE_IT(create_tablet_schema = new (create_tablet_schema_ptr)ObCreateTabletSchema())) {
} else if (OB_FAIL(create_tablet_schema->init(allocator_, *create_tablet_schemas[i]))) {
create_tablet_schema->~ObCreateTabletSchema();
STORAGE_LOG(WARN,"Fail to init create_tablet_schema", K(ret));
} else if (OB_FAIL(create_tablet_schemas_.push_back(create_tablet_schema))) {
create_tablet_schema->~ObCreateTabletSchema();
STORAGE_LOG(WARN, "Fail to add schema", K(ret));
}
}
}
}
if (OB_FAIL(ret)) {
reset();
} else {
id_ = arg.id_;
major_frozen_scn_ = arg.major_frozen_scn_;
@ -8035,6 +8067,84 @@ int64_t ObBatchCreateTabletArg::get_tablet_count() const
return cnt;
}
int ObBatchCreateTabletArg::serialize_for_create_tablet_schemas(char *buf,
const int64_t data_len,
int64_t &pos) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(serialization::encode_vi64(buf, data_len, pos, create_tablet_schemas_.count()))) {
STORAGE_LOG(WARN, "failed to encode schema count", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < create_tablet_schemas_.count(); ++i) {
if (OB_ISNULL(create_tablet_schemas_.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null tx service ptr", KR(ret), K(i), KPC(this));
} else if (OB_FAIL(create_tablet_schemas_.at(i)->serialize(buf, data_len, pos))) {
STORAGE_LOG(WARN, "failed to serialize schema", K(ret));
}
}
return ret;
}
int64_t ObBatchCreateTabletArg::get_serialize_size_for_create_tablet_schemas() const
{
int ret = OB_SUCCESS;
int64_t len = 0;
len += serialization::encoded_length_vi64(create_tablet_schemas_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < create_tablet_schemas_.count(); ++i) {
if (OB_ISNULL(create_tablet_schemas_.at(i))) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("create_tablet_schema is NULL", KR(ret), K(i), KPC(this));
} else {
len += create_tablet_schemas_.at(i)->get_serialize_size();
}
}
return len;
}
int ObBatchCreateTabletArg::deserialize_create_tablet_schemas(const char *buf,
const int64_t data_len,
int64_t &pos)
{
int ret = OB_SUCCESS;
int64_t count = 0;
if (OB_ISNULL(buf) || OB_UNLIKELY(data_len <= 0) || OB_UNLIKELY(pos > data_len)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(buf), K(data_len), K(pos), K(ret));
} else if (pos == data_len) {
//do nothing
} else if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &count))) {
STORAGE_LOG(WARN, "failed to decode schema count", K(ret));
} else if (count < 0) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "count invalid", KR(ret), K(buf), K(data_len), K(pos), K(count));
} else if (count == 0) {
STORAGE_LOG(INFO, "upgrade, count is 0", KR(ret), K(buf), K(data_len), K(pos), K(count));
} else if (OB_FAIL(create_tablet_schemas_.reserve(count))) {
STORAGE_LOG(WARN, "failed to reserve schema array", K(ret), K(count), K(buf), K(data_len), K(pos));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < count; ++i) {
ObCreateTabletSchema *create_tablet_schema = NULL;
void *create_tablet_schema_ptr = allocator_.alloc(sizeof(ObCreateTabletSchema));
if (OB_ISNULL(create_tablet_schema_ptr)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate storage schema", KR(ret));
} else if (FALSE_IT(create_tablet_schema = new (create_tablet_schema_ptr)ObCreateTabletSchema())) {
} else if (OB_FAIL(create_tablet_schema->deserialize(allocator_, buf, data_len, pos))) {
create_tablet_schema->~ObCreateTabletSchema();
STORAGE_LOG(WARN,"failed to deserialize schema", K(ret), K(buf), K(data_len), K(pos));
} else if (OB_FAIL(create_tablet_schemas_.push_back(create_tablet_schema))) {
create_tablet_schema->~ObCreateTabletSchema();
STORAGE_LOG(WARN, "failed to add schema", K(ret));
}
}
if (OB_FAIL(ret)) {
reset();
}
}
return ret;
}
DEF_TO_STRING(ObBatchCreateTabletArg)
{
int64_t pos = 0;
@ -8046,6 +8156,10 @@ OB_DEF_SERIALIZE(ObBatchCreateTabletArg)
{
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_ENCODE, id_, major_frozen_scn_, tablets_, table_schemas_, need_check_tablet_cnt_, is_old_mds_);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(serialize_for_create_tablet_schemas(buf, buf_len, pos))) {
LOG_WARN("failed to serialize_for_create_tablet_schemas", KR(ret), KPC(this));
}
return ret;
}
@ -8053,6 +8167,7 @@ OB_DEF_SERIALIZE_SIZE(ObBatchCreateTabletArg)
{
int len = 0;
LST_DO_CODE(OB_UNIS_ADD_LEN, id_, major_frozen_scn_, tablets_, table_schemas_, need_check_tablet_cnt_, is_old_mds_);
len += get_serialize_size_for_create_tablet_schemas();
return len;
}
@ -8065,6 +8180,11 @@ OB_DEF_DESERIALIZE(ObBatchCreateTabletArg)
is_old_mds_ = true;
} else {
LST_DO_CODE(OB_UNIS_DECODE, is_old_mds_);
if (OB_FAIL(ret)) {
} else if (pos == data_len) {
} else if (OB_FAIL(deserialize_create_tablet_schemas(buf, data_len, pos))) {
LOG_WARN("failed to deserialize_for_create_tablet_schemas", KR(ret));
}
}
}
return ret;