diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index 123bfa727f..1a7d9dc93b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -63,6 +63,7 @@ import java.util.stream.Collectors; public class ExportStmt extends StatementBase { public static final String PARALLELISM = "parallelism"; public static final String LABEL = "label"; + public static final String DATA_CONSISTENCY = "data_consistency"; private static final String DEFAULT_COLUMN_SEPARATOR = "\t"; private static final String DEFAULT_LINE_DELIMITER = "\n"; @@ -72,6 +73,7 @@ public class ExportStmt extends StatementBase { private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(LABEL) .add(PARALLELISM) + .add(DATA_CONSISTENCY) .add(LoadStmt.KEY_IN_PARAM_COLUMNS) .add(OutFileClause.PROP_MAX_FILE_SIZE) .add(OutFileClause.PROP_DELETE_EXISTING_FILES) @@ -104,6 +106,7 @@ public class ExportStmt extends StatementBase { private String maxFileSize; private String deleteExistingFiles; private String withBom; + private String dataConsistency; private SessionVariable sessionVariables; private String qualifiedUser; @@ -230,6 +233,7 @@ public class ExportStmt extends StatementBase { exportJob.setMaxFileSize(this.maxFileSize); exportJob.setDeleteExistingFiles(this.deleteExistingFiles); exportJob.setWithBom(this.withBom); + exportJob.setDataConsistency(this.dataConsistency); if (columns != null) { Splitter split = Splitter.on(',').trimResults().omitEmptyStrings(); @@ -359,6 +363,17 @@ public class ExportStmt extends StatementBase { // with bom this.withBom = properties.getOrDefault(OutFileClause.PROP_WITH_BOM, "false"); + + // data consistency + String dataConsistencyStr = properties.get(DATA_CONSISTENCY); + if (dataConsistencyStr != null) { + if (!dataConsistencyStr.equalsIgnoreCase(ExportJob.CONSISTENT_PARTITION)) { + throw new UserException("The value of data_consistency is invalid, only `partition` is allowed"); + } + this.dataConsistency = ExportJob.CONSISTENT_PARTITION; + } else { + this.dataConsistency = ExportJob.CONSISTENT_ALL; + } } private void checkColumns() throws DdlException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 9fb827f6d6..fe28151a54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -99,6 +99,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; @Data public class ExportJob implements Writable { @@ -108,6 +109,9 @@ public class ExportJob implements Writable { private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = Config.maximum_tablets_of_outfile_in_export; + public static final String CONSISTENT_ALL = "all"; + public static final String CONSISTENT_PARTITION = "partition"; + @SerializedName("id") private long id; @SerializedName("label") @@ -168,6 +172,8 @@ public class ExportJob implements Writable { private Integer tabletsNum; @SerializedName("withBom") private String withBom; + @SerializedName("dataConsistency") + private String dataConsistency; private TableRef tableRef; @@ -222,6 +228,7 @@ public class ExportJob implements Writable { this.lineDelimiter = "\n"; this.columns = ""; this.withBom = "false"; + this.dataConsistency = "all"; } public ExportJob(long jobId) { @@ -229,6 +236,10 @@ public class ExportJob implements Writable { this.id = jobId; } + public boolean isPartitionConsistency() { + return dataConsistency != null && dataConsistency.equals(CONSISTENT_PARTITION); + } + public void generateOutfileStatement() throws UserException { exportTable.readLock(); try { @@ -302,16 +313,12 @@ public class ExportJob implements Writable { } // get all tablets - List> tabletsListPerParallel = splitTablets(); + List>> tabletsListPerParallel = splitTablets(); // Each Outfile clause responsible for MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT tablets - for (List tabletsList : tabletsListPerParallel) { + for (List> tabletsList : tabletsListPerParallel) { List logicalPlanAdapters = Lists.newArrayList(); - for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) { - int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size() - ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size(); - List tabletIds = new ArrayList<>(tabletsList.subList(i, end)); - + for (List tabletIds : tabletsList) { // generate LogicalPlan LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, tabletIds, this.partitionNames, selectLists); @@ -471,15 +478,12 @@ public class ExportJob implements Writable { } private List> getTableRefListPerParallel() throws UserException { - List> tabletsListPerParallel = splitTablets(); + List>> tabletsListPerParallel = splitTablets(); List> tableRefListPerParallel = Lists.newArrayList(); - for (List tabletsList : tabletsListPerParallel) { + for (List> tabletsList : tabletsListPerParallel) { List tableRefList = Lists.newArrayList(); - for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) { - int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size() - ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size(); - List tablets = new ArrayList<>(tabletsList.subList(i, end)); + for (List tablets : tabletsList) { // Since export does not support the alias, here we pass the null value. // we can not use this.tableRef.getAlias(), // because the constructor of `Tableref` will convert this.tableRef.getAlias() @@ -494,11 +498,13 @@ public class ExportJob implements Writable { return tableRefListPerParallel; } - private List> splitTablets() throws UserException { + private List>> splitTablets() throws UserException { // get tablets Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb()); OlapTable table = db.getOlapTableOrAnalysisException(this.tableName.getTbl()); - List tabletIdList = Lists.newArrayList(); + + Integer tabletsAllNum = 0; + List> tabletIdList = Lists.newArrayList(); table.readLock(); try { final Collection partitions = new ArrayList(); @@ -516,26 +522,56 @@ public class ExportJob implements Writable { // get tablets for (Partition partition : partitions) { - partitionToVersion.put(partition.getName(), partition.getVisibleVersion()); + // Partition data consistency is not need to verify partition version. + if (!isPartitionConsistency()) { + partitionToVersion.put(partition.getName(), partition.getVisibleVersion()); + } for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { - tabletIdList.addAll(index.getTabletIdsInOrder()); + List tablets = index.getTabletIdsInOrder(); + tabletsAllNum += tablets.size(); + tabletIdList.add(tablets); } } } finally { table.readUnlock(); } + if (isPartitionConsistency()) { + // Assign tablets of a partition to per parallel. + int totalPartitions = tabletIdList.size(); + int numPerParallel = totalPartitions / this.parallelism; + int numPerQueryRemainder = totalPartitions - numPerParallel * this.parallelism; + int realParallelism = this.parallelism; + if (totalPartitions < this.parallelism) { + realParallelism = totalPartitions; + LOG.warn("Export Job [{}]: The number of partitions ({}) is smaller than parallelism ({}), " + + "set parallelism to partition num.", id, totalPartitions, this.parallelism); + } + int start = 0; + List>> tabletsListPerParallel = new ArrayList<>(); + for (int i = 0; i < realParallelism; ++i) { + int partitionNum = numPerParallel; + if (numPerQueryRemainder > 0) { + partitionNum += 1; + --numPerQueryRemainder; + } + List> tablets = new ArrayList<>(tabletIdList.subList(start, start + partitionNum)); + start += partitionNum; + tabletsListPerParallel.add(tablets); + } + return tabletsListPerParallel; + } + /** * Assign tablets to per parallel, for example: * If the number of all tablets if 10, and the real parallelism is 4, * then, the number of tablets of per parallel should be: 3 3 2 2. */ - Integer tabletsAllNum = tabletIdList.size(); tabletsNum = tabletsAllNum; Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism; Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerParallel * this.parallelism; - List> tabletsListPerParallel = Lists.newArrayList(); + List>> tabletsListPerParallel = Lists.newArrayList(); Integer realParallelism = this.parallelism; if (tabletsAllNum < this.parallelism) { realParallelism = tabletsAllNum; @@ -543,15 +579,22 @@ public class ExportJob implements Writable { + "set parallelism to tablets num.", id, tabletsAllNum, this.parallelism); } Integer start = 0; - for (int i = 0; i < realParallelism; ++i) { + List flatTabletIdList = tabletIdList.stream().flatMap(List::stream).collect(Collectors.toList()); + for (int j = 0; j < realParallelism; ++j) { Integer tabletsNum = tabletsNumPerParallel; if (tabletsNumPerQueryRemainder > 0) { tabletsNum = tabletsNum + 1; --tabletsNumPerQueryRemainder; } - ArrayList tablets = new ArrayList<>(tabletIdList.subList(start, start + tabletsNum)); - start += tabletsNum; + List tabletsList = new ArrayList<>(flatTabletIdList.subList(start, start + tabletsNum)); + List> tablets = new ArrayList<>(); + for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) { + int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size() + ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size(); + tablets.add(new ArrayList<>(tabletsList.subList(i, end))); + } + start += tabletsNum; tabletsListPerParallel.add(tablets); } return tabletsListPerParallel; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index efbfd33966..f72c0b44a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -355,6 +355,7 @@ public class ExportMgr { infoMap.put("tablet_num", job.getTabletsNum()); infoMap.put("max_file_size", job.getMaxFileSize()); infoMap.put("delete_existing_files", job.getDeleteExistingFiles()); + infoMap.put("data_consistency", job.getDataConsistency()); jobInfo.add(new Gson().toJson(infoMap)); // path jobInfo.add(job.getExportPath()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java index 48f9b2ca1a..f4ee84298b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java @@ -89,8 +89,8 @@ public class ExportTaskExecutor implements TransientTaskExecutor { if (isCanceled.get()) { throw new JobException("Export executor has been canceled, task id: {}", taskId); } - // check the version of tablets - if (exportJob.getExportTable().getType() == TableType.OLAP) { + // check the version of tablets, skip if the consistency is in partition level. + if (exportJob.getExportTable().getType() == TableType.OLAP && !exportJob.isPartitionConsistency()) { try { Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException( exportJob.getTableName().getDb()); @@ -136,7 +136,6 @@ public class ExportTaskExecutor implements TransientTaskExecutor { } try (AutoCloseConnectContext r = buildConnectContext()) { - StatementBase statementBase = selectStmtLists.get(idx); OriginStatement originStatement = new OriginStatement( StringUtils.isEmpty(statementBase.getOrigStmt().originStmt) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java index 713ff6b276..e548433649 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java @@ -73,6 +73,7 @@ import java.util.stream.Collectors; public class ExportCommand extends Command implements ForwardWithSync { public static final String PARALLELISM = "parallelism"; public static final String LABEL = "label"; + public static final String DATA_CONSISTENCY = "data_consistency"; private static final String DEFAULT_COLUMN_SEPARATOR = "\t"; private static final String DEFAULT_LINE_DELIMITER = "\n"; private static final String DEFAULT_PARALLELISM = "1"; @@ -81,6 +82,7 @@ public class ExportCommand extends Command implements ForwardWithSync { private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(LABEL) .add(PARALLELISM) + .add(DATA_CONSISTENCY) .add(LoadStmt.KEY_IN_PARAM_COLUMNS) .add(OutFileClause.PROP_MAX_FILE_SIZE) .add(OutFileClause.PROP_DELETE_EXISTING_FILES) @@ -310,6 +312,17 @@ public class ExportCommand extends Command implements ForwardWithSync { exportJob.setQualifiedUser(ctx.getQualifiedUser()); exportJob.setUserIdentity(ctx.getCurrentUserIdentity()); + // set data consistency + String dataConsistencyStr = fileProperties.get(DATA_CONSISTENCY); + if (dataConsistencyStr != null) { + if (!dataConsistencyStr.equalsIgnoreCase(ExportJob.CONSISTENT_PARTITION)) { + throw new AnalysisException("The value of data_consistency is invalid, only partition is allowed!"); + } + exportJob.setDataConsistency(ExportJob.CONSISTENT_PARTITION); + } else { + exportJob.setDataConsistency(ExportJob.CONSISTENT_ALL); + } + // Must copy session variable, because session variable may be changed during export job running. SessionVariable clonedSessionVariable = VariableMgr.cloneSessionVariable(Optional.ofNullable( ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable())); diff --git a/regression-test/data/export_p0/test_export_data_consistency.out b/regression-test/data/export_p0/test_export_data_consistency.out new file mode 100644 index 0000000000..97135d2bb9 --- /dev/null +++ b/regression-test/data/export_p0/test_export_data_consistency.out @@ -0,0 +1,305 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_export -- +1 ftw-1 19 +2 ftw-2 20 +3 ftw-3 21 +4 ftw-4 22 +5 ftw-5 23 +6 ftw-6 24 +7 ftw-7 25 +8 ftw-8 26 +9 ftw-9 27 +10 ftw-10 28 +11 ftw-11 29 +12 ftw-12 30 +13 ftw-13 31 +14 ftw-14 32 +15 ftw-15 33 +16 ftw-16 34 +17 ftw-17 35 +18 ftw-18 36 +19 ftw-19 37 +20 ftw-20 38 +21 ftw-21 39 +22 ftw-22 40 +23 ftw-23 41 +24 ftw-24 42 +25 ftw-25 43 +26 ftw-26 44 +27 ftw-27 45 +28 ftw-28 46 +29 ftw-29 47 +30 ftw-30 48 +31 ftw-31 49 +32 ftw-32 50 +33 ftw-33 51 +34 ftw-34 52 +35 ftw-35 53 +36 ftw-36 54 +37 ftw-37 55 +38 ftw-38 56 +39 ftw-39 57 +40 ftw-40 58 +41 ftw-41 59 +42 ftw-42 60 +43 ftw-43 61 +44 ftw-44 62 +45 ftw-45 63 +46 ftw-46 64 +47 ftw-47 65 +48 ftw-48 66 +49 ftw-49 67 +50 ftw-50 68 +51 ftw-51 69 +52 ftw-52 70 +53 ftw-53 71 +54 ftw-54 72 +55 ftw-55 73 +56 ftw-56 74 +57 ftw-57 75 +58 ftw-58 76 +59 ftw-59 77 +60 ftw-60 78 +61 ftw-61 79 +62 ftw-62 80 +63 ftw-63 81 +64 ftw-64 82 +65 ftw-65 83 +66 ftw-66 84 +67 ftw-67 85 +68 ftw-68 86 +69 ftw-69 87 +70 ftw-70 88 +71 ftw-71 89 +72 ftw-72 90 +73 ftw-73 91 +74 ftw-74 92 +75 ftw-75 93 +76 ftw-76 94 +77 ftw-77 95 +78 ftw-78 96 +79 ftw-79 97 +80 ftw-80 98 +81 ftw-81 99 +82 ftw-82 100 +83 ftw-83 101 +84 ftw-84 102 +85 ftw-85 103 +86 ftw-86 104 +87 ftw-87 105 +88 ftw-88 106 +89 ftw-89 107 +90 ftw-90 108 +91 ftw-91 109 +92 ftw-92 110 +93 ftw-93 111 +94 ftw-94 112 +95 ftw-95 113 +96 ftw-96 114 +97 ftw-97 115 +98 ftw-98 116 +99 ftw-99 117 +100 ftw-100 118 +101 ftw-101 119 +102 ftw-102 120 +103 ftw-103 121 +104 ftw-104 122 +105 ftw-105 123 +106 ftw-106 124 +107 ftw-107 125 +108 ftw-108 126 +109 ftw-109 127 +110 ftw-110 128 +111 ftw-111 129 +112 ftw-112 130 +113 ftw-113 131 +114 ftw-114 132 +115 ftw-115 133 +116 ftw-116 134 +117 ftw-117 135 +118 ftw-118 136 +119 ftw-119 137 +120 ftw-120 138 +121 ftw-121 139 +122 ftw-122 140 +123 ftw-123 141 +124 ftw-124 142 +125 ftw-125 143 +126 ftw-126 144 +127 ftw-127 145 +128 ftw-128 146 +129 ftw-129 147 +130 ftw-130 148 +131 ftw-131 149 +132 ftw-132 150 +133 ftw-133 151 +134 ftw-134 152 +135 ftw-135 153 +136 ftw-136 154 +137 ftw-137 155 +138 ftw-138 156 +139 ftw-139 157 +140 ftw-140 158 +141 ftw-141 159 +142 ftw-142 160 +143 ftw-143 161 +144 ftw-144 162 +145 ftw-145 163 +146 ftw-146 164 +147 ftw-147 165 +148 ftw-148 166 +149 ftw-149 167 +150 \N \N + +-- !select_load1 -- +1 ftw-1 19 +2 ftw-2 20 +3 ftw-3 21 +4 ftw-4 22 +5 ftw-5 23 +6 ftw-6 24 +7 ftw-7 25 +8 ftw-8 26 +9 ftw-9 27 +10 ftw-10 28 +11 ftw-11 29 +12 ftw-12 30 +13 ftw-13 31 +14 ftw-14 32 +15 ftw-15 33 +16 ftw-16 34 +17 ftw-17 35 +18 ftw-18 36 +19 ftw-19 37 +20 ftw-20 38 +21 ftw-21 39 +22 ftw-22 40 +23 ftw-23 41 +24 ftw-24 42 +25 ftw-25 43 +26 ftw-26 44 +27 ftw-27 45 +28 ftw-28 46 +29 ftw-29 47 +30 ftw-30 48 +31 ftw-31 49 +32 ftw-32 50 +33 ftw-33 51 +34 ftw-34 52 +35 ftw-35 53 +36 ftw-36 54 +37 ftw-37 55 +38 ftw-38 56 +39 ftw-39 57 +40 ftw-40 58 +41 ftw-41 59 +42 ftw-42 60 +43 ftw-43 61 +44 ftw-44 62 +45 ftw-45 63 +46 ftw-46 64 +47 ftw-47 65 +48 ftw-48 66 +49 ftw-49 67 +50 ftw-50 68 +51 ftw-51 69 +52 ftw-52 70 +53 ftw-53 71 +54 ftw-54 72 +55 ftw-55 73 +56 ftw-56 74 +57 ftw-57 75 +58 ftw-58 76 +59 ftw-59 77 +60 ftw-60 78 +61 ftw-61 79 +62 ftw-62 80 +63 ftw-63 81 +64 ftw-64 82 +65 ftw-65 83 +66 ftw-66 84 +67 ftw-67 85 +68 ftw-68 86 +69 ftw-69 87 +70 ftw-70 88 +71 ftw-71 89 +72 ftw-72 90 +73 ftw-73 91 +74 ftw-74 92 +75 ftw-75 93 +76 ftw-76 94 +77 ftw-77 95 +78 ftw-78 96 +79 ftw-79 97 +80 ftw-80 98 +81 ftw-81 99 +82 ftw-82 100 +83 ftw-83 101 +84 ftw-84 102 +85 ftw-85 103 +86 ftw-86 104 +87 ftw-87 105 +88 ftw-88 106 +89 ftw-89 107 +90 ftw-90 108 +91 ftw-91 109 +92 ftw-92 110 +93 ftw-93 111 +94 ftw-94 112 +95 ftw-95 113 +96 ftw-96 114 +97 ftw-97 115 +98 ftw-98 116 +99 ftw-99 117 +100 ftw-100 118 +101 ftw-101 119 +102 ftw-102 120 +103 ftw-103 121 +104 ftw-104 122 +105 ftw-105 123 +106 ftw-106 124 +107 ftw-107 125 +108 ftw-108 126 +109 ftw-109 127 +110 ftw-110 128 +111 ftw-111 129 +112 ftw-112 130 +113 ftw-113 131 +114 ftw-114 132 +115 ftw-115 133 +116 ftw-116 134 +117 ftw-117 135 +118 ftw-118 136 +119 ftw-119 137 +120 ftw-120 138 +121 ftw-121 139 +122 ftw-122 140 +123 ftw-123 141 +124 ftw-124 142 +125 ftw-125 143 +126 ftw-126 144 +127 ftw-127 145 +128 ftw-128 146 +129 ftw-129 147 +130 ftw-130 148 +131 ftw-131 149 +132 ftw-132 150 +133 ftw-133 151 +134 ftw-134 152 +135 ftw-135 153 +136 ftw-136 154 +137 ftw-137 155 +138 ftw-138 156 +139 ftw-139 157 +140 ftw-140 158 +141 ftw-141 159 +142 ftw-142 160 +143 ftw-143 161 +144 ftw-144 162 +145 ftw-145 163 +146 ftw-146 164 +147 ftw-147 165 +148 ftw-148 166 +149 ftw-149 167 +150 \N \N + diff --git a/regression-test/suites/export_p0/test_export_data_consistency.groovy b/regression-test/suites/export_p0/test_export_data_consistency.groovy new file mode 100644 index 0000000000..87d2eb4a7b --- /dev/null +++ b/regression-test/suites/export_p0/test_export_data_consistency.groovy @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.Paths + +suite("test_export_data_consistency", "p0") { + // open nereids + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + def db = "regression_test_export_p0" + + // check whether the FE config 'enable_outfile_to_local' is true + StringBuilder strBuilder = new StringBuilder() + strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) + strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe") + + String command = strBuilder.toString() + def process = command.toString().execute() + def code = process.waitFor() + def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); + def out = process.getText() + logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def response = parseJson(out.trim()) + assertEquals(response.code, 0) + assertEquals(response.msg, "success") + def configJson = response.data.rows + boolean enableOutfileToLocal = false + for (Object conf: configJson) { + assert conf instanceof Map + if (((Map) conf).get("Name").toLowerCase() == "enable_outfile_to_local") { + enableOutfileToLocal = ((Map) conf).get("Value").toLowerCase() == "true" + } + } + if (!enableOutfileToLocal) { + logger.warn("Please set enable_outfile_to_local to true to run test_outfile") + return + } + + def table_export_name = "test_export_data_consistency" + def table_load_name = "test_load_data_consistency" + def outfile_path_prefix = """/tmp/test_export_data_consistency""" + + // create table and insert + sql """ DROP TABLE IF EXISTS ${table_export_name} """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_export_name} ( + `id` int(11) NULL, + `name` string NULL, + `age` int(11) NULL + ) + PARTITION BY RANGE(id) + ( + PARTITION less_than_20 VALUES LESS THAN ("20"), + PARTITION between_20_70 VALUES [("20"),("70")), + PARTITION more_than_70 VALUES LESS THAN ("151") + ) + DISTRIBUTED BY HASH(id) BUCKETS 3 + PROPERTIES("replication_num" = "1"); + """ + StringBuilder sb = new StringBuilder() + int i = 1 + for (; i < 150; i ++) { + sb.append(""" + (${i}, 'ftw-${i}', ${i + 18}), + """) + } + sb.append(""" + (${i}, NULL, NULL) + """) + sql """ INSERT INTO ${table_export_name} VALUES + ${sb.toString()} + """ + def insert_res = sql "show last insert;" + logger.info("insert result: " + insert_res.toString()) + qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """ + + + def check_path_exists = { dir_path -> + File path = new File(dir_path) + if (!path.exists()) { + assert path.mkdirs() + } else { + throw new IllegalStateException("""${dir_path} already exists! """) + } + } + + def check_file_amounts = { dir_path, amount -> + File path = new File(dir_path) + File[] files = path.listFiles() + assert files.length == amount + } + + def delete_files = { dir_path -> + File path = new File(dir_path) + if (path.exists()) { + for (File f: path.listFiles()) { + f.delete(); + } + path.delete(); + } + } + + def waiting_export = { the_db, export_label -> + while (true) { + def res = sql """ show export from ${the_db} where label = "${export_label}" """ + logger.info("export state: " + res[0][2]) + if (res[0][2] == "FINISHED") { + break; + } else if (res[0][2] == "CANCELLED") { + throw new IllegalStateException("""export failed: ${res[0][10]}""") + } else { + sleep(5000) + } + } + } + + // 1. basic test + def uuid = UUID.randomUUID().toString() + def outFilePath = """${outfile_path_prefix}_${uuid}""" + def label = "label_${uuid}" + try { + // check export path + check_path_exists.call("${outFilePath}") + + // exec export + sql """ + EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/" + PROPERTIES( + "label" = "${label}", + "format" = "csv", + "column_separator" = ",", + "data_consistency" = "partition" + ); + """ + // do insert in parallel + sql """INSERT INTO ${table_export_name} VALUES + (10, 'test', 11), + (20, 'test', 21), + (40, 'test', 51), + (80, 'test', 51) + """ + + // wait export + waiting_export.call(db, label) + + // check file amounts + check_file_amounts.call("${outFilePath}", 3) + + // check data correctness + sql """ DROP TABLE IF EXISTS ${table_load_name} """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_load_name} ( + `id` int(11) NULL, + `name` string NULL, + `age` int(11) NULL + ) + DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + """ + + File[] files = new File("${outFilePath}").listFiles() + for (exportLoadFile in files) { + String file_path = exportLoadFile.getAbsolutePath() + streamLoad { + table "${table_load_name}" + + set 'column_separator', ',' + set 'columns', 'id, name, age' + set 'strict_mode', 'true' + + file "${file_path}" + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(0, json.NumberFilteredRows) + } + } + } + + qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY id; """ + } finally { + try_sql("DROP TABLE IF EXISTS ${table_load_name}") + delete_files.call("${outFilePath}") + } +}