[OPT] Add Thread Local DecoderCtxArray threshold, use SEArray and set tenant.

This commit is contained in:
obdev
2023-05-19 08:17:22 +00:00
committed by ob-robot
parent 1a117748a6
commit 6c9b45aec2

View File

@ -70,15 +70,17 @@ class ObDecoderCtxArray
{
public:
typedef ObColumnDecoderCtx ObDecoderCtx;
ObDecoderCtxArray()
ObDecoderCtxArray() : ctxs_()
{
ctxs_.set_attr(SET_USE_500("DecoderCtxArray"));
}
ObMemAttr attr(MTL_ID(), "TLDecoderCtxArr");
ctxs_.set_attr(attr);
};
virtual ~ObDecoderCtxArray()
{
ObMemAttr attr(MTL_ID(), "TLDecoderCtx");
FOREACH(it, ctxs_) {
ObDecoderCtx *c = *it;
OB_DELETE(ObDecoderCtx, ObModIds::OB_SSTABLE_READER, c);
OB_DELETE(ObDecoderCtx, attr, c);
}
ctxs_.reset();
}
@ -93,15 +95,16 @@ public:
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else {
if (ctxs_.size() < size) {
for (int64_t i = ctxs_.size(); OB_SUCC(ret) && i < size; ++i) {
ObDecoderCtx *ctx = OB_NEW(ObDecoderCtx, ObModIds::OB_SSTABLE_READER);
ObMemAttr attr(MTL_ID(), "TLDecoderCtx");
if (ctxs_.count() < size) {
for (int64_t i = ctxs_.count(); OB_SUCC(ret) && i < size; ++i) {
ObDecoderCtx *ctx = OB_NEW(ObDecoderCtx, attr);
if (NULL == ctx) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
} else if (OB_FAIL(ctxs_.push_back(ctx))) {
LOG_WARN("array push back failed", K(ret));
OB_DELETE(ObDecoderCtx, ObModIds::OB_SSTABLE_READER, ctx);
OB_DELETE(ObDecoderCtx, attr, ctx);
}
}
if (OB_SUCC(ret)) {
@ -115,7 +118,8 @@ public:
}
private:
ObArray<ObDecoderCtx *> ctxs_;
static const int64_t LOCAL_CTX_CNT = 128;
ObSEArray<ObDecoderCtx *, LOCAL_CTX_CNT> ctxs_;
DISALLOW_COPY_AND_ASSIGN(ObDecoderCtxArray);
};
@ -126,13 +130,18 @@ ObColumnDecoderCtx ObMicroBlockDecoder::none_exist_column_decoder_ctx_;
class ObTLDecoderCtxArray
{
public:
ObTLDecoderCtxArray() {}
ObTLDecoderCtxArray() : ctxs_array_()
{
ObMemAttr attr(MTL_ID(), "TLDecoderCtxArr");
ctxs_array_.set_attr(attr);
}
virtual ~ObTLDecoderCtxArray()
{
ObMemAttr attr(MTL_ID(), "TLDecoderCtx");
FOREACH(it, ctxs_array_) {
ObDecoderCtxArray *ctxs = *it;
OB_DELETE(ObDecoderCtxArray, ObModIds::OB_SSTABLE_READER, ctxs);
OB_DELETE(ObDecoderCtxArray, attr, ctxs);
}
}
@ -145,7 +154,8 @@ public:
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("NULL instance", K(ret));
} else if (tl_array->ctxs_array_.empty()) {
ctxs = OB_NEW(ObDecoderCtxArray, ObModIds::OB_SSTABLE_READER);
ObMemAttr attr(MTL_ID(), "TLDecoderCtx");
ctxs = OB_NEW(ObDecoderCtxArray, attr);
if (NULL == ctxs) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
@ -161,14 +171,18 @@ public:
{
int ret = OB_SUCCESS;
ObTLDecoderCtxArray *tl_array = instance();
ObMemAttr attr(MTL_ID(), "TLDecoderCtx");
if (NULL == tl_array) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("NULL instance", K(ret));
} else if (NULL == ctxs) {
// do nothing
} else if (tl_array->ctxs_array_.count() >= MAX_ARRAY_CNT) {
// reach the threshold, release memory
OB_DELETE(ObDecoderCtxArray, attr, ctxs);
} else if (OB_FAIL(tl_array->ctxs_array_.push_back(ctxs))) {
LOG_WARN("array push back failed", K(ret));
OB_DELETE(ObDecoderCtxArray, ObModIds::OB_SSTABLE_READER, ctxs);
OB_DELETE(ObDecoderCtxArray, attr, ctxs);
}
}
@ -176,7 +190,8 @@ private:
static ObTLDecoderCtxArray *instance() { return GET_TSI_MULT(ObTLDecoderCtxArray, 1); }
private:
ObArray<ObDecoderCtxArray *> ctxs_array_;
static const int64_t MAX_ARRAY_CNT = 32;
ObSEArray<ObDecoderCtxArray *, MAX_ARRAY_CNT> ctxs_array_;
DISALLOW_COPY_AND_ASSIGN(ObTLDecoderCtxArray);
};