[fix](export) the label of export should be unique with database scope (#27401)
### How to reproduce 1. create a database db1 and a table tbl1; 2. insert some data and export with label L1; 3. drop the db1 and tbl1, and recreate them with same name. 4. insert some data and export with same label L1; Expect: export success Actual: error: Label L1 have already been used. This PR fix it.
This commit is contained in:
@ -65,7 +65,8 @@ public class ExportMgr {
|
||||
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
|
||||
|
||||
private Map<Long, ExportJob> exportIdToJob = Maps.newHashMap(); // exportJobId to exportJob
|
||||
private Map<String, Long> labelToExportJobId = Maps.newHashMap();
|
||||
// dbid -> <label -> job>
|
||||
private Map<Long, Map<String, Long>> dbTolabelToExportJobId = Maps.newHashMap();
|
||||
|
||||
public ExportMgr() {
|
||||
}
|
||||
@ -95,7 +96,8 @@ public class ExportMgr {
|
||||
job.setId(jobId);
|
||||
writeLock();
|
||||
try {
|
||||
if (labelToExportJobId.containsKey(job.getLabel())) {
|
||||
if (dbTolabelToExportJobId.containsKey(job.getDbId())
|
||||
&& dbTolabelToExportJobId.get(job.getDbId()).containsKey(job.getLabel())) {
|
||||
throw new LabelAlreadyUsedException(job.getLabel());
|
||||
}
|
||||
unprotectAddJob(job);
|
||||
@ -135,7 +137,8 @@ public class ExportMgr {
|
||||
|
||||
public void unprotectAddJob(ExportJob job) {
|
||||
exportIdToJob.put(job.getId(), job);
|
||||
labelToExportJobId.putIfAbsent(job.getLabel(), job.getId());
|
||||
dbTolabelToExportJobId.computeIfAbsent(job.getDbId(),
|
||||
k -> Maps.newHashMap()).put(job.getLabel(), job.getId());
|
||||
}
|
||||
|
||||
private List<ExportJob> getWaitingCancelJobs(CancelExportStmt stmt) throws AnalysisException {
|
||||
@ -393,7 +396,13 @@ public class ExportMgr {
|
||||
&& (job.getState() == ExportJobState.CANCELLED
|
||||
|| job.getState() == ExportJobState.FINISHED)) {
|
||||
iter.remove();
|
||||
labelToExportJobId.remove(job.getLabel(), job.getId());
|
||||
Map<String, Long> labelJobs = dbTolabelToExportJobId.get(job.getDbId());
|
||||
if (labelJobs != null) {
|
||||
labelJobs.remove(job.getLabel());
|
||||
if (labelJobs.isEmpty()) {
|
||||
dbTolabelToExportJobId.remove(job.getDbId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.mysql;
|
||||
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ConnectScheduler;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import mockit.Delegate;
|
||||
import mockit.Expectations;
|
||||
@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
public class MysqlServerTest {
|
||||
@ -79,10 +79,7 @@ public class MysqlServerTest {
|
||||
|
||||
@Test
|
||||
public void testNormal() throws IOException, InterruptedException {
|
||||
ServerSocket socket = new ServerSocket(0);
|
||||
int port = socket.getLocalPort();
|
||||
socket.close();
|
||||
|
||||
int port = TestWithFeService.findValidPort();
|
||||
MysqlServer server = new MysqlServer(port, scheduler);
|
||||
Assert.assertTrue(server.start());
|
||||
|
||||
@ -108,9 +105,7 @@ public class MysqlServerTest {
|
||||
|
||||
@Test
|
||||
public void testBindFail() throws IOException {
|
||||
ServerSocket socket = new ServerSocket(0);
|
||||
int port = socket.getLocalPort();
|
||||
socket.close();
|
||||
int port = TestWithFeService.findValidPort();
|
||||
MysqlServer server = new MysqlServer(port, scheduler);
|
||||
Assert.assertTrue(server.start());
|
||||
MysqlServer server1 = new MysqlServer(port, scheduler);
|
||||
@ -121,9 +116,7 @@ public class MysqlServerTest {
|
||||
|
||||
@Test
|
||||
public void testSubFail() throws IOException, InterruptedException {
|
||||
ServerSocket socket = new ServerSocket(0);
|
||||
int port = socket.getLocalPort();
|
||||
socket.close();
|
||||
int port = TestWithFeService.findValidPort();
|
||||
MysqlServer server = new MysqlServer(port, badScheduler);
|
||||
Assert.assertTrue(server.start());
|
||||
|
||||
|
||||
@ -498,7 +498,7 @@ public abstract class TestWithFeService {
|
||||
}
|
||||
}
|
||||
|
||||
protected int findValidPort() {
|
||||
public static int findValidPort() {
|
||||
int port = 0;
|
||||
while (true) {
|
||||
try (ServerSocket socket = new ServerSocket(0)) {
|
||||
|
||||
@ -26,6 +26,7 @@ suite("test_export_basic", "p0") {
|
||||
sql """ set enable_nereids_planner=true """
|
||||
sql """ set enable_fallback_to_original_planner=false """
|
||||
|
||||
def db = "regression_test_export_p0"
|
||||
|
||||
// check whether the FE config 'enable_outfile_to_local' is true
|
||||
StringBuilder strBuilder = new StringBuilder()
|
||||
@ -120,9 +121,9 @@ suite("test_export_basic", "p0") {
|
||||
}
|
||||
}
|
||||
|
||||
def waiting_export = { export_label ->
|
||||
def waiting_export = { the_db, export_label ->
|
||||
while (true) {
|
||||
def res = sql """ show export where label = "${export_label}" """
|
||||
def res = sql """ show export from ${the_db} where label = "${export_label}" """
|
||||
logger.info("export state: " + res[0][2])
|
||||
if (res[0][2] == "FINISHED") {
|
||||
break;
|
||||
@ -151,7 +152,7 @@ suite("test_export_basic", "p0") {
|
||||
"column_separator"=","
|
||||
);
|
||||
"""
|
||||
waiting_export.call(label)
|
||||
waiting_export.call(db, label)
|
||||
|
||||
// check file amounts
|
||||
check_file_amounts.call("${outFilePath}", 1)
|
||||
@ -216,7 +217,7 @@ suite("test_export_basic", "p0") {
|
||||
"column_separator"=","
|
||||
);
|
||||
"""
|
||||
waiting_export.call(label)
|
||||
waiting_export.call(db, label)
|
||||
|
||||
// check file amounts
|
||||
check_file_amounts.call("${outFilePath}", 1)
|
||||
@ -281,7 +282,7 @@ suite("test_export_basic", "p0") {
|
||||
"column_separator"=","
|
||||
);
|
||||
"""
|
||||
waiting_export.call(label)
|
||||
waiting_export.call(db, label)
|
||||
|
||||
// check file amounts
|
||||
check_file_amounts.call("${outFilePath}", 1)
|
||||
@ -346,7 +347,7 @@ suite("test_export_basic", "p0") {
|
||||
"column_separator"=","
|
||||
);
|
||||
"""
|
||||
waiting_export.call(label)
|
||||
waiting_export.call(db, label)
|
||||
|
||||
// check file amounts
|
||||
check_file_amounts.call("${outFilePath}", 1)
|
||||
@ -422,8 +423,8 @@ suite("test_export_basic", "p0") {
|
||||
"column_separator"=","
|
||||
);
|
||||
"""
|
||||
waiting_export.call(label1)
|
||||
waiting_export.call(label2)
|
||||
waiting_export.call(db, label1)
|
||||
waiting_export.call(db, label2)
|
||||
|
||||
// check file amounts
|
||||
check_file_amounts.call("${outFilePath}", 2)
|
||||
@ -456,7 +457,7 @@ suite("test_export_basic", "p0") {
|
||||
"columns" = "id, name"
|
||||
);
|
||||
"""
|
||||
waiting_export.call(label)
|
||||
waiting_export.call(db, label)
|
||||
|
||||
// check file amounts
|
||||
check_file_amounts.call("${outFilePath}", 1)
|
||||
@ -521,7 +522,7 @@ suite("test_export_basic", "p0") {
|
||||
"columns" = "id"
|
||||
);
|
||||
"""
|
||||
waiting_export.call(label)
|
||||
waiting_export.call(db, label)
|
||||
|
||||
// check file amounts
|
||||
check_file_amounts.call("${outFilePath}", 1)
|
||||
@ -560,7 +561,75 @@ suite("test_export_basic", "p0") {
|
||||
}
|
||||
|
||||
qt_select_load7 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
|
||||
|
||||
|
||||
// test label
|
||||
def label_db = "export_p0_test_label"
|
||||
sql """ DROP DATABASE IF EXISTS ${label_db}"""
|
||||
sql """ CREATE DATABASE ${label_db}"""
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${label_db}.${table_load_name} (
|
||||
`id` int(11) NULL
|
||||
)
|
||||
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
|
||||
"""
|
||||
sql """insert into ${label_db}.${table_load_name} values(1)""";
|
||||
|
||||
// 1. first export
|
||||
uuid = UUID.randomUUID().toString()
|
||||
outFilePath = """${outfile_path_prefix}_${uuid}"""
|
||||
label = "label_${uuid}"
|
||||
// check export path
|
||||
check_path_exists.call("${outFilePath}")
|
||||
|
||||
// exec export
|
||||
sql """
|
||||
EXPORT TABLE ${label_db}.${table_load_name}
|
||||
TO "file://${outFilePath}/"
|
||||
PROPERTIES(
|
||||
"label" = "${label}",
|
||||
"format" = "csv",
|
||||
"column_separator"=","
|
||||
);
|
||||
"""
|
||||
waiting_export.call(label_db, label)
|
||||
|
||||
// 2. use same label again
|
||||
test {
|
||||
sql """
|
||||
EXPORT TABLE ${label_db}.${table_load_name}
|
||||
TO "file://${outFilePath}/"
|
||||
PROPERTIES(
|
||||
"label" = "${label}",
|
||||
"format" = "csv",
|
||||
"column_separator"=","
|
||||
);
|
||||
"""
|
||||
exception "has already been used"
|
||||
}
|
||||
|
||||
// 3. drop database and create again
|
||||
sql """ DROP DATABASE IF EXISTS ${label_db}"""
|
||||
sql """ CREATE DATABASE ${label_db}"""
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${label_db}.${table_load_name} (
|
||||
`id` int(11) NULL
|
||||
)
|
||||
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
|
||||
"""
|
||||
sql """insert into ${label_db}.${table_load_name} values(1)""";
|
||||
|
||||
// 4. exec export using same label
|
||||
sql """
|
||||
EXPORT TABLE ${label_db}.${table_load_name}
|
||||
TO "file://${outFilePath}/"
|
||||
PROPERTIES(
|
||||
"label" = "${label}",
|
||||
"format" = "csv",
|
||||
"column_separator"=","
|
||||
);
|
||||
"""
|
||||
waiting_export.call(label_db, label)
|
||||
|
||||
} finally {
|
||||
try_sql("DROP TABLE IF EXISTS ${table_load_name}")
|
||||
delete_files.call("${outFilePath}")
|
||||
|
||||
Reference in New Issue
Block a user