[FEAT MERGE]

Co-authored-by: leftgeek <1094669802@qq.com>
Co-authored-by: coolfishchen <coolfishchen@gmail.com>
Co-authored-by: hy-guo <fqboyg@gmail.com>
This commit is contained in:
chimyue
2024-04-19 06:46:36 +00:00
committed by ob-robot
parent 686b0aba5d
commit bf604885f8
143 changed files with 6932 additions and 1286 deletions

View File

@ -759,6 +759,7 @@ ob_set_subtarget(ob_storage direct_load
direct_load/ob_direct_load_sstable_scan_merge.cpp
direct_load/ob_direct_load_sstable_scan_merge_loser_tree.cpp
direct_load/ob_direct_load_sstable_scanner.cpp
direct_load/ob_direct_load_struct.cpp
direct_load/ob_direct_load_table_data_desc.cpp
direct_load/ob_direct_load_table_store.cpp
direct_load/ob_direct_load_tmp_file.cpp

View File

@ -281,7 +281,7 @@ int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg,
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
int64_t MAX_ABORT_WAIT_TIMEOUT = 60 * 1000 * 1000; //60s
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + MAX_ABORT_WAIT_TIMEOUT);
if (OB_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(arg.tenant_id_, arg.task_id_, session, common_rpc_proxy))) {
if (OB_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(arg.tenant_id_, arg.task_id_, DDL_DIRECT_LOAD, session, common_rpc_proxy))) {
if (OB_CANCELED == ret) {
ret = OB_SUCCESS;
LOG_INFO("ddl abort success", K_(arg.task_id));
@ -356,7 +356,7 @@ int ObDDLServerClient::finish_redef_table(const obrpc::ObFinishRedefTableArg &fi
if (OB_FAIL(ret)) {
} else if (OB_FAIL(build_ddl_single_replica_response(build_single_arg))) {
LOG_WARN("build ddl single replica response", K(ret), K(build_single_arg));
} else if (OB_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(finish_redef_arg.tenant_id_, finish_redef_arg.task_id_, &session, common_rpc_proxy))) {
} else if (OB_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(finish_redef_arg.tenant_id_, finish_redef_arg.task_id_, DDL_DIRECT_LOAD, &session, common_rpc_proxy))) {
LOG_WARN("failed to wait ddl finish", K(ret), K(finish_redef_arg.tenant_id_), K(finish_redef_arg.task_id_));
}
if (OB_TMP_FAIL(heart_beat_clear(finish_redef_arg.task_id_, tenant_id))) {

View File

@ -870,12 +870,18 @@ int ObTenantDirectLoadMgr::check_and_process_finished_tablet(
}
}
if (OB_SUCC(ret) && nullptr != row_iter) {
// Checked downcast: a reinterpret_cast never yields nullptr for the non-null
// row_iter guarded above, which would make the `ddl_row_iter != nullptr`
// fallback in the loop below unreachable and would invoke
// ObDDLInsertRowIterator::get_next_row(skip_lob, row) on iterators of any
// other dynamic type. dynamic_cast returns nullptr for those, so they take the
// plain row_iter->get_next_row(row) path instead.
ObDDLInsertRowIterator *ddl_row_iter = dynamic_cast<ObDDLInsertRowIterator*>(row_iter);
const ObDatumRow *row = nullptr;
const bool skip_lob = true;
while (OB_SUCC(ret)) {
if (OB_FAIL(THIS_WORKER.check_status())) {
LOG_WARN("check status failed", K(ret));
} else if (OB_FAIL(static_cast<ObDDLInsertRowIterator*>(row_iter)->get_next_row(skip_lob, row))) {
} else {
if (ddl_row_iter != nullptr) {
ret = ddl_row_iter->get_next_row(skip_lob, row);
} else {
ret = row_iter->get_next_row(row);
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;

View File

@ -59,6 +59,7 @@ ObDirectLoadMergeParam::ObDirectLoadMergeParam()
is_column_store_(false),
online_opt_stat_gather_(false),
px_mode_(false),
insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE),
insert_table_ctx_(nullptr),
dml_row_handler_(nullptr)
{
@ -73,6 +74,7 @@ bool ObDirectLoadMergeParam::is_valid() const
return OB_INVALID_ID != table_id_ && 0 < rowkey_column_num_ && 0 < store_column_count_ &&
snapshot_version_ > 0 && table_data_desc_.is_valid() && nullptr != datum_utils_ &&
nullptr != col_descs_ && nullptr != cmp_funcs_ &&
ObDirectLoadInsertMode::is_type_valid(insert_mode_) &&
nullptr != insert_table_ctx_ && nullptr != dml_row_handler_;
}
@ -210,6 +212,7 @@ int ObDirectLoadTabletMergeCtx::init(ObTableLoadTableCtx *ctx,
origin_table_param.table_id_ = param.table_id_;
origin_table_param.tablet_id_ = ls_partition_id.part_tablet_id_.tablet_id_;
origin_table_param.ls_id_ = ls_partition_id.ls_id_;
origin_table_param.insert_mode_ = param.insert_mode_;
if (OB_FAIL(origin_table_.init(origin_table_param))) {
LOG_WARN("fail to init origin sstable", KR(ret));
} else {

View File

@ -22,6 +22,7 @@
#include "share/table/ob_table_load_dml_stat.h"
#include "share/table/ob_table_load_sql_statistics.h"
#include "storage/direct_load/ob_direct_load_origin_table.h"
#include "storage/direct_load/ob_direct_load_struct.h"
#include "storage/direct_load/ob_direct_load_table_data_desc.h"
#include "storage/direct_load/ob_direct_load_fast_heap_table.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
@ -54,7 +55,9 @@ public:
TO_STRING_KV(K_(table_id), K_(target_table_id), K_(rowkey_column_num), K_(store_column_count),
K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), KP_(col_descs),
KP_(lob_column_cnt), KP_(cmp_funcs), K_(is_heap_table), K_(is_fast_heap_table),
K_(is_column_store), K_(online_opt_stat_gather), KP_(insert_table_ctx),
K_(is_column_store), K_(online_opt_stat_gather),
"insert_mode", ObDirectLoadInsertMode::get_type_string(insert_mode_),
KP_(insert_table_ctx),
KP_(dml_row_handler));
public:
uint64_t table_id_;
@ -73,6 +76,7 @@ public:
bool is_column_store_;
bool online_opt_stat_gather_;
bool px_mode_;
ObDirectLoadInsertMode::Type insert_mode_;
ObDirectLoadInsertTableContext *insert_table_ctx_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
};

View File

@ -31,7 +31,7 @@ using namespace share::schema;
*/
ObDirectLoadOriginTableCreateParam::ObDirectLoadOriginTableCreateParam()
: table_id_(OB_INVALID_ID)
: table_id_(OB_INVALID_ID), insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE)
{
}
@ -41,7 +41,10 @@ ObDirectLoadOriginTableCreateParam::~ObDirectLoadOriginTableCreateParam()
bool ObDirectLoadOriginTableCreateParam::is_valid() const
{
return OB_INVALID_ID != table_id_ && tablet_id_.is_valid() && ls_id_.is_valid();
return OB_INVALID_ID != table_id_ &&
tablet_id_.is_valid() &&
ls_id_.is_valid() &&
ObDirectLoadInsertMode::is_type_valid(insert_mode_);
}
/**
@ -49,7 +52,7 @@ bool ObDirectLoadOriginTableCreateParam::is_valid() const
*/
ObDirectLoadOriginTableMeta::ObDirectLoadOriginTableMeta()
: table_id_(OB_INVALID_ID)
: table_id_(OB_INVALID_ID), insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE)
{
}
@ -96,12 +99,13 @@ int ObDirectLoadOriginTable::init(const ObDirectLoadOriginTableCreateParam &para
LOG_WARN("unexpected ls is nullptr", KR(ret));
} else if (OB_FAIL(ls->get_tablet(tablet_id, tablet_handle_))) {
LOG_WARN("fail to get tablet", KR(ret), K(tablet_id));
} else if (OB_FAIL(prepare_tables())) {
} else if (ObDirectLoadInsertMode::need_origin_data(param.insert_mode_) && OB_FAIL(prepare_tables())) {
LOG_WARN("fail to prepare tables", KR(ret));
} else {
meta_.ls_id_ = param.ls_id_;
meta_.table_id_ = param.table_id_;
meta_.tablet_id_ = param.tablet_id_;
meta_.insert_mode_ = param.insert_mode_;
is_inited_ = true;
}
}
@ -217,7 +221,9 @@ int ObDirectLoadOriginTableScanner::init(ObDirectLoadOriginTable *origin_table,
LOG_WARN("Invalid argument", KR(ret), KPC(origin_table), K(query_range));
} else {
origin_table_ = origin_table;
if (OB_FAIL((init_table_access_param()))) {
if (!ObDirectLoadInsertMode::need_origin_data(origin_table_->get_meta().insert_mode_)) {
// do nothing
} else if (OB_FAIL((init_table_access_param()))) {
LOG_WARN("fail to init query range", KR(ret));
} else if (OB_FAIL(init_table_access_ctx())) {
LOG_WARN("fail to init table access param", KR(ret));
@ -228,7 +234,8 @@ int ObDirectLoadOriginTableScanner::init(ObDirectLoadOriginTable *origin_table,
LOG_WARN("fail to init multi merge", KR(ret));
} else if (OB_FAIL(scan_merge_.open(query_range))) {
LOG_WARN("fail to open multi merge", KR(ret), K(query_range));
} else {
}
if (OB_SUCC(ret)) {
is_inited_ = true;
}
}
@ -327,6 +334,8 @@ int ObDirectLoadOriginTableScanner::get_next_row(const ObDatumRow *&datum_row)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadOriginTableScanner not init", KR(ret), KP(this));
} else if (!ObDirectLoadInsertMode::need_origin_data(origin_table_->get_meta().insert_mode_)) {
ret = OB_ITER_END;
} else {
ObDatumRow *result_row = nullptr;
if (OB_FAIL(scan_merge_.get_next_row(result_row))) {

View File

@ -14,6 +14,7 @@
#include "share/schema/ob_table_dml_param.h"
#include "storage/access/ob_multiple_scan_merge.h"
#include "storage/access/ob_store_row_iterator.h"
#include "storage/direct_load/ob_direct_load_struct.h"
namespace oceanbase
{
@ -26,11 +27,15 @@ public:
ObDirectLoadOriginTableCreateParam();
~ObDirectLoadOriginTableCreateParam();
bool is_valid() const;
TO_STRING_KV(K_(table_id), K_(tablet_id), K_(ls_id));
TO_STRING_KV(K_(table_id),
K_(tablet_id),
K_(ls_id),
"insert_mode", ObDirectLoadInsertMode::get_type_string(insert_mode_));
public:
uint64_t table_id_;
common::ObTabletID tablet_id_;
share::ObLSID ls_id_;
ObDirectLoadInsertMode::Type insert_mode_;
};
struct ObDirectLoadOriginTableMeta
@ -38,11 +43,15 @@ struct ObDirectLoadOriginTableMeta
public:
ObDirectLoadOriginTableMeta();
~ObDirectLoadOriginTableMeta();
TO_STRING_KV( K_(table_id), K_(tablet_id), K_(ls_id));
TO_STRING_KV(K_(table_id),
K_(tablet_id),
K_(ls_id),
"insert_mode", ObDirectLoadInsertMode::get_type_string(insert_mode_));
public:
uint64_t table_id_;
common::ObTabletID tablet_id_;
share::ObLSID ls_id_;
ObDirectLoadInsertMode::Type insert_mode_;
};
class ObDirectLoadOriginTable

View File

@ -208,7 +208,22 @@ int ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters(
} else {
const ObITableReadInfo &read_info =
origin_table->get_tablet_handle().get_obj()->get_rowkey_read_info();
if (nullptr != origin_table->get_major_sstable()) {
if (!ObDirectLoadInsertMode::need_origin_data(origin_table->get_meta().insert_mode_)) {
ObDirectLoadDatumRowkeyEmptyIterator *rowkey_iter = nullptr;
if (OB_ISNULL(rowkey_iter = OB_NEWx(ObDirectLoadDatumRowkeyEmptyIterator, &allocator))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadDatumRowkeyEmptyIterator", KR(ret));
} else if (OB_FAIL(rowkey_iters.push_back(rowkey_iter))) {
LOG_WARN("fail to push back rowkey iter", KR(ret));
}
if (OB_FAIL(ret)) {
if (nullptr != rowkey_iter) {
rowkey_iter->~ObDirectLoadDatumRowkeyEmptyIterator();
allocator.free(rowkey_iter);
rowkey_iter = nullptr;
}
}
} else if (nullptr != origin_table->get_major_sstable()) {
ObSSTable *major_sstable = origin_table->get_major_sstable();
ObIDirectLoadDatumRowkeyIterator *rowkey_iter = nullptr;
if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_rowkey_iter(

View File

@ -34,9 +34,20 @@ public:
TO_STRING_EMPTY();
};
/**
 * Rowkey iterator that never yields anything: every call to get_next_rowkey
 * returns OB_ITER_END and leaves the output argument untouched.
 */
template <class Rowkey>
class ObDirectLoadRowkeyEmptyIterator : public ObIDirectLoadRowkeyIterator<Rowkey>
{
public:
  ObDirectLoadRowkeyEmptyIterator() = default;
  virtual ~ObDirectLoadRowkeyEmptyIterator() = default;

  int get_next_rowkey(const Rowkey *&rowkey) override
  {
    (void)rowkey; // intentionally not modified; there is no rowkey to hand out
    return OB_ITER_END;
  }
};
// Rowkey iterator interface instantiated for blocksstable::ObDatumRowkey.
typedef ObIDirectLoadRowkeyIterator<blocksstable::ObDatumRowkey> ObIDirectLoadDatumRowkeyIterator;
// Rowkey iterator interface instantiated for ObDirectLoadMultipleDatumRowkey.
typedef ObIDirectLoadRowkeyIterator<ObDirectLoadMultipleDatumRowkey>
ObIDirectLoadMultipleDatumRowkeyIterator;
// Empty iterator (ObDirectLoadRowkeyEmptyIterator) instantiated for blocksstable::ObDatumRowkey.
typedef ObDirectLoadRowkeyEmptyIterator<blocksstable::ObDatumRowkey>
ObDirectLoadDatumRowkeyEmptyIterator;
class ObDirectLoadDatumRowkeyArrayIterator : public ObIDirectLoadDatumRowkeyIterator
{

View File

@ -0,0 +1,55 @@
/**
* Copyright (c) 2024 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE
#include "storage/direct_load/ob_direct_load_struct.h"
namespace oceanbase
{
namespace storage
{
/**
* ObDirectLoadMode
*/
// Out-of-line definitions matching DECLARE_ENUM in the header
// (NOTE(review): presumably the enum<->string helpers such as get_type_string — confirm in ob_macro_utils.h).
DEFINE_ENUM_FUNC(ObDirectLoadMode::Type, type, OB_DIRECT_LOAD_MODE_DEF, ObDirectLoadMode::);

// A mode is valid when it lies strictly between the INVALID_MODE and MAX_MODE sentinels.
bool ObDirectLoadMode::is_type_valid(const Type type)
{
  const bool above_invalid = INVALID_MODE < type;
  const bool below_max = MAX_MODE > type;
  return above_invalid && below_max;
}
/**
* ObDirectLoadMethod
*/
// Out-of-line definitions matching DECLARE_ENUM in the header
// (NOTE(review): presumably the enum<->string helpers such as get_type_string — confirm in ob_macro_utils.h).
DEFINE_ENUM_FUNC(ObDirectLoadMethod::Type, type, OB_DIRECT_LOAD_METHOD_DEF, ObDirectLoadMethod::);

// A method is valid when it lies strictly between the INVALID_METHOD and MAX_METHOD sentinels.
bool ObDirectLoadMethod::is_type_valid(const Type type)
{
  const bool above_invalid = INVALID_METHOD < type;
  const bool below_max = MAX_METHOD > type;
  return above_invalid && below_max;
}
/**
* ObDirectLoadInsertMode
*/
// Out-of-line definitions matching DECLARE_ENUM in the header
// (NOTE(review): presumably the enum<->string helpers such as get_type_string — confirm in ob_macro_utils.h).
DEFINE_ENUM_FUNC(ObDirectLoadInsertMode::Type, type, OB_DIRECT_LOAD_INSERT_MODE_DEF, ObDirectLoadInsertMode::);

// An insert mode is valid when it lies strictly between the INVALID_INSERT_MODE
// and MAX_INSERT_MODE sentinels.
bool ObDirectLoadInsertMode::is_type_valid(const Type type)
{
  const bool above_invalid = INVALID_INSERT_MODE < type;
  const bool below_max = MAX_INSERT_MODE > type;
  return above_invalid && below_max;
}
} // namespace storage
} // namespace oceanbase

View File

@ -0,0 +1,71 @@
/**
* Copyright (c) 2024 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#pragma once
#include "lib/string/ob_string.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/utility/ob_unify_serialize.h"
namespace oceanbase
{
namespace storage
{
/**
 * ObDirectLoadMode
 *
 * Wraps the direct-load mode enum `Type`. The enumerators LOAD_DATA, PDML and
 * TABLE_LOAD are bracketed by the INVALID_MODE (= 0) and MAX_MODE sentinels.
 */
struct ObDirectLoadMode
{
// X-macro list of enumerators; also consumed by DEFINE_ENUM_FUNC in ob_direct_load_struct.cpp.
#define OB_DIRECT_LOAD_MODE_DEF(DEF) \
DEF(INVALID_MODE, = 0) \
DEF(LOAD_DATA, = 1) \
DEF(PDML, = 2) \
DEF(TABLE_LOAD, = 3) \
DEF(MAX_MODE, )
// Declares `Type` from the list above
// (NOTE(review): presumably also static string helpers such as get_type_string — confirm in ob_macro_utils.h).
DECLARE_ENUM(Type, type, OB_DIRECT_LOAD_MODE_DEF, static);
// Returns true when `type` lies strictly between INVALID_MODE and MAX_MODE.
static bool is_type_valid(const Type type);
};
/**
 * ObDirectLoadMethod
 *
 * Wraps the direct-load method enum `Type` (FULL, INCREMENTAL), bracketed by
 * the INVALID_METHOD (= 0) and MAX_METHOD sentinels, plus simple predicates.
 */
struct ObDirectLoadMethod
{
// X-macro list of enumerators; also consumed by DEFINE_ENUM_FUNC in ob_direct_load_struct.cpp.
#define OB_DIRECT_LOAD_METHOD_DEF(DEF) \
  DEF(INVALID_METHOD, = 0)             \
  DEF(FULL, = 1)                       \
  DEF(INCREMENTAL, = 2)                \
  DEF(MAX_METHOD, )

  DECLARE_ENUM(Type, type, OB_DIRECT_LOAD_METHOD_DEF, static);

  // True when `type` lies strictly between INVALID_METHOD and MAX_METHOD.
  static bool is_type_valid(const Type type);

  // True only for FULL.
  static bool is_full(const Type type)
  {
    return type == FULL;
  }

  // True only for INCREMENTAL.
  static bool is_incremental(const Type type)
  {
    return type == INCREMENTAL;
  }
};
/**
 * ObDirectLoadInsertMode
 *
 * Wraps the direct-load insert mode enum `Type` (NORMAL, INC_REPLACE,
 * OVERWRITE), bracketed by the INVALID_INSERT_MODE (= 0) and MAX_INSERT_MODE
 * sentinels, plus predicates describing where each mode applies.
 */
struct ObDirectLoadInsertMode
{
// X-macro list of enumerators; also consumed by DEFINE_ENUM_FUNC in ob_direct_load_struct.cpp.
#define OB_DIRECT_LOAD_INSERT_MODE_DEF(DEF) \
  DEF(INVALID_INSERT_MODE, = 0)             \
  DEF(NORMAL, = 1)                          \
  DEF(INC_REPLACE, = 2)                     \
  DEF(OVERWRITE, = 3)                       \
  DEF(MAX_INSERT_MODE, )

  DECLARE_ENUM(Type, type, OB_DIRECT_LOAD_INSERT_MODE_DEF, static);

  // True when `type` lies strictly between INVALID_INSERT_MODE and MAX_INSERT_MODE.
  static bool is_type_valid(const Type type);

  // The full method accepts NORMAL and OVERWRITE.
  static bool is_valid_for_full_method(const Type type)
  {
    return type == NORMAL || type == OVERWRITE;
  }

  // The incremental method accepts NORMAL and INC_REPLACE.
  static bool is_valid_for_incremental_method(const Type type)
  {
    return type == NORMAL || type == INC_REPLACE;
  }

  // Only NORMAL requires the origin table's existing data.
  static bool need_origin_data(const Type type)
  {
    return type == NORMAL;
  }
};
} // namespace storage
} // namespace oceanbase

View File

@ -250,7 +250,7 @@ int ObMViewRefresher::prepare_for_refresh()
if (OB_SUCC(ret) && ObMVRefreshType::FAST == refresh_type) {
const ObIArray<ObString> *operators = nullptr;
ObString fast_refresh_sql;
if (OB_FAIL(mv_provider.get_operators(operators))) {
if (OB_FAIL(mv_provider.get_fast_refresh_operators(operators))) {
LOG_WARN("fail to get operators", KR(ret));
} else if (OB_ISNULL(operators)) {
ret = OB_ERR_UNEXPECTED;
@ -435,7 +435,7 @@ int ObMViewRefresher::complete_refresh()
.timeout(timeout_ctx.get_timeout())
.mview_complete_refresh(arg, res))) {
LOG_WARN("fail to mview complete refresh", KR(ret), K(arg));
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_, session_info,
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(tenant_id, res.task_id_, DDL_MVIEW_COMPLETE_REFRESH, session_info,
GCTX.rs_rpc_proxy_))) {
LOG_WARN("fail to wait mview complete refresh finish", KR(ret));
} else {