[Enhancement](multi-catalog) merge hms partition events. (#22869)

This pr mainly has two changes:

1. add merge logic for partition events
2. add a unit test for `MetastoreEventFactory`. First add some mock classes (`MockCatalog`/`MockDatabase` ...) to simulate the real HMS catalogs/databases/tables/partitions, then create an event producer which can randomly produce every kind of `MetastoreEvent`. The test uses two catalogs: one named `testCatalog` and the other `validateCatalog`. The event producer generates many events; `validateCatalog` handles all of them, while `testCatalog` handles only the events that have been merged by `MetastoreEventFactory`. Finally, check that `validateCatalog` is equal to `testCatalog`.
This commit is contained in:
Xiangyu Wang
2023-09-10 18:29:54 +08:00
committed by GitHub
parent 32a7eef96a
commit 8e171f5cbf
15 changed files with 646 additions and 191 deletions

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@ -32,6 +33,7 @@ import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -44,7 +46,7 @@ public class AddPartitionEvent extends MetastorePartitionEvent {
// for test
public AddPartitionEvent(long eventId, String catalogName, String dbName,
String tblName, List<String> partitionNames) {
super(eventId, catalogName, dbName, tblName);
super(eventId, catalogName, dbName, tblName, MetastoreEventType.ADD_PARTITION);
this.partitionNames = partitionNames;
this.hmsTbl = null;
}
@ -71,6 +73,20 @@ public class AddPartitionEvent extends MetastorePartitionEvent {
}
}
@Override
protected boolean willChangePartitionName() {
return false;
}
@Override
public Set<String> getAllPartitionNames() {
return ImmutableSet.copyOf(partitionNames);
}
public void removePartition(String partitionName) {
partitionNames.remove(partitionName);
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new AddPartitionEvent(event, catalogName));

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
import java.util.List;
import java.util.Random;
/**
* MetastoreEvent for ALTER_DATABASE event type
@ -41,13 +42,15 @@ public class AlterDatabaseEvent extends MetastoreEvent {
// true if this alter event was due to a rename operation
private final boolean isRename;
private final String dbNameAfter;
// for test
public AlterDatabaseEvent(long eventId, String catalogName, String dbName, boolean isRename) {
super(eventId, catalogName, dbName, null);
super(eventId, catalogName, dbName, null, MetastoreEventType.ALTER_DATABASE);
this.isRename = isRename;
this.dbBefore = null;
this.dbAfter = null;
this.dbNameAfter = isRename ? (dbName + new Random().nextInt(10)) : dbName;
}
private AlterDatabaseEvent(NotificationEvent event,
@ -61,6 +64,7 @@ public class AlterDatabaseEvent extends MetastoreEvent {
.getAlterDatabaseMessage(event.getMessage());
dbBefore = Preconditions.checkNotNull(alterDatabaseMessage.getDbObjBefore());
dbAfter = Preconditions.checkNotNull(alterDatabaseMessage.getDbObjAfter());
dbNameAfter = dbAfter.getName();
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Unable to parse the alter database message"), e);
@ -97,6 +101,10 @@ public class AlterDatabaseEvent extends MetastoreEvent {
return isRename;
}
public String getDbNameAfter() {
return dbNameAfter;
}
@Override
protected void process() throws MetastoreNotificationException {
try {

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@ -30,7 +31,8 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -47,14 +49,14 @@ public class AlterPartitionEvent extends MetastorePartitionEvent {
// for test
public AlterPartitionEvent(long eventId, String catalogName, String dbName, String tblName,
String partitionNameBefore, String partitionNameAfter) {
super(eventId, catalogName, dbName, tblName);
String partitionNameBefore, boolean isRename) {
super(eventId, catalogName, dbName, tblName, MetastoreEventType.ALTER_PARTITION);
this.partitionNameBefore = partitionNameBefore;
this.partitionNameAfter = partitionNameAfter;
this.partitionNameAfter = isRename ? (partitionNameBefore + new Random().nextInt(100)) : partitionNameBefore;
this.hmsTbl = null;
this.partitionAfter = null;
this.partitionBefore = null;
isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter);
this.isRename = isRename;
}
private AlterPartitionEvent(NotificationEvent event,
@ -80,6 +82,24 @@ public class AlterPartitionEvent extends MetastorePartitionEvent {
}
}
@Override
protected boolean willChangePartitionName() {
return isRename;
}
@Override
public Set<String> getAllPartitionNames() {
return ImmutableSet.of(partitionNameBefore);
}
public String getPartitionNameAfter() {
return partitionNameAfter;
}
public boolean isRename() {
return isRename;
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new AlterPartitionEvent(event, catalogName));
@ -109,10 +129,28 @@ public class AlterPartitionEvent extends MetastorePartitionEvent {
}
@Override
protected boolean canBeBatched(MetastoreEvent event) {
return isSameTable(event)
&& event instanceof AlterPartitionEvent
&& Objects.equals(partitionBefore, ((AlterPartitionEvent) event).partitionBefore)
&& Objects.equals(partitionAfter, ((AlterPartitionEvent) event).partitionAfter);
protected boolean canBeBatched(MetastoreEvent that) {
if (!isSameTable(that) || !(that instanceof MetastorePartitionEvent)) {
return false;
}
// Check if `that` event is a rename event, a rename event can not be batched
// because the process of `that` event will change the reference relation of this partition
MetastorePartitionEvent thatPartitionEvent = (MetastorePartitionEvent) that;
if (thatPartitionEvent.willChangePartitionName()) {
return false;
}
// `that` event can be batched if this event's partitions contains all of the partitions which `that` event has
// else just remove `that` event's relevant partitions
for (String partitionName : getAllPartitionNames()) {
if (thatPartitionEvent instanceof AddPartitionEvent) {
((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName);
} else if (thatPartitionEvent instanceof DropPartitionEvent) {
((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName);
}
}
return getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames());
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
import java.util.List;
import java.util.Random;
/**
* MetastoreEvent for ALTER_TABLE event type
@ -41,17 +42,17 @@ public class AlterTableEvent extends MetastoreTableEvent {
// true if this alter event was due to a rename operation
private final boolean isRename;
private final boolean isView;
private final boolean willCreateOrDropTable;
private final String tblNameAfter;
// for test
public AlterTableEvent(long eventId, String catalogName, String dbName,
String tblName, boolean isRename, boolean isView) {
super(eventId, catalogName, dbName, tblName);
super(eventId, catalogName, dbName, tblName, MetastoreEventType.ALTER_TABLE);
this.isRename = isRename;
this.isView = isView;
this.tableBefore = null;
this.tableAfter = null;
this.willCreateOrDropTable = isRename || isView;
this.tblNameAfter = isRename ? (tblName + new Random().nextInt(10)) : tblName;
}
private AlterTableEvent(NotificationEvent event, String catalogName) {
@ -65,6 +66,7 @@ public class AlterTableEvent extends MetastoreTableEvent {
.getAlterTableMessage(event.getMessage());
tableAfter = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
tableBefore = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
tblNameAfter = tableAfter.getTableName();
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Unable to parse the alter table message"), e);
@ -73,7 +75,6 @@ public class AlterTableEvent extends MetastoreTableEvent {
isRename = !tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName())
|| !tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName());
isView = tableBefore.isSetViewExpandedText() || tableBefore.isSetViewOriginalText();
this.willCreateOrDropTable = isRename || isView;
}
public static List<MetastoreEvent> getEvents(NotificationEvent event,
@ -83,7 +84,12 @@ public class AlterTableEvent extends MetastoreTableEvent {
@Override
protected boolean willCreateOrDropTable() {
return willCreateOrDropTable;
return isRename || isView;
}
@Override
protected boolean willChangeTableName() {
return isRename;
}
private void processRecreateTable() throws DdlException {
@ -123,6 +129,10 @@ public class AlterTableEvent extends MetastoreTableEvent {
return isView;
}
public String getTblNameAfter() {
return tblNameAfter;
}
/**
* If the ALTER_TABLE event is due a table rename, this method removes the old table
* and creates a new table with the new name. Else, we just refresh table
@ -157,15 +167,22 @@ public class AlterTableEvent extends MetastoreTableEvent {
return false;
}
// `that` event must not be a rename table event
// so if the process of this event will drop this table,
// it can merge all the table's events before
if (willCreateOrDropTable) {
// First check if `that` event is a rename event, a rename event can not be batched
// because the process of `that` event will change the reference relation of this table
// `that` event must be a MetastoreTableEvent event otherwise `isSameTable` will return false
MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that;
if (thatTblEvent.willChangeTableName()) {
return false;
}
// Then check if the process of this event will create or drop this table,
// if true then `that` event can be batched
if (willCreateOrDropTable()) {
return true;
}
// that event must be a MetastoreTableEvent event
// otherwise `isSameTable` will return false
return !((MetastoreTableEvent) that).willCreateOrDropTable();
// Last, check if the process of `that` event will create or drop this table
// if false then `that` event can be batched
return !thatTblEvent.willCreateOrDropTable();
}
}

View File

@ -34,7 +34,7 @@ public class CreateDatabaseEvent extends MetastoreEvent {
// for test
public CreateDatabaseEvent(long eventId, String catalogName, String dbName) {
super(eventId, catalogName, dbName, null);
super(eventId, catalogName, dbName, null, MetastoreEventType.CREATE_DATABASE);
}
private CreateDatabaseEvent(NotificationEvent event,

View File

@ -37,7 +37,7 @@ public class CreateTableEvent extends MetastoreTableEvent {
// for test
public CreateTableEvent(long eventId, String catalogName, String dbName, String tblName) {
super(eventId, catalogName, dbName, tblName);
super(eventId, catalogName, dbName, tblName, MetastoreEventType.CREATE_TABLE);
this.hmsTbl = null;
}
@ -66,6 +66,11 @@ public class CreateTableEvent extends MetastoreTableEvent {
return true;
}
@Override
protected boolean willChangeTableName() {
return false;
}
@Override
protected void process() throws MetastoreNotificationException {
try {

View File

@ -32,6 +32,11 @@ import java.util.List;
*/
public class DropDatabaseEvent extends MetastoreEvent {
// for test
public DropDatabaseEvent(long eventId, String catalogName, String dbName) {
super(eventId, catalogName, dbName, null, MetastoreEventType.DROP_DATABASE);
}
private DropDatabaseEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@ -31,15 +32,24 @@ import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* MetastoreEvent for ADD_PARTITION event type
* MetastoreEvent for DROP_PARTITION event type
*/
public class DropPartitionEvent extends MetastorePartitionEvent {
private final Table hmsTbl;
private final List<String> partitionNames;
// for test
public DropPartitionEvent(long eventId, String catalogName, String dbName,
String tblName, List<String> partitionNames) {
super(eventId, catalogName, dbName, tblName, MetastoreEventType.DROP_PARTITION);
this.partitionNames = partitionNames;
this.hmsTbl = null;
}
private DropPartitionEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
@ -62,6 +72,20 @@ public class DropPartitionEvent extends MetastorePartitionEvent {
}
}
@Override
protected boolean willChangePartitionName() {
return false;
}
@Override
public Set<String> getAllPartitionNames() {
return ImmutableSet.copyOf(partitionNames);
}
public void removePartition(String partitionName) {
partitionNames.remove(partitionName);
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(
@ -85,4 +109,30 @@ public class DropPartitionEvent extends MetastorePartitionEvent {
debugString("Failed to process event"), e);
}
}
@Override
protected boolean canBeBatched(MetastoreEvent that) {
if (!isSameTable(that) || !(that instanceof MetastorePartitionEvent)) {
return false;
}
MetastorePartitionEvent thatPartitionEvent = (MetastorePartitionEvent) that;
// Check if `that` event is a rename event, a rename event can not be batched
// because the process of `that` event will change the reference relation of this partition
if (thatPartitionEvent.willChangePartitionName()) {
return false;
}
// `that` event can be batched if this event's partitions contains all of the partitions which `that` event has
// else just remove `that` event's relevant partitions
for (String partitionName : getAllPartitionNames()) {
if (thatPartitionEvent instanceof AddPartitionEvent) {
((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName);
} else if (thatPartitionEvent instanceof DropPartitionEvent) {
((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName);
}
}
return getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames());
}
}

View File

@ -37,7 +37,7 @@ public class DropTableEvent extends MetastoreTableEvent {
// for test
public DropTableEvent(long eventId, String catalogName, String dbName,
String tblName) {
super(eventId, catalogName, dbName, tblName);
super(eventId, catalogName, dbName, tblName, MetastoreEventType.DROP_TABLE);
this.tableName = tblName;
}
@ -67,6 +67,11 @@ public class DropTableEvent extends MetastoreTableEvent {
return true;
}
@Override
protected boolean willChangeTableName() {
return false;
}
@Override
protected void process() throws MetastoreNotificationException {
try {
@ -80,8 +85,18 @@ public class DropTableEvent extends MetastoreTableEvent {
@Override
protected boolean canBeBatched(MetastoreEvent that) {
// `that` event must not be a rename table event
// so merge all events which belong to this table before is ok
return isSameTable(that);
if (!isSameTable(that)) {
return false;
}
/**
* Check if `that` event is a rename event, a rename event can not be batched
* because the process of `that` event will change the reference relation of this table,
* otherwise it can be batched because this event is a drop-table event
* and the process of this event will drop the whole table,
* and `that` event must be a MetastoreTableEvent event otherwise `isSameTable` will return false
* */
MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that;
return !thatTblEvent.willChangeTableName();
}
}

View File

@ -38,7 +38,7 @@ public class InsertEvent extends MetastoreTableEvent {
// for test
public InsertEvent(long eventId, String catalogName, String dbName,
String tblName) {
super(eventId, catalogName, dbName, tblName);
super(eventId, catalogName, dbName, tblName, MetastoreEventType.INSERT);
this.hmsTbl = null;
}
@ -66,11 +66,16 @@ public class InsertEvent extends MetastoreTableEvent {
return false;
}
@Override
protected boolean willChangeTableName() {
return false;
}
@Override
protected void process() throws MetastoreNotificationException {
try {
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName);
/***
/**
* Only when we use hive client to execute a `INSERT INTO TBL SELECT * ...` or `INSERT INTO TBL ...` sql
* to a non-partitioned table then the hms will generate an insert event, and there is not
* any partition event occurs, but the file cache may has been changed, so we need handle this.
@ -91,8 +96,11 @@ public class InsertEvent extends MetastoreTableEvent {
return false;
}
// that event must be a MetastoreTableEvent event
// otherwise `isSameTable` will return false
/**
* Because the cache of this table will be cleared when handling `InsertEvent`,
* so `that` event can be batched if `that` event will not create or drop this table,
* and `that` event must be a MetastoreTableEvent event otherwise `isSameTable` will return false
*/
return !((MetastoreTableEvent) that).willCreateOrDropTable();
}
}

View File

@ -58,12 +58,13 @@ public abstract class MetastoreEvent {
protected final String catalogName;
// for test
protected MetastoreEvent(long eventId, String catalogName, String dbName, String tblName) {
protected MetastoreEvent(long eventId, String catalogName, String dbName,
String tblName, MetastoreEventType eventType) {
this.eventId = eventId;
this.catalogName = catalogName;
this.dbName = dbName;
this.tblName = tblName;
this.eventType = null;
this.eventType = eventType;
this.metastoreNotificationEvent = null;
this.event = null;
}
@ -97,7 +98,6 @@ public abstract class MetastoreEvent {
/**
* Checks if the given event can be batched into this event. Default behavior is
* to return false which can be overridden by a sub-class.
* The current version is relatively simple to process batch events, so all that need to be processed are true.
*
* @param event The event under consideration to be batched into this event.
* @return false if event cannot be batched into this event; otherwise true.

View File

@ -99,7 +99,7 @@ public class MetastoreEventFactory implements EventFactory {
for (int i = 0; i < events.size(); i++) {
MetastoreEvent event = events.get(i);
// if the event is a rename event, just clear indexMap
// if the event is a rename db event, just clear indexMap
// to make sure the table references of these events in indexMap will not change
if (event instanceof AlterDatabaseEvent && ((AlterDatabaseEvent) event).isRename()) {
indexMap.clear();
@ -135,12 +135,6 @@ public class MetastoreEventFactory implements EventFactory {
.collect(Collectors.toList());
indexList.add(i);
indexMap.put(groupKey, indexList);
// if the event is a rename event, just clear indexMap
// to make sure the table references of these events in indexMap will not change
if (event instanceof AlterTableEvent && ((AlterTableEvent) event).isRename()) {
indexMap.clear();
}
}
List<MetastoreEvent> filteredEvents = eventsCopy.stream().filter(Objects::nonNull)

View File

@ -20,14 +20,17 @@ package org.apache.doris.datasource.hive.event;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import java.util.Set;
/**
* Base class for all the partition events
*/
public abstract class MetastorePartitionEvent extends MetastoreTableEvent {
// for test
protected MetastorePartitionEvent(long eventId, String catalogName, String dbName, String tblName) {
super(eventId, catalogName, dbName, tblName);
protected MetastorePartitionEvent(long eventId, String catalogName, String dbName,
String tblName, MetastoreEventType eventType) {
super(eventId, catalogName, dbName, tblName, eventType);
}
protected MetastorePartitionEvent(NotificationEvent event, String catalogName) {
@ -37,4 +40,15 @@ public abstract class MetastorePartitionEvent extends MetastoreTableEvent {
protected boolean willCreateOrDropTable() {
return false;
}
protected boolean willChangeTableName() {
return false;
}
/**
* Returns if the process of this event will rename this partition.
*/
protected abstract boolean willChangePartitionName();
public abstract Set<String> getAllPartitionNames();
}

View File

@ -31,8 +31,9 @@ import java.util.Objects;
public abstract class MetastoreTableEvent extends MetastoreEvent {
// for test
protected MetastoreTableEvent(long eventId, String catalogName, String dbName, String tblName) {
super(eventId, catalogName, dbName, tblName);
protected MetastoreTableEvent(long eventId, String catalogName, String dbName,
String tblName, MetastoreEventType eventType) {
super(eventId, catalogName, dbName, tblName, eventType);
}
protected MetastoreTableEvent(NotificationEvent event, String catalogName) {
@ -67,6 +68,11 @@ public abstract class MetastoreTableEvent extends MetastoreEvent {
*/
protected abstract boolean willCreateOrDropTable();
/**
* Returns if the process of this event will rename this table.
*/
protected abstract boolean willChangeTableName();
public TableKey getTableKey() {
return new TableKey(catalogName, dbName, tblName);
}

View File

@ -23,171 +23,450 @@ import org.apache.doris.datasource.hive.event.AlterPartitionEvent;
import org.apache.doris.datasource.hive.event.AlterTableEvent;
import org.apache.doris.datasource.hive.event.CreateDatabaseEvent;
import org.apache.doris.datasource.hive.event.CreateTableEvent;
import org.apache.doris.datasource.hive.event.DropDatabaseEvent;
import org.apache.doris.datasource.hive.event.DropPartitionEvent;
import org.apache.doris.datasource.hive.event.DropTableEvent;
import org.apache.doris.datasource.hive.event.InsertEvent;
import org.apache.doris.datasource.hive.event.MetastoreEvent;
import org.apache.doris.datasource.hive.event.MetastoreEventFactory;
import org.apache.hadoop.util.Lists;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
public class MetastoreEventFactoryTest {
private static final MetastoreEventFactory factory = new MetastoreEventFactory();
private static final Random random = new Random(System.currentTimeMillis());
private static final String testCtl = "test_ctl";
private static final Function<Long, CreateDatabaseEvent> createDatabaseEventProducer = eventId
-> new CreateDatabaseEvent(eventId, testCtl, randomDb());
private static final Function<Long, AlterDatabaseEvent> alterDatabaseEventProducer = eventId
-> new AlterDatabaseEvent(eventId, testCtl, randomDb(), randomBool(0.0001D));
private static final Function<Long, DropDatabaseEvent> dropDatabaseEventProducer = eventId
-> new DropDatabaseEvent(eventId, testCtl, randomDb());
private static final Function<Long, CreateTableEvent> createTableEventProducer = eventId
-> new CreateTableEvent(eventId, testCtl, randomDb(), randomTbl());
private static final Function<Long, AlterTableEvent> alterTableEventProducer = eventId
-> new AlterTableEvent(eventId, testCtl, randomDb(), randomTbl(),
randomBool(0.1D), randomBool(0.1D));
private static final Function<Long, InsertEvent> insertEventProducer = eventId
-> new InsertEvent(eventId, testCtl, randomDb(), randomTbl());
private static final Function<Long, DropTableEvent> dropTableEventProducer = eventId
-> new DropTableEvent(eventId, testCtl, randomDb(), randomTbl());
private static final Function<Long, AddPartitionEvent> addPartitionEventProducer = eventId
-> new AddPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartitions());
private static final Function<Long, AlterPartitionEvent> alterPartitionEventProducer = eventId
-> new AlterPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartition(),
randomBool(0.1D));
private static final Function<Long, DropPartitionEvent> dropPartitionEventProducer = eventId
-> new DropPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartitions());
private static final List<Function<Long, ? extends MetastoreEvent>> eventProducers = Arrays.asList(
createDatabaseEventProducer, alterDatabaseEventProducer, dropDatabaseEventProducer,
createTableEventProducer, alterTableEventProducer, insertEventProducer, dropTableEventProducer,
addPartitionEventProducer, alterPartitionEventProducer, dropPartitionEventProducer);
private static String randomDb() {
return "db_" + random.nextInt(10);
}
private static String randomTbl() {
return "tbl_" + random.nextInt(100);
}
private static String randomPartition() {
return "partition_" + random.nextInt(1000);
}
private static List<String> randomPartitions() {
int times = random.nextInt(100) + 1;
Set<String> partitions = Sets.newHashSet();
for (int i = 0; i < times; i++) {
partitions.add(randomPartition());
}
return Lists.newArrayList(partitions);
}
private static boolean randomBool(double possibility) {
Preconditions.checkArgument(possibility >= 0.0D && possibility <= 1.0D);
int upperBound = (int) Math.floor(1000000 * possibility);
return random.nextInt(1000000) <= upperBound;
}
// define MockCatalog/MockDatabase/MockTable/MockPartition to simulate the real catalog/database/table/partition
private static class MockCatalog {
private String ctlName;
private Map<String, MockDatabase> databases = Maps.newHashMap();
private MockCatalog(String ctlName) {
this.ctlName = ctlName;
}
@Override
public int hashCode() {
return 31 * Objects.hash(ctlName) + Arrays.hashCode(
databases.values().stream().sorted(Comparator.comparing(d -> d.dbName)).toArray());
}
@Override
public boolean equals(Object other) {
if (!(other instanceof MockCatalog)) {
return false;
}
if (!Objects.equals(this.ctlName, ((MockCatalog) other).ctlName)) {
return false;
}
Object[] sortedDatabases = databases.values().stream()
.sorted(Comparator.comparing(d -> d.dbName)).toArray();
Object[] otherSortedDatabases = ((MockCatalog) other).databases.values().stream()
.sorted(Comparator.comparing(d -> d.dbName)).toArray();
return Arrays.equals(sortedDatabases, otherSortedDatabases);
}
public MockCatalog copy() {
MockCatalog mockCatalog = new MockCatalog(this.ctlName);
mockCatalog.databases.putAll(this.databases);
return mockCatalog;
}
}
private static class MockDatabase {
private String dbName;
private Map<String, MockTable> tables = Maps.newHashMap();
private MockDatabase(String dbName) {
this.dbName = dbName;
}
@Override
public int hashCode() {
return 31 * Objects.hash(dbName) + Arrays.hashCode(
tables.values().stream().sorted(Comparator.comparing(t -> t.tblName)).toArray());
}
@Override
public boolean equals(Object other) {
if (!(other instanceof MockDatabase)) {
return false;
}
if (!Objects.equals(this.dbName, ((MockDatabase) other).dbName)) {
return false;
}
Object[] sortedTables = tables.values().stream()
.sorted(Comparator.comparing(t -> t.tblName)).toArray();
Object[] otherSortedTables = ((MockDatabase) other).tables.values().stream()
.sorted(Comparator.comparing(t -> t.tblName)).toArray();
return Arrays.equals(sortedTables, otherSortedTables);
}
public MockDatabase copy() {
MockDatabase mockDatabase = new MockDatabase(this.dbName);
mockDatabase.tables.putAll(this.tables);
return mockDatabase;
}
}
private static class MockTable {
private String tblName;
// use this field to mark whether the table has been refreshed
private boolean refreshed;
private Map<String, MockPartition> partitions = Maps.newHashMap();
private MockTable(String tblName) {
this.tblName = tblName;
}
public void refresh() {
this.refreshed = true;
}
@Override
public int hashCode() {
return 31 * Objects.hash(tblName, refreshed) + Arrays.hashCode(
partitions.values().stream().sorted(Comparator.comparing(p -> p.partitionName)).toArray());
}
@Override
public boolean equals(Object other) {
if (!(other instanceof MockTable)) {
return false;
}
if (!Objects.equals(this.tblName, ((MockTable) other).tblName)) {
return false;
}
if (refreshed != ((MockTable) other).refreshed) {
return false;
}
Object[] sortedPartitions = partitions.values().stream()
.sorted(Comparator.comparing(p -> p.partitionName)).toArray();
Object[] otherSortedPartitions = ((MockTable) other).partitions.values().stream()
.sorted(Comparator.comparing(p -> p.partitionName)).toArray();
return Arrays.equals(sortedPartitions, otherSortedPartitions);
}
public MockTable copy() {
MockTable copyTbl = new MockTable(this.tblName);
copyTbl.partitions.putAll(this.partitions);
return copyTbl;
}
}
private static class MockPartition {
private String partitionName;
// use this field to mark whether the partition has been refreshed
private boolean refreshed;
private MockPartition(String partitionName) {
this.partitionName = partitionName;
this.refreshed = false;
}
public void refresh() {
this.refreshed = true;
}
@Override
public int hashCode() {
return Objects.hash(refreshed, partitionName);
}
@Override
public boolean equals(Object other) {
return other instanceof MockPartition
&& refreshed == ((MockPartition) other).refreshed
&& Objects.equals(this.partitionName, ((MockPartition) other).partitionName);
}
}
// Replays one metastore event against the given mock catalog, mirroring the
// bookkeeping the real HMS event handlers perform on databases/tables/partitions.
private void processEvent(MockCatalog ctl, MetastoreEvent event) {
    switch (event.getEventType()) {
        case CREATE_DATABASE: {
            MockDatabase createdDb = new MockDatabase(event.getDbName());
            ctl.databases.put(createdDb.dbName, createdDb);
            break;
        }
        case DROP_DATABASE:
            ctl.databases.remove(event.getDbName());
            break;
        case ALTER_DATABASE: {
            AlterDatabaseEvent alterDbEvent = (AlterDatabaseEvent) event;
            if (alterDbEvent.isRename()) {
                // a database rename is modeled as: drop the old database, then
                // register an empty database under the new name
                ctl.databases.remove(event.getDbName());
                MockDatabase renamedDb = new MockDatabase(alterDbEvent.getDbNameAfter());
                ctl.databases.put(renamedDb.dbName, renamedDb);
            } else {
                MockDatabase db = ctl.databases.get(event.getDbName());
                if (db != null) {
                    // a non-rename alter invalidates all cached tables
                    db.tables.clear();
                }
            }
            break;
        }
        case CREATE_TABLE: {
            MockDatabase db = ctl.databases.get(event.getDbName());
            if (db != null) {
                db.tables.put(event.getTblName(), new MockTable(event.getTblName()));
            }
            break;
        }
        case DROP_TABLE: {
            MockDatabase db = ctl.databases.get(event.getDbName());
            if (db != null) {
                db.tables.remove(event.getTblName());
            }
            break;
        }
        case ALTER_TABLE:
        case INSERT: {
            MockDatabase db = ctl.databases.get(event.getDbName());
            if (db == null) {
                break;
            }
            boolean replaceTable = event instanceof AlterTableEvent
                    && (((AlterTableEvent) event).isRename() || ((AlterTableEvent) event).isView());
            if (replaceTable) {
                // table renames and view alters are modeled as drop + create under the new name
                db.tables.remove(event.getTblName());
                MockTable renamedTbl = new MockTable(((AlterTableEvent) event).getTblNameAfter());
                db.tables.put(renamedTbl.tblName, renamedTbl);
            } else {
                MockTable tbl = db.tables.get(event.getTblName());
                if (tbl != null) {
                    // plain alters/inserts invalidate the partitions and refresh the table
                    tbl.partitions.clear();
                    tbl.refresh();
                }
            }
            break;
        }
        case ADD_PARTITION: {
            MockDatabase db = ctl.databases.get(event.getDbName());
            MockTable tbl = db == null ? null : db.tables.get(event.getTblName());
            if (tbl != null) {
                for (String partitionName : ((AddPartitionEvent) event).getAllPartitionNames()) {
                    tbl.partitions.put(partitionName, new MockPartition(partitionName));
                }
            }
            break;
        }
        case ALTER_PARTITION: {
            MockDatabase db = ctl.databases.get(event.getDbName());
            MockTable tbl = db == null ? null : db.tables.get(event.getTblName());
            AlterPartitionEvent alterPartitionEvent = (AlterPartitionEvent) event;
            if (tbl != null) {
                if (alterPartitionEvent.isRename()) {
                    // a partition rename drops the old partitions and adds one under the new name
                    for (String partitionName : alterPartitionEvent.getAllPartitionNames()) {
                        tbl.partitions.remove(partitionName);
                    }
                    MockPartition renamedPartition =
                            new MockPartition(alterPartitionEvent.getPartitionNameAfter());
                    tbl.partitions.put(renamedPartition.partitionName, renamedPartition);
                } else {
                    for (String partitionName : alterPartitionEvent.getAllPartitionNames()) {
                        MockPartition partition = tbl.partitions.get(partitionName);
                        if (partition != null) {
                            partition.refresh();
                        }
                    }
                }
            }
            break;
        }
        case DROP_PARTITION: {
            MockDatabase db = ctl.databases.get(event.getDbName());
            MockTable tbl = db == null ? null : db.tables.get(event.getTblName());
            if (tbl != null) {
                for (String partitionName : ((DropPartitionEvent) event).getAllPartitionNames()) {
                    tbl.partitions.remove(partitionName);
                }
            }
            break;
        }
        default:
            Assertions.fail("Unknown event type : " + event.getEventType());
    }
}
/**
 * Produces random {@code MetastoreEvent}s, choosing each event type with a
 * frequency proportional to the configured weight.
 */
static class EventProducer {
    // every type of event has a proportion
    // for instance, if the `CreateDatabaseEvent`'s proportion is 1
    // and the `AlterDatabaseEvent`'s proportion is 10
    // the event count of `AlterDatabaseEvent` is always about 10 times as the `CreateDatabaseEvent`
    private final List<Integer> proportions;
    private final int sumOfProportions;

    EventProducer(List<Integer> proportions) {
        // one proportion per registered event producer
        Preconditions.checkArgument(CollectionUtils.isNotEmpty(proportions)
                && proportions.size() == eventProducers.size());
        this.proportions = ImmutableList.copyOf(proportions);
        this.sumOfProportions = proportions.stream().mapToInt(proportion -> proportion).sum();
    }

    public MetastoreEvent produceOneEvent(long eventId) {
        return eventProducers.get(calIndex(random.nextInt(sumOfProportions))).apply(eventId);
    }

    /**
     * Maps a uniform sample in {@code [0, sumOfProportions)} to the index whose
     * cumulative-proportion bucket contains it: bucket {@code i} covers the
     * half-open range {@code [prefixSum(i), prefixSum(i + 1))}.
     */
    private int calIndex(int val) {
        int currentIndex = 0;
        int currentBound = proportions.get(currentIndex);
        while (currentIndex < proportions.size() - 1) {
            // `val >= currentBound` (not `>`): `val` is 0-based, so a sample equal to a
            // cumulative bound already belongs to the next bucket. Using `>` would give
            // the first bucket one extra sample and starve the last one (e.g. with
            // proportions [1, 1] the second event type would never be produced).
            if (val >= currentBound) {
                currentBound += proportions.get(++currentIndex);
            } else {
                return currentIndex;
            }
        }
        return proportions.size() - 1;
    }
}
@Test
public void testCreateBatchEvents() {
AlterPartitionEvent e1 = new AlterPartitionEvent(1L, "test_ctl", "test_db", "t1", "p1", "p1");
AlterPartitionEvent e2 = new AlterPartitionEvent(2L, "test_ctl", "test_db", "t1", "p1", "p1");
AddPartitionEvent e3 = new AddPartitionEvent(3L, "test_ctl", "test_db", "t1", Arrays.asList("p1"));
AlterTableEvent e4 = new AlterTableEvent(4L, "test_ctl", "test_db", "t1", false, false);
AlterTableEvent e5 = new AlterTableEvent(5L, "test_ctl", "test_db", "t1", true, false);
AlterTableEvent e6 = new AlterTableEvent(6L, "test_ctl", "test_db", "t1", false, true);
DropTableEvent e7 = new DropTableEvent(7L, "test_ctl", "test_db", "t1");
InsertEvent e8 = new InsertEvent(8L, "test_ctl", "test_db", "t1");
CreateDatabaseEvent e9 = new CreateDatabaseEvent(9L, "test_ctl", "test_db2");
AlterPartitionEvent e10 = new AlterPartitionEvent(10L, "test_ctl", "test_db", "t2", "p1", "p1");
AlterTableEvent e11 = new AlterTableEvent(11L, "test_ctl", "test_db", "t1", false, false);
CreateTableEvent e12 = new CreateTableEvent(12L, "test_ctl", "test_db", "t1");
AlterDatabaseEvent e13 = new AlterDatabaseEvent(13L, "test_ctl", "test_db", true);
AlterDatabaseEvent e14 = new AlterDatabaseEvent(14L, "test_ctl", "test_db", false);
// for catalog initialization, so just produce CreateXXXEvent / AddXXXEvent
List<Integer> initProportions = Lists.newArrayList(
1, // CreateDatabaseEvent
0, // AlterDatabaseEvent
0, // DropDatabaseEvent
10, // CreateTableEvent
0, // AlterTableEvent
0, // InsertEvent
0, // DropTableEvent
100, // AddPartitionEvent
0, // AlterPartitionEvent
0 // DropPartitionEvent
);
List<MetastoreEvent> mergedEvents;
List<MetastoreEvent> testEvents = Lists.newLinkedList();
List<Integer> proportions = Lists.newArrayList(
5, // CreateDatabaseEvent
1, // AlterDatabaseEvent
5, // DropDatabaseEvent
100, // CreateTableEvent
20000, // AlterTableEvent
2000, // InsertEvent
5000, // DropTableEvent
10000, // AddPartitionEvent
50000, // AlterPartitionEvent
20000 // DropPartitionEvent
);
EventProducer initProducer = new EventProducer(initProportions);
EventProducer producer = new EventProducer(proportions);
testEvents.add(e1);
testEvents.add(e2);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 1);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 2L);
for (int i = 0; i < 200; i++) {
// create a test catalog and do initialization
MockCatalog testCatalog = new MockCatalog(testCtl);
List<MetastoreEvent> initEvents = Lists.newArrayListWithCapacity(1000);
for (int j = 0; j < 1000; j++) {
initEvents.add(initProducer.produceOneEvent(j));
}
for (MetastoreEvent event : initEvents) {
processEvent(testCatalog, event);
}
testEvents.clear();
testEvents.add(e1);
testEvents.add(e2);
testEvents.add(e3);
testEvents.add(e9);
testEvents.add(e10);
testEvents.add(e4);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 3);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 9L);
Assertions.assertTrue(mergedEvents.get(1).getEventId() == 10L);
Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
// copy the test catalog to the validate catalog
MockCatalog validateCatalog = testCatalog.copy();
// because e5 is a rename event, it will not be merged
testEvents.clear();
testEvents.add(e1);
testEvents.add(e2);
testEvents.add(e10);
testEvents.add(e5);
testEvents.add(e4);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 3);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
List<MetastoreEvent> events = Lists.newArrayListWithCapacity(1000);
for (int j = 0; j < 1000; j++) {
events.add(producer.produceOneEvent(j));
}
List<MetastoreEvent> mergedEvents = factory.createBatchEvents(testCtl, events);
testEvents.clear();
testEvents.add(e1);
testEvents.add(e2);
testEvents.add(e10);
testEvents.add(e6);
testEvents.add(e4);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 3);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
Assertions.assertTrue(mergedEvents.get(1).getEventId() == 6L);
Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
for (MetastoreEvent event : events) {
processEvent(validateCatalog, event);
}
testEvents.clear();
testEvents.add(e1);
testEvents.add(e2);
testEvents.add(e10);
testEvents.add(e4);
testEvents.add(e11);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 2);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
Assertions.assertTrue(mergedEvents.get(1).getEventId() == 11L);
for (MetastoreEvent event : mergedEvents) {
processEvent(testCatalog, event);
}
testEvents.clear();
testEvents.add(e1);
testEvents.add(e2);
testEvents.add(e10);
testEvents.add(e4);
testEvents.add(e8);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 2);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
Assertions.assertTrue(mergedEvents.get(1).getEventId() == 8L);
// because e5 is a rename event, it will not be merged
testEvents.clear();
testEvents.add(e1);
testEvents.add(e2);
testEvents.add(e10);
testEvents.add(e5);
testEvents.add(e8);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 3);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
Assertions.assertTrue(mergedEvents.get(2).getEventId() == 8L);
testEvents.clear();
testEvents.add(e1);
testEvents.add(e2);
testEvents.add(e10);
testEvents.add(e12);
testEvents.add(e4);
testEvents.add(e7);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 2);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
Assertions.assertTrue(mergedEvents.get(1).getEventId() == 7L);
// because e5 is a rename event, it will not be merged
testEvents.clear();
testEvents.add(e1);
testEvents.add(e2);
testEvents.add(e10);
testEvents.add(e5);
testEvents.add(e7);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 3);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L);
testEvents.clear();
testEvents.add(e1);
testEvents.add(e2);
testEvents.add(e10);
testEvents.add(e4);
testEvents.add(e13);
testEvents.add(e7);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 4);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
Assertions.assertTrue(mergedEvents.get(1).getEventId() == 4L);
Assertions.assertTrue(mergedEvents.get(2).getEventId() == 13L);
Assertions.assertTrue(mergedEvents.get(3).getEventId() == 7L);
testEvents.clear();
testEvents.add(e1);
testEvents.add(e2);
testEvents.add(e10);
testEvents.add(e4);
testEvents.add(e14);
testEvents.add(e7);
mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
Assertions.assertTrue(mergedEvents.size() == 3);
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
Assertions.assertTrue(mergedEvents.get(1).getEventId() == 14L);
Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L);
// the test catalog should be equals to the validate catalog
// otherwise we must have some bugs at `factory.createBatchEvents()`
Assertions.assertEquals(testCatalog, validateCatalog);
}
}
}