Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replication factor column to sys table #14403

Merged
merged 18 commits into from
Jun 18, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/querying/sql-metadata-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Segments table provides details on all Druid segments, whether they are publishe
|dimensions|VARCHAR|JSON-serialized form of the segment dimensions|
|metrics|VARCHAR|JSON-serialized form of the segment metrics|
|last_compaction_state|VARCHAR|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.|
|replication_factor|BIGINT|Total number of replicas of the segment that are required to be loaded across all historical tiers, based on the load rule that currently applies to this segment. If this value is 0, the segment is not assigned to any historical and will not be loaded or is about to be unloaded if it is currently loaded. This value is -1 if load rules for the segment have not been evaluated yet.|
|replication_factor|BIGINT|Total number of replicas of the segment that are required to be loaded across all historical tiers, based on the load rule that currently applies to this segment. If this value is 0, the segment is not assigned to any historical and will not be loaded. This value is -1 if load rules for the segment have not been evaluated yet.|

For example, to retrieve all currently active segments for datasource "wikipedia", use the query:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public class DruidCoordinator
* evaluated. It is used by {@link DruidCoordinator} to supply this value to
* {@link org.apache.druid.server.http.MetadataResource}.
*/
private volatile Map<SegmentId, Integer> segmentIdToReplicationFactor = null;
private volatile Object2IntMap<SegmentId> segmentIdToReplicationFactor = null;
private volatile DruidCluster cluster = null;

private int cachedBalancerThreadNumber;
Expand Down Expand Up @@ -827,7 +827,7 @@ private List<CoordinatorDuty> makeCompactSegmentsDuty()
@Nullable
public Integer getReplicationFactorForSegment(SegmentId segmentId)
{
return segmentIdToReplicationFactor == null ? null : segmentIdToReplicationFactor.get(segmentId);
return segmentIdToReplicationFactor == null ? null : segmentIdToReplicationFactor.getInt(segmentId);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
package org.apache.druid.server.coordinator;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Table;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.client.ImmutableDruidServer;
Expand Down Expand Up @@ -123,9 +124,9 @@ public void setReplicationFactor(SegmentId segmentId, Integer requiredReplicas)
segmentIdToReplicationFactor.put(segmentId, requiredReplicas);
}

public Map<SegmentId, Integer> getSegmentIdToReplicationFactor()
public Object2IntMap<SegmentId> getSegmentIdToReplicationFactor()
{
return ImmutableMap.copyOf(segmentIdToReplicationFactor);
return new Object2IntOpenHashMap<>(segmentIdToReplicationFactor);
}

private int getLoadingReplicants(SegmentId segmentId, String tier)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentStatusInCluster;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.List;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

public class MetadataResourceTest
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
{
private static final String DATASOURCE1 = "datasource1";
private static final String DATASOURCE2 = "datasource2";

private MetadataResource metadataResource;

private SegmentsMetadataManager segmentsMetadataManager;
private DruidCoordinator coordinator;
private HttpServletRequest request;

private final DataSegment dataSegment1 = new DataSegment(
DATASOURCE1,
Intervals.of("2010-01-01/P1D"),
"v0",
null,
null,
null,
null,
0x9,
10
);

private final DataSegment dataSegment2 = new DataSegment(
DATASOURCE1,
Intervals.of("2010-01-22/P1D"),
"v0",
null,
null,
null,
null,
0x9,
20
);

private final DataSegment dataSegment3 = new DataSegment(
DATASOURCE2,
Intervals.of("2010-01-01/P1M"),
"v0",
null,
null,
null,
null,
0x9,
30
);

private final DataSegment dataSegment4 = new DataSegment(
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
DATASOURCE2,
Intervals.of("2010-01-02/P1D"),
"v0",
null,
null,
null,
null,
0x9,
35
);

@Before
public void setUp()
{
// Create mock request
request = mock(HttpServletRequest.class);
doReturn(mock(AuthenticationResult.class)).when(request).getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT);

// Mock SegmentsMetadataManager
segmentsMetadataManager = mock(SegmentsMetadataManager.class);
ImmutableDruidDataSource druidDataSource1 = new ImmutableDruidDataSource(
DATASOURCE1,
ImmutableMap.of(),
ImmutableList.of(
dataSegment1,
dataSegment2
)
);

ImmutableDruidDataSource druidDataSource2 = new ImmutableDruidDataSource(
DATASOURCE1,
ImmutableMap.of(),
ImmutableList.of(
dataSegment3,
dataSegment4
)
);

// Mock segments from cache and coordinator
DataSourcesSnapshot dataSourcesSnapshot = mock(DataSourcesSnapshot.class);
doReturn(dataSourcesSnapshot).when(segmentsMetadataManager).getSnapshotOfDataSourcesWithAllUsedSegments();

doReturn(ImmutableList.of(druidDataSource1, druidDataSource2)).when(dataSourcesSnapshot).getDataSourcesWithAllUsedSegments();
// Segment 4 is overshadowed

// Mock Coordinator
coordinator = mock(DruidCoordinator.class);
// Segment 1: Replication factor 2, not overshadowed
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
doReturn(2).when(coordinator).getReplicationFactorForSegment(dataSegment1.getId());
// Segment 2: Replication factor null, not overshadowed
doReturn(null).when(coordinator).getReplicationFactorForSegment(dataSegment2.getId());
// Segment 3: Replication factor 1, not overshadowed
doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment3.getId());
// Segment 4: Replication factor 1, overshadowed
doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment4.getId());
doReturn(ImmutableSet.of(dataSegment4)).when(dataSourcesSnapshot).getOvershadowedSegments();

