Skip to content

Commit

Permalink
[improvement](report) report handler discard old report tasks #39469 (#…
Browse files Browse the repository at this point in the history
…39605)

cherry pick from #39469
  • Loading branch information
yujun777 authored Aug 20, 2024
1 parent e302882 commit 6078876
Showing 1 changed file with 76 additions and 14 deletions.
90 changes: 76 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,19 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

public class ReportHandler extends Daemon {
private static final Logger LOG = LogManager.getLogger(ReportHandler.class);

private BlockingQueue<ReportTask> reportQueue = Queues.newLinkedBlockingQueue();
private BlockingQueue<BackendReportType> reportQueue = Queues.newLinkedBlockingQueue();

private Map<BackendReportType, ReportTask> reportTasks = Maps.newHashMap();

private enum ReportType {
UNKNOWN,
TASK,
DISK,
TABLET
Expand Down Expand Up @@ -158,7 +160,7 @@ public TMasterResult handleReport(TReportRequest request) throws TException {
Map<Long, Long> partitionsVersion = null;
long reportVersion = -1;

ReportType reportType = ReportType.UNKNOWN;
ReportType reportType = null;

if (request.isSetTasks()) {
tasks = request.getTasks();
Expand Down Expand Up @@ -189,8 +191,16 @@ public TMasterResult handleReport(TReportRequest request) throws TException {
backend.setTabletMaxCompactionScore(request.getTabletMaxCompactionScore());
}

ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, partitionsVersion, reportVersion,
request.getStoragePolicy(), request.getResource(), request.getNumCores(),
if (reportType == null) {
tStatus.setStatusCode(TStatusCode.INTERNAL_ERROR);
tStatus.setErrorMsgs(Lists.newArrayList("unknown report type"));
LOG.error("receive unknown report type from be {}. current queue size: {}",
backend.getId(), reportQueue.size());
return result;
}

ReportTask reportTask = new ReportTask(beId, reportType, tasks, disks, tablets, partitionsVersion,
reportVersion, request.getStoragePolicy(), request.getResource(), request.getNumCores(),
request.getPipelineExecutorSize());
try {
putToQueue(reportTask);
Expand All @@ -202,8 +212,8 @@ public TMasterResult handleReport(TReportRequest request) throws TException {
tStatus.setErrorMsgs(errorMsgs);
return result;
}
LOG.info("receive report from be {}. type: {}, current queue size: {}",
backend.getId(), reportType, reportQueue.size());
LOG.info("receive report from be {}. type: {}, report version {}, current queue size: {}",
backend.getId(), reportType, reportVersion, reportQueue.size());
return result;
}

Expand All @@ -215,7 +225,14 @@ private void putToQueue(ReportTask reportTask) throws Exception {
"the report queue size exceeds the limit: "
+ Config.report_queue_size + ". current: " + currentSize);
}
reportQueue.put(reportTask);

BackendReportType backendReportType = new BackendReportType(reportTask.beId, reportTask.reportType);

synchronized (reportTasks) {
reportTasks.put(backendReportType, reportTask);
}

reportQueue.put(backendReportType);
}

private Map<Long, TTablet> buildTabletMap(List<TTablet> tabletList) {
Expand All @@ -230,9 +247,38 @@ private Map<Long, TTablet> buildTabletMap(List<TTablet> tabletList) {
return tabletMap;
}

private class BackendReportType {
private long beId;
private ReportType reportType;

public BackendReportType(long beId, ReportType reportType) {
this.beId = beId;
this.reportType = reportType;
}

@Override
public int hashCode() {
return Objects.hash(beId, reportType);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof BackendReportType)) {
return false;
}
BackendReportType otherBeReport = (BackendReportType) other;
return this.beId == otherBeReport.beId
&& this.reportType == otherBeReport.reportType;
}
}

private class ReportTask extends MasterTask {

private long beId;
private ReportType reportType;
private Map<TTaskType, Set<Long>> tasks;
private Map<String, TDisk> disks;
private Map<Long, TTablet> tablets;
Expand All @@ -244,12 +290,13 @@ private class ReportTask extends MasterTask {
private int cpuCores;
private int pipelineExecutorSize;

public ReportTask(long beId, Map<TTaskType, Set<Long>> tasks,
public ReportTask(long beId, ReportType reportType, Map<TTaskType, Set<Long>> tasks,
Map<String, TDisk> disks, Map<Long, TTablet> tablets,
Map<Long, Long> partitionsVersion, long reportVersion,
List<TStoragePolicy> storagePolicies, List<TStorageResource> storageResources, int cpuCores,
int pipelineExecutorSize) {
this.beId = beId;
this.reportType = reportType;
this.tasks = tasks;
this.disks = disks;
this.tablets = tablets;
Expand Down Expand Up @@ -1383,13 +1430,28 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI
@Override
protected void runOneCycle() {
while (true) {
ReportTask task = null;
try {
task = reportQueue.take();
ReportTask task = takeReportTask();
if (task != null) {
task.exec();
} catch (InterruptedException e) {
LOG.warn("got interupted exception when executing report", e);
}
}
}

private ReportTask takeReportTask() {
BackendReportType backendReportType;
try {
backendReportType = reportQueue.take();
} catch (InterruptedException e) {
LOG.warn("got interupted exception when executing report", e);
return null;
}

ReportTask task = null;
synchronized (reportTasks) {
task = reportTasks.get(backendReportType);
reportTasks.remove(backendReportType);
}

return task;
}
}

0 comments on commit 6078876

Please sign in to comment.