[bugfix](metareader) meta reader could not load image (#16148)

This bug was introduced by PR #16009.
Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
yiguolei
2023-01-28 14:22:18 +08:00
committed by GitHub
parent 7f2ff83480
commit 49395390be
3 changed files with 64 additions and 1 deletion

View File

@ -149,6 +149,7 @@ import org.apache.doris.load.ExportChecker;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.StreamLoadRecordMgr;
import org.apache.doris.load.loadv2.LoadEtlChecker;
import org.apache.doris.load.loadv2.LoadJobScheduler;
@ -1711,6 +1712,62 @@ public class Env {
return getInternalCatalog().loadDb(dis, checksum);
}
/**
 * Serializes the (now mostly empty) legacy load-job module into the metadata image.
 *
 * <p>The write order here is the on-disk image format and must stay in lockstep
 * with {@code loadLoadJob}: load-job count, delete-job count, load error hub
 * info, delete-load-job count.
 *
 * @param dos      image output stream
 * @param checksum running image checksum to fold the written counts into
 * @return the updated checksum
 * @throws IOException if writing to the image stream fails
 */
public long saveLoadJob(CountingDataOutputStream dos, long checksum) throws IOException {
    // Section 1: load.dbToLoadJob. Hadoop load jobs no longer exist, so the
    // persisted job count is forced to zero.
    final int emptyCount = 0;
    checksum ^= emptyCount;
    dos.writeInt(emptyCount);

    // Section 2: delete jobs. These moved to DeleteHandler, so a zero count is
    // written to keep the image layout stable.
    checksum ^= emptyCount;
    dos.writeInt(emptyCount);

    // Section 3: load error hub configuration.
    // NOTE(review): assumes getLoadErrorHubInfo() never returns null — confirm.
    LoadErrorHub.Param hubInfo = load.getLoadErrorHubInfo();
    hubInfo.write(dos);

    // Section 4: delete load job info. Also owned by DeleteHandler now; persist
    // a zero count.
    checksum ^= emptyCount;
    dos.writeInt(emptyCount);
    return checksum;
}
/**
 * Replays the legacy load-job module from the metadata image.
 *
 * <p>The read order mirrors {@code saveLoadJob} exactly: load-job count,
 * delete-job count, load error hub info, delete-load-job count. All job counts
 * must be zero in images written by this version.
 *
 * @param dis      image input stream
 * @param checksum running image checksum to fold the read counts into
 * @return the updated checksum
 * @throws IOException  if reading from the image stream fails
 * @throws DdlException declared for interface compatibility with other loaders
 */
public long loadLoadJob(DataInputStream dis, long checksum) throws IOException, DdlException {
    // Section 1: legacy hadoop load jobs. A non-zero count means the image was
    // produced by an older FE that still had such jobs pending.
    final int loadJobCount = dis.readInt();
    checksum ^= loadJobCount;
    if (loadJobCount > 0) {
        LOG.warn("there should be no load jobs, please rollback to 1.2.x and check if there are hadoop load jobs");
        throw new RuntimeException("there should be no load jobs, please rollback to 1.2.x "
                + "and check if there are hadoop load jobs");
    }

    // Section 2: delete jobs, now owned by DeleteHandler; the count is always 0.
    final int deleteJobCount = dis.readInt();
    Preconditions.checkState(deleteJobCount == 0, deleteJobCount);
    checksum ^= deleteJobCount;

    // Section 3: load error hub configuration.
    LoadErrorHub.Param hubInfo = new LoadErrorHub.Param();
    hubInfo.readFields(dis);
    load.setLoadErrorHubInfo(hubInfo);

    // Section 4: delete load job info, also owned by DeleteHandler; count is
    // always 0.
    final int deleteInfoCount = dis.readInt();
    Preconditions.checkState(deleteInfoCount == 0, deleteInfoCount);
    checksum ^= deleteInfoCount;

    LOG.info("finished replay loadJob from image");
    return checksum;
}
public long loadExportJob(DataInputStream dis, long checksum) throws IOException, DdlException {
long curTime = System.currentTimeMillis();
long newChecksum = checksum;

View File

@ -71,6 +71,12 @@ public class MetaPersistMethod {
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveDb", CountingDataOutputStream.class, long.class);
break;
case "loadJob":
metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadLoadJob", DataInputStream.class, long.class);
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveLoadJob", CountingDataOutputStream.class, long.class);
break;
case "alterJob":
metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadAlterJob", DataInputStream.class, long.class);

View File

@ -35,7 +35,7 @@ public class PersistMetaModules {
public static final List<MetaPersistMethod> MODULES_IN_ORDER;
public static final ImmutableList<String> MODULE_NAMES = ImmutableList.of(
"masterInfo", "frontends", "backends", "datasource", "db", "alterJob", "recycleBin",
"masterInfo", "frontends", "backends", "datasource", "db", "loadJob", "alterJob", "recycleBin",
"globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler",
"paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles",
"plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "cooldownJob");