metadataResource = new MetadataResource(segmentsMetadataManager, mock(IndexerMetadataStorageCoordinator.class), AuthTestUtils.TEST_AUTHORIZER_MAPPER, coordinator, new ObjectMapper());
}

@Test
public void testGetAllSegmentsWithOvershadowedStatus()
{
Response response = metadataResource.getAllUsedSegments(
request,
null,
"includeOvershadowedStatus"
);

List<SegmentStatusInCluster> resultList = materializeResponse(response);
Assert.assertEquals(resultList.size(), 4);
Assert.assertEquals(new SegmentStatusInCluster(dataSegment1, false, 2), resultList.get(0));
Assert.assertEquals(new SegmentStatusInCluster(dataSegment2, false, null), resultList.get(1));
Assert.assertEquals(new SegmentStatusInCluster(dataSegment3, false, 1), resultList.get(2));
// Replication factor should be 0 as the segment is overshadowed
Assert.assertEquals(new SegmentStatusInCluster(dataSegment4, true, 0), resultList.get(3));
}

private List<SegmentStatusInCluster> materializeResponse(Response response)
{
Iterable<SegmentStatusInCluster> resultIterator = (Iterable<SegmentStatusInCluster>) response.getEntity();
List<SegmentStatusInCluster> segmentStatusInClusters = new ArrayList<>();
resultIterator.forEach(segmentStatusInClusters::add);
return segmentStatusInClusters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.Inject;
import org.apache.curator.shaded.com.google.common.cache.Cache;
import org.apache.curator.shaded.com.google.common.cache.CacheBuilder;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.DataSegmentInterner;
import org.apache.druid.client.JsonParserIterator;
Expand Down Expand Up @@ -80,6 +80,10 @@ public class MetadataSegmentView
*/
@MonotonicNonNull
private volatile ImmutableSortedSet<SegmentStatusInCluster> publishedSegments = null;
/**
* Caches segmentId vs replication factor. In case the coordinator restarts, this is used to refer to previous values
* to prevent randomly flapping to null.
*/
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
private final Cache<SegmentId, Integer> segmentIdToReplicationFactor;
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
private final ScheduledExecutorService scheduledExec;
private final long pollPeriodInMS;
Expand All @@ -102,7 +106,7 @@ public MetadataSegmentView(
this.pollPeriodInMS = config.getMetadataSegmentPollPeriod();
this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
this.segmentIdToReplicationFactor = CacheBuilder.newBuilder()
.expireAfterWrite(3, TimeUnit.MINUTES)
.expireAfterAccess(10, TimeUnit.MINUTES)
.build();
}

Expand Down Expand Up @@ -151,7 +155,7 @@ private void poll()
final SegmentStatusInCluster segment = metadataSegments.next();
final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment());
Integer replicationFactor = segment.getReplicationFactor();
if (segment.getReplicationFactor() == null) {
if (replicationFactor == null) {
replicationFactor = segmentIdToReplicationFactor.getIfPresent(segment.getDataSegment().getId());
} else {
segmentIdToReplicationFactor.put(segment.getDataSegment().getId(), segment.getReplicationFactor());
Expand Down