Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid unnecessary cache building for cachingCost #12465

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 16 additions & 24 deletions core/src/main/java/org/apache/druid/timeline/SegmentId.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand Down Expand Up @@ -80,6 +79,12 @@ public final class SegmentId implements Comparable<SegmentId>
*/
private static final Interner<String> STRING_INTERNER = Interners.newWeakInterner();

/**
* Store Intervals, since creating a new one on every call before returning it is an expensive operation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this!

* To decrease the memory required for storing intervals, intern them, since the number of distinct values is "low".
*/
private static final Interner<Interval> INTERVAL_INTERNER = Interners.newWeakInterner();

private static final char DELIMITER = '_';
private static final Splitter DELIMITER_SPLITTER = Splitter.on(DELIMITER);
private static final Joiner DELIMITER_JOINER = Joiner.on(DELIMITER);
Expand Down Expand Up @@ -258,14 +263,7 @@ public static SegmentId dummy(String dataSource, int partitionNum)
}

private final String dataSource;
/**
* {@code intervalStartMillis}, {@link #intervalEndMillis} and {@link #intervalChronology} are the three fields of
* an {@link Interval}. Storing them directly to flatten the structure and reduce the heap space consumption.
*/
private final long intervalStartMillis;
private final long intervalEndMillis;
@Nullable
private final Chronology intervalChronology;
private final Interval interval;
private final String version;
private final int partitionNum;

Expand All @@ -278,9 +276,7 @@ public static SegmentId dummy(String dataSource, int partitionNum)
private SegmentId(String dataSource, Interval interval, String version, int partitionNum)
{
this.dataSource = STRING_INTERNER.intern(Objects.requireNonNull(dataSource));
this.intervalStartMillis = interval.getStartMillis();
this.intervalEndMillis = interval.getEndMillis();
this.intervalChronology = interval.getChronology();
this.interval = INTERVAL_INTERNER.intern(interval);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can interval ever be null here?
If not, we can add Objects.requireNonNull similar to the datasource validation in the previous line.

// Versions are timestamp-based Strings, so interning them doesn't make sense. If this is not the case, interning
// could be conditionally allowed via a system property.
this.version = Objects.requireNonNull(version);
Expand All @@ -297,9 +293,7 @@ private int computeHashCode()
hashCode = hashCode * 1000003 + version.hashCode();

hashCode = hashCode * 1000003 + dataSource.hashCode();
hashCode = hashCode * 1000003 + Long.hashCode(intervalStartMillis);
hashCode = hashCode * 1000003 + Long.hashCode(intervalEndMillis);
hashCode = hashCode * 1000003 + Objects.hashCode(intervalChronology);
hashCode = hashCode * 1000003 + interval.hashCode();
return hashCode;
}

Expand All @@ -310,17 +304,17 @@ public String getDataSource()

public DateTime getIntervalStart()
{
return new DateTime(intervalStartMillis, intervalChronology);
return new DateTime(interval.getStartMillis(), interval.getChronology());
}

public DateTime getIntervalEnd()
{
return new DateTime(intervalEndMillis, intervalChronology);
return new DateTime(interval.getEndMillis(), interval.getChronology());
}

public Interval getInterval()
{
return new Interval(intervalStartMillis, intervalEndMillis, intervalChronology);
return interval;
}

public String getVersion()
Expand All @@ -340,7 +334,7 @@ public SegmentId withInterval(Interval newInterval)

public SegmentDescriptor toDescriptor()
{
return new SegmentDescriptor(Intervals.utc(intervalStartMillis, intervalEndMillis), version, partitionNum);
return new SegmentDescriptor(Intervals.utc(interval.getStartMillis(), interval.getEndMillis()), version, partitionNum);
}

@Override
Expand All @@ -357,9 +351,7 @@ public boolean equals(Object o)
// are equal as well as all other fields used to compute them, the partitionNums are also guaranteed to be equal.
return hashCode == that.hashCode &&
dataSource.equals(that.dataSource) &&
intervalStartMillis == that.intervalStartMillis &&
intervalEndMillis == that.intervalEndMillis &&
Objects.equals(intervalChronology, that.intervalChronology) &&
interval.equals(that.interval) &&
version.equals(that.version);
}

Expand All @@ -376,11 +368,11 @@ public int compareTo(SegmentId o)
if (result != 0) {
return result;
}
result = Long.compare(intervalStartMillis, o.intervalStartMillis);
result = Long.compare(interval.getStartMillis(), o.interval.getStartMillis());
if (result != 0) {
return result;
}
result = Long.compare(intervalEndMillis, o.intervalEndMillis);
result = Long.compare(interval.getEndMillis(), o.interval.getEndMillis());
if (result != 0) {
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.apache.druid.server.coordinator;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.server.coordinator.cost.ClusterCostCache;
import org.apache.druid.server.coordinator.cost.SegmentsCostCache;
import org.apache.druid.timeline.DataSegment;

import java.util.Set;
Expand Down Expand Up @@ -59,7 +59,7 @@ protected double computeCost(DataSegment proposalSegment, ServerHolder server, b
double cost = clusterCostCache.computeCost(serverName, proposalSegment);

// add segments that will be loaded to the cost
cost += costCacheForLoadingSegments(server).computeCost(serverName, proposalSegment);
cost += costCacheForLoadingSegments(server, proposalSegment);

if (server.getAvailableSize() <= 0) {
return Double.POSITIVE_INFINITY;
Expand All @@ -70,10 +70,19 @@ protected double computeCost(DataSegment proposalSegment, ServerHolder server, b
return cost * (server.getMaxSize() / server.getAvailableSize());
}

private ClusterCostCache costCacheForLoadingSegments(ServerHolder server)
private double costCacheForLoadingSegments(ServerHolder server, DataSegment proposalSegment)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Rename to computeCostForLoadingSegmentOnServer

{
final double t0 = proposalSegment.getInterval().getStartMillis();
final double t1 = (proposalSegment.getInterval().getEndMillis() - t0) / SegmentsCostCache.MILLIS_FACTOR;
double costForLoadingSegments = 0d;
final Set<DataSegment> loadingSegments = server.getPeon().getSegmentsToLoad();
return ClusterCostCache.builder(ImmutableMap.of(server.getServer().getName(), loadingSegments)).build();
for (DataSegment segment : loadingSegments) {
final double start = (segment.getInterval().getStartMillis() - t0) / SegmentsCostCache.MILLIS_FACTOR;
final double end = (segment.getInterval().getEndMillis() - t0) / SegmentsCostCache.MILLIS_FACTOR;
final double multiplier = segment.getDataSource().equals(proposalSegment.getDataSource()) ? 2d : 1d;
costForLoadingSegments += multiplier * intervalCost(t1, start, end);
}
return costForLoadingSegments;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class SegmentsCostCache
*/
private static final double HALF_LIFE_DAYS = 1.0;
private static final double LAMBDA = Math.log(2) / HALF_LIFE_DAYS;
private static final double MILLIS_FACTOR = TimeUnit.DAYS.toMillis(1) / LAMBDA;
public static final double MILLIS_FACTOR = TimeUnit.DAYS.toMillis(1) / LAMBDA;

/**
* LIFE_THRESHOLD is used to avoid calculations for segments that are "far"
Expand Down