diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java new file mode 100644 index 00000000000000..282361c4cb63b0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote; + +import org.apache.doris.common.CustomThreadFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.lang.ref.PhantomReference; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The RemoteFSPhantomManager class is responsible for managing the phantom references + * of RemoteFileSystem objects. 
It ensures that the associated FileSystem resources are + * automatically cleaned up when the RemoteFileSystem objects are garbage collected. + *
+ * By utilizing a ReferenceQueue and PhantomReference, this class can monitor the lifecycle + * of RemoteFileSystem objects. When a RemoteFileSystem object is no longer in use and is + * garbage collected, its corresponding FileSystem resource is properly closed to prevent + * resource leaks. + *
+ * The class provides a thread-safe mechanism to ensure that the cleanup thread is started only once. + *
+ * Main functionalities include:
+ * - Registering phantom references of RemoteFileSystem objects.
+ * - Starting a periodic cleanup thread that automatically closes unused FileSystem resources.
+ */
+public class RemoteFSPhantomManager {
+
+ private static final Logger LOG = LogManager.getLogger(RemoteFSPhantomManager.class);
+
+ // Scheduled executor for periodic resource cleanup
+ private static ScheduledExecutorService cleanupExecutor;
+
+ // Reference queue for monitoring RemoteFileSystem objects' phantom references
+ private static final ReferenceQueue It is possible to create a phantom reference with a {@code null}
+ * queue. Such a reference will never be enqueued.
+ *
+ * @param referent the object the new phantom reference will refer to
+ * @param q the queue with which the reference is to be registered,
+ * or {@code null} if registration is not required
+ */
+ public RemoteFileSystemPhantomReference(RemoteFileSystem referent, ReferenceQueue<? super RemoteFileSystem> q) {
+     super(referent, q);
+     // PhantomReference.get() always returns null, so the referent cannot be consulted later;
+     // copy its dfsFileSystem handle into this reference now, while the referent is reachable.
+     this.fs = referent.dfsFileSystem;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 525d80d6797b35..87ba086baecbf1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -58,8 +58,15 @@ private void initFsProperties() {
@Override
protected FileSystem nativeFileSystem(String remotePath) throws UserException {
+ // TODO: extract a common helper method so this logic can be reused
+ if (closed.get()) {
+ throw new UserException("FileSystem is closed.");
+ }
if (dfsFileSystem == null) {
synchronized (this) {
+ if (closed.get()) {
+ throw new UserException("FileSystem is closed.");
+ }
if (dfsFileSystem == null) {
Configuration conf = DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth());
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
@@ -72,6 +79,7 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException {
} catch (Exception e) {
throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
}
+ RemoteFSPhantomManager.registerPhantomReference(this);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 59fbd73bda78cf..7034641a9fc128 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -26,6 +26,7 @@
import org.apache.doris.fs.operations.HDFSFileOperations;
import org.apache.doris.fs.operations.HDFSOpParams;
import org.apache.doris.fs.operations.OpParams;
+import org.apache.doris.fs.remote.RemoteFSPhantomManager;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
@@ -75,8 +76,14 @@ public DFSFileSystem(StorageBackend.StorageType type, Map