Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track and emit segment loading rate for HttpLoadQueuePeon on Coordinator #16691

Merged
merged 16 commits into from
Aug 3, 2024
Merged
3 changes: 2 additions & 1 deletion docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ See [Enabling metrics](../configuration/index.md#enabling-metrics) for more deta

## Coordination

These metrics are for the Druid Coordinator and are reset each time the Coordinator runs the coordination logic.
These metrics are emitted by the Druid Coordinator in every run of the corresponding coordinator duty.

|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
Expand All @@ -325,6 +325,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`segment/dropSkipped/count`|Number of segments that could not be dropped from any server.|`dataSource`, `tier`, `description`|Varies|
|`segment/loadQueue/size`|Size in bytes of segments to load.|`server`|Varies|
|`segment/loadQueue/count`|Number of segments to load.|`server`|Varies|
|`segment/loading/rateKbps`|Current rate of segment loading on a server in kbps (1000 bits per second). The rate is calculated as a moving average over the last 10 GiB or more of successful segment loads on that server.|`server`|Varies|
|`segment/dropQueue/count`|Number of segments to drop.|`server`|Varies|
|`segment/loadQueue/assigned`|Number of segments assigned for load or drop to the load queue of a server.|`dataSource`, `server`|Varies|
|`segment/loadQueue/success`|Number of segment assignments that completed successfully.|`dataSource`, `server`|Varies|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ private void collectLoadQueueStats(CoordinatorRunStats stats)
stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey, queuePeon.getSizeOfSegmentsToLoad());
stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey, queuePeon.getSegmentsToLoad().size());
stats.add(Stats.SegmentQueue.NUM_TO_DROP, rowKey, queuePeon.getSegmentsToDrop().size());
stats.updateMax(Stats.SegmentQueue.LOAD_RATE_KBPS, rowKey, queuePeon.getLoadRateKbps());

queuePeon.getAndResetStats().forEachStat(
(stat, key, statValue) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ public long getSizeOfSegmentsToLoad()
return queuedSize.get();
}

@Override
public long getLoadRateKbps()
{
return 0;
}

