From f14e1f9d79c67da448938b3d09a46c602575966f Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 31 Jul 2024 18:09:24 +0800 Subject: [PATCH 1/3] [Fix](hdfs-fs)The cache expiration should explicitly release the held fs - Implement a removal listener to log and close file systems upon removal. - Safely handle IOException during file system closure. - Improved logging for file system closure and handling of potential exceptions. --- .../org/apache/doris/fs/FileSystemCache.java | 3 +- .../fs/remote/RemoteFSPhantomManager.java | 117 ++++++++++++++++++ .../doris/fs/remote/RemoteFileSystem.java | 21 +++- .../RemoteFileSystemPhantomReference.java | 44 +++++++ .../apache/doris/fs/remote/S3FileSystem.java | 8 ++ .../doris/fs/remote/dfs/DFSFileSystem.java | 8 ++ 6 files changed, 199 insertions(+), 2 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index e96258dc719fbd..f46ffe44dfdc1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -37,7 +37,8 @@ public class FileSystemCache { public FileSystemCache() { // no need to set refreshAfterWrite, because the FileSystem is created once and never changed CacheFactory fsCacheFactory = new CacheFactory( - OptionalLong.of(86400L), + //fixme just for test + OptionalLong.of(10L), OptionalLong.empty(), Config.max_remote_file_system_cache_num, false, diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java new file mode 100644 index 00000000000000..bd57ae64736b12 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote; + +import org.apache.doris.common.CustomThreadFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.lang.ref.PhantomReference; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The RemoteFSPhantomManager class is responsible for managing the phantom references + * of RemoteFileSystem objects. It ensures that the associated FileSystem resources are + * automatically cleaned up when the RemoteFileSystem objects are garbage collected. + *

+ * By utilizing a ReferenceQueue and PhantomReference, this class can monitor the lifecycle + * of RemoteFileSystem objects. When a RemoteFileSystem object is no longer in use and is + * garbage collected, its corresponding FileSystem resource is properly closed to prevent + * resource leaks. + *

+ * The class provides a thread-safe mechanism to ensure that the cleanup thread is started only once. + *

+ * Main functionalities include: + * - Registering phantom references of RemoteFileSystem objects. + * - Starting a periodic cleanup thread that automatically closes unused FileSystem resources. + */ +public class RemoteFSPhantomManager { + + private static final Logger LOG = LogManager.getLogger(RemoteFSPhantomManager.class); + + // Scheduled executor for periodic resource cleanup + private static ScheduledExecutorService cleanupExecutor; + + // Reference queue for monitoring RemoteFileSystem objects' phantom references + private static final ReferenceQueue referenceQueue = new ReferenceQueue<>(); + + // Map storing the phantom references and their corresponding FileSystem objects + private static final ConcurrentHashMap, FileSystem> referenceMap + = new ConcurrentHashMap<>(); + + // Flag indicating whether the cleanup thread has been started + private static final AtomicBoolean isStarted = new AtomicBoolean(false); + + /** + * Registers a phantom reference for a RemoteFileSystem object in the manager. + * If the cleanup thread has not been started, it will be started. + * + * @param remoteFileSystem the RemoteFileSystem object to be registered + */ + public static void registerPhantomReference(RemoteFileSystem remoteFileSystem) { + if (!isStarted.get()) { + start(); + isStarted.set(true); + } + RemoteFileSystemPhantomReference phantomReference = new RemoteFileSystemPhantomReference(remoteFileSystem, + referenceQueue); + referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem); + } + + /** + * Starts the cleanup thread, which periodically checks and cleans up unused FileSystem resources. + * The method uses double-checked locking to ensure thread-safe startup of the cleanup thread. + */ + public static void start() { + if (isStarted.compareAndSet(false, true)) { + synchronized (RemoteFSPhantomManager.class) { + LOG.info("Starting cleanup thread for RemoteFileSystem objects"); + if (cleanupExecutor == null) { + CustomThreadFactory threadFactory = new CustomThreadFactory("remote-fs-phantom-cleanup"); + cleanupExecutor = Executors.newScheduledThreadPool(1, threadFactory); + cleanupExecutor.scheduleAtFixedRate(() -> { + Reference ref; + while ((ref = referenceQueue.poll()) != null) { + RemoteFileSystemPhantomReference phantomRef = (RemoteFileSystemPhantomReference) ref; + + FileSystem fs = referenceMap.remove(phantomRef); + if (fs != null) { + try { + fs.close(); + LOG.info("Closed file system: {}", fs.getUri()); + } catch (IOException e) { + LOG.warn("Failed to close file system", e); + } + } + } + }, 0, 20, TimeUnit.SECONDS); + } + } + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java index 3cb8a036c2d5f1..08b7e1cde78bef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -30,15 +30,20 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; -public abstract class RemoteFileSystem extends PersistentFileSystem { +public abstract class RemoteFileSystem extends PersistentFileSystem implements Closeable { // this field will be visited by multi-threads, better use volatile qualifier protected volatile org.apache.hadoop.fs.FileSystem dfsFileSystem = null; + private final ReentrantLock fsLock = new ReentrantLock(); + protected static final AtomicBoolean closed = new AtomicBoolean(false); public RemoteFileSystem(String name, StorageBackend.StorageType type) { super(name, type); @@ -120,4 +125,18 @@ public Status renameDir(String origFilePath, return rename(origFilePath, destFilePath); } + + @Override + public void close() throws IOException { + fsLock.lock(); + try { + if (!closed.getAndSet(true)) { + if (dfsFileSystem != null) { + dfsFileSystem.close(); + } + } + } finally { + fsLock.unlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java new file mode 100644 index 00000000000000..89506c7b212242 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote; + +import org.apache.hadoop.fs.FileSystem; + +import java.lang.ref.PhantomReference; +import java.lang.ref.ReferenceQueue; + +public class RemoteFileSystemPhantomReference extends PhantomReference { + + private FileSystem fs; + + /** + * Creates a new phantom reference that refers to the given object and + * is registered with the given queue. + * + *

It is possible to create a phantom reference with a {@code null} + * queue. Such a reference will never be enqueued. + * + * @param referent the object the new phantom reference will refer to + * @param q the queue with which the reference is to be registered, + * or {@code null} if registration is not required + */ + public RemoteFileSystemPhantomReference(RemoteFileSystem referent, ReferenceQueue q) { + super(referent, q); + this.fs = referent.dfsFileSystem; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java index 525d80d6797b35..87ba086baecbf1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java @@ -58,8 +58,15 @@ private void initFsProperties() { @Override protected FileSystem nativeFileSystem(String remotePath) throws UserException { + //todo Extracting a common method to achieve logic reuse + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } if (dfsFileSystem == null) { synchronized (this) { + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } if (dfsFileSystem == null) { Configuration conf = DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth()); System.setProperty("com.amazonaws.services.s3.enableV4", "true"); @@ -72,6 +79,7 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException { } catch (Exception e) { throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e); } + RemoteFSPhantomManager.registerPhantomReference(this); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 59fbd73bda78cf..7034641a9fc128 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -26,6 +26,7 @@ import org.apache.doris.fs.operations.HDFSFileOperations; import org.apache.doris.fs.operations.HDFSOpParams; import org.apache.doris.fs.operations.OpParams; +import org.apache.doris.fs.remote.RemoteFSPhantomManager; import org.apache.doris.fs.remote.RemoteFile; import org.apache.doris.fs.remote.RemoteFileSystem; @@ -75,8 +76,14 @@ public DFSFileSystem(StorageBackend.StorageType type, Map proper @VisibleForTesting @Override public FileSystem nativeFileSystem(String remotePath) throws UserException { + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } if (dfsFileSystem == null) { synchronized (this) { + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } if (dfsFileSystem == null) { Configuration conf = getHdfsConf(ifNotSetFallbackToSimpleAuth()); for (Map.Entry propEntry : properties.entrySet()) { @@ -96,6 +103,7 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException { throw new UserException(e); } operations = new HDFSFileOperations(dfsFileSystem); + RemoteFSPhantomManager.registerPhantomReference(this); } } } From e5e0257ba1666387eb9d1222432acd4478e93676 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 4 Sep 2024 19:10:02 +0800 Subject: [PATCH 2/3] test --- .../java/org/apache/doris/fs/FileSystemCache.java | 11 +++++++---- .../doris/fs/remote/RemoteFSPhantomManager.java | 3 ++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index f46ffe44dfdc1b..c9fa24e6302a28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -18,12 +18,13 @@ package org.apache.doris.fs; import org.apache.doris.common.CacheFactory; -import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.fs.remote.RemoteFileSystem; import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.HashMap; import java.util.Map; @@ -31,22 +32,24 @@ import java.util.OptionalLong; public class FileSystemCache { - + private static final Logger LOG = LogManager.getLogger(FileSystemCache.class); private final LoadingCache fileSystemCache; public FileSystemCache() { // no need to set refreshAfterWrite, because the FileSystem is created once and never changed CacheFactory fsCacheFactory = new CacheFactory( //fixme just for test - OptionalLong.of(10L), + OptionalLong.of(1L), OptionalLong.empty(), - Config.max_remote_file_system_cache_num, + 1L, false, null); fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem); } private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) { + //fixme test + LOG.info("Create new file system: {}", key.getFsProperties()); return FileSystemFactory.getRemoteFileSystem(key.type, key.getFsProperties(), key.bindBrokerName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java index bd57ae64736b12..67eb89b02b30aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java @@ -94,6 +94,7 @@ public static void start() { CustomThreadFactory threadFactory = new CustomThreadFactory("remote-fs-phantom-cleanup"); cleanupExecutor = Executors.newScheduledThreadPool(1, threadFactory); cleanupExecutor.scheduleAtFixedRate(() -> { + System.gc(); Reference ref; while ((ref = referenceQueue.poll()) != null) { RemoteFileSystemPhantomReference phantomRef = (RemoteFileSystemPhantomReference) ref; @@ -108,7 +109,7 @@ public static void start() { } } } - }, 0, 20, TimeUnit.SECONDS); + }, 0, 1, TimeUnit.SECONDS); } } } From da6709ee58acaad4b39134b494689d1659e52c2b Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 5 Sep 2024 10:29:34 +0800 Subject: [PATCH 3/3] final version --- .../java/org/apache/doris/fs/FileSystemCache.java | 12 ++++-------- .../doris/fs/remote/RemoteFSPhantomManager.java | 3 +-- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index c9fa24e6302a28..e96258dc719fbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -18,13 +18,12 @@ package org.apache.doris.fs; import org.apache.doris.common.CacheFactory; +import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.fs.remote.RemoteFileSystem; import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.hadoop.conf.Configuration; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.HashMap; import java.util.Map; @@ -32,24 +31,21 @@ import java.util.OptionalLong; public class FileSystemCache { - private static final Logger LOG = LogManager.getLogger(FileSystemCache.class); + private final LoadingCache fileSystemCache; public FileSystemCache() { // no need to set refreshAfterWrite, because the FileSystem is created once and never changed CacheFactory fsCacheFactory = new CacheFactory( - //fixme just for test - OptionalLong.of(1L), + OptionalLong.of(86400L), OptionalLong.empty(), - 1L, + Config.max_remote_file_system_cache_num, false, null); fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem); } private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) { - //fixme test - LOG.info("Create new file system: {}", key.getFsProperties()); return FileSystemFactory.getRemoteFileSystem(key.type, key.getFsProperties(), key.bindBrokerName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java index 67eb89b02b30aa..282361c4cb63b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java @@ -94,7 +94,6 @@ public static void start() { CustomThreadFactory threadFactory = new CustomThreadFactory("remote-fs-phantom-cleanup"); cleanupExecutor = Executors.newScheduledThreadPool(1, threadFactory); cleanupExecutor.scheduleAtFixedRate(() -> { - System.gc(); Reference ref; while ((ref = referenceQueue.poll()) != null) { RemoteFileSystemPhantomReference phantomRef = (RemoteFileSystemPhantomReference) ref; @@ -109,7 +108,7 @@ public static void start() { } } } - }, 0, 1, TimeUnit.SECONDS); + }, 0, 1, TimeUnit.MINUTES); } } }