From c485d8f458c33f4973b943edccf99ce25efebf77 Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 13 Feb 2023 02:41:16 +0000 Subject: [PATCH] fix crtl+c aborting task but __all_virtual_load_data_stat abort bug --- src/observer/ob_rpc_processor_simple.cpp | 2 +- src/rootserver/ddl_task/ob_ddl_scheduler.cpp | 1 + src/share/ob_rpc_struct.cpp | 12 +-- src/share/ob_rpc_struct.h | 8 +- src/storage/ddl/ob_complement_data_task.cpp | 2 +- src/storage/ddl/ob_ddl_redo_log_writer.cpp | 87 +++++++++++++------ src/storage/ddl/ob_ddl_redo_log_writer.h | 27 ++++-- .../ddl/ob_direct_insert_sstable_ctx.cpp | 8 +- .../ddl/ob_direct_insert_sstable_ctx.h | 3 +- 9 files changed, 104 insertions(+), 46 deletions(-) diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index ac096f562e..bcb7ad6cb0 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -2001,7 +2001,7 @@ int ObRpcRemoteWriteDDLRedoLogP::process() LOG_WARN("fail to wait macro block io finish", K(ret)); } else if (OB_FAIL(sstable_redo_writer.init(arg_.ls_id_, arg_.redo_info_.table_key_.tablet_id_))) { LOG_WARN("init sstable redo writer", K(ret), K_(arg)); - } else if (OB_FAIL(sstable_redo_writer.write_redo_log(arg_.redo_info_, macro_handle.get_macro_id(), false/*allow remote write*/, tablet_handle, ddl_kv_mgr_handle))) { + } else if (OB_FAIL(sstable_redo_writer.write_redo_log(arg_.redo_info_, macro_handle.get_macro_id(), false, arg_.task_id_, tablet_handle, ddl_kv_mgr_handle))) { LOG_WARN("fail to write macro redo", K(ret), K_(arg)); } else if (OB_FAIL(sstable_redo_writer.wait_redo_log_finish(arg_.redo_info_, macro_handle.get_macro_id()))) { diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index e575235959..330bc6cb52 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -1529,6 +1529,7 @@ int ObDDLScheduler::remove_inactive_ddl_task() if (OB_FAIL(manager_reg_heart_beat_task_.get_inactive_ddl_task_ids(remove_task_ids))){ LOG_WARN("failed to check register time", K(ret)); } else { + LOG_INFO("need remove task", K(remove_task_ids)); for (int64_t i = 0; OB_SUCC(ret) && i < remove_task_ids.size(); i++) { int64_t remove_task_id = 0; if (OB_FAIL(remove_task_ids.at(i, remove_task_id))) { diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 1cb81a91e6..734a1ee67c 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -7345,26 +7345,28 @@ int ObBatchBroadcastSchemaResult::get_ret() const OB_SERIALIZE_MEMBER(ObBatchBroadcastSchemaResult, ret_); ObRpcRemoteWriteDDLRedoLogArg::ObRpcRemoteWriteDDLRedoLogArg() - : tenant_id_(OB_INVALID_ID), ls_id_(), redo_info_() + : tenant_id_(OB_INVALID_ID), ls_id_(), redo_info_(), task_id_(0) {} int ObRpcRemoteWriteDDLRedoLogArg::init(const uint64_t tenant_id, const share::ObLSID &ls_id, - const blocksstable::ObDDLMacroBlockRedoInfo &redo_info) + const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, + const int64_t task_id) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(tenant_id == OB_INVALID_ID || !ls_id.is_valid() || !redo_info.is_valid())) { + if (OB_UNLIKELY(tenant_id == OB_INVALID_ID || task_id == 0 || !ls_id.is_valid() || !redo_info.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("args are not valid", K(ret), K(tenant_id), K(ls_id), K(redo_info)); + LOG_WARN("args are not valid", K(ret), K(tenant_id), K(task_id), K(ls_id), K(redo_info)); } else { tenant_id_ = tenant_id; ls_id_ = ls_id; redo_info_ = redo_info; + task_id_ = task_id; } return ret; } -OB_SERIALIZE_MEMBER(ObRpcRemoteWriteDDLRedoLogArg, tenant_id_, ls_id_, redo_info_); +OB_SERIALIZE_MEMBER(ObRpcRemoteWriteDDLRedoLogArg, tenant_id_, ls_id_, redo_info_, task_id_); ObRpcRemoteWriteDDLCommitLogArg::ObRpcRemoteWriteDDLCommitLogArg() : tenant_id_(OB_INVALID_ID), ls_id_(), table_key_(), start_scn_(SCN::min_scn()), diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 8ff941c3ca..215a89be1a 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -8074,13 +8074,15 @@ public: ~ObRpcRemoteWriteDDLRedoLogArg() = default; int init(const uint64_t tenant_id, const share::ObLSID &ls_id, - const blocksstable::ObDDLMacroBlockRedoInfo &redo_info); - bool is_valid() const { return tenant_id_ != OB_INVALID_ID && ls_id_.is_valid() && redo_info_.is_valid(); } - TO_STRING_KV(K_(tenant_id), K(ls_id_), K_(redo_info)); + const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, + const int64_t task_id); + bool is_valid() const { return tenant_id_ != OB_INVALID_ID && ls_id_.is_valid() && redo_info_.is_valid() && task_id_ != 0; } + TO_STRING_KV(K_(tenant_id), K(ls_id_), K_(redo_info), K(task_id_)); public: uint64_t tenant_id_; share::ObLSID ls_id_; blocksstable::ObDDLMacroBlockRedoInfo redo_info_; + int64_t task_id_; private: DISALLOW_COPY_AND_ASSIGN(ObRpcRemoteWriteDDLRedoLogArg); diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index 3a929e72b4..07eba22e0f 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -1021,7 +1021,7 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan) LOG_WARN("the dag of this task is null", K(ret)); } else if (FALSE_IT(sstable_redo_writer.set_start_scn( static_cast(get_dag())->get_context().data_sstable_redo_writer_.get_start_scn()))) { - } else if (OB_FAIL(callback.init(DDL_MB_DATA_TYPE, hidden_table_key, &sstable_redo_writer, context_->ddl_kv_mgr_handle_))) { + } else if (OB_FAIL(callback.init(DDL_MB_DATA_TYPE, hidden_table_key, param_->task_id_, &sstable_redo_writer, context_->ddl_kv_mgr_handle_))) { LOG_WARN("fail to init data callback", K(ret), K(hidden_table_key)); } else if (OB_FAIL(writer.open(data_desc, macro_start_seq, &callback))) { LOG_WARN("fail to open macro block writer", K(ret), K(data_desc)); diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.cpp b/src/storage/ddl/ob_ddl_redo_log_writer.cpp index 3ae8438d10..6da2726770 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_writer.cpp @@ -27,6 +27,7 @@ #include "storage/blocksstable/ob_logic_macro_id.h" #include "observer/ob_server_event_history_table_operator.h" #include "storage/tablet/ob_tablet.h" +#include "rootserver/ddl_task/ob_ddl_task.h" using namespace oceanbase::common; using namespace oceanbase::storage; @@ -144,28 +145,45 @@ int ObDDLCtrlSpeedItem::cal_limit(const int64_t bytes, int64_t &next_available_t int ObDDLCtrlSpeedItem::do_sleep( const int64_t next_available_ts, + const int64_t task_id, int64_t &real_sleep_us) { int ret = OB_SUCCESS; real_sleep_us = 0; + bool is_exist = true; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); - } else if (next_available_ts <= 0) { + } else if (next_available_ts <= 0 || task_id == 0) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument.", K(ret), K(next_available_ts)); + LOG_WARN("invalid argument.", K(ret), K(next_available_ts), K(task_id)); } else if (OB_UNLIKELY(need_stop_write_)) /*clog disk used exceeds threshold*/ { + int64_t loop_cnt = 0; + ObMySQLProxy *sql_proxy = GCTX.sql_proxy_; while (OB_SUCC(ret) && need_stop_write_) { // TODO YIREN (FIXME-20221017), exit when task is canceled, etc. + int64_t tmp_ret = OB_SUCCESS; ob_usleep(SLEEP_INTERVAL); + if (0 == loop_cnt % 100) { + if (OB_TMP_FAIL(rootserver::ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy, task_id, is_exist))) { + is_exist = true; + LOG_WARN("check task id exist failed", K(tmp_ret), K(task_id)); + } else { + if (!is_exist) { + LOG_INFO("task is not exist", K(task_id)); + break; + } + } + } if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) { ObTaskController::get().allow_next_syslog(); FLOG_INFO("stop write ddl clog", K(ret), K(ls_id_), K(write_speed_), K(need_stop_write_), K(ref_cnt_), K(disk_used_stop_write_threshold_)); } + loop_cnt++; } } - if (OB_SUCC(ret)) { + if (OB_SUCC(ret) && is_exist) { real_sleep_us = std::max(0L, next_available_ts - ObTimeUtility::current_time()); ob_usleep(real_sleep_us); } @@ -175,6 +193,7 @@ int ObDDLCtrlSpeedItem::do_sleep( // calculate the sleep time for the input bytes, sleep. int ObDDLCtrlSpeedItem::limit_and_sleep( const int64_t bytes, + const int64_t task_id, int64_t &real_sleep_us) { int ret = OB_SUCCESS; @@ -185,9 +204,9 @@ int ObDDLCtrlSpeedItem::limit_and_sleep( ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if ((disk_used_stop_write_threshold_ <= 0 - || disk_used_stop_write_threshold_ > 100) || bytes < 0) { + || disk_used_stop_write_threshold_ > 100) || bytes < 0 || 0 == task_id) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument.", K(ret), K(disk_used_stop_write_threshold_), K(bytes)); + LOG_WARN("invalid argument.", K(ret), K(disk_used_stop_write_threshold_), K(bytes), K(task_id)); } else if (OB_FAIL(cal_limit(bytes, next_available_ts))) { LOG_WARN("fail to calculate sleep time", K(ret), K(bytes), K(next_available_ts)); } else if (OB_ISNULL(GCTX.bandwidth_throttle_)) { @@ -198,7 +217,7 @@ int ObDDLCtrlSpeedItem::limit_and_sleep( INT64_MAX, &transmit_sleep_us))) { LOG_WARN("fail to limit out and sleep", K(ret), K(bytes), K(transmit_sleep_us)); - } else if (OB_FAIL(do_sleep(next_available_ts, real_sleep_us))) { + } else if (OB_FAIL(do_sleep(next_available_ts, task_id, real_sleep_us))) { LOG_WARN("fail to sleep", K(ret), K(next_available_ts), K(real_sleep_us)); } else {/* do nothing. */} return ret; @@ -288,7 +307,9 @@ int ObDDLCtrlSpeedHandle::init() int ObDDLCtrlSpeedHandle::limit_and_sleep( const uint64_t tenant_id, const share::ObLSID &ls_id, - const int64_t bytes, int64_t &real_sleep_us) + const int64_t bytes, + const int64_t task_id, + int64_t &real_sleep_us) { int ret = OB_SUCCESS; SpeedHandleKey speed_handle_key; @@ -298,9 +319,9 @@ int ObDDLCtrlSpeedHandle::limit_and_sleep( if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); - } else if(OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !ls_id.is_valid() || bytes < 0)) { + } else if(OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !ls_id.is_valid() || bytes < 0 || 0 == task_id)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(tenant_id), K(ls_id), K(bytes)); + LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id), K(ls_id), K(bytes)); } else if (FALSE_IT(speed_handle_key.tenant_id_ = tenant_id)) { } else if (FALSE_IT(speed_handle_key.ls_id_ = ls_id)) { } else if (OB_UNLIKELY(!speed_handle_map_.created())) { @@ -314,8 +335,9 @@ int ObDDLCtrlSpeedHandle::limit_and_sleep( ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected err, ctrl speed item is nullptr", K(ret), K(speed_handle_key)); } else if (OB_FAIL(speed_handle_item->limit_and_sleep(bytes, + task_id, real_sleep_us))) { - LOG_WARN("fail to limit and sleep", K(ret), K(bytes), K(real_sleep_us)); + LOG_WARN("fail to limit and sleep", K(ret), K(bytes), K(task_id), K(real_sleep_us)); } return ret; } @@ -559,6 +581,7 @@ int ObDDLRedoLogWriter::write( ObDDLKvMgrHandle &ddl_kv_mgr_handle, const ObDDLRedoLog &log, const uint64_t tenant_id, + const int64_t task_id, const share::ObLSID &ls_id, ObLogHandler *log_handler, const blocksstable::MacroBlockId ¯o_block_id, @@ -581,7 +604,7 @@ int ObDDLRedoLogWriter::write( SCN base_scn = SCN::min_scn(); SCN scn; uint32_t lock_tid = 0; - if (!log.is_valid() || nullptr == log_handler || !ls_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id || nullptr == buffer) { + if (!log.is_valid() || nullptr == log_handler || !ls_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id || nullptr == buffer || 0 == task_id) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(log), K(ls_id), K(tenant_id), KP(buffer)); } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->rdlock(ObDDLRedoLogHandle::DDL_REDO_LOG_TIMEOUT, lock_tid))) { @@ -620,8 +643,8 @@ int ObDDLRedoLogWriter::write( } if (OB_SUCC(ret)) { int64_t real_sleep_us = 0; - if (OB_FAIL(ObDDLCtrlSpeedHandle::get_instance().limit_and_sleep(tenant_id, ls_id, buffer_size, real_sleep_us))) { - LOG_WARN("fail to limit and sleep", K(ret), K(tenant_id), K(ls_id), K(buffer_size), K(real_sleep_us)); + if (OB_FAIL(ObDDLCtrlSpeedHandle::get_instance().limit_and_sleep(tenant_id, ls_id, buffer_size, task_id, real_sleep_us))) { + LOG_WARN("fail to limit and sleep", K(ret), K(tenant_id), K(task_id), K(ls_id), K(buffer_size), K(real_sleep_us)); } } if (OB_FAIL(ret)) { @@ -948,34 +971,36 @@ int ObDDLMacroBlockRedoWriter::write_macro_redo(ObTabletHandle &tablet_handle, ObDDLKvMgrHandle &ddl_kv_mgr_handle, const ObDDLMacroBlockRedoInfo &redo_info, const share::ObLSID &ls_id, + const int64_t task_id, ObLogHandler *log_handler, const blocksstable::MacroBlockId ¯o_block_id, char *buffer, ObDDLRedoLogHandle &handle) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!redo_info.is_valid() || nullptr == log_handler || nullptr == buffer)) { + if (OB_UNLIKELY(!redo_info.is_valid() || nullptr == log_handler || nullptr == buffer || 0 == task_id)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(redo_info), KP(log_handler), KP(buffer)); + LOG_WARN("invalid arguments", K(ret), K(redo_info), KP(log_handler), KP(buffer), K(task_id)); } else { ObDDLRedoLog log; const uint64_t tenant_id = MTL_ID(); if (OB_FAIL(log.init(redo_info))) { LOG_WARN("fail to init DDLRedoLog", K(ret), K(redo_info)); - } else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write(tablet_handle, ddl_kv_mgr_handle, log, tenant_id, ls_id, log_handler, macro_block_id, buffer, handle))) { + } else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write(tablet_handle, ddl_kv_mgr_handle, log, tenant_id, task_id, ls_id, log_handler, macro_block_id, buffer, handle))) { LOG_WARN("fail to write ddl redo log item", K(ret)); } } return ret; } -int ObDDLMacroBlockRedoWriter::remote_write_macro_redo(const ObAddr &leader_addr, +int ObDDLMacroBlockRedoWriter::remote_write_macro_redo(const int64_t task_id, + const ObAddr &leader_addr, const ObLSID &leader_ls_id, const ObDDLMacroBlockRedoInfo &redo_info) { int ret = OB_SUCCESS; obrpc::ObSrvRpcProxy *srv_rpc_proxy = nullptr; - if (OB_UNLIKELY(!redo_info.is_valid())) { + if (OB_UNLIKELY(!redo_info.is_valid() || 0 == task_id)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(redo_info)); } else if (OB_ISNULL(srv_rpc_proxy = GCTX.srv_rpc_proxy_)) { @@ -983,7 +1008,7 @@ int ObDDLMacroBlockRedoWriter::remote_write_macro_redo(const ObAddr &leader_addr LOG_WARN("srv rpc proxy is null", K(ret), KP(srv_rpc_proxy)); } else { obrpc::ObRpcRemoteWriteDDLRedoLogArg arg; - if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id, redo_info))) { + if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id, redo_info, task_id))) { LOG_WARN("fail to init ObRpcRemoteWriteDDLRedoLogArg", K(ret)); } else if (OB_FAIL(srv_rpc_proxy->to(leader_addr).by(MTL_ID()).remote_write_ddl_redo_log(arg))) { LOG_WARN("fail to remote write ddl redo log", K(ret), K(leader_addr), K(arg)); @@ -1119,7 +1144,12 @@ int ObDDLSSTableRedoWriter::end_ddl_redo_and_create_ddl_sstable( return ret; } -int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId ¯o_block_id, const bool allow_remote_write, ObTabletHandle &tablet_handle, ObDDLKvMgrHandle &ddl_kv_mgr_handle) +int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_info, + const blocksstable::MacroBlockId ¯o_block_id, + const bool allow_remote_write, + const int64_t task_id, + ObTabletHandle &tablet_handle, + ObDDLKvMgrHandle &ddl_kv_mgr_handle) { int ret = OB_SUCCESS; ObLSHandle ls_handle; @@ -1128,9 +1158,9 @@ int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_i if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLSSTableRedoWriter has not been inited", K(ret)); - } else if (OB_UNLIKELY(!redo_info.is_valid())) { + } else if (OB_UNLIKELY(!redo_info.is_valid() || 0 == task_id)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret)); + LOG_WARN("invalid arguments", K(ret), K(task_id)); } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { LOG_WARN("get ls failed", K(ret), K(ls_id_)); } else if (OB_ISNULL(ls = ls_handle.get_ls())) { @@ -1140,7 +1170,7 @@ int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_i ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory failed", K(ret), K(BUF_SIZE)); } else if (!remote_write_) { - if (OB_FAIL(ObDDLMacroBlockRedoWriter::write_macro_redo(tablet_handle, ddl_kv_mgr_handle, redo_info, ls->get_ls_id(), ls->get_log_handler(), macro_block_id, buffer_, ddl_redo_handle_))) { + if (OB_FAIL(ObDDLMacroBlockRedoWriter::write_macro_redo(tablet_handle, ddl_kv_mgr_handle, redo_info, ls->get_ls_id(), task_id, ls->get_log_handler(), macro_block_id, buffer_, ddl_redo_handle_))) { if (ObDDLUtil::need_remote_write(ret) && allow_remote_write) { if (OB_FAIL(switch_to_remote_write())) { LOG_WARN("fail to switch to remote write", K(ret)); @@ -1152,7 +1182,8 @@ int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_i } if (OB_SUCC(ret) && remote_write_) { - if (OB_FAIL(ObDDLMacroBlockRedoWriter::remote_write_macro_redo(leader_addr_, + if (OB_FAIL(ObDDLMacroBlockRedoWriter::remote_write_macro_redo(task_id, + leader_addr_, leader_ls_id_, redo_info))) { LOG_WARN("fail to remote write ddl redo log", K(ret), K_(leader_ls_id), K_(leader_addr)); @@ -1287,7 +1318,7 @@ ObDDLSSTableRedoWriter::~ObDDLSSTableRedoWriter() } ObDDLRedoLogWriterCallback::ObDDLRedoLogWriterCallback() - : is_inited_(false), redo_info_(), table_key_(), macro_block_id_(), ddl_writer_(nullptr), block_buffer_(nullptr) + : is_inited_(false), redo_info_(), table_key_(), macro_block_id_(), ddl_writer_(nullptr), block_buffer_(nullptr), task_id_(0) { } @@ -1301,6 +1332,7 @@ ObDDLRedoLogWriterCallback::~ObDDLRedoLogWriterCallback() int ObDDLRedoLogWriterCallback::init(const ObDDLMacroBlockType block_type, const ObITable::TableKey &table_key, + const int64_t task_id, ObDDLSSTableRedoWriter *ddl_writer, ObDDLKvMgrHandle &ddl_kv_mgr_handle) { @@ -1313,7 +1345,7 @@ int ObDDLRedoLogWriterCallback::init(const ObDDLMacroBlockType block_type, if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("ObDDLSSTableRedoWriter has been inited twice", K(ret)); - } else if (OB_UNLIKELY(!table_key.is_valid() || nullptr == ddl_writer || DDL_MB_INVALID_TYPE == block_type || !ddl_kv_mgr_handle.is_valid())) { + } else if (OB_UNLIKELY(!table_key.is_valid() || nullptr == ddl_writer || DDL_MB_INVALID_TYPE == block_type || 0 == task_id || !ddl_kv_mgr_handle.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(table_key), K(block_type)); } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ddl_kv_mgr_handle.get_obj()->get_ls_id(), ls_handle, ObLSGetMod::DDL_MOD))) { @@ -1327,6 +1359,7 @@ int ObDDLRedoLogWriterCallback::init(const ObDDLMacroBlockType block_type, block_type_ = block_type; table_key_ = table_key; ddl_writer_ = ddl_writer; + task_id_ = task_id; tablet_handle_ = tablet_handle; ddl_kv_mgr_handle_ = ddl_kv_mgr_handle; is_inited_ = true; @@ -1352,7 +1385,7 @@ int ObDDLRedoLogWriterCallback::write(const ObMacroBlockHandle ¯o_handle, redo_info_.block_type_ = block_type_; redo_info_.logic_id_ = logic_id; redo_info_.start_scn_ = ddl_writer_->get_start_scn(); - if (OB_FAIL(ddl_writer_->write_redo_log(redo_info_, macro_block_id_, true/*allow remote write*/, tablet_handle_, ddl_kv_mgr_handle_))) { + if (OB_FAIL(ddl_writer_->write_redo_log(redo_info_, macro_block_id_, true/*allow remote write*/, task_id_, tablet_handle_, ddl_kv_mgr_handle_))) { LOG_WARN("fail to write ddl redo log", K(ret)); } } diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.h b/src/storage/ddl/ob_ddl_redo_log_writer.h index 6365bcf32e..09d365ec68 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.h +++ b/src/storage/ddl/ob_ddl_redo_log_writer.h @@ -55,7 +55,9 @@ public: void reset_need_stop_write() { need_stop_write_ = false; } int init(const share::ObLSID &ls_id); int refresh(); - int limit_and_sleep(const int64_t bytes, int64_t &real_sleep_us); + int limit_and_sleep(const int64_t bytes, + const int64_t task_id, + int64_t &real_sleep_us); // for ref_cnt_ void inc_ref() { ATOMIC_INC(&ref_cnt_); } @@ -66,7 +68,9 @@ public: K_(write_speed), K_(disk_used_stop_write_threshold), K_(need_stop_write), K_(ref_cnt)); private: int cal_limit(const int64_t bytes, int64_t &next_available_ts); - int do_sleep(const int64_t next_available_ts, int64_t &real_sleep_us); + int do_sleep(const int64_t next_available_ts, + const int64_t task_id, + int64_t &real_sleep_us); private: static const int64_t MIN_WRITE_SPEED = 50L; static const int64_t SLEEP_INTERVAL = 1 * 1000; // 1ms @@ -85,7 +89,11 @@ class ObDDLCtrlSpeedHandle final public: int init(); static ObDDLCtrlSpeedHandle &get_instance(); - int limit_and_sleep(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t bytes, int64_t &real_sleep_us); + int limit_and_sleep(const uint64_t tenant_id, + const share::ObLSID &ls_id, + const int64_t bytes, + const int64_t task_id, + int64_t &real_sleep_us); private: struct SpeedHandleKey { @@ -192,6 +200,7 @@ public: ObDDLKvMgrHandle &ddl_kv_mgr_handle, const ObDDLRedoLog &log, const uint64_t tenant_id, + const int64_t task_id, const share::ObLSID &ls_id, logservice::ObLogHandler *log_handler, const blocksstable::MacroBlockId ¯o_block_id, @@ -234,11 +243,13 @@ public: ObDDLKvMgrHandle &ddl_kv_mgr_handle, const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, const share::ObLSID &ls_id, + const int64_t task_id, logservice::ObLogHandler *log_handler, const blocksstable::MacroBlockId ¯o_block_id, char *buffer, ObDDLRedoLogHandle &handle); - static int remote_write_macro_redo(const ObAddr &leader_addr, + static int remote_write_macro_redo(const int64_t task_id, + const ObAddr &leader_addr, const share::ObLSID &leader_ls_id, const blocksstable::ObDDLMacroBlockRedoInfo &redo_info); private: @@ -265,6 +276,7 @@ public: int write_redo_log(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId ¯o_block_id, const bool allow_remote_write, + const int64_t task_id, ObTabletHandle &tablet_handle, ObDDLKvMgrHandle &ddl_kv_mgr_handle); int wait_redo_log_finish(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info, @@ -298,7 +310,11 @@ class ObDDLRedoLogWriterCallback : public blocksstable::ObIMacroBlockFlushCallba public: ObDDLRedoLogWriterCallback(); virtual ~ObDDLRedoLogWriterCallback(); - int init(const blocksstable::ObDDLMacroBlockType block_type, const ObITable::TableKey &table_key, ObDDLSSTableRedoWriter *ddl_writer, ObDDLKvMgrHandle &ddl_kv_mgr_handle); + int init(const blocksstable::ObDDLMacroBlockType block_type, + const ObITable::TableKey &table_key, + const int64_t task_id, + ObDDLSSTableRedoWriter *ddl_writer, + ObDDLKvMgrHandle &ddl_kv_mgr_handle); int write( const ObMacroBlockHandle ¯o_handle, const blocksstable::ObLogicMacroBlockId &logic_id, @@ -314,6 +330,7 @@ private: blocksstable::MacroBlockId macro_block_id_; ObDDLSSTableRedoWriter *ddl_writer_; char *block_buffer_; + int64_t task_id_; ObTabletHandle tablet_handle_; ObDDLKvMgrHandle ddl_kv_mgr_handle_; }; diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp index 11b6575e2a..70b400c28c 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx.cpp @@ -195,7 +195,8 @@ int ObSSTableInsertRowIterator::get_sql_mode(ObSQLMode &sql_mode) const ObSSTableInsertSliceParam::ObSSTableInsertSliceParam() : snapshot_version_(0), write_major_(false), - sstable_index_builder_(nullptr) + sstable_index_builder_(nullptr), + task_id_(0) { } @@ -207,7 +208,7 @@ bool ObSSTableInsertSliceParam::is_valid() const { return tablet_id_.is_valid() && ls_id_.is_valid() && table_key_.is_valid() && start_seq_.is_valid() && start_scn_.is_valid() && frozen_scn_.is_valid() && - nullptr != sstable_index_builder_; + nullptr != sstable_index_builder_ && 0 != task_id_; } ObSSTableInsertSliceWriter::ObSSTableInsertSliceWriter() @@ -250,7 +251,7 @@ int ObSSTableInsertSliceWriter::init(const ObSSTableInsertSliceParam &slice_para K(slice_param.tablet_id_)); } else if (FALSE_IT(sstable_redo_writer_.set_start_scn(slice_param.start_scn_))) { } else if (OB_FAIL(redo_log_writer_callback_.init(DDL_MB_DATA_TYPE, slice_param.table_key_, - &sstable_redo_writer_, ddl_kv_mgr_handle))) { + slice_param.task_id_, &sstable_redo_writer_, ddl_kv_mgr_handle))) { LOG_WARN("fail to init redo log writer callback", KR(ret)); } else if (OB_FAIL(data_desc_.init(*table_schema, slice_param.ls_id_, @@ -664,6 +665,7 @@ int ObSSTableInsertTabletContext::construct_sstable_slice_writer( slice_param.frozen_scn_ = frozen_status.frozen_scn_; slice_param.write_major_ = build_param.write_major_; slice_param.sstable_index_builder_ = index_builder_; + slice_param.task_id_ = build_param_.ddl_task_id_; if (OB_ISNULL(sstable_slice_writer = OB_NEWx(ObSSTableInsertSliceWriter, (&allocator)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObSSTableInsertSliceWriter", KR(ret)); diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx.h b/src/storage/ddl/ob_direct_insert_sstable_ctx.h index 074d1536d9..623d9f1483 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx.h +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx.h @@ -116,7 +116,7 @@ public: ~ObSSTableInsertSliceParam(); bool is_valid() const; TO_STRING_KV(K_(tablet_id), K_(ls_id), K_(table_key), K_(start_seq), K_(start_scn), - K_(snapshot_version), K_(frozen_scn), K_(write_major), KP_(sstable_index_builder)); + K_(snapshot_version), K_(task_id), K_(frozen_scn), K_(write_major), KP_(sstable_index_builder), K_(task_id)); public: common::ObTabletID tablet_id_; share::ObLSID ls_id_; @@ -127,6 +127,7 @@ public: share::SCN frozen_scn_; bool write_major_; blocksstable::ObSSTableIndexBuilder *sstable_index_builder_; + int64_t task_id_; }; class ObSSTableInsertSliceWriter final