From d68ffd17d39ac42093c205aac811872ccfc3daa8 Mon Sep 17 00:00:00 2001 From: AnimationFan <3067477338@qq.com> Date: Mon, 6 Jan 2025 04:44:59 +0000 Subject: [PATCH] fix bug, write finish log hung when remote write --- .../ob_imacro_block_flush_callback.h | 8 ++ .../ob_shared_object_reader_writer.cpp | 119 ++++++++++++------ src/storage/ddl/ob_ddl_clog.cpp | 5 + src/storage/ddl/ob_ddl_clog.h | 1 + src/storage/ddl/ob_ddl_redo_log_writer.cpp | 24 +++- src/storage/ddl/ob_ddl_redo_log_writer.h | 3 + 6 files changed, 116 insertions(+), 44 deletions(-) diff --git a/src/storage/blocksstable/ob_imacro_block_flush_callback.h b/src/storage/blocksstable/ob_imacro_block_flush_callback.h index 891cf8f65..1870bebab 100644 --- a/src/storage/blocksstable/ob_imacro_block_flush_callback.h +++ b/src/storage/blocksstable/ob_imacro_block_flush_callback.h @@ -25,11 +25,19 @@ class ObIMacroBlockFlushCallback public: ObIMacroBlockFlushCallback() {} virtual ~ObIMacroBlockFlushCallback() {} + /* + * Some subclasses split a write into three phases: + * 1. write() : write the data into a local buffer + * 2. do_write_io() : optional; some subclasses start the actual write in do_write_io(), while others write directly in write() + * 3. 
wait() : wait for the asynchronous write to finish + */ virtual int write(const ObStorageObjectHandle &macro_handle, const ObLogicMacroBlockId &logic_id, char *buf, const int64_t buf_len, const int64_t row_count) = 0; + virtual int do_write_io() { return OB_SUCCESS;} + virtual int wait() = 0; virtual int64_t get_ddl_start_row_offset() const { return -1; }; }; diff --git a/src/storage/blockstore/ob_shared_object_reader_writer.cpp b/src/storage/blockstore/ob_shared_object_reader_writer.cpp index f96281206..b6c49daa1 100644 --- a/src/storage/blockstore/ob_shared_object_reader_writer.cpp +++ b/src/storage/blockstore/ob_shared_object_reader_writer.cpp @@ -775,13 +775,35 @@ void ObSharedObjectReaderWriter::reset() is_inited_ = false; } +int callback_do_write_io(const ObSharedObjectWriteInfo &write_info) +{ + int ret = OB_SUCCESS; + if (!write_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid write info", K(ret), K(write_info)); + } else if (nullptr != write_info.write_callback_ && OB_FAIL(write_info.write_callback_->do_write_io())) { + LOG_WARN("fail to start write callback", K(ret)); + } + return ret; +} + +int callback_do_write_io(const ObIArray<ObSharedObjectWriteInfo> &write_infos) +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < write_infos.count(); ++i) { + if (OB_FAIL(callback_do_write_io(write_infos.at(i)))) { + LOG_WARN("failed to start write callback", K(ret)); + } + } + return ret; +} + int ObSharedObjectReaderWriter::async_write( const ObSharedObjectWriteInfo &write_info, const blocksstable::ObStorageObjectOpt &curr_opt, ObSharedObjectWriteHandle &shared_obj_handle) { int ret = OB_SUCCESS; - lib::ObMutexGuard guard(mutex_); ObSharedObjectWriteArgs write_args; ObMetaDiskAddr prev_addr; prev_addr.set_none_addr(); @@ -789,23 +811,29 @@ int ObSharedObjectReaderWriter::async_write( write_args.object_opt_ = curr_opt; ObSharedObjectHeader header; ObSharedObjectsWriteCtx tmp_write_ctx; - - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("Not init", K(ret)); - } else if 
(OB_UNLIKELY(!write_info.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arg", K(ret), K(write_info)); - } else if (GCTX.is_shared_storage_mode() && OB_FAIL(do_switch(write_args.object_opt_))) { - LOG_WARN("fail to switch object for shared storage", K(ret), K(write_args)); + { + lib::ObMutexGuard guard(mutex_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("Not init", K(ret)); + } else if (OB_UNLIKELY(!write_info.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret), K(write_info)); + } else if (GCTX.is_shared_storage_mode() && OB_FAIL(do_switch(write_args.object_opt_))) { + LOG_WARN("fail to switch object for shared storage", K(ret), K(write_args)); + } + if (FAILEDx(inner_write_block( + header, + write_info, + write_args, + shared_obj_handle, + tmp_write_ctx))) { + LOG_WARN("fail to write block", K(ret), K(write_info), K(write_args)); + } } - if (FAILEDx(inner_write_block( - header, - write_info, - write_args, - shared_obj_handle, - tmp_write_ctx))) { - LOG_WARN("fail to write block", K(ret), K(write_info), K(write_args)); + if (OB_FAIL(ret)) { + } else if (OB_FAIL(callback_do_write_io(write_info))) { + LOG_WARN("failed to start write callback", K(ret)); } return ret; } @@ -843,6 +871,11 @@ int ObSharedObjectReaderWriter::async_batch_write( } } } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(callback_do_write_io(write_infos))) { + LOG_WARN("failed to start write callback", K(ret)); + } return ret; } @@ -852,33 +885,39 @@ int ObSharedObjectReaderWriter::async_link_write( ObSharedObjectLinkHandle &shared_obj_handle) { int ret = OB_SUCCESS; - lib::ObMutexGuard guard(mutex_); - ObSharedObjectWriteArgs write_args; - ObSharedObjectsWriteCtx write_ctx; - write_args.object_opt_ = curr_opt; - write_args.need_flush_ = true; - write_args.need_align_ = need_align_; - write_args.is_linked_ = true; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("Not init", K(ret)); - } else if (OB_FAIL(shared_obj_handle.wait())) { - 
LOG_WARN("Fail to wait other blocks finish", K(ret), K(shared_obj_handle)); - } else if (GCTX.is_shared_storage_mode() && OB_FAIL(do_switch(write_args.object_opt_))) { - LOG_WARN("fail to switch object for shared storage", K(ret), K(write_args)); - } + { + lib::ObMutexGuard guard(mutex_); + ObSharedObjectWriteArgs write_args; + ObSharedObjectsWriteCtx write_ctx; + write_args.object_opt_ = curr_opt; + write_args.need_flush_ = true; + write_args.need_align_ = need_align_; + write_args.is_linked_ = true; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("Not init", K(ret)); + } else if (OB_FAIL(shared_obj_handle.wait())) { + LOG_WARN("Fail to wait other blocks finish", K(ret), K(shared_obj_handle)); + } else if (GCTX.is_shared_storage_mode() && OB_FAIL(do_switch(write_args.object_opt_))) { + LOG_WARN("fail to switch object for shared storage", K(ret), K(write_args)); + } - if (FAILEDx(inner_async_write(write_info, write_args, shared_obj_handle, write_ctx))) { - LOG_WARN("Fail to inner async write block", K(ret), K(write_info), K(write_args)); - } else if (OB_FAIL(shared_obj_handle.write_ctx_.set_addr(write_ctx.addr_))) { - LOG_WARN("Fail to set addr to write ctx", K(ret), K(write_ctx)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < write_ctx.block_ids_.count(); ++i) { - if (OB_FAIL(shared_obj_handle.write_ctx_.add_object_id(write_ctx.block_ids_.at(i)))) { - LOG_WARN("Fail to add block id", K(ret), K(write_ctx)); + if (FAILEDx(inner_async_write(write_info, write_args, shared_obj_handle, write_ctx))) { + LOG_WARN("Fail to inner async write block", K(ret), K(write_info), K(write_args)); + } else if (OB_FAIL(shared_obj_handle.write_ctx_.set_addr(write_ctx.addr_))) { + LOG_WARN("Fail to set addr to write ctx", K(ret), K(write_ctx)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < write_ctx.block_ids_.count(); ++i) { + if (OB_FAIL(shared_obj_handle.write_ctx_.add_object_id(write_ctx.block_ids_.at(i)))) { + LOG_WARN("Fail to add block id", K(ret), 
K(write_ctx)); + } } } } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(callback_do_write_io(write_info))) { + LOG_WARN("failed to start write callback", K(ret)); + } return ret; } diff --git a/src/storage/ddl/ob_ddl_clog.cpp b/src/storage/ddl/ob_ddl_clog.cpp index dc24026dd..4ccdfcf26 100644 --- a/src/storage/ddl/ob_ddl_clog.cpp +++ b/src/storage/ddl/ob_ddl_clog.cpp @@ -501,6 +501,11 @@ ObDDLFinishLog::ObDDLFinishLog() { } +void ObDDLFinishLog::reset() +{ + finish_info_.reset(); +} + int ObDDLFinishLog::assign(const storage::ObDDLFinishLogInfo &other) { int ret = OB_SUCCESS; diff --git a/src/storage/ddl/ob_ddl_clog.h b/src/storage/ddl/ob_ddl_clog.h index a2e33a41b..3a16c2a37 100644 --- a/src/storage/ddl/ob_ddl_clog.h +++ b/src/storage/ddl/ob_ddl_clog.h @@ -261,6 +261,7 @@ class ObDDLFinishLog final public: ObDDLFinishLog(); ~ObDDLFinishLog() = default; + void reset(); int init(const int64_t tenant_id, const share::ObLSID ls_id, const ObITable::TableKey &table_key, diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.cpp b/src/storage/ddl/ob_ddl_redo_log_writer.cpp index 3c45d8309..a5f32077e 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_writer.cpp @@ -2478,18 +2478,35 @@ int ObDDLFinishLogWriterCallback::write(const blocksstable::ObStorageObjectHandl } } if (OB_SUCC(ret)) { + finish_log_.reset(); bool is_remote_write = false; - ObDDLFinishLog finish_log; blocksstable::MacroBlockId macro_block_id = macro_handle.get_macro_id(); - if (OB_FAIL(finish_log.init(MTL_ID(), ls_id_, table_key_, buf, buf_len, macro_block_id, data_format_version_))) { + + char *buffer = nullptr; //buffer allocate by arena allocator, not set as class member + if (OB_ISNULL(buffer = static_cast<char *>(arena_allocator_.alloc(buf_len)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate buffer", K(ret), K(buf_len)); + } else if (FALSE_IT(MEMCPY(buffer, buf, buf_len))) { + } else if (OB_FAIL(finish_log_.init(MTL_ID(), ls_id_, table_key_, buffer, 
buf_len, macro_block_id, data_format_version_))) { LOG_WARN("failed to init table key", K(ret)); - } else if (OB_FAIL(ddl_writer_->write_finish_log_with_retry(true, /*allow remote write */ finish_log, is_remote_write))) { - LOG_WARN("failed to write finish log", K(ret)); } } return ret; } +int ObDDLFinishLogWriterCallback::do_write_io() +{ + int ret = OB_SUCCESS; + bool is_remote_write = false; + if (!is_inited_) { + ret = OB_NOT_INIT; /* report the failure instead of returning OB_SUCCESS */ + LOG_WARN("callback has not been init", K(ret)); + } else if (OB_FAIL(ddl_writer_->write_finish_log_with_retry(true, /*allow remote write */ finish_log_, is_remote_write))) { + LOG_WARN("failed to write finish log", K(ret)); + } + return ret; +} + int ObDDLFinishLogWriterCallback::wait() { int ret = OB_SUCCESS; diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.h b/src/storage/ddl/ob_ddl_redo_log_writer.h index ffc9b24fb..095a6147f 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.h +++ b/src/storage/ddl/ob_ddl_redo_log_writer.h @@ -509,6 +509,7 @@ public: char *buf, const int64_t buf_len, const int64_t row_count) override; + int do_write_io() override; int wait(); private: bool is_inited_; @@ -517,6 +518,8 @@ private: ObDDLRedoLogWriter *ddl_writer_; int64_t task_id_; uint64_t data_format_version_; + ObArenaAllocator arena_allocator_; + ObDDLFinishLog finish_log_; }; #endif } // end namespace storage