[fix](broker) Fix bug that heavy broker load may failed due to BrokerException which indicate the fd is not owned by client (#16350)
Co-authored-by: caiconghui1 <caiconghui1@jd.com>
This commit is contained in:
@ -31,7 +31,7 @@ public class BrokerFileSystem {
|
||||
private ReentrantLock lock;
|
||||
private FileSystemIdentity identity;
|
||||
private FileSystem dfsFileSystem;
|
||||
private long lastAccessTimestamp;
|
||||
private volatile long lastAccessTimestamp;
|
||||
private long createTimestamp;
|
||||
private UUID fileSystemId;
|
||||
|
||||
|
||||
@ -131,7 +131,7 @@ public class ClientContextManager {
|
||||
public void run() {
|
||||
try {
|
||||
for (ClientResourceContext clientContext : clientContexts.values()) {
|
||||
if (System.currentTimeMillis() - clientContext.lastPingTimestamp > clientExpirationSeconds * 1000) {
|
||||
if (System.currentTimeMillis() - clientContext.lastAccessTimestamp > clientExpirationSeconds * 1000) {
|
||||
for (TBrokerFD fd : clientContext.inputStreams.keySet()) {
|
||||
ClientContextManager.this.removeInputStream(fd);
|
||||
}
|
||||
@ -139,9 +139,9 @@ public class ClientContextManager {
|
||||
ClientContextManager.this.removeOutputStream(fd);
|
||||
}
|
||||
clientContexts.remove(clientContext.clientId);
|
||||
logger.info("client [" + clientContext.clientId
|
||||
+ "] is expired, remove it from contexts. last ping time is "
|
||||
+ clientContext.lastPingTimestamp);
|
||||
logger.info("client [" + clientContext.clientId
|
||||
+ "] is expired, remove it from contexts. last access time is "
|
||||
+ clientContext.lastAccessTimestamp);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@ -197,24 +197,28 @@ public class ClientContextManager {
|
||||
private String clientId;
|
||||
private ConcurrentHashMap<TBrokerFD, BrokerInputStream> inputStreams;
|
||||
private ConcurrentHashMap<TBrokerFD, BrokerOutputStream> outputStreams;
|
||||
private long lastPingTimestamp;
|
||||
|
||||
private volatile long lastAccessTimestamp;
|
||||
|
||||
public ClientResourceContext(String clientId) {
|
||||
this.clientId = clientId;
|
||||
this.inputStreams = new ConcurrentHashMap<>();
|
||||
this.outputStreams = new ConcurrentHashMap<>();
|
||||
this.lastPingTimestamp = System.currentTimeMillis();
|
||||
this.lastAccessTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public void putInputStream(TBrokerFD fd, FSDataInputStream inputStream, BrokerFileSystem fileSystem) {
|
||||
updateLastAccessTime();
|
||||
inputStreams.putIfAbsent(fd, new BrokerInputStream(inputStream, fileSystem));
|
||||
}
|
||||
|
||||
public void putOutputStream(TBrokerFD fd, FSDataOutputStream outputStream, BrokerFileSystem fileSystem) {
|
||||
updateLastAccessTime();
|
||||
outputStreams.putIfAbsent(fd, new BrokerOutputStream(outputStream, fileSystem));
|
||||
}
|
||||
|
||||
public FSDataInputStream getInputStream(TBrokerFD fd) {
|
||||
updateLastAccessTime();
|
||||
BrokerInputStream brokerInputStream = inputStreams.get(fd);
|
||||
if (brokerInputStream != null) {
|
||||
return brokerInputStream.getInputStream();
|
||||
@ -223,6 +227,7 @@ public class ClientContextManager {
|
||||
}
|
||||
|
||||
public FSDataOutputStream getOutputStream(TBrokerFD fd) {
|
||||
updateLastAccessTime();
|
||||
BrokerOutputStream brokerOutputStream = outputStreams.get(fd);
|
||||
if (brokerOutputStream != null) {
|
||||
return brokerOutputStream.getOutputStream();
|
||||
@ -230,8 +235,12 @@ public class ClientContextManager {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void updateLastAccessTime() {
|
||||
this.lastAccessTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public void updateLastPingTime() {
|
||||
this.lastPingTimestamp = System.currentTimeMillis();
|
||||
this.lastAccessTimestamp = System.currentTimeMillis();
|
||||
// Should we also update the underline filesystem? maybe it is time cost
|
||||
for (BrokerInputStream brokerInputStream : inputStreams.values()) {
|
||||
brokerInputStream.updateLastUpdateAccessTime();
|
||||
|
||||
Reference in New Issue
Block a user