From d5034e56fa8f803e820ffec4ffbe68dc5f77b0aa Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 22 Mar 2023 10:45:14 +0000 Subject: [PATCH] fix core of GET_TSI --- .../lib/compress/zstd/ob_zstd_compressor.cpp | 23 ++-- .../lib/compress/zstd/ob_zstd_compressor.h | 5 + .../zstd/ob_zstd_stream_compressor.cpp | 8 +- .../compress/zstd/ob_zstd_stream_compressor.h | 5 + .../zstd_1_3_8/ob_zstd_compressor_1_3_8.cpp | 16 ++- .../zstd_1_3_8/ob_zstd_compressor_1_3_8.h | 5 + .../ob_zstd_stream_compressor_1_3_8.cpp | 8 +- .../ob_zstd_stream_compressor_1_3_8.h | 5 + deps/oblib/src/lib/stat/ob_di_cache.cpp | 4 +- deps/oblib/src/lib/utility/ob_print_utils.h | 100 ++++++++++-------- deps/oblib/src/lib/utility/utility.cpp | 6 +- .../unittest/lib/utility/test_print_utils.cpp | 70 +++++++++++- src/observer/ob_req_time_service.cpp | 7 +- src/observer/ob_req_time_service.h | 18 ++-- src/sql/engine/expr/ob_expr.h | 25 ++--- src/sql/ob_result_set.h | 12 +-- 16 files changed, 200 insertions(+), 117 deletions(-) diff --git a/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.cpp b/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.cpp index 8cebce5fd6..7a6ac46772 100644 --- a/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.cpp +++ b/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.cpp @@ -82,8 +82,8 @@ int ObZstdCompressor::compress(const char *src_buffer, int ret = OB_SUCCESS; int64_t max_overflow_size = 0; size_t compress_ret_size = 0; - ObZstdCtxAllocator *zstd_allocator = GET_TSI_MULT(ObZstdCtxAllocator, 1); - OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, zstd_allocator}; + ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance(); + OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator}; dst_data_size = 0; if (NULL == src_buffer @@ -111,9 +111,7 @@ int ObZstdCompressor::compress(const char *src_buffer, dst_data_size = compress_ret_size; } - if (NULL != zstd_allocator) { - zstd_allocator->reuse(); - } + zstd_allocator.reuse(); return ret; } @@ -125,8 +123,8 @@ int ObZstdCompressor::decompress(const char *src_buffer, { int ret = OB_SUCCESS; size_t decompress_ret_size = 0; - ObZstdCtxAllocator *zstd_allocator = GET_TSI_MULT(ObZstdCtxAllocator, 1); - OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, zstd_allocator}; + ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance(); + OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator}; dst_data_size = 0; if (NULL == src_buffer @@ -147,19 +145,14 @@ int ObZstdCompressor::decompress(const char *src_buffer, } else { dst_data_size = decompress_ret_size; } - - if (NULL != zstd_allocator) { - zstd_allocator->reuse(); - } + zstd_allocator.reuse(); return ret; } void ObZstdCompressor::reset_mem() { - ObZstdCtxAllocator *zstd_allocator = GET_TSI_MULT(ObZstdCtxAllocator, 1); - if (NULL != zstd_allocator) { - zstd_allocator->reset(); - } + ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance(); + zstd_allocator.reset(); } const char *ObZstdCompressor::get_compressor_name() const diff --git a/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.h b/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.h index e907f05498..4a8b62bfeb 100644 --- a/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.h +++ b/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.h @@ -28,6 +28,11 @@ class ObZstdCtxAllocator public: ObZstdCtxAllocator(); virtual ~ObZstdCtxAllocator(); + static ObZstdCtxAllocator &get_thread_local_instance() + { + thread_local ObZstdCtxAllocator allocator; + return allocator; + } void *alloc(size_t size); void free(void *addr); void reuse(); diff --git a/deps/oblib/src/lib/compress/zstd/ob_zstd_stream_compressor.cpp b/deps/oblib/src/lib/compress/zstd/ob_zstd_stream_compressor.cpp index 82b1042a83..dc0375b6de 100644 --- a/deps/oblib/src/lib/compress/zstd/ob_zstd_stream_compressor.cpp +++ b/deps/oblib/src/lib/compress/zstd/ob_zstd_stream_compressor.cpp @@ -79,8 +79,8 @@ int ObZstdStreamCompressor::create_compress_ctx(void *&ctx) int ret = OB_SUCCESS; ctx = NULL; - ObZstdStreamCtxAllocator *zstd_allocator = GET_TSI_MULT(ObZstdStreamCtxAllocator, 1); - OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, zstd_allocator}; + ObZstdStreamCtxAllocator &zstd_allocator = ObZstdStreamCtxAllocator::get_thread_local_instance(); + OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, &zstd_allocator}; if (OB_FAIL(ObZstdWrapper::create_cctx(zstd_mem, ctx))) { LIB_LOG(WARN, "failed to create cctx", K(ret)); } @@ -146,8 +146,8 @@ int ObZstdStreamCompressor::stream_compress(void *ctx, const char *src, const in int ObZstdStreamCompressor::create_decompress_ctx(void *&ctx) { int ret = OB_SUCCESS; - ObZstdStreamCtxAllocator *zstd_allocator = GET_TSI_MULT(ObZstdStreamCtxAllocator, 1); - OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, zstd_allocator}; + ObZstdStreamCtxAllocator &zstd_allocator = ObZstdStreamCtxAllocator::get_thread_local_instance(); + OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, &zstd_allocator}; ctx = NULL; if (OB_FAIL(ObZstdWrapper::create_dctx(zstd_mem, ctx))) { diff --git a/deps/oblib/src/lib/compress/zstd/ob_zstd_stream_compressor.h b/deps/oblib/src/lib/compress/zstd/ob_zstd_stream_compressor.h index 7081fc0d16..92b44d4864 100644 --- a/deps/oblib/src/lib/compress/zstd/ob_zstd_stream_compressor.h +++ b/deps/oblib/src/lib/compress/zstd/ob_zstd_stream_compressor.h @@ -28,6 +28,11 @@ class ObZstdStreamCtxAllocator public: ObZstdStreamCtxAllocator(); virtual ~ObZstdStreamCtxAllocator(); + static ObZstdStreamCtxAllocator &get_thread_local_instance() + { + thread_local ObZstdStreamCtxAllocator allocator; + return allocator; + } void *alloc(size_t size); void free(void *addr); private: diff --git a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.cpp b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.cpp index fba265ebb4..c5491ebe36 100644 --- a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.cpp +++ b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.cpp @@ -83,8 +83,8 @@ int ObZstdCompressor_1_3_8::compress(const char *src_buffer, int ret = OB_SUCCESS; int64_t max_overflow_size = 0; size_t compress_ret_size = 0; - ObZstdCtxAllocator *zstd_allocator = GET_TSI_MULT(ObZstdCtxAllocator, 1); - OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, zstd_allocator}; + ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance(); + OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator}; dst_data_size = 0; if (NULL == src_buffer @@ -112,9 +112,7 @@ int ObZstdCompressor_1_3_8::compress(const char *src_buffer, dst_data_size = compress_ret_size; } - if (NULL != zstd_allocator) { - zstd_allocator->reuse(); - } + zstd_allocator.reuse(); return ret; } @@ -126,8 +124,8 @@ int ObZstdCompressor_1_3_8::decompress(const char *src_buffer, { int ret = OB_SUCCESS; size_t decompress_ret_size = 0; - ObZstdCtxAllocator *zstd_allocator = GET_TSI_MULT(ObZstdCtxAllocator, 1); - OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, zstd_allocator}; + ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance(); + OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator}; dst_data_size = 0; if (NULL == src_buffer @@ -149,9 +147,7 @@ int ObZstdCompressor_1_3_8::decompress(const char *src_buffer, dst_data_size = decompress_ret_size; } - if (NULL != zstd_allocator) { - zstd_allocator->reuse(); - } + zstd_allocator.reuse(); return ret; } diff --git a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.h b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.h index e71909dc77..eec541a7f0 100644 --- a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.h +++ b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.h @@ -28,6 +28,11 @@ class ObZstdCtxAllocator public: ObZstdCtxAllocator(); virtual ~ObZstdCtxAllocator(); + static ObZstdCtxAllocator &get_thread_local_instance() + { + thread_local ObZstdCtxAllocator allocator; + return allocator; + } void *alloc(size_t size); void free(void *addr); void reuse(); diff --git a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_stream_compressor_1_3_8.cpp b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_stream_compressor_1_3_8.cpp index 6c844275b9..44eb1bcccf 100644 --- a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_stream_compressor_1_3_8.cpp +++ b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_stream_compressor_1_3_8.cpp @@ -81,8 +81,8 @@ int ObZstdStreamCompressor_1_3_8::create_compress_ctx(void *&ctx) int ret = OB_SUCCESS; ctx = NULL; - ObZstdStreamCtxAllocator *zstd_allocator = GET_TSI_MULT(ObZstdStreamCtxAllocator, 1); - OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, zstd_allocator}; + ObZstdStreamCtxAllocator &zstd_allocator = ObZstdStreamCtxAllocator::get_thread_local_instance(); + OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, &zstd_allocator}; if (OB_FAIL(ObZstdWrapper::create_cctx(zstd_mem, ctx))) { LIB_LOG(WARN, "failed to create cctx", K(ret)); } @@ -148,8 +148,8 @@ int ObZstdStreamCompressor_1_3_8::stream_compress(void *ctx, const char *src, co int ObZstdStreamCompressor_1_3_8::create_decompress_ctx(void *&ctx) { int ret = OB_SUCCESS; - ObZstdStreamCtxAllocator *zstd_allocator = GET_TSI_MULT(ObZstdStreamCtxAllocator, 1); - OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, zstd_allocator}; + ObZstdStreamCtxAllocator &zstd_allocator = ObZstdStreamCtxAllocator::get_thread_local_instance(); + OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, &zstd_allocator}; ctx = NULL; if (OB_FAIL(ObZstdWrapper::create_dctx(zstd_mem, ctx))) { diff --git a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_stream_compressor_1_3_8.h b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_stream_compressor_1_3_8.h index ed61845a47..f09f991bca 100644 --- a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_stream_compressor_1_3_8.h +++ b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_stream_compressor_1_3_8.h @@ -28,6 +28,11 @@ class ObZstdStreamCtxAllocator public: ObZstdStreamCtxAllocator(); virtual ~ObZstdStreamCtxAllocator(); + static ObZstdStreamCtxAllocator &get_thread_local_instance() + { + thread_local ObZstdStreamCtxAllocator allocator; + return allocator; + } void *alloc(size_t size); void free(void *addr); private: diff --git a/deps/oblib/src/lib/stat/ob_di_cache.cpp b/deps/oblib/src/lib/stat/ob_di_cache.cpp index f9ed3a376e..bdb4968ac0 100644 --- a/deps/oblib/src/lib/stat/ob_di_cache.cpp +++ b/deps/oblib/src/lib/stat/ob_di_cache.cpp @@ -76,7 +76,7 @@ ObDISessionCache &ObDISessionCache::get_instance() int ObDISessionCache::get_node(uint64_t session_id, ObDISessionCollect *&session_collect) { int ret = OB_SUCCESS; - ObRandom *random = GET_TSI(ObRandom); + thread_local ObRandom random; ObSessionBucket &bucket = di_map_[session_id % OB_MAX_SERVER_SESSION_CNT]; while (1) { bucket.lock_.rdlock(); @@ -90,7 +90,7 @@ int ObDISessionCache::get_node(uint64_t session_id, ObDISessionCollect *&session bucket.lock_.unlock(); int64_t pos = 0; while (1) { - pos = random->get(0, OB_MAX_SERVER_SESSION_CNT-1); + pos = random.get(0, OB_MAX_SERVER_SESSION_CNT-1); if (OB_SUCCESS == (ret = collects_[pos].lock_.try_wrlock())) { break; } diff --git a/deps/oblib/src/lib/utility/ob_print_utils.h b/deps/oblib/src/lib/utility/ob_print_utils.h index eb7c02b7c9..239b0609c9 100644 --- a/deps/oblib/src/lib/utility/ob_print_utils.h +++ b/deps/oblib/src/lib/utility/ob_print_utils.h @@ -147,36 +147,46 @@ public: class CStringBufMgr { public: - static const int BUF_SIZE = 12 * 1024; - static const int BUF_NUM = 5; + static const int BUF_SIZE = 8 * 1024; + static const int MIN_REST_SIZE = 1024; struct BufNode { char buf_[BUF_SIZE]; int64_t level_; struct BufNode *next_; }; - struct BufArray - { - BufNode node_[BUF_NUM]; - }; struct BufList { BufList() : head_(nullptr) {} BufNode *head_; }; - CStringBufMgr() : list_(), level_(-1), idx_(0) {} + CStringBufMgr() : list_(), level_(-1), pos_(0) {} ~CStringBufMgr() {} + static CStringBufMgr &get_thread_local_instance() + { + thread_local CStringBufMgr mgr; + return mgr; + } void inc_level() { level_++; } void dec_level() { level_--; } - BufNode *acquire() + int64_t get_pos() { return pos_; } + void set_pos(int64_t pos) { - BufNode *node = nullptr; if (0 == level_) { - BufArray *array = GET_TSI(__typeof__(*array)); - if (NULL != array) { - node = &array->node_[(idx_++ % BUF_NUM)]; + if (MIN_REST_SIZE > BUF_SIZE - pos) { + pos_ = 0; + } else { + pos_ = pos; } + } + } + char *acquire() + { + char *buffer = NULL; + if (0 == level_) { + buffer = local_buf_ + pos_; } else { + BufNode *node = NULL; node = list_.head_; while (NULL != node) { if (node->level_ > level_) { @@ -191,8 +201,11 @@ public: node->next_ = list_.head_; list_.head_ = node; } + if (NULL != node) { + buffer = node->buf_; + } } - return node; + return buffer; } void try_clear_list() { @@ -205,36 +218,35 @@ public: } } private: + char local_buf_[BUF_SIZE]; BufList list_; int64_t level_; - uint64_t idx_; + int64_t pos_; }; template const char *to_cstring(const T &obj, const bool verbose) { char *buffer = NULL; - int64_t pos = 0; - CStringBufMgr *mgr = GET_TSI(CStringBufMgr); - mgr->inc_level(); - CStringBufMgr::BufNode *node = mgr->acquire(); - if (OB_ISNULL(node)) { - LIB_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "buffer is NULL"); + int64_t str_len = 0; + CStringBufMgr &mgr = CStringBufMgr::get_thread_local_instance(); + mgr.inc_level(); + buffer = mgr.acquire(); + if (OB_ISNULL(buffer)) { + LIB_LOG_RET(ERROR, OB_ALLOCATE_MEMORY_FAILED, "buffer is NULL"); } else { - buffer = node->buf_; - if (NULL == &obj) { - snprintf(buffer, CStringBufMgr::BUF_SIZE -1, "NULL"); + int64_t pos = mgr.get_pos(); + const int64_t buf_len = CStringBufMgr::BUF_SIZE - pos; + str_len = obj.to_string(buffer, buf_len -1, verbose); + if (str_len >= 0 && str_len < buf_len) { + buffer[str_len] = '\0'; } else { - pos = obj.to_string(buffer, CStringBufMgr::BUF_SIZE -1, verbose); - if (pos >= 0 && pos < CStringBufMgr::BUF_SIZE) { - buffer[pos] = '\0'; - } else { - buffer[0] = '\0'; - } + buffer[0] = '\0'; } + mgr.set_pos(pos + str_len + 1); } - mgr->try_clear_list(); - mgr->dec_level(); + mgr.try_clear_list(); + mgr.dec_level(); return buffer; } @@ -242,23 +254,25 @@ template const char *to_cstring(const T &obj, FalseType) { char *buffer = NULL; - int64_t pos = 0; - CStringBufMgr *mgr = GET_TSI(CStringBufMgr); - mgr->inc_level(); - CStringBufMgr::BufNode *node = mgr->acquire(); - if (OB_ISNULL(node)) { - LIB_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "buffer is NULL"); + int64_t str_len = 0; + CStringBufMgr &mgr = CStringBufMgr::get_thread_local_instance(); + mgr.inc_level(); + buffer = mgr.acquire(); + if (OB_ISNULL(buffer)) { + LIB_LOG_RET(ERROR, OB_ALLOCATE_MEMORY_FAILED, "buffer is NULL"); } else { - buffer = node->buf_; - pos = to_string(obj, buffer, CStringBufMgr::BUF_SIZE -1); - if (pos >= 0 && pos < CStringBufMgr::BUF_SIZE) { - buffer[pos] = '\0'; + int64_t pos = mgr.get_pos(); + const int64_t buf_len = CStringBufMgr::BUF_SIZE - pos; + str_len = to_string(obj, buffer, buf_len -1); + if (str_len >= 0 && str_len < buf_len) { + buffer[str_len] = '\0'; } else { buffer[0] = '\0'; } + mgr.set_pos(pos + str_len + 1); } - mgr->try_clear_list(); - mgr->dec_level(); + mgr.try_clear_list(); + mgr.dec_level(); return buffer; } diff --git a/deps/oblib/src/lib/utility/utility.cpp b/deps/oblib/src/lib/utility/utility.cpp index bf2e530bc8..34185e8a53 100644 --- a/deps/oblib/src/lib/utility/utility.cpp +++ b/deps/oblib/src/lib/utility/utility.cpp @@ -413,10 +413,10 @@ const char *inet_ntoa_s(char *buffer, size_t n, const uint32_t ip) const char *time2str(const int64_t time_us, const char *format) { // FIXME: To Be Removed - static const int32_t BUFFER_SIZE = 1024; + static const int32_t BUFFER_SIZE = 256; + thread_local char buffer[4 * BUFFER_SIZE]; RLOCAL(uint64_t, i); - char *buffer = (char*)GET_TSI_MULT(ByteBuf<10 * BUFFER_SIZE>, 5); - uint64_t cur = i++ % 10; + uint64_t cur = i++ % 4; buffer[cur * BUFFER_SIZE] = '\0'; struct tm time_struct; int64_t time_s = time_us / 1000000; diff --git a/deps/oblib/unittest/lib/utility/test_print_utils.cpp b/deps/oblib/unittest/lib/utility/test_print_utils.cpp index e2bb95721c..ebbce9f273 100644 --- a/deps/oblib/unittest/lib/utility/test_print_utils.cpp +++ b/deps/oblib/unittest/lib/utility/test_print_utils.cpp @@ -11,11 +11,14 @@ */ #include +#include #include "lib/allocator/ob_malloc.h" #include "lib/utility/ob_print_utils.h" +#include "lib/string/ob_string.h" +#include "lib/utility/ob_tracepoint.h" using namespace oceanbase::common; - +/* TEST(print_utility, hex_print) { const int64_t data_size = 10; @@ -54,6 +57,69 @@ TEST(print_utility, hex_print) pos = 0; ret = hex_print(data, -1, buff_h, 21, pos); ASSERT_EQ(OB_SUCCESS, ret); +}*/ + +template +class ObTuple +{ +public: + template + ObTuple(Args &&...args) : tuple_(std::forward(args)...) {} + int64_t to_string(char *buf, const int64_t buf_len) const + { + int64_t pos = 0; + print_<0>(buf, buf_len, pos); + return pos; + } + template + int64_t print_(char *buf, const int64_t buf_len, int64_t &pos) const + { + if (N == 0) { + databuff_printf(buf, buf_len, pos, "{"); + } + databuff_printf(buf, buf_len, pos, "%s,", to_cstring(std::get(tuple_))); + print_(buf, buf_len, pos); + return pos; + } + template <> + int64_t print_(char *buf, const int64_t buf_len, int64_t &pos) const + { + databuff_printf(buf, buf_len, pos, "%s}", to_cstring(std::get(tuple_))); + return pos; + } +private: + std::tuple tuple_; +}; +TEST(print_utility, to_cstring) +{ + typedef ObTuple MyTuple; + const int size = 1300; + const int number = 10; + char data[size * number]; + char *buffer = (char*)ob_malloc(sizeof(MyTuple) * number); + MyTuple *tuples[number]; + for (int n = 0; n < number; ++n) { + memset(&data[n * size], 'a' + n, size - 1); + data[size * (n+1) - 1] = '\0'; + tuples[n] = new (buffer + sizeof(MyTuple) * n) MyTuple(data + size * n, n); + } + // mutiply call to_cstring at the same time + _OB_LOG(INFO, "print tuple string, {%s}, {%s}, {%s}", + to_cstring(*tuples[0]), to_cstring(*tuples[1]), to_cstring(*tuples[2])); + _OB_LOG(INFO, "print tuple string, {%s}, {%s}, {%s}, {%s}, {%s}, {%s}, {%s}", to_cstring(*tuples[3]), to_cstring(*tuples[4]), + to_cstring(*tuples[5]), to_cstring(*tuples[6]), to_cstring(*tuples[7]), to_cstring(*tuples[8]), to_cstring(*tuples[9])); + for (int i = 0; i < 10; ++i) { + int64_t pos = CStringBufMgr::get_thread_local_instance().get_pos(); + _OB_LOG(INFO, "print tuple string, pos = %ld\n", pos); + to_cstring(*tuples[0]); + } + // the performance of to_cstring when observer reach memory limit + EventItem item; + item.trigger_freq_ = 1; + item.error_code_ = OB_ALLOCATE_MEMORY_FAILED; + ::oceanbase::common::EventTable::instance().set_event(EventTable::EN_4, item); + int64_t pos = CStringBufMgr::get_thread_local_instance().get_pos(); + _OB_LOG(INFO, "print tuple string, {%s}, pos = %ld\n", to_cstring(*tuples[0]), pos); } int main(int argc, char **argv) @@ -64,4 +130,4 @@ int main(int argc, char **argv) OB_LOGGER.set_file_name("test_print_utility.log", true); testing::InitGoogleTest(&argc,argv); return RUN_ALL_TESTS(); -} +} \ No newline at end of file diff --git a/src/observer/ob_req_time_service.cpp b/src/observer/ob_req_time_service.cpp index eb9224ee10..4ea628604c 100644 --- a/src/observer/ob_req_time_service.cpp +++ b/src/observer/ob_req_time_service.cpp @@ -42,12 +42,9 @@ ObReqTimeInfo::~ObReqTimeInfo() void ObGlobalReqTimeService::check_req_timeinfo() { #if !defined(NDEBUG) - observer::ObReqTimeInfo *req_timeinfo = GET_TSI_MULT( - observer::ObReqTimeInfo, - observer::ObReqTimeInfo::REQ_TIMEINFO_IDENTIFIER); - OB_ASSERT(req_timeinfo != NULL); + observer::ObReqTimeInfo &req_timeinfo = observer::ObReqTimeInfo::get_thread_local_instance(); - OB_ASSERT(req_timeinfo->reentrant_cnt_ > 0); + OB_ASSERT(req_timeinfo.reentrant_cnt_ > 0); #endif } } // end namespace server diff --git a/src/observer/ob_req_time_service.h b/src/observer/ob_req_time_service.h index 7357484bf2..abe1e29601 100644 --- a/src/observer/ob_req_time_service.h +++ b/src/observer/ob_req_time_service.h @@ -31,7 +31,11 @@ struct ObReqTimeInfo: public common::ObDLinkBase ObReqTimeInfo(); ~ObReqTimeInfo(); - + static ObReqTimeInfo &get_thread_local_instance() + { + thread_local ObReqTimeInfo req_timeinfo; + return req_timeinfo; + } void update_start_time() { if (0 == reentrant_cnt_) { @@ -135,18 +139,14 @@ struct ObReqTimeGuard { ObReqTimeGuard() { - ObReqTimeInfo *req_timeinfo = GET_TSI_MULT(ObReqTimeInfo, - ObReqTimeInfo::REQ_TIMEINFO_IDENTIFIER); - OB_ASSERT(NULL != req_timeinfo); - req_timeinfo->update_start_time(); + ObReqTimeInfo &req_timeinfo = observer::ObReqTimeInfo::get_thread_local_instance(); + req_timeinfo.update_start_time(); } ~ObReqTimeGuard() { - ObReqTimeInfo *req_timeinfo = GET_TSI_MULT(ObReqTimeInfo, - ObReqTimeInfo::REQ_TIMEINFO_IDENTIFIER); - OB_ASSERT(NULL != req_timeinfo); - req_timeinfo->update_end_time(); + ObReqTimeInfo &req_timeinfo = observer::ObReqTimeInfo::get_thread_local_instance(); + req_timeinfo.update_end_time(); } }; } // end namespace observer diff --git a/src/sql/engine/expr/ob_expr.h b/src/sql/engine/expr/ob_expr.h index f802b84c39..46bc841169 100644 --- a/src/sql/engine/expr/ob_expr.h +++ b/src/sql/engine/expr/ob_expr.h @@ -1113,24 +1113,25 @@ inline const char *get_vectorized_row_str(ObEvalCtx &eval_ctx, int64_t index) { char *buffer = NULL; - int64_t pos = 0; - CStringBufMgr *mgr = GET_TSI(CStringBufMgr); - mgr->inc_level(); - CStringBufMgr::BufNode *node = mgr->acquire(); - if (OB_ISNULL(node)) { + int64_t str_len = 0; + CStringBufMgr &mgr = CStringBufMgr::get_thread_local_instance(); + mgr.inc_level(); + buffer = mgr.acquire(); + if (OB_ISNULL(buffer)) { LIB_LOG_RET(ERROR, OB_ALLOCATE_MEMORY_FAILED, "buffer is NULL"); } else { - buffer = node->buf_; - databuff_printf(buffer, CStringBufMgr::BUF_SIZE, pos, "vectorized_rows(%ld)=", index); - pos += to_string(ROWEXPR2STR(eval_ctx, exprs), buffer + pos, CStringBufMgr::BUF_SIZE - pos - 1); - if (pos >= 0 && pos < CStringBufMgr::BUF_SIZE) { - buffer[pos] = '\0'; + int64_t pos = mgr.get_pos(); + const int64_t buf_len = CStringBufMgr::BUF_SIZE - pos; + databuff_printf(buffer, buf_len, str_len, "vectorized_rows(%ld)=", index); + str_len += to_string(ROWEXPR2STR(eval_ctx, exprs), buffer + str_len, buf_len - str_len - 1); + if (str_len >= 0 && str_len < buf_len) { + buffer[str_len] = '\0'; } else { buffer[0] = '\0'; } } - mgr->try_clear_list(); - mgr->dec_level(); + mgr.try_clear_list(); + mgr.dec_level(); return buffer; } diff --git a/src/sql/ob_result_set.h b/src/sql/ob_result_set.h index 36236d7081..6f57551a60 100644 --- a/src/sql/ob_result_set.h +++ b/src/sql/ob_result_set.h @@ -352,18 +352,14 @@ private: // Always called in the ObResultSet constructor void update_start_time() const { - oceanbase::observer::ObReqTimeInfo *req_timeinfo = GET_TSI_MULT(observer::ObReqTimeInfo, - observer::ObReqTimeInfo::REQ_TIMEINFO_IDENTIFIER); - OB_ASSERT(NULL != req_timeinfo); - req_timeinfo->update_start_time(); + oceanbase::observer::ObReqTimeInfo &req_timeinfo = observer::ObReqTimeInfo::get_thread_local_instance(); + req_timeinfo.update_start_time(); } // Always called at the end of the ObResultSet destructor void update_end_time() const { - oceanbase::observer::ObReqTimeInfo *req_timeinfo = GET_TSI_MULT(observer::ObReqTimeInfo, - observer::ObReqTimeInfo::REQ_TIMEINFO_IDENTIFIER); - OB_ASSERT(NULL != req_timeinfo); - req_timeinfo->update_end_time(); + oceanbase::observer::ObReqTimeInfo &req_timeinfo = observer::ObReqTimeInfo::get_thread_local_instance(); + req_timeinfo.update_end_time(); } protected: