Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replication factor column to sys table #14403

Merged
merged 18 commits into from
Jun 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/api-reference/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ Returns a list of all segments for each datasource enabled in the cluster.

Returns a list of all segments for one or more specific datasources enabled in the cluster.

`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus`
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
`GET /druid/coordinator/v1/metadata/segments?includeFullDetails`

Returns a list of all segments for each datasource with the full segment metadata and an extra field `overshadowed`.
Returns a list of all segments for each datasource with the full segment metadata and extra fields `overshadowed` and `totalTargetReplicants`.

`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&datasources={dataSourceName1}&datasources={dataSourceName2}`
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
`GET /druid/coordinator/v1/metadata/segments?includeFullDetails&datasources={dataSourceName1}&datasources={dataSourceName2}`

Returns a list of all segments for one or more specific datasources with the full segment metadata and an extra field `overshadowed`.
Returns a list of all segments for one or more specific datasources with the full segment metadata and extra fields `overshadowed` and `totalTargetReplicants`.
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved

`GET /druid/coordinator/v1/metadata/datasources`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,50 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonUnwrapped;

import javax.annotation.Nullable;
import java.util.Objects;

/**
* DataSegment object plus the overshadowed status for the segment. An immutable object.
*
* SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId}
* DataSegment object plus the overshadowed and target number of replicants for the segment. An immutable object.
* <br></br>
* SegmentPlus's {@link #compareTo} method considers only the {@link SegmentId}
* of the DataSegment object.
*/
public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
public class SegmentPlus implements Comparable<SegmentPlus>
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
{
private final boolean overshadowed;
/**
* The target number of replicants for the segment added across all tiers. This value is null if the load rules for
* the segment have not been evaluated yet.
*/
private final Integer totalTargetReplicants;
/**
* dataSegment is serialized "unwrapped", i.e. it's properties are included as properties of
* enclosing class. If in future, if {@code SegmentWithOvershadowedStatus} were to extend {@link DataSegment},
* enclosing class. If in the future, if {@code SegmentPlus} were to extend {@link DataSegment},
* there will be no change in the serialized format.
*/
@JsonUnwrapped
private final DataSegment dataSegment;

@JsonCreator
public SegmentWithOvershadowedStatus(
@JsonProperty("overshadowed") boolean overshadowed
public SegmentPlus(
@JsonProperty("overshadowed") boolean overshadowed,
@JsonProperty("totalTargetReplicants") Integer targetReplicants
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
)
{
// Jackson will overwrite dataSegment if needed (even though the field is final)
this(null, overshadowed);
this(null, overshadowed, targetReplicants);
}

public SegmentWithOvershadowedStatus(
public SegmentPlus(
DataSegment dataSegment,
boolean overshadowed
boolean overshadowed,
Integer totalTargetReplicants
)
{
this.dataSegment = dataSegment;
this.overshadowed = overshadowed;
this.totalTargetReplicants = totalTargetReplicants;
}

@JsonProperty
Expand All @@ -70,44 +81,48 @@ public DataSegment getDataSegment()
return dataSegment;
}

@Nullable
@JsonProperty
public Integer getTotalTargetReplicants()
{
return totalTargetReplicants;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof SegmentWithOvershadowedStatus)) {
return false;
}
final SegmentWithOvershadowedStatus that = (SegmentWithOvershadowedStatus) o;
if (!dataSegment.equals(that.dataSegment)) {
return false;
}
if (overshadowed != (that.overshadowed)) {
if (o == null || getClass() != o.getClass()) {
return false;
}
return true;
SegmentPlus that = (SegmentPlus) o;
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
return overshadowed == that.overshadowed
&& Objects.equals(totalTargetReplicants, that.totalTargetReplicants)
&& Objects.equals(dataSegment, that.dataSegment);
}

@Override
public int hashCode()
{
int result = dataSegment.hashCode();
result = 31 * result + Boolean.hashCode(overshadowed);
result = 31 * result + Objects.hash(overshadowed, totalTargetReplicants);
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
return result;
}

@Override
public int compareTo(SegmentWithOvershadowedStatus o)
public int compareTo(SegmentPlus o)
{
return dataSegment.getId().compareTo(o.dataSegment.getId());
}

@Override
public String toString()
{
return "SegmentWithOvershadowedStatus{" +
return "SegmentPlus{" +
"overshadowed=" + overshadowed +
", totalTargetReplicants=" + totalTargetReplicants +
", dataSegment=" + dataSegment +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SegmentWithOvershadowedStatusTest
public class SegmentPlusTest
{
private static final ObjectMapper MAPPER = createObjectMapper();
private static final Interval INTERVAL = Intervals.of("2011-10-01/2011-10-02");
private static final ImmutableMap<String, Object> LOAD_SPEC = ImmutableMap.of("something", "or_other");
private static final boolean OVERSHADOWED = true;
private static final Integer TOTAL_TARGET_REPLICANTS = 2;
private static final int TEST_VERSION = 0x9;
private static final SegmentWithOvershadowedStatus SEGMENT = createSegmentWithOvershadowedStatus();
private static final SegmentPlus SEGMENT = createSegmentPlusForTest();

private static ObjectMapper createObjectMapper()
{
Expand All @@ -59,7 +61,7 @@ private static ObjectMapper createObjectMapper()
return objectMapper;
}

private static SegmentWithOvershadowedStatus createSegmentWithOvershadowedStatus()
private static SegmentPlus createSegmentPlusForTest()
{
DataSegment dataSegment = new DataSegment(
"something",
Expand All @@ -74,7 +76,7 @@ private static SegmentWithOvershadowedStatus createSegmentWithOvershadowedStatus
1
);

return new SegmentWithOvershadowedStatus(dataSegment, OVERSHADOWED);
return new SegmentPlus(dataSegment, OVERSHADOWED, TOTAL_TARGET_REPLICANTS);
}

@Test
Expand All @@ -85,7 +87,7 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);

Assert.assertEquals(11, objectMap.size());
Assert.assertEquals(12, objectMap.size());
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(INTERVAL.toString(), objectMap.get("interval"));
Assert.assertEquals("1", objectMap.get("version"));
Expand All @@ -96,12 +98,13 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
Assert.assertEquals(OVERSHADOWED, objectMap.get("overshadowed"));
Assert.assertEquals(TOTAL_TARGET_REPLICANTS, objectMap.get("totalTargetReplicants"));

final String json = MAPPER.writeValueAsString(SEGMENT);

final TestSegmentWithOvershadowedStatus deserializedSegment = MAPPER.readValue(
final TestSegmentPlus deserializedSegment = MAPPER.readValue(
json,
TestSegmentWithOvershadowedStatus.class
TestSegmentPlus.class
);

DataSegment dataSegment = SEGMENT.getDataSegment();
Expand All @@ -114,30 +117,33 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E
Assert.assertEquals(dataSegment.getShardSpec(), deserializedSegment.getShardSpec());
Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize());
Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId());
Assert.assertEquals(OVERSHADOWED, deserializedSegment.isOvershadowed());
Assert.assertEquals(TOTAL_TARGET_REPLICANTS, deserializedSegment.getTotalTargetReplicants());
}

