Separate thread local decoder vars from sys to own tenant.
This commit is contained in:
@ -47,7 +47,7 @@ template <typename EncodingItem>
|
|||||||
struct ObEncodingPool
|
struct ObEncodingPool
|
||||||
{
|
{
|
||||||
static const int64_t MAX_FREE_ITEM_CNT = 64;
|
static const int64_t MAX_FREE_ITEM_CNT = 64;
|
||||||
ObEncodingPool(const int64_t item_size, const char *label);
|
ObEncodingPool(const int64_t item_size, const ObMemAttr &attr);
|
||||||
~ObEncodingPool();
|
~ObEncodingPool();
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
@ -67,7 +67,7 @@ class ObEncodingAllocator
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
typedef ObEncodingPool<EncodingItem> Pool;
|
typedef ObEncodingPool<EncodingItem> Pool;
|
||||||
ObEncodingAllocator(const int64_t *size_array, const char *label);
|
ObEncodingAllocator(const int64_t *size_array, const ObMemAttr &attr);
|
||||||
virtual ~ObEncodingAllocator() {}
|
virtual ~ObEncodingAllocator() {}
|
||||||
int init();
|
int init();
|
||||||
bool is_inited() const { return inited_; }
|
bool is_inited() const { return inited_; }
|
||||||
@ -104,11 +104,9 @@ public:
|
|||||||
allocator_(nullptr)
|
allocator_(nullptr)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
lib::ObMemAttr attr;
|
lib::ObMemAttr attr(ob_thread_tenant_id(), "TLDecoderAlloc");
|
||||||
attr.label_ = "encoding_alloc";
|
|
||||||
SET_USE_500(attr);
|
|
||||||
if (nullptr == (allocator_ = OB_NEW(ObDecoderAllocator, attr,
|
if (nullptr == (allocator_ = OB_NEW(ObDecoderAllocator, attr,
|
||||||
decoder_sizes, "encoding_alloc"))) {
|
decoder_sizes, attr))) {
|
||||||
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
||||||
STORAGE_LOG(WARN, "allocate ObDecoderAllocator failed", K(ret));
|
STORAGE_LOG(WARN, "allocate ObDecoderAllocator failed", K(ret));
|
||||||
} else if (OB_FAIL(allocator_->init())) {
|
} else if (OB_FAIL(allocator_->init())) {
|
||||||
|
|||||||
@ -11,10 +11,10 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
template <typename EncodingItem>
|
template <typename EncodingItem>
|
||||||
ObEncodingPool<EncodingItem>::ObEncodingPool(const int64_t item_size, const char *label)
|
ObEncodingPool<EncodingItem>::ObEncodingPool(const int64_t item_size, const ObMemAttr &attr)
|
||||||
: free_items_(), free_cnt_(0),
|
: free_items_(), free_cnt_(0),
|
||||||
pool_(item_size, common::OB_MALLOC_NORMAL_BLOCK_SIZE,
|
pool_(item_size, common::OB_MALLOC_NORMAL_BLOCK_SIZE,
|
||||||
common::ObMalloc(SET_USE_500(label)))
|
common::ObMalloc(attr))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,19 +66,19 @@ inline void ObEncodingPool<EncodingItem>::free(EncodingItem *item)
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <typename EncodingItem>
|
template <typename EncodingItem>
|
||||||
ObEncodingAllocator<EncodingItem>::ObEncodingAllocator(const int64_t *size_array, const char *label)
|
ObEncodingAllocator<EncodingItem>::ObEncodingAllocator(const int64_t *size_array, const ObMemAttr &attr)
|
||||||
: inited_(false),
|
: inited_(false),
|
||||||
size_index_(0),
|
size_index_(0),
|
||||||
raw_pool_(size_array[size_index_++], label),
|
raw_pool_(size_array[size_index_++], attr),
|
||||||
dict_pool_(size_array[size_index_++], label),
|
dict_pool_(size_array[size_index_++], attr),
|
||||||
rle_pool_(size_array[size_index_++], label),
|
rle_pool_(size_array[size_index_++], attr),
|
||||||
const_pool_(size_array[size_index_++], label),
|
const_pool_(size_array[size_index_++], attr),
|
||||||
int_diff_pool_(size_array[size_index_++], label),
|
int_diff_pool_(size_array[size_index_++], attr),
|
||||||
str_diff_pool_(size_array[size_index_++], label),
|
str_diff_pool_(size_array[size_index_++], attr),
|
||||||
hex_str_pool_(size_array[size_index_++], label),
|
hex_str_pool_(size_array[size_index_++], attr),
|
||||||
str_prefix_pool_(size_array[size_index_++], label),
|
str_prefix_pool_(size_array[size_index_++], attr),
|
||||||
column_equal_pool_(size_array[size_index_++], label),
|
column_equal_pool_(size_array[size_index_++], attr),
|
||||||
column_substr_pool_(size_array[size_index_++], label),
|
column_substr_pool_(size_array[size_index_++], attr),
|
||||||
pool_cnt_(0)
|
pool_cnt_(0)
|
||||||
{
|
{
|
||||||
for (int64_t i = 0; i < ObColumnHeader::MAX_TYPE; i++) {
|
for (int64_t i = 0; i < ObColumnHeader::MAX_TYPE; i++) {
|
||||||
|
|||||||
@ -72,13 +72,15 @@ public:
|
|||||||
typedef ObColumnDecoderCtx ObDecoderCtx;
|
typedef ObColumnDecoderCtx ObDecoderCtx;
|
||||||
ObDecoderCtxArray() : ctxs_()
|
ObDecoderCtxArray() : ctxs_()
|
||||||
{
|
{
|
||||||
ctxs_.set_attr(SET_USE_500("DecoderCtxArray"));
|
ObMemAttr attr(ob_thread_tenant_id(), "TLDecoderCtxArr");
|
||||||
|
ctxs_.set_attr(attr);
|
||||||
};
|
};
|
||||||
virtual ~ObDecoderCtxArray()
|
virtual ~ObDecoderCtxArray()
|
||||||
{
|
{
|
||||||
|
ObMemAttr attr(ob_thread_tenant_id(), "TLDecoderCtx");
|
||||||
FOREACH(it, ctxs_) {
|
FOREACH(it, ctxs_) {
|
||||||
ObDecoderCtx *c = *it;
|
ObDecoderCtx *c = *it;
|
||||||
OB_DELETE(ObDecoderCtx, "TLDecoderCtx", c);
|
OB_DELETE(ObDecoderCtx, attr, c);
|
||||||
}
|
}
|
||||||
ctxs_.reset();
|
ctxs_.reset();
|
||||||
}
|
}
|
||||||
@ -93,15 +95,16 @@ public:
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ret));
|
LOG_WARN("invalid argument", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
ObMemAttr attr(ob_thread_tenant_id(), "TLDecoderCtx");
|
||||||
if (ctxs_.count() < size) {
|
if (ctxs_.count() < size) {
|
||||||
for (int64_t i = ctxs_.count(); OB_SUCC(ret) && i < size; ++i) {
|
for (int64_t i = ctxs_.count(); OB_SUCC(ret) && i < size; ++i) {
|
||||||
ObDecoderCtx *ctx = OB_NEW(ObDecoderCtx, "TLDecoderCtx");
|
ObDecoderCtx *ctx = OB_NEW(ObDecoderCtx, attr);
|
||||||
if (NULL == ctx) {
|
if (NULL == ctx) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("alloc memory failed", K(ret));
|
LOG_WARN("alloc memory failed", K(ret));
|
||||||
} else if (OB_FAIL(ctxs_.push_back(ctx))) {
|
} else if (OB_FAIL(ctxs_.push_back(ctx))) {
|
||||||
LOG_WARN("array push back failed", K(ret));
|
LOG_WARN("array push back failed", K(ret));
|
||||||
OB_DELETE(ObDecoderCtx, "TLDecoderCtx", ctx);
|
OB_DELETE(ObDecoderCtx, attr, ctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
@ -129,14 +132,16 @@ class ObTLDecoderCtxArray
|
|||||||
public:
|
public:
|
||||||
ObTLDecoderCtxArray() : ctxs_array_()
|
ObTLDecoderCtxArray() : ctxs_array_()
|
||||||
{
|
{
|
||||||
ctxs_array_.set_attr(SET_USE_500("TLDecoderCtxArr"));
|
ObMemAttr attr(ob_thread_tenant_id(), "TLDecoderCtxArr");
|
||||||
|
ctxs_array_.set_attr(attr);
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual ~ObTLDecoderCtxArray()
|
virtual ~ObTLDecoderCtxArray()
|
||||||
{
|
{
|
||||||
|
ObMemAttr attr(ob_thread_tenant_id(), "TLDecoderCtx");
|
||||||
FOREACH(it, ctxs_array_) {
|
FOREACH(it, ctxs_array_) {
|
||||||
ObDecoderCtxArray *ctxs = *it;
|
ObDecoderCtxArray *ctxs = *it;
|
||||||
OB_DELETE(ObDecoderCtxArray, "TLDecoderCtx", ctxs);
|
OB_DELETE(ObDecoderCtxArray, attr, ctxs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -149,7 +154,8 @@ public:
|
|||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("NULL instance", K(ret));
|
LOG_WARN("NULL instance", K(ret));
|
||||||
} else if (tl_array->ctxs_array_.empty()) {
|
} else if (tl_array->ctxs_array_.empty()) {
|
||||||
ctxs = OB_NEW(ObDecoderCtxArray, "TLDecoderCtx");
|
ObMemAttr attr(ob_thread_tenant_id(), "TLDecoderCtx");
|
||||||
|
ctxs = OB_NEW(ObDecoderCtxArray, attr);
|
||||||
if (NULL == ctxs) {
|
if (NULL == ctxs) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("alloc memory failed", K(ret));
|
LOG_WARN("alloc memory failed", K(ret));
|
||||||
@ -164,6 +170,7 @@ public:
|
|||||||
static void free(ObDecoderCtxArray *ctxs)
|
static void free(ObDecoderCtxArray *ctxs)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
ObMemAttr attr(ob_thread_tenant_id(), "TLDecoderCtx");
|
||||||
ObTLDecoderCtxArray *tl_array = instance();
|
ObTLDecoderCtxArray *tl_array = instance();
|
||||||
if (NULL == tl_array) {
|
if (NULL == tl_array) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
@ -172,10 +179,10 @@ public:
|
|||||||
// do nothing
|
// do nothing
|
||||||
} else if (tl_array->ctxs_array_.count() >= MAX_ARRAY_CNT) {
|
} else if (tl_array->ctxs_array_.count() >= MAX_ARRAY_CNT) {
|
||||||
// reach the threshold, release memory
|
// reach the threshold, release memory
|
||||||
OB_DELETE(ObDecoderCtxArray, "TLDecoderCtx", ctxs);
|
OB_DELETE(ObDecoderCtxArray, attr, ctxs);
|
||||||
} else if (OB_FAIL(tl_array->ctxs_array_.push_back(ctxs))) {
|
} else if (OB_FAIL(tl_array->ctxs_array_.push_back(ctxs))) {
|
||||||
LOG_WARN("array push back failed", K(ret));
|
LOG_WARN("array push back failed", K(ret));
|
||||||
OB_DELETE(ObDecoderCtxArray, "TLDecoderCtx", ctxs);
|
OB_DELETE(ObDecoderCtxArray, attr, ctxs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -97,7 +97,7 @@ ObMicroBlockEncoder::ObMicroBlockEncoder() : ctx_(), header_(NULL),
|
|||||||
buffered_rows_checksum_(0), estimate_size_(0), estimate_size_limit_(0),
|
buffered_rows_checksum_(0), estimate_size_(0), estimate_size_limit_(0),
|
||||||
header_size_(0), expand_pct_(DEFAULT_ESTIMATE_REAL_SIZE_PCT),
|
header_size_(0), expand_pct_(DEFAULT_ESTIMATE_REAL_SIZE_PCT),
|
||||||
row_buf_holder_(blocksstable::OB_ENCODING_LABEL_ROW_BUFFER, OB_MALLOC_MIDDLE_BLOCK_SIZE),
|
row_buf_holder_(blocksstable::OB_ENCODING_LABEL_ROW_BUFFER, OB_MALLOC_MIDDLE_BLOCK_SIZE),
|
||||||
encoder_allocator_(encoder_sizes, common::ObModIds::OB_ENCODER_ALLOCATOR),
|
encoder_allocator_(encoder_sizes, ObMemAttr(MTL_ID(), common::ObModIds::OB_ENCODER_ALLOCATOR)),
|
||||||
string_col_cnt_(0), estimate_base_store_size_(0), length_(0),
|
string_col_cnt_(0), estimate_base_store_size_(0), length_(0),
|
||||||
is_inited_(false)
|
is_inited_(false)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user