diff --git a/deps/oblib/src/lib/container/ob_mask_set2.h b/deps/oblib/src/lib/container/ob_mask_set2.h index c1bb8a1a0..262168639 100644 --- a/deps/oblib/src/lib/container/ob_mask_set2.h +++ b/deps/oblib/src/lib/container/ob_mask_set2.h @@ -40,9 +40,9 @@ public: return ret; } void reset() - { - is_inited_ = false; - array_ = NULL; + { + is_inited_ = false; + array_ = NULL; bitset_.reuse(); } void clear() @@ -126,9 +126,9 @@ public: } return ret; } - bool is_all_mask() const - { - bool bool_ret = false; + bool is_all_mask() const + { + bool bool_ret = false; if (is_inited_) { bool_ret = (array_->count() == bitset_.num_members()); } diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index 503c3d262..a9b808223 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -1484,7 +1484,8 @@ int ObComplementWriteTask::append_row(ObScan *scan) hidden_table_key, param_->task_id_, context_->start_scn_, - param_->data_format_version_))) { + param_->data_format_version_, + direct_load_hdl.get_full_obj()->get_direct_load_type()))) { LOG_WARN("fail to init data callback", K(ret), K(hidden_table_key)); } else if (OB_FAIL(writer.open(data_desc.get_desc(), macro_start_seq, &callback))) { LOG_WARN("fail to open macro block writer", K(ret), K(data_desc)); diff --git a/src/storage/ddl/ob_ddl_inc_redo_log_writer.cpp b/src/storage/ddl/ob_ddl_inc_redo_log_writer.cpp index 3ec83735e..2c98389d0 100644 --- a/src/storage/ddl/ob_ddl_inc_redo_log_writer.cpp +++ b/src/storage/ddl/ob_ddl_inc_redo_log_writer.cpp @@ -643,6 +643,7 @@ ObDDLIncRedoLogWriterCallback::ObDDLIncRedoLogWriterCallback() table_key_(), task_id_(0), data_format_version_(0), + direct_load_type_(DIRECT_LOAD_INVALID), tx_desc_(nullptr), trans_id_() { @@ -661,6 +662,7 @@ int ObDDLIncRedoLogWriterCallback::init( const int64_t task_id, const share::SCN &start_scn, const uint64_t data_format_version, + const ObDirectLoadType direct_load_type, ObTxDesc *tx_desc, const ObTransID &trans_id) { @@ -669,9 +671,11 @@ int ObDDLIncRedoLogWriterCallback::init( ret = OB_INIT_TWICE; LOG_WARN("inited twice", K(ret)); } else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || block_type == DDL_MB_INVALID_TYPE || - !table_key.is_valid() || task_id == 0 || data_format_version < 0 || OB_ISNULL(tx_desc) || !trans_id.is_valid())) { + !table_key.is_valid() || task_id == 0 || data_format_version < 0 || + !is_valid_direct_load(direct_load_type) || OB_ISNULL(tx_desc) || !trans_id.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(ls_id), K(tablet_id), K(block_type), K(table_key), K(task_id), K(data_format_version), KP(tx_desc), K(trans_id)); + LOG_WARN("invalid arguments", K(ret), K(ls_id), K(tablet_id), K(block_type), K(table_key), K(task_id), K(data_format_version), + K(direct_load_type), KP(tx_desc), K(trans_id)); } else if (OB_FAIL(ddl_inc_writer_.init(ls_id, tablet_id))) { LOG_WARN("fail to init ddl_inc_writer_", K(ret), K(ls_id), K(tablet_id)); } else { @@ -699,6 +703,7 @@ void ObDDLIncRedoLogWriterCallback::reset() task_id_ = 0; start_scn_.reset(); data_format_version_ = 0; + direct_load_type_ = DIRECT_LOAD_INVALID; tx_desc_ = nullptr; trans_id_.reset(); } @@ -725,7 +730,7 @@ int ObDDLIncRedoLogWriterCallback::write( redo_info_.logic_id_ = logic_id; redo_info_.start_scn_ = start_scn_; redo_info_.data_format_version_ = data_format_version_; - redo_info_.type_ = ObDirectLoadType::DIRECT_LOAD_INCREMENTAL; + redo_info_.type_ = direct_load_type_; redo_info_.trans_id_ = trans_id_; if (OB_FAIL(ddl_inc_writer_.write_inc_redo_log_with_retry(redo_info_, macro_block_id_, task_id_, tx_desc_))) { LOG_WARN("write ddl inc redo log fail", K(ret)); diff --git a/src/storage/ddl/ob_ddl_inc_redo_log_writer.h b/src/storage/ddl/ob_ddl_inc_redo_log_writer.h index 058fecacd..0c3351ec8 100644 --- a/src/storage/ddl/ob_ddl_inc_redo_log_writer.h +++ b/src/storage/ddl/ob_ddl_inc_redo_log_writer.h @@ -124,6 +124,7 @@ public: const int64_t task_id, const share::SCN &start_scn, const uint64_t data_format_version, + const storage::ObDirectLoadType direct_load_type, transaction::ObTxDesc *tx_desc, const transaction::ObTransID &trans_id); void reset(); @@ -144,6 +145,7 @@ private: int64_t task_id_; share::SCN start_scn_; uint64_t data_format_version_; + storage::ObDirectLoadType direct_load_type_; transaction::ObTxDesc *tx_desc_; transaction::ObTransID trans_id_; }; diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.cpp b/src/storage/ddl/ob_ddl_redo_log_writer.cpp index 92d29490a..c8db71896 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_writer.cpp @@ -1500,7 +1500,8 @@ ObDDLRedoLogWriter::~ObDDLRedoLogWriter() ObDDLRedoLogWriterCallback::ObDDLRedoLogWriterCallback() : is_inited_(false), redo_info_(), block_type_(ObDDLMacroBlockType::DDL_MB_INVALID_TYPE), - table_key_(), macro_block_id_(), task_id_(0), data_format_version_(0), row_id_offset_(-1) + table_key_(), macro_block_id_(), task_id_(0), data_format_version_(0), + direct_load_type_(DIRECT_LOAD_INVALID), row_id_offset_(-1) { } @@ -1516,6 +1517,7 @@ int ObDDLRedoLogWriterCallback::init(const share::ObLSID &ls_id, const int64_t task_id, const share::SCN &start_scn, const uint64_t data_format_version, + const ObDirectLoadType direct_load_type, const int64_t row_id_offset/*=-1*/) { int ret = OB_SUCCESS; @@ -1527,9 +1529,11 @@ int ObDDLRedoLogWriterCallback::init(const share::ObLSID &ls_id, ret = OB_INIT_TWICE; LOG_WARN("ddl redo log writer has been inited twice", K(ret)); } else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || !table_key.is_valid() || - DDL_MB_INVALID_TYPE == block_type || 0 == task_id || data_format_version < 0)) { + DDL_MB_INVALID_TYPE == block_type || 0 == task_id || data_format_version < 0 || + !is_valid_direct_load(direct_load_type))) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(ls_id), K(tablet_id), K(table_key), K(block_type), K(data_format_version), K(task_id)); + LOG_WARN("invalid arguments", K(ret), K(ls_id), K(tablet_id), K(table_key), K(block_type), K(data_format_version), + K(direct_load_type), K(task_id)); } else if (OB_UNLIKELY(table_key.is_column_store_sstable() && row_id_offset < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument of column group data", K(ret), K(table_key), K(row_id_offset)); @@ -1541,6 +1545,7 @@ int ObDDLRedoLogWriterCallback::init(const share::ObLSID &ls_id, task_id_ = task_id; start_scn_ = start_scn; data_format_version_ = data_format_version; + direct_load_type_ = direct_load_type; row_id_offset_ = row_id_offset; is_inited_ = true; } @@ -1558,6 +1563,7 @@ void ObDDLRedoLogWriterCallback::reset() task_id_ = 0; start_scn_.reset(); data_format_version_ = 0; + direct_load_type_ = DIRECT_LOAD_INVALID; row_id_offset_ = -1; } @@ -1587,7 +1593,7 @@ int ObDDLRedoLogWriterCallback::write(const ObMacroBlockHandle ¯o_handle, redo_info_.logic_id_ = logic_id; redo_info_.start_scn_ = start_scn_; redo_info_.data_format_version_ = data_format_version_; - redo_info_.type_ = ObDirectLoadType::DIRECT_LOAD_LOAD_DATA; + redo_info_.type_ = direct_load_type_; if (is_column_group_info_valid()) { redo_info_.end_row_id_ = row_id_offset_ + row_count - 1; row_id_offset_ += row_count; diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.h b/src/storage/ddl/ob_ddl_redo_log_writer.h index d1fb3c451..c0ee6a687 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.h +++ b/src/storage/ddl/ob_ddl_redo_log_writer.h @@ -374,6 +374,7 @@ public: const int64_t task_id, const share::SCN &start_scn, const uint64_t data_format_version, + const storage::ObDirectLoadType direct_load_type, const int64_t row_id_offset = -1); void reset(); int write( @@ -397,6 +398,7 @@ private: int64_t task_id_; share::SCN start_scn_; uint64_t data_format_version_; + storage::ObDirectLoadType direct_load_type_; // if has one macro block with 100 rows before, this macro block's ddl_start_row_offset will be 100. // if current macro block finish with 50 rows, current macro block's end_row_offset will be 149. // end_row_offset = ddl_start_row_offset + curr_row_count - 1. diff --git a/src/storage/ddl/ob_direct_load_struct.cpp b/src/storage/ddl/ob_direct_load_struct.cpp index e11fc81a1..3c7b518fb 100644 --- a/src/storage/ddl/ob_direct_load_struct.cpp +++ b/src/storage/ddl/ob_direct_load_struct.cpp @@ -642,7 +642,7 @@ int ObMacroBlockSliceStore::init( ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to alloc memory", K(ret)); } else if (OB_FAIL(static_cast(ddl_redo_callback_)->init( - ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, start_scn, data_format_version, tx_desc, trans_id))) { + ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, start_scn, data_format_version, direct_load_type, tx_desc, trans_id))) { LOG_WARN("fail to init inc ddl_redo_callback_", K(ret)); } } else { @@ -650,7 +650,7 @@ int ObMacroBlockSliceStore::init( ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to alloc memory", K(ret)); } else if (OB_FAIL(static_cast(ddl_redo_callback_)->init( - ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, start_scn, data_format_version, -1/*row_id_offset*/))) { + ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, start_scn, data_format_version, direct_load_type, -1/*row_id_offset*/))) { LOG_WARN("fail to init full ddl_redo_callback_", K(ret)); } } @@ -1359,7 +1359,7 @@ int ObCOSliceWriter::init(const ObStorageSchema *storage_schema, const int64_t c LOG_WARN("init sstable index builder failed", K(ret), K(ls_id), K(table_key), K(data_desc_)); } else if (FALSE_IT(data_desc_.get_desc().sstable_index_builder_ = &index_builder_)) { // for build the tail index block in macro block } else if (OB_FAIL(flush_callback_.init(ls_id, table_key.tablet_id_, DDL_MB_DATA_TYPE, table_key, ddl_task_id, - start_scn, data_format_version, row_id_offset))) { + start_scn, data_format_version, tablet_direct_load_mgr->get_direct_load_type(), row_id_offset))) { LOG_WARN("fail to init redo log writer callback", KR(ret)); } else if (OB_FAIL(macro_block_writer_.open(data_desc_.get_desc(), start_seq, &flush_callback_))) { LOG_WARN("fail to open macro block writer", K(ret), K(ls_id), K(table_key), K(data_desc_), K(start_seq));