// Previously, the implementation of SegmentWithOvershadowedStatus had @JsonCreator/@JsonProperty and @JsonUnwrapped
// Previously, the implementation of SegmentPlus had @JsonCreator/@JsonProperty and @JsonUnwrapped
// on the same field (dataSegment), which used to work in Jackson 2.6, but does not work with Jackson 2.9:
// https://github.com/FasterXML/jackson-databind/issues/265#issuecomment-264344051
@Test
public void testJsonCreatorAndJsonUnwrappedAnnotationsAreCompatible() throws Exception
{
String json = MAPPER.writeValueAsString(SEGMENT);
SegmentWithOvershadowedStatus segment = MAPPER.readValue(json, SegmentWithOvershadowedStatus.class);
SegmentPlus segment = MAPPER.readValue(json, SegmentPlus.class);
Assert.assertEquals(SEGMENT, segment);
Assert.assertEquals(json, MAPPER.writeValueAsString(segment));
}
}

/**
* Subclass of DataSegment with overshadowed status
* Flat subclass of DataSegment for testing
*/
class TestSegmentWithOvershadowedStatus extends DataSegment
class TestSegmentPlus extends DataSegment
{
private final boolean overshadowed;
private final Integer totalTargetReplicants;

@JsonCreator
public TestSegmentWithOvershadowedStatus(
public TestSegmentPlus(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
Expand All @@ -154,7 +160,8 @@ public TestSegmentWithOvershadowedStatus(
@JsonProperty("lasCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JsonProperty("overshadowed") boolean overshadowed
@JsonProperty("overshadowed") boolean overshadowed,
@JsonProperty("totalTargetReplicants") Integer totalTargetReplicants
)
{
super(
Expand All @@ -170,6 +177,7 @@ public TestSegmentWithOvershadowedStatus(
size
);
this.overshadowed = overshadowed;
this.totalTargetReplicants = totalTargetReplicants;
}

@JsonProperty
Expand All @@ -178,23 +186,34 @@ public boolean isOvershadowed()
return overshadowed;
}

@JsonProperty
public Integer getTotalTargetReplicants()
{
return totalTargetReplicants;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof TestSegmentWithOvershadowedStatus)) {
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o;
if (overshadowed != (that.overshadowed)) {
return false;
}
return true;
TestSegmentPlus that = (TestSegmentPlus) o;
return overshadowed == that.overshadowed && Objects.equals(
totalTargetReplicants,
that.totalTargetReplicants
);
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), overshadowed, totalTargetReplicants);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -154,6 +155,7 @@ public class DruidCoordinator

private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
private volatile ImmutableMap<SegmentId, Integer> prevTotalTargetReplicantMap = null;
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
private volatile DruidCluster cluster = null;

private int cachedBalancerThreadNumber;
Expand Down Expand Up @@ -817,6 +819,12 @@ private List<CoordinatorDuty> makeCompactSegmentsDuty()
return ImmutableList.of(compactSegments);
}

@Nullable
public Integer getTotalTargetReplicantsForSegment(SegmentId segmentId)
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
{
return prevTotalTargetReplicantMap == null ? null : prevTotalTargetReplicantMap.get(segmentId);
}

@VisibleForTesting
protected class DutiesRunnable implements Runnable
{
Expand Down Expand Up @@ -943,6 +951,11 @@ public void run()
}
}
}

