diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c5f2f06e1a..c6c2749973 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1541,6 +1541,22 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_backup_restore_job_num_per_db = 10;
 
+    /**
+     * An internal config to reduce the restore job size during serialization by compressing it.
+     *
+     * WARNING: Once this option is enabled and a restore is performed, the FE version cannot be rolled back.
+     */
+    @ConfField(mutable = false)
+    public static boolean restore_job_compressed_serialization = false;
+
+    /**
+     * An internal config to reduce the backup job size during serialization by compressing it.
+     *
+     * WARNING: Once this option is enabled and a backup is performed, the FE version cannot be rolled back.
+     */
+    @ConfField(mutable = false)
+    public static boolean backup_job_compressed_serialization = false;
+
     /**
      * Control the max num of tablets per backup job involved.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
index 0df9155ab3..4e2c3fd199 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
@@ -40,7 +40,7 @@ import java.util.Map;
 public abstract class AbstractJob implements Writable {
 
     public enum JobType {
-        BACKUP, RESTORE
+        BACKUP, RESTORE, BACKUP_COMPRESSED, RESTORE_COMPRESSED
     }
 
     protected JobType type;
@@ -160,10 +160,10 @@ public abstract class AbstractJob implements Writable {
     public static AbstractJob read(DataInput in) throws IOException {
         AbstractJob job = null;
         JobType type = JobType.valueOf(Text.readString(in));
-        if (type == JobType.BACKUP) {
-            job = new BackupJob();
-        } else if (type == JobType.RESTORE) {
-            job = new RestoreJob();
+        if (type == JobType.BACKUP || type == JobType.BACKUP_COMPRESSED) {
+            job = new BackupJob(type);
+        } else if (type == JobType.RESTORE || type == JobType.RESTORE_COMPRESSED) {
+            job = new RestoreJob(type);
         } else {
             throw new IOException("Unknown job type: " + type.name());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 4dc8824d05..fdcc489277 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -61,8 +61,12 @@ import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.FileVisitOption;
@@ -74,6 +78,8 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 public class BackupJob extends AbstractJob {
@@ -124,6 +130,11 @@ public class BackupJob extends AbstractJob {
         super(JobType.BACKUP);
     }
 
+    public BackupJob(JobType jobType) {
+        super(jobType);
+        assert jobType == JobType.BACKUP || jobType == JobType.BACKUP_COMPRESSED;
+    }
+
     public BackupJob(String label, long dbId, String dbName, List<TableRef> tableRefs, long timeoutMs,
             BackupContent content, Env env, long repoId) {
         super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId);
@@ -1055,8 +1066,32 @@ public class BackupJob extends AbstractJob {
 
     @Override
     public void write(DataOutput out) throws IOException {
+        if (Config.backup_job_compressed_serialization) {
+            type = JobType.BACKUP_COMPRESSED;
+        }
         super.write(out);
+        if (Config.backup_job_compressed_serialization) {
+            type = JobType.BACKUP;
+
+            int written = 0;
+            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+            try (GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream)) {
+                try (DataOutputStream stream = new DataOutputStream(gzipStream)) {
+                    writeOthers(stream);
+                    written = stream.size();
+                }
+            }
+            Text text = new Text(byteStream.toByteArray());
+            if (LOG.isDebugEnabled() || text.getLength() > (50 << 20)) {
+                LOG.info("backup job written size {}, compressed size {}", written, text.getLength());
+            }
+            text.write(out);
+        } else {
+            writeOthers(out);
+        }
+    }
+
+    public void writeOthers(DataOutput out) throws IOException {
         // table refs
         out.writeInt(tableRefs.size());
         for (TableRef tblRef : tableRefs) {
@@ -1111,7 +1146,27 @@ public class BackupJob extends AbstractJob {
     public void readFields(DataInput in) throws IOException {
         super.readFields(in);
 
+        if (type == JobType.BACKUP_COMPRESSED) {
+            type = JobType.BACKUP;
+            Text text = new Text();
+            text.readFields(in);
+            if (LOG.isDebugEnabled() || text.getLength() > (50 << 20)) {
+                LOG.info("read backup job, compressed size {}", text.getLength());
+            }
+
+            ByteArrayInputStream byteStream = new ByteArrayInputStream(text.getBytes());
+            try (GZIPInputStream gzipStream = new GZIPInputStream(byteStream)) {
+                try (DataInputStream stream = new DataInputStream(gzipStream)) {
+                    readOthers(stream);
+                }
+            }
+        } else {
+            readOthers(in);
+        }
+    }
+
+    public void readOthers(DataInput in) throws IOException {
         // table refs
         int size = in.readInt();
         tableRefs = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 7a3ffabd05..aa22aacc33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -98,8 +98,12 @@ import com.google.common.collect.Table.Cell;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -107,6 +111,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 public class RestoreJob extends AbstractJob {
     private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA;
@@ -195,6 +201,11 @@ public class RestoreJob extends AbstractJob {
         super(JobType.RESTORE);
     }
 
+    public RestoreJob(JobType jobType) {
+        super(jobType);
+        assert jobType == JobType.RESTORE || jobType == JobType.RESTORE_COMPRESSED;
+    }
+
     public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo,
             boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion,
             boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
@@ -2482,8 +2493,32 @@ public class RestoreJob extends AbstractJob {
 
     @Override
     public void write(DataOutput out) throws IOException {
+        if (Config.restore_job_compressed_serialization) {
+            type = JobType.RESTORE_COMPRESSED;
+        }
         super.write(out);
+        if (Config.restore_job_compressed_serialization) {
+            type = JobType.RESTORE;
+
+            int written = 0;
+            ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
+            try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) {
+                try (DataOutputStream stream = new DataOutputStream(gzipStream)) {
+                    writeOthers(stream);
+                    written = stream.size();
+                }
+            }
+            Text text = new Text(bytesStream.toByteArray());
+            if (LOG.isDebugEnabled() || text.getLength() > (50 << 20)) {
+                LOG.info("restore job written size {}, compressed size {}", written, text.getLength());
+            }
+            text.write(out);
+        } else {
+            writeOthers(out);
+        }
+    }
+
+    private void writeOthers(DataOutput out) throws IOException {
         Text.writeString(out, backupTimestamp);
         jobInfo.write(out);
         out.writeBoolean(allowLoad);
@@ -2557,6 +2592,27 @@ public class RestoreJob extends AbstractJob {
     public void readFields(DataInput in) throws IOException {
         super.readFields(in);
 
+        if (type == JobType.RESTORE_COMPRESSED) {
+            type = JobType.RESTORE;
+
+            Text text = new Text();
+            text.readFields(in);
+            if (LOG.isDebugEnabled() || text.getLength() > (50 << 20)) {
+                LOG.info("read restore job, compressed size {}", text.getLength());
+            }
+
+            ByteArrayInputStream bytesStream = new ByteArrayInputStream(text.getBytes());
+            try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
+                try (DataInputStream stream = new DataInputStream(gzipStream)) {
+                    readOthers(stream);
+                }
+            }
+        } else {
+            readOthers(in);
+        }
+    }
+
+    private void readOthers(DataInput in) throws IOException {
        backupTimestamp = Text.readString(in);
        jobInfo = BackupJobInfo.read(in);
        allowLoad = in.readBoolean();
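
Note on the scheme: super.write() still emits the JobType name as an uncompressed string, so a pre-upgrade FE that reads a BACKUP_COMPRESSED or RESTORE_COMPRESSED marker fails fast with "Unknown job type" instead of misparsing the payload; this is why the new configs warn that the FE version cannot be rolled back once a compressed job has been persisted. The following is a minimal, self-contained sketch of the same gzip round trip in isolation. It uses hypothetical stand-ins for the Doris internals: writePayload/readPayload substitute for writeOthers/readOthers, and a plain length-prefixed byte record takes the place of org.apache.doris.common.io.Text.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressedJobSerialization {
    // Hypothetical stand-in for BackupJob.writeOthers(): the job-specific fields.
    static void writePayload(DataOutput out) throws IOException {
        out.writeUTF("label_2024"); // e.g. the job label
        out.writeLong(10001L);      // e.g. the dbId
    }

    // Hypothetical stand-in for BackupJob.readOthers().
    static String readPayload(DataInput in) throws IOException {
        String label = in.readUTF();
        long dbId = in.readLong();
        return label + "/" + dbId;
    }

    // Write path: compress the payload into an in-memory buffer, then emit it as a
    // single length-prefixed record (the role Text plays in the patch above).
    static void write(DataOutput out) throws IOException {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        try (DataOutputStream stream = new DataOutputStream(new GZIPOutputStream(byteStream))) {
            // Must target the gzip stream, not the outer `out` -- writing to `out`
            // here would bypass compression and corrupt the record framing.
            writePayload(stream);
        } // closing the stream finishes the gzip trailer
        byte[] compressed = byteStream.toByteArray();
        out.writeInt(compressed.length);
        out.write(compressed);
    }

    // Read path: slurp the length-prefixed record, then decompress and decode it.
    static String read(DataInput in) throws IOException {
        byte[] compressed = new byte[in.readInt()];
        in.readFully(compressed);
        try (DataInputStream stream = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressed)))) {
            return readPayload(stream);
        }
    }

    public static void main(String[] args) throws IOException {
        // Round trip through an in-memory image, as the FE does with its edit log.
        ByteArrayOutputStream image = new ByteArrayOutputStream();
        write(new DataOutputStream(image));
        String restored = read(new DataInputStream(new ByteArrayInputStream(image.toByteArray())));
        System.out.println(restored); // prints: label_2024/10001
    }
}

The buffer-then-frame design means the whole compressed job must fit in memory twice during serialization, which is acceptable here because the point of the patch is that even "large" jobs shrink substantially under gzip before hitting the 2 GB Text limit.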