[CP] add retry for direct load if unit has migrated

Co-authored-by: suz-yang <suz.yang@foxmail.com>
This commit is contained in:
coolfishchen
2023-12-12 17:12:43 +00:00
committed by ob-robot
parent d3a6e8e743
commit 8668b506b8
8 changed files with 54 additions and 17 deletions

View File

@ -24,7 +24,7 @@ namespace oceanbase
namespace sql
{
int ObLoadDataExecutor::check_is_direct_load(const ObLoadDataHint &load_hint, bool &check_ret)
int ObLoadDataExecutor::check_is_direct_load(ObTableDirectInsertCtx &ctx, const ObLoadDataHint &load_hint)
{
int ret = OB_SUCCESS;
int64_t enable_direct = 0;
@ -34,9 +34,9 @@ int ObLoadDataExecutor::check_is_direct_load(const ObLoadDataHint &load_hint, bo
} else if (OB_FAIL(load_hint.get_value(ObLoadDataHint::APPEND, append))) {
LOG_WARN("fail to get value of APPEND", K(ret));
} else if ((enable_direct != 0 || append != 0) && GCONF._ob_enable_direct_load) {
check_ret = true;
ctx.set_is_direct(true);
} else {
check_ret = false;
ctx.set_is_direct(false);
}
return ret;
}
@ -44,15 +44,15 @@ int ObLoadDataExecutor::check_is_direct_load(const ObLoadDataHint &load_hint, bo
int ObLoadDataExecutor::execute(ObExecContext &ctx, ObLoadDataStmt &stmt)
{
int ret = OB_SUCCESS;
ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx();
ObLoadDataBase *load_impl = NULL;
bool is_direct_load = false;
if (!stmt.get_load_arguments().is_csv_format_) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("invalid resolver results", K(ret));
} else if (OB_FAIL(check_is_direct_load(stmt.get_hints(), is_direct_load))) {
} else if (OB_FAIL(check_is_direct_load(table_direct_insert_ctx, stmt.get_hints()))) {
LOG_WARN("fail to check is load mode", KR(ret));
} else {
if (!is_direct_load) {
if (!table_direct_insert_ctx.get_is_direct()) {
if (OB_ISNULL(load_impl = OB_NEWx(ObLoadDataSPImpl, (&ctx.get_allocator())))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));