if (params.getSegmentReplicantLookup() != null) {
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
prevTotalTargetReplicantMap = params.getSegmentReplicantLookup().createTargetReplicantMapCopy();
}

// Emit the runtime of the full DutiesRunnable
params.getEmitter().emit(
new ServiceMetricEvent.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Table;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
Expand All @@ -30,6 +31,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;

/**
* A lookup for the number of replicants of a given segment for a certain tier.
Expand Down Expand Up @@ -79,6 +81,7 @@ public static SegmentReplicantLookup make(DruidCluster cluster, boolean replicat

private final Table<SegmentId, String, Integer> segmentsInCluster;
private final Table<SegmentId, String, Integer> loadingSegments;
private final ConcurrentHashMap<SegmentId, Integer> totalTargetReplicants = new ConcurrentHashMap<>();
private final DruidCluster cluster;

private SegmentReplicantLookup(
Expand Down Expand Up @@ -114,6 +117,16 @@ public int getLoadedReplicants(SegmentId segmentId, String tier)
return (retVal == null) ? 0 : retVal;
}

public void setTotalTargetReplicants(SegmentId segmentId, Integer requiredReplicas)
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
{
totalTargetReplicants.put(segmentId, requiredReplicas);
}

public ImmutableMap<SegmentId, Integer> createTargetReplicantMapCopy()
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
{
return ImmutableMap.copyOf(totalTargetReplicants);
}

private int getLoadingReplicants(SegmentId segmentId, String tier)
{
Integer retVal = loadingSegments.get(segmentId, tier);
Expand Down
Loading