[Enhance](broker) add inputstream expire scheduled checker to avoid memory leak for broker scan (#28589)

This pr introduces 2 broker conf:

1. enable_input_stream_expire_check: which indicates whether enable inputStream expire check.
2. input_stream_expire_seconds: which indicates the timeout seconds for inputStream since last update.
This commit is contained in:
DuRipeng
2023-12-19 19:24:29 +08:00
committed by GitHub
parent 9c9249e911
commit 0883d47832
3 changed files with 62 additions and 8 deletions

View File

@ -31,6 +31,12 @@ public class BrokerConfig extends ConfigBase {
@ConfField
public static int client_expire_seconds = 3600;
@ConfField
public static boolean enable_input_stream_expire_check = false;
@ConfField
public static int input_stream_expire_seconds = 300;
@ConfField
public static int broker_ipc_port = 8000;
}

View File

@ -17,9 +17,13 @@
package org.apache.doris.broker.hdfs;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -32,16 +36,21 @@ public class ClientContextManager {
private static Logger logger = Logger
.getLogger(ClientContextManager.class.getName());
private ScheduledExecutorService executorService;
private ScheduledExecutorService clientCheckExecutorService;
private ScheduledExecutorService inputStreamCheckExecuterService;
private ConcurrentHashMap<String, ClientResourceContext> clientContexts;
private ConcurrentHashMap<TBrokerFD, String> fdToClientMap;
private int clientExpirationSeconds = BrokerConfig.client_expire_seconds;
public ClientContextManager(ScheduledExecutorService executorService) {
public ClientContextManager() {
clientContexts = new ConcurrentHashMap<>();
fdToClientMap = new ConcurrentHashMap<>();
this.executorService = executorService;
this.executorService.schedule(new CheckClientExpirationTask(), 0, TimeUnit.SECONDS);
this.clientCheckExecutorService = Executors.newScheduledThreadPool(2);
this.clientCheckExecutorService.schedule(new CheckClientExpirationTask(), 0, TimeUnit.SECONDS);
if (BrokerConfig.enable_input_stream_expire_check) {
this.inputStreamCheckExecuterService = Executors.newScheduledThreadPool(2);
this.inputStreamCheckExecuterService.schedule(new CheckInputStreamExpirationTask(), 0, TimeUnit.SECONDS);
}
}
public void onPing(String clientId) {
@ -126,6 +135,25 @@ public class ClientContextManager {
}
}
public synchronized void remoteExpireInputStreams() {
int inputStreamExpireSeconds = BrokerConfig.input_stream_expire_seconds;
TBrokerFD fd;
for (ClientResourceContext clientContext : clientContexts.values()) {
Iterator<Entry<TBrokerFD, BrokerInputStream>> iter = clientContext.inputStreams.entrySet().iterator();
while (iter.hasNext()) {
Entry<TBrokerFD, BrokerInputStream> entry = iter.next();
fd = entry.getKey();
if (entry.getValue().checkExpire(inputStreamExpireSeconds)) {
ClientContextManager.this.removeInputStream(fd);
}
iter.remove();
logger.info(fd + " in client [" + clientContext.clientId
+ "] is expired, remove it from contexts. last update time is "
+ entry.getValue().getLastPingTimestamp());
}
}
}
class CheckClientExpirationTask implements Runnable {
@Override
public void run() {
@ -145,7 +173,18 @@ public class ClientContextManager {
}
}
} finally {
ClientContextManager.this.executorService.schedule(this, 60, TimeUnit.SECONDS);
ClientContextManager.this.clientCheckExecutorService.schedule(this, 60, TimeUnit.SECONDS);
}
}
}
class CheckInputStreamExpirationTask implements Runnable {
@Override
public void run() {
try {
ClientContextManager.this.remoteExpireInputStreams();
} finally {
ClientContextManager.this.inputStreamCheckExecuterService.schedule(this, 60, TimeUnit.SECONDS);
}
}
}
@ -175,21 +214,32 @@ public class ClientContextManager {
private final FSDataInputStream inputStream;
private final BrokerFileSystem brokerFileSystem;
private AtomicLong lastPingTimestamp;
public BrokerInputStream(FSDataInputStream inputStream, BrokerFileSystem brokerFileSystem) {
this.inputStream = inputStream;
this.brokerFileSystem = brokerFileSystem;
this.brokerFileSystem.updateLastUpdateAccessTime();
this.lastPingTimestamp = new AtomicLong(System.currentTimeMillis());
}
public FSDataInputStream getInputStream() {
this.brokerFileSystem.updateLastUpdateAccessTime();
this.lastPingTimestamp.set(System.currentTimeMillis());
return inputStream;
}
public void updateLastUpdateAccessTime() {
this.brokerFileSystem.updateLastUpdateAccessTime();
}
public boolean checkExpire(long expireSecond) {
return System.currentTimeMillis() - lastPingTimestamp.get() > expireSecond * 1000;
}
public long getLastPingTimestamp() {
return lastPingTimestamp.get();
}
}
static class ClientResourceContext {

View File

@ -163,8 +163,6 @@ public class FileSystemManager {
private static final String DFS_CLIENT_AUTH_METHOD = "dfs.client.auth.method";
private static final String DFS_RPC_TIMEOUT = "dfs.rpc.timeout";
private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);
private int readBufferSize = 128 << 10; // 128k
private int writeBufferSize = 128 << 10; // 128k
@ -173,7 +171,7 @@ public class FileSystemManager {
public FileSystemManager() {
cachedFileSystem = new ConcurrentHashMap<>();
clientContextManager = new ClientContextManager(handleManagementPool);
clientContextManager = new ClientContextManager();
readBufferSize = BrokerConfig.hdfs_read_buffer_size_kb << 10;
writeBufferSize = BrokerConfig.hdfs_write_buffer_size_kb << 10;
}