[Meta][Refactor] Use lambda expressions to save Image (#6646)

The original code in MetaWriter is tedious,So I try use lambda expressions to make the code more clean.
This commit is contained in:
xy720
2021-09-16 10:28:40 +08:00
committed by GitHub
parent 7f5631717e
commit 67472f3518
2 changed files with 56 additions and 52 deletions

View File

@ -2252,7 +2252,7 @@ public class Catalog {
return checksum;
}
public long saveSqlBlockRule(DataOutputStream out, long checksum) throws IOException {
public long saveSqlBlockRule(CountingDataOutputStream out, long checksum) throws IOException {
Catalog.getCurrentCatalog().getSqlBlockRuleMgr().write(out);
return checksum;
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.common;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.io.CountingDataOutputStream;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
@ -65,71 +66,74 @@ import java.util.List;
public class MetaWriter {
private static final Logger LOG = LogManager.getLogger(MetaWriter.class);
public static MetaWriter writer = new MetaWriter();
private interface Delegate {
long doWork(String name, WriteMethod method) throws IOException;
}
private interface WriteMethod {
long write() throws IOException;
}
private Delegate delegate;
public void setDelegate(CountingDataOutputStream dos, List<MetaIndex> indices) {
this.delegate = (name, method) -> {
indices.add(new MetaIndex(name, dos.getCount()));
return method.write();
};
}
public long doWork(String name, WriteMethod method) throws IOException {
if(delegate == null){
return method.write();
}
return delegate.doWork(name, method);
}
public static void write(File imageFile, Catalog catalog) throws IOException {
// save image does not need any lock. because only checkpoint thread will call this method.
LOG.info("start save image to {}. is ckpt: {}", imageFile.getAbsolutePath(), Catalog.isCheckpointThread());
long checksum = 0;
final Reference<Long> checksum = new Reference<>(0L);
long saveImageStartTime = System.currentTimeMillis();
long startPosition = MetaHeader.write(imageFile);
List<MetaIndex> metaIndices = Lists.newArrayList();
try (CountingDataOutputStream dos = new CountingDataOutputStream(new BufferedOutputStream(
new FileOutputStream(imageFile, true)), startPosition)) {
writer.setDelegate(dos, metaIndices);
long replayedJournalId = catalog.getReplayedJournalId();
metaIndices.add(new MetaIndex("header", dos.getCount()));
checksum = catalog.saveHeader(dos, replayedJournalId, checksum);
metaIndices.add(new MetaIndex("masterInfo", dos.getCount()));
checksum = catalog.saveMasterInfo(dos, checksum);
metaIndices.add(new MetaIndex("frontends", dos.getCount()));
checksum = catalog.saveFrontends(dos, checksum);
metaIndices.add(new MetaIndex("backends", dos.getCount()));
checksum = Catalog.getCurrentSystemInfo().saveBackends(dos, checksum);
metaIndices.add(new MetaIndex("db", dos.getCount()));
checksum = catalog.saveDb(dos, checksum);
metaIndices.add(new MetaIndex("loadJob", dos.getCount()));
checksum = catalog.saveLoadJob(dos, checksum);
metaIndices.add(new MetaIndex("alterJob", dos.getCount()));
checksum = catalog.saveAlterJob(dos, checksum);
metaIndices.add(new MetaIndex("recycleBin", dos.getCount()));
checksum = catalog.saveRecycleBin(dos, checksum);
metaIndices.add(new MetaIndex("globalVariable", dos.getCount()));
checksum = catalog.saveGlobalVariable(dos, checksum);
metaIndices.add(new MetaIndex("cluster", dos.getCount()));
checksum = catalog.saveCluster(dos, checksum);
metaIndices.add(new MetaIndex("broker", dos.getCount()));
checksum = catalog.saveBrokers(dos, checksum);
metaIndices.add(new MetaIndex("resources", dos.getCount()));
checksum = catalog.saveResources(dos, checksum);
metaIndices.add(new MetaIndex("exportJob", dos.getCount()));
checksum = catalog.saveExportJob(dos, checksum);
metaIndices.add(new MetaIndex("syncJob", dos.getCount()));
checksum = catalog.saveSyncJobs(dos, checksum);
metaIndices.add(new MetaIndex("backupHandler", dos.getCount()));
checksum = catalog.saveBackupHandler(dos, checksum);
metaIndices.add(new MetaIndex("paloAuth", dos.getCount()));
checksum = catalog.savePaloAuth(dos, checksum);
metaIndices.add(new MetaIndex("transactionState", dos.getCount()));
checksum = catalog.saveTransactionState(dos, checksum);
metaIndices.add(new MetaIndex("colocateTableIndex", dos.getCount()));
checksum = catalog.saveColocateTableIndex(dos, checksum);
metaIndices.add(new MetaIndex("routineLoadJobs", dos.getCount()));
checksum = catalog.saveRoutineLoadJobs(dos, checksum);
metaIndices.add(new MetaIndex("loadJobV2", dos.getCount()));
checksum = catalog.saveLoadJobsV2(dos, checksum);
metaIndices.add(new MetaIndex("smallFiles", dos.getCount()));
checksum = catalog.saveSmallFiles(dos, checksum);
metaIndices.add(new MetaIndex("plugins", dos.getCount()));
checksum = catalog.savePlugins(dos, checksum);
metaIndices.add(new MetaIndex("deleteHandler", dos.getCount()));
checksum = catalog.saveDeleteHandler(dos, checksum);
metaIndices.add(new MetaIndex("sqlBlockRule", dos.getCount()));
checksum = catalog.saveSqlBlockRule(dos, checksum);
checksum.setRef(writer.doWork("header", () -> catalog.saveHeader(dos, replayedJournalId, checksum.getRef())));
checksum.setRef(writer.doWork("masterInfo", () -> catalog.saveMasterInfo(dos, checksum.getRef())));
checksum.setRef(writer.doWork("frontends", () -> catalog.saveFrontends(dos, checksum.getRef())));
checksum.setRef(writer.doWork("backends", () -> Catalog.getCurrentSystemInfo().saveBackends(dos, checksum.getRef())));
checksum.setRef(writer.doWork("db", () -> catalog.saveDb(dos, checksum.getRef())));
checksum.setRef(writer.doWork("loadJob", () -> catalog.saveLoadJob(dos, checksum.getRef())));
checksum.setRef(writer.doWork("alterJob", () -> catalog.saveAlterJob(dos, checksum.getRef())));
checksum.setRef(writer.doWork("recycleBin", () -> catalog.saveRecycleBin(dos, checksum.getRef())));
checksum.setRef(writer.doWork("globalVariable", () -> catalog.saveGlobalVariable(dos, checksum.getRef())));
checksum.setRef(writer.doWork("cluster", () -> catalog.saveCluster(dos, checksum.getRef())));
checksum.setRef(writer.doWork("broker", () -> catalog.saveBrokers(dos, checksum.getRef())));
checksum.setRef(writer.doWork("resources", () -> catalog.saveResources(dos, checksum.getRef())));
checksum.setRef(writer.doWork("exportJob", () -> catalog.saveExportJob(dos, checksum.getRef())));
checksum.setRef(writer.doWork("syncJob", () -> catalog.saveSyncJobs(dos, checksum.getRef())));
checksum.setRef(writer.doWork("backupHandler", () -> catalog.saveBackupHandler(dos, checksum.getRef())));
checksum.setRef(writer.doWork("paloAuth", () -> catalog.savePaloAuth(dos, checksum.getRef())));
checksum.setRef(writer.doWork("transactionState", () -> catalog.saveTransactionState(dos, checksum.getRef())));
checksum.setRef(writer.doWork("colocateTableIndex", () -> catalog.saveColocateTableIndex(dos, checksum.getRef())));
checksum.setRef(writer.doWork("routineLoadJobs", () -> catalog.saveRoutineLoadJobs(dos, checksum.getRef())));
checksum.setRef(writer.doWork("loadJobV2", () -> catalog.saveLoadJobsV2(dos, checksum.getRef())));
checksum.setRef(writer.doWork("smallFiles", () -> catalog.saveSmallFiles(dos, checksum.getRef())));
checksum.setRef(writer.doWork("plugins", () -> catalog.savePlugins(dos, checksum.getRef())));
checksum.setRef(writer.doWork("deleteHandler", () -> catalog.saveDeleteHandler(dos, checksum.getRef())));
checksum.setRef(writer.doWork("sqlBlockRule", () -> catalog.saveSqlBlockRule(dos, checksum.getRef())));
}
MetaFooter.write(imageFile, metaIndices, checksum);
MetaFooter.write(imageFile, metaIndices, checksum.getRef());
long saveImageEndTime = System.currentTimeMillis();
LOG.info("finished save image {} in {} ms. checksum is {}",
imageFile.getAbsolutePath(), (saveImageEndTime - saveImageStartTime), checksum);
imageFile.getAbsolutePath(), (saveImageEndTime - saveImageStartTime), checksum.getRef());
}
}