优化建表路径中partition相关代码以减小临时内存

This commit is contained in:
obdev 2023-08-04 08:18:10 +00:00 committed by ob-robot
parent 0e7a1768db
commit 600aa34ecc
4 changed files with 62 additions and 32 deletions

View File

@ -182,11 +182,15 @@ int ObBatchCreateTabletHelper::try_add_table_schema(
} else if(OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
index = arg_.table_schemas_.count();
if (OB_FAIL(arg_.table_schemas_.push_back(*table_schema))) {
LOG_WARN("failed to push back table schema", KR(ret), KPC(table_schema));
} else if (FALSE_IT(arg_.table_schemas_.at(index).reset_partition_schema())) {
} else if (OB_FAIL(table_schemas_map_.set_refactored(table_schema->get_table_id(), index))) {
LOG_WARN("failed to set table schema map", KR(ret), K(index), KPC(table_schema));
HEAP_VAR(ObTableSchema, temp_table_schema) {
if (OB_FAIL(temp_table_schema.assign(*table_schema))) {
LOG_WARN("failed to assign temp_table_schema", KR(ret), KPC(table_schema));
} else if (FALSE_IT(temp_table_schema.reset_partition_schema())) {
} else if (OB_FAIL(arg_.table_schemas_.push_back(temp_table_schema))) {
LOG_WARN("failed to push back table schema", KR(ret), K(temp_table_schema));
} else if (OB_FAIL(table_schemas_map_.set_refactored(temp_table_schema.get_table_id(), index))) {
LOG_WARN("failed to set table schema map", KR(ret), K(index), K(temp_table_schema));
}
}
} else {
LOG_WARN("failed to find table schema in map", KR(ret), KP(table_schema));

View File

@ -1453,6 +1453,24 @@ void ObSchema::reset()
arena->reuse();
}
}
template <class T>
int ObSchema::preserve_array(T** &array, int64_t &array_capacity, const int64_t &preserved_capacity) {
int ret = OB_SUCCESS;
if (OB_UNLIKELY(0 >= preserved_capacity)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("preserved capacity should greater than 0", KR(ret), K(preserved_capacity));
} else if (OB_NOT_NULL(array) || OB_UNLIKELY(0 != array_capacity)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support to preserve when array is not null or capacity is not zero", KR(ret), KP(array), K(array_capacity));
} else if (OB_ISNULL(array = static_cast<T**>(alloc(sizeof(T*) * preserved_capacity)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for partition arrary", KR(ret));
} else {
array_capacity = preserved_capacity;
}
return ret;
}
common::ObCollationType ObSchema::get_cs_type_with_cmp_mode(const ObNameCaseMode mode)
{
common::ObCollationType cs_type = common::CS_TYPE_INVALID;
@ -3038,16 +3056,11 @@ int ObPartitionSchema::assign_partition_schema(const ObPartitionSchema &src_sche
if (OB_SUCC(ret)) { \
int64_t partition_num = src_schema.PART_NAME##_num_; \
if (partition_num > 0) { \
PART_NAME##_array_ = static_cast<ObPartition **>( \
alloc(sizeof(ObPartition *) * partition_num)); \
if (OB_ISNULL(PART_NAME##_array_)) { \
ret = OB_ALLOCATE_MEMORY_FAILED; \
LOG_ERROR("Fail to allocate memory for "#PART_NAME"_array_", K(ret)); \
if (OB_FAIL(preserve_array(PART_NAME##_array_, PART_NAME##_array_capacity_, partition_num))) { \
LOG_WARN("Fail to preserve "#PART_NAME" array", KR(ret), KP(PART_NAME##_array_), K(PART_NAME##_array_capacity_), K(partition_num)); \
} else if (OB_ISNULL(src_schema.PART_NAME##_array_)) { \
ret = OB_ERR_UNEXPECTED; \
LOG_WARN("src_schema."#PART_NAME"_array_ is null", K(ret)); \
} else { \
PART_NAME##_array_capacity_ = partition_num; \
} \
} \
ObPartition *partition = NULL; \
@ -3068,16 +3081,11 @@ int ObPartitionSchema::assign_partition_schema(const ObPartitionSchema &src_sche
if (OB_SUCC(ret)) {
int64_t def_subpartition_num = src_schema.def_subpartition_num_;
if (def_subpartition_num > 0) {
def_subpartition_array_ = static_cast<ObSubPartition **>(
alloc(sizeof(ObSubPartition *) * def_subpartition_num));
if (OB_ISNULL(def_subpartition_array_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("Fail to allocate memory for def_subpartition_array_", K(ret), K(def_subpartition_num));
if(OB_FAIL(preserve_array(def_subpartition_array_, def_subpartition_array_capacity_, def_subpartition_num))) {
LOG_WARN("fail to preserve def_subpartition_array", KR(ret), KP(def_subpartition_array_), K(def_subpartition_array_capacity_), K(def_subpartition_num));
} else if (OB_ISNULL(src_schema.def_subpartition_array_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("src_schema.def_subpartition_array_ is null", K(ret));
} else {
def_subpartition_array_capacity_ = def_subpartition_num;
}
}
ObSubPartition *subpartition = NULL;
@ -3223,14 +3231,17 @@ int ObPartitionSchema::try_generate_hash_part()
bool is_oracle_mode = false;
const int64_t BUF_SIZE = OB_MAX_PARTITION_NAME_LENGTH;
char buf[BUF_SIZE];
if (get_first_part_num() <= 0) {
const int64_t &first_part_num = get_first_part_num();
if (OB_UNLIKELY(first_part_num <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("part_option is invalid", KR(ret), KPC(this));
} else if (OB_FAIL(check_if_oracle_compat_mode(is_oracle_mode))) {
LOG_WARN("fail to check if oracle mode", KR(ret), KPC(this));
} else if (OB_FAIL(preserve_array(partition_array_, partition_array_capacity_, first_part_num))) {
LOG_WARN("fail to preserve partition array", KR(ret), KP(partition_array_), K(partition_array_capacity_), K(first_part_num));
} else {
ObPartition part;
for (int64_t i = 0; OB_SUCC(ret) && i < get_first_part_num(); i++) {
for (int64_t i = 0; OB_SUCC(ret) && i < first_part_num; i++) {
ObString part_name;
part.reset();
MEMSET(buf, 0, BUF_SIZE);
@ -3285,6 +3296,9 @@ int ObPartitionSchema::try_generate_hash_subpart(bool &generated)
ObSubPartition subpart;
// 1. try generate def_sub_part_array()
if (OB_ISNULL(get_def_subpart_array())) {
if (OB_FAIL(preserve_array(def_subpartition_array_, def_subpartition_array_capacity_, def_subpart_num))) {
LOG_WARN("fail to preserve def subpartition array", KR(ret), KP(def_subpartition_array_), K(def_subpartition_array_capacity_), K(def_subpart_num));
}
for (int64_t j = 0; j < def_subpart_num && OB_SUCC(ret); j++) {
MEMSET(buf, 0, BUF_SIZE);
ObString sub_part_name;
@ -3348,6 +3362,8 @@ int ObPartitionSchema::try_generate_subpart_by_template(bool &generated)
} else if (part->get_subpartition_num() > 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("subpartition num should be 0", KR(ret), KPC(part));
} else if (OB_FAIL(part->preserve_subpartition(def_subpart_num))) {
LOG_WARN("fail to preserve subpartition", KR(ret), K(def_subpart_num));
} else {
part->set_sub_part_num(def_subpart_num);
for (int64_t j = 0; j < def_subpart_num && OB_SUCC(ret); j++) {
@ -5198,7 +5214,7 @@ ObBasePartition::ObBasePartition(common::ObIAllocator *allocator)
schema_version_(OB_INVALID_VERSION), name_(),
high_bound_val_(),
schema_allocator_(*allocator),
list_row_values_(common::OB_MALLOC_NORMAL_BLOCK_SIZE,
list_row_values_(SCHEMA_MALLOC_BLOCK_SIZE,
common::ModulePageAllocator(schema_allocator_)),
status_(PARTITION_STATUS_ACTIVE),
projector_(NULL),
@ -5767,17 +5783,11 @@ int ObPartition::assign(const ObPartition & src_part)
#define ASSIGN_SUBPARTITION_ARRAY(SUBPART_NAME) \
if (OB_SUCC(ret) && src_part.SUBPART_NAME##_num_ > 0) { \
int64_t subpartition_num = src_part.SUBPART_NAME##_num_; \
SUBPART_NAME##_array_ = static_cast<ObSubPartition **>( \
alloc(sizeof(ObSubPartition *) * subpartition_num)); \
if (OB_ISNULL(SUBPART_NAME##_array_)) { \
ret = OB_ALLOCATE_MEMORY_FAILED; \
LOG_ERROR("Fail to allocate memory for "#SUBPART_NAME"_array_", \
KR(ret), K(subpartition_num)); \
if (OB_FAIL(preserve_array(SUBPART_NAME##_array_, SUBPART_NAME##_array_capacity_, subpartition_num))) { \
LOG_WARN("Fail to preserve "#SUBPART_NAME" array", KR(ret), KP(SUBPART_NAME##_array_), K(SUBPART_NAME##_array_capacity_), K(subpartition_num)); \
} else if (OB_ISNULL(src_part.SUBPART_NAME##_array_)) { \
ret = OB_ERR_UNEXPECTED; \
LOG_WARN(#SUBPART_NAME"_array_ is null", KR(ret)); \
} else { \
SUBPART_NAME##_array_capacity_ = subpartition_num; \
} \
ObSubPartition *subpartition = NULL; \
for (int64_t i = 0; OB_SUCC(ret) && i < subpartition_num; i++) { \
@ -5992,6 +6002,14 @@ int ObPartition::get_max_sub_part_idx(int64_t &sub_part_idx) const
return ret;
}
int ObPartition::preserve_subpartition(const int64_t &capacity) {
int ret = OB_SUCCESS;
if (OB_FAIL(preserve_array(subpartition_array_, subpartition_array_capacity_, capacity))) {
LOG_WARN("fail to preserve subpartition array", KR(ret), KP(subpartition_array_), K(subpartition_array_capacity_), K(capacity));
}
return ret;
}
ObSubPartition::ObSubPartition()
: ObBasePartition()
{

View File

@ -1196,6 +1196,7 @@ public:
inline int get_err_ret() const { return error_ret_; }
protected:
static const int64_t STRING_ARRAY_EXTEND_CNT = 7;
static const int64_t SCHEMA_MALLOC_BLOCK_SIZE = 128;
void *alloc(const int64_t size);
void free(void *ptr);
int deep_copy_str(const char *src, common::ObString &dest);
@ -1214,6 +1215,8 @@ protected:
void reset_string(common::ObString &str);
void reset_string_array(common::ObArrayHelper<common::ObString> &str_array);
const char *extract_str(const common::ObString &str) const;
template <class T>
int preserve_array(T** &array, int64_t &array_capacity, const int64_t &preserved_capacity);
// buffer is the memory used to store schema item, if not same with this pointer,
// it means that this schema item has already been rewrote to buffer when rewriting
// other schema manager, and when we want to rewrite this schema item in current schema
@ -2077,6 +2080,7 @@ public:
int add_partition(const ObSubPartition &subpartition);
ObSubPartition **get_hidden_subpart_array() const { return hidden_subpartition_array_; }
int64_t get_hidden_subpartition_num() const { return hidden_subpartition_num_; }
int preserve_subpartition(const int64_t &capacity);
INHERIT_TO_STRING_KV(
"BasePartition", ObBasePartition,

View File

@ -6392,7 +6392,9 @@ OB_DEF_DESERIALIZE(ObTableSchema)
ObRowkey rowkey;
rowkey.assign(obj_array, ROW_KEY_CNT);
LST_DO_CODE(OB_UNIS_DECODE, rowkey);
if (FAILEDx(rowkey.deserialize(buf, data_len, pos, true))) {
LOG_WARN("fail to deserialize transintion point rowkey", KR(ret));
}
if (OB_FAIL(ret)) {
LOG_WARN("Fail to deserialize data, ", K(ret));
@ -6402,7 +6404,9 @@ OB_DEF_DESERIALIZE(ObTableSchema)
obj_array[0].reset();
rowkey.assign(obj_array, ROW_KEY_CNT);
LST_DO_CODE(OB_UNIS_DECODE, rowkey);
if (FAILEDx(rowkey.deserialize(buf, data_len, pos, true))) {
LOG_WARN("fail to deserialize interval range rowkey", KR(ret));
}
if (OB_FAIL(ret)) {
LOG_WARN("Fail to deserialize data, ", K(ret));
} else if (OB_FAIL(set_interval_range(rowkey))) {