fix crtl+c aborting task but __all_virtual_load_data_stat abort bug

This commit is contained in:
obdev 2023-02-13 02:41:16 +00:00 committed by ob-robot
parent 7ca44639df
commit c485d8f458
9 changed files with 104 additions and 46 deletions

View File

@ -2001,7 +2001,7 @@ int ObRpcRemoteWriteDDLRedoLogP::process()
LOG_WARN("fail to wait macro block io finish", K(ret));
} else if (OB_FAIL(sstable_redo_writer.init(arg_.ls_id_, arg_.redo_info_.table_key_.tablet_id_))) {
LOG_WARN("init sstable redo writer", K(ret), K_(arg));
} else if (OB_FAIL(sstable_redo_writer.write_redo_log(arg_.redo_info_, macro_handle.get_macro_id(), false/*allow remote write*/, tablet_handle, ddl_kv_mgr_handle))) {
} else if (OB_FAIL(sstable_redo_writer.write_redo_log(arg_.redo_info_, macro_handle.get_macro_id(), false, arg_.task_id_, tablet_handle, ddl_kv_mgr_handle))) {
LOG_WARN("fail to write macro redo", K(ret), K_(arg));
} else if (OB_FAIL(sstable_redo_writer.wait_redo_log_finish(arg_.redo_info_,
macro_handle.get_macro_id()))) {

View File

@ -1529,6 +1529,7 @@ int ObDDLScheduler::remove_inactive_ddl_task()
if (OB_FAIL(manager_reg_heart_beat_task_.get_inactive_ddl_task_ids(remove_task_ids))){
LOG_WARN("failed to check register time", K(ret));
} else {
LOG_INFO("need remove task", K(remove_task_ids));
for (int64_t i = 0; OB_SUCC(ret) && i < remove_task_ids.size(); i++) {
int64_t remove_task_id = 0;
if (OB_FAIL(remove_task_ids.at(i, remove_task_id))) {

View File

@ -7345,26 +7345,28 @@ int ObBatchBroadcastSchemaResult::get_ret() const
OB_SERIALIZE_MEMBER(ObBatchBroadcastSchemaResult, ret_);
ObRpcRemoteWriteDDLRedoLogArg::ObRpcRemoteWriteDDLRedoLogArg()
: tenant_id_(OB_INVALID_ID), ls_id_(), redo_info_()
: tenant_id_(OB_INVALID_ID), ls_id_(), redo_info_(), task_id_(0)
{}
int ObRpcRemoteWriteDDLRedoLogArg::init(const uint64_t tenant_id,
const share::ObLSID &ls_id,
const blocksstable::ObDDLMacroBlockRedoInfo &redo_info)
const blocksstable::ObDDLMacroBlockRedoInfo &redo_info,
const int64_t task_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(tenant_id == OB_INVALID_ID || !ls_id.is_valid() || !redo_info.is_valid())) {
if (OB_UNLIKELY(tenant_id == OB_INVALID_ID || task_id == 0 || !ls_id.is_valid() || !redo_info.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("args are not valid", K(ret), K(tenant_id), K(ls_id), K(redo_info));
LOG_WARN("args are not valid", K(ret), K(tenant_id), K(task_id), K(ls_id), K(redo_info));
} else {
tenant_id_ = tenant_id;
ls_id_ = ls_id;
redo_info_ = redo_info;
task_id_ = task_id;
}
return ret;
}
OB_SERIALIZE_MEMBER(ObRpcRemoteWriteDDLRedoLogArg, tenant_id_, ls_id_, redo_info_);
OB_SERIALIZE_MEMBER(ObRpcRemoteWriteDDLRedoLogArg, tenant_id_, ls_id_, redo_info_, task_id_);
ObRpcRemoteWriteDDLCommitLogArg::ObRpcRemoteWriteDDLCommitLogArg()
: tenant_id_(OB_INVALID_ID), ls_id_(), table_key_(), start_scn_(SCN::min_scn()),

View File

@ -8074,13 +8074,15 @@ public:
~ObRpcRemoteWriteDDLRedoLogArg() = default;
int init(const uint64_t tenant_id,
const share::ObLSID &ls_id,
const blocksstable::ObDDLMacroBlockRedoInfo &redo_info);
bool is_valid() const { return tenant_id_ != OB_INVALID_ID && ls_id_.is_valid() && redo_info_.is_valid(); }
TO_STRING_KV(K_(tenant_id), K(ls_id_), K_(redo_info));
const blocksstable::ObDDLMacroBlockRedoInfo &redo_info,
const int64_t task_id);
bool is_valid() const { return tenant_id_ != OB_INVALID_ID && ls_id_.is_valid() && redo_info_.is_valid() && task_id_ != 0; }
TO_STRING_KV(K_(tenant_id), K(ls_id_), K_(redo_info), K(task_id_));
public:
uint64_t tenant_id_;
share::ObLSID ls_id_;
blocksstable::ObDDLMacroBlockRedoInfo redo_info_;
int64_t task_id_;
private:
DISALLOW_COPY_AND_ASSIGN(ObRpcRemoteWriteDDLRedoLogArg);

View File

@ -1021,7 +1021,7 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan)
LOG_WARN("the dag of this task is null", K(ret));
} else if (FALSE_IT(sstable_redo_writer.set_start_scn(
static_cast<ObComplementDataDag *>(get_dag())->get_context().data_sstable_redo_writer_.get_start_scn()))) {
} else if (OB_FAIL(callback.init(DDL_MB_DATA_TYPE, hidden_table_key, &sstable_redo_writer, context_->ddl_kv_mgr_handle_))) {
} else if (OB_FAIL(callback.init(DDL_MB_DATA_TYPE, hidden_table_key, param_->task_id_, &sstable_redo_writer, context_->ddl_kv_mgr_handle_))) {
LOG_WARN("fail to init data callback", K(ret), K(hidden_table_key));
} else if (OB_FAIL(writer.open(data_desc, macro_start_seq, &callback))) {
LOG_WARN("fail to open macro block writer", K(ret), K(data_desc));

View File

@ -27,6 +27,7 @@
#include "storage/blocksstable/ob_logic_macro_id.h"
#include "observer/ob_server_event_history_table_operator.h"
#include "storage/tablet/ob_tablet.h"
#include "rootserver/ddl_task/ob_ddl_task.h"
using namespace oceanbase::common;
using namespace oceanbase::storage;
@ -144,28 +145,45 @@ int ObDDLCtrlSpeedItem::cal_limit(const int64_t bytes, int64_t &next_available_t
int ObDDLCtrlSpeedItem::do_sleep(
const int64_t next_available_ts,
const int64_t task_id,
int64_t &real_sleep_us)
{
int ret = OB_SUCCESS;
real_sleep_us = 0;
bool is_exist = true;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (next_available_ts <= 0) {
} else if (next_available_ts <= 0 || task_id == 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument.", K(ret), K(next_available_ts));
LOG_WARN("invalid argument.", K(ret), K(next_available_ts), K(task_id));
} else if (OB_UNLIKELY(need_stop_write_)) /*clog disk used exceeds threshold*/ {
int64_t loop_cnt = 0;
ObMySQLProxy *sql_proxy = GCTX.sql_proxy_;
while (OB_SUCC(ret) && need_stop_write_) {
// TODO YIREN (FIXME-20221017), exit when task is canceled, etc.
int64_t tmp_ret = OB_SUCCESS;
ob_usleep(SLEEP_INTERVAL);
if (0 == loop_cnt % 100) {
if (OB_TMP_FAIL(rootserver::ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy, task_id, is_exist))) {
is_exist = true;
LOG_WARN("check task id exist failed", K(tmp_ret), K(task_id));
} else {
if (!is_exist) {
LOG_INFO("task is not exist", K(task_id));
break;
}
}
}
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
ObTaskController::get().allow_next_syslog();
FLOG_INFO("stop write ddl clog", K(ret), K(ls_id_),
K(write_speed_), K(need_stop_write_), K(ref_cnt_), K(disk_used_stop_write_threshold_));
}
loop_cnt++;
}
}
if (OB_SUCC(ret)) {
if (OB_SUCC(ret) && is_exist) {
real_sleep_us = std::max(0L, next_available_ts - ObTimeUtility::current_time());
ob_usleep(real_sleep_us);
}
@ -175,6 +193,7 @@ int ObDDLCtrlSpeedItem::do_sleep(
// calculate the sleep time for the input bytes, sleep.
int ObDDLCtrlSpeedItem::limit_and_sleep(
const int64_t bytes,
const int64_t task_id,
int64_t &real_sleep_us)
{
int ret = OB_SUCCESS;
@ -185,9 +204,9 @@ int ObDDLCtrlSpeedItem::limit_and_sleep(
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if ((disk_used_stop_write_threshold_ <= 0
|| disk_used_stop_write_threshold_ > 100) || bytes < 0) {
|| disk_used_stop_write_threshold_ > 100) || bytes < 0 || 0 == task_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument.", K(ret), K(disk_used_stop_write_threshold_), K(bytes));
LOG_WARN("invalid argument.", K(ret), K(disk_used_stop_write_threshold_), K(bytes), K(task_id));
} else if (OB_FAIL(cal_limit(bytes, next_available_ts))) {
LOG_WARN("fail to calculate sleep time", K(ret), K(bytes), K(next_available_ts));
} else if (OB_ISNULL(GCTX.bandwidth_throttle_)) {
@ -198,7 +217,7 @@ int ObDDLCtrlSpeedItem::limit_and_sleep(
INT64_MAX,
&transmit_sleep_us))) {
LOG_WARN("fail to limit out and sleep", K(ret), K(bytes), K(transmit_sleep_us));
} else if (OB_FAIL(do_sleep(next_available_ts, real_sleep_us))) {
} else if (OB_FAIL(do_sleep(next_available_ts, task_id, real_sleep_us))) {
LOG_WARN("fail to sleep", K(ret), K(next_available_ts), K(real_sleep_us));
} else {/* do nothing. */}
return ret;
@ -288,7 +307,9 @@ int ObDDLCtrlSpeedHandle::init()
int ObDDLCtrlSpeedHandle::limit_and_sleep(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const int64_t bytes, int64_t &real_sleep_us)
const int64_t bytes,
const int64_t task_id,
int64_t &real_sleep_us)
{
int ret = OB_SUCCESS;
SpeedHandleKey speed_handle_key;
@ -298,9 +319,9 @@ int ObDDLCtrlSpeedHandle::limit_and_sleep(
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if(OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !ls_id.is_valid() || bytes < 0)) {
} else if(OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !ls_id.is_valid() || bytes < 0 || 0 == task_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(ls_id), K(bytes));
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id), K(ls_id), K(bytes));
} else if (FALSE_IT(speed_handle_key.tenant_id_ = tenant_id)) {
} else if (FALSE_IT(speed_handle_key.ls_id_ = ls_id)) {
} else if (OB_UNLIKELY(!speed_handle_map_.created())) {
@ -314,8 +335,9 @@ int ObDDLCtrlSpeedHandle::limit_and_sleep(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err, ctrl speed item is nullptr", K(ret), K(speed_handle_key));
} else if (OB_FAIL(speed_handle_item->limit_and_sleep(bytes,
task_id,
real_sleep_us))) {
LOG_WARN("fail to limit and sleep", K(ret), K(bytes), K(real_sleep_us));
LOG_WARN("fail to limit and sleep", K(ret), K(bytes), K(task_id), K(real_sleep_us));
}
return ret;
}
@ -559,6 +581,7 @@ int ObDDLRedoLogWriter::write(
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
const ObDDLRedoLog &log,
const uint64_t tenant_id,
const int64_t task_id,
const share::ObLSID &ls_id,
ObLogHandler *log_handler,
const blocksstable::MacroBlockId &macro_block_id,
@ -581,7 +604,7 @@ int ObDDLRedoLogWriter::write(
SCN base_scn = SCN::min_scn();
SCN scn;
uint32_t lock_tid = 0;
if (!log.is_valid() || nullptr == log_handler || !ls_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id || nullptr == buffer) {
if (!log.is_valid() || nullptr == log_handler || !ls_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id || nullptr == buffer || 0 == task_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(log), K(ls_id), K(tenant_id), KP(buffer));
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->rdlock(ObDDLRedoLogHandle::DDL_REDO_LOG_TIMEOUT, lock_tid))) {
@ -620,8 +643,8 @@ int ObDDLRedoLogWriter::write(
}
if (OB_SUCC(ret)) {
int64_t real_sleep_us = 0;
if (OB_FAIL(ObDDLCtrlSpeedHandle::get_instance().limit_and_sleep(tenant_id, ls_id, buffer_size, real_sleep_us))) {
LOG_WARN("fail to limit and sleep", K(ret), K(tenant_id), K(ls_id), K(buffer_size), K(real_sleep_us));
if (OB_FAIL(ObDDLCtrlSpeedHandle::get_instance().limit_and_sleep(tenant_id, ls_id, buffer_size, task_id, real_sleep_us))) {
LOG_WARN("fail to limit and sleep", K(ret), K(tenant_id), K(task_id), K(ls_id), K(buffer_size), K(real_sleep_us));
}
}
if (OB_FAIL(ret)) {
@ -948,34 +971,36 @@ int ObDDLMacroBlockRedoWriter::write_macro_redo(ObTabletHandle &tablet_handle,
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
const ObDDLMacroBlockRedoInfo &redo_info,
const share::ObLSID &ls_id,
const int64_t task_id,
ObLogHandler *log_handler,
const blocksstable::MacroBlockId &macro_block_id,
char *buffer,
ObDDLRedoLogHandle &handle)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!redo_info.is_valid() || nullptr == log_handler || nullptr == buffer)) {
if (OB_UNLIKELY(!redo_info.is_valid() || nullptr == log_handler || nullptr == buffer || 0 == task_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(redo_info), KP(log_handler), KP(buffer));
LOG_WARN("invalid arguments", K(ret), K(redo_info), KP(log_handler), KP(buffer), K(task_id));
} else {
ObDDLRedoLog log;
const uint64_t tenant_id = MTL_ID();
if (OB_FAIL(log.init(redo_info))) {
LOG_WARN("fail to init DDLRedoLog", K(ret), K(redo_info));
} else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write(tablet_handle, ddl_kv_mgr_handle, log, tenant_id, ls_id, log_handler, macro_block_id, buffer, handle))) {
} else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write(tablet_handle, ddl_kv_mgr_handle, log, tenant_id, task_id, ls_id, log_handler, macro_block_id, buffer, handle))) {
LOG_WARN("fail to write ddl redo log item", K(ret));
}
}
return ret;
}
int ObDDLMacroBlockRedoWriter::remote_write_macro_redo(const ObAddr &leader_addr,
int ObDDLMacroBlockRedoWriter::remote_write_macro_redo(const int64_t task_id,
const ObAddr &leader_addr,
const ObLSID &leader_ls_id,
const ObDDLMacroBlockRedoInfo &redo_info)
{
int ret = OB_SUCCESS;
obrpc::ObSrvRpcProxy *srv_rpc_proxy = nullptr;
if (OB_UNLIKELY(!redo_info.is_valid())) {
if (OB_UNLIKELY(!redo_info.is_valid() || 0 == task_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(redo_info));
} else if (OB_ISNULL(srv_rpc_proxy = GCTX.srv_rpc_proxy_)) {
@ -983,7 +1008,7 @@ int ObDDLMacroBlockRedoWriter::remote_write_macro_redo(const ObAddr &leader_addr
LOG_WARN("srv rpc proxy is null", K(ret), KP(srv_rpc_proxy));
} else {
obrpc::ObRpcRemoteWriteDDLRedoLogArg arg;
if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id, redo_info))) {
if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id, redo_info, task_id))) {
LOG_WARN("fail to init ObRpcRemoteWriteDDLRedoLogArg", K(ret));
} else if (OB_FAIL(srv_rpc_proxy->to(leader_addr).by(MTL_ID()).remote_write_ddl_redo_log(arg))) {
LOG_WARN("fail to remote write ddl redo log", K(ret), K(leader_addr), K(arg));
@ -1119,7 +1144,12 @@ int ObDDLSSTableRedoWriter::end_ddl_redo_and_create_ddl_sstable(
return ret;
}
int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_info, const blocksstable::MacroBlockId &macro_block_id, const bool allow_remote_write, ObTabletHandle &tablet_handle, ObDDLKvMgrHandle &ddl_kv_mgr_handle)
int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_info,
const blocksstable::MacroBlockId &macro_block_id,
const bool allow_remote_write,
const int64_t task_id,
ObTabletHandle &tablet_handle,
ObDDLKvMgrHandle &ddl_kv_mgr_handle)
{
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
@ -1128,9 +1158,9 @@ int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_i
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLSSTableRedoWriter has not been inited", K(ret));
} else if (OB_UNLIKELY(!redo_info.is_valid())) {
} else if (OB_UNLIKELY(!redo_info.is_valid() || 0 == task_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret));
LOG_WARN("invalid arguments", K(ret), K(task_id));
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
LOG_WARN("get ls failed", K(ret), K(ls_id_));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
@ -1140,7 +1170,7 @@ int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_i
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret), K(BUF_SIZE));
} else if (!remote_write_) {
if (OB_FAIL(ObDDLMacroBlockRedoWriter::write_macro_redo(tablet_handle, ddl_kv_mgr_handle, redo_info, ls->get_ls_id(), ls->get_log_handler(), macro_block_id, buffer_, ddl_redo_handle_))) {
if (OB_FAIL(ObDDLMacroBlockRedoWriter::write_macro_redo(tablet_handle, ddl_kv_mgr_handle, redo_info, ls->get_ls_id(), task_id, ls->get_log_handler(), macro_block_id, buffer_, ddl_redo_handle_))) {
if (ObDDLUtil::need_remote_write(ret) && allow_remote_write) {
if (OB_FAIL(switch_to_remote_write())) {
LOG_WARN("fail to switch to remote write", K(ret));
@ -1152,7 +1182,8 @@ int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_i
}
if (OB_SUCC(ret) && remote_write_) {
if (OB_FAIL(ObDDLMacroBlockRedoWriter::remote_write_macro_redo(leader_addr_,
if (OB_FAIL(ObDDLMacroBlockRedoWriter::remote_write_macro_redo(task_id,
leader_addr_,
leader_ls_id_,
redo_info))) {
LOG_WARN("fail to remote write ddl redo log", K(ret), K_(leader_ls_id), K_(leader_addr));
@ -1287,7 +1318,7 @@ ObDDLSSTableRedoWriter::~ObDDLSSTableRedoWriter()
}
ObDDLRedoLogWriterCallback::ObDDLRedoLogWriterCallback()
: is_inited_(false), redo_info_(), table_key_(), macro_block_id_(), ddl_writer_(nullptr), block_buffer_(nullptr)
: is_inited_(false), redo_info_(), table_key_(), macro_block_id_(), ddl_writer_(nullptr), block_buffer_(nullptr), task_id_(0)
{
}
@ -1301,6 +1332,7 @@ ObDDLRedoLogWriterCallback::~ObDDLRedoLogWriterCallback()
int ObDDLRedoLogWriterCallback::init(const ObDDLMacroBlockType block_type,
const ObITable::TableKey &table_key,
const int64_t task_id,
ObDDLSSTableRedoWriter *ddl_writer,
ObDDLKvMgrHandle &ddl_kv_mgr_handle)
{
@ -1313,7 +1345,7 @@ int ObDDLRedoLogWriterCallback::init(const ObDDLMacroBlockType block_type,
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObDDLSSTableRedoWriter has been inited twice", K(ret));
} else if (OB_UNLIKELY(!table_key.is_valid() || nullptr == ddl_writer || DDL_MB_INVALID_TYPE == block_type || !ddl_kv_mgr_handle.is_valid())) {
} else if (OB_UNLIKELY(!table_key.is_valid() || nullptr == ddl_writer || DDL_MB_INVALID_TYPE == block_type || 0 == task_id || !ddl_kv_mgr_handle.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(table_key), K(block_type));
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ddl_kv_mgr_handle.get_obj()->get_ls_id(), ls_handle, ObLSGetMod::DDL_MOD))) {
@ -1327,6 +1359,7 @@ int ObDDLRedoLogWriterCallback::init(const ObDDLMacroBlockType block_type,
block_type_ = block_type;
table_key_ = table_key;
ddl_writer_ = ddl_writer;
task_id_ = task_id;
tablet_handle_ = tablet_handle;
ddl_kv_mgr_handle_ = ddl_kv_mgr_handle;
is_inited_ = true;
@ -1352,7 +1385,7 @@ int ObDDLRedoLogWriterCallback::write(const ObMacroBlockHandle &macro_handle,
redo_info_.block_type_ = block_type_;
redo_info_.logic_id_ = logic_id;
redo_info_.start_scn_ = ddl_writer_->get_start_scn();
if (OB_FAIL(ddl_writer_->write_redo_log(redo_info_, macro_block_id_, true/*allow remote write*/, tablet_handle_, ddl_kv_mgr_handle_))) {
if (OB_FAIL(ddl_writer_->write_redo_log(redo_info_, macro_block_id_, true/*allow remote write*/, task_id_, tablet_handle_, ddl_kv_mgr_handle_))) {
LOG_WARN("fail to write ddl redo log", K(ret));
}
}

View File

@ -55,7 +55,9 @@ public:
void reset_need_stop_write() { need_stop_write_ = false; }
int init(const share::ObLSID &ls_id);
int refresh();
int limit_and_sleep(const int64_t bytes, int64_t &real_sleep_us);
int limit_and_sleep(const int64_t bytes,
const int64_t task_id,
int64_t &real_sleep_us);
// for ref_cnt_
void inc_ref() { ATOMIC_INC(&ref_cnt_); }
@ -66,7 +68,9 @@ public:
K_(write_speed), K_(disk_used_stop_write_threshold), K_(need_stop_write), K_(ref_cnt));
private:
int cal_limit(const int64_t bytes, int64_t &next_available_ts);
int do_sleep(const int64_t next_available_ts, int64_t &real_sleep_us);
int do_sleep(const int64_t next_available_ts,
const int64_t task_id,
int64_t &real_sleep_us);
private:
static const int64_t MIN_WRITE_SPEED = 50L;
static const int64_t SLEEP_INTERVAL = 1 * 1000; // 1ms
@ -85,7 +89,11 @@ class ObDDLCtrlSpeedHandle final
public:
int init();
static ObDDLCtrlSpeedHandle &get_instance();
int limit_and_sleep(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t bytes, int64_t &real_sleep_us);
int limit_and_sleep(const uint64_t tenant_id,
const share::ObLSID &ls_id,
const int64_t bytes,
const int64_t task_id,
int64_t &real_sleep_us);
private:
struct SpeedHandleKey {
@ -192,6 +200,7 @@ public:
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
const ObDDLRedoLog &log,
const uint64_t tenant_id,
const int64_t task_id,
const share::ObLSID &ls_id,
logservice::ObLogHandler *log_handler,
const blocksstable::MacroBlockId &macro_block_id,
@ -234,11 +243,13 @@ public:
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
const blocksstable::ObDDLMacroBlockRedoInfo &redo_info,
const share::ObLSID &ls_id,
const int64_t task_id,
logservice::ObLogHandler *log_handler,
const blocksstable::MacroBlockId &macro_block_id,
char *buffer,
ObDDLRedoLogHandle &handle);
static int remote_write_macro_redo(const ObAddr &leader_addr,
static int remote_write_macro_redo(const int64_t task_id,
const ObAddr &leader_addr,
const share::ObLSID &leader_ls_id,
const blocksstable::ObDDLMacroBlockRedoInfo &redo_info);
private:
@ -265,6 +276,7 @@ public:
int write_redo_log(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info,
const blocksstable::MacroBlockId &macro_block_id,
const bool allow_remote_write,
const int64_t task_id,
ObTabletHandle &tablet_handle,
ObDDLKvMgrHandle &ddl_kv_mgr_handle);
int wait_redo_log_finish(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info,
@ -298,7 +310,11 @@ class ObDDLRedoLogWriterCallback : public blocksstable::ObIMacroBlockFlushCallba
public:
ObDDLRedoLogWriterCallback();
virtual ~ObDDLRedoLogWriterCallback();
int init(const blocksstable::ObDDLMacroBlockType block_type, const ObITable::TableKey &table_key, ObDDLSSTableRedoWriter *ddl_writer, ObDDLKvMgrHandle &ddl_kv_mgr_handle);
int init(const blocksstable::ObDDLMacroBlockType block_type,
const ObITable::TableKey &table_key,
const int64_t task_id,
ObDDLSSTableRedoWriter *ddl_writer,
ObDDLKvMgrHandle &ddl_kv_mgr_handle);
int write(
const ObMacroBlockHandle &macro_handle,
const blocksstable::ObLogicMacroBlockId &logic_id,
@ -314,6 +330,7 @@ private:
blocksstable::MacroBlockId macro_block_id_;
ObDDLSSTableRedoWriter *ddl_writer_;
char *block_buffer_;
int64_t task_id_;
ObTabletHandle tablet_handle_;
ObDDLKvMgrHandle ddl_kv_mgr_handle_;
};

View File

@ -195,7 +195,8 @@ int ObSSTableInsertRowIterator::get_sql_mode(ObSQLMode &sql_mode) const
ObSSTableInsertSliceParam::ObSSTableInsertSliceParam()
: snapshot_version_(0),
write_major_(false),
sstable_index_builder_(nullptr)
sstable_index_builder_(nullptr),
task_id_(0)
{
}
@ -207,7 +208,7 @@ bool ObSSTableInsertSliceParam::is_valid() const
{
return tablet_id_.is_valid() && ls_id_.is_valid() && table_key_.is_valid() &&
start_seq_.is_valid() && start_scn_.is_valid() && frozen_scn_.is_valid() &&
nullptr != sstable_index_builder_;
nullptr != sstable_index_builder_ && 0 != task_id_;
}
ObSSTableInsertSliceWriter::ObSSTableInsertSliceWriter()
@ -250,7 +251,7 @@ int ObSSTableInsertSliceWriter::init(const ObSSTableInsertSliceParam &slice_para
K(slice_param.tablet_id_));
} else if (FALSE_IT(sstable_redo_writer_.set_start_scn(slice_param.start_scn_))) {
} else if (OB_FAIL(redo_log_writer_callback_.init(DDL_MB_DATA_TYPE, slice_param.table_key_,
&sstable_redo_writer_, ddl_kv_mgr_handle))) {
slice_param.task_id_, &sstable_redo_writer_, ddl_kv_mgr_handle))) {
LOG_WARN("fail to init redo log writer callback", KR(ret));
} else if (OB_FAIL(data_desc_.init(*table_schema,
slice_param.ls_id_,
@ -664,6 +665,7 @@ int ObSSTableInsertTabletContext::construct_sstable_slice_writer(
slice_param.frozen_scn_ = frozen_status.frozen_scn_;
slice_param.write_major_ = build_param.write_major_;
slice_param.sstable_index_builder_ = index_builder_;
slice_param.task_id_ = build_param_.ddl_task_id_;
if (OB_ISNULL(sstable_slice_writer = OB_NEWx(ObSSTableInsertSliceWriter, (&allocator)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObSSTableInsertSliceWriter", KR(ret));

View File

@ -116,7 +116,7 @@ public:
~ObSSTableInsertSliceParam();
bool is_valid() const;
TO_STRING_KV(K_(tablet_id), K_(ls_id), K_(table_key), K_(start_seq), K_(start_scn),
K_(snapshot_version), K_(frozen_scn), K_(write_major), KP_(sstable_index_builder));
K_(snapshot_version), K_(task_id), K_(frozen_scn), K_(write_major), KP_(sstable_index_builder), K_(task_id));
public:
common::ObTabletID tablet_id_;
share::ObLSID ls_id_;
@ -127,6 +127,7 @@ public:
share::SCN frozen_scn_;
bool write_major_;
blocksstable::ObSSTableIndexBuilder *sstable_index_builder_;
int64_t task_id_;
};
class ObSSTableInsertSliceWriter final