Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing the CPU Intensive RemoveAll with Lists in Sticky & Load Based Partition Assignment Strategies #965

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public LoadBasedPartitionAssigner(int defaultPartitionBytesInKBRate, int default
*/
public Map<String, Set<DatastreamTask>> assignPartitions(
ClusterThroughputInfo throughputInfo, Map<String, Set<DatastreamTask>> currentAssignment,
List<String> unassignedPartitions, DatastreamGroupPartitionsMetadata partitionMetadata, int maxPartitionsPerTask) {
Set<String> unassignedPartitions, DatastreamGroupPartitionsMetadata partitionMetadata, int maxPartitionsPerTask) {
String datastreamGroupName = partitionMetadata.getDatastreamGroup().getName();
LOG.info("START: assignPartitions for datasteam={}", datastreamGroupName);
Map<String, PartitionThroughputInfo> partitionInfoMap = new HashMap<>(throughputInfo.getPartitionInfoMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -107,17 +108,17 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
}

String datastreamGroupName = datastreamGroup.getName();
Pair<List<String>, Integer> assignedPartitionsAndTaskCount = getAssignedPartitionsAndTaskCountForDatastreamGroup(
Pair<Set<String>, Integer> assignedPartitionsAndTaskCount = getAssignedPartitionsAndTaskCountForDatastreamGroup(
currentAssignment, datastreamGroupName);
List<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
Set<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
int taskCount = assignedPartitionsAndTaskCount.getValue();
LOG.info("Old partition assignment info, assignment: {}", currentAssignment);
Validate.isTrue(taskCount > 0, String.format("No tasks found for datastream group %s", datastreamGroup));
Validate.isTrue(currentAssignment.size() > 0,
"Zero tasks assigned. Retry leader partition assignment");

// Calculating unassigned partitions
List<String> unassignedPartitions = new ArrayList<>(datastreamPartitions.getPartitions());
Set<String> unassignedPartitions = new HashSet<>(datastreamPartitions.getPartitions());
unassignedPartitions.removeAll(assignedPartitions);

ClusterThroughputInfo clusterThroughputInfo = new ClusterThroughputInfo(StringUtils.EMPTY, Collections.emptyMap());
Expand Down Expand Up @@ -192,7 +193,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr

@VisibleForTesting
Map<String, Set<DatastreamTask>> doAssignment(ClusterThroughputInfo clusterThroughputInfo,
Map<String, Set<DatastreamTask>> currentAssignment, List<String> unassignedPartitions,
Map<String, Set<DatastreamTask>> currentAssignment, Set<String> unassignedPartitions,
DatastreamGroupPartitionsMetadata datastreamPartitions) {
Map<String, Set<DatastreamTask>> assignment = _assigner.assignPartitions(
clusterThroughputInfo, currentAssignment, unassignedPartitions, datastreamPartitions, _maxPartitionPerTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package com.linkedin.datastream.server.assignment;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -51,13 +50,13 @@ public LoadBasedTaskCountEstimator(int taskCapacityMBps, int taskCapacityUtiliza
* Gets the estimated number of tasks based on per-partition throughput information.
* NOTE: This does not take into account numPartitionsPerTask configuration
* @param throughputInfo Per-partition throughput information
* @param assignedPartitions The list of assigned partitions
* @param unassignedPartitions The list of unassigned partitions
* @param assignedPartitions The set of assigned partitions
* @param unassignedPartitions The set of unassigned partitions
* @param datastreamName Name of the datastream
* @return The estimated number of tasks
*/
public int getTaskCount(ClusterThroughputInfo throughputInfo, List<String> assignedPartitions,
List<String> unassignedPartitions, String datastreamName) {
public int getTaskCount(ClusterThroughputInfo throughputInfo, Set<String> assignedPartitions,
Set<String> unassignedPartitions, String datastreamName) {
Validate.notNull(throughputInfo, "null throughputInfo");
Validate.notNull(assignedPartitions, "null assignedPartitions");
Validate.notNull(unassignedPartitions, "null unassignedPartitions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ public Integer getPartitionsPerTask(DatastreamGroup datastreamGroup) {
return resolveConfigWithMetadata(datastreamGroup, CFG_PARTITIONS_PER_TASK, _partitionsPerTask);
}

protected Pair<List<String>, Integer> getAssignedPartitionsAndTaskCountForDatastreamGroup(
protected Pair<Set<String>, Integer> getAssignedPartitionsAndTaskCountForDatastreamGroup(
Map<String, Set<DatastreamTask>> currentAssignment, String datastreamGroupName) {
List<String> assignedPartitions = new ArrayList<>();
Set<String> assignedPartitions = new HashSet<>();
int taskCount = 0;
for (Set<DatastreamTask> tasks : currentAssignment.values()) {
Set<DatastreamTask> dgTask = tasks.stream().filter(t -> datastreamGroupName.equals(t.getTaskPrefix()))
Expand Down Expand Up @@ -218,9 +218,9 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String,
String dgName = datastreamGroup.getName();

// Step 1: collect the # of tasks and figured out the unassigned partitions
Pair<List<String>, Integer> assignedPartitionsAndTaskCount =
Pair<Set<String>, Integer> assignedPartitionsAndTaskCount =
getAssignedPartitionsAndTaskCountForDatastreamGroup(currentAssignment, dgName);
List<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
Set<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
int totalTaskCount = assignedPartitionsAndTaskCount.getValue();
Validate.isTrue(totalTaskCount > 0, String.format("No tasks found for datastream group %s", dgName));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
*/
package com.linkedin.datastream.server;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Set;

import org.testng.Assert;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -42,8 +42,8 @@ public void setup() {
@Test
public void emptyAssignmentReturnsZeroTasksTest() {
ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("pizza");
List<String> assignedPartitions = Collections.emptyList();
List<String> unassignedPartitions = Collections.emptyList();
Set<String> assignedPartitions = Collections.emptySet();
Set<String> unassignedPartitions = Collections.emptySet();
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Expand All @@ -53,9 +53,9 @@ public void emptyAssignmentReturnsZeroTasksTest() {
@Test
public void lowThroughputAssignmentReturnsOneTaskTest() {
ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("pizza");
List<String> assignedPartitions = new ArrayList<>();
Set<String> assignedPartitions = new HashSet<>();
assignedPartitions.add("Pepperoni-1");
List<String> unassignedPartitions = Collections.emptyList();
Set<String> unassignedPartitions = Collections.emptySet();
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Expand All @@ -65,8 +65,8 @@ public void lowThroughputAssignmentReturnsOneTaskTest() {
@Test
public void highThroughputAssignmentTest() {
ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("ice-cream");
List<String> assignedPartitions = Collections.emptyList();
List<String> unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet());
Set<String> assignedPartitions = Collections.emptySet();
Set<String> unassignedPartitions = throughputInfo.getPartitionInfoMap().keySet();
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Expand All @@ -81,8 +81,8 @@ public void highThroughputAssignmentTest() {
@Test
public void highThroughputAssignmentTest2() {
ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("donut");
List<String> assignedPartitions = Collections.emptyList();
List<String> unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet());
Set<String> assignedPartitions = Collections.emptySet();
Set<String> unassignedPartitions = throughputInfo.getPartitionInfoMap().keySet();
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Expand All @@ -92,8 +92,8 @@ public void highThroughputAssignmentTest2() {
@Test
public void partitionsHaveDefaultWeightTest() {
ClusterThroughputInfo throughputInfo = new ClusterThroughputInfo("dummy", new HashMap<>());
List<String> assignedPartitions = Collections.emptyList();
List<String> unassignedPartitions = Arrays.asList("P1", "P2");
Set<String> assignedPartitions = Collections.emptySet();
Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("P1", "P2"));
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Expand All @@ -103,8 +103,8 @@ public void partitionsHaveDefaultWeightTest() {
@Test
public void throughputTaskEstimatorWithTopicLevelInformation() {
ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("fruit");
List<String> assignedPartitions = Collections.emptyList();
List<String> unassignedPartitions = Arrays.asList("apple-0", "apple-1", "apple-2", "banana-0");
Set<String> assignedPartitions = Collections.emptySet();
Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("apple-0", "apple-1", "apple-2", "banana-0"));
LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package com.linkedin.datastream.server.assignment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -56,7 +55,7 @@ public void setup() {

@Test
public void assignFromScratchTest() {
List<String> unassignedPartitions = Arrays.asList("P1", "P2", "P3");
Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("P1", "P2", "P3"));
ClusterThroughputInfo throughputInfo = getDummyClusterThroughputInfo(unassignedPartitions);

Datastream ds1 = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds1")[0];
Expand Down Expand Up @@ -107,9 +106,9 @@ public void assignFromScratchTest() {

@Test
public void newAssignmentRetainsTasksFromOtherDatastreamsTest() {
List<String> assignedPartitions = Arrays.asList("P1", "P2");
List<String> unassignedPartitions = Collections.singletonList("P3");
List<String> allPartitions = new ArrayList<>(assignedPartitions);
Set<String> assignedPartitions = new HashSet<>(Arrays.asList("P1", "P2"));
Set<String> unassignedPartitions = Collections.singleton("P3");
Set<String> allPartitions = new HashSet<>(assignedPartitions);
allPartitions.addAll(unassignedPartitions);
ClusterThroughputInfo throughputInfo = getDummyClusterThroughputInfo(allPartitions);

Expand Down Expand Up @@ -169,7 +168,7 @@ public void newAssignmentRetainsTasksFromOtherDatastreamsTest() {
@Test
public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() {
// this tests the round-robin assignment of partitions that don't have throughput info
List<String> unassignedPartitions = Arrays.asList("P1", "P2", "P3", "P4");
Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("P1", "P2", "P3", "P4"));
ClusterThroughputInfo throughputInfo = new ClusterThroughputInfo("dummy", new HashMap<>());

Datastream ds1 = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds1")[0];
Expand Down Expand Up @@ -208,7 +207,7 @@ public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() {

@Test
public void lightestTaskGetsNewPartitionTest() {
List<String> unassignedPartitions = Collections.singletonList("P4");
Set<String> unassignedPartitions = Collections.singleton("P4");
Map<String, PartitionThroughputInfo> throughputInfoMap = new HashMap<>();
throughputInfoMap.put("P1", new PartitionThroughputInfo(5, 5, "P1"));
throughputInfoMap.put("P2", new PartitionThroughputInfo(5, 5, "P2"));
Expand Down Expand Up @@ -246,7 +245,7 @@ public void lightestTaskGetsNewPartitionTest() {

@Test
public void lightestTaskGetsNewPartitionWithTopicMetricsTest() {
List<String> unassignedPartitions = Arrays.asList("P-2", "P-3");
Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("P-2", "P-3"));
Map<String, PartitionThroughputInfo> throughputInfoMap = new HashMap<>();
throughputInfoMap.put("P-1", new PartitionThroughputInfo(5, 5, "P-1"));
throughputInfoMap.put("R", new PartitionThroughputInfo(5, 5, "R"));
Expand Down Expand Up @@ -288,7 +287,7 @@ public void lightestTaskGetsNewPartitionWithTopicMetricsTest() {

@Test
public void throwsExceptionWhenNotEnoughRoomForAllPartitionsTest() {
List<String> unassignedPartitions = Arrays.asList("P4", "P5");
Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("P4", "P5"));
Map<String, PartitionThroughputInfo> throughputInfoMap = new HashMap<>();
ClusterThroughputInfo throughputInfo = new ClusterThroughputInfo("dummy", throughputInfoMap);

Expand All @@ -312,7 +311,7 @@ public void throwsExceptionWhenNotEnoughRoomForAllPartitionsTest() {

@Test
public void taskWithRoomGetsNewPartitionTest() {
List<String> unassignedPartitions = Collections.singletonList("P4");
Set<String> unassignedPartitions = Collections.singleton("P4");
Map<String, PartitionThroughputInfo> throughputInfoMap = new HashMap<>();
throughputInfoMap.put("P1", new PartitionThroughputInfo(5, 5, "P1"));
throughputInfoMap.put("P2", new PartitionThroughputInfo(5, 5, "P2"));
Expand Down Expand Up @@ -393,7 +392,7 @@ private DatastreamTask createTaskForDatastream(Datastream datastream, List<Strin
return task;
}

private ClusterThroughputInfo getDummyClusterThroughputInfo(List<String> partitions) {
private ClusterThroughputInfo getDummyClusterThroughputInfo(Set<String> partitions) {
Map<String, PartitionThroughputInfo> partitionThroughputMap = new HashMap<>();
for (String partitionName : partitions) {
int bytesInRate = 5;
Expand Down