[Fix](CCR) Binlog config is missed when create replica task (#21397)

This commit is contained in:
DeadlineFen
2023-07-05 10:15:13 +08:00
committed by GitHub
parent 0469c02202
commit 93795442a4
7 changed files with 52 additions and 13 deletions

View File

@ -27,6 +27,7 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@ -243,6 +244,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
tbl.readLock();
try {
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
@ -281,7 +283,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
tbl.enableSingleReplicaCompaction(),
tbl.skipWriteIndexOnLoad(),
tbl.storeRowColumn(),
tbl.isDynamicSchema());
tbl.isDynamicSchema(),
binlogConfig);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);

View File

@ -22,6 +22,7 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@ -237,6 +238,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
@ -278,7 +280,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.enableSingleReplicaCompaction(),
tbl.skipWriteIndexOnLoad(),
tbl.storeRowColumn(),
tbl.isDynamicSchema());
tbl.isDynamicSchema(),
binlogConfig);
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
.get(shadowTabletId), originSchemaHash);

View File

@ -24,6 +24,7 @@ import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
import org.apache.doris.backup.RestoreFileMapping.IdChain;
import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@ -999,6 +1000,14 @@ public class RestoreJob extends AbstractJob {
private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable localTbl, Partition restorePart) {
Set<String> bfColumns = localTbl.getCopiedBfColumns();
double bfFpp = localTbl.getBfFpp();
BinlogConfig binlogConfig;
localTbl.readLock();
try {
binlogConfig = new BinlogConfig(localTbl.getBinlogConfig());
} finally {
localTbl.readUnlock();
}
for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId());
for (Tablet restoreTablet : restoredIdx.getTablets()) {
@ -1024,7 +1033,8 @@ public class RestoreJob extends AbstractJob {
localTbl.enableSingleReplicaCompaction(),
localTbl.skipWriteIndexOnLoad(),
localTbl.storeRowColumn(),
localTbl.isDynamicSchema());
localTbl.isDynamicSchema(),
binlogConfig);
task.setInRestoreMode(true);
batchTask.addTask(task);

View File

