set ObDirectLoadType

This commit is contained in:
obdev
2024-05-11 10:34:35 +00:00
committed by ob-robot
parent 2ff8588a15
commit f77226df19
7 changed files with 33 additions and 17 deletions

View File

@ -1484,7 +1484,8 @@ int ObComplementWriteTask::append_row(ObScan *scan)
hidden_table_key, hidden_table_key,
param_->task_id_, param_->task_id_,
context_->start_scn_, context_->start_scn_,
param_->data_format_version_))) { param_->data_format_version_,
direct_load_hdl.get_full_obj()->get_direct_load_type()))) {
LOG_WARN("fail to init data callback", K(ret), K(hidden_table_key)); LOG_WARN("fail to init data callback", K(ret), K(hidden_table_key));
} else if (OB_FAIL(writer.open(data_desc.get_desc(), macro_start_seq, &callback))) { } else if (OB_FAIL(writer.open(data_desc.get_desc(), macro_start_seq, &callback))) {
LOG_WARN("fail to open macro block writer", K(ret), K(data_desc)); LOG_WARN("fail to open macro block writer", K(ret), K(data_desc));

View File

@ -643,6 +643,7 @@ ObDDLIncRedoLogWriterCallback::ObDDLIncRedoLogWriterCallback()
table_key_(), table_key_(),
task_id_(0), task_id_(0),
data_format_version_(0), data_format_version_(0),
direct_load_type_(DIRECT_LOAD_INVALID),
tx_desc_(nullptr), tx_desc_(nullptr),
trans_id_() trans_id_()
{ {
@ -661,6 +662,7 @@ int ObDDLIncRedoLogWriterCallback::init(
const int64_t task_id, const int64_t task_id,
const share::SCN &start_scn, const share::SCN &start_scn,
const uint64_t data_format_version, const uint64_t data_format_version,
const ObDirectLoadType direct_load_type,
ObTxDesc *tx_desc, ObTxDesc *tx_desc,
const ObTransID &trans_id) const ObTransID &trans_id)
{ {
@ -669,9 +671,11 @@ int ObDDLIncRedoLogWriterCallback::init(
ret = OB_INIT_TWICE; ret = OB_INIT_TWICE;
LOG_WARN("inited twice", K(ret)); LOG_WARN("inited twice", K(ret));
} else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || block_type == DDL_MB_INVALID_TYPE || } else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || block_type == DDL_MB_INVALID_TYPE ||
!table_key.is_valid() || task_id == 0 || data_format_version < 0 || OB_ISNULL(tx_desc) || !trans_id.is_valid())) { !table_key.is_valid() || task_id == 0 || data_format_version < 0 ||
!is_valid_direct_load(direct_load_type) || OB_ISNULL(tx_desc) || !trans_id.is_valid())) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(ls_id), K(tablet_id), K(block_type), K(table_key), K(task_id), K(data_format_version), KP(tx_desc), K(trans_id)); LOG_WARN("invalid arguments", K(ret), K(ls_id), K(tablet_id), K(block_type), K(table_key), K(task_id), K(data_format_version),
K(direct_load_type), KP(tx_desc), K(trans_id));
} else if (OB_FAIL(ddl_inc_writer_.init(ls_id, tablet_id))) { } else if (OB_FAIL(ddl_inc_writer_.init(ls_id, tablet_id))) {
LOG_WARN("fail to init ddl_inc_writer_", K(ret), K(ls_id), K(tablet_id)); LOG_WARN("fail to init ddl_inc_writer_", K(ret), K(ls_id), K(tablet_id));
} else { } else {
@ -699,6 +703,7 @@ void ObDDLIncRedoLogWriterCallback::reset()
task_id_ = 0; task_id_ = 0;
start_scn_.reset(); start_scn_.reset();
data_format_version_ = 0; data_format_version_ = 0;
direct_load_type_ = DIRECT_LOAD_INVALID;
tx_desc_ = nullptr; tx_desc_ = nullptr;
trans_id_.reset(); trans_id_.reset();
} }
@ -725,7 +730,7 @@ int ObDDLIncRedoLogWriterCallback::write(
redo_info_.logic_id_ = logic_id; redo_info_.logic_id_ = logic_id;
redo_info_.start_scn_ = start_scn_; redo_info_.start_scn_ = start_scn_;
redo_info_.data_format_version_ = data_format_version_; redo_info_.data_format_version_ = data_format_version_;
redo_info_.type_ = ObDirectLoadType::DIRECT_LOAD_INCREMENTAL; redo_info_.type_ = direct_load_type_;
redo_info_.trans_id_ = trans_id_; redo_info_.trans_id_ = trans_id_;
if (OB_FAIL(ddl_inc_writer_.write_inc_redo_log_with_retry(redo_info_, macro_block_id_, task_id_, tx_desc_))) { if (OB_FAIL(ddl_inc_writer_.write_inc_redo_log_with_retry(redo_info_, macro_block_id_, task_id_, tx_desc_))) {
LOG_WARN("write ddl inc redo log fail", K(ret)); LOG_WARN("write ddl inc redo log fail", K(ret));

View File

@ -124,6 +124,7 @@ public:
const int64_t task_id, const int64_t task_id,
const share::SCN &start_scn, const share::SCN &start_scn,
const uint64_t data_format_version, const uint64_t data_format_version,
const storage::ObDirectLoadType direct_load_type,
transaction::ObTxDesc *tx_desc, transaction::ObTxDesc *tx_desc,
const transaction::ObTransID &trans_id); const transaction::ObTransID &trans_id);
void reset(); void reset();
@ -144,6 +145,7 @@ private:
int64_t task_id_; int64_t task_id_;
share::SCN start_scn_; share::SCN start_scn_;
uint64_t data_format_version_; uint64_t data_format_version_;
storage::ObDirectLoadType direct_load_type_;
transaction::ObTxDesc *tx_desc_; transaction::ObTxDesc *tx_desc_;
transaction::ObTransID trans_id_; transaction::ObTransID trans_id_;
}; };

