diff --git a/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf b/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf index 8466de02cdd1ee..e023067cf04daf 100644 --- a/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf +++ b/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf @@ -27,7 +27,7 @@ broker_ipc_port = 8000 # client session will be deleted if not receive ping after this time -client_expire_seconds = 1800 +client_expire_seconds = 3600 # Advanced configurations # sys_log_dir = ${BROKER_HOME}/log diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java index 6381f16db25017..aa91b9d39391be 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java @@ -21,19 +21,19 @@ public class BrokerConfig extends ConfigBase { - + @ConfField public static int hdfs_read_buffer_size_kb = 1024; - + @ConfField public static int hdfs_write_buffer_size_kb = 1024; - + @ConfField - public static int client_expire_seconds = 1800; - + public static int client_expire_seconds = 3600; + @ConfField public static int broker_ipc_port = 8000; - + @ConfField public static String sys_log_dir = System.getenv("BROKER_HOME") + "/log"; @ConfField diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java index 7b9f6cc6798b24..c8a217d2059295 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java @@ -27,26 +27,29 @@ public class BrokerFileSystem { private static Logger logger = Logger .getLogger(BrokerFileSystem.class.getName()); - + private ReentrantLock lock; private FileSystemIdentity identity; private FileSystem dfsFileSystem; private long lastAccessTimestamp; + private long createTimestamp; private UUID fileSystemId; - + public BrokerFileSystem(FileSystemIdentity identity) { this.identity = identity; this.lock = new ReentrantLock(); this.dfsFileSystem = null; this.lastAccessTimestamp = System.currentTimeMillis(); + this.createTimestamp = System.currentTimeMillis(); this.fileSystemId = UUID.randomUUID(); } - + public synchronized void setFileSystem(FileSystem fileSystem) { this.dfsFileSystem = fileSystem; this.lastAccessTimestamp = System.currentTimeMillis(); + this.createTimestamp = System.currentTimeMillis(); } - + public void closeFileSystem() { lock.lock(); try { @@ -63,31 +66,32 @@ public void closeFileSystem() { lock.unlock(); } } - + public FileSystem getDFSFileSystem() { this.lastAccessTimestamp = System.currentTimeMillis(); return dfsFileSystem; } - + public void updateLastUpdateAccessTime() { this.lastAccessTimestamp = System.currentTimeMillis(); } - + public FileSystemIdentity getIdentity() { return identity; } - + public ReentrantLock getLock() { return lock; } - - public boolean isExpired(long expirationIntervalSecs) { - if (System.currentTimeMillis() - lastAccessTimestamp > expirationIntervalSecs * 1000) { - return true; - } - return false; + + public boolean isExpiredByLastAccessTime(long expirationIntervalSecs) { + return System.currentTimeMillis() - lastAccessTimestamp > expirationIntervalSecs * 1000; } - + + public boolean isExpiredByCreateTime(long expirationIntervalSecs) { + return System.currentTimeMillis() - createTimestamp > expirationIntervalSecs * 1000; + } + @Override public String toString() { return "BrokerFileSystem [identity=" + identity + ", dfsFileSystem=" diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java index 59f0e974b421e9..e52f248e11f785 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java @@ -31,19 +31,19 @@ public class ClientContextManager { private static Logger logger = Logger - .getLogger(ClientContextManager.class.getName()); + .getLogger(ClientContextManager.class.getName()); private ScheduledExecutorService executorService; private ConcurrentHashMap clientContexts; private ConcurrentHashMap fdToClientMap; private int clientExpirationSeconds = BrokerConfig.client_expire_seconds; - + public ClientContextManager(ScheduledExecutorService executorService) { clientContexts = new ConcurrentHashMap<>(); fdToClientMap = new ConcurrentHashMap<>(); this.executorService = executorService; this.executorService.schedule(new CheckClientExpirationTask(), 0, TimeUnit.SECONDS); } - + public void onPing(String clientId) { if (!clientContexts.containsKey(clientId)) { clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId)); @@ -51,9 +51,9 @@ public void onPing(String clientId) { ClientResourceContext clientContext = clientContexts.get(clientId); clientContext.updateLastPingTime(); } - - public synchronized void putNewOutputStream(String clientId, TBrokerFD fd, FSDataOutputStream fsDataOutputStream, - BrokerFileSystem brokerFileSystem) { + + public synchronized void putNewOutputStream(String clientId, TBrokerFD fd, FSDataOutputStream fsDataOutputStream, + BrokerFileSystem brokerFileSystem) { if (!clientContexts.containsKey(clientId)) { clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId)); } @@ -61,9 +61,9 @@ public synchronized void putNewOutputStream(String clientId, TBrokerFD fd, FSDat clientContext.putOutputStream(fd, fsDataOutputStream, brokerFileSystem); fdToClientMap.putIfAbsent(fd, clientId); } - - public synchronized void putNewInputStream(String clientId, TBrokerFD fd, FSDataInputStream fsDataInputStream, - BrokerFileSystem brokerFileSystem) { + + public synchronized void putNewInputStream(String clientId, TBrokerFD fd, FSDataInputStream fsDataInputStream, + BrokerFileSystem brokerFileSystem) { if (!clientContexts.containsKey(clientId)) { clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId)); } @@ -71,29 +71,29 @@ public synchronized void putNewInputStream(String clientId, TBrokerFD fd, FSData clientContext.putInputStream(fd, fsDataInputStream, brokerFileSystem); fdToClientMap.putIfAbsent(fd, clientId); } - + public synchronized FSDataInputStream getFsDataInputStream(TBrokerFD fd) { String clientId = fdToClientMap.get(fd); if (clientId == null) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, - "the fd is not owned by client {}", clientId); + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + "the fd is not owned by client {}", clientId); } ClientResourceContext clientContext = clientContexts.get(clientId); FSDataInputStream fsDataInputStream = clientContext.getInputStream(fd); return fsDataInputStream; } - + public synchronized FSDataOutputStream getFsDataOutputStream(TBrokerFD fd) { String clientId = fdToClientMap.get(fd); if (clientId == null) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, - "the fd is not owned by client {}", clientId); + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + "the fd is not owned by client {}", clientId); } ClientResourceContext clientContext = clientContexts.get(clientId); FSDataOutputStream fsDataOutputStream = clientContext.getOutputStream(fd); return fsDataOutputStream; } - + public synchronized void removeInputStream(TBrokerFD fd) { String clientId = fdToClientMap.remove(fd); if (clientId == null) { @@ -109,7 +109,7 @@ public synchronized void removeInputStream(TBrokerFD fd) { logger.error("errors while close file data input stream", e); } } - + public synchronized void removeOutputStream(TBrokerFD fd) { String clientId = fdToClientMap.remove(fd); if (clientId == null) { @@ -125,7 +125,7 @@ public synchronized void removeOutputStream(TBrokerFD fd) { logger.error("errors while close file data output stream", e); } } - + class CheckClientExpirationTask implements Runnable { @Override public void run() { @@ -139,9 +139,9 @@ public void run() { ClientContextManager.this.removeOutputStream(fd); } clientContexts.remove(clientContext.clientId); - logger.info("client [" + clientContext.clientId - + "] is expired, remove it from contexts. last ping time is " - + clientContext.lastPingTimestamp); + logger.info("client [" + clientContext.clientId + + "] is expired, remove it from contexts. last ping time is " + + clientContext.lastPingTimestamp); } } } finally { @@ -149,71 +149,71 @@ public void run() { } } } - + private static class BrokerOutputStream { - + private final FSDataOutputStream outputStream; private final BrokerFileSystem brokerFileSystem; - + public BrokerOutputStream(FSDataOutputStream outputStream, BrokerFileSystem brokerFileSystem) { this.outputStream = outputStream; this.brokerFileSystem = brokerFileSystem; this.brokerFileSystem.updateLastUpdateAccessTime(); } - + public FSDataOutputStream getOutputStream() { this.brokerFileSystem.updateLastUpdateAccessTime(); return outputStream; } - + public void updateLastUpdateAccessTime() { this.brokerFileSystem.updateLastUpdateAccessTime(); } } - + private static class BrokerInputStream { - + private final FSDataInputStream inputStream; private final BrokerFileSystem brokerFileSystem; - + public BrokerInputStream(FSDataInputStream inputStream, BrokerFileSystem brokerFileSystem) { this.inputStream = inputStream; this.brokerFileSystem = brokerFileSystem; this.brokerFileSystem.updateLastUpdateAccessTime(); } - + public FSDataInputStream getInputStream() { this.brokerFileSystem.updateLastUpdateAccessTime(); return inputStream; } - + public void updateLastUpdateAccessTime() { this.brokerFileSystem.updateLastUpdateAccessTime(); } } - + static class ClientResourceContext { private String clientId; private ConcurrentHashMap inputStreams; private ConcurrentHashMap outputStreams; private long lastPingTimestamp; - + public ClientResourceContext(String clientId) { this.clientId = clientId; this.inputStreams = new ConcurrentHashMap<>(); this.outputStreams = new ConcurrentHashMap<>(); this.lastPingTimestamp = System.currentTimeMillis(); } - + public void putInputStream(TBrokerFD fd, FSDataInputStream inputStream, BrokerFileSystem fileSystem) { inputStreams.putIfAbsent(fd, new BrokerInputStream(inputStream, fileSystem)); } - + public void putOutputStream(TBrokerFD fd, FSDataOutputStream outputStream, BrokerFileSystem fileSystem) { outputStreams.putIfAbsent(fd, new BrokerOutputStream(outputStream, fileSystem)); } - + public FSDataInputStream getInputStream(TBrokerFD fd) { BrokerInputStream brokerInputStream = inputStreams.get(fd); if (brokerInputStream != null) { @@ -221,7 +221,7 @@ public FSDataInputStream getInputStream(TBrokerFD fd) { } return null; } - + public FSDataOutputStream getOutputStream(TBrokerFD fd) { BrokerOutputStream brokerOutputStream = outputStreams.get(fd); if (brokerOutputStream != null) { @@ -229,14 +229,14 @@ public FSDataOutputStream getOutputStream(TBrokerFD fd) { } return null; } - + public void updateLastPingTime() { this.lastPingTimestamp = System.currentTimeMillis(); // Should we also update the underline filesystem? maybe it is time cost for (BrokerInputStream brokerInputStream : inputStreams.values()) { brokerInputStream.updateLastUpdateAccessTime(); } - + for (BrokerOutputStream brokerOutputStream : outputStreams.values()) { brokerOutputStream.updateLastUpdateAccessTime(); } diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index 262875de25ae36..c6974e70a2da8a 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -58,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; public class FileSystemManager { @@ -108,13 +107,13 @@ public class FileSystemManager { private static final String FS_KS3_IMPL_DISABLE_CACHE = "fs.ks3.impl.disable.cache"; private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2); - + private int readBufferSize = 128 << 10; // 128k private int writeBufferSize = 128 << 10; // 128k - + private ConcurrentHashMap cachedFileSystem; private ClientContextManager clientContextManager; - + public FileSystemManager() { cachedFileSystem = new ConcurrentHashMap<>(); clientContextManager = new ClientContextManager(handleManagementPool); @@ -140,15 +139,15 @@ private static String preparePrincipal(String originalPrincipal) throws UnknownH return finalPrincipal; } - + /** * visible for test - * + * * @param path * @param properties * @return BrokerFileSystem with different FileSystem based on scheme - * @throws URISyntaxException - * @throws Exception + * @throws URISyntaxException + * @throws Exception */ public BrokerFileSystem getFileSystem(String path, Map properties) { WildcardURI pathUri = new WildcardURI(path); @@ -240,7 +239,7 @@ public BrokerFileSystem getDistributedFileSystem(String path, Map proper String disableCache = properties.getOrDefault(FS_S3A_IMPL_DISABLE_CACHE, "true"); String s3aUgi = accessKey + "," + secretKey; FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, s3aUgi); - cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity)); - BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); try { if (fileSystem.getDFSFileSystem() == null) { @@ -440,7 +438,7 @@ public BrokerFileSystem getKS3FileSystem(String path, Map proper String host = KS3_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost(); String ks3aUgi = accessKey + "," + secretKey; FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ks3aUgi); - BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); try { if (fileSystem.getDFSFileSystem() == null) { @@ -516,7 +514,7 @@ public BrokerFileSystem getChdfsFileSystem(String path, Map prop e.getMessage()); } } - BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); try { // create a new filesystem @@ -618,12 +616,12 @@ public List listPath(String path, boolean fileNameOnly, Map properties) { WildcardURI pathUri = new WildcardURI(path); BrokerFileSystem fileSystem = getFileSystem(path, properties); @@ -633,16 +631,16 @@ public void deletePath(String path, Map properties) { } catch (IOException e) { logger.error("errors while delete path " + path); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "delete path {} error", path); } } - + public void renamePath(String srcPath, String destPath, Map properties) { WildcardURI srcPathUri = new WildcardURI(srcPath); WildcardURI destPathUri = new WildcardURI(destPath); if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, "only allow rename in same file system"); } BrokerFileSystem fileSystem = getFileSystem(srcPath, properties); @@ -651,17 +649,17 @@ public void renamePath(String srcPath, String destPath, Map prop try { boolean isRenameSuccess = fileSystem.getDFSFileSystem().rename(srcfilePath, destfilePath); if (!isRenameSuccess) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, - "failed to rename path from {} to {}", srcPath, destPath); + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + "failed to rename path from {} to {}", srcPath, destPath); } } catch (IOException e) { logger.error("errors while rename path from " + srcPath + " to " + destPath); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "errors while rename {} to {}", srcPath, destPath); } } - + public boolean checkPathExist(String path, Map properties) { WildcardURI pathUri = new WildcardURI(path); BrokerFileSystem fileSystem = getFileSystem(path, properties); @@ -672,11 +670,11 @@ public boolean checkPathExist(String path, Map properties) { } catch (IOException e) { logger.error("errors while check path exist: " + path); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "errors while check if path {} exist", path); } } - + public TBrokerFD openReader(String clientId, String path, long startOffset, Map properties) { WildcardURI pathUri = new WildcardURI(path); Path inputFilePath = new Path(pathUri.getPath()); @@ -691,11 +689,11 @@ public TBrokerFD openReader(String clientId, String path, long startOffset, Map< } catch (IOException e) { logger.error("errors while open path", e); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "could not open file {}", path); } } - + public ByteBuffer pread(TBrokerFD fd, long offset, long length) { FSDataInputStream fsDataInputStream = clientContextManager.getFsDataInputStream(fd); synchronized (fsDataInputStream) { @@ -746,12 +744,12 @@ public ByteBuffer pread(TBrokerFD fd, long offset, long length) { } } } - + public void seek(TBrokerFD fd, long offset) { - throw new BrokerException(TBrokerOperationStatusCode.OPERATION_NOT_SUPPORTED, + throw new BrokerException(TBrokerOperationStatusCode.OPERATION_NOT_SUPPORTED, "seek this method is not supported"); } - + public void closeReader(TBrokerFD fd) { FSDataInputStream fsDataInputStream = clientContextManager.getFsDataInputStream(fd); synchronized (fsDataInputStream) { @@ -766,13 +764,13 @@ public void closeReader(TBrokerFD fd) { } } } - + public TBrokerFD openWriter(String clientId, String path, Map properties) { WildcardURI pathUri = new WildcardURI(path); Path inputFilePath = new Path(pathUri.getPath()); BrokerFileSystem fileSystem = getFileSystem(path, properties); try { - FSDataOutputStream fsDataOutputStream = fileSystem.getDFSFileSystem().create(inputFilePath, + FSDataOutputStream fsDataOutputStream = fileSystem.getDFSFileSystem().create(inputFilePath, true, writeBufferSize); UUID uuid = UUID.randomUUID(); TBrokerFD fd = parseUUIDToFD(uuid); @@ -781,11 +779,11 @@ public TBrokerFD openWriter(String clientId, String path, Map pr } catch (IOException e) { logger.error("errors while open path", e); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "could not open file {}", path); } } - + public void pwrite(TBrokerFD fd, long offset, byte[] data) { FSDataOutputStream fsDataOutputStream = clientContextManager.getFsDataOutputStream(fd); synchronized (fsDataOutputStream) { @@ -811,7 +809,7 @@ public void pwrite(TBrokerFD fd, long offset, byte[] data) { } } } - + public void closeWriter(TBrokerFD fd) { FSDataOutputStream fsDataOutputStream = clientContextManager.getFsDataOutputStream(fd); synchronized (fsDataOutputStream) { @@ -827,11 +825,11 @@ public void closeWriter(TBrokerFD fd) { } } } - + public void ping(String clientId) { clientContextManager.onPing(clientId); } - + private static TBrokerFD parseUUIDToFD(UUID uuid) { return new TBrokerFD(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); } @@ -849,23 +847,41 @@ private int readBytesFully(FSDataInputStream is, byte[] dest) throws IOException return readLength; } - private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity fileSystemIdentity) { - BrokerFileSystem brokerFileSystem = null; + /** + * In view of the different expiration mechanisms of different authentication modes, + * there are two ways to determine whether BrokerFileSystem has expired: + * 1. For the authentication mode of Kerberos and S3 aksk, use the createTime to determine whether it expires + * 2. For other authentication modes, the lastAccessTime is used to determine whether it has expired + */ + private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity fileSystemIdentity, Map properties) { + BrokerFileSystem brokerFileSystem; if (cachedFileSystem.containsKey(fileSystemIdentity)) { brokerFileSystem = cachedFileSystem.get(fileSystemIdentity); - if (brokerFileSystem.isExpired(BrokerConfig.client_expire_seconds)) { - logger.info("file system " + brokerFileSystem + " is expired, close and remove it"); + if (properties.containsKey(KERBEROS_KEYTAB) && properties.containsKey(KERBEROS_PRINCIPAL)) { + if (brokerFileSystem.isExpiredByCreateTime(BrokerConfig.client_expire_seconds)) { + logger.info("file system " + brokerFileSystem + " is expired, update it."); + try { + Configuration conf = new HdfsConfiguration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, AUTHENTICATION_KERBEROS); + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI( + preparePrincipal(properties.get(KERBEROS_PRINCIPAL)), properties.get(KERBEROS_KEYTAB)); + // update FileSystem TGT + ugi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + logger.error("errors while checkTGTAndReloginFromKeytab: ", e); + } + } + } else if (brokerFileSystem.isExpiredByLastAccessTime(BrokerConfig.client_expire_seconds)) { brokerFileSystem.getLock().lock(); try { + logger.info("file system " + brokerFileSystem + " is expired, update it."); brokerFileSystem.closeFileSystem(); - } catch (Throwable t) { - logger.error("errors while close file system", t); - } finally { - cachedFileSystem.remove(brokerFileSystem.getIdentity()); brokerFileSystem.getLock().unlock(); - brokerFileSystem = new BrokerFileSystem(fileSystemIdentity); - cachedFileSystem.put(fileSystemIdentity, brokerFileSystem); + } catch (Throwable t) { + logger.error("errors while close file system: ", t); } + brokerFileSystem = new BrokerFileSystem(fileSystemIdentity); + cachedFileSystem.put(fileSystemIdentity, brokerFileSystem); } } else { brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);