Skip to content

Commit

Permalink
[fix](broker-load) Correction of kerberos authentication time determi…
Browse files Browse the repository at this point in the history
…nation rule (apache#11793)

Every time a new broker load comes in, Doris will update the start time of Kerberos authentication,
but this logic is wrong.
Because the authentication duration of Kerberos is calculated from the moment when the ticket is obtained.

This PR change the logic:
1. If it is kerberos, check fs expiration by create time.
2.Otherwise, check fs expiration by access time
  • Loading branch information
whutpencil authored and Yijia Su committed Oct 8, 2022
1 parent 090862d commit f70f8cd
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 107 deletions.
2 changes: 1 addition & 1 deletion fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
broker_ipc_port = 8000

# client session will be deleted if not receive ping after this time
client_expire_seconds = 1800
client_expire_seconds = 3600

# Advanced configurations
# sys_log_dir = ${BROKER_HOME}/log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@


public class BrokerConfig extends ConfigBase {

@ConfField
public static int hdfs_read_buffer_size_kb = 1024;

@ConfField
public static int hdfs_write_buffer_size_kb = 1024;

@ConfField
public static int client_expire_seconds = 1800;
public static int client_expire_seconds = 3600;

@ConfField
public static int broker_ipc_port = 8000;

@ConfField
public static String sys_log_dir = System.getenv("BROKER_HOME") + "/log";
@ConfField
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,29 @@ public class BrokerFileSystem {

private static Logger logger = Logger
.getLogger(BrokerFileSystem.class.getName());

private ReentrantLock lock;
private FileSystemIdentity identity;
private FileSystem dfsFileSystem;
private long lastAccessTimestamp;
private long createTimestamp;
private UUID fileSystemId;

public BrokerFileSystem(FileSystemIdentity identity) {
this.identity = identity;
this.lock = new ReentrantLock();
this.dfsFileSystem = null;
this.lastAccessTimestamp = System.currentTimeMillis();
this.createTimestamp = System.currentTimeMillis();
this.fileSystemId = UUID.randomUUID();
}

public synchronized void setFileSystem(FileSystem fileSystem) {
this.dfsFileSystem = fileSystem;
this.lastAccessTimestamp = System.currentTimeMillis();
this.createTimestamp = System.currentTimeMillis();
}

public void closeFileSystem() {
lock.lock();
try {
Expand All @@ -63,31 +66,32 @@ public void closeFileSystem() {
lock.unlock();
}
}

public FileSystem getDFSFileSystem() {
this.lastAccessTimestamp = System.currentTimeMillis();
return dfsFileSystem;
}

public void updateLastUpdateAccessTime() {
this.lastAccessTimestamp = System.currentTimeMillis();
}

public FileSystemIdentity getIdentity() {
return identity;
}

public ReentrantLock getLock() {
return lock;
}

public boolean isExpired(long expirationIntervalSecs) {
if (System.currentTimeMillis() - lastAccessTimestamp > expirationIntervalSecs * 1000) {
return true;
}
return false;

public boolean isExpiredByLastAccessTime(long expirationIntervalSecs) {
return System.currentTimeMillis() - lastAccessTimestamp > expirationIntervalSecs * 1000;
}


public boolean isExpiredByCreateTime(long expirationIntervalSecs) {
return System.currentTimeMillis() - createTimestamp > expirationIntervalSecs * 1000;
}

@Override
public String toString() {
return "BrokerFileSystem [identity=" + identity + ", dfsFileSystem="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,69 +31,69 @@
public class ClientContextManager {

private static Logger logger = Logger
.getLogger(ClientContextManager.class.getName());
.getLogger(ClientContextManager.class.getName());
private ScheduledExecutorService executorService;
private ConcurrentHashMap<String, ClientResourceContext> clientContexts;
private ConcurrentHashMap<TBrokerFD, String> fdToClientMap;
private int clientExpirationSeconds = BrokerConfig.client_expire_seconds;

public ClientContextManager(ScheduledExecutorService executorService) {
clientContexts = new ConcurrentHashMap<>();
fdToClientMap = new ConcurrentHashMap<>();
this.executorService = executorService;
this.executorService.schedule(new CheckClientExpirationTask(), 0, TimeUnit.SECONDS);
}

public void onPing(String clientId) {
if (!clientContexts.containsKey(clientId)) {
clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId));
}
ClientResourceContext clientContext = clientContexts.get(clientId);
clientContext.updateLastPingTime();
}
public synchronized void putNewOutputStream(String clientId, TBrokerFD fd, FSDataOutputStream fsDataOutputStream,
BrokerFileSystem brokerFileSystem) {

public synchronized void putNewOutputStream(String clientId, TBrokerFD fd, FSDataOutputStream fsDataOutputStream,
BrokerFileSystem brokerFileSystem) {
if (!clientContexts.containsKey(clientId)) {
clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId));
}
ClientResourceContext clientContext = clientContexts.get(clientId);
clientContext.putOutputStream(fd, fsDataOutputStream, brokerFileSystem);
fdToClientMap.putIfAbsent(fd, clientId);
}
public synchronized void putNewInputStream(String clientId, TBrokerFD fd, FSDataInputStream fsDataInputStream,
BrokerFileSystem brokerFileSystem) {

public synchronized void putNewInputStream(String clientId, TBrokerFD fd, FSDataInputStream fsDataInputStream,
BrokerFileSystem brokerFileSystem) {
if (!clientContexts.containsKey(clientId)) {
clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId));
}
ClientResourceContext clientContext = clientContexts.get(clientId);
clientContext.putInputStream(fd, fsDataInputStream, brokerFileSystem);
fdToClientMap.putIfAbsent(fd, clientId);
}

public synchronized FSDataInputStream getFsDataInputStream(TBrokerFD fd) {
String clientId = fdToClientMap.get(fd);
if (clientId == null) {
throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
"the fd is not owned by client {}", clientId);
throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
"the fd is not owned by client {}", clientId);
}
ClientResourceContext clientContext = clientContexts.get(clientId);
FSDataInputStream fsDataInputStream = clientContext.getInputStream(fd);
return fsDataInputStream;
}

public synchronized FSDataOutputStream getFsDataOutputStream(TBrokerFD fd) {
String clientId = fdToClientMap.get(fd);
if (clientId == null) {
throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
"the fd is not owned by client {}", clientId);
throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
"the fd is not owned by client {}", clientId);
}
ClientResourceContext clientContext = clientContexts.get(clientId);
FSDataOutputStream fsDataOutputStream = clientContext.getOutputStream(fd);
return fsDataOutputStream;
}

