Fix bug: writing the finish log hangs during a remote write

This commit is contained in:
AnimationFan 2025-01-06 04:44:59 +00:00 committed by ob-robot
parent 1643dab91d
commit d68ffd17d3
6 changed files with 116 additions and 44 deletions

View File

@ -25,11 +25,19 @@ class ObIMacroBlockFlushCallback
public:
ObIMacroBlockFlushCallback() {}
virtual ~ObIMacroBlockFlushCallback() {}
/*
* Some subclasses split a write into three steps:
* 1. write() : write the data into a local buffer
* 2. do_write_io() : optional; some subclasses start the actual write in do_write_io(), while others write directly in write()
* 3. wait() : wait for the asynchronous write to finish
*/
// Pure virtual: every subclass must implement it.
// NOTE(review): parameter semantics are not shown here — presumably buf/buf_len hold the
// block data that belongs to macro_handle/logic_id and row_count is its row count; confirm.
virtual int write(const ObStorageObjectHandle &macro_handle,
const ObLogicMacroBlockId &logic_id,
char *buf,
const int64_t buf_len,
const int64_t row_count) = 0;
// Optional hook: the default implementation does nothing and reports success.
// Subclasses that start their write separately from write() override it.
virtual int do_write_io()
{
  return OB_SUCCESS;
}
// Pure virtual: every subclass must implement it to wait for its asynchronous write to finish.
virtual int wait() = 0;
// Default implementation returns -1.
// NOTE(review): -1 presumably means "no DDL start row offset available" — confirm with overriders.
virtual int64_t get_ddl_start_row_offset() const
{
  return -1;
}
};

View File

@ -775,13 +775,35 @@ void ObSharedObjectReaderWriter::reset()
is_inited_ = false;
}
// Triggers do_write_io() on the callback attached to a single write info.
// Returns OB_INVALID_ARGUMENT when write_info is not valid, OB_SUCCESS when no callback is
// attached, and otherwise whatever the callback's do_write_io() returns.
int callback_do_write_io(const ObSharedObjectWriteInfo &write_info)
{
  int ret = OB_SUCCESS;
  if (write_info.is_valid()) {
    if (nullptr == write_info.write_callback_) {
      // no callback attached, nothing to start
    } else if (OB_FAIL(write_info.write_callback_->do_write_io())) {
      LOG_WARN("fail to start write callback", K(ret));
    }
  } else {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid write info", K(ret), K(write_info));
  }
  return ret;
}
int callback_do_write_io(const ObIArray<ObSharedObjectWriteInfo> &write_infos)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < write_infos.count(); ++i) {
if (OB_FAIL(callback_do_write_io(write_infos.at(i)))) {
LOG_WARN("failed to start write callback", K(ret));
}
}
return ret;
}
int ObSharedObjectReaderWriter::async_write(
const ObSharedObjectWriteInfo &write_info,
const blocksstable::ObStorageObjectOpt &curr_opt,
ObSharedObjectWriteHandle &shared_obj_handle)
{
int ret = OB_SUCCESS;
lib::ObMutexGuard guard(mutex_);
ObSharedObjectWriteArgs write_args;
ObMetaDiskAddr prev_addr;
prev_addr.set_none_addr();
@ -789,23 +811,29 @@ int ObSharedObjectReaderWriter::async_write(
write_args.object_opt_ = curr_opt;
ObSharedObjectHeader header;
ObSharedObjectsWriteCtx tmp_write_ctx;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Not init", K(ret));
} else if (OB_UNLIKELY(!write_info.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(write_info));
} else if (GCTX.is_shared_storage_mode() && OB_FAIL(do_switch(write_args.object_opt_))) {
LOG_WARN("fail to switch object for shared storage", K(ret), K(write_args));
{
lib::ObMutexGuard guard(mutex_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Not init", K(ret));
} else if (OB_UNLIKELY(!write_info.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(write_info));
} else if (GCTX.is_shared_storage_mode() && OB_FAIL(do_switch(write_args.object_opt_))) {
LOG_WARN("fail to switch object for shared storage", K(ret), K(write_args));
}
if (FAILEDx(inner_write_block(
header,
write_info,
write_args,
shared_obj_handle,
tmp_write_ctx))) {
LOG_WARN("fail to write block", K(ret), K(write_info), K(write_args));
}
}
if (FAILEDx(inner_write_block(
header,
write_info,
write_args,
shared_obj_handle,
tmp_write_ctx))) {
LOG_WARN("fail to write block", K(ret), K(write_info), K(write_args));
if (OB_FAIL(ret)) {
} else if (OB_FAIL(callback_do_write_io(write_info))) {
LOG_WARN("failed to start write callback", K(ret));
}
return ret;
}
@ -843,6 +871,11 @@ int ObSharedObjectReaderWriter::async_batch_write(
}
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(callback_do_write_io(write_infos))) {
LOG_WARN("failed to start write callback", K(ret));
}
return ret;
}
@ -852,33 +885,39 @@ int ObSharedObjectReaderWriter::async_link_write(
ObSharedObjectLinkHandle &shared_obj_handle)
{
int ret = OB_SUCCESS;
lib::ObMutexGuard guard(mutex_);
ObSharedObjectWriteArgs write_args;
ObSharedObjectsWriteCtx write_ctx;
write_args.object_opt_ = curr_opt;
write_args.need_flush_ = true;
write_args.need_align_ = need_align_;
write_args.is_linked_ = true;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Not init", K(ret));
} else if (OB_FAIL(shared_obj_handle.wait())) {
LOG_WARN("Fail to wait other blocks finish", K(ret), K(shared_obj_handle));
} else if (GCTX.is_shared_storage_mode() && OB_FAIL(do_switch(write_args.object_opt_))) {
LOG_WARN("fail to switch object for shared storage", K(ret), K(write_args));
}
{
lib::ObMutexGuard guard(mutex_);
ObSharedObjectWriteArgs write_args;
ObSharedObjectsWriteCtx write_ctx;
write_args.object_opt_ = curr_opt;
write_args.need_flush_ = true;
write_args.need_align_ = need_align_;
write_args.is_linked_ = true;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Not init", K(ret));
} else if (OB_FAIL(shared_obj_handle.wait())) {
LOG_WARN("Fail to wait other blocks finish", K(ret), K(shared_obj_handle));
} else if (GCTX.is_shared_storage_mode() && OB_FAIL(do_switch(write_args.object_opt_))) {
LOG_WARN("fail to switch object for shared storage", K(ret), K(write_args));
}
if (FAILEDx(inner_async_write(write_info, write_args, shared_obj_handle, write_ctx))) {
LOG_WARN("Fail to inner async write block", K(ret), K(write_info), K(write_args));
} else if (OB_FAIL(shared_obj_handle.write_ctx_.set_addr(write_ctx.addr_))) {
LOG_WARN("Fail to set addr to write ctx", K(ret), K(write_ctx));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < write_ctx.block_ids_.count(); ++i) {
if (OB_FAIL(shared_obj_handle.write_ctx_.add_object_id(write_ctx.block_ids_.at(i)))) {
LOG_WARN("Fail to add block id", K(ret), K(write_ctx));
if (FAILEDx(inner_async_write(write_info, write_args, shared_obj_handle, write_ctx))) {
LOG_WARN("Fail to inner async write block", K(ret), K(write_info), K(write_args));
} else if (OB_FAIL(shared_obj_handle.write_ctx_.set_addr(write_ctx.addr_))) {
LOG_WARN("Fail to set addr to write ctx", K(ret), K(write_ctx));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < write_ctx.block_ids_.count(); ++i) {
if (OB_FAIL(shared_obj_handle.write_ctx_.add_object_id(write_ctx.block_ids_.at(i)))) {
LOG_WARN("Fail to add block id", K(ret), K(write_ctx));
}
}
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(callback_do_write_io(write_info))) {
LOG_WARN("failed to satrt write callback", K(ret));
}
return ret;
}

