make replayedJournalId to be AtomicLong (#87)

Avoid errors during concurrent access to Catalog::replayedJournalId.
LingBin
2017-09-12 20:45:30 +08:00
committed by morningman
parent bbed1b3b15
commit b175a08363
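For context on the commit message: a plain long field shared across threads (here, the replay thread writing and the checkpoint/observer threads reading) gets neither atomicity nor visibility guarantees from the Java memory model, and a 64-bit write may even be split into two 32-bit halves on some JVMs. AtomicLong guarantees both. A minimal sketch of the hazard, using a hypothetical demo class that is not part of this commit:

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo, not from this commit: two threads advance a shared counter.
public class CounterRaceDemo {
    static long plainCounter = 0;                         // unsynchronized long field
    static final AtomicLong atomicCounter = new AtomicLong(0L);

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            for (int i = 0; i < 100_000; i++) {
                plainCounter++;                  // separate read, add, write: updates can be lost
                atomicCounter.incrementAndGet(); // one atomic read-modify-write
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // plainCounter usually prints less than 200000; atomicCounter is always 200000.
        System.out.println("plain=" + plainCounter + ", atomic=" + atomicCounter.get());
    }
}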


@@ -255,8 +255,8 @@ public class Catalog {
private String metaDir;
private EditLog editLog;
private int clusterId;
- private long replayedJournalId; // For checkpoint and observer memory
- // replayed marker
+ // For checkpoint and observer memory replayed marker
+ private AtomicLong replayedJournalId;
private static Catalog CHECKPOINT = null;
private static long checkpointThreadId = -1;
@@ -329,7 +329,7 @@ public class Catalog {
this.canWrite = false;
this.canRead = false;
- this.replayedJournalId = 0;
+ this.replayedJournalId = new AtomicLong(0L);
this.isMaster = false;
this.isElectable = false;
this.synchronizedTimeMs = 0;
@@ -464,7 +464,7 @@ public class Catalog {
}
}
- // 3. get cluster id and role (Observer or Replica)
+ // 3. get cluster id and role (Observer or Follower)
getClusterIdAndRole();
// 4. Load image first and replay edits
@@ -472,6 +472,7 @@ public class Catalog {
loadImage(IMAGE_DIR); // load image file
editLog.open(); // open bdb env or local output stream
+ this.userPropertyMgr.setEditLog(editLog);
// 5. start load label cleaner thread
createCleaner();
cleaner.setName("labelCleaner");
@@ -532,10 +533,9 @@ public class Catalog {
frontends.add(self);
}
} else {
storage = new Storage(IMAGE_DIR);
clusterId = storage.getClusterID();
}
} else {
// Designate one helper node. Get the role and version info
// from the helper node
Storage storage = null;
@@ -566,19 +566,19 @@ public class Catalog {
storage.writeFrontendRole(role);
}
if (!versionFile.exists()) {
- // If the version file doesn't exist, download it from helper
- // node
+ // If the version file doesn't exist, download it from helper node
if (!getVersionFile()) {
LOG.error("fail to download version file from " + helperNode.first + " will exit.");
System.exit(-1);
}
// NOTE: cluster_id will be init when Storage object is constructed,
// so we new one.
storage = new Storage(IMAGE_DIR);
clusterId = storage.getClusterID();
} else {
// If the version file exists, read the cluster id and check the
- // id with helper node
- // to make sure they are identical
+ // id with helper node to make sure they are identical
clusterId = storage.getClusterID();
try {
URL idURL = new URL("http://" + helperNode.first + ":" + Config.http_port + "/check");
@@ -593,7 +593,8 @@ public class Catalog {
System.exit(-1);
}
} catch (Exception e) {
- LOG.warn(e);
+ LOG.warn("fail to check cluster_id from helper node.", e);
+ System.exit(-1);
}
}
@@ -884,7 +885,7 @@ public class Catalog {
LOG.info("image does not exist: {}", curFile.getAbsolutePath());
return;
}
- replayedJournalId = storage.getImageSeq();
+ replayedJournalId.set(storage.getImageSeq());
LOG.info("start load image from {}. is ckpt: {}", curFile.getAbsolutePath(), Catalog.isCheckpointThread());
long loadImageStartTime = System.currentTimeMillis();
DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
@@ -1253,9 +1254,9 @@ public class Catalog {
public void saveImage() throws IOException {
// Write image.ckpt
Storage storage = new Storage(IMAGE_DIR);
- File curFile = storage.getImageFile(replayedJournalId);
+ File curFile = storage.getImageFile(replayedJournalId.get());
File ckpt = new File(IMAGE_DIR, Storage.IMAGE_NEW);
- saveImage(ckpt, replayedJournalId);
+ saveImage(ckpt, replayedJournalId.get());
// Move image.ckpt to image.dataVersion
LOG.info("Move " + ckpt.getAbsolutePath() + " to " + curFile.getAbsolutePath());
@@ -1761,14 +1762,14 @@ public class Catalog {
if (toJournalId == -1) {
toJournalId = getMaxJournalId();
}
- if (toJournalId <= replayedJournalId) {
+ if (toJournalId <= replayedJournalId.get()) {
return false;
}
LOG.info("replayed journal id is {}, replay to journal id is {}", replayedJournalId, toJournalId);
- JournalCursor cursor = editLog.read(replayedJournalId + 1, toJournalId);
+ JournalCursor cursor = editLog.read(replayedJournalId.get() + 1, toJournalId);
if (cursor == null) {
LOG.warn("failed to get cursor from {} to {}", replayedJournalId + 1, toJournalId);
LOG.warn("failed to get cursor from {} to {}", replayedJournalId.get() + 1, toJournalId);
return false;
}
@@ -1781,10 +1782,10 @@ public class Catalog {
}
hasLog = true;
EditLog.loadJournal(this, entity);
- replayedJournalId++;
+ replayedJournalId.incrementAndGet();
LOG.debug("journal {} replayed.", replayedJournalId);
if (!isMaster) {
- journalObservable.notifyObservers(replayedJournalId);
+ journalObservable.notifyObservers(replayedJournalId.get());
}
}
long cost = System.currentTimeMillis() - startTime;
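The two hunks above are the heart of the fix: replayedJournalId++ on a plain long is a separate read, add, and write, so a concurrent reader can observe a stale value, or on some JVMs a half-written one, while incrementAndGet() performs the update as a single atomic step with volatile visibility. A simplified sketch of the resulting loop shape, where applyJournal is a hypothetical stand-in for EditLog.loadJournal:

import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of the replay loop above; applyJournal() is hypothetical.
class ReplayLoopSketch {
    private final AtomicLong replayedJournalId = new AtomicLong(0L);

    boolean replayJournal(long toJournalId) {
        if (toJournalId <= replayedJournalId.get()) {
            return false; // nothing new to replay
        }
        // One atomic read; the "+ 1" happens on the local copy, not on the field.
        for (long id = replayedJournalId.get() + 1; id <= toJournalId; id++) {
            applyJournal(id);
            replayedJournalId.incrementAndGet(); // atomically publish progress to readers
        }
        return true;
    }

    private void applyJournal(long id) {
        // hypothetical: apply one edit-log entry
    }
}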
@@ -3699,13 +3700,13 @@ public class Catalog {
final Cluster cluster = nameToCluster.get(clusterName);
if (cluster == null) {
throw new AnalysisException("No cluster selected");
}
List<String> dbNames = Lists.newArrayList(cluster.getDbNames());
return dbNames;
} finally {
readUnlock();
}
}
public List<Long> getDbIds() {
readLock();
@@ -3866,7 +3867,7 @@ public class Catalog {
}
public long getReplayedJournalId() {
- return this.replayedJournalId;
+ return this.replayedJournalId.get();
}
public HAProtocol getHaProtocol() {
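Worth noting: the getter keeps its primitive long return type, so callers compile unchanged; only the read path changes, and get() has volatile semantics, so a caller always sees the replay thread's most recently published value. A hypothetical caller's view:

// Unchanged call site: still receives a primitive long,
// but the value now comes from one atomic, volatile read.
long replayed = catalog.getReplayedJournalId();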
@@ -4268,7 +4269,7 @@ public class Catalog {
}
/*
* used for handling AlterClusterStmt
* (for client is the ALTER CLUSTER command).
*/
public void alterCluster(AlterSystemStmt stmt) throws DdlException, InternalException {
@@ -4603,7 +4604,7 @@ public class Catalog {
.append(backend.getHeartbeatPort()).toString());
}
// here we reuse the process of decommission backends, but set backend's decommission type to
// ClusterDecommission, which means this backend will not be removed from the system
// after decommission is done.
final DecommissionBackendClause clause = new DecommissionBackendClause(hostPortList);
@@ -4717,7 +4718,7 @@ public class Catalog {
/**
* return max replicationNum of a db
*
* @param db
* @return
*/
@@ -4940,10 +4941,10 @@ public class Catalog {
final Cluster cluster = new Cluster();
cluster.readFields(dis);
checksum ^= cluster.getId();
// BE is in default_cluster when added, therefore it is possible that the BEs
// in default_cluster are not the latest, because the cluster can't be updated
// when loadCluster runs after loadBackend. And because older versions forgot to
// remove a BE's id from the cluster when the BE was dropped or decommissioned,
// the cluster's BE list needs to be updated here.
List<Long> latestBackendIds = systemInfo.getClusterBackendIds(cluster.getName());
@@ -5075,11 +5076,11 @@ public class Catalog {
final Cluster cluster = nameToCluster.get(backend.getOwnerClusterName());
cluster.removeBackend(id);
} finally {
writeUnlock();
}
backend.setDecommissioned(false);
backend.clearClusterName();
backend.setBackendState(BackendState.free);
}
}