public synchronized void removeInputStream(TBrokerFD fd) {
String clientId = fdToClientMap.remove(fd);
if (clientId == null) {
Expand All @@ -109,7 +109,7 @@ public synchronized void removeInputStream(TBrokerFD fd) {
logger.error("errors while close file data input stream", e);
}
}

public synchronized void removeOutputStream(TBrokerFD fd) {
String clientId = fdToClientMap.remove(fd);
if (clientId == null) {
Expand All @@ -125,7 +125,7 @@ public synchronized void removeOutputStream(TBrokerFD fd) {
logger.error("errors while close file data output stream", e);
}
}

class CheckClientExpirationTask implements Runnable {
@Override
public void run() {
Expand All @@ -139,104 +139,104 @@ public void run() {
ClientContextManager.this.removeOutputStream(fd);
}
clientContexts.remove(clientContext.clientId);
logger.info("client [" + clientContext.clientId
+ "] is expired, remove it from contexts. last ping time is "
+ clientContext.lastPingTimestamp);
logger.info("client [" + clientContext.clientId
+ "] is expired, remove it from contexts. last ping time is "
+ clientContext.lastPingTimestamp);
}
}
} finally {
ClientContextManager.this.executorService.schedule(this, 60, TimeUnit.SECONDS);
}
}
}

private static class BrokerOutputStream {

private final FSDataOutputStream outputStream;
private final BrokerFileSystem brokerFileSystem;

public BrokerOutputStream(FSDataOutputStream outputStream, BrokerFileSystem brokerFileSystem) {
this.outputStream = outputStream;
this.brokerFileSystem = brokerFileSystem;
this.brokerFileSystem.updateLastUpdateAccessTime();
}

public FSDataOutputStream getOutputStream() {
this.brokerFileSystem.updateLastUpdateAccessTime();
return outputStream;
}

public void updateLastUpdateAccessTime() {
this.brokerFileSystem.updateLastUpdateAccessTime();
}
}

private static class BrokerInputStream {

private final FSDataInputStream inputStream;
private final BrokerFileSystem brokerFileSystem;

public BrokerInputStream(FSDataInputStream inputStream, BrokerFileSystem brokerFileSystem) {
this.inputStream = inputStream;
this.brokerFileSystem = brokerFileSystem;
this.brokerFileSystem.updateLastUpdateAccessTime();
}

public FSDataInputStream getInputStream() {
this.brokerFileSystem.updateLastUpdateAccessTime();
return inputStream;
}

public void updateLastUpdateAccessTime() {
this.brokerFileSystem.updateLastUpdateAccessTime();
}
}

static class ClientResourceContext {

private String clientId;
private ConcurrentHashMap<TBrokerFD, BrokerInputStream> inputStreams;
private ConcurrentHashMap<TBrokerFD, BrokerOutputStream> outputStreams;
private long lastPingTimestamp;

public ClientResourceContext(String clientId) {
this.clientId = clientId;
this.inputStreams = new ConcurrentHashMap<>();
this.outputStreams = new ConcurrentHashMap<>();
this.lastPingTimestamp = System.currentTimeMillis();
}

public void putInputStream(TBrokerFD fd, FSDataInputStream inputStream, BrokerFileSystem fileSystem) {
inputStreams.putIfAbsent(fd, new BrokerInputStream(inputStream, fileSystem));
}

public void putOutputStream(TBrokerFD fd, FSDataOutputStream outputStream, BrokerFileSystem fileSystem) {
outputStreams.putIfAbsent(fd, new BrokerOutputStream(outputStream, fileSystem));
}

public FSDataInputStream getInputStream(TBrokerFD fd) {
BrokerInputStream brokerInputStream = inputStreams.get(fd);
if (brokerInputStream != null) {
return brokerInputStream.getInputStream();
}
return null;
}

public FSDataOutputStream getOutputStream(TBrokerFD fd) {
BrokerOutputStream brokerOutputStream = outputStreams.get(fd);
if (brokerOutputStream != null) {
return brokerOutputStream.getOutputStream();
}
return null;
}

public void updateLastPingTime() {
this.lastPingTimestamp = System.currentTimeMillis();
// Should we also update the underline filesystem? maybe it is time cost
for (BrokerInputStream brokerInputStream : inputStreams.values()) {
brokerInputStream.updateLastUpdateAccessTime();
}

for (BrokerOutputStream brokerOutputStream : outputStreams.values()) {
brokerOutputStream.updateLastUpdateAccessTime();
}
Expand Down
Loading

0 comments on commit f70f8cd

Please sign in to comment.