Add replication factor column to sys table #14403

Merged: 18 commits, Jun 18, 2023
Changes from 13 commits
4 changes: 2 additions & 2 deletions docs/api-reference/api-reference.md
@@ -173,11 +173,11 @@ Returns a list of all segments for one or more specific datasources enabled in t

`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus`

Returns a list of all segments for each datasource with the full segment metadata and an extra field `overshadowed`.
Returns a list of all segments for each datasource with the full segment metadata and extra fields `overshadowed` and `replicationFactor`.

`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&datasources={dataSourceName1}&datasources={dataSourceName2}`

Returns a list of all segments for one or more specific datasources with the full segment metadata and an extra field `overshadowed`.
Returns a list of all segments for one or more specific datasources with the full segment metadata and extra fields `overshadowed` and `replicationFactor`.
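
As a rough illustration of the request shape documented above, the following sketch assembles the URI with the `includeOvershadowedStatus` flag and one `datasources` parameter per datasource. The class name, the `build` helper, and the Coordinator address `http://localhost:8081` are assumptions made for this example and are not part of the PR.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class SegmentsUriSketch
{
  // Hypothetical helper: builds the metadata segments URI with the
  // includeOvershadowedStatus flag and one "datasources" parameter per name.
  public static URI build(String coordinatorBase, List<String> datasources)
  {
    String query = datasources.stream()
        .map(ds -> "datasources=" + URLEncoder.encode(ds, StandardCharsets.UTF_8))
        .collect(Collectors.joining("&"));
    String suffix = query.isEmpty() ? "" : "&" + query;
    return URI.create(
        coordinatorBase
        + "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus"
        + suffix
    );
  }

  public static void main(String[] args)
  {
    // The base address is an assumption; substitute the real Coordinator host and port.
    System.out.println(build("http://localhost:8081", List.of("wikipedia")));
  }
}
```

Per the updated docs, each segment object in the response should then carry `overshadowed` and `replicationFactor` alongside the regular segment metadata.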

`GET /druid/coordinator/v1/metadata/datasources`

1 change: 1 addition & 0 deletions docs/querying/sql-metadata-tables.md
@@ -137,6 +137,7 @@ Segments table provides details on all Druid segments, whether they are publishe
|dimensions|VARCHAR|JSON-serialized form of the segment dimensions|
|metrics|VARCHAR|JSON-serialized form of the segment metrics|
|last_compaction_state|VARCHAR|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.|
|replication_factor|BIGINT|Total number of replicas of the segment that are required to be loaded across all historical tiers, based on the load rule that currently applies to this segment. If this value is 0, the segment is not assigned to any historical and will not be loaded. This value is -1 if load rules for the segment have not been evaluated yet.|
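
The three cases described for `replication_factor` (a positive count, `0`, and `-1`) can be sketched as below. The class and method names are hypothetical, and the mapping of a `null` replication factor to `-1` is an assumption inferred from the column description and the nullable `replicationFactor` field elsewhere in this PR, not code taken from it.

```java
public class ReplicationFactorSketch
{
  // Sentinel the column description uses for "load rules not evaluated yet".
  static final long NOT_EVALUATED = -1L;

  // Assumption: a null replication factor is surfaced as -1 in the SQL column.
  public static long toColumnValue(Integer replicationFactor)
  {
    return replicationFactor == null ? NOT_EVALUATED : replicationFactor.longValue();
  }

  // Turns a column value into the meaning given in the table above.
  public static String describe(long columnValue)
  {
    if (columnValue == NOT_EVALUATED) {
      return "load rules not evaluated yet";
    }
    if (columnValue == 0) {
      return "not assigned to any Historical; will not be loaded";
    }
    return columnValue + " replica(s) required across all tiers";
  }
}
```

A query consumer would typically treat `-1` as "unknown" rather than as a real replica count.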

For example, to retrieve all currently active segments for datasource "wikipedia", use the query:

@@ -17,6 +17,7 @@
"shard_spec": "{\"type\":\"none\"}",
"dimensions": "[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]",
"metrics": "[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]",
"last_compaction_state": null
"last_compaction_state": null,
"replication_factor": 2
}
]
@@ -23,39 +23,55 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonUnwrapped;

import javax.annotation.Nullable;
import java.util.Objects;

/**
* DataSegment object plus the overshadowed status for the segment. An immutable object.
*
* SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId}
* of the DataSegment object.
* This class represents the current state of a segment in the cluster and encapsulates the following:
* <ul>
* <li>the {@code DataSegment} object</li>
* <li>overshadowed status of the segment</li>
* <li>replication factor of the segment</li>
* </ul>
* <br></br>
* Objects of this class are used to sync the state of segments from the Coordinator to different services, typically the Broker.
* The {@link #compareTo} method considers only the {@link SegmentId}.
*/
public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
public class SegmentStatusInCluster implements Comparable<SegmentStatusInCluster>
{
private final boolean overshadowed;
/**
* The replication factor for the segment added across all tiers. This value is null if the load rules for
* the segment have not been evaluated yet.
*/
private final Integer replicationFactor;
/**
* dataSegment is serialized "unwrapped", i.e. its properties are included as properties of
* enclosing class. If in future, if {@code SegmentWithOvershadowedStatus} were to extend {@link DataSegment},
* enclosing class. If, in the future, {@code SegmentStatusInCluster} were to extend {@link DataSegment},
* there will be no change in the serialized format.
*/
@JsonUnwrapped
private final DataSegment dataSegment;

@JsonCreator
public SegmentWithOvershadowedStatus(
@JsonProperty("overshadowed") boolean overshadowed
public SegmentStatusInCluster(
@JsonProperty("overshadowed") boolean overshadowed,
@JsonProperty("replicationFactor") @Nullable Integer replicationFactor
)
{
// Jackson will overwrite dataSegment if needed (even though the field is final)
this(null, overshadowed);
this(null, overshadowed, replicationFactor);
}

public SegmentWithOvershadowedStatus(
public SegmentStatusInCluster(
DataSegment dataSegment,
boolean overshadowed
boolean overshadowed,
Integer replicationFactor
)
{
this.dataSegment = dataSegment;
this.overshadowed = overshadowed;
this.replicationFactor = replicationFactor;
}

@JsonProperty
@@ -70,44 +86,46 @@ public DataSegment getDataSegment()
return dataSegment;
}

@Nullable
@JsonProperty
public Integer getReplicationFactor()
{
return replicationFactor;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof SegmentWithOvershadowedStatus)) {
return false;
}
final SegmentWithOvershadowedStatus that = (SegmentWithOvershadowedStatus) o;
if (!dataSegment.equals(that.dataSegment)) {
return false;
}
if (overshadowed != (that.overshadowed)) {
if (o == null || getClass() != o.getClass()) {
return false;
}
return true;
SegmentStatusInCluster that = (SegmentStatusInCluster) o;
return overshadowed == that.overshadowed
&& Objects.equals(replicationFactor, that.replicationFactor)
&& Objects.equals(dataSegment, that.dataSegment);
}

@Override
public int hashCode()
{
int result = dataSegment.hashCode();
result = 31 * result + Boolean.hashCode(overshadowed);
return result;
return Objects.hash(overshadowed, replicationFactor, dataSegment);
}

@Override
public int compareTo(SegmentWithOvershadowedStatus o)
public int compareTo(SegmentStatusInCluster o)
{
return dataSegment.getId().compareTo(o.dataSegment.getId());
}

@Override
public String toString()
{
return "SegmentWithOvershadowedStatus{" +
return "SegmentStatusInCluster{" +
"overshadowed=" + overshadowed +
", replicationFactor=" + replicationFactor +
", dataSegment=" + dataSegment +
'}';
}
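
The rewritten `equals` and `hashCode` go through `Objects.equals` and `Objects.hash`, which matters once a nullable boxed `Integer` joins the compared fields. A minimal standalone sketch of that behavior follows; the `Status` class is a hypothetical stand-in, not the Druid class.

```java
import java.util.Objects;

public class NullableEqualsSketch
{
  // Hypothetical stand-in for a value class with a primitive flag and a nullable boxed field.
  static final class Status
  {
    private final boolean overshadowed;
    private final Integer replicationFactor; // null: not evaluated yet

    Status(boolean overshadowed, Integer replicationFactor)
    {
      this.overshadowed = overshadowed;
      this.replicationFactor = replicationFactor;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      Status that = (Status) o;
      // Objects.equals is null-safe and compares boxed values, not references.
      return overshadowed == that.overshadowed
             && Objects.equals(replicationFactor, that.replicationFactor);
    }

    @Override
    public int hashCode()
    {
      // Objects.hash tolerates null arguments.
      return Objects.hash(overshadowed, replicationFactor);
    }
  }

  public static boolean sameStatus(Integer a, Integer b)
  {
    return new Status(true, a).equals(new Status(true, b));
  }

  public static boolean sameHash(Integer a, Integer b)
  {
    return new Status(true, a).hashCode() == new Status(true, b).hashCode();
  }
}
```

Comparing the boxed field with `==` instead would compare object identity, so two equal values boxed separately could compare unequal, and calling `.equals` on the field directly would throw when it is `null`.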
@@ -40,15 +40,17 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SegmentWithOvershadowedStatusTest
public class SegmentStatusInClusterTest
{
private static final ObjectMapper MAPPER = createObjectMapper();
private static final Interval INTERVAL = Intervals.of("2011-10-01/2011-10-02");
private static final ImmutableMap<String, Object> LOAD_SPEC = ImmutableMap.of("something", "or_other");
private static final boolean OVERSHADOWED = true;
private static final Integer REPLICATION_FACTOR = 2;
private static final int TEST_VERSION = 0x9;
private static final SegmentWithOvershadowedStatus SEGMENT = createSegmentWithOvershadowedStatus();
private static final SegmentStatusInCluster SEGMENT = createSegmentForTest();

private static ObjectMapper createObjectMapper()
{
@@ -59,7 +61,7 @@ private static ObjectMapper createObjectMapper()
return objectMapper;
}

private static SegmentWithOvershadowedStatus createSegmentWithOvershadowedStatus()
private static SegmentStatusInCluster createSegmentForTest()
{
DataSegment dataSegment = new DataSegment(
"something",
@@ -74,7 +76,7 @@ private static SegmentWithOvershadowedStatus createSegmentWithOvershadowedStatus
1
);

return new SegmentWithOvershadowedStatus(dataSegment, OVERSHADOWED);
return new SegmentStatusInCluster(dataSegment, OVERSHADOWED, REPLICATION_FACTOR);
}

@Test
@@ -85,7 +87,7 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);

Assert.assertEquals(11, objectMap.size());
Assert.assertEquals(12, objectMap.size());
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(INTERVAL.toString(), objectMap.get("interval"));
Assert.assertEquals("1", objectMap.get("version"));
@@ -96,12 +98,13 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
Assert.assertEquals(OVERSHADOWED, objectMap.get("overshadowed"));
Assert.assertEquals(REPLICATION_FACTOR, objectMap.get("replicationFactor"));

final String json = MAPPER.writeValueAsString(SEGMENT);

final TestSegmentWithOvershadowedStatus deserializedSegment = MAPPER.readValue(
final TestSegment deserializedSegment = MAPPER.readValue(
json,
TestSegmentWithOvershadowedStatus.class
TestSegment.class
);

DataSegment dataSegment = SEGMENT.getDataSegment();
@@ -114,30 +117,33 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E
Assert.assertEquals(dataSegment.getShardSpec(), deserializedSegment.getShardSpec());
Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize());
Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId());
Assert.assertEquals(OVERSHADOWED, deserializedSegment.isOvershadowed());
Assert.assertEquals(REPLICATION_FACTOR, deserializedSegment.getReplicationFactor());
}

// Previously, the implementation of SegmentWithOvershadowedStatus had @JsonCreator/@JsonProperty and @JsonUnwrapped
// Previously, the implementation of SegmentStatusInCluster had @JsonCreator/@JsonProperty and @JsonUnwrapped
// on the same field (dataSegment), which used to work in Jackson 2.6, but does not work with Jackson 2.9:
// https://github.com/FasterXML/jackson-databind/issues/265#issuecomment-264344051
@Test
public void testJsonCreatorAndJsonUnwrappedAnnotationsAreCompatible() throws Exception
{
String json = MAPPER.writeValueAsString(SEGMENT);
SegmentWithOvershadowedStatus segment = MAPPER.readValue(json, SegmentWithOvershadowedStatus.class);
SegmentStatusInCluster segment = MAPPER.readValue(json, SegmentStatusInCluster.class);
Assert.assertEquals(SEGMENT, segment);
Assert.assertEquals(json, MAPPER.writeValueAsString(segment));
}
}

/**
* Subclass of DataSegment with overshadowed status
* Flat subclass of DataSegment for testing
*/
class TestSegmentWithOvershadowedStatus extends DataSegment
class TestSegment extends DataSegment
{
private final boolean overshadowed;
private final Integer replicationFactor;

@JsonCreator
public TestSegmentWithOvershadowedStatus(
public TestSegment(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
@@ -154,7 +160,8 @@ public TestSegmentWithOvershadowedStatus(
@JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JsonProperty("overshadowed") boolean overshadowed
@JsonProperty("overshadowed") boolean overshadowed,
@JsonProperty("replicationFactor") Integer replicationFactor
)
{
super(
@@ -170,6 +177,7 @@ public TestSegmentWithOvershadowedStatus(
size
);
this.overshadowed = overshadowed;
this.replicationFactor = replicationFactor;
}

@JsonProperty
@@ -178,23 +186,31 @@ public boolean isOvershadowed()
return overshadowed;
}

@JsonProperty
public Integer getReplicationFactor()
{
return replicationFactor;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof TestSegmentWithOvershadowedStatus)) {
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o;
if (overshadowed != (that.overshadowed)) {
return false;
}
return true;
TestSegment that = (TestSegment) o;
return overshadowed == that.overshadowed && Objects.equals(replicationFactor, that.replicationFactor);
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), overshadowed, replicationFactor);
}
}
@@ -154,6 +154,13 @@ public class DruidCoordinator

private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;

/**
* Contains a map of segmentId to total replication factor across all tiers. This map is refreshed when load rules are
* evaluated. It is used by {@link DruidCoordinator} to supply this value to
* {@link org.apache.druid.server.http.MetadataResource}.
*/
private volatile Object2IntMap<SegmentId> segmentIdToReplicationFactor = null;
private volatile DruidCluster cluster = null;

private int cachedBalancerThreadNumber;
@@ -817,6 +824,12 @@ private List<CoordinatorDuty> makeCompactSegmentsDuty()
return ImmutableList.of(compactSegments);
}

@Nullable
public Integer getReplicationFactorForSegment(SegmentId segmentId)
{
return segmentIdToReplicationFactor == null ? null : segmentIdToReplicationFactor.get(segmentId);
}
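
The getter above reads a `volatile` field that stays `null` until a duty run publishes a map. A simplified sketch of that publish-then-read pattern is shown here, with `java.util.Map` and `String` keys standing in for `Object2IntMap<SegmentId>`; both substitutions, the class name, and the `publish` method are assumptions made to keep the example self-contained.

```java
import java.util.Map;

public class ReplicationFactorHolderSketch
{
  // Replaced wholesale by the writer, so readers never observe a half-built map.
  private volatile Map<String, Integer> segmentIdToReplicationFactor = null;

  // Hypothetical writer hook, called after load rules have been evaluated.
  public void publish(Map<String, Integer> latest)
  {
    segmentIdToReplicationFactor = Map.copyOf(latest);
  }

  // Returns null if nothing has been published yet or the segment is unknown.
  public Integer getReplicationFactorForSegment(String segmentId)
  {
    // Read the volatile field once so the null check and the lookup use the same snapshot.
    final Map<String, Integer> snapshot = segmentIdToReplicationFactor;
    return snapshot == null ? null : snapshot.get(segmentId);
  }
}
```

Copying the field into a local before the null check is a choice of this sketch; it keeps the check and the lookup on one snapshot even if the writer swaps the map in between.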

@VisibleForTesting
protected class DutiesRunnable implements Runnable
{
@@ -943,6 +956,13 @@ public void run()
}
}
}

// Update the immutable replication factor map with latest values.
// This value is set here as it is recalculated during load rule evaluation.
if (params.getSegmentReplicantLookup() != null) {
segmentIdToReplicationFactor = params.getSegmentReplicantLookup().getSegmentIdToReplicationFactor();
}

// Emit the runtime of the full DutiesRunnable
params.getEmitter().emit(
new ServiceMetricEvent.Builder()
@@ -21,6 +21,8 @@

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.client.ImmutableDruidServer;
@@ -79,6 +81,7 @@ public static SegmentReplicantLookup make(DruidCluster cluster, boolean replicat

private final Table<SegmentId, String, Integer> segmentsInCluster;
private final Table<SegmentId, String, Integer> loadingSegments;
private final Map<SegmentId, Integer> segmentIdToReplicationFactor = new HashMap<>();
private final DruidCluster cluster;

private SegmentReplicantLookup(
@@ -114,6 +117,18 @@ public int getLoadedReplicants(SegmentId segmentId, String tier)
return (retVal == null) ? 0 : retVal;
}

// TODO: Refactor this setter, as this class is following a singleton pattern with only getters, and this breaks convention.
// This would be revamped in https://github.com/apache/druid/pull/13197
public void setReplicationFactor(SegmentId segmentId, Integer requiredReplicas)
{
segmentIdToReplicationFactor.put(segmentId, requiredReplicas);
}

public Object2IntMap<SegmentId> getSegmentIdToReplicationFactor()
{
return new Object2IntOpenHashMap<>(segmentIdToReplicationFactor);
}
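
`getSegmentIdToReplicationFactor()` returns a fresh copy rather than the internal map, so a caller holding the result is unaffected by later `setReplicationFactor` calls. The sketch below shows the same copy-on-read idea with plain `HashMap` and `String` keys; the real code copies into a fastutil `Object2IntOpenHashMap`, and the class name here is illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotCopySketch
{
  private final Map<String, Integer> segmentIdToReplicationFactor = new HashMap<>();

  public void setReplicationFactor(String segmentId, Integer requiredReplicas)
  {
    segmentIdToReplicationFactor.put(segmentId, requiredReplicas);
  }

  // Each call copies the current contents; the returned map is detached from this object.
  public Map<String, Integer> getSegmentIdToReplicationFactor()
  {
    return new HashMap<>(segmentIdToReplicationFactor);
  }
}
```

The trade-off is one full copy per call, which is acceptable when the getter runs once per duty cycle, as it appears to here.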

private int getLoadingReplicants(SegmentId segmentId, String tier)
{
Integer retVal = loadingSegments.get(segmentId, tier);