Added:
* Add streaming load feature. You can execute 'help stream load;' to see more information. Changed: * Loading phase of a certain table can be parallelized, to reduce the load job execution time when multi load jobs to a single table. * Using RocksDB to save the header info of tablets in Backends, to reduce the IO operations and increate speeding of restarting. Fixed: * A lot of bugs fixed.
This commit is contained in:
329
fe/src/test/java/com/baidu/palo/alter/RollupJobTest.java
Normal file
329
fe/src/test/java/com/baidu/palo/alter/RollupJobTest.java
Normal file
@ -0,0 +1,329 @@
|
||||
package com.baidu.palo.alter;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import com.baidu.palo.alter.AlterJob.JobState;
|
||||
import com.baidu.palo.analysis.AccessTestUtil;
|
||||
import com.baidu.palo.analysis.AddRollupClause;
|
||||
import com.baidu.palo.analysis.AlterClause;
|
||||
import com.baidu.palo.analysis.Analyzer;
|
||||
import com.baidu.palo.catalog.Catalog;
|
||||
import com.baidu.palo.catalog.CatalogTestUtil;
|
||||
import com.baidu.palo.catalog.Database;
|
||||
import com.baidu.palo.catalog.FakeCatalog;
|
||||
import com.baidu.palo.catalog.FakeEditLog;
|
||||
import com.baidu.palo.catalog.MaterializedIndex;
|
||||
import com.baidu.palo.catalog.MaterializedIndex.IndexState;
|
||||
import com.baidu.palo.catalog.OlapTable;
|
||||
import com.baidu.palo.catalog.OlapTable.OlapTableState;
|
||||
import com.baidu.palo.catalog.Partition;
|
||||
import com.baidu.palo.catalog.Partition.PartitionState;
|
||||
import com.baidu.palo.catalog.Replica;
|
||||
import com.baidu.palo.catalog.Tablet;
|
||||
import com.baidu.palo.common.AnalysisException;
|
||||
import com.baidu.palo.common.FeMetaVersion;
|
||||
import com.baidu.palo.task.AgentTask;
|
||||
import com.baidu.palo.task.AgentTaskQueue;
|
||||
import com.baidu.palo.thrift.TTabletInfo;
|
||||
import com.baidu.palo.thrift.TTaskType;
|
||||
import com.baidu.palo.transaction.FakeTransactionIDGenerator;
|
||||
import com.baidu.palo.transaction.GlobalTransactionMgr;
|
||||
import com.baidu.palo.transaction.TabletCommitInfo;
|
||||
import com.baidu.palo.transaction.TransactionState;
|
||||
import com.baidu.palo.transaction.TransactionState.LoadJobSourceType;
|
||||
import com.baidu.palo.transaction.TransactionStatus;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import mockit.internal.startup.Startup;
|
||||
|
||||
public class RollupJobTest {
|
||||
|
||||
private static FakeEditLog fakeEditLog;
|
||||
private static FakeCatalog fakeCatalog;
|
||||
private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
|
||||
private static GlobalTransactionMgr masterTransMgr;
|
||||
private static GlobalTransactionMgr slaveTransMgr;
|
||||
private static Catalog masterCatalog;
|
||||
private static Catalog slaveCatalog;
|
||||
|
||||
private String transactionSource = "localfe";
|
||||
private static Analyzer analyzer;
|
||||
private static AddRollupClause clause;
|
||||
|
||||
static {
|
||||
Startup.initializeIfPossible();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException,
|
||||
InvocationTargetException, NoSuchMethodException, SecurityException, AnalysisException {
|
||||
fakeEditLog = new FakeEditLog();
|
||||
fakeCatalog = new FakeCatalog();
|
||||
fakeTransactionIDGenerator = new FakeTransactionIDGenerator();
|
||||
masterCatalog = CatalogTestUtil.createTestCatalog();
|
||||
slaveCatalog = CatalogTestUtil.createTestCatalog();
|
||||
masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
|
||||
slaveCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
|
||||
masterTransMgr = masterCatalog.getGlobalTransactionMgr();
|
||||
masterTransMgr.setEditLog(masterCatalog.getEditLog());
|
||||
|
||||
slaveTransMgr = slaveCatalog.getGlobalTransactionMgr();
|
||||
slaveTransMgr.setEditLog(slaveCatalog.getEditLog());
|
||||
analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
|
||||
clause = new AddRollupClause(CatalogTestUtil.testRollupIndex2, Lists.newArrayList("k1", "v"), null,
|
||||
CatalogTestUtil.testIndex1, null);
|
||||
clause.analyze(analyzer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddRollup() throws Exception {
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
RollupHandler rollupHandler = Catalog.getInstance().getRollupHandler();
|
||||
ArrayList<AlterClause> alterClauses = new ArrayList<>();
|
||||
alterClauses.add(clause);
|
||||
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
|
||||
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
|
||||
rollupHandler.process(alterClauses, db, olapTable, false);
|
||||
RollupJob rollupJob = (RollupJob) rollupHandler.getAlterJob(CatalogTestUtil.testTableId1);
|
||||
Assert.assertEquals(CatalogTestUtil.testIndexId1, rollupJob.getBaseIndexId());
|
||||
Assert.assertEquals(CatalogTestUtil.testRollupIndex2, rollupJob.getRollupIndexName());
|
||||
}
|
||||
|
||||
// start a rollup, then finished
|
||||
@Test
|
||||
public void testRollup1() throws Exception {
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
RollupHandler rollupHandler = Catalog.getInstance().getRollupHandler();
|
||||
|
||||
// add a rollup job
|
||||
ArrayList<AlterClause> alterClauses = new ArrayList<>();
|
||||
alterClauses.add(clause);
|
||||
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
|
||||
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
|
||||
Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
|
||||
rollupHandler.process(alterClauses, db, olapTable, false);
|
||||
RollupJob rollupJob = (RollupJob) rollupHandler.getAlterJob(CatalogTestUtil.testTableId1);
|
||||
Assert.assertEquals(CatalogTestUtil.testIndexId1, rollupJob.getBaseIndexId());
|
||||
Assert.assertEquals(CatalogTestUtil.testRollupIndex2, rollupJob.getRollupIndexName());
|
||||
MaterializedIndex rollupIndex = rollupJob.getRollupIndex(CatalogTestUtil.testPartitionId1);
|
||||
MaterializedIndex baseIndex = testPartition.getBaseIndex();
|
||||
assertEquals(IndexState.ROLLUP, rollupIndex.getState());
|
||||
assertEquals(IndexState.NORMAL, baseIndex.getState());
|
||||
assertEquals(OlapTableState.ROLLUP, olapTable.getState());
|
||||
assertEquals(PartitionState.ROLLUP, testPartition.getState());
|
||||
Tablet rollupTablet = rollupIndex.getTablets().get(0);
|
||||
List<Replica> replicas = rollupTablet.getReplicas();
|
||||
Replica rollupReplica1 = replicas.get(0);
|
||||
Replica rollupReplica2 = replicas.get(1);
|
||||
Replica rollupReplica3 = replicas.get(2);
|
||||
|
||||
assertEquals(-1, rollupReplica1.getVersion());
|
||||
assertEquals(-1, rollupReplica2.getVersion());
|
||||
assertEquals(-1, rollupReplica3.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica1.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica2.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica3.getLastFailedVersion());
|
||||
assertEquals(-1, rollupReplica1.getLastSuccessVersion());
|
||||
assertEquals(-1, rollupReplica2.getLastSuccessVersion());
|
||||
assertEquals(-1, rollupReplica3.getLastSuccessVersion());
|
||||
|
||||
// rollup handler run one cycle, agent task is generated and send tasks
|
||||
rollupHandler.runOneCycle();
|
||||
AgentTask task1 = AgentTaskQueue.getTask(rollupReplica1.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
|
||||
AgentTask task2 = AgentTaskQueue.getTask(rollupReplica2.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
|
||||
AgentTask task3 = AgentTaskQueue.getTask(rollupReplica3.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
|
||||
|
||||
// be report finishe rollup success
|
||||
TTabletInfo tTabletInfo = new TTabletInfo(rollupTablet.getId(), CatalogTestUtil.testSchemaHash1,
|
||||
CatalogTestUtil.testStartVersion, CatalogTestUtil.testStartVersionHash, 0, 0);
|
||||
rollupHandler.handleFinishedReplica(task1, tTabletInfo, -1);
|
||||
rollupHandler.handleFinishedReplica(task2, tTabletInfo, -1);
|
||||
rollupHandler.handleFinishedReplica(task3, tTabletInfo, -1);
|
||||
|
||||
// rollup hander run one cycle again, the rollup job is finishing
|
||||
rollupHandler.runOneCycle();
|
||||
Assert.assertEquals(JobState.FINISHING, rollupJob.getState());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica1.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica2.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica3.getVersion());
|
||||
assertEquals(-1, rollupReplica1.getLastFailedVersion());
|
||||
assertEquals(-1, rollupReplica2.getLastFailedVersion());
|
||||
assertEquals(-1, rollupReplica3.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica1.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica1.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica1.getLastSuccessVersion());
|
||||
}
|
||||
|
||||
// load some data and one replica has errors
|
||||
// start a rollup and then load data
|
||||
// load finished and rollup finished
|
||||
@Test
|
||||
public void testRollup2() throws Exception {
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
// load one transaction with backend 2 has errors
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1, transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
// commit a transaction, backend 2 has errors
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
// TabletCommitInfo tabletCommitInfo2 = new
|
||||
// TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
// CatalogTestUtil.testBackendId2);
|
||||
TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId3);
|
||||
List<TabletCommitInfo> transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
// transTablets.add(tabletCommitInfo2);
|
||||
transTablets.add(tabletCommitInfo3);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId, transTablets);
|
||||
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
|
||||
Set<Long> errorReplicaIds = Sets.newHashSet();
|
||||
errorReplicaIds.add(CatalogTestUtil.testReplicaId2);
|
||||
masterTransMgr.finishTransaction(transactionId, errorReplicaIds);
|
||||
transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus());
|
||||
|
||||
// start a rollup
|
||||
RollupHandler rollupHandler = Catalog.getInstance().getRollupHandler();
|
||||
// add a rollup job
|
||||
ArrayList<AlterClause> alterClauses = new ArrayList<>();
|
||||
alterClauses.add(clause);
|
||||
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
|
||||
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
|
||||
Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
|
||||
rollupHandler.process(alterClauses, db, olapTable, false);
|
||||
RollupJob rollupJob = (RollupJob) rollupHandler.getAlterJob(CatalogTestUtil.testTableId1);
|
||||
Assert.assertEquals(CatalogTestUtil.testIndexId1, rollupJob.getBaseIndexId());
|
||||
Assert.assertEquals(CatalogTestUtil.testRollupIndex2, rollupJob.getRollupIndexName());
|
||||
MaterializedIndex rollupIndex = rollupJob.getRollupIndex(CatalogTestUtil.testPartitionId1);
|
||||
MaterializedIndex baseIndex = testPartition.getBaseIndex();
|
||||
assertEquals(IndexState.ROLLUP, rollupIndex.getState());
|
||||
assertEquals(IndexState.NORMAL, baseIndex.getState());
|
||||
assertEquals(OlapTableState.ROLLUP, olapTable.getState());
|
||||
assertEquals(PartitionState.ROLLUP, testPartition.getState());
|
||||
Tablet rollupTablet = rollupIndex.getTablets().get(0);
|
||||
List<Replica> replicas = rollupTablet.getReplicas();
|
||||
Replica rollupReplica1 = replicas.get(0);
|
||||
Replica rollupReplica3 = replicas.get(1);
|
||||
assertEquals(2, rollupTablet.getReplicas().size());
|
||||
|
||||
assertEquals(-1, rollupReplica1.getVersion());
|
||||
assertEquals(-1, rollupReplica3.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, rollupReplica1.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, rollupReplica3.getLastFailedVersion());
|
||||
assertEquals(-1, rollupReplica1.getLastSuccessVersion());
|
||||
assertEquals(-1, rollupReplica3.getLastSuccessVersion());
|
||||
|
||||
// rollup handler run one cycle, agent task is generated and send tasks
|
||||
rollupHandler.runOneCycle();
|
||||
AgentTask task1 = AgentTaskQueue.getTask(rollupReplica1.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
|
||||
AgentTask task3 = AgentTaskQueue.getTask(rollupReplica3.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
|
||||
|
||||
// be report finishe rollup success
|
||||
TTabletInfo tTabletInfo = new TTabletInfo(rollupTablet.getId(), CatalogTestUtil.testSchemaHash1,
|
||||
CatalogTestUtil.testStartVersion + 1, CatalogTestUtil.testPartitionNextVersionHash, 0, 0);
|
||||
rollupHandler.handleFinishedReplica(task1, tTabletInfo, -1);
|
||||
rollupHandler.handleFinishedReplica(task3, tTabletInfo, -1);
|
||||
|
||||
// rollup hander run one cycle again, the rollup job is finishing
|
||||
rollupHandler.runOneCycle();
|
||||
Assert.assertEquals(JobState.FINISHING, rollupJob.getState());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, rollupReplica1.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, rollupReplica3.getVersion());
|
||||
assertEquals(-1, rollupReplica1.getLastFailedVersion());
|
||||
assertEquals(-1, rollupReplica3.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, rollupReplica1.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, rollupReplica3.getLastSuccessVersion());
|
||||
}
|
||||
|
||||
// start a rollup and then load data
|
||||
// but load to rolluping index failed, then rollup is cancelled
|
||||
@Test
|
||||
public void testRollup3() throws Exception {
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
RollupHandler rollupHandler = Catalog.getInstance().getRollupHandler();
|
||||
|
||||
// add a rollup job
|
||||
ArrayList<AlterClause> alterClauses = new ArrayList<>();
|
||||
alterClauses.add(clause);
|
||||
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
|
||||
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
|
||||
Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
|
||||
rollupHandler.process(alterClauses, db, olapTable, false);
|
||||
RollupJob rollupJob = (RollupJob) rollupHandler.getAlterJob(CatalogTestUtil.testTableId1);
|
||||
Assert.assertEquals(CatalogTestUtil.testIndexId1, rollupJob.getBaseIndexId());
|
||||
Assert.assertEquals(CatalogTestUtil.testRollupIndex2, rollupJob.getRollupIndexName());
|
||||
MaterializedIndex rollupIndex = rollupJob.getRollupIndex(CatalogTestUtil.testPartitionId1);
|
||||
MaterializedIndex baseIndex = testPartition.getBaseIndex();
|
||||
assertEquals(IndexState.ROLLUP, rollupIndex.getState());
|
||||
assertEquals(IndexState.NORMAL, baseIndex.getState());
|
||||
assertEquals(OlapTableState.ROLLUP, olapTable.getState());
|
||||
assertEquals(PartitionState.ROLLUP, testPartition.getState());
|
||||
Tablet rollupTablet = rollupIndex.getTablets().get(0);
|
||||
List<Replica> replicas = rollupTablet.getReplicas();
|
||||
Replica rollupReplica1 = replicas.get(0);
|
||||
Replica rollupReplica2 = replicas.get(1);
|
||||
Replica rollupReplica3 = replicas.get(2);
|
||||
|
||||
// rollup handler run one cycle, agent task is generated and send tasks
|
||||
rollupHandler.runOneCycle();
|
||||
AgentTask task1 = AgentTaskQueue.getTask(rollupReplica1.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
|
||||
AgentTask task2 = AgentTaskQueue.getTask(rollupReplica2.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
|
||||
AgentTask task3 = AgentTaskQueue.getTask(rollupReplica3.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
|
||||
|
||||
// load a transaction, but rollup tablet failed, then the rollup job should be
|
||||
// cancelled
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
// commit a transaction, backend 2 has errors
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
TabletCommitInfo tabletCommitInfo2 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId2);
|
||||
TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId3);
|
||||
List<TabletCommitInfo> transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
transTablets.add(tabletCommitInfo2);
|
||||
transTablets.add(tabletCommitInfo3);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId, transTablets);
|
||||
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
|
||||
Set<Long> errorReplicaIds = Sets.newHashSet();
|
||||
errorReplicaIds.add(CatalogTestUtil.testReplicaId2);
|
||||
masterTransMgr.finishTransaction(transactionId, errorReplicaIds);
|
||||
transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
|
||||
// rollup replca's last failed version should change to 13
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, rollupReplica1.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, rollupReplica2.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, rollupReplica3.getLastFailedVersion());
|
||||
|
||||
// be report finishe rollup success
|
||||
TTabletInfo tTabletInfo = new TTabletInfo(rollupTablet.getId(), CatalogTestUtil.testSchemaHash1,
|
||||
CatalogTestUtil.testStartVersion, CatalogTestUtil.testStartVersionHash, 0, 0);
|
||||
rollupHandler.handleFinishedReplica(task1, tTabletInfo, -1);
|
||||
rollupHandler.handleFinishedReplica(task2, tTabletInfo, -1);
|
||||
rollupHandler.handleFinishedReplica(task3, tTabletInfo, -1);
|
||||
|
||||
// rollup hander run one cycle again, the rollup job is finishing
|
||||
rollupHandler.runOneCycle();
|
||||
Assert.assertEquals(JobState.CANCELLED, rollupJob.getState());
|
||||
assertEquals(1, testPartition.getMaterializedIndices().size());
|
||||
}
|
||||
}
|
||||
256
fe/src/test/java/com/baidu/palo/alter/SchemaChangeJobTest.java
Normal file
256
fe/src/test/java/com/baidu/palo/alter/SchemaChangeJobTest.java
Normal file
@ -0,0 +1,256 @@
|
||||
package com.baidu.palo.alter;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.baidu.palo.alter.AlterJob.JobState;
|
||||
import com.baidu.palo.analysis.AccessTestUtil;
|
||||
import com.baidu.palo.analysis.AddColumnClause;
|
||||
import com.baidu.palo.analysis.AlterClause;
|
||||
import com.baidu.palo.analysis.Analyzer;
|
||||
import com.baidu.palo.analysis.ColumnPosition;
|
||||
import com.baidu.palo.catalog.AggregateType;
|
||||
import com.baidu.palo.catalog.Catalog;
|
||||
import com.baidu.palo.catalog.CatalogTestUtil;
|
||||
import com.baidu.palo.catalog.Column;
|
||||
import com.baidu.palo.catalog.ColumnType;
|
||||
import com.baidu.palo.catalog.Database;
|
||||
import com.baidu.palo.catalog.FakeCatalog;
|
||||
import com.baidu.palo.catalog.FakeEditLog;
|
||||
import com.baidu.palo.catalog.MaterializedIndex;
|
||||
import com.baidu.palo.catalog.OlapTable;
|
||||
import com.baidu.palo.catalog.Partition;
|
||||
import com.baidu.palo.catalog.PrimitiveType;
|
||||
import com.baidu.palo.catalog.Replica;
|
||||
import com.baidu.palo.catalog.Tablet;
|
||||
import com.baidu.palo.catalog.MaterializedIndex.IndexState;
|
||||
import com.baidu.palo.catalog.OlapTable.OlapTableState;
|
||||
import com.baidu.palo.catalog.Partition.PartitionState;
|
||||
import com.baidu.palo.catalog.Replica.ReplicaState;
|
||||
import com.baidu.palo.common.AnalysisException;
|
||||
import com.baidu.palo.common.FeMetaVersion;
|
||||
import com.baidu.palo.task.AgentTask;
|
||||
import com.baidu.palo.task.AgentTaskQueue;
|
||||
import com.baidu.palo.thrift.TTabletInfo;
|
||||
import com.baidu.palo.thrift.TTaskType;
|
||||
import com.baidu.palo.transaction.FakeTransactionIDGenerator;
|
||||
import com.baidu.palo.transaction.GlobalTransactionMgr;
|
||||
import com.baidu.palo.transaction.TabletCommitInfo;
|
||||
import com.baidu.palo.transaction.TransactionState;
|
||||
import com.baidu.palo.transaction.TransactionStatus;
|
||||
import com.baidu.palo.transaction.TransactionState.LoadJobSourceType;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
public class SchemaChangeJobTest {
|
||||
|
||||
private static FakeEditLog fakeEditLog;
|
||||
private static FakeCatalog fakeCatalog;
|
||||
private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
|
||||
private static GlobalTransactionMgr masterTransMgr;
|
||||
private static GlobalTransactionMgr slaveTransMgr;
|
||||
private static Catalog masterCatalog;
|
||||
private static Catalog slaveCatalog;
|
||||
|
||||
private String transactionSource = "localfe";
|
||||
private static Analyzer analyzer;
|
||||
private static Column newCol = new Column("add_v", new ColumnType(PrimitiveType.INT), false, AggregateType.MAX,
|
||||
false, "1", "");
|
||||
private static AddColumnClause addColumnClause = new AddColumnClause(newCol, new ColumnPosition("v"), null, null);
|
||||
|
||||
@Before
|
||||
public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException,
|
||||
InvocationTargetException, NoSuchMethodException, SecurityException, AnalysisException {
|
||||
fakeEditLog = new FakeEditLog();
|
||||
fakeCatalog = new FakeCatalog();
|
||||
fakeTransactionIDGenerator = new FakeTransactionIDGenerator();
|
||||
masterCatalog = CatalogTestUtil.createTestCatalog();
|
||||
slaveCatalog = CatalogTestUtil.createTestCatalog();
|
||||
masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
|
||||
slaveCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
|
||||
masterTransMgr = masterCatalog.getGlobalTransactionMgr();
|
||||
masterTransMgr.setEditLog(masterCatalog.getEditLog());
|
||||
slaveTransMgr = slaveCatalog.getGlobalTransactionMgr();
|
||||
slaveTransMgr.setEditLog(slaveCatalog.getEditLog());
|
||||
analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
|
||||
addColumnClause.analyze(analyzer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddSchemaChange() throws Exception {
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
SchemaChangeHandler schemaChangeHandler = Catalog.getInstance().getSchemaChangeHandler();
|
||||
ArrayList<AlterClause> alterClauses = new ArrayList<>();
|
||||
alterClauses.add(addColumnClause);
|
||||
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
|
||||
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
|
||||
schemaChangeHandler.process(alterClauses, "default", db, olapTable);
|
||||
SchemaChangeJob schemaChangeJob = (SchemaChangeJob) schemaChangeHandler
|
||||
.getAlterJob(CatalogTestUtil.testTableId1);
|
||||
Assert.assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState());
|
||||
}
|
||||
|
||||
// start a schema change, then finished
|
||||
@Test
|
||||
public void testSchemaChange1() throws Exception {
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
SchemaChangeHandler schemaChangeHandler = Catalog.getInstance().getSchemaChangeHandler();
|
||||
|
||||
// add a schema change job
|
||||
ArrayList<AlterClause> alterClauses = new ArrayList<>();
|
||||
alterClauses.add(addColumnClause);
|
||||
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
|
||||
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
|
||||
Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
|
||||
schemaChangeHandler.process(alterClauses, "default", db, olapTable);
|
||||
SchemaChangeJob schemaChangeJob = (SchemaChangeJob) schemaChangeHandler
|
||||
.getAlterJob(CatalogTestUtil.testTableId1);
|
||||
MaterializedIndex baseIndex = testPartition.getBaseIndex();
|
||||
assertEquals(IndexState.SCHEMA_CHANGE, baseIndex.getState());
|
||||
assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState());
|
||||
assertEquals(PartitionState.SCHEMA_CHANGE, testPartition.getState());
|
||||
Tablet baseTablet = baseIndex.getTablets().get(0);
|
||||
List<Replica> replicas = baseTablet.getReplicas();
|
||||
Replica replica1 = replicas.get(0);
|
||||
Replica replica2 = replicas.get(1);
|
||||
Replica replica3 = replicas.get(2);
|
||||
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion());
|
||||
assertEquals(-1, replica1.getLastFailedVersion());
|
||||
assertEquals(-1, replica2.getLastFailedVersion());
|
||||
assertEquals(-1, replica3.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica1.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica2.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica3.getLastSuccessVersion());
|
||||
|
||||
// schemachange handler run one cycle, agent task is generated and send tasks
|
||||
schemaChangeHandler.runOneCycle();
|
||||
AgentTask task1 = AgentTaskQueue.getTask(replica1.getBackendId(), TTaskType.SCHEMA_CHANGE, baseTablet.getId());
|
||||
AgentTask task2 = AgentTaskQueue.getTask(replica2.getBackendId(), TTaskType.SCHEMA_CHANGE, baseTablet.getId());
|
||||
AgentTask task3 = AgentTaskQueue.getTask(replica3.getBackendId(), TTaskType.SCHEMA_CHANGE, baseTablet.getId());
|
||||
|
||||
// be report finishe schema change success, report the new schema hash
|
||||
TTabletInfo tTabletInfo = new TTabletInfo(baseTablet.getId(),
|
||||
schemaChangeJob.getSchemaHashByIndexId(CatalogTestUtil.testIndexId1), CatalogTestUtil.testStartVersion,
|
||||
CatalogTestUtil.testStartVersionHash, 0, 0);
|
||||
schemaChangeHandler.handleFinishedReplica(task1, tTabletInfo, -1);
|
||||
schemaChangeHandler.handleFinishedReplica(task2, tTabletInfo, -1);
|
||||
schemaChangeHandler.handleFinishedReplica(task3, tTabletInfo, -1);
|
||||
|
||||
// schema change hander run one cycle again, the rollup job is finishing
|
||||
schemaChangeHandler.runOneCycle();
|
||||
Assert.assertEquals(JobState.FINISHING, schemaChangeJob.getState());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion());
|
||||
assertEquals(-1, replica1.getLastFailedVersion());
|
||||
assertEquals(-1, replica2.getLastFailedVersion());
|
||||
assertEquals(-1, replica3.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica1.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica2.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica3.getLastSuccessVersion());
|
||||
}
|
||||
|
||||
// load some data and one replica has errors
|
||||
// start a schema change and then load data
|
||||
// load finished and schema change finished
|
||||
@Test
|
||||
public void testSchemaChange2() throws Exception {
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
SchemaChangeHandler schemaChangeHandler = Catalog.getInstance().getSchemaChangeHandler();
|
||||
// load one transaction with backend 2 has errors
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
// commit a transaction, backend 2 has errors
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
// TabletCommitInfo tabletCommitInfo2 = new
|
||||
// TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
// CatalogTestUtil.testBackendId2);
|
||||
TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId3);
|
||||
List<TabletCommitInfo> transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
// transTablets.add(tabletCommitInfo2);
|
||||
transTablets.add(tabletCommitInfo3);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId, transTablets);
|
||||
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
|
||||
Set<Long> errorReplicaIds = Sets.newHashSet();
|
||||
errorReplicaIds.add(CatalogTestUtil.testReplicaId2);
|
||||
masterTransMgr.finishTransaction(transactionId, errorReplicaIds);
|
||||
transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus());
|
||||
|
||||
// start a schema change
|
||||
ArrayList<AlterClause> alterClauses = new ArrayList<>();
|
||||
alterClauses.add(addColumnClause);
|
||||
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
|
||||
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
|
||||
Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
|
||||
schemaChangeHandler.process(alterClauses, "default", db, olapTable);
|
||||
SchemaChangeJob schemaChangeJob = (SchemaChangeJob) schemaChangeHandler
|
||||
.getAlterJob(CatalogTestUtil.testTableId1);
|
||||
MaterializedIndex baseIndex = testPartition.getBaseIndex();
|
||||
assertEquals(IndexState.SCHEMA_CHANGE, baseIndex.getState());
|
||||
assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState());
|
||||
assertEquals(PartitionState.SCHEMA_CHANGE, testPartition.getState());
|
||||
Tablet baseTablet = baseIndex.getTablets().get(0);
|
||||
List<Replica> replicas = baseTablet.getReplicas();
|
||||
Replica replica1 = replicas.get(0);
|
||||
Replica replica2 = replicas.get(1);
|
||||
Replica replica3 = replicas.get(2);
|
||||
assertEquals(3, baseTablet.getReplicas().size());
|
||||
|
||||
assertEquals(ReplicaState.SCHEMA_CHANGE, replica1.getState());
|
||||
assertEquals(ReplicaState.NORMAL, replica2.getState());
|
||||
assertEquals(ReplicaState.SCHEMA_CHANGE, replica3.getState());
|
||||
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replica3.getVersion());
|
||||
assertEquals(-1, replica1.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replica2.getLastFailedVersion());
|
||||
assertEquals(-1, replica3.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica2.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replica3.getLastSuccessVersion());
|
||||
|
||||
// schemachange handler run one cycle, agent task is generated and send tasks
|
||||
schemaChangeHandler.runOneCycle();
|
||||
AgentTask task1 = AgentTaskQueue.getTask(replica1.getBackendId(), TTaskType.SCHEMA_CHANGE, baseTablet.getId());
|
||||
AgentTask task2 = AgentTaskQueue.getTask(replica2.getBackendId(), TTaskType.SCHEMA_CHANGE, baseTablet.getId());
|
||||
AgentTask task3 = AgentTaskQueue.getTask(replica3.getBackendId(), TTaskType.SCHEMA_CHANGE, baseTablet.getId());
|
||||
assertNull(task2);
|
||||
|
||||
// be report finish schema change success, report the new schema hash
|
||||
TTabletInfo tTabletInfo = new TTabletInfo(baseTablet.getId(),
|
||||
schemaChangeJob.getSchemaHashByIndexId(CatalogTestUtil.testIndexId1), CatalogTestUtil.testStartVersion,
|
||||
CatalogTestUtil.testStartVersionHash, 0, 0);
|
||||
schemaChangeHandler.handleFinishedReplica(task1, tTabletInfo, -1);
|
||||
schemaChangeHandler.handleFinishedReplica(task3, tTabletInfo, -1);
|
||||
|
||||
// rollup hander run one cycle again, the rollup job is finishing
|
||||
schemaChangeHandler.runOneCycle();
|
||||
Assert.assertEquals(JobState.FINISHING, schemaChangeJob.getState());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replica3.getVersion());
|
||||
assertEquals(-1, replica1.getLastFailedVersion());
|
||||
assertEquals(-1, replica3.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replica3.getLastSuccessVersion());
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,77 @@
|
||||
package com.baidu.palo.analysis;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.StringReader;
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
public class AdminShowReplicaTest {
|
||||
|
||||
@Test
|
||||
public void testShowReplicaStatus() {
|
||||
String stmt = new String("ADMIN SHOW REPLICA STATUS FROM db.tbl1 WHERE status = 'ok'");
|
||||
testAnalyzeWhere(stmt, true);
|
||||
|
||||
stmt = new String("ADMIN SHOW REPLICA STATUS FROM db.tbl1 WHERE status != 'ok'");
|
||||
testAnalyzeWhere(stmt, true);
|
||||
|
||||
stmt = new String("ADMIN SHOW REPLICA STATUS FROM db.tbl1 WHERE status = 'dead'");
|
||||
testAnalyzeWhere(stmt, true);
|
||||
|
||||
stmt = new String("ADMIN SHOW REPLICA STATUS FROM db.tbl1 WHERE status != 'VERSION_ERROR'");
|
||||
testAnalyzeWhere(stmt, true);
|
||||
|
||||
stmt = new String("ADMIN SHOW REPLICA STATUS FROM db.tbl1 WHERE status = 'MISSING'");
|
||||
testAnalyzeWhere(stmt, true);
|
||||
|
||||
stmt = new String("ADMIN SHOW REPLICA STATUS FROM db.tbl1 WHERE status = 'missing'");
|
||||
testAnalyzeWhere(stmt, true);
|
||||
|
||||
stmt = new String("ADMIN SHOW REPLICA STATUS FROM db.tbl1 WHERE status != 'what'");
|
||||
testAnalyzeWhere(stmt, false);
|
||||
|
||||
stmt = new String("ADMIN SHOW REPLICA STATUS FROM db.tbl1 WHERE status = 'how'");
|
||||
testAnalyzeWhere(stmt, false);
|
||||
}
|
||||
|
||||
private void testAnalyzeWhere(String stmt, boolean correct) {
|
||||
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(stmt)));
|
||||
AdminShowReplicaStatusStmt showStmt = null;
|
||||
try {
|
||||
showStmt = (AdminShowReplicaStatusStmt) parser.parse().value;
|
||||
} catch (Error e) {
|
||||
Assert.fail(e.getMessage());
|
||||
} catch (Exception e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
Method method = AdminShowReplicaStatusStmt.class.getDeclaredMethod("analyzeWhere");
|
||||
method.setAccessible(true);
|
||||
if (!(Boolean) method.invoke(showStmt)) {
|
||||
if (correct) {
|
||||
Assert.fail();
|
||||
}
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (tryAssert(correct, e)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (!correct) {
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean tryAssert(boolean correct, Exception e) {
|
||||
if (correct) {
|
||||
e.printStackTrace();
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
294
fe/src/test/java/com/baidu/palo/catalog/CatalogTestUtil.java
Normal file
294
fe/src/test/java/com/baidu/palo/catalog/CatalogTestUtil.java
Normal file
@ -0,0 +1,294 @@
|
||||
package com.baidu.palo.catalog;
|
||||
|
||||
import com.baidu.palo.analysis.PartitionKeyDesc;
|
||||
import com.baidu.palo.analysis.SingleRangePartitionDesc;
|
||||
import com.baidu.palo.catalog.MaterializedIndex.IndexState;
|
||||
import com.baidu.palo.catalog.Replica.ReplicaState;
|
||||
import com.baidu.palo.common.DdlException;
|
||||
import com.baidu.palo.persist.EditLog;
|
||||
import com.baidu.palo.system.Backend;
|
||||
import com.baidu.palo.system.SystemInfoService;
|
||||
import com.baidu.palo.thrift.TDisk;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class CatalogTestUtil {
|
||||
|
||||
public static String testDb1 = "testDb1";
|
||||
public static long testDbId1 = 1;
|
||||
public static String testTable1 = "testTable1";
|
||||
public static long testTableId1 = 2;
|
||||
public static String testPartition1 = "testPartition1";
|
||||
public static long testPartitionId1 = 3;
|
||||
public static String testIndex1 = "testIndex1";
|
||||
public static long testIndexId1 = 2; // the base indexid == tableid
|
||||
public static int testSchemaHash1 = 93423942;
|
||||
public static long testBackendId1 = 5;
|
||||
public static long testBackendId2 = 6;
|
||||
public static long testBackendId3 = 7;
|
||||
public static long testReplicaId1 = 8;
|
||||
public static long testReplicaId2 = 9;
|
||||
public static long testReplicaId3 = 10;
|
||||
public static long testTabletId1 = 11;
|
||||
public static long testStartVersion = 12;
|
||||
public static long testStartVersionHash = 12312;
|
||||
public static long testPartitionCurrentVersionHash = 12312;
|
||||
public static long testPartitionNextVersionHash = 123123123;
|
||||
public static long testRollupIndexId2 = 13;
|
||||
public static String testRollupIndex2 = "newRollupIndex";
|
||||
public static String testTxnLable1 = "testTxnLable1";
|
||||
public static String testTxnLable2 = "testTxnLable2";
|
||||
public static String testTxnLable3 = "testTxnLable3";
|
||||
public static String testTxnLable4 = "testTxnLable4";
|
||||
public static String testTxnLable5 = "testTxnLable5";
|
||||
public static String testTxnLable6 = "testTxnLable6";
|
||||
public static String testTxnLable7 = "testTxnLable7";
|
||||
public static String testTxnLable8 = "testTxnLable8";
|
||||
public static String testTxnLable9 = "testTxnLable9";
|
||||
public static String testTxnLable10 = "testTxnLable10";
|
||||
public static String testTxnLable11 = "testTxnLable11";
|
||||
public static String testTxnLable12 = "testTxnLable12";
|
||||
public static String testPartitionedEsTable1 = "partitionedEsTable1";
|
||||
public static long testPartitionedEsTableId1 = 14;
|
||||
public static String testUnPartitionedEsTable1 = "unpartitionedEsTable1";
|
||||
public static long testUnPartitionedEsTableId1 = 15;
|
||||
|
||||
public static Catalog createTestCatalog() throws InstantiationException, IllegalAccessException,
|
||||
IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException {
|
||||
Constructor<Catalog> constructor = Catalog.class.getDeclaredConstructor();
|
||||
constructor.setAccessible(true);
|
||||
Catalog catalog = constructor.newInstance();
|
||||
catalog.setEditLog(new EditLog("name"));
|
||||
FakeCatalog.setCatalog(catalog);
|
||||
Backend backend1 = createBackend(testBackendId1, "host1", 123, 124, 125);
|
||||
Backend backend2 = createBackend(testBackendId2, "host1", 123, 124, 125);
|
||||
Backend backend3 = createBackend(testBackendId3, "host1", 123, 124, 125);
|
||||
catalog.getCurrentSystemInfo().addBackend(backend1);
|
||||
catalog.getCurrentSystemInfo().addBackend(backend2);
|
||||
catalog.getCurrentSystemInfo().addBackend(backend3);
|
||||
catalog.initDefaultCluster();
|
||||
Database db = createSimpleDb(testDbId1, testTableId1, testPartitionId1, testIndexId1, testTabletId1,
|
||||
testStartVersion, testStartVersionHash);
|
||||
catalog.unprotectCreateDb(db);
|
||||
return catalog;
|
||||
}
|
||||
|
||||
public static boolean compareCatalog(Catalog masterCatalog, Catalog slaveCatalog) {
|
||||
Database masterDb = masterCatalog.getDb(testDb1);
|
||||
Database slaveDb = slaveCatalog.getDb(testDb1);
|
||||
List<Table> tables = masterDb.getTables();
|
||||
for (Table table : tables) {
|
||||
Table slaveTable = slaveDb.getTable(table.getId());
|
||||
if (slaveTable == null) {
|
||||
return false;
|
||||
}
|
||||
Partition masterPartition = table.getPartition(testPartition1);
|
||||
Partition slavePartition = slaveTable.getPartition(testPartition1);
|
||||
if (masterPartition == null && slavePartition == null) {
|
||||
return true;
|
||||
}
|
||||
if (masterPartition.getId() != slavePartition.getId()) {
|
||||
return false;
|
||||
}
|
||||
if (masterPartition.getCommittedVersion() != slavePartition.getCommittedVersion()
|
||||
|| masterPartition.getCommittedVersionHash() != slavePartition.getCommittedVersionHash()
|
||||
|| masterPartition.getNextVersion() != slavePartition.getNextVersion()
|
||||
|| masterPartition.getCurrentVersionHash() != slavePartition.getCurrentVersionHash()) {
|
||||
return false;
|
||||
}
|
||||
List<MaterializedIndex> allMaterializedIndices = masterPartition.getMaterializedIndices();
|
||||
for (MaterializedIndex masterIndex : allMaterializedIndices) {
|
||||
MaterializedIndex slaveIndex = slavePartition.getIndex(masterIndex.getId());
|
||||
if (slaveIndex == null) {
|
||||
return false;
|
||||
}
|
||||
List<Tablet> allTablets = masterIndex.getTablets();
|
||||
for (Tablet masterTablet : allTablets) {
|
||||
Tablet slaveTablet = slaveIndex.getTablet(masterTablet.getId());
|
||||
if (slaveTablet == null) {
|
||||
return false;
|
||||
}
|
||||
List<Replica> allReplicas = masterTablet.getReplicas();
|
||||
for (Replica masterReplica : allReplicas) {
|
||||
Replica slaveReplica = slaveTablet.getReplicaById(masterReplica.getId());
|
||||
if (slaveReplica.getBackendId() != masterReplica.getBackendId()
|
||||
|| slaveReplica.getVersion() != masterReplica.getVersion()
|
||||
|| slaveReplica.getVersionHash() != masterReplica.getVersionHash()
|
||||
|| slaveReplica.getLastFailedVersion() != masterReplica.getLastFailedVersion()
|
||||
|| slaveReplica.getLastFailedVersionHash() != masterReplica.getLastFailedVersionHash()
|
||||
|| slaveReplica.getLastSuccessVersion() != slaveReplica.getLastSuccessVersion()
|
||||
|| slaveReplica.getLastSuccessVersionHash() != slaveReplica
|
||||
.getLastSuccessVersionHash()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public static Database createSimpleDb(long dbId, long tableId, long partitionId, long indexId, long tabletId,
|
||||
long version, long versionHash) {
|
||||
Catalog.getCurrentInvertedIndex().clear();
|
||||
|
||||
// replica
|
||||
long replicaId = 0;
|
||||
Replica replica1 = new Replica(testReplicaId1, testBackendId1, version, versionHash, 0L, 0L,
|
||||
ReplicaState.NORMAL, -1, 0, 0, 0);
|
||||
Replica replica2 = new Replica(testReplicaId2, testBackendId2, version, versionHash, 0L, 0L,
|
||||
ReplicaState.NORMAL, -1, 0, 0, 0);
|
||||
Replica replica3 = new Replica(testReplicaId3, testBackendId3, version, versionHash, 0L, 0L,
|
||||
ReplicaState.NORMAL, -1, 0, 0, 0);
|
||||
|
||||
// tablet
|
||||
Tablet tablet = new Tablet(tabletId);
|
||||
|
||||
// index
|
||||
MaterializedIndex index = new MaterializedIndex(indexId, IndexState.NORMAL);
|
||||
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 0);
|
||||
index.addTablet(tablet, tabletMeta);
|
||||
|
||||
tablet.addReplica(replica1);
|
||||
tablet.addReplica(replica2);
|
||||
tablet.addReplica(replica3);
|
||||
|
||||
// partition
|
||||
RandomDistributionInfo distributionInfo = new RandomDistributionInfo(10);
|
||||
Partition partition = new Partition(partitionId, testPartition1, index, distributionInfo);
|
||||
partition.updateCommitVersionAndVersionHash(testStartVersion, testStartVersionHash);
|
||||
partition.setNextVersion(testStartVersion + 1);
|
||||
partition.setNextVersionHash(testPartitionNextVersionHash, testPartitionCurrentVersionHash);
|
||||
|
||||
// columns
|
||||
List<Column> columns = new ArrayList<Column>();
|
||||
Column temp = new Column("k1", PrimitiveType.INT);
|
||||
temp.setIsKey(true);
|
||||
columns.add(temp);
|
||||
temp = new Column("k2", PrimitiveType.INT);
|
||||
temp.setIsKey(true);
|
||||
columns.add(temp);
|
||||
columns.add(new Column("v", new ColumnType(PrimitiveType.DOUBLE), false, AggregateType.SUM, "0", ""));
|
||||
|
||||
List<Column> keysColumn = new ArrayList<Column>();
|
||||
temp = new Column("k1", PrimitiveType.INT);
|
||||
temp.setIsKey(true);
|
||||
keysColumn.add(temp);
|
||||
temp = new Column("k2", PrimitiveType.INT);
|
||||
temp.setIsKey(true);
|
||||
keysColumn.add(temp);
|
||||
|
||||
// table
|
||||
PartitionInfo partitionInfo = new SinglePartitionInfo();
|
||||
partitionInfo.setDataProperty(partitionId, DataProperty.DEFAULT_HDD_DATA_PROPERTY);
|
||||
partitionInfo.setReplicationNum(partitionId, (short) 3);
|
||||
OlapTable table = new OlapTable(tableId, testTable1, columns, KeysType.AGG_KEYS, partitionInfo,
|
||||
distributionInfo);
|
||||
table.addPartition(partition);
|
||||
table.setIndexSchemaInfo(indexId, testIndex1, columns, 0, testSchemaHash1, (short) 1);
|
||||
// db
|
||||
Database db = new Database(dbId, testDb1);
|
||||
db.createTable(table);
|
||||
db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
|
||||
|
||||
// add a es table to catalog
|
||||
try {
|
||||
createPartitionedEsTable(db);
|
||||
createUnPartitionedEsTable(db);
|
||||
} catch (DdlException e) {
|
||||
// TODO Auto-generated catch block
|
||||
// e.printStackTrace();
|
||||
}
|
||||
return db;
|
||||
}
|
||||
|
||||
public static void createPartitionedEsTable(Database db) throws DdlException {
|
||||
// columns
|
||||
List<Column> columns = new ArrayList<Column>();
|
||||
Column k1 = new Column("k1", PrimitiveType.DATE);
|
||||
k1.setIsKey(true);
|
||||
columns.add(k1);
|
||||
Column k2 = new Column("k2", PrimitiveType.INT);
|
||||
k2.setIsKey(true);
|
||||
columns.add(k2);
|
||||
columns.add(new Column("v", new ColumnType(PrimitiveType.DOUBLE), false, AggregateType.SUM, "0", ""));
|
||||
|
||||
// table
|
||||
List<Column> partitionColumns = Lists.newArrayList();
|
||||
List<SingleRangePartitionDesc> singleRangePartitionDescs = Lists.newArrayList();
|
||||
partitionColumns.add(k1);
|
||||
|
||||
singleRangePartitionDescs.add(new SingleRangePartitionDesc(false, "p1",
|
||||
new PartitionKeyDesc(Lists
|
||||
.newArrayList("100")),
|
||||
null));
|
||||
|
||||
RangePartitionInfo partitionInfo = new RangePartitionInfo(partitionColumns);
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(EsTable.HOSTS, "xxx");
|
||||
properties.put(EsTable.INDEX, "indexa");
|
||||
properties.put(EsTable.PASSWORD, "");
|
||||
properties.put(EsTable.USER, "root");
|
||||
EsTable esTable = new EsTable(testPartitionedEsTableId1, testPartitionedEsTable1,
|
||||
columns, properties, partitionInfo);
|
||||
db.createTable(esTable);
|
||||
}
|
||||
|
||||
public static void createUnPartitionedEsTable(Database db) throws DdlException {
|
||||
// columns
|
||||
List<Column> columns = new ArrayList<Column>();
|
||||
Column k1 = new Column("k1", PrimitiveType.DATE);
|
||||
k1.setIsKey(true);
|
||||
columns.add(k1);
|
||||
Column k2 = new Column("k2", PrimitiveType.INT);
|
||||
k2.setIsKey(true);
|
||||
columns.add(k2);
|
||||
columns.add(new Column("v", new ColumnType(PrimitiveType.DOUBLE), false, AggregateType.SUM, "0", ""));
|
||||
|
||||
// table
|
||||
List<Column> partitionColumns = Lists.newArrayList();
|
||||
List<SingleRangePartitionDesc> singleRangePartitionDescs = Lists.newArrayList();
|
||||
partitionColumns.add(k1);
|
||||
|
||||
singleRangePartitionDescs.add(new SingleRangePartitionDesc(false, "p1",
|
||||
new PartitionKeyDesc(Lists
|
||||
.newArrayList("100")),
|
||||
null));
|
||||
|
||||
RangePartitionInfo partitionInfo = new RangePartitionInfo(partitionColumns);
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(EsTable.HOSTS, "xxx");
|
||||
properties.put(EsTable.INDEX, "indexa");
|
||||
properties.put(EsTable.PASSWORD, "");
|
||||
properties.put(EsTable.USER, "root");
|
||||
EsTable esTable = new EsTable(testUnPartitionedEsTableId1, testUnPartitionedEsTable1,
|
||||
columns, properties, partitionInfo);
|
||||
db.createTable(esTable);
|
||||
}
|
||||
|
||||
public static Backend createBackend(long id, String host, int heartPort, int bePort, int httpPort) {
|
||||
Backend backend = new Backend(id, host, heartPort);
|
||||
// backend.updateOnce(bePort, httpPort, 10000);
|
||||
backend.setAlive(true);
|
||||
return backend;
|
||||
}
|
||||
|
||||
public static Backend createBackend(long id, String host, int heartPort, int bePort, int httpPort,
|
||||
long totalCapacityB, long availableCapacityB) {
|
||||
Backend backend = createBackend(id, host, heartPort, bePort, httpPort);
|
||||
Map<String, TDisk> backendDisks = new HashMap<String, TDisk>();
|
||||
String rootPath = "root_path";
|
||||
TDisk disk = new TDisk(rootPath, totalCapacityB, availableCapacityB, true);
|
||||
backendDisks.put(rootPath, disk);
|
||||
backend.updateDisks(backendDisks);
|
||||
backend.setAlive(true);
|
||||
return backend;
|
||||
}
|
||||
}
|
||||
31
fe/src/test/java/com/baidu/palo/catalog/FakeCatalog.java
Normal file
31
fe/src/test/java/com/baidu/palo/catalog/FakeCatalog.java
Normal file
@ -0,0 +1,31 @@
|
||||
package com.baidu.palo.catalog;
|
||||
|
||||
import com.baidu.palo.common.FeMetaVersion;
|
||||
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
|
||||
public class FakeCatalog extends MockUp<Catalog> {
|
||||
|
||||
private static Catalog catalog;
|
||||
|
||||
public static void setCatalog(Catalog catalog) {
|
||||
FakeCatalog.catalog = catalog;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public int getJournalVersion() {
|
||||
return FeMetaVersion.VERSION_45;
|
||||
}
|
||||
|
||||
@Mock
|
||||
private static Catalog getCurrentCatalog() {
|
||||
System.out.println("fake get current catalog is called");
|
||||
return catalog;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public static Catalog getInstance() {
|
||||
return catalog;
|
||||
}
|
||||
}
|
||||
74
fe/src/test/java/com/baidu/palo/catalog/FakeEditLog.java
Normal file
74
fe/src/test/java/com/baidu/palo/catalog/FakeEditLog.java
Normal file
@ -0,0 +1,74 @@
|
||||
package com.baidu.palo.catalog;
|
||||
|
||||
import com.baidu.palo.alter.RollupJob;
|
||||
import com.baidu.palo.alter.SchemaChangeJob;
|
||||
import com.baidu.palo.cluster.Cluster;
|
||||
import com.baidu.palo.persist.EditLog;
|
||||
import com.baidu.palo.transaction.TransactionState;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
|
||||
public class FakeEditLog extends MockUp<EditLog> {
|
||||
|
||||
private Map<Long, TransactionState> allTransactionState = new HashMap<>();
|
||||
|
||||
@Mock
|
||||
public void $init(String nodeName) {
|
||||
// do nothing
|
||||
System.out.println("abc");
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void logInsertTransactionState(TransactionState transactionState) {
|
||||
// do nothing
|
||||
System.out.println("insert transaction manager is called");
|
||||
allTransactionState.put(transactionState.getTransactionId(), transactionState);
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void logDeleteTransactionState(TransactionState transactionState) {
|
||||
// do nothing
|
||||
System.out.println("delete transaction state is deleted");
|
||||
allTransactionState.remove(transactionState.getTransactionId());
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void logSaveNextId(long nextId) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void logCreateCluster(Cluster cluster) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void logStartRollup(RollupJob rollupJob) {
|
||||
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void logFinishingRollup(RollupJob rollupJob) {
|
||||
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void logCancelRollup(RollupJob rollupJob) {
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void logStartSchemaChange(SchemaChangeJob schemaChangeJob) {
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void logFinishingSchemaChange(SchemaChangeJob schemaChangeJob) {
|
||||
}
|
||||
|
||||
public TransactionState getTransaction(long transactionId) {
|
||||
return allTransactionState.get(transactionId);
|
||||
}
|
||||
}
|
||||
107
fe/src/test/java/com/baidu/palo/catalog/MetadataViewerTest.java
Normal file
107
fe/src/test/java/com/baidu/palo/catalog/MetadataViewerTest.java
Normal file
@ -0,0 +1,107 @@
|
||||
package com.baidu.palo.catalog;
|
||||
|
||||
import com.baidu.palo.analysis.BinaryPredicate.Operator;
|
||||
import com.baidu.palo.backup.CatalogMocker;
|
||||
import com.baidu.palo.catalog.Replica.ReplicaStatus;
|
||||
import com.baidu.palo.common.AnalysisException;
|
||||
import com.baidu.palo.system.SystemInfoService;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
|
||||
import mockit.Mocked;
|
||||
import mockit.NonStrictExpectations;
|
||||
|
||||
public class MetadataViewerTest {
|
||||
|
||||
private static Method getTabletStatusMethod;
|
||||
private static Method getTabletDistributionMethod;
|
||||
|
||||
@Mocked
|
||||
private Catalog catalog;
|
||||
|
||||
@Mocked
|
||||
private SystemInfoService infoService;
|
||||
|
||||
private static Database db;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws NoSuchMethodException, SecurityException, InstantiationException,
|
||||
IllegalAccessException, IllegalArgumentException, InvocationTargetException, AnalysisException {
|
||||
Class[] argTypes = new Class[] { String.class, String.class, List.class, ReplicaStatus.class, Operator.class };
|
||||
getTabletStatusMethod = MetadataViewer.class.getDeclaredMethod("getTabletStatus", argTypes);
|
||||
getTabletStatusMethod.setAccessible(true);
|
||||
|
||||
argTypes = new Class[] { String.class, String.class, List.class };
|
||||
getTabletDistributionMethod = MetadataViewer.class.getDeclaredMethod("getTabletDistribution", argTypes);
|
||||
getTabletDistributionMethod.setAccessible(true);
|
||||
|
||||
db = CatalogMocker.mockDb();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
|
||||
new NonStrictExpectations() {
|
||||
{
|
||||
Catalog.getCurrentCatalog();
|
||||
minTimes = 0;
|
||||
result = catalog;
|
||||
|
||||
catalog.getDb(anyString);
|
||||
result = db;
|
||||
}
|
||||
};
|
||||
|
||||
new NonStrictExpectations() {
|
||||
{
|
||||
Catalog.getCurrentSystemInfo();
|
||||
minTimes = 0;
|
||||
result = infoService;
|
||||
|
||||
infoService.getBackendIds(anyBoolean);
|
||||
result = Lists.newArrayList(10000L, 10001L, 10002L);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTabletStatus()
|
||||
throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
|
||||
List<String> partitions = Lists.newArrayList();
|
||||
Object[] args = new Object[] { CatalogMocker.TEST_DB_NAME, CatalogMocker.TEST_TBL_NAME, partitions, null,
|
||||
null };
|
||||
List<List<String>> result = (List<List<String>>) getTabletStatusMethod.invoke(null, args);
|
||||
Assert.assertEquals(3, result.size());
|
||||
System.out.println(result);
|
||||
|
||||
args = new Object[] { CatalogMocker.TEST_DB_NAME, CatalogMocker.TEST_TBL_NAME, partitions, ReplicaStatus.DEAD,
|
||||
Operator.EQ };
|
||||
result = (List<List<String>>) getTabletStatusMethod.invoke(null, args);
|
||||
Assert.assertEquals(3, result.size());
|
||||
|
||||
args = new Object[] { CatalogMocker.TEST_DB_NAME, CatalogMocker.TEST_TBL_NAME, partitions, ReplicaStatus.DEAD,
|
||||
Operator.NE };
|
||||
result = (List<List<String>>) getTabletStatusMethod.invoke(null, args);
|
||||
Assert.assertEquals(0, result.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTabletDistribution()
|
||||
throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
|
||||
List<String> partitions = Lists.newArrayList();
|
||||
Object[] args = new Object[] { CatalogMocker.TEST_DB_NAME, CatalogMocker.TEST_TBL_NAME, partitions };
|
||||
List<List<String>> result = (List<List<String>>) getTabletDistributionMethod.invoke(null, args);
|
||||
Assert.assertEquals(3, result.size());
|
||||
System.out.println(result);
|
||||
}
|
||||
|
||||
}
|
||||
206
fe/src/test/java/com/baidu/palo/es/EsStateStoreTest.java
Normal file
206
fe/src/test/java/com/baidu/palo/es/EsStateStoreTest.java
Normal file
@ -0,0 +1,206 @@
|
||||
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
|
||||
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.baidu.palo.es;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.baidu.palo.catalog.Catalog;
|
||||
import com.baidu.palo.catalog.CatalogTestUtil;
|
||||
import com.baidu.palo.catalog.EsTable;
|
||||
import com.baidu.palo.catalog.FakeCatalog;
|
||||
import com.baidu.palo.catalog.FakeEditLog;
|
||||
import com.baidu.palo.catalog.PartitionKey;
|
||||
import com.baidu.palo.catalog.RangePartitionInfo;
|
||||
import com.baidu.palo.common.AnalysisException;
|
||||
import com.baidu.palo.common.FeMetaVersion;
|
||||
import com.baidu.palo.external.EsIndexState;
|
||||
import com.baidu.palo.external.EsStateStore;
|
||||
import com.baidu.palo.external.EsTableState;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Range;
|
||||
|
||||
public class EsStateStoreTest {
|
||||
|
||||
private static FakeEditLog fakeEditLog;
|
||||
private static FakeCatalog fakeCatalog;
|
||||
private static Catalog masterCatalog;
|
||||
private static String clusterStateStr1 = "";
|
||||
private static String clusterStateStr2 = "";
|
||||
private static String clusterStateStr3 = "";
|
||||
private EsStateStore esStateStore;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws IOException, InstantiationException, IllegalAccessException,
|
||||
IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException,
|
||||
URISyntaxException {
|
||||
fakeEditLog = new FakeEditLog();
|
||||
fakeCatalog = new FakeCatalog();
|
||||
masterCatalog = CatalogTestUtil.createTestCatalog();
|
||||
masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
clusterStateStr1 = loadJsonFromFile("data/es/clusterstate1.json");
|
||||
clusterStateStr2 = loadJsonFromFile("data/es/clusterstate2.json");
|
||||
clusterStateStr3 = loadJsonFromFile("data/es/clusterstate3.json");
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
esStateStore = new EsStateStore();
|
||||
}
|
||||
|
||||
/**
|
||||
* partitioned es table schema: k1(date), k2(int), v(double)
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
@Test
|
||||
public void testParsePartitionedClusterState() throws AnalysisException {
|
||||
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
|
||||
.getDb(CatalogTestUtil.testDb1)
|
||||
.getTable(CatalogTestUtil.testPartitionedEsTable1);
|
||||
boolean hasException = false;
|
||||
EsTableState esTableState = null;
|
||||
try {
|
||||
esTableState = esStateStore.parseClusterState55(clusterStateStr1, esTable);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
hasException = true;
|
||||
}
|
||||
assertFalse(hasException);
|
||||
assertNotNull(esTableState);
|
||||
assertEquals(2, esTableState.getPartitionedIndexStates().size());
|
||||
RangePartitionInfo definedPartInfo = (RangePartitionInfo) esTable.getPartitionInfo();
|
||||
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) esTableState.getPartitionInfo();
|
||||
Map<Long, Range<PartitionKey>> rangeMap = rangePartitionInfo.getIdToRange();
|
||||
assertEquals(2, rangeMap.size());
|
||||
Range<PartitionKey> part0 = rangeMap.get(new Long(0));
|
||||
EsIndexState esIndexState1 = esTableState.getIndexState(0);
|
||||
assertEquals(5, esIndexState1.getShardRoutings().size());
|
||||
assertEquals("index1", esIndexState1.getIndexName());
|
||||
PartitionKey lowKey = PartitionKey.createInfinityPartitionKey(definedPartInfo.getPartitionColumns(), false);
|
||||
PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList("2018-10-01"),
|
||||
definedPartInfo.getPartitionColumns());
|
||||
Range<PartitionKey> newRange = Range.closedOpen(lowKey, upperKey);
|
||||
assertEquals(newRange, part0);
|
||||
Range<PartitionKey> part1 = rangeMap.get(new Long(1));
|
||||
EsIndexState esIndexState2 = esTableState.getIndexState(1);
|
||||
assertEquals("index2", esIndexState2.getIndexName());
|
||||
lowKey = PartitionKey.createPartitionKey(Lists.newArrayList("2018-10-01"),
|
||||
definedPartInfo.getPartitionColumns());
|
||||
upperKey = PartitionKey.createPartitionKey(Lists.newArrayList("2018-10-02"),
|
||||
definedPartInfo.getPartitionColumns());
|
||||
newRange = Range.closedOpen(lowKey, upperKey);
|
||||
assertEquals(newRange, part1);
|
||||
assertEquals(6, esIndexState2.getShardRoutings().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* partitioned es table schema: k1(date), k2(int), v(double)
|
||||
* scenario desc:
|
||||
* 2 indices, one with partition desc, the other does not contains partition desc
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
@Test
|
||||
public void testParsePartitionedClusterStateTwoIndices() throws AnalysisException {
|
||||
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
|
||||
.getDb(CatalogTestUtil.testDb1)
|
||||
.getTable(CatalogTestUtil.testPartitionedEsTable1);
|
||||
boolean hasException = false;
|
||||
EsTableState esTableState = null;
|
||||
try {
|
||||
esTableState = esStateStore.parseClusterState55(clusterStateStr3, esTable);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
hasException = true;
|
||||
}
|
||||
assertFalse(hasException);
|
||||
assertNotNull(esTableState);
|
||||
|
||||
// check
|
||||
assertEquals(1, esTableState.getPartitionedIndexStates().size());
|
||||
assertEquals(1, esTableState.getUnPartitionedIndexStates().size());
|
||||
|
||||
// check partition info
|
||||
RangePartitionInfo definedPartInfo = (RangePartitionInfo) esTable.getPartitionInfo();
|
||||
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) esTableState.getPartitionInfo();
|
||||
Map<Long, Range<PartitionKey>> rangeMap = rangePartitionInfo.getIdToRange();
|
||||
assertEquals(1, rangeMap.size());
|
||||
Range<PartitionKey> part0 = rangeMap.get(new Long(0));
|
||||
EsIndexState esIndexState1 = esTableState.getIndexState(0);
|
||||
assertEquals(5, esIndexState1.getShardRoutings().size());
|
||||
assertEquals("index1", esIndexState1.getIndexName());
|
||||
PartitionKey lowKey = PartitionKey.createInfinityPartitionKey(definedPartInfo.getPartitionColumns(), false);
|
||||
PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList("2018-10-01"),
|
||||
definedPartInfo.getPartitionColumns());
|
||||
Range<PartitionKey> newRange = Range.closedOpen(lowKey, upperKey);
|
||||
assertEquals(newRange, part0);
|
||||
|
||||
// check index with no partition desc
|
||||
EsIndexState esIndexState2 = esTableState.getUnPartitionedIndexStates().get("index2");
|
||||
assertEquals("index2", esIndexState2.getIndexName());
|
||||
assertEquals(6, esIndexState2.getShardRoutings().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* partitioned es table schema: k1(date), k2(int), v(double)
|
||||
* "upperbound": "2018" is not a valid date value, so parsing procedure will fail
|
||||
*/
|
||||
@Test
|
||||
public void testParseInvalidUpperbound() {
|
||||
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
|
||||
.getDb(CatalogTestUtil.testDb1)
|
||||
.getTable(CatalogTestUtil.testPartitionedEsTable1);
|
||||
boolean hasException = false;
|
||||
EsTableState esTableState = null;
|
||||
try {
|
||||
esTableState = esStateStore.parseClusterState55(clusterStateStr2, esTable);
|
||||
} catch (Exception e) {
|
||||
hasException = true;
|
||||
}
|
||||
assertTrue(hasException);
|
||||
assertTrue(esTableState == null);
|
||||
}
|
||||
|
||||
private static String loadJsonFromFile(String fileName) throws IOException, URISyntaxException {
|
||||
File file = new File(EsStateStoreTest.class.getClassLoader().getResource(fileName).toURI());
|
||||
InputStream is = new FileInputStream(file);
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(is));
|
||||
StringBuilder jsonStr = new StringBuilder();
|
||||
String line = "";
|
||||
while ((line = br.readLine()) != null) {
|
||||
jsonStr.append(line);
|
||||
}
|
||||
br.close();
|
||||
is.close();
|
||||
return jsonStr.toString();
|
||||
}
|
||||
}
|
||||
69
fe/src/test/java/com/baidu/palo/es/EsUtilTest.java
Normal file
69
fe/src/test/java/com/baidu/palo/es/EsUtilTest.java
Normal file
@ -0,0 +1,69 @@
|
||||
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
|
||||
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.baidu.palo.es;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.json.JSONException;
|
||||
import org.json.JSONObject;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.baidu.palo.external.EsUtil;
|
||||
|
||||
public class EsUtilTest {
|
||||
|
||||
private String jsonStr = "{\"settings\": {\n"
|
||||
+ " \"index\": {\n"
|
||||
+ " \"bpack\": {\n"
|
||||
+ " \"partition\": {\n"
|
||||
+ " \"upperbound\": \"12\"\n"
|
||||
+ " }\n"
|
||||
+ " },\n"
|
||||
+ " \"number_of_shards\": \"5\",\n"
|
||||
+ " \"provided_name\": \"indexa\",\n"
|
||||
+ " \"creation_date\": \"1539328532060\",\n"
|
||||
+ " \"number_of_replicas\": \"1\",\n"
|
||||
+ " \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n"
|
||||
+ " \"version\": {\n"
|
||||
+ " \"created\": \"5050099\"\n"
|
||||
+ " }\n"
|
||||
+ " }\n"
|
||||
+ " }}";
|
||||
|
||||
@Test
|
||||
public void testGetJsonObject() {
|
||||
JSONObject json = new JSONObject(jsonStr);
|
||||
JSONObject upperBoundSetting = EsUtil.getJsonObject(json, "settings.index.bpack.partition", 0);
|
||||
assertTrue(upperBoundSetting.has("upperbound"));
|
||||
assertEquals("12", upperBoundSetting.getString("upperbound"));
|
||||
|
||||
JSONObject unExistKey = EsUtil.getJsonObject(json, "set", 0);
|
||||
assertNull(unExistKey);
|
||||
|
||||
JSONObject singleKey = EsUtil.getJsonObject(json, "settings", 0);
|
||||
assertTrue(singleKey.has("index"));
|
||||
}
|
||||
|
||||
@Test(expected = JSONException.class)
|
||||
public void testGetJsonObjectWithException() {
|
||||
JSONObject json = new JSONObject(jsonStr);
|
||||
// only support json object could not get string value directly from this api, exception will be threw
|
||||
EsUtil.getJsonObject(json, "settings.index.bpack.partition.upperbound", 0);
|
||||
}
|
||||
|
||||
}
|
||||
174
fe/src/test/java/com/baidu/palo/planner/OlapTableSinkTest.java
Normal file
174
fe/src/test/java/com/baidu/palo/planner/OlapTableSinkTest.java
Normal file
@ -0,0 +1,174 @@
|
||||
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
|
||||
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.baidu.palo.planner;
|
||||
|
||||
import com.baidu.palo.analysis.DescriptorTable;
|
||||
import com.baidu.palo.analysis.SlotDescriptor;
|
||||
import com.baidu.palo.analysis.TupleDescriptor;
|
||||
import com.baidu.palo.catalog.Column;
|
||||
import com.baidu.palo.catalog.ColumnType;
|
||||
import com.baidu.palo.catalog.HashDistributionInfo;
|
||||
import com.baidu.palo.catalog.MaterializedIndex;
|
||||
import com.baidu.palo.catalog.OlapTable;
|
||||
import com.baidu.palo.catalog.Partition;
|
||||
import com.baidu.palo.catalog.PartitionKey;
|
||||
import com.baidu.palo.catalog.PartitionType;
|
||||
import com.baidu.palo.catalog.PrimitiveType;
|
||||
import com.baidu.palo.catalog.RangePartitionInfo;
|
||||
import com.baidu.palo.catalog.SinglePartitionInfo;
|
||||
import com.baidu.palo.catalog.Tablet;
|
||||
import com.baidu.palo.common.UserException;
|
||||
import com.baidu.palo.system.Backend;
|
||||
import com.baidu.palo.system.SystemInfoService;
|
||||
import com.baidu.palo.thrift.TExplainLevel;
|
||||
import com.baidu.palo.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Range;
|
||||
import mockit.Expectations;
|
||||
import mockit.Injectable;
|
||||
import mockit.Mocked;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.junit.Test;
|
||||
|
||||
public class OlapTableSinkTest {
|
||||
private static final Logger LOG = LogManager.getLogger(OlapTableSinkTest.class);
|
||||
|
||||
@Injectable
|
||||
OlapTable dstTable;
|
||||
|
||||
@Mocked
|
||||
SystemInfoService systemInfoService;
|
||||
|
||||
private TupleDescriptor getTuple() {
|
||||
DescriptorTable descTable = new DescriptorTable();
|
||||
TupleDescriptor tuple = descTable.createTupleDescriptor("DstTable");
|
||||
// k1
|
||||
SlotDescriptor k1 = descTable.addSlotDescriptor(tuple);
|
||||
k1.setColumn(new Column("k1", PrimitiveType.BIGINT));
|
||||
k1.setIsMaterialized(true);
|
||||
|
||||
// k2
|
||||
SlotDescriptor k2 = descTable.addSlotDescriptor(tuple);
|
||||
k2.setColumn(new Column("k2", new ColumnType(PrimitiveType.VARCHAR, 25, 12, 1)));
|
||||
k2.setIsMaterialized(true);
|
||||
// v1
|
||||
SlotDescriptor v1 = descTable.addSlotDescriptor(tuple);
|
||||
v1.setColumn(new Column("v1", new ColumnType(PrimitiveType.VARCHAR, 25, 12, 1)));
|
||||
v1.setIsMaterialized(true);
|
||||
// v2
|
||||
SlotDescriptor v2 = descTable.addSlotDescriptor(tuple);
|
||||
v2.setColumn(new Column("v2", PrimitiveType.BIGINT));
|
||||
v2.setIsMaterialized(true);
|
||||
|
||||
return tuple;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSinglePartition() throws UserException {
|
||||
TupleDescriptor tuple = getTuple();
|
||||
SinglePartitionInfo partInfo = new SinglePartitionInfo();
|
||||
partInfo.setReplicationNum(2, (short) 3);
|
||||
MaterializedIndex index = new MaterializedIndex(2, MaterializedIndex.IndexState.NORMAL);
|
||||
HashDistributionInfo distInfo = new HashDistributionInfo(
|
||||
2, Lists.newArrayList(new Column("k1", PrimitiveType.BIGINT)));
|
||||
Partition partition = new Partition(2, "p1", index, distInfo);
|
||||
|
||||
new Expectations() {{
|
||||
dstTable.getId(); result = 1;
|
||||
dstTable.getPartitionInfo(); result = partInfo;
|
||||
dstTable.getPartitions(); result = Lists.newArrayList(partition);
|
||||
}};
|
||||
|
||||
OlapTableSink sink = new OlapTableSink(dstTable, tuple);
|
||||
sink.init(new TUniqueId(1, 2), 3, 4);
|
||||
sink.finalize();
|
||||
LOG.info("sink is {}", sink.toThrift());
|
||||
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRangePartition(
|
||||
@Injectable RangePartitionInfo partInfo,
|
||||
@Injectable MaterializedIndex index) throws UserException {
|
||||
TupleDescriptor tuple = getTuple();
|
||||
|
||||
HashDistributionInfo distInfo = new HashDistributionInfo(
|
||||
2, Lists.newArrayList(new Column("k1", PrimitiveType.BIGINT)));
|
||||
|
||||
Column partKey = new Column("k2", PrimitiveType.VARCHAR);
|
||||
PartitionKey key = PartitionKey.createPartitionKey(Lists.newArrayList("123"), Lists.newArrayList(partKey));
|
||||
Partition p1 = new Partition(1, "p1", index, distInfo);
|
||||
Partition p2 = new Partition(2, "p2", index, distInfo);
|
||||
|
||||
new Expectations() {{
|
||||
dstTable.getId(); result = 1;
|
||||
dstTable.getPartitionInfo(); result = partInfo;
|
||||
partInfo.getType(); result = PartitionType.RANGE;
|
||||
partInfo.getPartitionColumns(); result = Lists.newArrayList(partKey);
|
||||
partInfo.getRange(1); result = Range.lessThan(key);
|
||||
// partInfo.getRange(2); result = Range.atLeast(key);
|
||||
dstTable.getPartitions(); result = Lists.newArrayList(p1, p2);
|
||||
dstTable.getPartition("p1"); result = p1;
|
||||
|
||||
index.getTablets(); result = Lists.newArrayList(new Tablet(1));
|
||||
systemInfoService.getBackendIds(anyBoolean); result = Lists.newArrayList(new Long(1));
|
||||
systemInfoService.getBackend(new Long(1)); result = new Backend(1, "abc", 1234);
|
||||
}};
|
||||
|
||||
OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p1");
|
||||
sink.init(new TUniqueId(1, 2), 3, 4);
|
||||
sink.finalize();
|
||||
LOG.info("sink is {}", sink.toThrift());
|
||||
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
|
||||
}
|
||||
|
||||
@Test(expected = UserException.class)
|
||||
public void testRangeUnknownPartition(
|
||||
@Injectable RangePartitionInfo partInfo,
|
||||
@Injectable MaterializedIndex index) throws UserException {
|
||||
TupleDescriptor tuple = getTuple();
|
||||
|
||||
new Expectations() {{
|
||||
partInfo.getType(); result = PartitionType.RANGE;
|
||||
dstTable.getPartition("p3"); result = null;
|
||||
}};
|
||||
|
||||
OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p3");
|
||||
sink.init(new TUniqueId(1, 2), 3, 4);
|
||||
sink.finalize();
|
||||
LOG.info("sink is {}", sink.toThrift());
|
||||
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
|
||||
}
|
||||
|
||||
@Test(expected = UserException.class)
|
||||
public void testUnpartFail(
|
||||
@Injectable RangePartitionInfo partInfo,
|
||||
@Injectable MaterializedIndex index) throws UserException {
|
||||
TupleDescriptor tuple = getTuple();
|
||||
|
||||
new Expectations() {{
|
||||
partInfo.getType(); result = PartitionType.UNPARTITIONED;
|
||||
}};
|
||||
|
||||
OlapTableSink sink = new OlapTableSink(dstTable, tuple, "p1");
|
||||
sink.init(new TUniqueId(1, 2), 3, 4);
|
||||
sink.finalize();
|
||||
LOG.info("sink is {}", sink.toThrift());
|
||||
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,74 @@
|
||||
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
|
||||
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.baidu.palo.planner;
|
||||
|
||||
import com.baidu.palo.analysis.Analyzer;
|
||||
import com.baidu.palo.catalog.Column;
|
||||
import com.baidu.palo.catalog.Database;
|
||||
import com.baidu.palo.catalog.OlapTable;
|
||||
import com.baidu.palo.catalog.PrimitiveType;
|
||||
import com.baidu.palo.common.UserException;
|
||||
import com.baidu.palo.thrift.TStreamLoadPutRequest;
|
||||
import com.baidu.palo.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import mockit.Injectable;
|
||||
import mockit.Mocked;
|
||||
import mockit.NonStrictExpectations;
|
||||
|
||||
public class StreamLoadPlannerTest {
|
||||
@Injectable
|
||||
Database db;
|
||||
|
||||
@Injectable
|
||||
OlapTable destTable;
|
||||
|
||||
@Mocked
|
||||
StreamLoadScanNode scanNode;
|
||||
|
||||
@Mocked
|
||||
OlapTableSink sink;
|
||||
|
||||
@Test
|
||||
public void testNormalPlan() throws UserException {
|
||||
List<Column> columns = Lists.newArrayList();
|
||||
Column c1 = new Column("c1", PrimitiveType.BIGINT, false);
|
||||
columns.add(c1);
|
||||
Column c2 = new Column("c2", PrimitiveType.BIGINT, true);
|
||||
columns.add(c2);
|
||||
new NonStrictExpectations() {
|
||||
{
|
||||
destTable.getBaseSchema();
|
||||
result = columns;
|
||||
scanNode.init((Analyzer) any);
|
||||
scanNode.getChildren();
|
||||
result = Lists.newArrayList();
|
||||
scanNode.getId();
|
||||
result = new PlanNodeId(5);
|
||||
}
|
||||
};
|
||||
TStreamLoadPutRequest request = new TStreamLoadPutRequest();
|
||||
request.setTxnId(1);
|
||||
request.setLoadId(new TUniqueId(2, 3));
|
||||
StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable, request);
|
||||
planner.plan();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,522 @@
|
||||
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
|
||||
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.baidu.palo.planner;
|
||||
|
||||
import com.baidu.palo.analysis.Analyzer;
|
||||
import com.baidu.palo.analysis.CastExpr;
|
||||
import com.baidu.palo.analysis.DescriptorTable;
|
||||
import com.baidu.palo.analysis.FunctionName;
|
||||
import com.baidu.palo.analysis.SlotDescriptor;
|
||||
import com.baidu.palo.analysis.TupleDescriptor;
|
||||
import com.baidu.palo.catalog.AggregateType;
|
||||
import com.baidu.palo.catalog.Catalog;
|
||||
import com.baidu.palo.catalog.Column;
|
||||
import com.baidu.palo.catalog.ColumnType;
|
||||
import com.baidu.palo.catalog.Function;
|
||||
import com.baidu.palo.catalog.OlapTable;
|
||||
import com.baidu.palo.catalog.PrimitiveType;
|
||||
import com.baidu.palo.catalog.ScalarFunction;
|
||||
import com.baidu.palo.catalog.Type;
|
||||
import com.baidu.palo.common.AnalysisException;
|
||||
import com.baidu.palo.common.UserException;
|
||||
import com.baidu.palo.qe.ConnectContext;
|
||||
import com.baidu.palo.thrift.TExplainLevel;
|
||||
import com.baidu.palo.thrift.TFileType;
|
||||
import com.baidu.palo.thrift.TPlanNode;
|
||||
import com.baidu.palo.thrift.TStreamLoadPutRequest;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import mockit.Expectations;
|
||||
import mockit.Injectable;
|
||||
import mockit.Mocked;
|
||||
|
||||
public class StreamLoadScanNodeTest {
|
||||
private static final Logger LOG = LogManager.getLogger(StreamLoadScanNodeTest.class);
|
||||
|
||||
@Mocked
|
||||
Catalog catalog;
|
||||
|
||||
@Injectable
|
||||
ConnectContext connectContext;
|
||||
|
||||
@Injectable
|
||||
OlapTable dstTable;
|
||||
|
||||
@Mocked
|
||||
CastExpr castExpr;
|
||||
|
||||
TStreamLoadPutRequest getBaseRequest() {
|
||||
TStreamLoadPutRequest request = new TStreamLoadPutRequest();
|
||||
request.setFileType(TFileType.FILE_STREAM);
|
||||
return request;
|
||||
}
|
||||
|
||||
List<Column> getBaseSchema() {
|
||||
List<Column> columns = Lists.newArrayList();
|
||||
|
||||
Column k1 = new Column("k1", PrimitiveType.BIGINT);
|
||||
k1.setIsKey(true);
|
||||
k1.setIsAllowNull(false);
|
||||
columns.add(k1);
|
||||
|
||||
Column k2 = new Column("k2", new ColumnType(PrimitiveType.VARCHAR, 25, 10, 5));
|
||||
k2.setIsKey(true);
|
||||
k2.setIsAllowNull(true);
|
||||
columns.add(k2);
|
||||
|
||||
Column v1 = new Column("v1", PrimitiveType.BIGINT);
|
||||
v1.setIsKey(false);
|
||||
v1.setIsAllowNull(true);
|
||||
v1.setAggregationType(AggregateType.SUM, false);
|
||||
|
||||
columns.add(v1);
|
||||
|
||||
Column v2 = new Column("v2", new ColumnType(PrimitiveType.VARCHAR, 25, 10, 5));
|
||||
v2.setIsKey(false);
|
||||
v2.setAggregationType(AggregateType.REPLACE, false);
|
||||
v2.setIsAllowNull(false);
|
||||
columns.add(v2);
|
||||
|
||||
return columns;
|
||||
}
|
||||
|
||||
List<Column> getHllSchema() {
|
||||
List<Column> columns = Lists.newArrayList();
|
||||
|
||||
Column k1 = new Column("k1", PrimitiveType.BIGINT);
|
||||
k1.setIsKey(true);
|
||||
k1.setIsAllowNull(false);
|
||||
columns.add(k1);
|
||||
|
||||
Column v1 = new Column("v1", PrimitiveType.HLL);
|
||||
v1.setIsKey(false);
|
||||
v1.setIsAllowNull(true);
|
||||
v1.setAggregationType(AggregateType.HLL_UNION, false);
|
||||
|
||||
columns.add(v1);
|
||||
|
||||
return columns;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNormal() throws UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getBaseSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
new Expectations() {{
|
||||
dstTable.getBaseSchema(); result = columns;
|
||||
castExpr.analyze((Analyzer) any);
|
||||
}};
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
|
||||
Assert.assertEquals(1, scanNode.getNumInstances());
|
||||
Assert.assertEquals(1, scanNode.getScanRangeLocations(0).size());
|
||||
}
|
||||
|
||||
@Test(expected = AnalysisException.class)
|
||||
public void testLostV2() throws UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getBaseSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setColumns("k1, k2, v1");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test(expected = AnalysisException.class)
|
||||
public void testBadColumns() throws UserException, UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getBaseSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setColumns("k1 k2 v1");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testColumnsNormal() throws UserException, UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getBaseSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setFileType(TFileType.FILE_LOCAL);
|
||||
request.setColumns("k1,k2,v1, v2=k2");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHllColumnsNormal() throws UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getHllSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
new Expectations() {{
|
||||
catalog.getFunction((Function) any, (Function.CompareMode) any);
|
||||
result = new ScalarFunction(new FunctionName("hll_hash"), Lists.newArrayList(), Type.BIGINT, false);
|
||||
}};
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setFileType(TFileType.FILE_LOCAL);
|
||||
request.setColumns("k1,k2, v1=hll_hash(k2)");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test(expected = UserException.class)
|
||||
public void testHllColumnsNoHllHash() throws UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getHllSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
new Expectations() {{
|
||||
catalog.getFunction((Function) any, (Function.CompareMode) any);
|
||||
result = new ScalarFunction(new FunctionName("hll_hash1"), Lists.newArrayList(), Type.BIGINT, false);
|
||||
}};
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setFileType(TFileType.FILE_LOCAL);
|
||||
request.setColumns("k1,k2, v1=hll_hash1(k2)");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test(expected = UserException.class)
|
||||
public void testHllColumnsFail() throws UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getHllSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setFileType(TFileType.FILE_LOCAL);
|
||||
request.setColumns("k1,k2, v1=k2");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test(expected = UserException.class)
|
||||
public void testUnsupportedFType() throws UserException, UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getBaseSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setFileType(TFileType.FILE_BROKER);
|
||||
request.setColumns("k1,k2,v1, v2=k2");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test(expected = UserException.class)
|
||||
public void testColumnsUnknownRef() throws UserException, UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getBaseSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setColumns("k1,k2,v1, v2=k3");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWhereNormal() throws UserException, UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getBaseSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setColumns("k1,k2,v1, v2=k1");
|
||||
request.setWhere("k1 = 1");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test(expected = AnalysisException.class)
|
||||
public void testWhereBad() throws UserException, UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getBaseSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setColumns("k1,k2,v1, v2=k2");
|
||||
request.setWhere("k1 1");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test(expected = UserException.class)
|
||||
public void testWhereUnknownRef() throws UserException, UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getBaseSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setColumns("k1,k2,v1, v2=k1");
|
||||
request.setWhere("k5 = 1");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
|
||||
@Test(expected = UserException.class)
|
||||
public void testWhereNotBool() throws UserException, UserException {
|
||||
Analyzer analyzer = new Analyzer(catalog, connectContext);
|
||||
DescriptorTable descTbl = analyzer.getDescTbl();
|
||||
|
||||
List<Column> columns = getBaseSchema();
|
||||
TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
|
||||
for (Column column : columns) {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
request.setColumns("k1,k2,v1, v2=k1");
|
||||
request.setWhere("k1 + v2");
|
||||
StreamLoadScanNode scanNode = new StreamLoadScanNode(new PlanNodeId(1), dstDesc, dstTable, request);
|
||||
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
|
||||
TPlanNode planNode = new TPlanNode();
|
||||
scanNode.toThrift(planNode);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,45 @@
|
||||
package com.baidu.palo.transaction;
|
||||
|
||||
import com.baidu.palo.persist.EditLog;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
|
||||
public final class FakeTransactionIDGenerator extends MockUp<TransactionIDGenerator> {
|
||||
|
||||
private long currentId = 1000L;
|
||||
|
||||
@Mock
|
||||
public void $init() {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void setEditLog(EditLog editLog) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Mock
|
||||
public synchronized long getNextTransactionId() {
|
||||
System.out.println("getNextTransactionId is called");
|
||||
return currentId++;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void write(DataOutput out) throws IOException {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
public void setCurrentId(long newId) {
|
||||
this.currentId = newId;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,448 @@
|
||||
package com.baidu.palo.transaction;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.baidu.palo.catalog.Catalog;
|
||||
import com.baidu.palo.catalog.CatalogTestUtil;
|
||||
import com.baidu.palo.catalog.FakeCatalog;
|
||||
import com.baidu.palo.catalog.FakeEditLog;
|
||||
import com.baidu.palo.catalog.Partition;
|
||||
import com.baidu.palo.catalog.Replica;
|
||||
import com.baidu.palo.catalog.Tablet;
|
||||
import com.baidu.palo.common.AnalysisException;
|
||||
import com.baidu.palo.common.FeMetaVersion;
|
||||
import com.baidu.palo.common.MetaNotFoundException;
|
||||
import com.baidu.palo.transaction.TransactionState.LoadJobSourceType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public class GlobalTransactionMgrTest {
|
||||
|
||||
private static FakeEditLog fakeEditLog;
|
||||
private static FakeCatalog fakeCatalog;
|
||||
private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
|
||||
private static GlobalTransactionMgr masterTransMgr;
|
||||
private static GlobalTransactionMgr slaveTransMgr;
|
||||
private static Catalog masterCatalog;
|
||||
private static Catalog slaveCatalog;
|
||||
|
||||
private String transactionSource = "localfe";
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException,
|
||||
InvocationTargetException, NoSuchMethodException, SecurityException {
|
||||
fakeEditLog = new FakeEditLog();
|
||||
fakeCatalog = new FakeCatalog();
|
||||
fakeTransactionIDGenerator = new FakeTransactionIDGenerator();
|
||||
masterCatalog = CatalogTestUtil.createTestCatalog();
|
||||
slaveCatalog = CatalogTestUtil.createTestCatalog();
|
||||
masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
|
||||
slaveCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
|
||||
masterTransMgr = masterCatalog.getGlobalTransactionMgr();
|
||||
masterTransMgr.setEditLog(masterCatalog.getEditLog());
|
||||
|
||||
slaveTransMgr = slaveCatalog.getGlobalTransactionMgr();
|
||||
slaveTransMgr.setEditLog(slaveCatalog.getEditLog());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBeginTransaction() throws LabelAlreadyExistsException, AnalysisException,
|
||||
BeginTransactionException {
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertNotNull(transactionState);
|
||||
assertEquals(transactionId, transactionState.getTransactionId());
|
||||
assertEquals(TransactionStatus.PREPARE, transactionState.getTransactionStatus());
|
||||
assertEquals(CatalogTestUtil.testDbId1, transactionState.getDbId());
|
||||
assertEquals(transactionSource, transactionState.getCoordinator());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBeginTransactionWithSameLabel() throws LabelAlreadyExistsException, AnalysisException,
|
||||
BeginTransactionException {
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
long transactionId = 0;
|
||||
Throwable throwable = null;
|
||||
try {
|
||||
transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
} catch (AnalysisException e) {
|
||||
e.printStackTrace();
|
||||
} catch (LabelAlreadyExistsException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertNotNull(transactionState);
|
||||
assertEquals(transactionId, transactionState.getTransactionId());
|
||||
assertEquals(TransactionStatus.PREPARE, transactionState.getTransactionStatus());
|
||||
assertEquals(CatalogTestUtil.testDbId1, transactionState.getDbId());
|
||||
assertEquals(transactionSource, transactionState.getCoordinator());
|
||||
|
||||
try {
|
||||
transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
} catch (Exception e) {
|
||||
// TODO: handle exception
|
||||
}
|
||||
}
|
||||
|
||||
// all replica committed success
|
||||
@Test
|
||||
public void testCommitTransaction1() throws MetaNotFoundException,
|
||||
TransactionCommitFailedException,
|
||||
IllegalTransactionParameterException, LabelAlreadyExistsException,
|
||||
AnalysisException, BeginTransactionException {
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
// commit a transaction
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
TabletCommitInfo tabletCommitInfo2 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId2);
|
||||
TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId3);
|
||||
List<TabletCommitInfo> transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
transTablets.add(tabletCommitInfo2);
|
||||
transTablets.add(tabletCommitInfo3);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId, transTablets);
|
||||
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
// check status is committed
|
||||
assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
|
||||
// check replica version
|
||||
Partition testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1)
|
||||
.getPartition(CatalogTestUtil.testPartition1);
|
||||
// check partition version
|
||||
assertEquals(CatalogTestUtil.testStartVersion, testPartition.getCommittedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 2, testPartition.getNextVersion());
|
||||
// check partition next version
|
||||
Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1);
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica.getVersion());
|
||||
}
|
||||
// slave replay new state and compare catalog
|
||||
FakeCatalog.setCatalog(slaveCatalog);
|
||||
slaveTransMgr.replayUpsertTransactionState(transactionState);
|
||||
assertTrue(CatalogTestUtil.compareCatalog(masterCatalog, slaveCatalog));
|
||||
}
|
||||
|
||||
// commit with only two replicas
|
||||
@Test
|
||||
public void testCommitTransactionWithOneFailed() throws MetaNotFoundException,
|
||||
TransactionCommitFailedException,
|
||||
IllegalTransactionParameterException, LabelAlreadyExistsException,
|
||||
AnalysisException, BeginTransactionException {
|
||||
TransactionState transactionState = null;
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
// commit a transaction with 1,2 success
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
TabletCommitInfo tabletCommitInfo2 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId2);
|
||||
List<TabletCommitInfo> transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
transTablets.add(tabletCommitInfo2);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId, transTablets);
|
||||
|
||||
// follower catalog replay the transaction
|
||||
transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
FakeCatalog.setCatalog(slaveCatalog);
|
||||
slaveTransMgr.replayUpsertTransactionState(transactionState);
|
||||
assertTrue(CatalogTestUtil.compareCatalog(masterCatalog, slaveCatalog));
|
||||
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
// commit another transaction with 1,3 success
|
||||
long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable2,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1);
|
||||
TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId3);
|
||||
transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
transTablets.add(tabletCommitInfo3);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId2, transTablets);
|
||||
transactionState = fakeEditLog.getTransaction(transactionId2);
|
||||
// check status is prepare, because the commit failed
|
||||
assertEquals(TransactionStatus.PREPARE, transactionState.getTransactionStatus());
|
||||
// check replica version
|
||||
Partition testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1)
|
||||
.getPartition(CatalogTestUtil.testPartition1);
|
||||
// check partition version
|
||||
assertEquals(CatalogTestUtil.testStartVersion, testPartition.getCommittedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 2, testPartition.getNextVersion());
|
||||
// check partition next version
|
||||
Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1);
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica.getVersion());
|
||||
}
|
||||
// the transaction not committed, so that catalog should be equal
|
||||
assertTrue(CatalogTestUtil.compareCatalog(masterCatalog, slaveCatalog));
|
||||
|
||||
// commit the second transaction with 1,2,3 success
|
||||
tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1);
|
||||
tabletCommitInfo2 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId2);
|
||||
tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId3);
|
||||
transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
transTablets.add(tabletCommitInfo2);
|
||||
transTablets.add(tabletCommitInfo3);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId2, transTablets);
|
||||
transactionState = fakeEditLog.getTransaction(transactionId2);
|
||||
// check status is commit
|
||||
assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
|
||||
// check replica version
|
||||
testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1)
|
||||
.getPartition(CatalogTestUtil.testPartition1);
|
||||
// check partition version
|
||||
assertEquals(CatalogTestUtil.testStartVersion, testPartition.getCommittedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 3, testPartition.getNextVersion());
|
||||
// check partition next version
|
||||
tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1);
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replica.getVersion());
|
||||
}
|
||||
Replica replcia1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1);
|
||||
Replica replcia2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2);
|
||||
Replica replcia3 = tablet.getReplicaById(CatalogTestUtil.testReplicaId3);
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replcia1.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replcia2.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replcia3.getVersion());
|
||||
assertEquals(-1, replcia1.getLastFailedVersion());
|
||||
assertEquals(-1, replcia2.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replcia3.getLastFailedVersion());
|
||||
|
||||
// last success version not change, because not published
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replcia1.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replcia2.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replcia3.getLastSuccessVersion());
|
||||
// check partition version
|
||||
assertEquals(CatalogTestUtil.testStartVersion, testPartition.getCommittedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 3, testPartition.getNextVersion());
|
||||
|
||||
transactionState = fakeEditLog.getTransaction(transactionId2);
|
||||
FakeCatalog.setCatalog(slaveCatalog);
|
||||
slaveTransMgr.replayUpsertTransactionState(transactionState);
|
||||
assertTrue(CatalogTestUtil.compareCatalog(masterCatalog, slaveCatalog));
|
||||
}
|
||||
|
||||
public void testFinishTransaction() throws MetaNotFoundException, TransactionCommitFailedException,
|
||||
IllegalTransactionParameterException, LabelAlreadyExistsException,
|
||||
AnalysisException, BeginTransactionException {
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
// commit a transaction
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
TabletCommitInfo tabletCommitInfo2 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId2);
|
||||
TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId3);
|
||||
List<TabletCommitInfo> transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
transTablets.add(tabletCommitInfo2);
|
||||
transTablets.add(tabletCommitInfo3);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId, transTablets);
|
||||
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
|
||||
Set<Long> errorReplicaIds = Sets.newHashSet();
|
||||
errorReplicaIds.add(CatalogTestUtil.testReplicaId1);
|
||||
masterTransMgr.finishTransaction(transactionId, errorReplicaIds);
|
||||
transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus());
|
||||
// check replica version
|
||||
Partition testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1)
|
||||
.getPartition(CatalogTestUtil.testPartition1);
|
||||
// check partition version
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, testPartition.getCommittedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 2, testPartition.getNextVersion());
|
||||
// check partition next version
|
||||
Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1);
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replica.getVersion());
|
||||
}
|
||||
// slave replay new state and compare catalog
|
||||
slaveTransMgr.replayUpsertTransactionState(transactionState);
|
||||
assertTrue(CatalogTestUtil.compareCatalog(masterCatalog, slaveCatalog));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinishTransactionWithOneFailed() throws MetaNotFoundException,
|
||||
TransactionCommitFailedException,
|
||||
IllegalTransactionParameterException, LabelAlreadyExistsException,
|
||||
AnalysisException, BeginTransactionException {
|
||||
TransactionState transactionState = null;
|
||||
Partition testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1)
|
||||
.getPartition(CatalogTestUtil.testPartition1);
|
||||
Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1);
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
// commit a transaction with 1,2 success
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
TabletCommitInfo tabletCommitInfo2 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId2);
|
||||
List<TabletCommitInfo> transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
transTablets.add(tabletCommitInfo2);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId, transTablets);
|
||||
|
||||
// follower catalog replay the transaction
|
||||
transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
FakeCatalog.setCatalog(slaveCatalog);
|
||||
slaveTransMgr.replayUpsertTransactionState(transactionState);
|
||||
assertTrue(CatalogTestUtil.compareCatalog(masterCatalog, slaveCatalog));
|
||||
|
||||
// master finish the transaction failed
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
Set<Long> errorReplicaIds = Sets.newHashSet();
|
||||
errorReplicaIds.add(CatalogTestUtil.testReplicaId2);
|
||||
masterTransMgr.finishTransaction(transactionId, errorReplicaIds);
|
||||
assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
|
||||
Replica replcia1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1);
|
||||
Replica replcia2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2);
|
||||
Replica replcia3 = tablet.getReplicaById(CatalogTestUtil.testReplicaId3);
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replcia1.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replcia2.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replcia3.getVersion());
|
||||
assertEquals(-1, replcia1.getLastFailedVersion());
|
||||
assertEquals(-1, replcia2.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replcia3.getLastFailedVersion());
|
||||
|
||||
errorReplicaIds = Sets.newHashSet();
|
||||
masterTransMgr.finishTransaction(transactionId, errorReplicaIds);
|
||||
assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replcia1.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replcia2.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replcia3.getVersion());
|
||||
assertEquals(-1, replcia1.getLastFailedVersion());
|
||||
assertEquals(-1, replcia2.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replcia3.getLastFailedVersion());
|
||||
|
||||
// follower catalog replay the transaction
|
||||
transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
FakeCatalog.setCatalog(slaveCatalog);
|
||||
slaveTransMgr.replayUpsertTransactionState(transactionState);
|
||||
assertTrue(CatalogTestUtil.compareCatalog(masterCatalog, slaveCatalog));
|
||||
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
// commit another transaction with 1,3 success
|
||||
long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable2,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1);
|
||||
TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId3);
|
||||
transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
transTablets.add(tabletCommitInfo3);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId2, transTablets);
|
||||
transactionState = fakeEditLog.getTransaction(transactionId2);
|
||||
// check status is prepare, because the commit failed
|
||||
assertEquals(TransactionStatus.PREPARE, transactionState.getTransactionStatus());
|
||||
|
||||
// commit the second transaction with 1,2,3 success
|
||||
tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1);
|
||||
tabletCommitInfo2 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId2);
|
||||
tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId3);
|
||||
transTablets = Lists.newArrayList();
|
||||
transTablets.add(tabletCommitInfo1);
|
||||
transTablets.add(tabletCommitInfo2);
|
||||
transTablets.add(tabletCommitInfo3);
|
||||
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, transactionId2, transTablets);
|
||||
transactionState = fakeEditLog.getTransaction(transactionId2);
|
||||
// check status is commit
|
||||
assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
|
||||
// check replica version
|
||||
testPartition = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1)
|
||||
.getPartition(CatalogTestUtil.testPartition1);
|
||||
// check partition version
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, testPartition.getCommittedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 3, testPartition.getNextVersion());
|
||||
|
||||
// follower catalog replay the transaction
|
||||
transactionState = fakeEditLog.getTransaction(transactionId2);
|
||||
FakeCatalog.setCatalog(slaveCatalog);
|
||||
slaveTransMgr.replayUpsertTransactionState(transactionState);
|
||||
assertTrue(CatalogTestUtil.compareCatalog(masterCatalog, slaveCatalog));
|
||||
|
||||
// master finish the transaction2
|
||||
errorReplicaIds = Sets.newHashSet();
|
||||
masterTransMgr.finishTransaction(transactionId2, errorReplicaIds);
|
||||
assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 2, replcia1.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 2, replcia2.getVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion, replcia3.getVersion());
|
||||
assertEquals(-1, replcia1.getLastFailedVersion());
|
||||
assertEquals(-1, replcia2.getLastFailedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 1, replcia3.getLastFailedVersion());
|
||||
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 2, replcia1.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 2, replcia2.getLastSuccessVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 2, replcia3.getLastSuccessVersion());
|
||||
// check partition version
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 2, testPartition.getCommittedVersion());
|
||||
assertEquals(CatalogTestUtil.testStartVersion + 3, testPartition.getNextVersion());
|
||||
|
||||
transactionState = fakeEditLog.getTransaction(transactionId2);
|
||||
FakeCatalog.setCatalog(slaveCatalog);
|
||||
slaveTransMgr.replayUpsertTransactionState(transactionState);
|
||||
assertTrue(CatalogTestUtil.compareCatalog(masterCatalog, slaveCatalog));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteTransaction() throws LabelAlreadyExistsException,
|
||||
AnalysisException, BeginTransactionException {
|
||||
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND);
|
||||
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertNotNull(transactionState);
|
||||
assertEquals(transactionId, transactionState.getTransactionId());
|
||||
assertEquals(TransactionStatus.PREPARE, transactionState.getTransactionStatus());
|
||||
assertEquals(CatalogTestUtil.testDbId1, transactionState.getDbId());
|
||||
assertEquals(transactionSource, transactionState.getCoordinator());
|
||||
|
||||
masterTransMgr.deleteTransaction(transactionId);
|
||||
transactionState = fakeEditLog.getTransaction(transactionId);
|
||||
assertNull(transactionState);
|
||||
transactionState = masterTransMgr.getTransactionState(transactionId);
|
||||
assertNull(transactionState);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user