diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
index 8feea662f..85f7eaa36 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
@@ -79,7 +79,7 @@ public LoadBasedPartitionAssigner(int defaultPartitionBytesInKBRate, int default
    */
   public Map<String, Set<DatastreamTask>> assignPartitions(
       ClusterThroughputInfo throughputInfo, Map<String, Set<DatastreamTask>> currentAssignment,
-      List<String> unassignedPartitions, DatastreamGroupPartitionsMetadata partitionMetadata, int maxPartitionsPerTask) {
+      Set<String> unassignedPartitions, DatastreamGroupPartitionsMetadata partitionMetadata, int maxPartitionsPerTask) {
     String datastreamGroupName = partitionMetadata.getDatastreamGroup().getName();
     LOG.info("START: assignPartitions for datasteam={}", datastreamGroupName);
     Map<String, PartitionThroughputInfo> partitionInfoMap = new HashMap<>(throughputInfo.getPartitionInfoMap());
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java
index 52d50d866..71f2839d2 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java
@@ -7,6 +7,7 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -107,9 +108,9 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
-    Pair<List<String>, Integer> assignedPartitionsAndTaskCount = getAssignedPartitionsAndTaskCountForDatastreamGroup(
+    Pair<Set<String>, Integer> assignedPartitionsAndTaskCount = getAssignedPartitionsAndTaskCountForDatastreamGroup(
         currentAssignment, datastreamGroupName);
-    List<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
+    Set<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
     int taskCount = assignedPartitionsAndTaskCount.getValue();
     LOG.info("Old partition assignment info, assignment: {}", currentAssignment);
     Validate.isTrue(taskCount > 0, String.format("No tasks found for datastream group %s", datastreamGroup));
@@ -117,7 +118,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
-    List<String> unassignedPartitions = new ArrayList<>(datastreamPartitions.getPartitions());
+    Set<String> unassignedPartitions = new HashSet<>(datastreamPartitions.getPartitions());
     unassignedPartitions.removeAll(assignedPartitions);
     ClusterThroughputInfo clusterThroughputInfo = new ClusterThroughputInfo(StringUtils.EMPTY, Collections.emptyMap());
@@ -192,7 +193,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
   private Map<String, Set<DatastreamTask>> doAssignment(ClusterThroughputInfo clusterThroughputInfo,
-      Map<String, Set<DatastreamTask>> currentAssignment, List<String> unassignedPartitions,
+      Map<String, Set<DatastreamTask>> currentAssignment, Set<String> unassignedPartitions,
       DatastreamGroupPartitionsMetadata datastreamPartitions) {
     Map<String, Set<DatastreamTask>> assignment = _assigner.assignPartitions(
         clusterThroughputInfo, currentAssignment, unassignedPartitions, datastreamPartitions, _maxPartitionPerTask);
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java
index 87c0970aa..b9b6a0d40 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java
@@ -6,7 +6,6 @@
 package com.linkedin.datastream.server.assignment;
 
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -51,13 +50,13 @@ public LoadBasedTaskCountEstimator(int taskCapacityMBps, int taskCapacityUtiliza
    * Gets the estimated number of tasks based on per-partition throughput information.
    * NOTE: This does not take into account numPartitionsPerTask configuration
    * @param throughputInfo Per-partition throughput information
-   * @param assignedPartitions The list of assigned partitions
-   * @param unassignedPartitions The list of unassigned partitions
+   * @param assignedPartitions The set of assigned partitions
+   * @param unassignedPartitions The set of unassigned partitions
    * @param datastreamName Name of the datastream
    * @return The estimated number of tasks
    */
-  public int getTaskCount(ClusterThroughputInfo throughputInfo, List<String> assignedPartitions,
-      List<String> unassignedPartitions, String datastreamName) {
+  public int getTaskCount(ClusterThroughputInfo throughputInfo, Set<String> assignedPartitions,
+      Set<String> unassignedPartitions, String datastreamName) {
     Validate.notNull(throughputInfo, "null throughputInfo");
     Validate.notNull(assignedPartitions, "null assignedPartitions");
     Validate.notNull(unassignedPartitions, "null unassignedPartitions");
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java
index 4f3ae61c3..af7cf093a 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java
@@ -185,9 +185,9 @@ public Integer getPartitionsPerTask(DatastreamGroup datastreamGroup) {
     return resolveConfigWithMetadata(datastreamGroup, CFG_PARTITIONS_PER_TASK, _partitionsPerTask);
   }
 
-  protected Pair<List<String>, Integer> getAssignedPartitionsAndTaskCountForDatastreamGroup(
+  protected Pair<Set<String>, Integer> getAssignedPartitionsAndTaskCountForDatastreamGroup(
       Map<String, Set<DatastreamTask>> currentAssignment, String datastreamGroupName) {
-    List<String> assignedPartitions = new ArrayList<>();
+    Set<String> assignedPartitions = new HashSet<>();
     int taskCount = 0;
     for (Set<DatastreamTask> tasks : currentAssignment.values()) {
       Set<DatastreamTask> dgTask = tasks.stream().filter(t -> datastreamGroupName.equals(t.getTaskPrefix()))
@@ -218,9 +218,9 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
-    Pair<List<String>, Integer> assignedPartitionsAndTaskCount =
+    Pair<Set<String>, Integer> assignedPartitionsAndTaskCount =
         getAssignedPartitionsAndTaskCountForDatastreamGroup(currentAssignment, dgName);
-    List<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
+    Set<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
     int totalTaskCount = assignedPartitionsAndTaskCount.getValue();
     Validate.isTrue(totalTaskCount > 0, String.format("No tasks found for datastream group %s", dgName));
diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java
index d68fa835a..078d77b48 100644
--- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java
+++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestLoadBasedTaskCountEstimator.java
@@ -5,11 +5,11 @@
  */
 package com.linkedin.datastream.server;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -42,8 +42,8 @@ public void setup() {
   @Test
   public void emptyAssignmentReturnsZeroTasksTest() {
     ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("pizza");
-    List<String> assignedPartitions = Collections.emptyList();
-    List<String> unassignedPartitions = Collections.emptyList();
+    Set<String> assignedPartitions = Collections.emptySet();
+    Set<String> unassignedPartitions = Collections.emptySet();
     LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
         TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
     int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
@@ -53,9 +53,9 @@ public void emptyAssignmentReturnsZeroTasksTest() {
   @Test
   public void lowThroughputAssignmentReturnsOneTaskTest() {
     ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("pizza");
-    List<String> assignedPartitions = new ArrayList<>();
+    Set<String> assignedPartitions = new HashSet<>();
     assignedPartitions.add("Pepperoni-1");
-    List<String> unassignedPartitions = Collections.emptyList();
+    Set<String> unassignedPartitions = Collections.emptySet();
     LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
         TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
     int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
@@ -65,8 +65,8 @@ public void lowThroughputAssignmentReturnsOneTaskTest() {
   @Test
   public void highThroughputAssignmentTest() {
     ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("ice-cream");
-    List<String> assignedPartitions = Collections.emptyList();
-    List<String> unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet());
+    Set<String> assignedPartitions = Collections.emptySet();
+    Set<String> unassignedPartitions = throughputInfo.getPartitionInfoMap().keySet();
     LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
         TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
     int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
@@ -81,8 +81,8 @@ public void highThroughputAssignmentTest() {
   @Test
   public void highThroughputAssignmentTest2() {
     ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("donut");
-    List<String> assignedPartitions = Collections.emptyList();
-    List<String> unassignedPartitions = new ArrayList<>(throughputInfo.getPartitionInfoMap().keySet());
+    Set<String> assignedPartitions = Collections.emptySet();
+    Set<String> unassignedPartitions = throughputInfo.getPartitionInfoMap().keySet();
     LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
         TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
     int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
@@ -92,8 +92,8 @@ public void highThroughputAssignmentTest2() {
   @Test
   public void partitionsHaveDefaultWeightTest() {
     ClusterThroughputInfo throughputInfo = new ClusterThroughputInfo("dummy", new HashMap<>());
-    List<String> assignedPartitions = Collections.emptyList();
-    List<String> unassignedPartitions = Arrays.asList("P1", "P2");
+    Set<String> assignedPartitions = Collections.emptySet();
+    Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("P1", "P2"));
     LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
         TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
     int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
@@ -103,8 +103,8 @@ public void partitionsHaveDefaultWeightTest() {
   @Test
   public void throughputTaskEstimatorWithTopicLevelInformation() {
     ClusterThroughputInfo throughputInfo = _provider.getThroughputInfo("fruit");
-    List<String> assignedPartitions = Collections.emptyList();
-    List<String> unassignedPartitions = Arrays.asList("apple-0", "apple-1", "apple-2", "banana-0");
+    Set<String> assignedPartitions = Collections.emptySet();
+    Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("apple-0", "apple-1", "apple-2", "banana-0"));
     LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(TASK_CAPACITY_MBPS,
         TASK_CAPACITY_UTILIZATION_PCT, DEFAULT_BYTES_IN_KB_RATE, DEFAULT_MSGS_IN_RATE);
     int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "test");
diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java
index 935c01297..e6bb9ebd4 100644
--- a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java
+++ b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestLoadBasedPartitionAssigner.java
@@ -5,7 +5,6 @@
  */
 package com.linkedin.datastream.server.assignment;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,7 +55,7 @@ public void setup() {
   @Test
   public void assignFromScratchTest() {
-    List<String> unassignedPartitions = Arrays.asList("P1", "P2", "P3");
+    Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("P1", "P2", "P3"));
     ClusterThroughputInfo throughputInfo = getDummyClusterThroughputInfo(unassignedPartitions);
     Datastream ds1 = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds1")[0];
@@ -107,9 +106,9 @@ public void assignFromScratchTest() {
   @Test
   public void newAssignmentRetainsTasksFromOtherDatastreamsTest() {
-    List<String> assignedPartitions = Arrays.asList("P1", "P2");
-    List<String> unassignedPartitions = Collections.singletonList("P3");
-    List<String> allPartitions = new ArrayList<>(assignedPartitions);
+    Set<String> assignedPartitions = new HashSet<>(Arrays.asList("P1", "P2"));
+    Set<String> unassignedPartitions = Collections.singleton("P3");
+    Set<String> allPartitions = new HashSet<>(assignedPartitions);
     allPartitions.addAll(unassignedPartitions);
     ClusterThroughputInfo throughputInfo = getDummyClusterThroughputInfo(allPartitions);
@@ -169,7 +168,7 @@ public void newAssignmentRetainsTasksFromOtherDatastreamsTest() {
   @Test
   public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() {
     // this tests the round-robin assignment of partitions that don't have throughput info
-    List<String> unassignedPartitions = Arrays.asList("P1", "P2", "P3", "P4");
+    Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("P1", "P2", "P3", "P4"));
     ClusterThroughputInfo throughputInfo = new ClusterThroughputInfo("dummy", new HashMap<>());
     Datastream ds1 = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds1")[0];
@@ -208,7 +207,7 @@ public void assignmentDistributesPartitionsWhenThroughputInfoIsMissingTest() {
   @Test
   public void lightestTaskGetsNewPartitionTest() {
-    List<String> unassignedPartitions = Collections.singletonList("P4");
+    Set<String> unassignedPartitions = Collections.singleton("P4");
     Map<String, PartitionThroughputInfo> throughputInfoMap = new HashMap<>();
     throughputInfoMap.put("P1", new PartitionThroughputInfo(5, 5, "P1"));
     throughputInfoMap.put("P2", new PartitionThroughputInfo(5, 5, "P2"));
@@ -246,7 +245,7 @@ public void lightestTaskGetsNewPartitionTest() {
   @Test
   public void lightestTaskGetsNewPartitionWithTopicMetricsTest() {
-    List<String> unassignedPartitions = Arrays.asList("P-2", "P-3");
+    Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("P-2", "P-3"));
     Map<String, PartitionThroughputInfo> throughputInfoMap = new HashMap<>();
     throughputInfoMap.put("P-1", new PartitionThroughputInfo(5, 5, "P-1"));
     throughputInfoMap.put("R", new PartitionThroughputInfo(5, 5, "R"));
@@ -288,7 +287,7 @@ public void lightestTaskGetsNewPartitionWithTopicMetricsTest() {
   @Test
   public void throwsExceptionWhenNotEnoughRoomForAllPartitionsTest() {
-    List<String> unassignedPartitions = Arrays.asList("P4", "P5");
+    Set<String> unassignedPartitions = new HashSet<>(Arrays.asList("P4", "P5"));
     Map<String, PartitionThroughputInfo> throughputInfoMap = new HashMap<>();
     ClusterThroughputInfo throughputInfo = new ClusterThroughputInfo("dummy", throughputInfoMap);
@@ -312,7 +311,7 @@ public void throwsExceptionWhenNotEnoughRoomForAllPartitionsTest() {
   @Test
   public void taskWithRoomGetsNewPartitionTest() {
-    List<String> unassignedPartitions = Collections.singletonList("P4");
+    Set<String> unassignedPartitions = Collections.singleton("P4");
     Map<String, PartitionThroughputInfo> throughputInfoMap = new HashMap<>();
     throughputInfoMap.put("P1", new PartitionThroughputInfo(5, 5, "P1"));
     throughputInfoMap.put("P2", new PartitionThroughputInfo(5, 5, "P2"));
@@ -393,7 +392,7 @@ private DatastreamTask createTaskForDatastream(Datastream datastream, List<Strin
-  private ClusterThroughputInfo getDummyClusterThroughputInfo(List<String> partitions) {
+  private ClusterThroughputInfo getDummyClusterThroughputInfo(Set<String> partitions) {
     Map<String, PartitionThroughputInfo> partitionThroughputMap = new HashMap<>();
     for (String partitionName : partitions) {
       int bytesInRate = 5;
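
Usage sketch (not part of the patch): a minimal caller of the revised Set-based LoadBasedTaskCountEstimator API, assuming the constructors and argument order exercised in TestLoadBasedTaskCountEstimator and TestLoadBasedPartitionAssigner above. The package placement, example class name, and the numeric capacity/rate values are illustrative assumptions, not values taken from this change.

package com.linkedin.datastream.server;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.linkedin.datastream.server.assignment.LoadBasedTaskCountEstimator;

// Hypothetical example class; assumes ClusterThroughputInfo and PartitionThroughputInfo
// are visible from this package, as they are in the tests above.
public class LoadBasedTaskCountEstimatorExample {
  public static void main(String[] args) {
    // Per-partition throughput info keyed by partition name, built the same way as in the tests.
    Map<String, PartitionThroughputInfo> partitionInfoMap = new HashMap<>();
    partitionInfoMap.put("P1", new PartitionThroughputInfo(5, 5, "P1"));
    partitionInfoMap.put("P2", new PartitionThroughputInfo(5, 5, "P2"));
    ClusterThroughputInfo throughputInfo = new ClusterThroughputInfo("dummy", partitionInfoMap);

    // After this change, assigned and unassigned partitions are passed as sets rather than lists.
    Set<String> assignedPartitions = Collections.emptySet();
    Set<String> unassignedPartitions = new HashSet<>(partitionInfoMap.keySet());

    // Constructor arguments follow the test usage (task capacity, capacity utilization,
    // default bytes-in and msgs-in rates); the numbers here are placeholders.
    LoadBasedTaskCountEstimator estimator = new LoadBasedTaskCountEstimator(4, 90, 5, 5);
    int taskCount = estimator.getTaskCount(throughputInfo, assignedPartitions, unassignedPartitions, "example");
    System.out.println("Estimated task count: " + taskCount);
  }
}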