[fix](split) FileSystemCacheKey is always different in overloaded equals (#36431)

bp: #36432
## Proposed changes

## Fixed bugs introduced by #34307

1. `FileSystemCacheKey.equals()` compares `properties` by `==`, so a key never matches the cached one and a new file system is created for every partition (see the sketch below).
2. `dfsFileSystem` is initialized lazily without synchronization, so concurrent callers can create more file systems than needed (see the locking sketch after the `S3FileSystem` diff).
3. `jobConf.iterator()` produces more than 2000 key-value pairs, so copying them into a fresh properties map for every cache lookup was expensive (see the note after the `FileSystemCache` diff).
Ashin Gau authored 2024-06-20 10:08:05 +08:00, committed by GitHub
commit 7b36e81b7a, parent dabd27edd2
4 changed files with 70 additions and 49 deletions

File: HiveMetaStoreCache.java

```diff
@@ -349,11 +349,11 @@ public class HiveMetaStoreCache {
             List<String> partitionValues,
             String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
-        Map<String, String> properties = new HashMap<>();
-        jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                 new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
-                        location, bindBrokerName), properties, bindBrokerName));
+                        location, bindBrokerName),
+                        catalog.getCatalogProperty().getProperties(),
+                        bindBrokerName, jobConf));
         result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
         // For Tez engine, it may generate subdirectories for "union" query.
         // So there may be files and directories in the table directory at the same time. eg:
@@ -781,12 +781,12 @@ public class HiveMetaStoreCache {
                 return Collections.emptyList();
             }
             String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
-            Map<String, String> properties = new HashMap<>();
-            jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
             RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                     new FileSystemCache.FileSystemCacheKey(
                             LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
-                                    bindBrokerName), properties, bindBrokerName));
+                                    bindBrokerName),
+                            catalog.getCatalogProperty().getProperties(),
+                            bindBrokerName, jobConf));
             Status status = fs.exists(acidVersionPath);
             if (status != Status.OK) {
                 if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -806,12 +806,10 @@ public class HiveMetaStoreCache {
             List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
             for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
                 String location = delta.getPath().toString();
-                Map<String, String> properties = new HashMap<>();
-                jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
                 RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                         new FileSystemCache.FileSystemCacheKey(
                                 LocationPath.getFSIdentity(location, bindBrokerName),
-                                properties, bindBrokerName));
+                                catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
                 List<RemoteFile> remoteFiles = new ArrayList<>();
                 Status status = fs.listFiles(location, false, remoteFiles);
                 if (status.ok()) {
@@ -833,12 +831,10 @@ public class HiveMetaStoreCache {
             // base
             if (directory.getBaseDirectory() != null) {
                 String location = directory.getBaseDirectory().toString();
-                Map<String, String> properties = new HashMap<>();
-                jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
                 RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                         new FileSystemCache.FileSystemCacheKey(
                                 LocationPath.getFSIdentity(location, bindBrokerName),
-                                properties, bindBrokerName));
+                                catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
                 List<RemoteFile> remoteFiles = new ArrayList<>();
                 Status status = fs.listFiles(location, false, remoteFiles);
                 if (status.ok()) {
```

File: FileSystemCache.java

```diff
@@ -23,14 +23,16 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalLong;
 
 public class FileSystemCache {
 
-    private LoadingCache<FileSystemCacheKey, RemoteFileSystem> fileSystemCache;
+    private final LoadingCache<FileSystemCacheKey, RemoteFileSystem> fileSystemCache;
 
     public FileSystemCache() {
         // no need to set refreshAfterWrite, because the FileSystem is created once and never changed
@@ -40,11 +42,11 @@ public class FileSystemCache {
                 Config.max_remote_file_system_cache_num,
                 false,
                 null);
-        fileSystemCache = fsCacheFactory.buildCache(key -> loadFileSystem(key));
+        fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem);
     }
 
     private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
-        return FileSystemFactory.getRemoteFileSystem(key.type, key.properties, key.bindBrokerName);
+        return FileSystemFactory.getRemoteFileSystem(key.type, key.getFsProperties(), key.bindBrokerName);
    }
 
    public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -57,13 +59,32 @@
         private final String fsIdent;
         private final Map<String, String> properties;
         private final String bindBrokerName;
+        // only for creating new file system
+        private final Configuration conf;
 
         public FileSystemCacheKey(Pair<FileSystemType, String> fs,
-                Map<String, String> properties, String bindBrokerName) {
+                Map<String, String> properties,
+                String bindBrokerName,
+                Configuration conf) {
             this.type = fs.first;
             this.fsIdent = fs.second;
             this.properties = properties;
             this.bindBrokerName = bindBrokerName;
+            this.conf = conf;
+        }
+
+        public FileSystemCacheKey(Pair<FileSystemType, String> fs,
+                Map<String, String> properties, String bindBrokerName) {
+            this(fs, properties, bindBrokerName, null);
+        }
+
+        public Map<String, String> getFsProperties() {
+            if (conf == null) {
+                return properties;
+            }
+            Map<String, String> result = new HashMap<>();
+            conf.iterator().forEachRemaining(e -> result.put(e.getKey(), e.getValue()));
+            return result;
+        }
 
         @Override
```
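The point of `getFsProperties()` is that flattening the Hadoop `Configuration` now happens only when the cache actually loads a file system, while `equals()`/`hashCode()` use the small catalog property map, so repeated lookups for the same catalog are cheap cache hits. A rough standalone illustration of the deferred cost (requires `hadoop-common` on the classpath; the exact entry count depends on which `*-default.xml` resources are loaded):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class ConfFlattenDemo {
    public static void main(String[] args) {
        // A Configuration is Iterable<Map.Entry<String, String>> over all
        // resolved properties, including the bundled *-default.xml defaults.
        Configuration conf = new Configuration();
        Map<String, String> flattened = new HashMap<>();
        conf.iterator().forEachRemaining(e -> flattened.put(e.getKey(), e.getValue()));
        // Typically hundreds of entries even for a bare Configuration; the
        // commit message cites 2000+ for the jobConf built for Hive splits.
        System.out.println("flattened entries: " + flattened.size());
    }
}
```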
```diff
@@ -74,13 +95,14 @@
             if (!(obj instanceof FileSystemCacheKey)) {
                 return false;
             }
-            boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type)
-                    && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
-                    && properties == ((FileSystemCacheKey) obj).properties;
+            FileSystemCacheKey o = (FileSystemCacheKey) obj;
+            boolean equalsWithoutBroker = type.equals(o.type)
+                    && fsIdent.equals(o.fsIdent)
+                    && properties.equals(o.properties);
             if (bindBrokerName == null) {
-                return equalsWithoutBroker;
+                return equalsWithoutBroker && o.bindBrokerName == null;
             }
-            return equalsWithoutBroker && bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
+            return equalsWithoutBroker && bindBrokerName.equals(o.bindBrokerName);
         }
 
         @Override
```
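Besides the `==` bug, the old `equals()` also broke symmetry in the null-broker case: a key with `bindBrokerName == null` could compare equal to a broker-bound key, but not vice versa. A minimal model of just that branch (assuming the type, identity, and properties checks already passed):

```java
import java.util.Objects;

public class BrokerEqualsDemo {
    // Old behavior: a null bindBrokerName ignored the other side's broker.
    static boolean oldBrokerEquals(String mine, String theirs) {
        return mine == null || mine.equals(theirs);
    }

    // Fixed behavior: null only equals null, as equals/hashCode contracts require.
    static boolean newBrokerEquals(String mine, String theirs) {
        return Objects.equals(mine, theirs);
    }

    public static void main(String[] args) {
        System.out.println(oldBrokerEquals(null, "broker1")); // true (wrong, asymmetric)
        System.out.println(oldBrokerEquals("broker1", null)); // false
        System.out.println(newBrokerEquals(null, "broker1")); // false (symmetric)
    }
}
```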

File: S3FileSystem.java

```diff
@@ -58,16 +58,20 @@ public class S3FileSystem extends ObjFileSystem {
     @Override
     protected FileSystem nativeFileSystem(String remotePath) throws UserException {
         if (dfsFileSystem == null) {
-            Configuration conf = new Configuration();
-            System.setProperty("com.amazonaws.services.s3.enableV4", "true");
-            // the entry value in properties may be null, and conf.set() does not accept null
-            PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
-                    .filter(entry -> entry.getKey() != null && entry.getValue() != null)
-                    .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
-            try {
-                dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
-            } catch (Exception e) {
-                throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
-            }
+            synchronized (this) {
+                if (dfsFileSystem == null) {
+                    Configuration conf = new Configuration();
+                    System.setProperty("com.amazonaws.services.s3.enableV4", "true");
+                    // the entry value in properties may be null, and conf.set() does not accept null
+                    PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
+                            .filter(entry -> entry.getKey() != null && entry.getValue() != null)
+                            .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
+                    try {
+                        dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
+                    } catch (Exception e) {
+                        throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
+                    }
+                }
+            }
         }
         return dfsFileSystem;
```
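The fix for bug 2 is the classic double-checked locking idiom, used here and in `DFSFileSystem` below: check once without the lock, then re-check under `synchronized` so only the first caller constructs the `FileSystem`. A standalone sketch of the idiom (note: the canonical form declares the field `volatile`, as below; whether `dfsFileSystem` is declared volatile is not visible in these hunks, so that part is an assumption):

```java
public class LazyHolder {
    private volatile Object fs; // stands in for a Hadoop FileSystem

    public Object get() {
        if (fs == null) {                // fast path: no lock once initialized
            synchronized (this) {
                if (fs == null) {        // re-check: another thread may have won
                    fs = createExpensive();
                }
            }
        }
        return fs;
    }

    private Object createExpensive() {
        // would open a FileSystem / allocate connections in the real code
        return new Object();
    }
}
```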

File: DFSFileSystem.java

```diff
@@ -30,7 +30,6 @@ import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -73,25 +72,25 @@ public class DFSFileSystem extends RemoteFileSystem {
     @VisibleForTesting
     @Override
     public FileSystem nativeFileSystem(String remotePath) throws UserException {
-        if (dfsFileSystem != null) {
-            return dfsFileSystem;
-        }
-        Configuration conf = new HdfsConfiguration();
-        for (Map.Entry<String, String> propEntry : properties.entrySet()) {
-            conf.set(propEntry.getKey(), propEntry.getValue());
-        }
-        dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
-            try {
-                return FileSystem.get(new Path(remotePath).toUri(), conf);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        });
-        Preconditions.checkNotNull(dfsFileSystem);
-        operations = new HDFSFileOperations(dfsFileSystem);
+        if (dfsFileSystem == null) {
+            synchronized (this) {
+                if (dfsFileSystem == null) {
+                    Configuration conf = new HdfsConfiguration();
+                    for (Map.Entry<String, String> propEntry : properties.entrySet()) {
+                        conf.set(propEntry.getKey(), propEntry.getValue());
+                    }
+                    dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
+                        try {
+                            return FileSystem.get(new Path(remotePath).toUri(), conf);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                    operations = new HDFSFileOperations(dfsFileSystem);
+                }
+            }
+        }
         return dfsFileSystem;
     }
```