diff --git a/src/logservice/cdcservice/ob_cdc_service.cpp b/src/logservice/cdcservice/ob_cdc_service.cpp index 27fcd1766d..c1f837a02f 100644 --- a/src/logservice/cdcservice/ob_cdc_service.cpp +++ b/src/logservice/cdcservice/ob_cdc_service.cpp @@ -88,6 +88,7 @@ void ObCdcService::run1() { int ret = OB_SUCCESS; int64_t tenant_id = MTL_ID(); + lib::set_thread_name("CdcSrv"); if (IS_NOT_INIT) { ret = OB_ERR_UNEXPECTED; EXTLOG_LOG(ERROR, "ObCdcService is not initialized", KR(ret)); diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index d0c4adb078..6ef1210a08 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -12,7 +12,6 @@ * OBCDC Instance */ -#include "lib/ob_errno.h" #define USING_LOG_PREFIX OBLOG #include "ob_log_instance.h" diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.h b/src/logservice/libobcdc/src/ob_log_part_trans_task.h index 8e4c85503d..c4fd55bef1 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.h @@ -38,6 +38,7 @@ #include "ob_log_callback.h" // ObILogCallback #include "ob_cdc_lob_ctx.h" // ObLobDataOutRowCtxList #include "ob_cdc_lob_aux_table_schema_info.h" // ObCDCLobAuxTableSchemaInfo +#include "ob_log_safe_arena.h" namespace oceanbase { @@ -715,9 +716,9 @@ private: int64_t formatted_stmt_num_; // Number of statements that formatted int64_t row_ref_cnt_; // reference count - // Non-thread safe allocator - // used for Parser/Formatter - common::ObArenaAllocator arena_allocator_; // allocator + // thread safe allocator + // used for Parser/Formatter/LobDataMerger + ObCdcSafeArena arena_allocator_; // allocator private: DISALLOW_COPY_AND_ASSIGN(ObLogEntryTask); diff --git a/src/logservice/libobcdc/src/ob_log_safe_arena.h b/src/logservice/libobcdc/src/ob_log_safe_arena.h new file mode 100644 index 0000000000..878b182363 --- /dev/null +++ b/src/logservice/libobcdc/src/ob_log_safe_arena.h @@ -0,0 +1,93 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +// A Smaller Thread-Safe Arena Allocator for libobcdc + +#ifndef OCEANBASE_LIBOBCDC_OB_LOG_SAFE_ARENA_ +#define OCEANBASE_LIBOBCDC_OB_LOG_SAFE_ARENA_ + +#include "lib/allocator/ob_allocator.h" +#include "lib/allocator/page_arena.h" +#include "lib/lock/ob_small_spin_lock.h" + +namespace oceanbase +{ +namespace libobcdc +{ + +class ObCdcSafeArena: public common::ObIAllocator +{ +public: + ObCdcSafeArena(const lib::ObLabel &label = ObModIds::OB_MODULE_PAGE_ALLOCATOR, + const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE, + int64_t tenant_id = OB_SERVER_TENANT_ID, + int64_t ctx_id = 0) : + arena_(label, page_size, tenant_id, ctx_id), + lock_() {} + virtual ~ObCdcSafeArena() {} + virtual void *alloc(const int64_t size) override + { + ObByteLockGuard guard(lock_); + return arena_.alloc(size); + } + virtual void* alloc(const int64_t size, const ObMemAttr &attr) + { + ObByteLockGuard guard(lock_); + return arena_.alloc(size, attr); + } + virtual void free(void *ptr) override + { + ObByteLockGuard guard(lock_); + arena_.free(ptr); + } + + virtual void clear() + { + ObByteLockGuard guard(lock_); + arena_.clear(); + } + + virtual int64_t total() const override + { + return arena_.total(); + } + virtual int64_t used() const override + { + return arena_.used(); + } + virtual void reset() override + { + ObByteLockGuard guard(lock_); + arena_.reset(); + } + virtual void reuse() override + { + ObByteLockGuard guard(lock_); + arena_.reuse(); + } + virtual void set_attr(const ObMemAttr &attr) override + { + ObByteLockGuard guard(lock_); + arena_.set_attr(attr); + } + +private: + ObArenaAllocator arena_; + ObByteLock lock_; +}; + +} +} + + + + #endif \ No newline at end of file diff --git a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp index 1348ddb32f..112d3a8d7f 100644 --- a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp +++ b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp @@ -322,6 +322,7 @@ int ObLogSequencer::handle_to_be_sequenced_trans_(TrxSortElem &trx_sort_elem, uint64_t tenant_id = OB_INVALID_TENANT_ID; ObLogTenantGuard guard; ObLogTenant *tenant = NULL; + const ObTransID trans_id = trans_ctx->get_trans_id(); if (OB_FAIL(trans_ctx->get_tenant_id(tenant_id))) { LOG_ERROR("trans_ctx get_tenant_id fail", KR(ret), K(tenant_id)); @@ -360,6 +361,7 @@ int ObLogSequencer::handle_to_be_sequenced_trans_(TrxSortElem &trx_sort_elem, monitor.mark_and_get_cost("dml-done", true); } // while } else if (is_ddl_trans){ + trans_ctx->set_trans_redo_dispatched(); // need sort_participants = on, which make sure the first PartTransTask of // participant_list is DDL_TRANS. // TODO: consider more idea to handle this. @@ -378,7 +380,7 @@ int ObLogSequencer::handle_to_be_sequenced_trans_(TrxSortElem &trx_sort_elem, } } - LOG_DEBUG("handle_to_be_sequenced_trans_ end", KR(ret), KPC(trans_ctx)); + LOG_DEBUG("handle_to_be_sequenced_trans_ end", KR(ret), K(trans_id)); } return ret; diff --git a/src/logservice/libobcdc/tests/ob_binlog_record_printer.cpp b/src/logservice/libobcdc/tests/ob_binlog_record_printer.cpp index b80c3d4bc0..d2870702e2 100644 --- a/src/logservice/libobcdc/tests/ob_binlog_record_printer.cpp +++ b/src/logservice/libobcdc/tests/ob_binlog_record_printer.cpp @@ -563,11 +563,10 @@ int ObBinlogRecordPrinter::output_data_file_column_data(IBinlogRecord *br, } ROW_PRINTF(ptr, size, pos, ri, "[C%ld] column_extend_info:%s", column_index, enum_set_values_str.c_str()); } - // print precision & scale only in print detail mode, becacuse INT in oracle mode is also a kind of NUMBER(DECIMAL) - // whose precision is 38 and scale is 0, more importantly, the default precision(-1, PRECISION_UNKNOWN_YET) - // and scale(-85, ORA_NUMBER_SCALE_UNKNOWN_YET) of NUMBER in oracle mode is confusing, so we decide not to - // modify test results for oracle mode temporarily for convenience and efficiency. - // TODO + // print precision & scale only in print detail mode, becacuse INT in oracle mode is also a kind of NUMBER(DECIMAL) + // whose precision is 38 and scale is 0 + // the default value of precision is -1(PRECISION_UNKNOWN_YET) and the default value of scale is -85 + // (ORA_NUMBER_SCALE_UNKNOWN_YET), when using default precision & scale, the number type would behave adaptively else if ((oceanbase::obmysql::MYSQL_TYPE_DECIMAL == ctype) || (oceanbase::obmysql::MYSQL_TYPE_NEWDECIMAL == ctype)) { // Not sure if MYSQL_TYPE_DECIMAL is deprecated, DECIMAL in mysql & oracle mode should be MYSQL_TYPE_NEWDECIMAL ROW_PRINTF(ptr, size, pos , ri, "[C%ld] column_precision:%ld", column_index, precision); diff --git a/unittest/libobcdc/CMakeLists.txt b/unittest/libobcdc/CMakeLists.txt index 6e44e2e89f..bacb921c79 100644 --- a/unittest/libobcdc/CMakeLists.txt +++ b/unittest/libobcdc/CMakeLists.txt @@ -29,3 +29,4 @@ libobcdc_unittest(test_ob_seq_thread) libobcdc_unittest(test_ob_cdc_part_trans_resolver) libobcdc_unittest(test_log_svr_blacklist) libobcdc_unittest(test_ob_cdc_sorted_list) +libobcdc_unittest(test_ob_log_safe_arena) diff --git a/unittest/libobcdc/test_ob_log_safe_arena.cpp b/unittest/libobcdc/test_ob_log_safe_arena.cpp new file mode 100644 index 0000000000..bd84cf9e1b --- /dev/null +++ b/unittest/libobcdc/test_ob_log_safe_arena.cpp @@ -0,0 +1,70 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "lib/oblog/ob_log.h" +#include +#include "ob_log_safe_arena.h" +#include + +namespace oceanbase +{ +namespace libobcdc +{ + +bool check_memset(const void *ptr, const char c, const int64_t size) { + bool bret = true; + const char *cptr = static_cast(ptr); + for (int i = 0; bret && i < size; i++) { + if (cptr[i] != c) { + bret = false; + } + } + return bret; +} + +TEST(ObLogSafeArena, test_ob_log_safe_arena) +{ + ObCdcSafeArena safe_arena; + const int64_t THREAD_NUM = 32; + std::vector threads; + auto alloc_test_func = [&](int idx) { + constexpr int64_t ALLOC_CNT = 1024; + constexpr int64_t ALLOC_SIZE = 1024; + ObRandom rand; + char c = static_cast(idx & 0xFF); + for (int i = 0; i < ALLOC_CNT; i++) { + int64_t alloc_size = rand.get(1, ALLOC_SIZE); + void *ptr = safe_arena.alloc(alloc_size); + EXPECT_TRUE(NULL != ptr); + MEMSET(ptr, c, alloc_size); + EXPECT_TRUE(check_memset(ptr, c, alloc_size)); + } + }; + for (int i = 0; i < THREAD_NUM; i++) { + threads.emplace_back(std::thread(alloc_test_func, i)); + } + for (int i = 0; i < THREAD_NUM; i++) { + threads[i].join(); + } + safe_arena.clear(); +} + +} // namespace libobcdc +} // ns oceanbase + +int main(int argc, char **argv) +{ + oceanbase::common::ObLogger::get_logger().set_log_level("DEBUG"); + OB_LOGGER.set_log_level("DEBUG"); + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}