View File

@ -1500,7 +1500,8 @@ ObDDLRedoLogWriter::~ObDDLRedoLogWriter()
ObDDLRedoLogWriterCallback::ObDDLRedoLogWriterCallback() ObDDLRedoLogWriterCallback::ObDDLRedoLogWriterCallback()
: is_inited_(false), redo_info_(), block_type_(ObDDLMacroBlockType::DDL_MB_INVALID_TYPE), : is_inited_(false), redo_info_(), block_type_(ObDDLMacroBlockType::DDL_MB_INVALID_TYPE),
table_key_(), macro_block_id_(), task_id_(0), data_format_version_(0), row_id_offset_(-1) table_key_(), macro_block_id_(), task_id_(0), data_format_version_(0),
direct_load_type_(DIRECT_LOAD_INVALID), row_id_offset_(-1)
{ {
} }
@ -1516,6 +1517,7 @@ int ObDDLRedoLogWriterCallback::init(const share::ObLSID &ls_id,
const int64_t task_id, const int64_t task_id,
const share::SCN &start_scn, const share::SCN &start_scn,
const uint64_t data_format_version, const uint64_t data_format_version,
const ObDirectLoadType direct_load_type,
const int64_t row_id_offset/*=-1*/) const int64_t row_id_offset/*=-1*/)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -1527,9 +1529,11 @@ int ObDDLRedoLogWriterCallback::init(const share::ObLSID &ls_id,
ret = OB_INIT_TWICE; ret = OB_INIT_TWICE;
LOG_WARN("ddl redo log writer has been inited twice", K(ret)); LOG_WARN("ddl redo log writer has been inited twice", K(ret));
} else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || !table_key.is_valid() || } else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || !table_key.is_valid() ||
DDL_MB_INVALID_TYPE == block_type || 0 == task_id || data_format_version < 0)) { DDL_MB_INVALID_TYPE == block_type || 0 == task_id || data_format_version < 0 ||
!is_valid_direct_load(direct_load_type))) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(ls_id), K(tablet_id), K(table_key), K(block_type), K(data_format_version), K(task_id)); LOG_WARN("invalid arguments", K(ret), K(ls_id), K(tablet_id), K(table_key), K(block_type), K(data_format_version),
K(direct_load_type), K(task_id));
} else if (OB_UNLIKELY(table_key.is_column_store_sstable() && row_id_offset < 0)) { } else if (OB_UNLIKELY(table_key.is_column_store_sstable() && row_id_offset < 0)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument of column group data", K(ret), K(table_key), K(row_id_offset)); LOG_WARN("invalid argument of column group data", K(ret), K(table_key), K(row_id_offset));
@ -1541,6 +1545,7 @@ int ObDDLRedoLogWriterCallback::init(const share::ObLSID &ls_id,
task_id_ = task_id; task_id_ = task_id;
start_scn_ = start_scn; start_scn_ = start_scn;
data_format_version_ = data_format_version; data_format_version_ = data_format_version;
direct_load_type_ = direct_load_type;
row_id_offset_ = row_id_offset; row_id_offset_ = row_id_offset;
is_inited_ = true; is_inited_ = true;
} }
@ -1558,6 +1563,7 @@ void ObDDLRedoLogWriterCallback::reset()
task_id_ = 0; task_id_ = 0;
start_scn_.reset(); start_scn_.reset();
data_format_version_ = 0; data_format_version_ = 0;
direct_load_type_ = DIRECT_LOAD_INVALID;
row_id_offset_ = -1; row_id_offset_ = -1;
} }
@ -1587,7 +1593,7 @@ int ObDDLRedoLogWriterCallback::write(const ObMacroBlockHandle &macro_handle,
redo_info_.logic_id_ = logic_id; redo_info_.logic_id_ = logic_id;
redo_info_.start_scn_ = start_scn_; redo_info_.start_scn_ = start_scn_;
redo_info_.data_format_version_ = data_format_version_; redo_info_.data_format_version_ = data_format_version_;
redo_info_.type_ = ObDirectLoadType::DIRECT_LOAD_LOAD_DATA; redo_info_.type_ = direct_load_type_;
if (is_column_group_info_valid()) { if (is_column_group_info_valid()) {
redo_info_.end_row_id_ = row_id_offset_ + row_count - 1; redo_info_.end_row_id_ = row_id_offset_ + row_count - 1;
row_id_offset_ += row_count; row_id_offset_ += row_count;

View File

@ -374,6 +374,7 @@ public:
const int64_t task_id, const int64_t task_id,
const share::SCN &start_scn, const share::SCN &start_scn,
const uint64_t data_format_version, const uint64_t data_format_version,
const storage::ObDirectLoadType direct_load_type,
const int64_t row_id_offset = -1); const int64_t row_id_offset = -1);
void reset(); void reset();
int write( int write(
@ -397,6 +398,7 @@ private:
int64_t task_id_; int64_t task_id_;
share::SCN start_scn_; share::SCN start_scn_;
uint64_t data_format_version_; uint64_t data_format_version_;
storage::ObDirectLoadType direct_load_type_;
// if has one macro block with 100 rows before, this macro block's ddl_start_row_offset will be 100. // if has one macro block with 100 rows before, this macro block's ddl_start_row_offset will be 100.
// if current macro block finish with 50 rows, current macro block's end_row_offset will be 149. // if current macro block finish with 50 rows, current macro block's end_row_offset will be 149.
// end_row_offset = ddl_start_row_offset + curr_row_count - 1. // end_row_offset = ddl_start_row_offset + curr_row_count - 1.

View File

@ -642,7 +642,7 @@ int ObMacroBlockSliceStore::init(
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret)); LOG_WARN("failed to alloc memory", K(ret));
} else if (OB_FAIL(static_cast<ObDDLIncRedoLogWriterCallback *>(ddl_redo_callback_)->init( } else if (OB_FAIL(static_cast<ObDDLIncRedoLogWriterCallback *>(ddl_redo_callback_)->init(
ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, start_scn, data_format_version, tx_desc, trans_id))) { ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, start_scn, data_format_version, direct_load_type, tx_desc, trans_id))) {
LOG_WARN("fail to init inc ddl_redo_callback_", K(ret)); LOG_WARN("fail to init inc ddl_redo_callback_", K(ret));
} }
} else { } else {
@ -650,7 +650,7 @@ int ObMacroBlockSliceStore::init(
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret)); LOG_WARN("failed to alloc memory", K(ret));
} else if (OB_FAIL(static_cast<ObDDLRedoLogWriterCallback *>(ddl_redo_callback_)->init( } else if (OB_FAIL(static_cast<ObDDLRedoLogWriterCallback *>(ddl_redo_callback_)->init(
ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, start_scn, data_format_version, -1/*row_id_offset*/))) { ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, start_scn, data_format_version, direct_load_type, -1/*row_id_offset*/))) {
LOG_WARN("fail to init full ddl_redo_callback_", K(ret)); LOG_WARN("fail to init full ddl_redo_callback_", K(ret));
} }
} }
@ -1359,7 +1359,7 @@ int ObCOSliceWriter::init(const ObStorageSchema *storage_schema, const int64_t c
LOG_WARN("init sstable index builder failed", K(ret), K(ls_id), K(table_key), K(data_desc_)); LOG_WARN("init sstable index builder failed", K(ret), K(ls_id), K(table_key), K(data_desc_));
} else if (FALSE_IT(data_desc_.get_desc().sstable_index_builder_ = &index_builder_)) { // for build the tail index block in macro block } else if (FALSE_IT(data_desc_.get_desc().sstable_index_builder_ = &index_builder_)) { // for build the tail index block in macro block
} else if (OB_FAIL(flush_callback_.init(ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, } else if (OB_FAIL(flush_callback_.init(ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id,
start_scn, data_format_version, row_id_offset))) { start_scn, data_format_version, tablet_direct_load_mgr->get_direct_load_type(), row_id_offset))) {
LOG_WARN("fail to init redo log writer callback", KR(ret)); LOG_WARN("fail to init redo log writer callback", KR(ret));
} else if (OB_FAIL(macro_block_writer_.open(data_desc_.get_desc(), start_seq, &flush_callback_))) { } else if (OB_FAIL(macro_block_writer_.open(data_desc_.get_desc(), start_seq, &flush_callback_))) {
LOG_WARN("fail to open macro block writer", K(ret), K(ls_id), K(table_key), K(data_desc_), K(start_seq)); LOG_WARN("fail to open macro block writer", K(ret), K(ls_id), K(table_key), K(data_desc_), K(start_seq));