From f70f8cd5dd005f22e5cc0b7cb1a99b69322d0265 Mon Sep 17 00:00:00 2001 From: HB Date: Sun, 18 Sep 2022 17:46:13 +0800 Subject: [PATCH] [fix](broker-load) Correction of kerberos authentication time determination rule (#11793) Every time a new broker load comes in, Doris will update the start time of Kerberos authentication, but this logic is wrong. Because the authentication duration of Kerberos is calculated from the moment when the ticket is obtained. This PR change the logic: 1. If it is kerberos, check fs expiration by create time. 2.Otherwise, check fs expiration by access time --- .../conf/apache_hdfs_broker.conf | 2 +- .../doris/broker/hdfs/BrokerConfig.java | 12 +- .../doris/broker/hdfs/BrokerFileSystem.java | 34 +++--- .../broker/hdfs/ClientContextManager.java | 78 ++++++------- .../doris/broker/hdfs/FileSystemManager.java | 108 ++++++++++-------- 5 files changed, 127 insertions(+), 107 deletions(-) diff --git a/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf b/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf index 8466de02cdd1ee..e023067cf04daf 100644 --- a/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf +++ b/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf @@ -27,7 +27,7 @@ broker_ipc_port = 8000 # client session will be deleted if not receive ping after this time -client_expire_seconds = 1800 +client_expire_seconds = 3600 # Advanced configurations # sys_log_dir = ${BROKER_HOME}/log diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java index 6381f16db25017..aa91b9d39391be 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java @@ -21,19 +21,19 @@ public class BrokerConfig extends ConfigBase { - + @ConfField public static int hdfs_read_buffer_size_kb = 1024; - + @ConfField public static int hdfs_write_buffer_size_kb = 1024; - + @ConfField - public static int client_expire_seconds = 1800; - + public static int client_expire_seconds = 3600; + @ConfField public static int broker_ipc_port = 8000; - + @ConfField public static String sys_log_dir = System.getenv("BROKER_HOME") + "/log"; @ConfField diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java index 7b9f6cc6798b24..c8a217d2059295 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java @@ -27,26 +27,29 @@ public class BrokerFileSystem { private static Logger logger = Logger .getLogger(BrokerFileSystem.class.getName()); - + private ReentrantLock lock; private FileSystemIdentity identity; private FileSystem dfsFileSystem; private long lastAccessTimestamp; + private long createTimestamp; private UUID fileSystemId; - + public BrokerFileSystem(FileSystemIdentity identity) { this.identity = identity; this.lock = new ReentrantLock(); this.dfsFileSystem = null; this.lastAccessTimestamp = System.currentTimeMillis(); + this.createTimestamp = System.currentTimeMillis(); this.fileSystemId = UUID.randomUUID(); } - + public synchronized void setFileSystem(FileSystem fileSystem) { this.dfsFileSystem = fileSystem; this.lastAccessTimestamp = System.currentTimeMillis(); + this.createTimestamp = System.currentTimeMillis(); } - + public void closeFileSystem() { lock.lock(); try { @@ -63,31 +66,32 @@ public void closeFileSystem() { lock.unlock(); } } - + public FileSystem getDFSFileSystem() { this.lastAccessTimestamp = System.currentTimeMillis(); return dfsFileSystem; } - + public void updateLastUpdateAccessTime() { this.lastAccessTimestamp = System.currentTimeMillis(); } - + public FileSystemIdentity getIdentity() { return identity; } - + public ReentrantLock getLock() { return lock; } - - public boolean isExpired(long expirationIntervalSecs) { - if (System.currentTimeMillis() - lastAccessTimestamp > expirationIntervalSecs * 1000) { - return true; - } - return false; + + public boolean isExpiredByLastAccessTime(long expirationIntervalSecs) { + return System.currentTimeMillis() - lastAccessTimestamp > expirationIntervalSecs * 1000; } - + + public boolean isExpiredByCreateTime(long expirationIntervalSecs) { + return System.currentTimeMillis() - createTimestamp > expirationIntervalSecs * 1000; + } + @Override public String toString() { return "BrokerFileSystem [identity=" + identity + ", dfsFileSystem=" diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java index 59f0e974b421e9..e52f248e11f785 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java @@ -31,19 +31,19 @@ public class ClientContextManager { private static Logger logger = Logger - .getLogger(ClientContextManager.class.getName()); + .getLogger(ClientContextManager.class.getName()); private ScheduledExecutorService executorService; private ConcurrentHashMap clientContexts; private ConcurrentHashMap fdToClientMap; private int clientExpirationSeconds = BrokerConfig.client_expire_seconds; - + public ClientContextManager(ScheduledExecutorService executorService) { clientContexts = new ConcurrentHashMap<>(); fdToClientMap = new ConcurrentHashMap<>(); this.executorService = executorService; this.executorService.schedule(new CheckClientExpirationTask(), 0, TimeUnit.SECONDS); } - + public void onPing(String clientId) { if (!clientContexts.containsKey(clientId)) { clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId)); @@ -51,9 +51,9 @@ public void onPing(String clientId) { ClientResourceContext clientContext = clientContexts.get(clientId); clientContext.updateLastPingTime(); } - - public synchronized void putNewOutputStream(String clientId, TBrokerFD fd, FSDataOutputStream fsDataOutputStream, - BrokerFileSystem brokerFileSystem) { + + public synchronized void putNewOutputStream(String clientId, TBrokerFD fd, FSDataOutputStream fsDataOutputStream, + BrokerFileSystem brokerFileSystem) { if (!clientContexts.containsKey(clientId)) { clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId)); } @@ -61,9 +61,9 @@ public synchronized void putNewOutputStream(String clientId, TBrokerFD fd, FSDat clientContext.putOutputStream(fd, fsDataOutputStream, brokerFileSystem); fdToClientMap.putIfAbsent(fd, clientId); } - - public synchronized void putNewInputStream(String clientId, TBrokerFD fd, FSDataInputStream fsDataInputStream, - BrokerFileSystem brokerFileSystem) { + + public synchronized void putNewInputStream(String clientId, TBrokerFD fd, FSDataInputStream fsDataInputStream, + BrokerFileSystem brokerFileSystem) { if (!clientContexts.containsKey(clientId)) { clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId)); } @@ -71,29 +71,29 @@ public synchronized void putNewInputStream(String clientId, TBrokerFD fd, FSData clientContext.putInputStream(fd, fsDataInputStream, brokerFileSystem); fdToClientMap.putIfAbsent(fd, clientId); } - + public synchronized FSDataInputStream getFsDataInputStream(TBrokerFD fd) { String clientId = fdToClientMap.get(fd); if (clientId == null) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, - "the fd is not owned by client {}", clientId); + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + "the fd is not owned by client {}", clientId); } ClientResourceContext clientContext = clientContexts.get(clientId); FSDataInputStream fsDataInputStream = clientContext.getInputStream(fd); return fsDataInputStream; } - + public synchronized FSDataOutputStream getFsDataOutputStream(TBrokerFD fd) { String clientId = fdToClientMap.get(fd); if (clientId == null) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, - "the fd is not owned by client {}", clientId); + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + "the fd is not owned by client {}", clientId); } ClientResourceContext clientContext = clientContexts.get(clientId); FSDataOutputStream fsDataOutputStream = clientContext.getOutputStream(fd); return fsDataOutputStream; } - + public synchronized void removeInputStream(TBrokerFD fd) { String clientId = fdToClientMap.remove(fd); if (clientId == null) { @@ -109,7 +109,7 @@ public synchronized void removeInputStream(TBrokerFD fd) { logger.error("errors while close file data input stream", e); } } - + public synchronized void removeOutputStream(TBrokerFD fd) { String clientId = fdToClientMap.remove(fd); if (clientId == null) { @@ -125,7 +125,7 @@ public synchronized void removeOutputStream(TBrokerFD fd) { logger.error("errors while close file data output stream", e); } } - + class CheckClientExpirationTask implements Runnable { @Override public void run() { @@ -139,9 +139,9 @@ public void run() { ClientContextManager.this.removeOutputStream(fd); } clientContexts.remove(clientContext.clientId); - logger.info("client [" + clientContext.clientId - + "] is expired, remove it from contexts. last ping time is " - + clientContext.lastPingTimestamp); + logger.info("client [" + clientContext.clientId + + "] is expired, remove it from contexts. last ping time is " + + clientContext.lastPingTimestamp); } } } finally { @@ -149,71 +149,71 @@ public void run() { } } } - + private static class BrokerOutputStream { - + private final FSDataOutputStream outputStream; private final BrokerFileSystem brokerFileSystem; - + public BrokerOutputStream(FSDataOutputStream outputStream, BrokerFileSystem brokerFileSystem) { this.outputStream = outputStream; this.brokerFileSystem = brokerFileSystem; this.brokerFileSystem.updateLastUpdateAccessTime(); } - + public FSDataOutputStream getOutputStream() { this.brokerFileSystem.updateLastUpdateAccessTime(); return outputStream; } - + public void updateLastUpdateAccessTime() { this.brokerFileSystem.updateLastUpdateAccessTime(); } } - + private static class BrokerInputStream { - + private final FSDataInputStream inputStream; private final BrokerFileSystem brokerFileSystem; - + public BrokerInputStream(FSDataInputStream inputStream, BrokerFileSystem brokerFileSystem) { this.inputStream = inputStream; this.brokerFileSystem = brokerFileSystem; this.brokerFileSystem.updateLastUpdateAccessTime(); } - + public FSDataInputStream getInputStream() { this.brokerFileSystem.updateLastUpdateAccessTime(); return inputStream; } - + public void updateLastUpdateAccessTime() { this.brokerFileSystem.updateLastUpdateAccessTime(); } } - + static class ClientResourceContext { private String clientId; private ConcurrentHashMap inputStreams; private ConcurrentHashMap outputStreams; private long lastPingTimestamp; - + public ClientResourceContext(String clientId) { this.clientId = clientId; this.inputStreams = new ConcurrentHashMap<>(); this.outputStreams = new ConcurrentHashMap<>(); this.lastPingTimestamp = System.currentTimeMillis(); } - + public void putInputStream(TBrokerFD fd, FSDataInputStream inputStream, BrokerFileSystem fileSystem) { inputStreams.putIfAbsent(fd, new BrokerInputStream(inputStream, fileSystem)); } - + public void putOutputStream(TBrokerFD fd, FSDataOutputStream outputStream, BrokerFileSystem fileSystem) { outputStreams.putIfAbsent(fd, new BrokerOutputStream(outputStream, fileSystem)); } - + public FSDataInputStream getInputStream(TBrokerFD fd) { BrokerInputStream brokerInputStream = inputStreams.get(fd); if (brokerInputStream != null) { @@ -221,7 +221,7 @@ public FSDataInputStream getInputStream(TBrokerFD fd) { } return null; } - + public FSDataOutputStream getOutputStream(TBrokerFD fd) { BrokerOutputStream brokerOutputStream = outputStreams.get(fd); if (brokerOutputStream != null) { @@ -229,14 +229,14 @@ public FSDataOutputStream getOutputStream(TBrokerFD fd) { } return null; } - + public void updateLastPingTime() { this.lastPingTimestamp = System.currentTimeMillis(); // Should we also update the underline filesystem? maybe it is time cost for (BrokerInputStream brokerInputStream : inputStreams.values()) { brokerInputStream.updateLastUpdateAccessTime(); } - + for (BrokerOutputStream brokerOutputStream : outputStreams.values()) { brokerOutputStream.updateLastUpdateAccessTime(); } diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index 262875de25ae36..c6974e70a2da8a 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -58,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; public class FileSystemManager { @@ -108,13 +107,13 @@ public class FileSystemManager { private static final String FS_KS3_IMPL_DISABLE_CACHE = "fs.ks3.impl.disable.cache"; private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2); - + private int readBufferSize = 128 << 10; // 128k private int writeBufferSize = 128 << 10; // 128k - + private ConcurrentHashMap cachedFileSystem; private ClientContextManager clientContextManager; - + public FileSystemManager() { cachedFileSystem = new ConcurrentHashMap<>(); clientContextManager = new ClientContextManager(handleManagementPool); @@ -140,15 +139,15 @@ private static String preparePrincipal(String originalPrincipal) throws UnknownH return finalPrincipal; } - + /** * visible for test - * + * * @param path * @param properties * @return BrokerFileSystem with different FileSystem based on scheme - * @throws URISyntaxException - * @throws Exception + * @throws URISyntaxException + * @throws Exception */ public BrokerFileSystem getFileSystem(String path, Map properties) { WildcardURI pathUri = new WildcardURI(path); @@ -240,7 +239,7 @@ public BrokerFileSystem getDistributedFileSystem(String path, Map proper String disableCache = properties.getOrDefault(FS_S3A_IMPL_DISABLE_CACHE, "true"); String s3aUgi = accessKey + "," + secretKey; FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, s3aUgi); - cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity)); - BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); try { if (fileSystem.getDFSFileSystem() == null) { @@ -440,7 +438,7 @@ public BrokerFileSystem getKS3FileSystem(String path, Map proper String host = KS3_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost(); String ks3aUgi = accessKey + "," + secretKey; FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ks3aUgi); - BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); try { if (fileSystem.getDFSFileSystem() == null) { @@ -516,7 +514,7 @@ public BrokerFileSystem getChdfsFileSystem(String path, Map prop e.getMessage()); } } - BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); try { // create a new filesystem @@ -618,12 +616,12 @@ public List listPath(String path, boolean fileNameOnly, Map properties) { WildcardURI pathUri = new WildcardURI(path); BrokerFileSystem fileSystem = getFileSystem(path, properties); @@ -633,16 +631,16 @@ public void deletePath(String path, Map properties) { } catch (IOException e) { logger.error("errors while delete path " + path); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "delete path {} error", path); } } - + public void renamePath(String srcPath, String destPath, Map properties) { WildcardURI srcPathUri = new WildcardURI(srcPath); WildcardURI destPathUri = new WildcardURI(destPath); if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, "only allow rename in same file system"); } BrokerFileSystem fileSystem = getFileSystem(srcPath, properties); @@ -651,17 +649,17 @@ public void renamePath(String srcPath, String destPath, Map prop try { boolean isRenameSuccess = fileSystem.getDFSFileSystem().rename(srcfilePath, destfilePath); if (!isRenameSuccess) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, - "failed to rename path from {} to {}", srcPath, destPath); + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + "failed to rename path from {} to {}", srcPath, destPath); } } catch (IOException e) { logger.error("errors while rename path from " + srcPath + " to " + destPath); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "errors while rename {} to {}", srcPath, destPath); } } - + public boolean checkPathExist(String path, Map properties) { WildcardURI pathUri = new WildcardURI(path); BrokerFileSystem fileSystem = getFileSystem(path, properties); @@ -672,11 +670,11 @@ public boolean checkPathExist(String path, Map properties) { } catch (IOException e) { logger.error("errors while check path exist: " + path); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "errors while check if path {} exist", path); } } - + public TBrokerFD openReader(String clientId, String path, long startOffset, Map properties) { WildcardURI pathUri = new WildcardURI(path); Path inputFilePath = new Path(pathUri.getPath()); @@ -691,11 +689,11 @@ public TBrokerFD openReader(String clientId, String path, long startOffset, Map< } catch (IOException e) { logger.error("errors while open path", e); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "could not open file {}", path); } } - + public ByteBuffer pread(TBrokerFD fd, long offset, long length) { FSDataInputStream fsDataInputStream = clientContextManager.getFsDataInputStream(fd); synchronized (fsDataInputStream) { @@ -746,12 +744,12 @@ public ByteBuffer pread(TBrokerFD fd, long offset, long length) { } } } - + public void seek(TBrokerFD fd, long offset) { - throw new BrokerException(TBrokerOperationStatusCode.OPERATION_NOT_SUPPORTED, + throw new BrokerException(TBrokerOperationStatusCode.OPERATION_NOT_SUPPORTED, "seek this method is not supported"); } - + public void closeReader(TBrokerFD fd) { FSDataInputStream fsDataInputStream = clientContextManager.getFsDataInputStream(fd); synchronized (fsDataInputStream) { @@ -766,13 +764,13 @@ public void closeReader(TBrokerFD fd) { } } } - + public TBrokerFD openWriter(String clientId, String path, Map properties) { WildcardURI pathUri = new WildcardURI(path); Path inputFilePath = new Path(pathUri.getPath()); BrokerFileSystem fileSystem = getFileSystem(path, properties); try { - FSDataOutputStream fsDataOutputStream = fileSystem.getDFSFileSystem().create(inputFilePath, + FSDataOutputStream fsDataOutputStream = fileSystem.getDFSFileSystem().create(inputFilePath, true, writeBufferSize); UUID uuid = UUID.randomUUID(); TBrokerFD fd = parseUUIDToFD(uuid); @@ -781,11 +779,11 @@ public TBrokerFD openWriter(String clientId, String path, Map pr } catch (IOException e) { logger.error("errors while open path", e); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "could not open file {}", path); } } - + public void pwrite(TBrokerFD fd, long offset, byte[] data) { FSDataOutputStream fsDataOutputStream = clientContextManager.getFsDataOutputStream(fd); synchronized (fsDataOutputStream) { @@ -811,7 +809,7 @@ public void pwrite(TBrokerFD fd, long offset, byte[] data) { } } } - + public void closeWriter(TBrokerFD fd) { FSDataOutputStream fsDataOutputStream = clientContextManager.getFsDataOutputStream(fd); synchronized (fsDataOutputStream) { @@ -827,11 +825,11 @@ public void closeWriter(TBrokerFD fd) { } } } - + public void ping(String clientId) { clientContextManager.onPing(clientId); } - + private static TBrokerFD parseUUIDToFD(UUID uuid) { return new TBrokerFD(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); } @@ -849,23 +847,41 @@ private int readBytesFully(FSDataInputStream is, byte[] dest) throws IOException return readLength; } - private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity fileSystemIdentity) { - BrokerFileSystem brokerFileSystem = null; + /** + * In view of the different expiration mechanisms of different authentication modes, + * there are two ways to determine whether BrokerFileSystem has expired: + * 1. For the authentication mode of Kerberos and S3 aksk, use the createTime to determine whether it expires + * 2. For other authentication modes, the lastAccessTime is used to determine whether it has expired + */ + private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity fileSystemIdentity, Map properties) { + BrokerFileSystem brokerFileSystem; if (cachedFileSystem.containsKey(fileSystemIdentity)) { brokerFileSystem = cachedFileSystem.get(fileSystemIdentity); - if (brokerFileSystem.isExpired(BrokerConfig.client_expire_seconds)) { - logger.info("file system " + brokerFileSystem + " is expired, close and remove it"); + if (properties.containsKey(KERBEROS_KEYTAB) && properties.containsKey(KERBEROS_PRINCIPAL)) { + if (brokerFileSystem.isExpiredByCreateTime(BrokerConfig.client_expire_seconds)) { + logger.info("file system " + brokerFileSystem + " is expired, update it."); + try { + Configuration conf = new HdfsConfiguration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, AUTHENTICATION_KERBEROS); + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI( + preparePrincipal(properties.get(KERBEROS_PRINCIPAL)), properties.get(KERBEROS_KEYTAB)); + // update FileSystem TGT + ugi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + logger.error("errors while checkTGTAndReloginFromKeytab: ", e); + } + } + } else if (brokerFileSystem.isExpiredByLastAccessTime(BrokerConfig.client_expire_seconds)) { brokerFileSystem.getLock().lock(); try { + logger.info("file system " + brokerFileSystem + " is expired, update it."); brokerFileSystem.closeFileSystem(); - } catch (Throwable t) { - logger.error("errors while close file system", t); - } finally { - cachedFileSystem.remove(brokerFileSystem.getIdentity()); brokerFileSystem.getLock().unlock(); - brokerFileSystem = new BrokerFileSystem(fileSystemIdentity); - cachedFileSystem.put(fileSystemIdentity, brokerFileSystem); + } catch (Throwable t) { + logger.error("errors while close file system: ", t); } + brokerFileSystem = new BrokerFileSystem(fileSystemIdentity); + cachedFileSystem.put(fileSystemIdentity, brokerFileSystem); } } else { brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);