Skip to content

Commit

Permalink
[fix](routine-load) fix bug that routine load task can not find backend
Browse files Browse the repository at this point in the history
Introduced from apache#9492.
  • Loading branch information
morningman committed Jun 1, 2022
1 parent 03a1eec commit 29116ff
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,14 @@ private List<Long> getAvailableBackendIds(long jobId, String cluster) throws Loa
} else {
tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
// user may be dropped. Here we fall back to use replica tag
// user may be dropped, or may not set resource tag property.
// Here we fall back to use replica tag
tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
}
}
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().setCluster(cluster)
.addTags(tags).build();
return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000);
return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1 /* as many as possible */);
}

private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,14 +775,16 @@ public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
* Select a set of backends by the given policy.
*
* @param policy
* @param number number of backends which need to be selected.
* @param number number of backends which need to be selected. -1 means return as many as possible.
* @return return #number of backend ids,
* or empty set if no backends match the policy, or the number of matched backends is less than "number";
*/
public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
Preconditions.checkArgument(number >= -1);
List<Backend> candidates =
idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList());
if (candidates.size() < number) {
if ((number != -1 && candidates.size() < number) || candidates.isEmpty()) {
LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
return Lists.newArrayList();
}
// If only need one Backend, just return a random one.
Expand All @@ -793,7 +795,11 @@ public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number)

if (policy.allowOnSameHost) {
Collections.shuffle(candidates);
return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
if (number == -1) {
return candidates.stream().map(b -> b.getId()).collect(Collectors.toList());
} else {
return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
}
}

// for each host, random select one backend.
Expand All @@ -812,11 +818,16 @@ public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number)
Collections.shuffle(list);
candidates.add(list.get(0));
}
if (candidates.size() < number) {
if (number != -1 && candidates.size() < number) {
LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
return Lists.newArrayList();
}
Collections.shuffle(candidates);
return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
if (number != -1) {
return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
} else {
return candidates.stream().map(b -> b.getId()).collect(Collectors.toList());
}
}

public ImmutableMap<Long, Backend> getIdToBackend() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,20 @@ public void testSelectBackendIdsByPolicy() throws Exception {
BeSelectionPolicy policy10 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga, tagb))
.setStorageMedium(TStorageMedium.SSD).build();
Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, 4).size());
Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy10, 3).size());
// check return as many as possible
Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, -1).size());
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy10, 5).size());

BeSelectionPolicy policy11 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb))
.setStorageMedium(TStorageMedium.HDD).build();
BeSelectionPolicy policy11 =
new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb)).setStorageMedium(TStorageMedium.HDD)
.build();
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy11, 1).size());

// 7. check disk usage
BeSelectionPolicy policy12 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
.setStorageMedium(TStorageMedium.HDD).build();
BeSelectionPolicy policy12 =
new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)).setStorageMedium(TStorageMedium.HDD)
.build();
Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy12, 1).size());
BeSelectionPolicy policy13 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
.setStorageMedium(TStorageMedium.HDD).needCheckDiskUsage().build();
Expand Down

0 comments on commit 29116ff

Please sign in to comment.