@Override
public CoordinatorRunStats getAndResetStats()
{
Expand All @@ -179,7 +185,7 @@ public CoordinatorRunStats getAndResetStats()
@Override
public void loadSegment(final DataSegment segment, SegmentAction action, @Nullable final LoadPeonCallback callback)
{
SegmentHolder segmentHolder = new SegmentHolder(segment, action, callback);
SegmentHolder segmentHolder = new SegmentHolder(segment, action, Duration.ZERO, callback);
final SegmentHolder existingHolder = segmentsToLoad.putIfAbsent(segment, segmentHolder);
if (existingHolder != null) {
existingHolder.addCallback(callback);
Expand All @@ -193,7 +199,7 @@ public void loadSegment(final DataSegment segment, SegmentAction action, @Nullab
@Override
public void dropSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
{
SegmentHolder segmentHolder = new SegmentHolder(segment, SegmentAction.DROP, callback);
SegmentHolder segmentHolder = new SegmentHolder(segment, SegmentAction.DROP, Duration.ZERO, callback);
final SegmentHolder existingHolder = segmentsToDrop.putIfAbsent(segment, segmentHolder);
if (existingHolder != null) {
existingHolder.addCallback(callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DataSegmentChangeResponse;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordination.SegmentChangeStatus;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentHashMap<>();
private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentHashMap<>();
private final Set<DataSegment> segmentsMarkedToDrop = ConcurrentHashMap.newKeySet();
private final LoadingRateTracker loadingRateTracker = new LoadingRateTracker();

/**
* Segments currently in queue ordered by priority and interval. This includes
Expand Down Expand Up @@ -169,11 +171,10 @@ private void doSegmentManagement()
synchronized (lock) {
final Iterator<SegmentHolder> queuedSegmentIterator = queuedSegments.iterator();

final long currentTimeMillis = System.currentTimeMillis();
while (newRequests.size() < batchSize && queuedSegmentIterator.hasNext()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currentTimeMillis above can be removed as it is unused now

final SegmentHolder holder = queuedSegmentIterator.next();
final DataSegment segment = holder.getSegment();
if (hasRequestTimedOut(holder, currentTimeMillis)) {
if (holder.hasRequestTimedOut()) {
onRequestFailed(holder, "timed out");
queuedSegmentIterator.remove();
if (holder.isLoad()) {
Expand All @@ -188,9 +189,13 @@ private void doSegmentManagement()
activeRequestSegments.add(segment);
}
}

if (segmentsToLoad.isEmpty()) {
loadingRateTracker.markBatchLoadingFinished();
}
}

if (newRequests.size() == 0) {
if (newRequests.isEmpty()) {
log.trace(
"[%s]Found no load/drop requests. SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].",
serverId, segmentsToLoad.size(), segmentsToDrop.size(), config.getBatchSize()
Expand All @@ -201,6 +206,11 @@ private void doSegmentManagement()

try {
log.trace("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId);
final boolean hasLoadRequests = newRequests.stream().anyMatch(r -> r instanceof SegmentChangeRequestLoad);
if (hasLoadRequests && !loadingRateTracker.isLoadingBatch()) {
loadingRateTracker.markBatchLoadingStarted();
}

BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
ListenableFuture<InputStream> future = httpClient.go(
new Request(HttpMethod.POST, changeRequestURL)
Expand Down Expand Up @@ -234,9 +244,16 @@ public void onSuccess(InputStream result)
return;
}

int numSuccessfulLoads = 0;
long successfulLoadSize = 0;
for (DataSegmentChangeResponse e : statuses) {
switch (e.getStatus().getState()) {
case SUCCESS:
if (e.getRequest() instanceof SegmentChangeRequestLoad) {
++numSuccessfulLoads;
successfulLoadSize +=
((SegmentChangeRequestLoad) e.getRequest()).getSegment().getSize();
}
case FAILED:
handleResponseStatus(e.getRequest(), e.getStatus());
break;
Expand All @@ -248,6 +265,10 @@ public void onSuccess(InputStream result)
log.error("Server[%s] returned unknown state in status[%s].", serverId, e.getStatus());
}
}

if (numSuccessfulLoads > 0) {
loadingRateTracker.incrementBytesLoadedInBatch(successfulLoadSize);
}
}
}
catch (Exception ex) {
Expand Down Expand Up @@ -284,9 +305,7 @@ private void logRequestFailure(Throwable t)
log.error(
t,
"Request[%s] Failed with status[%s]. Reason[%s].",
changeRequestURL,
responseHandler.getStatus(),
responseHandler.getDescription()
changeRequestURL, responseHandler.getStatus(), responseHandler.getDescription()
);
}
},
Expand Down Expand Up @@ -367,7 +386,7 @@ public void stop()
if (stopped) {
return;
}
log.info("Stopping load queue peon for server [%s].", serverId);
log.info("Stopping load queue peon for server[%s].", serverId);
stopped = true;

// Cancel all queued requests
Expand All @@ -379,6 +398,7 @@ public void stop()
queuedSegments.clear();
activeRequestSegments.clear();
queuedSize.set(0L);
loadingRateTracker.stop();
stats.get().clear();
}
}
Expand All @@ -387,7 +407,7 @@ public void stop()
public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallback callback)
{
if (!action.isLoad()) {
log.warn("Invalid load action [%s] for segment [%s] on server [%s].", action, segment.getId(), serverId);
log.warn("Invalid load action[%s] for segment[%s] on server[%s].", action, segment.getId(), serverId);
return;
}

Expand All @@ -407,7 +427,7 @@ public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallb
if (holder == null) {
log.trace("Server[%s] to load segment[%s] queued.", serverId, segment.getId());
queuedSize.addAndGet(segment.getSize());
holder = new SegmentHolder(segment, action, callback);
holder = new SegmentHolder(segment, action, config.getLoadTimeout(), callback);
segmentsToLoad.put(segment, holder);
queuedSegments.add(holder);
processingExecutor.execute(this::doSegmentManagement);
Expand Down Expand Up @@ -436,7 +456,7 @@ public void dropSegment(DataSegment segment, LoadPeonCallback callback)

if (holder == null) {
log.trace("Server[%s] to drop segment[%s] queued.", serverId, segment.getId());
holder = new SegmentHolder(segment, SegmentAction.DROP, callback);
holder = new SegmentHolder(segment, SegmentAction.DROP, config.getLoadTimeout(), callback);
segmentsToDrop.put(segment, holder);
queuedSegments.add(holder);
processingExecutor.execute(this::doSegmentManagement);
Expand Down Expand Up @@ -481,6 +501,12 @@ public long getSizeOfSegmentsToLoad()
return queuedSize.get();
}

@Override
public long getLoadRateKbps()
{
return loadingRateTracker.getMovingAverageLoadRateKbps();
}

@Override
public CoordinatorRunStats getAndResetStats()
{
Expand All @@ -505,19 +531,6 @@ public Set<DataSegment> getSegmentsMarkedToDrop()
return Collections.unmodifiableSet(segmentsMarkedToDrop);
}

/**
* A request is considered to have timed out if the time elapsed since it was
* first sent to the server is greater than the configured load timeout.
*
* @see HttpLoadQueuePeonConfig#getLoadTimeout()
*/
private boolean hasRequestTimedOut(SegmentHolder holder, long currentTimeMillis)
{
return holder.isRequestSentToServer()
&& currentTimeMillis - holder.getFirstRequestMillis()
> config.getLoadTimeout().getMillis();
}

private void onRequestFailed(SegmentHolder holder, String failureCause)
{
log.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@
package org.apache.druid.server.coordinator.loading;

/**
* Callback executed when the load or drop of a segment completes on a server
* either with success or failure.
*/
@FunctionalInterface
public interface LoadPeonCallback
{
/**
* Ideally, this method is called after the load/drop opertion is successfully done, i.e., the historical node
* removes the zookeeper node from loadQueue and announces/unannouces the segment. However, this method will
* also be called in failure scenarios so for implementations of LoadPeonCallback that care about success it
* is important to take extra measures to ensure that whatever side effects they expect to happen upon success
* have happened. Coordinator will have a complete and correct view of the cluster in the next run period.
*/
void execute(boolean success);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public interface LoadQueuePeon

long getSizeOfSegmentsToLoad();

long getLoadRateKbps();

CoordinatorRunStats getAndResetStats();

/**
Expand Down
Loading
Loading