reduce EncodePivot memory usage with lots of columns and few rows
This commit is contained in:
parent
cef829bb89
commit
960df4d65c
@ -313,7 +313,7 @@ struct ObColumnCSEncodingCtx
|
||||
int64_t dict_var_data_size_;
|
||||
int64_t fix_data_size_;
|
||||
int64_t max_string_size_;
|
||||
const ObPodFix2dArray<ObDatum, 1 << 20, common::OB_MALLOC_MIDDLE_BLOCK_SIZE> *col_datums_;
|
||||
const ObPodFix2dArray<ObDatum, 1 << 20, common::OB_MALLOC_NORMAL_BLOCK_SIZE> *col_datums_;
|
||||
ObEncodingHashTable *ht_;
|
||||
const ObMicroBlockEncodingCtx *encoding_ctx_;
|
||||
ObMicroBufferWriter *all_string_buf_writer_;
|
||||
|
@ -22,7 +22,7 @@ namespace blocksstable
|
||||
using namespace common;
|
||||
|
||||
const int64_t ObCSEncodingUtil::ENCODING_ROW_COUNT_THRESHOLD = 4;
|
||||
// limit by typedef ObPodFix2dArray<ObDatum, 1 << 20, common::OB_MALLOC_MIDDLE_BLOCK_SIZE> ObColDatums;
|
||||
// limit by typedef ObPodFix2dArray<ObDatum, 1 << 20, common::OB_MALLOC_NORMAL_BLOCK_SIZE> ObColDatums;
|
||||
const int64_t ObCSEncodingUtil::MAX_MICRO_BLOCK_ROW_CNT = 1L << 20; // 1M
|
||||
const int64_t ObCSEncodingUtil::DEFAULT_DATA_BUFFER_SIZE = common::OB_DEFAULT_MACRO_BLOCK_SIZE;
|
||||
const int64_t ObCSEncodingUtil::MAX_BLOCK_ENCODING_STORE_SIZE = 2 * DEFAULT_DATA_BUFFER_SIZE;
|
||||
|
@ -85,6 +85,7 @@ ObMicroBlockCSEncoder::ObMicroBlockCSEncoder()
|
||||
: allocator_("CSEncAlloc", OB_MALLOC_MIDDLE_BLOCK_SIZE),
|
||||
ctx_(), row_buf_holder_(), data_buffer_(), all_string_data_buffer_(),
|
||||
all_col_datums_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator("CSBlkEnc", MTL_ID())),
|
||||
pivot_allocator_(lib::ObMemAttr(MTL_ID(), blocksstable::OB_ENCODING_LABEL_PIVOT), OB_MALLOC_MIDDLE_BLOCK_SIZE),
|
||||
datum_row_offset_arr_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator("CSBlkEnc", MTL_ID())),
|
||||
estimate_size_(0), estimate_size_limit_(0), all_headers_size_(0),
|
||||
expand_pct_(DEFAULT_ESTIMATE_REAL_SIZE_PCT),
|
||||
@ -186,9 +187,13 @@ void ObMicroBlockCSEncoder::reset()
|
||||
FOREACH(cv, all_col_datums_)
|
||||
{
|
||||
ObColDatums *p = *cv;
|
||||
OB_DELETE(ObColDatums, blocksstable::OB_ENCODING_LABEL_PIVOT, p);
|
||||
if (nullptr != p) {
|
||||
p->~ObColDatums();
|
||||
pivot_allocator_.free(p);
|
||||
}
|
||||
}
|
||||
all_col_datums_.reset();
|
||||
pivot_allocator_.reset();
|
||||
datum_row_offset_arr_.reset();
|
||||
estimate_size_ = 0;
|
||||
estimate_size_limit_ = 0;
|
||||
@ -217,6 +222,7 @@ void ObMicroBlockCSEncoder::reuse()
|
||||
{
|
||||
(*c)->reuse();
|
||||
}
|
||||
// pivot_allocator_ pivot array memory is cached until encoder reset()
|
||||
row_buf_holder_.reuse();
|
||||
all_string_data_buffer_.reuse();
|
||||
datum_row_offset_arr_.reuse();
|
||||
@ -289,13 +295,16 @@ int ObMicroBlockCSEncoder::init_all_col_values_(const ObMicroBlockEncodingCtx &c
|
||||
LOG_WARN("reserve array failed", K(ret), "size", ctx.column_cnt_);
|
||||
}
|
||||
for (int64_t i = all_col_datums_.count(); i < ctx.column_cnt_ && OB_SUCC(ret); ++i) {
|
||||
ObColDatums *c = OB_NEW(ObColDatums, blocksstable::OB_ENCODING_LABEL_PIVOT);
|
||||
ObColDatums *c = OB_NEWx(ObColDatums, &pivot_allocator_, pivot_allocator_);
|
||||
if (OB_ISNULL(c)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("alloc memory failed", K(ret), K(ctx));
|
||||
} else if (OB_FAIL(all_col_datums_.push_back(c))) {
|
||||
LOG_WARN("push back column values failed", K(ret));
|
||||
OB_DELETE(ObColDatums, blocksstable::OB_ENCODING_LABEL_PIVOT, c);
|
||||
if (nullptr != c) {
|
||||
c->~ObColDatums();
|
||||
pivot_allocator_.free(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -125,6 +125,7 @@ private:
|
||||
ObMicroBufferWriter all_string_data_buffer_;
|
||||
|
||||
common::ObArray<ObColDatums *> all_col_datums_;
|
||||
ObArenaAllocator pivot_allocator_;
|
||||
common::ObSEArray<uint32_t, 512> datum_row_offset_arr_;
|
||||
int64_t estimate_size_;
|
||||
int64_t estimate_size_limit_;
|
||||
|
@ -660,7 +660,11 @@ private:
|
||||
const static int64_t BLOCK_ITEM_CNT = BLOCK_SIZE / sizeof(T);
|
||||
const static int64_t MAX_BLOCK_CNT = (MAX_COUNT + BLOCK_ITEM_CNT - 1) / BLOCK_ITEM_CNT;
|
||||
public:
|
||||
ObPodFix2dArray() : size_(0) { MEMSET(block_list_, 0, sizeof(T *) * MAX_BLOCK_CNT); }
|
||||
explicit ObPodFix2dArray(ObIAllocator &allocator)
|
||||
: allocator_(&allocator), size_(0)
|
||||
{
|
||||
MEMSET(block_list_, 0, sizeof(T *) * MAX_BLOCK_CNT);
|
||||
}
|
||||
~ObPodFix2dArray() { destroy(); }
|
||||
|
||||
OB_INLINE int64_t count() const { return size_; }
|
||||
@ -702,7 +706,7 @@ public:
|
||||
if (NULL == block_list_[i]) {
|
||||
break;
|
||||
} else {
|
||||
common::ob_free(block_list_[i]);
|
||||
allocator_->free(block_list_[i]);
|
||||
block_list_[i] = NULL;
|
||||
}
|
||||
}
|
||||
@ -739,11 +743,10 @@ private:
|
||||
ret = common::OB_SIZE_OVERFLOW;
|
||||
STORAGE_LOG(WARN, "size will overflow", K(ret), K_(size), K(block_cnt), K(cur_cnt));
|
||||
} else {
|
||||
common::ObMemAttr ma(MTL_ID(), blocksstable::OB_ENCODING_LABEL_PIVOT);
|
||||
for (int64_t i = 0; i < block_cnt && OB_SUCC(ret); ++i) {
|
||||
if (NULL == block_list_[cur_cnt + i]) {
|
||||
T *block = static_cast<T *>(common::ob_malloc(BLOCK_SIZE, ma));
|
||||
if (NULL == block) {
|
||||
T *block = static_cast<T *>(allocator_->alloc(BLOCK_SIZE));
|
||||
if (OB_ISNULL(block)) {
|
||||
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
||||
STORAGE_LOG(WARN, "alloc block failed",
|
||||
K(ret), "block_size", static_cast<int64_t>(BLOCK_SIZE));
|
||||
@ -756,12 +759,12 @@ private:
|
||||
return ret;
|
||||
}
|
||||
private:
|
||||
ObIAllocator *allocator_;
|
||||
T *block_list_[MAX_BLOCK_CNT];
|
||||
int64_t size_;
|
||||
};
|
||||
|
||||
typedef ObPodFix2dArray<common::ObObj, 64 << 10, common::OB_MALLOC_MIDDLE_BLOCK_SIZE> ObColValues;
|
||||
typedef ObPodFix2dArray<ObDatum, 1 << 20, common::OB_MALLOC_MIDDLE_BLOCK_SIZE> ObColDatums;
|
||||
typedef ObPodFix2dArray<ObDatum, 1 << 20, common::OB_MALLOC_NORMAL_BLOCK_SIZE> ObColDatums;
|
||||
|
||||
class ObMapAttrOperator
|
||||
{
|
||||
|
@ -96,6 +96,7 @@ ObMicroBlockEncoder::ObMicroBlockEncoder() : ctx_(), header_(NULL),
|
||||
data_buffer_(),
|
||||
datum_rows_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
|
||||
all_col_datums_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
|
||||
pivot_allocator_(lib::ObMemAttr(MTL_ID(), blocksstable::OB_ENCODING_LABEL_PIVOT), OB_MALLOC_MIDDLE_BLOCK_SIZE),
|
||||
estimate_size_(0), estimate_size_limit_(0),
|
||||
header_size_(0), expand_pct_(DEFAULT_ESTIMATE_REAL_SIZE_PCT),
|
||||
row_buf_holder_(),
|
||||
@ -210,9 +211,13 @@ void ObMicroBlockEncoder::reset()
|
||||
datum_rows_.reset();
|
||||
FOREACH(cv, all_col_datums_) {
|
||||
ObColDatums *p = *cv;
|
||||
OB_DELETE(ObColDatums, blocksstable::OB_ENCODING_LABEL_PIVOT, p);
|
||||
if (nullptr != p) {
|
||||
p->~ObColDatums();
|
||||
pivot_allocator_.free(p);
|
||||
}
|
||||
}
|
||||
all_col_datums_.reset();
|
||||
pivot_allocator_.reset();
|
||||
estimate_size_ = 0;
|
||||
estimate_size_limit_ = 0;
|
||||
header_size_ = 0;
|
||||
@ -242,6 +247,7 @@ void ObMicroBlockEncoder::reuse()
|
||||
FOREACH(c, all_col_datums_) {
|
||||
(*c)->reuse();
|
||||
}
|
||||
// pivot_allocator_ pivot array memory is cached until encoder reset()
|
||||
row_buf_holder_.reuse();
|
||||
estimate_size_ = 0;
|
||||
// estimate_size_limit_
|
||||
@ -312,15 +318,17 @@ int ObMicroBlockEncoder::init_all_col_values(const ObMicroBlockEncodingCtx &ctx)
|
||||
if (OB_FAIL(all_col_datums_.reserve(ctx.column_cnt_))) {
|
||||
LOG_WARN("reserve array failed", K(ret), "size", ctx.column_cnt_);
|
||||
}
|
||||
lib::ObMemAttr attr(MTL_ID(), blocksstable::OB_ENCODING_LABEL_PIVOT);
|
||||
for (int64_t i = all_col_datums_.count(); i < ctx.column_cnt_ && OB_SUCC(ret); ++i) {
|
||||
ObColDatums *c = OB_NEW(ObColDatums, attr);
|
||||
ObColDatums *c = OB_NEWx(ObColDatums, &pivot_allocator_, pivot_allocator_);
|
||||
if (OB_ISNULL(c)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("alloc memory failed", K(ret), K(ctx));
|
||||
} else if (OB_FAIL(all_col_datums_.push_back(c))) {
|
||||
LOG_WARN("push back column values failed", K(ret));
|
||||
OB_DELETE(ObColDatums, attr, c);
|
||||
if (nullptr != c) {
|
||||
c->~ObColDatums();
|
||||
pivot_allocator_.free(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -154,6 +154,7 @@ private:
|
||||
ObMicroBufferWriter data_buffer_;
|
||||
ObConstDatumRowArray datum_rows_;
|
||||
common::ObArray<ObColDatums *> all_col_datums_;
|
||||
ObArenaAllocator pivot_allocator_;
|
||||
int64_t estimate_size_;
|
||||
int64_t estimate_size_limit_;
|
||||
int64_t header_size_;
|
||||
|
@ -495,7 +495,7 @@ struct ObColumnEncodingCtx
|
||||
int64_t fix_data_size_;
|
||||
int64_t max_string_size_;
|
||||
int64_t extend_value_bit_;
|
||||
const ObPodFix2dArray<ObDatum, 1 << 20, common::OB_MALLOC_MIDDLE_BLOCK_SIZE> *col_datums_;
|
||||
const ObPodFix2dArray<ObDatum, 1 << 20, common::OB_MALLOC_NORMAL_BLOCK_SIZE> *col_datums_;
|
||||
ObEncodingHashTable *ht_;
|
||||
ObMultiPrefixTree *prefix_tree_;
|
||||
const ObMicroBlockEncodingCtx *encoding_ctx_;
|
||||
|
@ -283,7 +283,7 @@ public:
|
||||
K(loop), K(sizeof(T)), K(min), K(max), K(mon), K(use_null_replace_ref), K(actual_uint_width));
|
||||
ObIntegerStreamEncoderCtx ctx;
|
||||
ObCSEncodingOpt encoding_opt;
|
||||
ObArenaAllocator alloctor;
|
||||
ObArenaAllocator allocator;
|
||||
const ObCompressorType compress_type = ObCompressorType::ZSTD_1_3_8_COMPRESSOR;
|
||||
ctx.meta_.width_ = actual_uint_width;
|
||||
ctx.meta_.type_ = type;
|
||||
@ -302,10 +302,10 @@ public:
|
||||
if (ctx.meta_.is_use_null_replace_value()) {
|
||||
ctx.meta_.set_null_replaced_value(null_replace_value);
|
||||
}
|
||||
ctx.build_stream_encoder_info(has_null, false, &encoding_opt, nullptr, -1, compress_type, &alloctor);
|
||||
ctx.build_stream_encoder_info(has_null, false, &encoding_opt, nullptr, -1, compress_type, &allocator);
|
||||
|
||||
ObIntegerStreamEncoder encoder;
|
||||
ObColDatums *datums = new ObColDatums();
|
||||
ObColDatums *datums = new ObColDatums(allocator);
|
||||
datums->reserve(1 << 20);
|
||||
generate_datums<T>(datums, size, has_null, min, max, mon);
|
||||
int64_t bitmap_size = pad8(size);
|
||||
|
@ -122,6 +122,7 @@ public:
|
||||
{
|
||||
LOG_INFO("test_and_check_string_encoding", K(size), K(type), K(use_zero_len_as_null), K(has_null), K(is_fix_len),
|
||||
K(use_nullbitmap), K(all_null), K(half_null_half_empty), K(use_null_replaced_ref));
|
||||
ObArenaAllocator local_arena;
|
||||
ObStringStreamEncoderCtx ctx;
|
||||
ObCSEncodingOpt encoding_opt;
|
||||
bool is_use_zero_len_as_null = use_zero_len_as_null;
|
||||
@ -140,7 +141,7 @@ public:
|
||||
|
||||
ObStringStreamEncoder encoder;
|
||||
uint32_t *data = nullptr;
|
||||
ObColDatums *datums = new ObColDatums();
|
||||
ObColDatums *datums = new ObColDatums(local_arena);
|
||||
ASSERT_EQ(OB_SUCCESS, datums->resize(max_count));
|
||||
datums->reuse();
|
||||
if (half_null_half_empty) {
|
||||
@ -203,7 +204,7 @@ public:
|
||||
str_data.set(all_string_writer.data(), all_string_writer.length());
|
||||
|
||||
// 3. decode str
|
||||
ObColDatums *datums2 = new ObColDatums();
|
||||
ObColDatums *datums2 = new ObColDatums(local_arena);
|
||||
ASSERT_EQ(OB_SUCCESS, datums2->resize(max_count));
|
||||
datums2->reuse();
|
||||
for (int64_t i = 0; i < size; i++) {
|
||||
|
@ -60,8 +60,9 @@ TEST(ObMultiDimArray_T, timestamp_with_time_zone)
|
||||
{
|
||||
// for timestamp with time zone, we need use binary_equal to build encoding hash table.
|
||||
ObEncodingHashTableBuilder hash_builder;
|
||||
ObArenaAllocator local_arena;
|
||||
ASSERT_EQ(OB_SUCCESS, hash_builder.create(8, 8));
|
||||
ObColDatums time_arr;
|
||||
ObColDatums time_arr(local_arena);
|
||||
ObStorageDatum t1, t2;
|
||||
|
||||
ObTimeZoneInfo time_zone_info1, time_zone_info2;
|
||||
|
Loading…
x
Reference in New Issue
Block a user