[Load] Fix bug of wrong file group aggregation when handling broker load job (#2824)
**Describe the bug**
**First**, in a broker load we allow users to add multiple data descriptions. Each data description
describes a file (or set of files), including the file path, the column delimiter, the target table
and partitions, and other information.
When the user specifies multiple data descriptions, Doris currently aggregates the data
descriptions that belong to the same table and generates a unified load task.
The problem is that although different data descriptions may point to the same table,
they may specify different partitions. Therefore, data descriptions should be aggregated
not only at the table level, but also at the partition level.
For example, data description 1 is:
```
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
INTO TABLE `tbl1`
PARTITION (p1, p2)
```
data description 2 is:
```
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
INTO TABLE `tbl1`
PARTITION (p3, p4)
```
The user expects file1 to be loaded into partitions p1 and p2 of tbl1, and file2 into partitions
p3 and p4 of the same table. But currently the two descriptions are aggregated together, which
results in loading both file1 and file2 into all of the partitions p1, p2, p3 and p4.
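Conceptually, the fix is to key the aggregation on the pair (table, partition set) rather than on the table alone. Below is a minimal sketch of such a key. The real class is `BrokerFileGroupAggInfo.FileGroupAggKey`, added by this commit; the normalization of a null partition list to an empty set is an assumption inferred from the tests further down, and the field names here are illustrative.
```
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Sketch only: a key identifying a (table, partitions) combination, so that
// file groups targeting different partitions of the same table are NOT
// merged into one load task.
class FileGroupAggKey {
    private final long tableId;
    private final Set<Long> partitionIds; // empty when no partition is specified

    FileGroupAggKey(long tableId, List<Long> partitionIds) {
        this.tableId = tableId;
        // assumption: a null partition list is treated the same as an empty one
        this.partitionIds = partitionIds == null
                ? Collections.emptySet() : new HashSet<>(partitionIds);
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof FileGroupAggKey)) {
            return false;
        }
        FileGroupAggKey other = (FileGroupAggKey) obj;
        return tableId == other.tableId && partitionIds.equals(other.partitionIds);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tableId, partitionIds);
    }
}
```
With such a key, the two descriptions above aggregate under (tbl1, {p1, p2}) and (tbl1, {p3, p4}) respectively, so each file is planned only into its own partitions.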
**Second**, the following 2 data descriptions are not allowed:
```
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
INTO TABLE `tbl1`
PARTITION (p1, p2)
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
INTO TABLE `tbl1`
PARTITION (p2, p3)
```
They have an overlapping partition (p2), which is not supported yet, so we should throw an
exception and cancel the load job.
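A minimal sketch of the kind of check this implies. The new `BrokerFileGroupAggInfoTest.test2` below expects `addFileGroup` to throw `DdlException` in this case; the helper class here is a hypothetical stand-in (a plain `Exception` replaces `DdlException`), not the actual Doris method.
```
import java.util.Collections;
import java.util.List;

// Sketch only: reject a new file group whose partition list partially
// overlaps an already-registered partition list of the same table.
// Identical partition lists are fine: they share one aggregation key
// and the file groups are merged (see test1 below).
class PartitionOverlapCheck {
    static void check(long tableId, List<Long> newPartitionIds,
                      List<List<Long>> existingPartitionLists) throws Exception {
        for (List<Long> existing : existingPartitionLists) {
            if (existing.equals(newPartitionIds)) {
                continue; // same key, groups will be aggregated together
            }
            if (!Collections.disjoint(existing, newPartitionIds)) {
                throw new Exception("partition ids overlap for table " + tableId);
            }
        }
    }
}
```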
**Third**, there is a problem with the code implementation. The constructor of
`OlapTableSink` takes a comma-separated string of partition names, but at the
`OlapTableSink` level we should be able to pass in a list of partition ids directly,
instead of names.
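The change is visible at the call sites, excerpted from the updated `OlapTableSinkTest` below:
```
// before: partition names as one comma-separated string
OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p1");

// after: partition ids passed directly as a list
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()));
```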
ISSUE: #2823
New test file `BrokerFileGroupAggInfoTest.java`:
```
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import org.apache.doris.common.DdlException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;

import com.google.common.collect.Lists;

import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.Map;

public class BrokerFileGroupAggInfoTest {

    @Test
    public void test1() throws DdlException {
        /*
         * data description:
         * table 1 -> partition[10] file1
         * table 1 -> partition[10] file2
         * table 2 -> partition[] file3
         * table 3 -> partition[11, 12] file4
         * table 4 -> partition[null] file5
         *
         * output:
         * table 1 -> partition[10] (file1, file2)
         * table 2 -> partition[] file3
         * table 3 -> partition[11, 12] file4
         * table 4 -> partition[] file5
         */
        BrokerFileGroupAggInfo brokerFileGroupAggInfo = new BrokerFileGroupAggInfo();

        BrokerFileGroup group1 = Deencapsulation.newInstance(BrokerFileGroup.class);
        Deencapsulation.setField(group1, "tableId", 1L);
        Deencapsulation.setField(group1, "partitionIds", Lists.newArrayList(10L));

        BrokerFileGroup group2 = Deencapsulation.newInstance(BrokerFileGroup.class);
        Deencapsulation.setField(group2, "tableId", 1L);
        Deencapsulation.setField(group2, "partitionIds", Lists.newArrayList(10L));

        BrokerFileGroup group3 = Deencapsulation.newInstance(BrokerFileGroup.class);
        Deencapsulation.setField(group3, "tableId", 2L);
        Deencapsulation.setField(group3, "partitionIds", Lists.newArrayList());

        BrokerFileGroup group4 = Deencapsulation.newInstance(BrokerFileGroup.class);
        Deencapsulation.setField(group4, "tableId", 3L);
        Deencapsulation.setField(group4, "partitionIds", Lists.newArrayList(11L, 12L));

        BrokerFileGroup group5 = Deencapsulation.newInstance(BrokerFileGroup.class);
        Deencapsulation.setField(group5, "tableId", 4L);
        Deencapsulation.setField(group5, "partitionIds", null);

        brokerFileGroupAggInfo.addFileGroup(group1);
        brokerFileGroupAggInfo.addFileGroup(group2);
        brokerFileGroupAggInfo.addFileGroup(group3);
        brokerFileGroupAggInfo.addFileGroup(group4);
        brokerFileGroupAggInfo.addFileGroup(group5);

        Map<FileGroupAggKey, List<BrokerFileGroup>> map = brokerFileGroupAggInfo.getAggKeyToFileGroups();
        Assert.assertEquals(4, map.keySet().size());
        FileGroupAggKey aggKey = new FileGroupAggKey(1L, Lists.newArrayList(10L));
        Assert.assertEquals(2, map.get(aggKey).size());
        aggKey = new FileGroupAggKey(2L, Lists.newArrayList());
        Assert.assertEquals(1, map.get(aggKey).size());
        aggKey = new FileGroupAggKey(3L, Lists.newArrayList(11L, 12L));
        Assert.assertEquals(1, map.get(aggKey).size());
        aggKey = new FileGroupAggKey(4L, Lists.newArrayList());
        Assert.assertEquals(1, map.get(aggKey).size());
    }

    @Test(expected = DdlException.class)
    public void test2() throws DdlException {
        /*
         * data description:
         * table 1 -> partition[10, 11] file1
         * table 1 -> partition[11, 12] file2
         * table 2 -> partition[] file3
         *
         * output:
         * throw exception
         */
        BrokerFileGroupAggInfo brokerFileGroupAggInfo = new BrokerFileGroupAggInfo();

        BrokerFileGroup group1 = Deencapsulation.newInstance(BrokerFileGroup.class);
        Deencapsulation.setField(group1, "tableId", 1L);
        Deencapsulation.setField(group1, "partitionIds", Lists.newArrayList(10L, 11L));

        BrokerFileGroup group2 = Deencapsulation.newInstance(BrokerFileGroup.class);
        Deencapsulation.setField(group2, "tableId", 1L);
        Deencapsulation.setField(group2, "partitionIds", Lists.newArrayList(11L, 12L));

        BrokerFileGroup group3 = Deencapsulation.newInstance(BrokerFileGroup.class);
        Deencapsulation.setField(group3, "tableId", 2L);
        Deencapsulation.setField(group3, "partitionIds", Lists.newArrayList());

        brokerFileGroupAggInfo.addFileGroup(group1);
        brokerFileGroupAggInfo.addFileGroup(group2);
    }

}
```
Changes to `BrokerLoadJobTest`:
```
@@ -29,10 +29,11 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.load.BrokerFileGroupAggInfo;
+import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.Load;
-import org.apache.doris.load.PullLoadSourceInfo;
 import org.apache.doris.load.Source;
 import org.apache.doris.task.MasterTaskExecutor;
 import org.apache.doris.transaction.TransactionState;
@@ -172,23 +173,27 @@ public class BrokerLoadJobTest {
     }
 
     @Test
-    public void testGetTableNames(@Injectable PullLoadSourceInfo dataSourceInfo,
+    public void testGetTableNames(@Injectable BrokerFileGroupAggInfo fileGroupAggInfo,
                                   @Injectable BrokerFileGroup brokerFileGroup,
                                   @Mocked Catalog catalog,
                                   @Injectable Database database,
                                   @Injectable Table table) throws MetaNotFoundException {
         List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
         brokerFileGroups.add(brokerFileGroup);
-        Map<Long, List<BrokerFileGroup>> idToFileGroups = Maps.newHashMap();
-        idToFileGroups.put(1L, brokerFileGroups);
+        Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
+        FileGroupAggKey aggKey = new FileGroupAggKey(1L, null);
+        aggKeyToFileGroups.put(aggKey, brokerFileGroups);
         BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
-        Deencapsulation.setField(brokerLoadJob, "dataSourceInfo", dataSourceInfo);
+        Deencapsulation.setField(brokerLoadJob, "fileGroupAggInfo", fileGroupAggInfo);
         String tableName = "table";
         new Expectations() {
             {
-                dataSourceInfo.getIdToFileGroups();
+                fileGroupAggInfo.getAggKeyToFileGroups();
                 minTimes = 0;
-                result = idToFileGroups;
+                result = aggKeyToFileGroups;
+                fileGroupAggInfo.getAllTableIds();
+                minTimes = 0;
+                result = Sets.newHashSet(1L);
                 catalog.getDb(anyLong);
                 minTimes = 0;
                 result = database;
@@ -249,7 +254,7 @@ public class BrokerLoadJobTest {
     public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment attachment,
                                           @Mocked Catalog catalog,
                                           @Injectable Database database,
-                                          @Injectable PullLoadSourceInfo dataSourceInfo,
+                                          @Injectable BrokerFileGroupAggInfo fileGroupAggInfo,
                                           @Injectable BrokerFileGroup brokerFileGroup1,
                                           @Injectable BrokerFileGroup brokerFileGroup2,
                                           @Injectable BrokerFileGroup brokerFileGroup3,
@@ -261,15 +266,22 @@
         long taskId = 1L;
         long tableId1 = 1L;
         long tableId2 = 2L;
-        Map<Long, List<BrokerFileGroup>> idToFileGroups = Maps.newHashMap();
+        long partitionId1 = 3L;
+        long partitionId2 = 4;
+
+        Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
         List<BrokerFileGroup> fileGroups1 = Lists.newArrayList();
         fileGroups1.add(brokerFileGroup1);
-        idToFileGroups.put(tableId1, fileGroups1);
+        aggKeyToFileGroups.put(new FileGroupAggKey(tableId1, null), fileGroups1);
+
         List<BrokerFileGroup> fileGroups2 = Lists.newArrayList();
         fileGroups2.add(brokerFileGroup2);
         fileGroups2.add(brokerFileGroup3);
-        idToFileGroups.put(tableId2, fileGroups2);
-        Deencapsulation.setField(brokerLoadJob, "dataSourceInfo", dataSourceInfo);
+        aggKeyToFileGroups.put(new FileGroupAggKey(tableId2, Lists.newArrayList(partitionId1)), fileGroups2);
+        // add another file group with a different partition id
+        aggKeyToFileGroups.put(new FileGroupAggKey(tableId2, Lists.newArrayList(partitionId2)), fileGroups2);
+
+        Deencapsulation.setField(brokerLoadJob, "fileGroupAggInfo", fileGroupAggInfo);
         new Expectations() {
             {
                 attachment.getTaskId();
@@ -278,9 +290,9 @@ public class BrokerLoadJobTest {
                 catalog.getDb(anyLong);
                 minTimes = 0;
                 result = database;
-                dataSourceInfo.getIdToFileGroups();
+                fileGroupAggInfo.getAggKeyToFileGroups();
                 minTimes = 0;
-                result = idToFileGroups;
+                result = aggKeyToFileGroups;
                 database.getTable(anyLong);
                 minTimes = 0;
                 result = olapTable;
@@ -288,6 +300,7 @@
                 minTimes = 0;
                 result = 1L;
                 result = 2L;
+                result = 3L;
             }
         };
 
@@ -296,7 +309,7 @@
         Assert.assertEquals(1, finishedTaskIds.size());
         Assert.assertEquals(true, finishedTaskIds.contains(taskId));
         Map<Long, LoadTask> idToTasks = Deencapsulation.getField(brokerLoadJob, "idToTasks");
-        Assert.assertEquals(2, idToTasks.size());
+        Assert.assertEquals(3, idToTasks.size());
     }
 
     @Test
```
Changes to `BrokerLoadPendingTaskTest`:
```
@@ -20,9 +20,10 @@ package org.apache.doris.load.loadv2;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
 import org.apache.doris.thrift.TBrokerFileStatus;
 
 import com.google.common.collect.Lists;
@@ -48,10 +49,11 @@ public class BrokerLoadPendingTaskTest {
                           @Injectable BrokerDesc brokerDesc,
                           @Mocked Catalog catalog,
                           @Injectable TBrokerFileStatus tBrokerFileStatus) throws UserException {
-        Map<Long, List<BrokerFileGroup>> tableToFileGroups = Maps.newHashMap();
+        Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
         List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
         brokerFileGroups.add(brokerFileGroup);
-        tableToFileGroups.put(1L, brokerFileGroups);
+        FileGroupAggKey aggKey = new FileGroupAggKey(1L, null);
+        aggKeyToFileGroups.put(aggKey, brokerFileGroups);
         new Expectations() {
             {
                 catalog.getNextId();
@@ -67,10 +69,10 @@ public class BrokerLoadPendingTaskTest {
             }
         };
 
-        BrokerLoadPendingTask brokerLoadPendingTask = new BrokerLoadPendingTask(brokerLoadJob, tableToFileGroups, brokerDesc);
+        BrokerLoadPendingTask brokerLoadPendingTask = new BrokerLoadPendingTask(brokerLoadJob, aggKeyToFileGroups, brokerDesc);
         brokerLoadPendingTask.executeTask();
         BrokerPendingTaskAttachment brokerPendingTaskAttachment = Deencapsulation.getField(brokerLoadPendingTask, "attachment");
-        Assert.assertEquals(1, brokerPendingTaskAttachment.getFileNumByTable(1L));
-        Assert.assertEquals(tBrokerFileStatus, brokerPendingTaskAttachment.getFileStatusByTable(1L).get(0).get(0));
+        Assert.assertEquals(1, brokerPendingTaskAttachment.getFileNumByTable(aggKey));
+        Assert.assertEquals(tBrokerFileStatus, brokerPendingTaskAttachment.getFileStatusByTable(aggKey).get(0).get(0));
     }
 }
```
Changes to `OlapTableSinkTest`:
```
@@ -99,7 +99,7 @@ public class OlapTableSinkTest {
             dstTable.getPartitions(); result = Lists.newArrayList(partition);
         }};
 
-        OlapTableSink sink = new OlapTableSink(dstTable, tuple, "");
+        OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList());
         sink.init(new TUniqueId(1, 2), 3, 4, 1000);
         sink.finalize();
         LOG.info("sink is {}", sink.toThrift());
@@ -126,16 +126,12 @@ public class OlapTableSinkTest {
             partInfo.getType(); result = PartitionType.RANGE;
             partInfo.getPartitionColumns(); result = Lists.newArrayList(partKey);
             partInfo.getRange(1); result = Range.lessThan(key);
-            // partInfo.getRange(2); result = Range.atLeast(key);
             dstTable.getPartitions(); result = Lists.newArrayList(p1, p2);
-            dstTable.getPartition("p1"); result = p1;
-
+            dstTable.getPartition(p1.getId()); result = p1;
             index.getTablets(); result = Lists.newArrayList(new Tablet(1));
-            // systemInfoService.getBackendIds(anyBoolean); result = Lists.newArrayList(new Long(1));
-            // systemInfoService.getBackend(new Long(1)); result = new Backend(1, "abc", 1234);
         }};
 
-        OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p1");
+        OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()));
         sink.init(new TUniqueId(1, 2), 3, 4, 1000);
         try {
             sink.finalize();
@@ -152,12 +148,13 @@ public class OlapTableSinkTest {
                        @Injectable MaterializedIndex index) throws UserException {
         TupleDescriptor tuple = getTuple();
 
+        long unknownPartId = 12345L;
         new Expectations() {{
             partInfo.getType(); result = PartitionType.RANGE;
-            dstTable.getPartition("p3"); result = null;
+            dstTable.getPartition(unknownPartId); result = null;
         }};
 
-        OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p3");
+        OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(unknownPartId));
         sink.init(new TUniqueId(1, 2), 3, 4, 1000);
         sink.finalize();
         LOG.info("sink is {}", sink.toThrift());
@@ -174,7 +171,7 @@ public class OlapTableSinkTest {
             partInfo.getType(); result = PartitionType.UNPARTITIONED;
         }};
 
-        OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p1");
+        OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(1L));
         sink.init(new TUniqueId(1, 2), 3, 4, 1000);
         sink.finalize();
         LOG.info("sink is {}", sink.toThrift());
```