Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add alert metrics to identify that elastic task configurations require adjustment #860

Merged
merged 6 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.linkedin.datastream.server.Pair;
import com.linkedin.datastream.server.providers.PartitionThroughputProvider;

import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS;


/**
* Partition assignment strategy that does assignment based on throughput information supplied by a
Expand Down Expand Up @@ -155,6 +157,27 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
Map<String, Set<DatastreamTask>> newAssignment = doAssignment(clusterThroughputInfo, currentAssignment,
unassignedPartitions, datastreamPartitions);
partitionSanityChecks(newAssignment, datastreamPartitions);

// verify if the elastic task configurations need adjustment for the datastream.
int maxTasks = resolveConfigWithMetadata(datastreamPartitions.getDatastreamGroup(), CFG_MAX_TASKS, 0);
// if numTasks == maxTasks, the task configurations require readjustment from scale point of view.
if (maxTasks > 0 && maxTasks == getTaskCountForDatastreamGroup(datastreamGroup.getTaskPrefix())) {
updateOrRegisterElasticTaskAssignmentMetrics(datastreamGroup.getTaskPrefix(), true);
}

if (_enablePartitionNumBasedTaskCountEstimation) {
// if the partition count does not honor the partitionsPerTask configuration, the elastic task configurations
// require readjustment.
int partitionsPerTask = getPartitionsPerTask(datastreamPartitions.getDatastreamGroup());
for (Set<DatastreamTask> tasksSet : newAssignment.values()) {
for (DatastreamTask task : tasksSet) {
if (task.getTaskPrefix().equals(datastreamGroup.getTaskPrefix()) && task.getPartitionsV2().size() > partitionsPerTask) {
updateOrRegisterElasticTaskAssignmentMetrics(task.getTaskPrefix(), true);
break;
}
}
}
}
return newAssignment;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@
public class StickyPartitionAssignmentStrategy extends StickyMulticastStrategy implements MetricsAware {
public static final String CFG_MIN_TASKS = "minTasks";

private static final String CLASS_NAME = StickyPartitionAssignmentStrategy.class.getSimpleName();
@VisibleForTesting
static final String CLASS_NAME = StickyPartitionAssignmentStrategy.class.getSimpleName();
@VisibleForTesting
static final String NUM_TASKS = "numTasks";
private static final String ACTUAL_PARTITIONS_PER_TASK = "actualPartitionsPerTask";
private static final String PARTITIONS_PER_TASK_NEEDS_ADJUSTMENT = "partitionsPerTaskNeedsAdjustment";
@VisibleForTesting
static final String ELASTIC_TASK_PARAMETERS_NEED_ADJUSTMENT = "elasticTaskParametersNeedAdjustment";

private static final Logger LOG = LoggerFactory.getLogger(StickyPartitionAssignmentStrategy.class.getName());
private static final DynamicMetricsManager DYNAMIC_METRICS_MANAGER = DynamicMetricsManager.getInstance();
Expand Down Expand Up @@ -172,6 +176,15 @@ public Map<String, Set<DatastreamTask>> assign(List<DatastreamGroup> datastreams
return super.assign(datastreams, instances, currentAssignment);
}

/**
 * Resolves the partitions-per-task setting for the given datastream group, preferring the
 * value found in the datastream's metadata and falling back to the strategy-level default
 * ({@code _partitionsPerTask}).
 * @param datastreamGroup the datastream group whose configuration is consulted (the previous
 *                        doc described this as the group's name, but the full group object is
 *                        what is passed and resolved against)
 * @return the number of partitions allowed per task for this datastream group
 */
public Integer getPartitionsPerTask(DatastreamGroup datastreamGroup) {
  return resolveConfigWithMetadata(datastreamGroup, CFG_PARTITIONS_PER_TASK, _partitionsPerTask);
}

