diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java index c8a217d205..22f6925d31 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java @@ -31,7 +31,7 @@ public class BrokerFileSystem { private ReentrantLock lock; private FileSystemIdentity identity; private FileSystem dfsFileSystem; - private long lastAccessTimestamp; + private volatile long lastAccessTimestamp; private long createTimestamp; private UUID fileSystemId; diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java index e52f248e11..736f9ae448 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java @@ -131,7 +131,7 @@ public class ClientContextManager { public void run() { try { for (ClientResourceContext clientContext : clientContexts.values()) { - if (System.currentTimeMillis() - clientContext.lastPingTimestamp > clientExpirationSeconds * 1000) { + if (System.currentTimeMillis() - clientContext.lastAccessTimestamp > clientExpirationSeconds * 1000) { for (TBrokerFD fd : clientContext.inputStreams.keySet()) { ClientContextManager.this.removeInputStream(fd); } @@ -139,9 +139,9 @@ public class ClientContextManager { ClientContextManager.this.removeOutputStream(fd); } clientContexts.remove(clientContext.clientId); - logger.info("client [" + clientContext.clientId - + "] is expired, remove it from contexts. last ping time is " - + clientContext.lastPingTimestamp); + logger.info("client [" + clientContext.clientId + + "] is expired, remove it from contexts. last access time is " + + clientContext.lastAccessTimestamp); } } } finally { @@ -197,24 +197,28 @@ public class ClientContextManager { private String clientId; private ConcurrentHashMap inputStreams; private ConcurrentHashMap outputStreams; - private long lastPingTimestamp; + + private volatile long lastAccessTimestamp; public ClientResourceContext(String clientId) { this.clientId = clientId; this.inputStreams = new ConcurrentHashMap<>(); this.outputStreams = new ConcurrentHashMap<>(); - this.lastPingTimestamp = System.currentTimeMillis(); + this.lastAccessTimestamp = System.currentTimeMillis(); } public void putInputStream(TBrokerFD fd, FSDataInputStream inputStream, BrokerFileSystem fileSystem) { + updateLastAccessTime(); inputStreams.putIfAbsent(fd, new BrokerInputStream(inputStream, fileSystem)); } public void putOutputStream(TBrokerFD fd, FSDataOutputStream outputStream, BrokerFileSystem fileSystem) { + updateLastAccessTime(); outputStreams.putIfAbsent(fd, new BrokerOutputStream(outputStream, fileSystem)); } public FSDataInputStream getInputStream(TBrokerFD fd) { + updateLastAccessTime(); BrokerInputStream brokerInputStream = inputStreams.get(fd); if (brokerInputStream != null) { return brokerInputStream.getInputStream(); @@ -223,6 +227,7 @@ public class ClientContextManager { } public FSDataOutputStream getOutputStream(TBrokerFD fd) { + updateLastAccessTime(); BrokerOutputStream brokerOutputStream = outputStreams.get(fd); if (brokerOutputStream != null) { return brokerOutputStream.getOutputStream(); @@ -230,8 +235,12 @@ public class ClientContextManager { return null; } + public void updateLastAccessTime() { + this.lastAccessTimestamp = System.currentTimeMillis(); + } + public void updateLastPingTime() { - this.lastPingTimestamp = System.currentTimeMillis(); + this.lastAccessTimestamp = System.currentTimeMillis(); // Should we also update the underline filesystem? maybe it is time cost for (BrokerInputStream brokerInputStream : inputStreams.values()) { brokerInputStream.updateLastUpdateAccessTime();