From 88750e7e5d7ef13b23d3367f51326a8388bd1a0c Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Thu, 5 Sep 2024 21:39:15 +0800 Subject: [PATCH] [cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix table not found fault when disable group commit (#39731)" (#40323) ## Proposed changes Pick #39731 --- .../apache/doris/httpv2/rest/LoadAction.java | 20 ++++++++- ...eam_load_with_nonexist_db_and_table.groovy | 41 +++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_with_nonexist_db_and_table.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index c3a77a13b3..d10a3020a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -53,6 +53,7 @@ import org.springframework.web.servlet.view.RedirectView; import java.net.URI; import java.util.List; +import java.util.Optional; import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -298,8 +299,20 @@ public class LoadAction extends RestBaseController { return new RestBaseResult(e.getMessage()); } } else { - long tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName) - .get()).getTable(tableName).get()).getId(); + long tableId = -1; + if (groupCommit) { + Optional database = Env.getCurrentEnv().getCurrentCatalog().getDb(dbName); + if (!database.isPresent()) { + return new RestBaseResult("Database not found."); + } + + Optional olapTable = ((Database) database.get()).getTable(tableName); + if (!olapTable.isPresent()) { + return new RestBaseResult("OlapTable not found."); + } + + tableId = ((OlapTable) olapTable.get()).getId(); + } redirectAddr = selectRedirectBackend(request, groupCommit, tableId); } @@ -356,6 +369,9 @@ public class LoadAction extends RestBaseController { Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId); return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); } + if (groupCommit && tableId == -1) { + throw new LoadException("Group commit table id wrong."); + } return selectLocalRedirectBackend(groupCommit, request, tableId); } diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_with_nonexist_db_and_table.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_with_nonexist_db_and_table.groovy new file mode 100644 index 0000000000..ba806967bf --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_with_nonexist_db_and_table.groovy @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_group_commit_stream_load_with_nonexist_db_and_table") { + def tableName = "test_group_commit_stream_load_with_nonexist_db_and_table" + sql "create database if not exists ${tableName}" + + try { + def command = "curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}" + + " -H group_commit:sync_mode" + + " -H column_separator:," + + " -T ${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv" + + " http://${context.config.feHttpAddress}/api/${tableName}/${tableName}/_stream_load" + log.info("stream load command: ${command}") + + def process = command.execute() + code = process.waitFor() + out = process.text + log.info("stream lad result: ${out}".toString()) + assertTrue(out.toString().contains("table not found")) + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + + } +}