branch-2.1: [enhance](mtmv)Only restrict MTMV to not allow concurrent insert overwrite execution #48673 (#49965)
Cherry-picked from #48673 Co-authored-by: zhangdong <zhangdong@selectdb.com>
This commit is contained in:
committed by
GitHub
parent
1b108604d5
commit
523681d58e
@ -19,6 +19,7 @@ package org.apache.doris.insertoverwrite;
|
||||
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.DdlException;
|
||||
@ -294,7 +295,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
|
||||
// If executed in parallel, it may cause problems such as not being able to find temporary partitions.
|
||||
// But in terms of external table, we don't care the internal logic of execution,
|
||||
// so there's no need to keep records
|
||||
if (!(table instanceof OlapTable)) {
|
||||
if (!(table instanceof MTMV)) {
|
||||
return;
|
||||
}
|
||||
long dbId = db.getId();
|
||||
|
||||
@ -98,9 +98,16 @@ public class InsertOverwriteUtil {
|
||||
* @return
|
||||
*/
|
||||
public static List<String> generateTempPartitionNames(List<String> partitionNames) {
|
||||
long threadId = Thread.currentThread().getId();
|
||||
// Adding thread ID as a prefix is to avoid mutual interference
|
||||
// when different threads perform insert overwrite on the same partition simultaneously.
|
||||
// Even if the insert overwrite execution fails/cancels,
|
||||
// the generated temporary partition will be deleted,
|
||||
// so there will be no problem generating temporary partitions with the same name in a single thread
|
||||
String prefix = "iot_temp_" + threadId + "_";
|
||||
List<String> tempPartitionNames = new ArrayList<String>(partitionNames.size());
|
||||
for (String partitionName : partitionNames) {
|
||||
String tempPartitionName = "iot_temp_" + partitionName;
|
||||
String tempPartitionName = prefix + partitionName;
|
||||
if (tempPartitionName.length() > 50) {
|
||||
tempPartitionName = tempPartitionName.substring(0, 30) + Math.abs(Objects.hash(tempPartitionName))
|
||||
+ "_" + System.currentTimeMillis();
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.insertoverwrite;
|
||||
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
@ -40,6 +41,9 @@ public class InsertOverwriteManagerTest {
|
||||
@Mocked
|
||||
private HMSExternalTable hmsExternalTable;
|
||||
|
||||
@Mocked
|
||||
private MTMV mtmv;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException {
|
||||
@ -69,18 +73,26 @@ public class InsertOverwriteManagerTest {
|
||||
hmsExternalTable.getName();
|
||||
minTimes = 0;
|
||||
result = "hmsTable";
|
||||
|
||||
mtmv.getId();
|
||||
minTimes = 0;
|
||||
result = 4L;
|
||||
|
||||
mtmv.getName();
|
||||
minTimes = 0;
|
||||
result = "mtmv1";
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParallel() {
|
||||
public void testMTMVParallel() {
|
||||
InsertOverwriteManager manager = new InsertOverwriteManager();
|
||||
manager.recordRunningTableOrException(db, table);
|
||||
manager.recordRunningTableOrException(db, mtmv);
|
||||
Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class,
|
||||
() -> manager.recordRunningTableOrException(db, table));
|
||||
manager.dropRunningRecord(db.getId(), table.getId());
|
||||
Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, table));
|
||||
() -> manager.recordRunningTableOrException(db, mtmv));
|
||||
manager.dropRunningRecord(db.getId(), mtmv.getId());
|
||||
Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, mtmv));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -90,4 +102,12 @@ public class InsertOverwriteManagerTest {
|
||||
Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, hmsExternalTable));
|
||||
manager.dropRunningRecord(db.getId(), hmsExternalTable.getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOlapTableParallel() {
|
||||
InsertOverwriteManager manager = new InsertOverwriteManager();
|
||||
manager.recordRunningTableOrException(db, table);
|
||||
Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, table));
|
||||
manager.dropRunningRecord(db.getId(), table.getId());
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,35 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.insertoverwrite;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class InsertOverwriteUtilTest {
|
||||
|
||||
@Test
|
||||
public void testGenerateTempPartitionNames() {
|
||||
String regex = "^iot_temp_[0-9]+_p1$";
|
||||
List<String> res = InsertOverwriteUtil.generateTempPartitionNames(Lists.newArrayList("p1"));
|
||||
String tempP1Name = res.get(0);
|
||||
Assertions.assertTrue(tempP1Name.matches(regex));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user