protected Pair<List<String>, Integer> getAssignedPartitionsAndTaskCountForDatastreamGroup(
Map<String, Set<DatastreamTask>> currentAssignment, String datastreamGroupName) {
List<String> assignedPartitions = new ArrayList<>();
Expand Down Expand Up @@ -510,8 +523,9 @@ public List<BrooklinMetricInfo> getMetricInfos() {
List<BrooklinMetricInfo> metrics = new ArrayList<>();
String prefix = CLASS_NAME + MetricsAware.KEY_REGEX;

metrics.add(new BrooklinGaugeInfo(prefix + NUM_TASKS));
metrics.add(new BrooklinGaugeInfo(prefix + ACTUAL_PARTITIONS_PER_TASK));
metrics.add(new BrooklinGaugeInfo(prefix + PARTITIONS_PER_TASK_NEEDS_ADJUSTMENT));
metrics.add(new BrooklinGaugeInfo(prefix + ELASTIC_TASK_PARAMETERS_NEED_ADJUSTMENT));

return Collections.unmodifiableList(metrics);
}
Expand Down Expand Up @@ -557,12 +571,24 @@ protected void updateOrRegisterElasticTaskAssignmentMetrics(DatastreamGroupParti
int actualPartitionsPerTask = (totalPartitions / totalTaskCount)
+ (((totalPartitions % totalTaskCount) == 0) ? 0 : 1);

int partitionsPerTask = resolveConfigWithMetadata(datastreamPartitions.getDatastreamGroup(),
CFG_PARTITIONS_PER_TASK, _partitionsPerTask);
int partitionsPerTask = getPartitionsPerTask(datastreamPartitions.getDatastreamGroup());
String taskPrefix = datastreamPartitions.getDatastreamGroup().getTaskPrefix();
updateOrRegisterElasticTaskAssignmentMetrics(taskPrefix, actualPartitionsPerTask, partitionsPerTask);
}

/**
 * Updates (registering first, if necessary) the elastic task assignment metrics for the given
 * task prefix using the observed and configured partitions-per-task values.
 * @param taskPrefix task prefix of the datastream group whose metrics are updated
 * @param actualPartitionsPerTask observed partitions per task for the current assignment
 * @param partitionsPerTask configured partitions per task for the datastream group
 */
private void updateOrRegisterElasticTaskAssignmentMetrics(String taskPrefix, int actualPartitionsPerTask, int partitionsPerTask) {
  // Adjustment is flagged when the observed value exceeds the configured value.
  boolean needsAdjustment = actualPartitionsPerTask > partitionsPerTask;
  updateOrRegisterElasticTaskAssignmentMetrics(taskPrefix,
      new ElasticTaskAssignmentInfo(actualPartitionsPerTask, needsAdjustment));
}

/**
 * Updates (registering first, if necessary) the elastic task assignment metrics for the given
 * task prefix, recording only whether the elastic task parameters need adjustment.
 * @param taskPrefix task prefix of the datastream group whose metrics are updated
 * @param needsAdjustment whether the elastic task parameters need adjustment
 */
void updateOrRegisterElasticTaskAssignmentMetrics(String taskPrefix, boolean needsAdjustment) {
  // -1 marks actualPartitionsPerTask as variable/unknown on this code path.
  updateOrRegisterElasticTaskAssignmentMetrics(taskPrefix, new ElasticTaskAssignmentInfo(-1, needsAdjustment));
}

private void updateOrRegisterElasticTaskAssignmentMetrics(String taskPrefix, ElasticTaskAssignmentInfo elasticTaskAssignmentInfo) {
if (!_elasticTaskAssignmentInfoHashMap.containsKey(taskPrefix)) {
registerElasticTaskAssignmentMetrics(taskPrefix);
}
Expand All @@ -575,8 +601,7 @@ protected int getTaskCountEstimateBasedOnNumPartitions(DatastreamGroupPartitions
// datastream. Assess the number of tasks needed based on partitionsPerTask and the fullness threshold. If
// the number of tasks needed is smaller than the number of tasks found, throw a DatastreamRuntimeException
// so that LEADER_DO_ASSIGNMENT and LEADER_PARTITION_ASSIGNMENT can be retried with an updated number of tasks.
int partitionsPerTask = resolveConfigWithMetadata(datastreamPartitions.getDatastreamGroup(),
CFG_PARTITIONS_PER_TASK, _partitionsPerTask);
int partitionsPerTask = getPartitionsPerTask(datastreamPartitions.getDatastreamGroup());
int partitionFullnessFactorPct = resolveConfigWithMetadata(datastreamPartitions.getDatastreamGroup(),
CFG_PARTITION_FULLNESS_THRESHOLD_PCT, _partitionFullnessFactorPct);
LOG.info("Calculating number of tasks needed based on partitions per task: calculated->{}:config->{}, "
Expand Down Expand Up @@ -706,6 +731,9 @@ private int getNumTasksForDatastreamFromZK(String stream) {
* Register metrics related to elastic task assignment
*/
private void registerElasticTaskAssignmentMetrics(String taskPrefix) {
Supplier<Integer> numTasksSupplier = () -> getTaskCountForDatastreamGroup(taskPrefix);
DYNAMIC_METRICS_MANAGER.registerGauge(CLASS_NAME, taskPrefix, NUM_TASKS, numTasksSupplier);

Supplier<Double> actualPartitionsPerTaskSupplier = () ->
(double) (_elasticTaskAssignmentInfoHashMap.containsKey(taskPrefix)
? _elasticTaskAssignmentInfoHashMap.get(taskPrefix).getActualPartitionsPerTask() : 0);
Expand All @@ -715,24 +743,26 @@ private void registerElasticTaskAssignmentMetrics(String taskPrefix) {
Supplier<Double> needsAdjustmentSupplier = () ->
(_elasticTaskAssignmentInfoHashMap.containsKey(taskPrefix)
&& _elasticTaskAssignmentInfoHashMap.get(taskPrefix).getNeedsAdjustment()) ? 1.0 : 0.0;
DYNAMIC_METRICS_MANAGER.registerGauge(CLASS_NAME, taskPrefix, PARTITIONS_PER_TASK_NEEDS_ADJUSTMENT,
DYNAMIC_METRICS_MANAGER.registerGauge(CLASS_NAME, taskPrefix, ELASTIC_TASK_PARAMETERS_NEED_ADJUSTMENT,
needsAdjustmentSupplier);
}

/**
* Unregister metrics for a given datastream group
*/
protected void unregisterMetrics(String datastreamTaskPrefix) {
vmaheshw marked this conversation as resolved.
Show resolved Hide resolved
DYNAMIC_METRICS_MANAGER.unregisterMetric(CLASS_NAME, datastreamTaskPrefix, NUM_TASKS);
DYNAMIC_METRICS_MANAGER.unregisterMetric(CLASS_NAME, datastreamTaskPrefix, ACTUAL_PARTITIONS_PER_TASK);
DYNAMIC_METRICS_MANAGER.unregisterMetric(CLASS_NAME, datastreamTaskPrefix, PARTITIONS_PER_TASK_NEEDS_ADJUSTMENT);
DYNAMIC_METRICS_MANAGER.unregisterMetric(CLASS_NAME, datastreamTaskPrefix, ELASTIC_TASK_PARAMETERS_NEED_ADJUSTMENT);
}

/**
* Contains the elastic task information for a given datastream group
*/
private static class ElasticTaskAssignmentInfo {
private int _actualPartitionsPerTask;
private boolean _needsAdjustment;
// -1 implies variable partitions per task
private final int _actualPartitionsPerTask;
private final boolean _needsAdjustment;

public ElasticTaskAssignmentInfo(int actualPartitionsPerTask, boolean needsAdjustment) {
_actualPartitionsPerTask = actualPartitionsPerTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand All @@ -44,6 +47,9 @@
import com.linkedin.datastream.testutil.MetricsTestUtils;

import static com.linkedin.datastream.server.assignment.StickyMulticastStrategyFactory.DEFAULT_IMBALANCE_THRESHOLD;
import static com.linkedin.datastream.server.assignment.StickyPartitionAssignmentStrategy.CLASS_NAME;
import static com.linkedin.datastream.server.assignment.StickyPartitionAssignmentStrategy.ELASTIC_TASK_PARAMETERS_NEED_ADJUSTMENT;
import static com.linkedin.datastream.server.assignment.StickyPartitionAssignmentStrategy.NUM_TASKS;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -56,6 +62,15 @@ public class TestStickyPartitionAssignmentStrategy {
private EmbeddedZookeeper _embeddedZookeeper;
private String _clusterName;
private ZkClient _zkClient;
private DynamicMetricsManager _metricsManager;

/**
 * One-time test class setup: installs a {@link DynamicMetricsManager} instance backed by a
 * fresh MetricRegistry so strategy metrics can be read back by the tests via _metricsManager.
 */
@BeforeClass
public void setupClass() throws IOException {
  _metricsManager = DynamicMetricsManager.createInstance(new MetricRegistry(), "TestStickyPartitionAssignment");
}

@BeforeMethod
public void setup() throws IOException {
Expand All @@ -64,12 +79,18 @@ public void setup() throws IOException {
String zkConnectionString = _embeddedZookeeper.getConnection();
_embeddedZookeeper.startup();
_zkClient = new ZkClient(zkConnectionString);
DynamicMetricsManager.createInstance(new MetricRegistry(), "TestStickyPartitionAssignment");
}

// Per-test teardown: shuts down the embedded ZooKeeper instance started in setup().
@AfterMethod
public void teardown() throws Exception {
  _embeddedZookeeper.shutdown();
}

/**
* Test class teardown code
*/
@AfterClass
public void teardownClass() throws Exception {
// A hack to force clean up DynamicMetricsManager
Field field = DynamicMetricsManager.class.getDeclaredField("_instance");
try {
Expand Down Expand Up @@ -446,7 +467,7 @@ public void testElasticTaskPartitionAssignmentRepeatedPartitionAssignments() {
StickyPartitionAssignmentStrategy strategy = createStickyPartitionAssignmentStrategy(partitionsPerTask,
fullnessFactorPct, true, _zkClient, _clusterName);

List<DatastreamGroup> datastreams = generateDatastreams("ds", 1, minTasks);
List<DatastreamGroup> datastreams = generateDatastreams("testElasticTaskPartitionAssignmentRepeatedPartitionAssignments", 1, minTasks);
datastreams.forEach(dg -> _zkClient.ensurePath(KeyBuilder.datastream(_clusterName, dg.getTaskPrefix())));

Map<String, Set<DatastreamTask>> assignment = Collections.emptyMap();
Expand Down Expand Up @@ -541,6 +562,10 @@ public void testElasticTaskPartitionAssignmentRepeatedPartitionAssignments() {
validatePartitionAssignment(assignment, partitions, maxPartitionsPerTask, numTasksNeeded);

MetricsTestUtils.verifyMetrics(strategy, DynamicMetricsManager.getInstance());
Gauge<?> gauge = _metricsManager.getMetric(MetricRegistry.name(CLASS_NAME, datastreams.get(0).getName(), NUM_TASKS));
Assert.assertEquals(gauge.getValue(), numTasksNeeded);
gauge = _metricsManager.getMetric(MetricRegistry.name(CLASS_NAME, datastreams.get(0).getName(), ELASTIC_TASK_PARAMETERS_NEED_ADJUSTMENT));
Assert.assertEquals(gauge.getValue(), 0.0);
}

@Test
Expand All @@ -552,7 +577,7 @@ public void testElasticTaskPartitionAssignmentCreatesMinTasksEvenForSmallPartiti
StickyPartitionAssignmentStrategy strategy = createStickyPartitionAssignmentStrategy(partitionsPerTask,
fullnessFactorPct, true, _zkClient, _clusterName);

List<DatastreamGroup> datastreams = generateDatastreams("ds", 1, minTasks);
List<DatastreamGroup> datastreams = generateDatastreams("testElasticTaskPartitionAssignmentCreatesMinTasksEvenForSmallPartitionCount", 1, minTasks);
datastreams.forEach(dg -> _zkClient.ensurePath(KeyBuilder.datastream(_clusterName, dg.getTaskPrefix())));

Map<String, Set<DatastreamTask>> assignment = Collections.emptyMap();
Expand All @@ -575,6 +600,10 @@ public void testElasticTaskPartitionAssignmentCreatesMinTasksEvenForSmallPartiti
validatePartitionAssignment(assignment, partitions, maxPartitionsPerTask, minTasks);

MetricsTestUtils.verifyMetrics(strategy, DynamicMetricsManager.getInstance());
Gauge<?> gauge = _metricsManager.getMetric(MetricRegistry.name(CLASS_NAME, datastreams.get(0).getName(), NUM_TASKS));
Assert.assertEquals(gauge.getValue(), minTasks);
gauge = _metricsManager.getMetric(MetricRegistry.name(CLASS_NAME, datastreams.get(0).getName(), ELASTIC_TASK_PARAMETERS_NEED_ADJUSTMENT));
Assert.assertEquals(gauge.getValue(), 0.0);
}

@Test
Expand All @@ -587,7 +616,7 @@ public void testElasticTaskPartitionAssignmentCreatesAtMostMaxTasks() {
StickyPartitionAssignmentStrategy strategy =
createStickyPartitionAssignmentStrategy(partitionsPerTask, fullnessFactorPct, true, _zkClient, _clusterName);

List<DatastreamGroup> datastreams = generateDatastreams("ds", 1, minTasks);
List<DatastreamGroup> datastreams = generateDatastreams("testElasticTaskPartitionAssignmentCreatesAtMostMaxTasks", 1, minTasks);
datastreams.forEach(datastreamGroup -> {
datastreamGroup.getDatastreams().get(0).getMetadata()
.put(BroadcastStrategyFactory.CFG_MAX_TASKS, String.valueOf(maxTasks));
Expand Down Expand Up @@ -631,6 +660,10 @@ public void testElasticTaskPartitionAssignmentCreatesAtMostMaxTasks() {
validatePartitionAssignment(assignment, partitions, maxPartitionsPerTask, maxTasks);

MetricsTestUtils.verifyMetrics(strategy, DynamicMetricsManager.getInstance());
Gauge<?> gauge = _metricsManager.getMetric(MetricRegistry.name(CLASS_NAME, datastreams.get(0).getName(), NUM_TASKS));
Assert.assertEquals(gauge.getValue(), maxTasks);
gauge = _metricsManager.getMetric(MetricRegistry.name(CLASS_NAME, datastreams.get(0).getName(), ELASTIC_TASK_PARAMETERS_NEED_ADJUSTMENT));
Assert.assertEquals(gauge.getValue(), 1.0);
}

@Test
Expand Down