From d9e3536ad4eebdd65bf77a485a0e78f21fdf8d6e Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 31 Jul 2024 18:09:24 +0800 Subject: [PATCH] [Fix](hdfs-fs)The cache expiration should explicitly release the held fs - Implement a removal listener to log and close file systems upon removal. - Safely handle IOException during file system closure. - Improved logging for file system closure and handling of potential exceptions. --- .../org/apache/doris/fs/FileSystemCache.java | 30 +++++++++++++++---- .../doris/fs/remote/RemoteFileSystem.java | 21 ++++++++++++- .../apache/doris/fs/remote/S3FileSystem.java | 7 +++++ .../doris/fs/remote/dfs/DFSFileSystem.java | 6 ++++ 4 files changed, 58 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index e96258dc719fbd2..d1e846b662b0315 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -19,19 +19,26 @@ import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.Pair; import org.apache.doris.fs.remote.RemoteFileSystem; import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class FileSystemCache { + private static final Logger LOG = LoggerFactory.getLogger(FileSystemCache.class); private final LoadingCache fileSystemCache; public FileSystemCache() { @@ -42,7 +49,20 @@ public FileSystemCache() { Config.max_remote_file_system_cache_num, false, null); - fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem); + CustomThreadFactory threadFactory = new CustomThreadFactory("fs-cache-thread"); + ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory); + fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem, (key, fs, removalCause) -> { + if (key != null) { + LOG.info("Close file system: {}", key.fsIdent); + } + try { + if (fs != null) { + fs.close(); + } + } catch (IOException e) { + LOG.warn("Failed to close file system", e); + } + }, executor); } private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) { @@ -63,9 +83,9 @@ public static class FileSystemCacheKey { private final Configuration conf; public FileSystemCacheKey(Pair fs, - Map properties, - String bindBrokerName, - Configuration conf) { + Map properties, + String bindBrokerName, + Configuration conf) { this.type = fs.first; this.fsIdent = fs.second; this.properties = properties; @@ -74,7 +94,7 @@ public FileSystemCacheKey(Pair fs, } public FileSystemCacheKey(Pair fs, - Map properties, String bindBrokerName) { + Map properties, String bindBrokerName) { this(fs, properties, bindBrokerName, null); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java index 3cb8a036c2d5f17..08b7e1cde78bef3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -30,15 +30,20 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; -public abstract class RemoteFileSystem extends PersistentFileSystem { +public abstract class RemoteFileSystem extends PersistentFileSystem implements Closeable { // this field will be visited by multi-threads, better use volatile qualifier protected volatile org.apache.hadoop.fs.FileSystem dfsFileSystem = null; + private final ReentrantLock fsLock = new ReentrantLock(); + protected static final AtomicBoolean closed = new AtomicBoolean(false); public RemoteFileSystem(String name, StorageBackend.StorageType type) { super(name, type); @@ -120,4 +125,18 @@ public Status renameDir(String origFilePath, return rename(origFilePath, destFilePath); } + + @Override + public void close() throws IOException { + fsLock.lock(); + try { + if (!closed.getAndSet(true)) { + if (dfsFileSystem != null) { + dfsFileSystem.close(); + } + } + } finally { + fsLock.unlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java index 525d80d6797b353..aa7b9827de85866 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java @@ -58,8 +58,15 @@ private void initFsProperties() { @Override protected FileSystem nativeFileSystem(String remotePath) throws UserException { + //todo Extracting a common method to achieve logic reuse + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } if (dfsFileSystem == null) { synchronized (this) { + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } if (dfsFileSystem == null) { Configuration conf = DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth()); System.setProperty("com.amazonaws.services.s3.enableV4", "true"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 59fbd73bda78cf1..de31dc40d508ad5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -75,8 +75,14 @@ public DFSFileSystem(StorageBackend.StorageType type, Map proper @VisibleForTesting @Override public FileSystem nativeFileSystem(String remotePath) throws UserException { + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } if (dfsFileSystem == null) { synchronized (this) { + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } if (dfsFileSystem == null) { Configuration conf = getHdfsConf(ifNotSetFallbackToSimpleAuth()); for (Map.Entry propEntry : properties.entrySet()) {