@ -1344,6 +1344,7 @@ public class InternalCatalog implements CatalogIf<Database> {
Map<Long, MaterializedIndexMeta> indexIdToMeta;
Set<String> bfColumns;
String partitionName = singlePartitionDesc.getPartitionName();
BinlogConfig binlogConfig;
// check
OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
@ -1466,6 +1467,9 @@ public class InternalCatalog implements CatalogIf<Database> {
indexIdToMeta = olapTable.getCopiedIndexIdToMeta();
bfColumns = olapTable.getCopiedBfColumns();
// get BinlogConfig
binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
} finally {
@ -1505,7 +1509,8 @@ public class InternalCatalog implements CatalogIf<Database> {
singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, idGeneratorBuffer,
olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(),
olapTable.skipWriteIndexOnLoad(), olapTable.storeRowColumn(), olapTable.isDynamicSchema());
olapTable.skipWriteIndexOnLoad(), olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
binlogConfig);
// check again
olapTable = db.getOlapTableOrDdlException(tableName);
@ -1732,7 +1737,7 @@ public class InternalCatalog implements CatalogIf<Database> {
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction,
boolean enableSingleReplicaCompaction, boolean skipWriteIndexOnLoad,
boolean storeRowColumn, boolean isDynamicSchema) throws DdlException {
boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig binlogConfig) throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
@ -1795,7 +1800,7 @@ public class InternalCatalog implements CatalogIf<Database> {
storageMedium, schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType,
dataSortInfo, compressionType, enableUniqueKeyMergeOnWrite, storagePolicy,
disableAutoCompaction, enableSingleReplicaCompaction, skipWriteIndexOnLoad,
storeRowColumn, isDynamicSchema);
storeRowColumn, isDynamicSchema, binlogConfig);
task.setStorageFormat(storageFormat);
batchTask.addTask(task);
@ -2086,12 +2091,13 @@ public class InternalCatalog implements CatalogIf<Database> {
Map<String, String> binlogConfigMap = PropertyAnalyzer.analyzeBinlogConfig(properties);
if (binlogConfigMap != null) {
BinlogConfig binlogConfig = new BinlogConfig();
binlogConfig.mergeFromProperties(properties);
binlogConfig.mergeFromProperties(binlogConfigMap);
olapTable.setBinlogConfig(binlogConfig);
}
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
BinlogConfig binlogConfigForTask = new BinlogConfig(olapTable.getBinlogConfig());
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
// if this is an unpartitioned table, we should analyze data property and replication num here.
@ -2264,7 +2270,7 @@ public class InternalCatalog implements CatalogIf<Database> {
olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad,
storeRowColumn, isDynamicSchema);
storeRowColumn, isDynamicSchema, binlogConfigForTask);
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE
|| partitionInfo.getType() == PartitionType.LIST) {
@ -2330,7 +2336,7 @@ public class InternalCatalog implements CatalogIf<Database> {
olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad,
storeRowColumn, isDynamicSchema);
storeRowColumn, isDynamicSchema, binlogConfigForTask);
olapTable.addPartition(partition);
}
} else {
@ -2692,6 +2698,7 @@ public class InternalCatalog implements CatalogIf<Database> {
Database db = (Database) getDbOrDdlException(dbTbl.getDb());
OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl());
BinlogConfig binlogConfig;
olapTable.readLock();
try {
olapTable.checkNormalStateForAlter();
@ -2716,6 +2723,8 @@ public class InternalCatalog implements CatalogIf<Database> {
return;
}
copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), IndexExtState.VISIBLE, false);
binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
} finally {
olapTable.readUnlock();
}
@ -2747,7 +2756,7 @@ public class InternalCatalog implements CatalogIf<Database> {
olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(), olapTable.skipWriteIndexOnLoad(),
olapTable.storeRowColumn(), olapTable.isDynamicSchema());
olapTable.storeRowColumn(), olapTable.isDynamicSchema(), binlogConfig);
newPartitions.add(newPartition);
}
} catch (DdlException e) {

View File

@ -18,6 +18,7 @@
package org.apache.doris.master;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@ -744,6 +745,8 @@ public class ReportHandler extends Daemon {
continue;
}
BinlogConfig binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
ReplicaState state = replica.getState();
if (state == ReplicaState.NORMAL || state == ReplicaState.SCHEMA_CHANGE) {
// if state is PENDING / ROLLUP / CLONE
@ -782,7 +785,8 @@ public class ReportHandler extends Daemon {
olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
olapTable.skipWriteIndexOnLoad(),
olapTable.storeRowColumn(), olapTable.isDynamicSchema());
olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
binlogConfig);
createReplicaTask.setIsRecoverTask(true);
createReplicaBatchTask.addTask(createReplicaTask);

View File

@ -19,6 +19,7 @@ package org.apache.doris.task;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.DataSortInfo;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
@ -106,6 +107,8 @@ public class CreateReplicaTask extends AgentTask {
private boolean storeRowColumn;
private BinlogConfig binlogConfig;
public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
long replicaId, short shortKeyColumnCount, int schemaHash, long version,
KeysType keysType, TStorageType storageType,
@ -121,7 +124,8 @@ public class CreateReplicaTask extends AgentTask {
boolean enableSingleReplicaCompaction,
boolean skipWriteIndexOnLoad,
boolean storeRowColumn,
boolean isDynamicSchema) {
boolean isDynamicSchema,
BinlogConfig binlogConfig) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
this.replicaId = replicaId;
@ -159,6 +163,7 @@ public class CreateReplicaTask extends AgentTask {
this.enableSingleReplicaCompaction = enableSingleReplicaCompaction;
this.skipWriteIndexOnLoad = skipWriteIndexOnLoad;
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
}
public void setIsRecoverTask(boolean isRecoverTask) {
@ -295,6 +300,11 @@ public class CreateReplicaTask extends AgentTask {
createTabletReq.setTabletType(tabletType);
createTabletReq.setCompressionType(compressionType);
createTabletReq.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
if (binlogConfig != null) {
createTabletReq.setBinlogConfig(binlogConfig.toThrift());
}
return createTabletReq;
}
}

View File

@ -107,7 +107,7 @@ public class AgentTaskTest {
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null,
TCompressionType.LZ4F, false, "", false, false, false, false, false);
TCompressionType.LZ4F, false, "", false, false, false, false, false, null);
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false);