Skip to content

Commit

Permalink
[refactor](SimpleScheduler) refactor code for getting available backe…
Browse files Browse the repository at this point in the history
…nd in SimpleScheduler (apache#12710)
  • Loading branch information
caiconghui authored and Yijia Su committed Oct 8, 2022
1 parent 3b3b555 commit 93c7805
Showing 1 changed file with 11 additions and 39 deletions.
50 changes: 11 additions & 39 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,38 +126,17 @@ public static TScanRangeLocation getLocation(TScanRangeLocation minLocation,
public static TNetworkAddress getHost(ImmutableMap<Long, Backend> backends,
Reference<Long> backendIdRef)
throws UserException {
if (backends == null || backends.isEmpty()) {
throw new UserException("candidate backends is empty");
long id = nextId.getAndIncrement() % backends.size();
Map.Entry<Long, Backend> backendEntry = backends.entrySet().stream().skip(id).filter(
e -> isAvailable(e.getValue())).findFirst().orElse(null);
if (backendEntry == null && id > 0) {
backendEntry = backends.entrySet().stream().filter(
e -> isAvailable(e.getValue())).limit(id).findFirst().orElse(null);
}
int backendSize = backends.size();
long id = nextId.getAndIncrement() % backendSize;

List<Long> idToBackendId = Lists.newArrayList();
idToBackendId.addAll(backends.keySet());
Long backendId = idToBackendId.get((int) id);
Backend backend = backends.get(backendId);

if (isAvailable(backend)) {
backendIdRef.setRef(backendId);
if (backendEntry != null) {
Backend backend = backendEntry.getValue();
backendIdRef.setRef(backendEntry.getKey());
return new TNetworkAddress(backend.getHost(), backend.getBePort());
} else {
long candidateId = id + 1; // get next candidate id
for (int i = 0; i < backendSize; i++, candidateId++) {
LOG.debug("i={} candidatedId={}", i, candidateId);
if (candidateId >= backendSize) {
candidateId = 0;
}
if (candidateId == id) {
continue;
}
Long candidatebackendId = idToBackendId.get((int) candidateId);
LOG.debug("candidatebackendId={}", candidatebackendId);
Backend candidateBackend = backends.get(candidatebackendId);
if (isAvailable(candidateBackend)) {
backendIdRef.setRef(candidatebackendId);
return new TNetworkAddress(candidateBackend.getHost(), candidateBackend.getBePort());
}
}
}
// no backend returned
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG
Expand Down Expand Up @@ -253,14 +232,7 @@ public void run() {
}

public static TNetworkAddress getHostByCurrentBackend(Map<TNetworkAddress, Long> addressToBackendID) {
int backendSize = addressToBackendID.size();
if (backendSize == 0) {
return null;
}
Long id = nextId.getAndIncrement() % backendSize;

List<TNetworkAddress> idToBackendId = Lists.newArrayList();
idToBackendId.addAll(addressToBackendID.keySet());
return idToBackendId.get(id.intValue());
long id = nextId.getAndIncrement() % addressToBackendID.size();
return addressToBackendID.keySet().stream().skip(id).findFirst().orElse(null);
}
}

0 comments on commit 93c7805

Please sign in to comment.