View File

@ -501,6 +501,11 @@ ObDDLFinishLog::ObDDLFinishLog()
{
}
void ObDDLFinishLog::reset()
{
finish_info_.reset();
}
int ObDDLFinishLog::assign(const storage::ObDDLFinishLogInfo &other)
{
int ret = OB_SUCCESS;

View File

@ -261,6 +261,7 @@ class ObDDLFinishLog final
public:
ObDDLFinishLog();
~ObDDLFinishLog() = default;
// Resets the finish info held by this log.
void reset();
int init(const int64_t tenant_id,
const share::ObLSID ls_id,
const ObITable::TableKey &table_key,

View File

@ -2478,18 +2478,34 @@ int ObDDLFinishLogWriterCallback::write(const blocksstable::ObStorageObjectHandl
}
}
if (OB_SUCC(ret)) {
finish_log_.reset();
bool is_remote_write = false;
ObDDLFinishLog finish_log;
blocksstable::MacroBlockId macro_block_id = macro_handle.get_macro_id();
if (OB_FAIL(finish_log.init(MTL_ID(), ls_id_, table_key_, buf, buf_len, macro_block_id, data_format_version_))) {
char *buffer = nullptr; //buffer allocate by arena allocator, not set as class member
if (OB_ISNULL(buffer = static_cast<char*>(arena_allocator_.alloc(buf_len)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate buffer", K(ret), K(buf_len));
} else if (FALSE_IT(MEMCPY(buffer, buf, buf_len))) {
} else if (OB_FAIL(finish_log_.init(MTL_ID(), ls_id_, table_key_, buffer, buf_len, macro_block_id, data_format_version_))) {
LOG_WARN("failed to init table key", K(ret));
} else if (OB_FAIL(ddl_writer_->write_finish_log_with_retry(true, /*allow remote write */ finish_log, is_remote_write))) {
LOG_WARN("failed to write finish log", K(ret));
}
}
return ret;
}
int ObDDLFinishLogWriterCallback::do_write_io()
{
int ret = OB_SUCCESS;
bool is_remote_write = false;
if (!is_inited_) {
LOG_WARN("callback has not been init", K(ret));
} else if (OB_FAIL(ddl_writer_->write_finish_log_with_retry(true, /*allow remote write */ finish_log_, is_remote_write))) {
LOG_WARN("failed to write finish log", K(ret));
}
return ret;
}
int ObDDLFinishLogWriterCallback::wait()
{
int ret = OB_SUCCESS;

View File

@ -509,6 +509,7 @@ public:
char *buf,
const int64_t buf_len,
const int64_t row_count) override;
int do_write_io() override;
int wait();
private:
bool is_inited_;
@ -517,6 +518,8 @@ private:
ObDDLRedoLogWriter *ddl_writer_;
int64_t task_id_;
uint64_t data_format_version_;
ObArenaAllocator arena_allocator_;
ObDDLFinishLog finish_log_;
};
#endif
} // end namespace storage